Go to the documentation of this file.00001 #include <Client.hpp>
00002
00003 #include <iostream>
00004
00005 namespace msp {
00006
00007 PeriodicTimer::PeriodicTimer(std::function<void()> funct, const double period_seconds)
00008 : funct(funct), running(false)
00009 {
00010 period_us = std::chrono::duration<uint, std::micro>(uint(period_seconds*1e6));
00011 }
00012
00013 void PeriodicTimer::start() {
00014
00015 if(!(period_us.count()>0))
00016 return;
00017
00018 thread_ptr = std::shared_ptr<std::thread>(new std::thread(
00019 [this]{
00020 running = true;
00021 while(running) {
00022
00023 const auto tstart = std::chrono::high_resolution_clock::now();
00024 funct();
00025 std::this_thread::sleep_until(tstart+period_us);
00026 }
00027 }
00028 ));
00029 }
00030
00031 void PeriodicTimer::stop() {
00032 running = false;
00033 if(thread_ptr!=nullptr && thread_ptr->joinable()) {
00034 thread_ptr->join();
00035 }
00036 }
00037
00038 void PeriodicTimer::setPeriod(const double period_seconds) {
00039 stop();
00040 period_us = std::chrono::duration<uint, std::micro>(uint(period_seconds*1e6));
00041 start();
00042 }
00043
00044 }
00045
00046 namespace msp {
00047 namespace client {
00048
00049 Client::Client() : port(io), running(false), print_warnings(false) { }
00050
00051 Client::~Client() {
00052 for(const std::pair<msp::ID, msp::Request*> d : subscribed_requests)
00053 delete d.second;
00054
00055 for(const std::pair<msp::ID, SubscriptionBase*> s : subscriptions)
00056 delete s.second;
00057 }
00058
00059 void Client::connect(const std::string &device, const uint baudrate) {
00060 port.open(device);
00061
00062 port.set_option(asio::serial_port::baud_rate(baudrate));
00063 port.set_option(asio::serial_port::parity(asio::serial_port::parity::none));
00064 port.set_option(asio::serial_port::character_size(asio::serial_port::character_size(8)));
00065 port.set_option(asio::serial_port::stop_bits(asio::serial_port::stop_bits::one));
00066 }
00067
00068 void Client::start() {
00069 thread = std::thread([this]{
00070 running = true;
00071 while(running) { processOneMessage(); }
00072 });
00073 }
00074
00075 void Client::stop() {
00076 running = false;
00077 io.stop();
00078 thread.join();
00079 }
00080
00081 uint8_t Client::read() {
00082 if(buffer.sgetc()==EOF) {
00083 asio::read(port, buffer, asio::transfer_exactly(1));
00084 }
00085
00086 return uint8_t(buffer.sbumpc());
00087 }
00088
00089 bool Client::sendData(const uint8_t id, const ByteVector &data) {
00090 std::lock_guard<std::mutex> lock(mutex_send);
00091 ByteVector msg;
00092 msg.push_back('$');
00093 msg.push_back('M');
00094 msg.push_back('<');
00095 msg.push_back(uint8_t(data.size()));
00096 msg.push_back(id);
00097 msg.insert(msg.end(), data.begin(), data.end());
00098 msg.push_back( crc(id, data) );
00099
00100 const std::size_t bytes_written = asio::write(port, asio::buffer(msg.data(), msg.size()));
00101
00102 return (bytes_written==msg.size());
00103 }
00104
00105 int Client::request(msp::Request &request, const double timeout) {
00106 msp::ByteVector data;
00107 const int success = request_raw(uint8_t(request.id()), data, timeout);
00108 if(success==1) { request.decode(data); }
00109 return success;
00110 }
00111
00112 int Client::request_raw(const uint8_t id, ByteVector &data, const double timeout) {
00113
00114 if(!sendRequest(id)) { return false; }
00115
00116
00117 std::unique_lock<std::mutex> lock(mutex_cv_request);
00118 const auto predicate = [&]{
00119 mutex_request.lock();
00120 const bool received = (request_received!=NULL) && (request_received->id==id);
00121
00122 if(!received) { mutex_request.unlock(); }
00123 return received;
00124 };
00125
00126 if(timeout>0) {
00127 if(!cv_request.wait_for(lock, std::chrono::milliseconds(uint(timeout*1e3)), predicate))
00128 return -1;
00129 }
00130 else {
00131 cv_request.wait(lock, predicate);
00132 }
00133
00134
00135 const bool success = request_received->status==OK;
00136 if(success) { data = request_received->data; }
00137 mutex_request.unlock();
00138 return success;
00139 }
00140
00141 bool Client::respond(const msp::Response &response, const bool wait_ack) {
00142 return respond_raw(uint8_t(response.id()), response.encode(), wait_ack);
00143 }
00144
00145 bool Client::respond_raw(const uint8_t id, const ByteVector &data, const bool wait_ack) {
00146
00147 if(!sendData(id, data)) { return false; }
00148
00149 if(!wait_ack)
00150 return true;
00151
00152
00153 std::unique_lock<std::mutex> lock(mutex_cv_ack);
00154 cv_ack.wait(lock, [&]{
00155 mutex_request.lock();
00156 const bool received = (request_received!=NULL) && (request_received->id==id);
00157
00158 if(!received) { mutex_request.unlock(); }
00159 return received;
00160 });
00161
00162
00163 const bool success = request_received->status==OK;
00164 mutex_request.unlock();
00165 return success;
00166 }
00167
00168 uint8_t Client::crc(const uint8_t id, const ByteVector &data) {
00169 uint8_t crc = uint8_t(data.size())^id;
00170 for(const uint8_t d : data) { crc = crc^d; }
00171 return crc;
00172 }
00173
00174 void Client::processOneMessage() {
00175 std::lock_guard<std::mutex> lck(mutex_buffer);
00176
00177 const std::size_t bytes_transferred = asio::read_until(port, buffer, "$M");
00178
00179
00180 buffer.consume(bytes_transferred);
00181
00182 MessageStatus status = OK;
00183
00184
00185 const uint8_t dir = read();
00186 const bool ok_id = (dir!='!');
00187
00188
00189 const uint8_t len = read();
00190
00191
00192 const uint8_t id = read();
00193
00194 if(print_warnings && !ok_id) {
00195 std::cerr << "Message with ID " << uint(id) << " is not recognised!" << std::endl;
00196 }
00197
00198
00199 std::vector<uint8_t> data;
00200 for(uint i(0); i<len; i++) {
00201 data.push_back(read());
00202 }
00203
00204
00205 const uint8_t rcv_crc = read();
00206 const uint8_t exp_crc = crc(id,data);
00207 const bool ok_crc = (rcv_crc==exp_crc);
00208
00209 if(print_warnings && !ok_crc) {
00210 std::cerr << "Message with ID " << uint(id) << " has wrong CRC! (expected: " << uint(exp_crc) << ", received: "<< uint(rcv_crc) << ")" << std::endl;
00211 }
00212
00213 if(!ok_id) { status = FAIL_ID; }
00214 else if(!ok_crc) { status = FAIL_CRC; }
00215
00216 mutex_request.lock();
00217 request_received.reset(new ReceivedMessage());
00218 request_received->id = id;
00219 request_received->data = data;
00220 request_received->status = status;
00221 mutex_request.unlock();
00222
00223
00224 cv_request.notify_one();
00225
00226 cv_ack.notify_one();
00227
00228
00229 mutex_callbacks.lock();
00230 if(status==OK && subscriptions.count(ID(id))) {
00231
00232 msp::Request *const req = subscribed_requests.at(ID(id));
00233 req->decode(data);
00234
00235 subscriptions.at(ID(id))->call(*req);
00236 }
00237 mutex_callbacks.unlock();
00238 }
00239
00240 }
00241 }