Client.cpp
Go to the documentation of this file.
00001 #include <Client.hpp>
00002 #include <cstdlib>
00003 #include <iostream>
00004 
00005 typedef unsigned int uint;
00006 
00007 namespace msp {
00008 namespace client {
00009 
00010 Client::Client() :
00011     port(io),
00012     log_level_(SILENT),
00013     msp_ver_(1),
00014     fw_variant(FirmwareVariant::INAV) {}
00015 
00016 Client::~Client() {}
00017 
00018 void Client::setLoggingLevel(const LoggingLevel& level) { log_level_ = level; }
00019 
00020 bool Client::setVersion(const int& ver) {
00021     if(ver == 1 || ver == 2) {
00022         msp_ver_ = ver;
00023         return true;
00024     }
00025     return false;
00026 }
00027 
00028 int Client::getVersion() const { return msp_ver_; }
00029 
00030 void Client::setVariant(const FirmwareVariant& v) { fw_variant = v; }
00031 
00032 FirmwareVariant Client::getVariant() const { return fw_variant; }
00033 
00034 bool Client::start(const std::string& device, const size_t baudrate) {
00035     return connectPort(device, baudrate) && startReadThread() &&
00036            startSubscriptions();
00037 }
00038 
00039 bool Client::stop() {
00040     return disconnectPort() && stopReadThread() && stopSubscriptions();
00041 }
00042 
00043 bool Client::connectPort(const std::string& device, const size_t baudrate) {
00044     try {
00045         port.open(device);
00046         port.set_option(asio::serial_port::baud_rate(uint(baudrate)));
00047         port.set_option(
00048             asio::serial_port::parity(asio::serial_port::parity::none));
00049         port.set_option(asio::serial_port::character_size(
00050             asio::serial_port::character_size(8)));
00051         port.set_option(
00052             asio::serial_port::stop_bits(asio::serial_port::stop_bits::one));
00053     }
00054     catch(const std::system_error& e) {
00055         const int ecode = e.code().value();
00056         throw std::runtime_error("Error when opening '" + device +
00057                                  "': " + e.code().category().message(ecode) +
00058                                  " (error code: " + std::to_string(ecode) +
00059                                  ")");
00060     }
00061     return isConnected();
00062 }
00063 
00064 bool Client::disconnectPort() {
00065     asio::error_code ec;
00066     port.close(ec);
00067     if(ec) return false;
00068     return true;
00069 }
00070 
00071 bool Client::isConnected() const { return port.is_open(); }
00072 
00073 bool Client::startReadThread() {
00074     // no point reading if we arent connected to anything
00075     if(!isConnected()) return false;
00076     // can't start if we are already running
00077     if(running_.test_and_set()) return false;
00078     // hit it!
00079     thread = std::thread([this] {
00080         asio::async_read_until(port,
00081                                buffer,
00082                                std::bind(&Client::messageReady,
00083                                          this,
00084                                          std::placeholders::_1,
00085                                          std::placeholders::_2),
00086                                std::bind(&Client::processOneMessage,
00087                                          this,
00088                                          std::placeholders::_1,
00089                                          std::placeholders::_2));
00090         io.run();
00091     });
00092     return true;
00093 }
00094 
00095 bool Client::stopReadThread() {
00096     bool rc = false;
00097     if(running_.test_and_set()) {
00098         io.stop();
00099         thread.join();
00100         io.reset();
00101         rc = true;
00102     }
00103     running_.clear();
00104     return rc;
00105 }
00106 
00107 bool Client::startSubscriptions() {
00108     bool rc = true;
00109     for(const auto& sub : subscriptions) {
00110         rc &= sub.second->start();
00111     }
00112     return rc;
00113 }
00114 
00115 bool Client::stopSubscriptions() {
00116     bool rc = true;
00117     for(const auto& sub : subscriptions) {
00118         rc &= sub.second->stop();
00119     }
00120     return rc;
00121 }
00122 
00123 bool Client::sendMessage(msp::Message& message, const double& timeout) {
00124     if(log_level_ >= DEBUG)
00125         std::cout << "sending message - ID " << size_t(message.id())
00126                   << std::endl;
00127     if(!sendData(message.id(), message.encode())) {
00128         if(log_level_ >= WARNING)
00129             std::cerr << "message failed to send" << std::endl;
00130         return false;
00131     }
00132     // prepare the condition check
00133     std::unique_lock<std::mutex> lock(cv_response_mtx);
00134     const auto predicate = [&] {
00135         mutex_response.lock();
00136         const bool received = (request_received != nullptr) &&
00137                               (request_received->id == message.id());
00138         // unlock to wait for next message
00139         if(!received) {
00140             mutex_response.unlock();
00141         }
00142         return received;
00143     };
00144     // depending on the timeout, we may wait a fixed amount of time, or
00145     // indefinitely
00146     if(timeout > 0) {
00147         if(!cv_response.wait_for(
00148                lock,
00149                std::chrono::milliseconds(size_t(timeout * 1e3)),
00150                predicate)) {
00151             if(log_level_ >= INFO)
00152                 std::cout << "timed out waiting for response to message ID "
00153                           << size_t(message.id()) << std::endl;
00154             return false;
00155         }
00156     }
00157     else {
00158         cv_response.wait(lock, predicate);
00159     }
00160     // check status
00161     const bool recv_success = request_received->status == OK;
00162     ByteVector data;
00163     if(recv_success) {
00164         // make local copy of the data so that the read thread can keep moving
00165         data = request_received->payload;
00166     }
00167     mutex_response.unlock();
00168     // decode the local copy of the payload
00169     bool decode_success = false;
00170     if(recv_success) {
00171         decode_success = message.decode(data);
00172     }
00173     return recv_success && decode_success;
00174 }
00175 
00176 bool Client::sendMessageNoWait(const msp::Message& message) {
00177     if(log_level_ >= DEBUG)
00178         std::cout << "async sending message - ID " << size_t(message.id())
00179                   << std::endl;
00180     if(!sendData(message.id(), message.encode())) {
00181         if(log_level_ >= WARNING)
00182             std::cerr << "async sendData failed" << std::endl;
00183         return false;
00184     }
00185     return true;
00186 }
00187 
00188 uint8_t Client::extractChar() {
00189     if(buffer.sgetc() == EOF) {
00190         if(log_level_ >= WARNING)
00191             std::cerr << "buffer returned EOF; reading char directly from port"
00192                       << std::endl;
00193         asio::read(port, buffer, asio::transfer_exactly(1));
00194     }
00195     return uint8_t(buffer.sbumpc());
00196 }
00197 
00198 bool Client::sendData(const msp::ID id, const ByteVector& data) {
00199     if(log_level_ >= DEBUG)
00200         std::cout << "sending: " << size_t(id) << " | " << data;
00201     ByteVector msg;
00202     if(msp_ver_ == 2) {
00203         msg = packMessageV2(id, data);
00204     }
00205     else {
00206         msg = packMessageV1(id, data);
00207     }
00208     if(log_level_ >= DEBUG) std::cout << "packed: " << msg;
00209     asio::error_code ec;
00210     std::size_t bytes_written;
00211     {
00212         std::lock_guard<std::mutex> lock(mutex_send);
00213         bytes_written =
00214             asio::write(port, asio::buffer(msg.data(), msg.size()), ec);
00215     }
00216     if(ec == asio::error::operation_aborted && log_level_ >= WARNING) {
00217         // operation_aborted error probably means the client is being closed
00218         std::cerr << "------------------> WRITE FAILED <--------------------"
00219                   << std::endl;
00220         return false;
00221     }
00222     if(log_level_ >= DEBUG)
00223         std::cout << "write complete: " << bytes_written << " vs " << msg.size()
00224                   << std::endl;
00225     return (bytes_written == msg.size());
00226 }
00227 
00228 ByteVector Client::packMessageV1(const msp::ID id,
00229                                  const ByteVector& data) const {
00230     ByteVector msg;
00231     msg.push_back('$');                               // preamble1
00232     msg.push_back('M');                               // preamble2
00233     msg.push_back('<');                               // direction
00234     msg.push_back(uint8_t(data.size()));              // data size
00235     msg.push_back(uint8_t(id));                       // message_id
00236     msg.insert(msg.end(), data.begin(), data.end());  // data
00237     msg.push_back(crcV1(uint8_t(id), data));          // crc
00238     return msg;
00239 }
00240 
00241 uint8_t Client::crcV1(const uint8_t id, const ByteVector& data) const {
00242     uint8_t crc = uint8_t(data.size()) ^ id;
00243     for(const uint8_t d : data) {
00244         crc = crc ^ d;
00245     }
00246     return crc;
00247 }
00248 
00249 ByteVector Client::packMessageV2(const msp::ID id,
00250                                  const ByteVector& data) const {
00251     ByteVector msg;
00252     msg.push_back('$');                           // preamble1
00253     msg.push_back('X');                           // preamble2
00254     msg.push_back('<');                           // direction
00255     msg.push_back(0);                             // flag
00256     msg.push_back(uint8_t(uint16_t(id) & 0xFF));  // message_id low bits
00257     msg.push_back(uint8_t(uint16_t(id) >> 8));    // message_id high bits
00258 
00259     const uint16_t size = uint16_t(data.size());
00260     msg.push_back(uint8_t(size & 0xFF));  // data size low bits
00261     msg.push_back(uint8_t(size >> 8));    // data size high bits
00262 
00263     msg.insert(msg.end(), data.begin(), data.end());                  // data
00264     msg.push_back(crcV2(0, ByteVector(msg.begin() + 3, msg.end())));  // crc
00265 
00266     return msg;
00267 }
00268 
00269 uint8_t Client::crcV2(uint8_t crc, const ByteVector& data) const {
00270     for(const uint8_t& p : data) {
00271         crc = crcV2(crc, p);
00272     }
00273     return crc;
00274 }
00275 
00276 uint8_t Client::crcV2(uint8_t crc, const uint8_t& b) const {
00277     crc ^= b;
00278     for(int ii = 0; ii < 8; ++ii) {
00279         if(crc & 0x80) {
00280             crc = uint8_t(crc << 1) ^ 0xD5;
00281         }
00282         else {
00283             crc = uint8_t(crc << 1);
00284         }
00285     }
00286     return crc;
00287 }
00288 
00289 void Client::processOneMessage(const asio::error_code& ec,
00290                                const std::size_t& bytes_transferred) {
00291     if(log_level_ >= DEBUG)
00292         std::cout << "processOneMessage on " << bytes_transferred << " bytes"
00293                   << std::endl;
00294 
00295     if(ec == asio::error::operation_aborted) {
00296         // operation_aborted error probably means the client is being closed
00297         // notify waiting request methods
00298         cv_response.notify_all();
00299         return;
00300     }
00301 
00302     // ignore and remove header bytes
00303     const uint8_t msg_marker = extractChar();
00304     if(msg_marker != '$')
00305         std::cerr << "Message marker " << size_t(msg_marker)
00306                   << " is not recognised!" << std::endl;
00307 
00308     // message version
00309     int ver                  = 0;
00310     const uint8_t ver_marker = extractChar();
00311     if(ver_marker == 'M') ver = 1;
00312     if(ver_marker == 'X') ver = 2;
00313     if(ver == 0) {
00314         std::cerr << "Version marker " << size_t(ver_marker)
00315                   << " is not recognised!" << std::endl;
00316     }
00317 
00318     ReceivedMessage recv_msg;
00319     if(ver == 2)
00320         recv_msg = processOneMessageV2();
00321     else
00322         recv_msg = processOneMessageV1();
00323 
00324     {
00325         std::lock_guard<std::mutex> lock2(cv_response_mtx);
00326         std::lock_guard<std::mutex> lock(mutex_response);
00327         request_received.reset(new ReceivedMessage(recv_msg));
00328     }
00329     // notify waiting request methods
00330     cv_response.notify_all();
00331 
00332     // check subscriptions
00333     {
00334         std::lock_guard<std::mutex> lock(mutex_subscriptions);
00335         std::lock_guard<std::mutex> lock2(mutex_response);
00336         if(request_received->status == OK &&
00337            subscriptions.count(ID(request_received->id))) {
00338             subscriptions.at(ID(request_received->id))
00339                 ->decode(request_received->payload);
00340         }
00341     }
00342 
00343     asio::async_read_until(port,
00344                            buffer,
00345                            std::bind(&Client::messageReady,
00346                                      this,
00347                                      std::placeholders::_1,
00348                                      std::placeholders::_2),
00349                            std::bind(&Client::processOneMessage,
00350                                      this,
00351                                      std::placeholders::_1,
00352                                      std::placeholders::_2));
00353 
00354     if(log_level_ >= DEBUG)
00355         std::cout << "processOneMessage finished" << std::endl;
00356 }
00357 
00358 std::pair<iterator, bool> Client::messageReady(iterator begin,
00359                                                iterator end) const {
00360     iterator i             = begin;
00361     const size_t available = size_t(std::distance(begin, end));
00362 
00363     if(available < 2) return std::make_pair(begin, false);
00364 
00365     if(*i == '$' && *(i + 1) == 'M') {
00366         // not even enough data for a header
00367         if(available < 6) return std::make_pair(begin, false);
00368 
00369         const uint8_t payload_size = uint8_t(*(i + 3));
00370         // incomplete xfer
00371         if(available < size_t(5 + payload_size + 1))
00372             return std::make_pair(begin, false);
00373 
00374         std::advance(i, 5 + payload_size + 1);
00375     }
00376     else if(*i == '$' && *(i + 1) == 'X') {
00377         // not even enough data for a header
00378         if(available < 9) return std::make_pair(begin, false);
00379 
00380         const uint16_t payload_size =
00381             uint8_t(*(i + 6)) | uint8_t(*(i + 7) << 8);
00382 
00383         // incomplete xfer
00384         if(available < size_t(8 + payload_size + 1))
00385             return std::make_pair(begin, false);
00386 
00387         std::advance(i, 8 + payload_size + 1);
00388     }
00389     else {
00390         for(; i != end; ++i) {
00391             if(*i == '$') break;
00392         }
00393         // implicitly consume all if $ not found
00394     }
00395 
00396     return std::make_pair(i, true);
00397 }
00398 
00399 ReceivedMessage Client::processOneMessageV1() {
00400     ReceivedMessage ret;
00401 
00402     ret.status = OK;
00403 
00404     // message direction
00405     const uint8_t dir = extractChar();
00406     const bool ok_id  = (dir != '!');
00407 
00408     // payload length
00409     const uint8_t len = extractChar();
00410 
00411     // message ID
00412     uint8_t id = extractChar();
00413     ret.id     = msp::ID(id);
00414 
00415     if(log_level_ >= WARNING && !ok_id) {
00416         std::cerr << "Message v1 with ID " << size_t(ret.id)
00417                   << " is not recognised!" << std::endl;
00418     }
00419 
00420     // payload
00421     for(size_t i(0); i < len; i++) {
00422         ret.payload.push_back(extractChar());
00423     }
00424 
00425     // CRC
00426     const uint8_t rcv_crc = extractChar();
00427     const uint8_t exp_crc = crcV1(id, ret.payload);
00428     const bool ok_crc     = (rcv_crc == exp_crc);
00429 
00430     if(log_level_ >= WARNING && !ok_crc) {
00431         std::cerr << "Message v1 with ID " << size_t(ret.id)
00432                   << " has wrong CRC! (expected: " << size_t(exp_crc)
00433                   << ", received: " << size_t(rcv_crc) << ")" << std::endl;
00434     }
00435 
00436     if(!ok_id) {
00437         ret.status = FAIL_ID;
00438     }
00439     else if(!ok_crc) {
00440         ret.status = FAIL_CRC;
00441     }
00442 
00443     return ret;
00444 }
00445 
00446 ReceivedMessage Client::processOneMessageV2() {
00447     ReceivedMessage ret;
00448 
00449     ret.status = OK;
00450 
00451     uint8_t exp_crc = 0;
00452 
00453     // message direction
00454     const uint8_t dir = extractChar();
00455     if(log_level_ >= DEBUG) std::cout << "dir: " << size_t(dir) << std::endl;
00456     const bool ok_id = (dir != '!');
00457 
00458     // flag
00459     const uint8_t flag = extractChar();
00460     if(log_level_ >= DEBUG) std::cout << "flag: " << size_t(flag) << std::endl;
00461     exp_crc = crcV2(exp_crc, flag);
00462 
00463     // message ID
00464     const uint8_t id_low  = extractChar();
00465     const uint8_t id_high = extractChar();
00466     uint16_t id           = uint16_t(id_low) | uint16_t(id_high << 8);
00467     ret.id                = msp::ID(id);
00468     if(log_level_ >= DEBUG) std::cout << "id: " << size_t(id) << std::endl;
00469     exp_crc = crcV2(exp_crc, id_low);
00470     exp_crc = crcV2(exp_crc, id_high);
00471 
00472     // payload length
00473     const uint8_t len_low  = extractChar();
00474     const uint8_t len_high = extractChar();
00475     uint32_t len           = uint32_t(len_low) | (uint32_t(len_high) << 8);
00476     exp_crc                = crcV2(exp_crc, len_low);
00477     exp_crc                = crcV2(exp_crc, len_high);
00478     if(log_level_ >= DEBUG) std::cout << "len: " << len << std::endl;
00479 
00480     if(log_level_ >= WARNING && !ok_id) {
00481         std::cerr << "Message v2 with ID " << size_t(ret.id)
00482                   << " is not recognised!" << std::endl;
00483     }
00484 
00485     // payload
00486     ByteVector data;
00487     for(size_t i(0); i < len; i++) {
00488         ret.payload.push_back(extractChar());
00489     }
00490 
00491     exp_crc = crcV2(exp_crc, ret.payload);
00492 
00493     // CRC
00494     const uint8_t rcv_crc = extractChar();
00495 
00496     const bool ok_crc = (rcv_crc == exp_crc);
00497 
00498     if(log_level_ >= WARNING && !ok_crc) {
00499         std::cerr << "Message v2 with ID " << size_t(ret.id)
00500                   << " has wrong CRC! (expected: " << size_t(exp_crc)
00501                   << ", received: " << size_t(rcv_crc) << ")" << std::endl;
00502     }
00503 
00504     if(!ok_id) {
00505         ret.status = FAIL_ID;
00506     }
00507     else if(!ok_crc) {
00508         ret.status = FAIL_CRC;
00509     }
00510 
00511     return ret;
00512 }
00513 
00514 }  // namespace client
00515 }  // namespace msp


msp
Author(s): Christian Rauch
autogenerated on Thu Jun 20 2019 19:40:38