Client.cpp
Go to the documentation of this file.
1 #include <Client.hpp>
2 #include <cstdlib>
3 #include <iostream>
4 
5 typedef unsigned int uint;
6 
7 namespace msp {
8 namespace client {
9 
11  port(io),
12  log_level_(SILENT),
13  msp_ver_(1),
14  fw_variant(FirmwareVariant::INAV) {}
15 
17 
18 void Client::setLoggingLevel(const LoggingLevel& level) { log_level_ = level; }
19 
20 bool Client::setVersion(const int& ver) {
21  if(ver == 1 || ver == 2) {
22  msp_ver_ = ver;
23  return true;
24  }
25  return false;
26 }
27 
28 int Client::getVersion() const { return msp_ver_; }
29 
31 
33 
34 bool Client::start(const std::string& device, const size_t baudrate) {
35  return connectPort(device, baudrate) && startReadThread() &&
37 }
38 
39 bool Client::stop() {
41 }
42 
43 bool Client::connectPort(const std::string& device, const size_t baudrate) {
44  try {
45  port.open(device);
46  port.set_option(asio::serial_port::baud_rate(uint(baudrate)));
47  port.set_option(
48  asio::serial_port::parity(asio::serial_port::parity::none));
49  port.set_option(asio::serial_port::character_size(
50  asio::serial_port::character_size(8)));
51  port.set_option(
52  asio::serial_port::stop_bits(asio::serial_port::stop_bits::one));
53  }
54  catch(const std::system_error& e) {
55  const int ecode = e.code().value();
56  throw std::runtime_error("Error when opening '" + device +
57  "': " + e.code().category().message(ecode) +
58  " (error code: " + std::to_string(ecode) +
59  ")");
60  }
61  return isConnected();
62 }
63 
65  asio::error_code ec;
66  port.close(ec);
67  if(ec) return false;
68  return true;
69 }
70 
71 bool Client::isConnected() const { return port.is_open(); }
72 
74  // no point reading if we arent connected to anything
75  if(!isConnected()) return false;
76  // can't start if we are already running
77  if(running_.test_and_set()) return false;
78  // hit it!
79  thread = std::thread([this] {
80  asio::async_read_until(port,
81  buffer,
82  std::bind(&Client::messageReady,
83  this,
84  std::placeholders::_1,
85  std::placeholders::_2),
86  std::bind(&Client::processOneMessage,
87  this,
88  std::placeholders::_1,
89  std::placeholders::_2));
90  io.run();
91  });
92  return true;
93 }
94 
96  bool rc = false;
97  if(running_.test_and_set()) {
98  io.stop();
99  thread.join();
100  io.reset();
101  rc = true;
102  }
103  running_.clear();
104  return rc;
105 }
106 
108  bool rc = true;
109  for(const auto& sub : subscriptions) {
110  rc &= sub.second->start();
111  }
112  return rc;
113 }
114 
116  bool rc = true;
117  for(const auto& sub : subscriptions) {
118  rc &= sub.second->stop();
119  }
120  return rc;
121 }
122 
123 bool Client::sendMessage(msp::Message& message, const double& timeout) {
124  if(log_level_ >= DEBUG)
125  std::cout << "sending message - ID " << size_t(message.id())
126  << std::endl;
127  if(!sendData(message.id(), message.encode())) {
128  if(log_level_ >= WARNING)
129  std::cerr << "message failed to send" << std::endl;
130  return false;
131  }
132  // prepare the condition check
133  std::unique_lock<std::mutex> lock(cv_response_mtx);
134  const auto predicate = [&] {
135  mutex_response.lock();
136  const bool received = (request_received != nullptr) &&
137  (request_received->id == message.id());
138  // unlock to wait for next message
139  if(!received) {
140  mutex_response.unlock();
141  }
142  return received;
143  };
144  // depending on the timeout, we may wait a fixed amount of time, or
145  // indefinitely
146  if(timeout > 0) {
147  if(!cv_response.wait_for(
148  lock,
149  std::chrono::milliseconds(size_t(timeout * 1e3)),
150  predicate)) {
151  if(log_level_ >= INFO)
152  std::cout << "timed out waiting for response to message ID "
153  << size_t(message.id()) << std::endl;
154  return false;
155  }
156  }
157  else {
158  cv_response.wait(lock, predicate);
159  }
160  // check status
161  const bool recv_success = request_received->status == OK;
162  ByteVector data;
163  if(recv_success) {
164  // make local copy of the data so that the read thread can keep moving
165  data = request_received->payload;
166  }
167  mutex_response.unlock();
168  // decode the local copy of the payload
169  const bool decode_success = data.size() == 0 ? true : message.decode(data);
170  return recv_success && decode_success;
171 }
172 
174  if(log_level_ >= DEBUG)
175  std::cout << "async sending message - ID " << size_t(message.id())
176  << std::endl;
177  if(!sendData(message.id(), message.encode())) {
178  if(log_level_ >= WARNING)
179  std::cerr << "async sendData failed" << std::endl;
180  return false;
181  }
182  return true;
183 }
184 
186  if(buffer.sgetc() == EOF) {
187  if(log_level_ >= WARNING)
188  std::cerr << "buffer returned EOF; reading char directly from port"
189  << std::endl;
190  asio::read(port, buffer, asio::transfer_exactly(1));
191  }
192  return uint8_t(buffer.sbumpc());
193 }
194 
195 bool Client::sendData(const msp::ID id, const ByteVector& data) {
196  if(log_level_ >= DEBUG)
197  std::cout << "sending: " << size_t(id) << " | " << data;
198  ByteVector msg;
199  if(msp_ver_ == 2) {
200  msg = packMessageV2(id, data);
201  }
202  else {
203  msg = packMessageV1(id, data);
204  }
205  if(log_level_ >= DEBUG) std::cout << "packed: " << msg;
206  asio::error_code ec;
207  std::size_t bytes_written;
208  {
209  std::lock_guard<std::mutex> lock(mutex_send);
210  bytes_written =
211  asio::write(port, asio::buffer(msg.data(), msg.size()), ec);
212  }
213  if(ec == asio::error::operation_aborted && log_level_ >= WARNING) {
214  // operation_aborted error probably means the client is being closed
215  std::cerr << "------------------> WRITE FAILED <--------------------"
216  << std::endl;
217  return false;
218  }
219  if(log_level_ >= DEBUG)
220  std::cout << "write complete: " << bytes_written << " vs " << msg.size()
221  << std::endl;
222  return (bytes_written == msg.size());
223 }
224 
226  const ByteVector& data) const {
227  ByteVector msg;
228  msg.push_back('$'); // preamble1
229  msg.push_back('M'); // preamble2
230  msg.push_back('<'); // direction
231  msg.push_back(uint8_t(data.size())); // data size
232  msg.push_back(uint8_t(id)); // message_id
233  msg.insert(msg.end(), data.begin(), data.end()); // data
234  msg.push_back(crcV1(uint8_t(id), data)); // crc
235  return msg;
236 }
237 
238 uint8_t Client::crcV1(const uint8_t id, const ByteVector& data) const {
239  uint8_t crc = uint8_t(data.size()) ^ id;
240  for(const uint8_t d : data) {
241  crc = crc ^ d;
242  }
243  return crc;
244 }
245 
247  const ByteVector& data) const {
248  ByteVector msg;
249  msg.push_back('$'); // preamble1
250  msg.push_back('X'); // preamble2
251  msg.push_back('<'); // direction
252  msg.push_back(0); // flag
253  msg.push_back(uint8_t(uint16_t(id) & 0xFF)); // message_id low bits
254  msg.push_back(uint8_t(uint16_t(id) >> 8)); // message_id high bits
255 
256  const uint16_t size = uint16_t(data.size());
257  msg.push_back(uint8_t(size & 0xFF)); // data size low bits
258  msg.push_back(uint8_t(size >> 8)); // data size high bits
259 
260  msg.insert(msg.end(), data.begin(), data.end()); // data
261  msg.push_back(crcV2(0, ByteVector(msg.begin() + 3, msg.end()))); // crc
262 
263  return msg;
264 }
265 
266 uint8_t Client::crcV2(uint8_t crc, const ByteVector& data) const {
267  for(const uint8_t& p : data) {
268  crc = crcV2(crc, p);
269  }
270  return crc;
271 }
272 
273 uint8_t Client::crcV2(uint8_t crc, const uint8_t& b) const {
274  crc ^= b;
275  for(int ii = 0; ii < 8; ++ii) {
276  if(crc & 0x80) {
277  crc = uint8_t(crc << 1) ^ 0xD5;
278  }
279  else {
280  crc = uint8_t(crc << 1);
281  }
282  }
283  return crc;
284 }
285 
286 void Client::processOneMessage(const asio::error_code& ec,
287  const std::size_t& bytes_transferred) {
288  if(log_level_ >= DEBUG)
289  std::cout << "processOneMessage on " << bytes_transferred << " bytes"
290  << std::endl;
291 
292  if(ec == asio::error::operation_aborted) {
293  // operation_aborted error probably means the client is being closed
294  // notify waiting request methods
295  cv_response.notify_all();
296  return;
297  }
298 
299  // ignore and remove header bytes
300  const uint8_t msg_marker = extractChar();
301  if(msg_marker != '$')
302  std::cerr << "Message marker " << size_t(msg_marker)
303  << " is not recognised!" << std::endl;
304 
305  // message version
306  int ver = 0;
307  const uint8_t ver_marker = extractChar();
308  if(ver_marker == 'M') ver = 1;
309  if(ver_marker == 'X') ver = 2;
310  if(ver == 0) {
311  std::cerr << "Version marker " << size_t(ver_marker)
312  << " is not recognised!" << std::endl;
313  }
314 
315  ReceivedMessage recv_msg;
316  if(ver == 2)
317  recv_msg = processOneMessageV2();
318  else
319  recv_msg = processOneMessageV1();
320 
321  {
322  std::lock_guard<std::mutex> lock2(cv_response_mtx);
323  std::lock_guard<std::mutex> lock(mutex_response);
324  request_received.reset(new ReceivedMessage(recv_msg));
325  }
326  // notify waiting request methods
327  cv_response.notify_all();
328 
329  // check subscriptions
330  {
331  std::lock_guard<std::mutex> lock(mutex_subscriptions);
332  std::lock_guard<std::mutex> lock2(mutex_response);
333  if(request_received->status == OK &&
334  subscriptions.count(ID(request_received->id))) {
336  ->decode(request_received->payload);
337  }
338  }
339 
340  asio::async_read_until(port,
341  buffer,
342  std::bind(&Client::messageReady,
343  this,
344  std::placeholders::_1,
345  std::placeholders::_2),
346  std::bind(&Client::processOneMessage,
347  this,
348  std::placeholders::_1,
349  std::placeholders::_2));
350 
351  if(log_level_ >= DEBUG)
352  std::cout << "processOneMessage finished" << std::endl;
353 }
354 
355 std::pair<iterator, bool> Client::messageReady(iterator begin,
356  iterator end) const {
357  iterator i = begin;
358  const size_t available = size_t(std::distance(begin, end));
359 
360  if(available < 2) return std::make_pair(begin, false);
361 
362  if(*i == '$' && *(i + 1) == 'M') {
363  // not even enough data for a header
364  if(available < 6) return std::make_pair(begin, false);
365 
366  const uint8_t payload_size = uint8_t(*(i + 3));
367  // incomplete xfer
368  if(available < size_t(5 + payload_size + 1))
369  return std::make_pair(begin, false);
370 
371  std::advance(i, 5 + payload_size + 1);
372  }
373  else if(*i == '$' && *(i + 1) == 'X') {
374  // not even enough data for a header
375  if(available < 9) return std::make_pair(begin, false);
376 
377  const uint16_t payload_size =
378  uint8_t(*(i + 6)) | uint8_t(*(i + 7) << 8);
379 
380  // incomplete xfer
381  if(available < size_t(8 + payload_size + 1))
382  return std::make_pair(begin, false);
383 
384  std::advance(i, 8 + payload_size + 1);
385  }
386  else {
387  for(; i != end; ++i) {
388  if(*i == '$') break;
389  }
390  // implicitly consume all if $ not found
391  }
392 
393  return std::make_pair(i, true);
394 }
395 
397  ReceivedMessage ret;
398 
399  ret.status = OK;
400 
401  // message direction
402  const uint8_t dir = extractChar();
403  const bool ok_id = (dir != '!');
404 
405  // payload length
406  const uint8_t len = extractChar();
407 
408  // message ID
409  uint8_t id = extractChar();
410  ret.id = msp::ID(id);
411 
412  if(log_level_ >= WARNING && !ok_id) {
413  std::cerr << "Message v1 with ID " << size_t(ret.id)
414  << " is not recognised!" << std::endl;
415  }
416 
417  // payload
418  for(size_t i(0); i < len; i++) {
419  ret.payload.push_back(extractChar());
420  }
421 
422  // CRC
423  const uint8_t rcv_crc = extractChar();
424  const uint8_t exp_crc = crcV1(id, ret.payload);
425  const bool ok_crc = (rcv_crc == exp_crc);
426 
427  if(log_level_ >= WARNING && !ok_crc) {
428  std::cerr << "Message v1 with ID " << size_t(ret.id)
429  << " has wrong CRC! (expected: " << size_t(exp_crc)
430  << ", received: " << size_t(rcv_crc) << ")" << std::endl;
431  }
432 
433  if(!ok_id) {
434  ret.status = FAIL_ID;
435  }
436  else if(!ok_crc) {
437  ret.status = FAIL_CRC;
438  }
439 
440  return ret;
441 }
442 
444  ReceivedMessage ret;
445 
446  ret.status = OK;
447 
448  uint8_t exp_crc = 0;
449 
450  // message direction
451  const uint8_t dir = extractChar();
452  if(log_level_ >= DEBUG) std::cout << "dir: " << size_t(dir) << std::endl;
453  const bool ok_id = (dir != '!');
454 
455  // flag
456  const uint8_t flag = extractChar();
457  if(log_level_ >= DEBUG) std::cout << "flag: " << size_t(flag) << std::endl;
458  exp_crc = crcV2(exp_crc, flag);
459 
460  // message ID
461  const uint8_t id_low = extractChar();
462  const uint8_t id_high = extractChar();
463  uint16_t id = uint16_t(id_low) | uint16_t(id_high << 8);
464  ret.id = msp::ID(id);
465  if(log_level_ >= DEBUG) std::cout << "id: " << size_t(id) << std::endl;
466  exp_crc = crcV2(exp_crc, id_low);
467  exp_crc = crcV2(exp_crc, id_high);
468 
469  // payload length
470  const uint8_t len_low = extractChar();
471  const uint8_t len_high = extractChar();
472  uint32_t len = uint32_t(len_low) | (uint32_t(len_high) << 8);
473  exp_crc = crcV2(exp_crc, len_low);
474  exp_crc = crcV2(exp_crc, len_high);
475  if(log_level_ >= DEBUG) std::cout << "len: " << len << std::endl;
476 
477  if(log_level_ >= WARNING && !ok_id) {
478  std::cerr << "Message v2 with ID " << size_t(ret.id)
479  << " is not recognised!" << std::endl;
480  }
481 
482  // payload
483  ByteVector data;
484  for(size_t i(0); i < len; i++) {
485  ret.payload.push_back(extractChar());
486  }
487 
488  exp_crc = crcV2(exp_crc, ret.payload);
489 
490  // CRC
491  const uint8_t rcv_crc = extractChar();
492 
493  const bool ok_crc = (rcv_crc == exp_crc);
494 
495  if(log_level_ >= WARNING && !ok_crc) {
496  std::cerr << "Message v2 with ID " << size_t(ret.id)
497  << " has wrong CRC! (expected: " << size_t(exp_crc)
498  << ", received: " << size_t(rcv_crc) << ")" << std::endl;
499  }
500 
501  if(!ok_id) {
502  ret.status = FAIL_ID;
503  }
504  else if(!ok_crc) {
505  ret.status = FAIL_CRC;
506  }
507 
508  return ret;
509 }
510 
511 } // namespace client
512 } // namespace msp
void processOneMessage(const asio::error_code &ec, const std::size_t &bytes_transferred)
Main entry point for processing received data. It is called directly by the ASIO library, and as such it much match the function signatures expected by ASIO.
Definition: Client.cpp:286
bool start(const std::string &device, const size_t baudrate=115200)
Start communications with a flight controller.
Definition: Client.cpp:34
bool stopReadThread()
Stops the receiver thread.
Definition: Client.cpp:95
uint8_t extractChar()
Read a single byte from either the buffer or the serial device.
Definition: Client.cpp:185
std::condition_variable cv_response
Definition: Client.hpp:348
bool sendMessageNoWait(const msp::Message &message)
Send a message, but do not wait for any response.
Definition: Client.cpp:173
unsigned int uint
Definition: Client.cpp:5
bool sendData(const msp::ID id, const ByteVector &data=ByteVector(0))
Send an ID and payload to the flight controller.
Definition: Client.cpp:195
bool setVersion(const int &ver)
Change the device path on the next connect.
Definition: Client.cpp:20
std::mutex mutex_response
Definition: Client.hpp:350
ByteVector packMessageV1(const msp::ID id, const ByteVector &data=ByteVector(0)) const
packMessageV1 Packs data ID and data payload into a MSPv1 formatted buffer ready for sending to the s...
Definition: Client.cpp:225
std::mutex mutex_subscriptions
Definition: Client.hpp:358
void setLoggingLevel(const LoggingLevel &level)
Set the verbosity of the output.
Definition: Client.cpp:18
virtual ByteVectorUptr encode() const
Encode all data into a ByteVector.
Definition: Message.hpp:59
asio::io_service io
! io service
Definition: Client.hpp:339
~Client()
~Client Destructor
Definition: Client.cpp:16
bool stop()
Stop communications with a flight controller.
Definition: Client.cpp:39
ID
Definition: msp_msg.hpp:30
std::unique_ptr< ReceivedMessage > request_received
Definition: Client.hpp:355
bool stopSubscriptions()
Stops the receiver thread.
Definition: Client.cpp:115
uint8_t crcV2(uint8_t crc, const ByteVector &data) const
crcV2 Computes a checksum for MSPv2 messages
Definition: Client.cpp:266
FirmwareVariant
Enum of firmware variants.
std::atomic_flag running_
Definition: Client.hpp:345
std::map< msp::ID, std::shared_ptr< SubscriptionBase > > subscriptions
Definition: Client.hpp:359
asio::buffers_iterator< asio::streambuf::const_buffers_type > iterator
Definition: Client.hpp:19
void setVariant(const FirmwareVariant &v)
Change the device path on the next connect.
Definition: Client.cpp:30
std::mutex cv_response_mtx
Definition: Client.hpp:349
ReceivedMessage processOneMessageV2()
processOneMessageV2 Iterates over characters in the ASIO buffer to identify and unpack a MSPv2 encode...
Definition: Client.cpp:443
asio::serial_port port
! port for serial device
Definition: Client.hpp:340
Client()
Client Constructor.
Definition: Client.cpp:10
LoggingLevel log_level_
Definition: Client.hpp:362
virtual bool decode(const ByteVector &)
Decode message contents from a ByteVector.
Definition: Message.hpp:53
FirmwareVariant fw_variant
Definition: Client.hpp:366
std::thread thread
Definition: Client.hpp:344
bool sendMessage(msp::Message &message, const double &timeout=0)
Send a message to the connected flight controller. If the message sends data to the flight controller...
Definition: Client.cpp:123
std::mutex mutex_send
Definition: Client.hpp:352
int getVersion() const
Query the cached device path.
Definition: Client.cpp:28
uint8_t crcV1(const uint8_t id, const ByteVector &data) const
crcV1 Computes a checksum for MSPv1 messages
Definition: Client.cpp:238
bool startReadThread()
Starts the receiver thread that handles incomming messages.
Definition: Client.cpp:73
virtual ID id() const =0
get the ID of the message
bool isConnected() const
Query the system to see if a connection is active.
Definition: Client.cpp:71
bool connectPort(const std::string &device, const size_t baudrate=115200)
Establish connection to serial device and start read thread.
Definition: Client.cpp:43
FirmwareVariant getVariant() const
Query the cached device path.
Definition: Client.cpp:32
ReceivedMessage processOneMessageV1()
processOneMessageV1 Iterates over characters in the ASIO buffer to identify and unpack a MSPv1 encode...
Definition: Client.cpp:396
ByteVector packMessageV2(const msp::ID id, const ByteVector &data=ByteVector(0)) const
packMessageV2 Packs data ID and data payload into a MSPv2 formatted buffer ready for sending to the s...
Definition: Client.cpp:246
bool disconnectPort()
Break connection to serial device and stop read thread.
Definition: Client.cpp:64
asio::streambuf buffer
Definition: Client.hpp:341
std::pair< iterator, bool > messageReady(iterator begin, iterator end) const
messageReady Method used by ASIO library to determine if a full message is present in receiving buffe...
Definition: Client.cpp:355
bool startSubscriptions()
Starts the receiver thread that handles incomming messages.
Definition: Client.cpp:107


msp
Author(s): Christian Rauch
autogenerated on Tue Oct 6 2020 03:38:57