Client.cpp
Go to the documentation of this file.
00001 #include <Client.hpp>
00002 
00003 #include <iostream>
00004 
00005 namespace msp {
00006 
00007 PeriodicTimer::PeriodicTimer(std::function<void()> funct, const double period_seconds)
00008     : funct(funct), running(false)
00009 {
00010     period_us = std::chrono::duration<uint, std::micro>(uint(period_seconds*1e6));
00011 }
00012 
00013 void PeriodicTimer::start() {
00014     // only start thread if period is above 0
00015     if(!(period_us.count()>0))
00016         return;
00017 
00018     thread_ptr = std::shared_ptr<std::thread>(new std::thread(
00019     [this]{
00020         running = true;
00021         while(running) {
00022             // call function and wait until end of period
00023             const auto tstart = std::chrono::high_resolution_clock::now();
00024             funct();
00025             std::this_thread::sleep_until(tstart+period_us);
00026         } // while running
00027     }
00028     ));
00029 }
00030 
00031 void PeriodicTimer::stop() {
00032     running = false;
00033     if(thread_ptr!=nullptr && thread_ptr->joinable()) {
00034         thread_ptr->join();
00035     }
00036 }
00037 
00038 void PeriodicTimer::setPeriod(const double period_seconds) {
00039     stop();
00040     period_us = std::chrono::duration<uint, std::micro>(uint(period_seconds*1e6));
00041     start();
00042 }
00043 
00044 } // namespace msp
00045 
00046 namespace msp {
00047 namespace client {
00048 
00049 Client::Client() : port(io), running(false), print_warnings(false) { }
00050 
00051 Client::~Client() {
00052     for(const std::pair<msp::ID, msp::Request*> d : subscribed_requests)
00053         delete d.second;
00054 
00055     for(const std::pair<msp::ID, SubscriptionBase*> s : subscriptions)
00056         delete s.second;
00057 }
00058 
00059 void Client::connect(const std::string &device, const uint baudrate) {
00060     port.open(device);
00061 
00062     port.set_option(asio::serial_port::baud_rate(baudrate));
00063     port.set_option(asio::serial_port::parity(asio::serial_port::parity::none));
00064     port.set_option(asio::serial_port::character_size(asio::serial_port::character_size(8)));
00065     port.set_option(asio::serial_port::stop_bits(asio::serial_port::stop_bits::one));
00066 }
00067 
00068 void Client::start() {
00069     thread = std::thread([this]{
00070         running = true;
00071         while(running) { processOneMessage(); }
00072     });
00073 }
00074 
00075 void Client::stop() {
00076     running = false;
00077     io.stop();
00078     thread.join();
00079 }
00080 
00081 uint8_t Client::read() {
00082     if(buffer.sgetc()==EOF) {
00083         asio::read(port, buffer, asio::transfer_exactly(1));
00084     }
00085 
00086     return uint8_t(buffer.sbumpc());
00087 }
00088 
00089 bool Client::sendData(const uint8_t id, const ByteVector &data) {
00090     std::lock_guard<std::mutex> lock(mutex_send);
00091     ByteVector msg;
00092     msg.push_back('$');                                 // preamble1
00093     msg.push_back('M');                                 // preamble2
00094     msg.push_back('<');                                 // direction
00095     msg.push_back(uint8_t(data.size()));                // data size
00096     msg.push_back(id);                                  // message_id
00097     msg.insert(msg.end(), data.begin(), data.end());    // data
00098     msg.push_back( crc(id, data) );                     // crc
00099 
00100     const std::size_t bytes_written = asio::write(port, asio::buffer(msg.data(), msg.size()));
00101 
00102     return (bytes_written==msg.size());
00103 }
00104 
00105 int Client::request(msp::Request &request, const double timeout) {
00106     msp::ByteVector data;
00107     const int success = request_raw(uint8_t(request.id()), data, timeout);
00108     if(success==1) { request.decode(data); }
00109     return success;
00110 }
00111 
00112 int Client::request_raw(const uint8_t id, ByteVector &data, const double timeout) {
00113     // send request
00114     if(!sendRequest(id)) { return false; }
00115 
00116     // wait for thread to received message
00117     std::unique_lock<std::mutex> lock(mutex_cv_request);
00118     const auto predicate = [&]{
00119         mutex_request.lock();
00120         const bool received = (request_received!=NULL) && (request_received->id==id);
00121         // unlock to wait for next message
00122         if(!received) { mutex_request.unlock(); }
00123         return received;
00124     };
00125 
00126     if(timeout>0) {
00127         if(!cv_request.wait_for(lock, std::chrono::milliseconds(uint(timeout*1e3)), predicate))
00128             return -1;
00129     }
00130     else {
00131         cv_request.wait(lock, predicate);
00132     }
00133 
00134     // check message status and decode
00135     const bool success = request_received->status==OK;
00136     if(success) { data = request_received->data; }
00137     mutex_request.unlock();
00138     return success;
00139 }
00140 
00141 bool Client::respond(const msp::Response &response, const bool wait_ack) {
00142     return respond_raw(uint8_t(response.id()), response.encode(), wait_ack);
00143 }
00144 
00145 bool Client::respond_raw(const uint8_t id, const ByteVector &data, const bool wait_ack) {
00146     // send response
00147     if(!sendData(id, data)) { return false; }
00148 
00149     if(!wait_ack)
00150         return true;
00151 
00152     // wait for thread to received message
00153     std::unique_lock<std::mutex> lock(mutex_cv_ack);
00154     cv_ack.wait(lock, [&]{
00155         mutex_request.lock();
00156         const bool received = (request_received!=NULL) && (request_received->id==id);
00157         // unlock to wait for next message
00158         if(!received) { mutex_request.unlock(); }
00159         return received;
00160     });
00161 
00162     // check status, expect ACK without payload
00163     const bool success = request_received->status==OK;
00164     mutex_request.unlock();
00165     return success;
00166 }
00167 
00168 uint8_t Client::crc(const uint8_t id, const ByteVector &data) {
00169     uint8_t crc = uint8_t(data.size())^id;
00170     for(const uint8_t d : data) { crc = crc^d; }
00171     return crc;
00172 }
00173 
00174 void Client::processOneMessage() {
00175     std::lock_guard<std::mutex> lck(mutex_buffer);
00176 
00177     const std::size_t bytes_transferred = asio::read_until(port, buffer, "$M");
00178 
00179     // ignore and remove header bytes
00180     buffer.consume(bytes_transferred);
00181 
00182     MessageStatus status = OK;
00183 
00184     // message direction
00185     const uint8_t dir = read();
00186     const bool ok_id = (dir!='!');
00187 
00188     // payload length
00189     const uint8_t len = read();
00190 
00191     // message ID
00192     const uint8_t id = read();
00193 
00194     if(print_warnings && !ok_id) {
00195         std::cerr << "Message with ID " << uint(id) << " is not recognised!" << std::endl;
00196     }
00197 
00198     // payload
00199     std::vector<uint8_t> data;
00200     for(uint i(0); i<len; i++) {
00201         data.push_back(read());
00202     }
00203 
00204     // CRC
00205     const uint8_t rcv_crc = read();
00206     const uint8_t exp_crc = crc(id,data);
00207     const bool ok_crc = (rcv_crc==exp_crc);
00208 
00209     if(print_warnings && !ok_crc) {
00210         std::cerr << "Message with ID " << uint(id) << " has wrong CRC! (expected: " << uint(exp_crc) << ", received: "<< uint(rcv_crc) << ")" << std::endl;
00211     }
00212 
00213     if(!ok_id) { status = FAIL_ID; }
00214     else if(!ok_crc) { status = FAIL_CRC; }
00215 
00216     mutex_request.lock();
00217     request_received.reset(new ReceivedMessage());
00218     request_received->id = id;
00219     request_received->data = data;
00220     request_received->status = status;
00221     mutex_request.unlock();
00222 
00223     // notify waiting request methods
00224     cv_request.notify_one();
00225     // notify waiting respond methods
00226     cv_ack.notify_one();
00227 
00228     // check subscriptions
00229     mutex_callbacks.lock();
00230     if(status==OK && subscriptions.count(ID(id))) {
00231         // fetch message type, decode payload
00232         msp::Request *const req = subscribed_requests.at(ID(id));
00233         req->decode(data);
00234         // call callback
00235         subscriptions.at(ID(id))->call(*req);
00236     }
00237     mutex_callbacks.unlock();
00238 }
00239 
00240 } // namespace client
00241 } // namespace msp


msp
Author(s): Christian Rauch
autogenerated on Mon Oct 9 2017 03:02:13