async_manager.hpp
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // © Copyright 2020, Septentrio NV/SA.
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // 1. Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // 2. Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // 3. Neither the name of the copyright holder nor the names of its
14 // contributors may be used to endorse or promote products derived
15 // from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 // POSSIBILITY OF SUCH DAMAGE.
28 //
29 // *****************************************************************************
30 
31 // *****************************************************************************
32 //
33 // Boost Software License - Version 1.0 - August 17th, 2003
34 //
35 // Permission is hereby granted, free of charge, to any person or organization
36 // obtaining a copy of the software and accompanying documentation covered by
37 // this license (the "Software") to use, reproduce, display, distribute,
38 // execute, and transmit the Software, and to prepare derivative works of the
39 // Software, and to permit third-parties to whom the Software is furnished to
40 // do so, all subject to the following:
41 
42 // The copyright notices in the Software and this entire statement, including
43 // the above license grant, this restriction and the following disclaimer,
44 // must be included in all copies of the Software, in whole or in part, and
45 // all derivative works of the Software, unless such copies or derivative
46 // works are solely in the form of machine-executable object code generated by
47 // a source language processor.
48 //
49 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
50 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
51 // FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
52 // SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
53 // FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
54 // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
55 // DEALINGS IN THE SOFTWARE.
56 //
57 // *****************************************************************************
58 
59 #pragma once
60 
61 // Boost includes
62 #include <boost/asio.hpp>
63 #include <boost/bind.hpp>
64 #include <boost/regex.hpp>
65 
66 // ROSaic includes
69 
70 // local includes
73 
83 namespace io {
84 
91  {
92  public:
93  virtual ~AsyncManagerBase() {}
95  [[nodiscard]] virtual bool connect() = 0;
97  virtual void send(const std::string& cmd) = 0;
98  };
99 
107  template <typename IoType>
109  {
110  public:
116  AsyncManager(ROSaicNodeBase* node, TelegramQueue* telegramQueue);
117 
118  ~AsyncManager();
119 
120  [[nodiscard]] bool connect();
121 
122  void setPort(const std::string& port);
123 
124  void send(const std::string& cmd);
125 
126  private:
127  void receive();
128  void close();
129  void runIoService();
130  void runWatchdog();
131  void write(const std::string& cmd);
132  void resync();
133  template <uint8_t index>
134  void readSync();
135  void readSbfHeader();
136  void readSbf(std::size_t length);
137  void readUnknown();
138  void readString();
139  void readStringElements();
140 
143  std::shared_ptr<boost::asio::io_service> ioService_;
144  IoType ioInterface_;
145  std::atomic<bool> running_;
146  std::thread ioThread_;
147  std::thread watchdogThread_;
148 
149  std::array<uint8_t, 1> buf_;
153  std::shared_ptr<Telegram> telegram_;
156  };
157 
158  template <typename IoType>
160  TelegramQueue* telegramQueue) :
161  node_(node),
162  ioService_(new boost::asio::io_service), ioInterface_(node, ioService_),
163  telegramQueue_(telegramQueue)
164  {
165  node_->log(log_level::DEBUG, "AsyncManager created.");
166  }
167 
168  template <typename IoType>
170  {
171  running_ = false;
172  close();
173  node_->log(log_level::DEBUG, "AsyncManager shutting down threads");
174  ioService_->stop();
175  ioThread_.join();
176  watchdogThread_.join();
177  node_->log(log_level::DEBUG, "AsyncManager threads stopped");
178  }
179 
180  template <typename IoType>
181  [[nodiscard]] bool AsyncManager<IoType>::connect()
182  {
183  running_ = true;
184 
185  if (!ioInterface_.connect())
186  {
187  return false;
188  }
189  receive();
190 
191  return true;
192  }
193 
194  template <typename IoType>
195  void AsyncManager<IoType>::setPort(const std::string& port)
196  {
197  ioInterface_.setPort(port);
198  }
199 
200  template <typename IoType>
201  void AsyncManager<IoType>::send(const std::string& cmd)
202  {
203  if (cmd.size() == 0)
204  {
205  node_->log(log_level::ERROR,
206  "AsyncManager message size to be sent to the Rx would be 0");
207  return;
208  }
209 
210  ioService_->post(boost::bind(&AsyncManager<IoType>::write, this, cmd));
211  }
212 
213  template <typename IoType>
215  {
216  resync();
217  ioThread_ =
218  std::thread(std::bind(&AsyncManager<IoType>::runIoService, this));
219  if (!watchdogThread_.joinable())
220  watchdogThread_ =
221  std::thread(std::bind(&AsyncManager::runWatchdog, this));
222  }
223 
224  template <typename IoType>
226  {
227  ioService_->post([this]() { ioInterface_.close(); });
228  }
229 
230  template <typename IoType>
232  {
233  ioService_->run();
234  node_->log(log_level::DEBUG, "AsyncManager ioService terminated.");
235  }
236 
237  template <typename IoType>
239  {
240  while (running_)
241  {
242  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
243  if (running_ && ioService_->stopped())
244  {
245  if (node_->settings()->read_from_sbf_log ||
246  node_->settings()->read_from_pcap)
247  {
248  node_->log(
250  "AsyncManager finished reading file. Node will continue to publish queued messages.");
251  break;
252  } else
253  {
254  node_->log(log_level::ERROR,
255  "AsyncManager connection lost. Trying to reconnect.");
256  ioService_->reset();
257  ioThread_.join();
258  while (!ioInterface_.connect())
259  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
260  receive();
261  }
262  } else if (running_ && std::is_same<TcpIo, IoType>::value)
263  {
264  // Send to check if TCP connection still alive
265  std::string empty = " ";
266  boost::asio::async_write(
267  *(ioInterface_.stream_), boost::asio::buffer(empty.data(), 1),
268  [](boost::system::error_code ec, std::size_t /*length*/) {});
269  }
270  }
271  }
272 
273  template <typename IoType>
274  void AsyncManager<IoType>::write(const std::string& cmd)
275  {
276  boost::asio::async_write(
277  *(ioInterface_.stream_), boost::asio::buffer(cmd.data(), cmd.size()),
278  [this, cmd](boost::system::error_code ec, std::size_t /*length*/) {
279  if (!ec)
280  {
281  // Prints the data that was sent
282  node_->log(log_level::DEBUG, "AsyncManager sent the following " +
283  std::to_string(cmd.size()) +
284  " bytes to the Rx: " + cmd);
285  } else
286  {
287  node_->log(log_level::ERROR,
288  "AsyncManager was unable to send the following " +
289  std::to_string(cmd.size()) +
290  " bytes to the Rx: " + cmd);
291  }
292  });
293  }
294 
295  template <typename IoType>
297  {
298  telegram_.reset(new Telegram);
299  readSync<0>();
300  }
301 
302  template <typename IoType>
303  template <uint8_t index>
305  {
306  static_assert(index < 3);
307 
308  boost::asio::async_read(
309  *(ioInterface_.stream_),
310  boost::asio::buffer(telegram_->message.data() + index, 1),
311  [this](boost::system::error_code ec, std::size_t numBytes) {
312  Timestamp stamp = node_->getTime();
313 
314  if (!ec)
315  {
316  if (numBytes == 1)
317  {
318  uint8_t& currByte = telegram_->message[index];
319 
320  if (currByte == SYNC_BYTE_1)
321  {
322  telegram_->stamp = stamp;
323  readSync<1>();
324  } else
325  {
326  switch (index)
327  {
328  case 0:
329  {
330  telegram_->type = telegram_type::UNKNOWN;
331  readUnknown();
332  break;
333  }
334  case 1:
335  {
336  switch (currByte)
337  {
338  case SBF_SYNC_BYTE_2:
339  {
340  telegram_->type = telegram_type::SBF;
341  readSbfHeader();
342  break;
343  }
344  case NMEA_SYNC_BYTE_2:
345  {
346  telegram_->type = telegram_type::NMEA;
347  readSync<2>();
348  break;
349  }
350  case NMEA_INS_SYNC_BYTE_2:
351  {
352  telegram_->type = telegram_type::NMEA_INS;
353  readSync<2>();
354  break;
355  }
356  case RESPONSE_SYNC_BYTE_2:
357  {
358  telegram_->type = telegram_type::RESPONSE;
359  readSync<2>();
360  break;
361  }
362  default:
363  {
364  std::stringstream ss;
365  ss << std::hex << currByte;
366  node_->log(
367  log_level::DEBUG,
368  "AsyncManager sync byte 2 read fault, should never come here.. Received byte was " +
369  ss.str());
370  resync();
371  break;
372  }
373  }
374  break;
375  }
376  case 2:
377  {
378  switch (currByte)
379  {
380  case NMEA_SYNC_BYTE_3:
381  {
382  if (telegram_->type == telegram_type::NMEA)
383  readString();
384  else
385  resync();
386  break;
387  }
388  case NMEA_INS_SYNC_BYTE_3:
389  {
390  if (telegram_->type == telegram_type::NMEA_INS)
391  readString();
392  else
393  resync();
394  break;
395  }
396  case RESPONSE_SYNC_BYTE_3:
397  {
398  if (telegram_->type == telegram_type::RESPONSE)
399  readString();
400  else
401  resync();
402  break;
403  }
404  case RESPONSE_SYNC_BYTE_3a:
405  {
406  if (telegram_->type == telegram_type::RESPONSE)
407  readString();
408  else
409  resync();
410  break;
411  }
412  case ERROR_SYNC_BYTE_3:
413  {
414  if (telegram_->type == telegram_type::RESPONSE)
415  {
416  telegram_->type =
417  telegram_type::ERROR_RESPONSE;
418  readString();
419  } else
420  resync();
421  break;
422  }
423  default:
424  {
425  std::stringstream ss;
426  ss << std::hex << currByte;
427  node_->log(
428  log_level::DEBUG,
429  "AsyncManager sync byte 3 read fault, should never come here. Received byte was " +
430  ss.str());
431  resync();
432  break;
433  }
434  }
435  break;
436  }
437  default:
438  {
439  node_->log(
440  log_level::DEBUG,
441  "AsyncManager sync read fault, should never come here.");
442  resync();
443  break;
444  }
445  }
446  }
447  } else
448  {
449  node_->log(
450  log_level::DEBUG,
451  "AsyncManager sync read fault, wrong number of bytes read: " +
452  std::to_string(numBytes));
453  resync();
454  }
455  } else
456  {
457  node_->log(log_level::DEBUG,
458  "AsyncManager sync read error: " + ec.message());
459  }
460  });
461  }
462 
463  template <typename IoType>
465  {
466  telegram_->message.resize(SBF_HEADER_SIZE);
467 
468  boost::asio::async_read(
469  *(ioInterface_.stream_),
470  boost::asio::buffer(telegram_->message.data() + 2, SBF_HEADER_SIZE - 2),
471  [this](boost::system::error_code ec, std::size_t numBytes) {
472  if (!ec)
473  {
474  if (numBytes == (SBF_HEADER_SIZE - 2))
475  {
476  uint16_t length =
477  parsing_utilities::getLength(telegram_->message);
478  if (length > MAX_SBF_SIZE)
479  {
480  node_->log(
481  log_level::DEBUG,
482  "AsyncManager SBF header read fault, length of block exceeds " +
483  std::to_string(MAX_SBF_SIZE) + ": " +
484  std::to_string(length));
485  } else
486  readSbf(length);
487  } else
488  {
489  node_->log(
490  log_level::DEBUG,
491  "AsyncManager SBF header read fault, wrong number of bytes read: " +
492  std::to_string(numBytes));
493  resync();
494  }
495  } else
496  {
497  node_->log(log_level::DEBUG,
498  "AsyncManager SBF header read error: " +
499  ec.message());
500  }
501  });
502  }
503 
504  template <typename IoType>
505  void AsyncManager<IoType>::readSbf(std::size_t length)
506  {
507  telegram_->message.resize(length);
508 
509  boost::asio::async_read(
510  *(ioInterface_.stream_),
511  boost::asio::buffer(telegram_->message.data() + SBF_HEADER_SIZE,
513  [this, length](boost::system::error_code ec, std::size_t numBytes) {
514  if (!ec)
515  {
516  if (numBytes == (length - SBF_HEADER_SIZE))
517  {
518  if (crc::isValid(telegram_->message))
519  {
520  telegramQueue_->push(telegram_);
521  } else
522  node_->log(log_level::DEBUG,
523  "AsyncManager crc failed for SBF " +
524  std::to_string(parsing_utilities::getId(
525  telegram_->message)) +
526  ".");
527  } else
528  {
529  node_->log(
530  log_level::DEBUG,
531  "AsyncManager SBF read fault, wrong number of bytes read: " +
532  std::to_string(numBytes));
533  }
534  resync();
535  } else
536  {
537  node_->log(log_level::DEBUG,
538  "AsyncManager SBF read error: " + ec.message());
539  }
540  });
541  }
542 
543  template <typename IoType>
545  {
546  telegram_->message.resize(1);
547  telegram_->message.reserve(256);
548  readStringElements();
549  }
550 
551  template <typename IoType>
553  {
554  telegram_->message.resize(3);
555  telegram_->message.reserve(256);
556  readStringElements();
557  }
558 
559  template <typename IoType>
561  {
562  boost::asio::async_read(
563  *(ioInterface_.stream_), boost::asio::buffer(buf_.data(), 1),
564  [this](boost::system::error_code ec, std::size_t numBytes) {
565  if (!ec)
566  {
567  if (numBytes == 1)
568  {
569  telegram_->message.push_back(buf_[0]);
570  /*node_->log(log_level::DEBUG,
571  "Buffer: " +
572  std::string(telegram_->message.begin(),
573  telegram_->message.end()));*/
574 
575  switch (buf_[0])
576  {
577  case SYNC_BYTE_1:
578  {
579  telegram_.reset(new Telegram);
580  telegram_->message[0] = buf_[0];
581  telegram_->stamp = node_->getTime();
582  node_->log(
583  log_level::DEBUG,
584  "AsyncManager string read fault, sync 1 found.");
585  readSync<1>();
586  break;
587  }
588  case LF:
589  {
590  if (telegram_->message[telegram_->message.size() - 2] ==
591  CR)
592  telegramQueue_->push(telegram_);
593  else
594  node_->log(
595  log_level::DEBUG,
596  "LF wo CR: " +
597  std::string(telegram_->message.begin(),
598  telegram_->message.end()));
599  resync();
600  break;
601  }
602  case CONNECTION_DESCRIPTOR_FOOTER:
603  {
604  telegram_->type = telegram_type::CONNECTION_DESCRIPTOR;
605  telegramQueue_->push(telegram_);
606  resync();
607  break;
608  }
609  default:
610  {
611  readStringElements();
612  break;
613  }
614  }
615  } else
616  {
617  node_->log(
618  log_level::DEBUG,
619  "AsyncManager string read fault, wrong number of bytes read: " +
620  std::to_string(numBytes));
621  resync();
622  }
623  } else
624  {
625  node_->log(log_level::DEBUG,
626  "AsyncManager string read error: " + ec.message());
627  }
628  });
629  }
630 } // namespace io
io::AsyncManager::readSbf
void readSbf(std::size_t length)
Definition: async_manager.hpp:505
log_level::DEBUG
@ DEBUG
Definition: typedefs.hpp:172
Timestamp
uint64_t Timestamp
Definition: typedefs.hpp:92
io::AsyncManager::running_
std::atomic< bool > running_
Definition: async_manager.hpp:145
ConcurrentQueue< std::shared_ptr< Telegram > >
io::AsyncManager::buf_
std::array< uint8_t, 1 > buf_
Definition: async_manager.hpp:149
io::AsyncManager::ioInterface_
IoType ioInterface_
Definition: async_manager.hpp:144
io::AsyncManager::write
void write(const std::string &cmd)
Definition: async_manager.hpp:274
io::AsyncManagerBase::send
virtual void send(const std::string &cmd)=0
Sends commands to the receiver.
crc.hpp
Declares the functions to compute and validate the CRC of a buffer.
io::AsyncManager
This is the central interface between ROSaic and the Rx(s), managing I/O operations such as reading m...
Definition: async_manager.hpp:108
ROSaicNodeBase::log
void log(log_level::LogLevel logLevel, const std::string &s) const
Log function to provide abstraction of ROS loggers.
Definition: typedefs.hpp:257
io::AsyncManager::ioThread_
std::thread ioThread_
Definition: async_manager.hpp:146
io::AsyncManagerBase
Interface (in C++ terms), that could be used for any I/O manager, synchronous and asynchronous alike.
Definition: async_manager.hpp:90
io::AsyncManager::watchdogThread_
std::thread watchdogThread_
Definition: async_manager.hpp:147
io::AsyncManager::node_
ROSaicNodeBase * node_
Pointer to the node.
Definition: async_manager.hpp:142
io::AsyncManager::readString
void readString()
Definition: async_manager.hpp:552
io::AsyncManager::ioService_
std::shared_ptr< boost::asio::io_service > ioService_
Definition: async_manager.hpp:143
io::AsyncManager::send
void send(const std::string &cmd)
Sends commands to the receiver.
Definition: async_manager.hpp:201
boost
telegram.hpp
log_level::INFO
@ INFO
Definition: typedefs.hpp:173
io::AsyncManager::readSbfHeader
void readSbfHeader()
Definition: async_manager.hpp:464
io::AsyncManager::runIoService
void runIoService()
Definition: async_manager.hpp:231
io::AsyncManager::close
void close()
Definition: async_manager.hpp:225
ROSaicNodeBase
This class is the base class for abstraction.
Definition: typedefs.hpp:184
io::AsyncManager::telegram_
std::shared_ptr< Telegram > telegram_
Telegram.
Definition: async_manager.hpp:153
io::AsyncManager::readUnknown
void readUnknown()
Definition: async_manager.hpp:544
io.hpp
io::AsyncManager::AsyncManager
AsyncManager(ROSaicNodeBase *node, TelegramQueue *telegramQueue)
Class constructor.
Definition: async_manager.hpp:159
io::AsyncManager::resync
void resync()
Definition: async_manager.hpp:296
io::AsyncManager::readSync
void readSync()
Definition: async_manager.hpp:304
parsing_utilities.hpp
Declares utility functions used when parsing messages.
io::AsyncManager::telegramQueue_
TelegramQueue * telegramQueue_
TelegramQueue.
Definition: async_manager.hpp:155
io
Definition: async_manager.hpp:83
io::AsyncManagerBase::connect
virtual bool connect()=0
Connects the stream.
Telegram
Definition: telegram.hpp:104
log_level::ERROR
@ ERROR
Definition: typedefs.hpp:175
SBF_HEADER_SIZE
static const uint16_t SBF_HEADER_SIZE
Definition: telegram.hpp:86
io::AsyncManager::recvStamp_
Timestamp recvStamp_
Timestamp of receiving buffer.
Definition: async_manager.hpp:151
io::AsyncManager::connect
bool connect()
Connects the stream.
Definition: async_manager.hpp:181
length
TF2SIMD_FORCE_INLINE tf2Scalar length(const Quaternion &q)
io::AsyncManagerBase::~AsyncManagerBase
virtual ~AsyncManagerBase()
Definition: async_manager.hpp:93
io::AsyncManager::readStringElements
void readStringElements()
Definition: async_manager.hpp:560
io::AsyncManager::receive
void receive()
Definition: async_manager.hpp:214
io::AsyncManager::setPort
void setPort(const std::string &port)
Definition: async_manager.hpp:195
io::AsyncManager::~AsyncManager
~AsyncManager()
Definition: async_manager.hpp:169
io::AsyncManager::runWatchdog
void runWatchdog()
Definition: async_manager.hpp:238


septentrio_gnss_driver
Author(s): Tibor Dome
autogenerated on Wed Nov 22 2023 04:04:27