00001 #include <Client.hpp>
00002 #include <cstdlib>
00003 #include <iostream>
00004
00005 typedef unsigned int uint;
00006
00007 namespace msp {
00008 namespace client {
00009
00010 Client::Client() :
00011 port(io),
00012 log_level_(SILENT),
00013 msp_ver_(1),
00014 fw_variant(FirmwareVariant::INAV) {}
00015
00016 Client::~Client() {}
00017
00018 void Client::setLoggingLevel(const LoggingLevel& level) { log_level_ = level; }
00019
00020 bool Client::setVersion(const int& ver) {
00021 if(ver == 1 || ver == 2) {
00022 msp_ver_ = ver;
00023 return true;
00024 }
00025 return false;
00026 }
00027
00028 int Client::getVersion() const { return msp_ver_; }
00029
00030 void Client::setVariant(const FirmwareVariant& v) { fw_variant = v; }
00031
00032 FirmwareVariant Client::getVariant() const { return fw_variant; }
00033
00034 bool Client::start(const std::string& device, const size_t baudrate) {
00035 return connectPort(device, baudrate) && startReadThread() &&
00036 startSubscriptions();
00037 }
00038
00039 bool Client::stop() {
00040 return disconnectPort() && stopReadThread() && stopSubscriptions();
00041 }
00042
00043 bool Client::connectPort(const std::string& device, const size_t baudrate) {
00044 try {
00045 port.open(device);
00046 port.set_option(asio::serial_port::baud_rate(uint(baudrate)));
00047 port.set_option(
00048 asio::serial_port::parity(asio::serial_port::parity::none));
00049 port.set_option(asio::serial_port::character_size(
00050 asio::serial_port::character_size(8)));
00051 port.set_option(
00052 asio::serial_port::stop_bits(asio::serial_port::stop_bits::one));
00053 }
00054 catch(const std::system_error& e) {
00055 const int ecode = e.code().value();
00056 throw std::runtime_error("Error when opening '" + device +
00057 "': " + e.code().category().message(ecode) +
00058 " (error code: " + std::to_string(ecode) +
00059 ")");
00060 }
00061 return isConnected();
00062 }
00063
00064 bool Client::disconnectPort() {
00065 asio::error_code ec;
00066 port.close(ec);
00067 if(ec) return false;
00068 return true;
00069 }
00070
00071 bool Client::isConnected() const { return port.is_open(); }
00072
00073 bool Client::startReadThread() {
00074
00075 if(!isConnected()) return false;
00076
00077 if(running_.test_and_set()) return false;
00078
00079 thread = std::thread([this] {
00080 asio::async_read_until(port,
00081 buffer,
00082 std::bind(&Client::messageReady,
00083 this,
00084 std::placeholders::_1,
00085 std::placeholders::_2),
00086 std::bind(&Client::processOneMessage,
00087 this,
00088 std::placeholders::_1,
00089 std::placeholders::_2));
00090 io.run();
00091 });
00092 return true;
00093 }
00094
00095 bool Client::stopReadThread() {
00096 bool rc = false;
00097 if(running_.test_and_set()) {
00098 io.stop();
00099 thread.join();
00100 io.reset();
00101 rc = true;
00102 }
00103 running_.clear();
00104 return rc;
00105 }
00106
00107 bool Client::startSubscriptions() {
00108 bool rc = true;
00109 for(const auto& sub : subscriptions) {
00110 rc &= sub.second->start();
00111 }
00112 return rc;
00113 }
00114
00115 bool Client::stopSubscriptions() {
00116 bool rc = true;
00117 for(const auto& sub : subscriptions) {
00118 rc &= sub.second->stop();
00119 }
00120 return rc;
00121 }
00122
00123 bool Client::sendMessage(msp::Message& message, const double& timeout) {
00124 if(log_level_ >= DEBUG)
00125 std::cout << "sending message - ID " << size_t(message.id())
00126 << std::endl;
00127 if(!sendData(message.id(), message.encode())) {
00128 if(log_level_ >= WARNING)
00129 std::cerr << "message failed to send" << std::endl;
00130 return false;
00131 }
00132
00133 std::unique_lock<std::mutex> lock(cv_response_mtx);
00134 const auto predicate = [&] {
00135 mutex_response.lock();
00136 const bool received = (request_received != nullptr) &&
00137 (request_received->id == message.id());
00138
00139 if(!received) {
00140 mutex_response.unlock();
00141 }
00142 return received;
00143 };
00144
00145
00146 if(timeout > 0) {
00147 if(!cv_response.wait_for(
00148 lock,
00149 std::chrono::milliseconds(size_t(timeout * 1e3)),
00150 predicate)) {
00151 if(log_level_ >= INFO)
00152 std::cout << "timed out waiting for response to message ID "
00153 << size_t(message.id()) << std::endl;
00154 return false;
00155 }
00156 }
00157 else {
00158 cv_response.wait(lock, predicate);
00159 }
00160
00161 const bool recv_success = request_received->status == OK;
00162 ByteVector data;
00163 if(recv_success) {
00164
00165 data = request_received->payload;
00166 }
00167 mutex_response.unlock();
00168
00169 bool decode_success = false;
00170 if(recv_success) {
00171 decode_success = message.decode(data);
00172 }
00173 return recv_success && decode_success;
00174 }
00175
00176 bool Client::sendMessageNoWait(const msp::Message& message) {
00177 if(log_level_ >= DEBUG)
00178 std::cout << "async sending message - ID " << size_t(message.id())
00179 << std::endl;
00180 if(!sendData(message.id(), message.encode())) {
00181 if(log_level_ >= WARNING)
00182 std::cerr << "async sendData failed" << std::endl;
00183 return false;
00184 }
00185 return true;
00186 }
00187
00188 uint8_t Client::extractChar() {
00189 if(buffer.sgetc() == EOF) {
00190 if(log_level_ >= WARNING)
00191 std::cerr << "buffer returned EOF; reading char directly from port"
00192 << std::endl;
00193 asio::read(port, buffer, asio::transfer_exactly(1));
00194 }
00195 return uint8_t(buffer.sbumpc());
00196 }
00197
00198 bool Client::sendData(const msp::ID id, const ByteVector& data) {
00199 if(log_level_ >= DEBUG)
00200 std::cout << "sending: " << size_t(id) << " | " << data;
00201 ByteVector msg;
00202 if(msp_ver_ == 2) {
00203 msg = packMessageV2(id, data);
00204 }
00205 else {
00206 msg = packMessageV1(id, data);
00207 }
00208 if(log_level_ >= DEBUG) std::cout << "packed: " << msg;
00209 asio::error_code ec;
00210 std::size_t bytes_written;
00211 {
00212 std::lock_guard<std::mutex> lock(mutex_send);
00213 bytes_written =
00214 asio::write(port, asio::buffer(msg.data(), msg.size()), ec);
00215 }
00216 if(ec == asio::error::operation_aborted && log_level_ >= WARNING) {
00217
00218 std::cerr << "------------------> WRITE FAILED <--------------------"
00219 << std::endl;
00220 return false;
00221 }
00222 if(log_level_ >= DEBUG)
00223 std::cout << "write complete: " << bytes_written << " vs " << msg.size()
00224 << std::endl;
00225 return (bytes_written == msg.size());
00226 }
00227
00228 ByteVector Client::packMessageV1(const msp::ID id,
00229 const ByteVector& data) const {
00230 ByteVector msg;
00231 msg.push_back('$');
00232 msg.push_back('M');
00233 msg.push_back('<');
00234 msg.push_back(uint8_t(data.size()));
00235 msg.push_back(uint8_t(id));
00236 msg.insert(msg.end(), data.begin(), data.end());
00237 msg.push_back(crcV1(uint8_t(id), data));
00238 return msg;
00239 }
00240
00241 uint8_t Client::crcV1(const uint8_t id, const ByteVector& data) const {
00242 uint8_t crc = uint8_t(data.size()) ^ id;
00243 for(const uint8_t d : data) {
00244 crc = crc ^ d;
00245 }
00246 return crc;
00247 }
00248
00249 ByteVector Client::packMessageV2(const msp::ID id,
00250 const ByteVector& data) const {
00251 ByteVector msg;
00252 msg.push_back('$');
00253 msg.push_back('X');
00254 msg.push_back('<');
00255 msg.push_back(0);
00256 msg.push_back(uint8_t(uint16_t(id) & 0xFF));
00257 msg.push_back(uint8_t(uint16_t(id) >> 8));
00258
00259 const uint16_t size = uint16_t(data.size());
00260 msg.push_back(uint8_t(size & 0xFF));
00261 msg.push_back(uint8_t(size >> 8));
00262
00263 msg.insert(msg.end(), data.begin(), data.end());
00264 msg.push_back(crcV2(0, ByteVector(msg.begin() + 3, msg.end())));
00265
00266 return msg;
00267 }
00268
00269 uint8_t Client::crcV2(uint8_t crc, const ByteVector& data) const {
00270 for(const uint8_t& p : data) {
00271 crc = crcV2(crc, p);
00272 }
00273 return crc;
00274 }
00275
00276 uint8_t Client::crcV2(uint8_t crc, const uint8_t& b) const {
00277 crc ^= b;
00278 for(int ii = 0; ii < 8; ++ii) {
00279 if(crc & 0x80) {
00280 crc = uint8_t(crc << 1) ^ 0xD5;
00281 }
00282 else {
00283 crc = uint8_t(crc << 1);
00284 }
00285 }
00286 return crc;
00287 }
00288
00289 void Client::processOneMessage(const asio::error_code& ec,
00290 const std::size_t& bytes_transferred) {
00291 if(log_level_ >= DEBUG)
00292 std::cout << "processOneMessage on " << bytes_transferred << " bytes"
00293 << std::endl;
00294
00295 if(ec == asio::error::operation_aborted) {
00296
00297
00298 cv_response.notify_all();
00299 return;
00300 }
00301
00302
00303 const uint8_t msg_marker = extractChar();
00304 if(msg_marker != '$')
00305 std::cerr << "Message marker " << size_t(msg_marker)
00306 << " is not recognised!" << std::endl;
00307
00308
00309 int ver = 0;
00310 const uint8_t ver_marker = extractChar();
00311 if(ver_marker == 'M') ver = 1;
00312 if(ver_marker == 'X') ver = 2;
00313 if(ver == 0) {
00314 std::cerr << "Version marker " << size_t(ver_marker)
00315 << " is not recognised!" << std::endl;
00316 }
00317
00318 ReceivedMessage recv_msg;
00319 if(ver == 2)
00320 recv_msg = processOneMessageV2();
00321 else
00322 recv_msg = processOneMessageV1();
00323
00324 {
00325 std::lock_guard<std::mutex> lock2(cv_response_mtx);
00326 std::lock_guard<std::mutex> lock(mutex_response);
00327 request_received.reset(new ReceivedMessage(recv_msg));
00328 }
00329
00330 cv_response.notify_all();
00331
00332
00333 {
00334 std::lock_guard<std::mutex> lock(mutex_subscriptions);
00335 std::lock_guard<std::mutex> lock2(mutex_response);
00336 if(request_received->status == OK &&
00337 subscriptions.count(ID(request_received->id))) {
00338 subscriptions.at(ID(request_received->id))
00339 ->decode(request_received->payload);
00340 }
00341 }
00342
00343 asio::async_read_until(port,
00344 buffer,
00345 std::bind(&Client::messageReady,
00346 this,
00347 std::placeholders::_1,
00348 std::placeholders::_2),
00349 std::bind(&Client::processOneMessage,
00350 this,
00351 std::placeholders::_1,
00352 std::placeholders::_2));
00353
00354 if(log_level_ >= DEBUG)
00355 std::cout << "processOneMessage finished" << std::endl;
00356 }
00357
00358 std::pair<iterator, bool> Client::messageReady(iterator begin,
00359 iterator end) const {
00360 iterator i = begin;
00361 const size_t available = size_t(std::distance(begin, end));
00362
00363 if(available < 2) return std::make_pair(begin, false);
00364
00365 if(*i == '$' && *(i + 1) == 'M') {
00366
00367 if(available < 6) return std::make_pair(begin, false);
00368
00369 const uint8_t payload_size = uint8_t(*(i + 3));
00370
00371 if(available < size_t(5 + payload_size + 1))
00372 return std::make_pair(begin, false);
00373
00374 std::advance(i, 5 + payload_size + 1);
00375 }
00376 else if(*i == '$' && *(i + 1) == 'X') {
00377
00378 if(available < 9) return std::make_pair(begin, false);
00379
00380 const uint16_t payload_size =
00381 uint8_t(*(i + 6)) | uint8_t(*(i + 7) << 8);
00382
00383
00384 if(available < size_t(8 + payload_size + 1))
00385 return std::make_pair(begin, false);
00386
00387 std::advance(i, 8 + payload_size + 1);
00388 }
00389 else {
00390 for(; i != end; ++i) {
00391 if(*i == '$') break;
00392 }
00393
00394 }
00395
00396 return std::make_pair(i, true);
00397 }
00398
00399 ReceivedMessage Client::processOneMessageV1() {
00400 ReceivedMessage ret;
00401
00402 ret.status = OK;
00403
00404
00405 const uint8_t dir = extractChar();
00406 const bool ok_id = (dir != '!');
00407
00408
00409 const uint8_t len = extractChar();
00410
00411
00412 uint8_t id = extractChar();
00413 ret.id = msp::ID(id);
00414
00415 if(log_level_ >= WARNING && !ok_id) {
00416 std::cerr << "Message v1 with ID " << size_t(ret.id)
00417 << " is not recognised!" << std::endl;
00418 }
00419
00420
00421 for(size_t i(0); i < len; i++) {
00422 ret.payload.push_back(extractChar());
00423 }
00424
00425
00426 const uint8_t rcv_crc = extractChar();
00427 const uint8_t exp_crc = crcV1(id, ret.payload);
00428 const bool ok_crc = (rcv_crc == exp_crc);
00429
00430 if(log_level_ >= WARNING && !ok_crc) {
00431 std::cerr << "Message v1 with ID " << size_t(ret.id)
00432 << " has wrong CRC! (expected: " << size_t(exp_crc)
00433 << ", received: " << size_t(rcv_crc) << ")" << std::endl;
00434 }
00435
00436 if(!ok_id) {
00437 ret.status = FAIL_ID;
00438 }
00439 else if(!ok_crc) {
00440 ret.status = FAIL_CRC;
00441 }
00442
00443 return ret;
00444 }
00445
00446 ReceivedMessage Client::processOneMessageV2() {
00447 ReceivedMessage ret;
00448
00449 ret.status = OK;
00450
00451 uint8_t exp_crc = 0;
00452
00453
00454 const uint8_t dir = extractChar();
00455 if(log_level_ >= DEBUG) std::cout << "dir: " << size_t(dir) << std::endl;
00456 const bool ok_id = (dir != '!');
00457
00458
00459 const uint8_t flag = extractChar();
00460 if(log_level_ >= DEBUG) std::cout << "flag: " << size_t(flag) << std::endl;
00461 exp_crc = crcV2(exp_crc, flag);
00462
00463
00464 const uint8_t id_low = extractChar();
00465 const uint8_t id_high = extractChar();
00466 uint16_t id = uint16_t(id_low) | uint16_t(id_high << 8);
00467 ret.id = msp::ID(id);
00468 if(log_level_ >= DEBUG) std::cout << "id: " << size_t(id) << std::endl;
00469 exp_crc = crcV2(exp_crc, id_low);
00470 exp_crc = crcV2(exp_crc, id_high);
00471
00472
00473 const uint8_t len_low = extractChar();
00474 const uint8_t len_high = extractChar();
00475 uint32_t len = uint32_t(len_low) | (uint32_t(len_high) << 8);
00476 exp_crc = crcV2(exp_crc, len_low);
00477 exp_crc = crcV2(exp_crc, len_high);
00478 if(log_level_ >= DEBUG) std::cout << "len: " << len << std::endl;
00479
00480 if(log_level_ >= WARNING && !ok_id) {
00481 std::cerr << "Message v2 with ID " << size_t(ret.id)
00482 << " is not recognised!" << std::endl;
00483 }
00484
00485
00486 ByteVector data;
00487 for(size_t i(0); i < len; i++) {
00488 ret.payload.push_back(extractChar());
00489 }
00490
00491 exp_crc = crcV2(exp_crc, ret.payload);
00492
00493
00494 const uint8_t rcv_crc = extractChar();
00495
00496 const bool ok_crc = (rcv_crc == exp_crc);
00497
00498 if(log_level_ >= WARNING && !ok_crc) {
00499 std::cerr << "Message v2 with ID " << size_t(ret.id)
00500 << " has wrong CRC! (expected: " << size_t(exp_crc)
00501 << ", received: " << size_t(rcv_crc) << ")" << std::endl;
00502 }
00503
00504 if(!ok_id) {
00505 ret.status = FAIL_ID;
00506 }
00507 else if(!ok_crc) {
00508 ret.status = FAIL_CRC;
00509 }
00510
00511 return ret;
00512 }
00513
00514 }
00515 }