async_manager.hpp
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // © Copyright 2020, Septentrio NV/SA.
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // 1. Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // 2. Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // 3. Neither the name of the copyright holder nor the names of its
14 // contributors may be used to endorse or promote products derived
15 // from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 // POSSIBILITY OF SUCH DAMAGE.
28 //
29 // *****************************************************************************
30 
31 // *****************************************************************************
32 //
33 // Boost Software License - Version 1.0 - August 17th, 2003
34 //
35 // Permission is hereby granted, free of charge, to any person or organization
36 // obtaining a copy of the software and accompanying documentation covered by
37 // this license (the "Software") to use, reproduce, display, distribute,
38 // execute, and transmit the Software, and to prepare derivative works of the
39 // Software, and to permit third-parties to whom the Software is furnished to
40 // do so, all subject to the following:
41 
42 // The copyright notices in the Software and this entire statement, including
43 // the above license grant, this restriction and the following disclaimer,
44 // must be included in all copies of the Software, in whole or in part, and
45 // all derivative works of the Software, unless such copies or derivative
46 // works are solely in the form of machine-executable object code generated by
47 // a source language processor.
48 //
49 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
50 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
51 // FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
52 // SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
53 // FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
54 // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
55 // DEALINGS IN THE SOFTWARE.
56 //
57 // *****************************************************************************
58 
59 #pragma once
60 
61 // Boost includes
62 #include <boost/asio.hpp>
63 #include <boost/bind/bind.hpp>
64 #include <boost/regex.hpp>
65 
66 // ROSaic includes
69 
70 // local includes
73 
83 namespace io {
84 
91  {
92  public:
93  virtual ~AsyncManagerBase() {}
95  [[nodiscard]] virtual bool connect() = 0;
96 
97  virtual void close() = 0;
99  virtual void send(const std::string& cmd) = 0;
100  bool connected() { return false; };
101  };
102 
110  template <typename IoType>
112  {
113  public:
119  AsyncManager(ROSaicNodeBase* node, TelegramQueue* telegramQueue);
120 
121  ~AsyncManager();
122 
123  [[nodiscard]] bool connect();
124 
125  void close();
126 
127  void setPort(const std::string& port);
128 
129  void send(const std::string& cmd);
130 
131  bool connected();
132 
133  private:
134  void receive();
135  void runIoContext();
136  void runWatchdog();
137  void write(const std::string& cmd);
138  void resync();
139  template <uint8_t index>
140  void readSync();
141  void readSbfHeader();
142  void readSbf(std::size_t length);
143  void readUnknown();
144  void readString();
145  void readStringElements();
146 
149  std::shared_ptr<boost::asio::io_context> ioContext_;
150  IoType ioInterface_;
151  std::atomic<bool> running_;
152  std::thread ioThread_;
153  std::thread watchdogThread_;
154 
155  bool connected_ = false;
156 
157  std::array<uint8_t, 1> buf_;
161  std::shared_ptr<Telegram> telegram_;
164  };
165 
166  template <typename IoType>
168  TelegramQueue* telegramQueue) :
169  node_(node), ioContext_(std::make_shared<boost::asio::io_context>()),
170  ioInterface_(node, ioContext_), telegramQueue_(telegramQueue)
171  {
172  node_->log(log_level::DEBUG, "AsyncManager created.");
173  }
174 
175  template <typename IoType>
177  {
178  if (connected_)
179  close();
180  }
181 
182  template <typename IoType>
183  [[nodiscard]] bool AsyncManager<IoType>::connect()
184  {
185  running_ = true;
186 
187  if (!ioInterface_.connect())
188  {
189  return false;
190  }
191  connected_ = true;
192  receive();
193 
194  return true;
195  }
196 
197  template <typename IoType>
199  {
200  running_ = false;
201  connected_ = false;
202  ioInterface_.close();
203  node_->log(log_level::DEBUG, "AsyncManager shutting down threads");
204  if (ioThread_.joinable())
205  {
206  ioContext_->stop();
207  ioThread_.join();
208  }
209  if (watchdogThread_.joinable())
210  watchdogThread_.join();
211  node_->log(log_level::DEBUG, "AsyncManager threads stopped");
212  }
213 
214  template <typename IoType>
215  void AsyncManager<IoType>::setPort(const std::string& port)
216  {
217  ioInterface_.setPort(port);
218  }
219 
220  template <typename IoType>
221  void AsyncManager<IoType>::send(const std::string& cmd)
222  {
223  if (cmd.size() == 0)
224  {
225  node_->log(log_level::ERROR,
226  "AsyncManager message size to be sent to the Rx would be 0");
227  return;
228  }
229 
230  boost::asio::post(*ioContext_,
231  boost::bind(&AsyncManager<IoType>::write, this, cmd));
232  }
233 
234  template <typename IoType>
236  {
237  return connected_;
238  }
239 
240  template <typename IoType>
242  {
243  resync();
244  ioThread_ =
245  std::thread(std::bind(&AsyncManager<IoType>::runIoContext, this));
246  if (!watchdogThread_.joinable())
247  watchdogThread_ =
248  std::thread(std::bind(&AsyncManager::runWatchdog, this));
249  }
250 
251  template <typename IoType>
253  {
254  ioContext_->restart();
255  ioContext_->run();
256  node_->log(log_level::DEBUG, "AsyncManager ioContext terminated.");
257  }
258 
259  template <typename IoType>
261  {
262  while (running_)
263  {
264  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
265  if (running_ && ioContext_->stopped())
266  {
267  if (node_->settings()->read_from_sbf_log ||
268  node_->settings()->read_from_pcap)
269  {
270  node_->log(
272  "AsyncManager finished reading file. Node will continue to publish queued messages.");
273  break;
274  } else
275  {
276  node_->log(log_level::ERROR,
277  "AsyncManager connection lost. Trying to reconnect.");
278  ioThread_.join();
279  connected_ = ioInterface_.connect();
280  if (connected_)
281  receive();
282  }
283  } else if (running_ && std::is_same<TcpIo, IoType>::value)
284  {
285  // Send to check if TCP connection still alive
286  std::string empty = " ";
287  boost::asio::async_write(
288  *(ioInterface_.stream_), boost::asio::buffer(empty.data(), 1),
289  [this](boost::system::error_code ec, std::size_t /*length*/) {
290  if (ec)
291  ioContext_->stop();
292  });
293  }
294  }
295  }
296 
297  template <typename IoType>
298  void AsyncManager<IoType>::write(const std::string& cmd)
299  {
300  boost::asio::async_write(
301  *(ioInterface_.stream_), boost::asio::buffer(cmd.data(), cmd.size()),
302  [this, cmd](boost::system::error_code ec, std::size_t /*length*/) {
303  if (!ec)
304  {
305  // Prints the data that was sent
306  node_->log(log_level::DEBUG, "AsyncManager sent the following " +
307  std::to_string(cmd.size()) +
308  " bytes to the Rx: " + cmd);
309  } else
310  {
311  node_->log(log_level::ERROR,
312  "AsyncManager was unable to send the following " +
313  std::to_string(cmd.size()) +
314  " bytes to the Rx: " + cmd);
315  }
316  });
317  }
318 
319  template <typename IoType>
321  {
322  telegram_ = std::make_shared<Telegram>();
323  readSync<0>();
324  }
325 
326  template <typename IoType>
327  template <uint8_t index>
329  {
330  static_assert(index < 3);
331 
332  boost::asio::async_read(
333  *(ioInterface_.stream_),
334  boost::asio::buffer(telegram_->message.data() + index, 1),
335  [this](boost::system::error_code ec, std::size_t numBytes) {
336  Timestamp stamp = node_->getTime();
337 
338  if (!ec)
339  {
340  if (numBytes == 1)
341  {
342  uint8_t& currByte = telegram_->message[index];
343 
344  if (currByte == SYNC_BYTE_1)
345  {
346  telegram_->stamp = stamp;
347  readSync<1>();
348  } else
349  {
350  switch (index)
351  {
352  case 0:
353  {
354  telegram_->type = telegram_type::UNKNOWN;
355  readUnknown();
356  break;
357  }
358  case 1:
359  {
360  switch (currByte)
361  {
362  case SBF_SYNC_BYTE_2:
363  {
364  telegram_->type = telegram_type::SBF;
365  readSbfHeader();
366  break;
367  }
368  case NMEA_SYNC_BYTE_2:
369  {
370  telegram_->type = telegram_type::NMEA;
371  readSync<2>();
372  break;
373  }
374  case NMEA_INS_SYNC_BYTE_2:
375  {
376  telegram_->type = telegram_type::NMEA_INS;
377  readSync<2>();
378  break;
379  }
380  case RESPONSE_SYNC_BYTE_2:
381  {
382  telegram_->type = telegram_type::RESPONSE;
383  readSync<2>();
384  break;
385  }
386  default:
387  {
388  std::stringstream ss;
389  ss << std::hex << currByte;
390  node_->log(
391  log_level::DEBUG,
392  "AsyncManager sync byte 2 read fault, should never come here.. Received byte was " +
393  ss.str());
394  resync();
395  break;
396  }
397  }
398  break;
399  }
400  case 2:
401  {
402  switch (currByte)
403  {
404  case NMEA_SYNC_BYTE_3:
405  {
406  if (telegram_->type == telegram_type::NMEA)
407  readString();
408  else
409  resync();
410  break;
411  }
412  case NMEA_INS_SYNC_BYTE_3:
413  {
414  if (telegram_->type == telegram_type::NMEA_INS)
415  readString();
416  else
417  resync();
418  break;
419  }
420  case RESPONSE_SYNC_BYTE_3:
421  {
422  if (telegram_->type == telegram_type::RESPONSE)
423  readString();
424  else
425  resync();
426  break;
427  }
428  case RESPONSE_SYNC_BYTE_3a:
429  {
430  if (telegram_->type == telegram_type::RESPONSE)
431  readString();
432  else
433  resync();
434  break;
435  }
436  case ERROR_SYNC_BYTE_3:
437  {
438  if (telegram_->type == telegram_type::RESPONSE)
439  {
440  telegram_->type =
441  telegram_type::ERROR_RESPONSE;
442  readString();
443  } else
444  resync();
445  break;
446  }
447  default:
448  {
449  std::stringstream ss;
450  ss << std::hex << currByte;
451  node_->log(
452  log_level::DEBUG,
453  "AsyncManager sync byte 3 read fault, should never come here. Received byte was " +
454  ss.str());
455  resync();
456  break;
457  }
458  }
459  break;
460  }
461  default:
462  {
463  node_->log(
464  log_level::DEBUG,
465  "AsyncManager sync read fault, unknown sync byte 2 found.");
466  resync();
467  break;
468  }
469  }
470  }
471  } else
472  {
473  node_->log(
474  log_level::DEBUG,
475  "AsyncManager sync read fault, wrong number of bytes read: " +
476  std::to_string(numBytes));
477  }
478  } else
479  {
480  if (connected_)
481  node_->log(log_level::DEBUG,
482  "AsyncManager sync read error: " + ec.message());
483 
484  if ((boost::asio::error::eof == ec) ||
485  (boost::asio::error::network_unreachable == ec) ||
486  (boost::asio::error::interrupted == ec) ||
487  (boost::asio::error::bad_descriptor == ec) ||
488  (boost::asio::error::connection_reset == ec))
489  {
490  ioContext_->stop();
491  } else
492  {
493  if (connected_)
494  resync();
495  }
496  }
497  });
498  }
499 
500  template <typename IoType>
502  {
503  telegram_->message.resize(SBF_HEADER_SIZE);
504 
505  boost::asio::async_read(
506  *(ioInterface_.stream_),
507  boost::asio::buffer(telegram_->message.data() + 2, SBF_HEADER_SIZE - 2),
508  [this](boost::system::error_code ec, std::size_t numBytes) {
509  if (!ec)
510  {
511  if (numBytes == (SBF_HEADER_SIZE - 2))
512  {
513  uint16_t length =
514  parsing_utilities::getLength(telegram_->message);
515  if (length > MAX_SBF_SIZE)
516  {
517  node_->log(
518  log_level::DEBUG,
519  "AsyncManager SBF header read fault, length of block exceeds " +
520  std::to_string(MAX_SBF_SIZE) + ": " +
521  std::to_string(length));
522  } else
523  readSbf(length);
524  } else
525  {
526  node_->log(
527  log_level::DEBUG,
528  "AsyncManager SBF header read fault, wrong number of bytes read: " +
529  std::to_string(numBytes));
530  resync();
531  }
532  } else
533  {
534  node_->log(log_level::DEBUG,
535  "AsyncManager SBF header read error: " +
536  ec.message());
537  resync();
538  }
539  });
540  }
541 
542  template <typename IoType>
543  void AsyncManager<IoType>::readSbf(std::size_t length)
544  {
545  telegram_->message.resize(length);
546 
547  boost::asio::async_read(
548  *(ioInterface_.stream_),
549  boost::asio::buffer(telegram_->message.data() + SBF_HEADER_SIZE,
551  [this, length](boost::system::error_code ec, std::size_t numBytes) {
552  if (!ec)
553  {
554  if (numBytes == (length - SBF_HEADER_SIZE))
555  {
556  if (crc::isValid(telegram_->message))
557  {
558  telegramQueue_->push(telegram_);
559  } else
560  node_->log(log_level::DEBUG,
561  "AsyncManager crc failed for SBF " +
562  std::to_string(parsing_utilities::getId(
563  telegram_->message)) +
564  ".");
565  } else
566  {
567  node_->log(
568  log_level::DEBUG,
569  "AsyncManager SBF read fault, wrong number of bytes read: " +
570  std::to_string(numBytes));
571  }
572  resync();
573  } else
574  {
575  node_->log(log_level::DEBUG,
576  "AsyncManager SBF read error: " + ec.message());
577  resync();
578  }
579  });
580  }
581 
582  template <typename IoType>
584  {
585  telegram_->message.resize(1);
586  telegram_->message.reserve(256);
587  readStringElements();
588  }
589 
590  template <typename IoType>
592  {
593  telegram_->message.resize(3);
594  telegram_->message.reserve(256);
595  readStringElements();
596  }
597 
598  template <typename IoType>
600  {
601  boost::asio::async_read(
602  *(ioInterface_.stream_), boost::asio::buffer(buf_.data(), 1),
603  [this](boost::system::error_code ec, std::size_t numBytes) {
604  if (!ec)
605  {
606  if (numBytes == 1)
607  {
608  telegram_->message.push_back(buf_[0]);
609  /*node_->log(log_level::DEBUG,
610  "Buffer: " +
611  std::string(telegram_->message.begin(),
612  telegram_->message.end()));*/
613 
614  switch (buf_[0])
615  {
616  case SYNC_BYTE_1:
617  {
618  telegram_ = std::make_shared<Telegram>();
619  telegram_->message[0] = buf_[0];
620  telegram_->stamp = node_->getTime();
621  node_->log(
622  log_level::DEBUG,
623  "AsyncManager string read fault, sync 1 found.");
624  readSync<1>();
625  break;
626  }
627  case LF:
628  {
629  if (telegram_->message[telegram_->message.size() - 2] ==
630  CR)
631  telegramQueue_->push(telegram_);
632  else
633  node_->log(
634  log_level::DEBUG,
635  "LF wo CR: " +
636  std::string(telegram_->message.begin(),
637  telegram_->message.end()));
638  resync();
639  break;
640  }
641  case CONNECTION_DESCRIPTOR_FOOTER:
642  {
643  telegram_->type = telegram_type::CONNECTION_DESCRIPTOR;
644  telegramQueue_->push(telegram_);
645  resync();
646  break;
647  }
648  default:
649  {
650  readStringElements();
651  break;
652  }
653  }
654  } else
655  {
656  node_->log(
657  log_level::DEBUG,
658  "AsyncManager string read fault, wrong number of bytes read: " +
659  std::to_string(numBytes));
660  resync();
661  }
662  } else
663  {
664  node_->log(log_level::DEBUG,
665  "AsyncManager string read error: " + ec.message());
666  resync();
667  }
668  });
669  }
670 } // namespace io
io::AsyncManager::readSbf
void readSbf(std::size_t length)
Definition: async_manager.hpp:543
Timestamp
uint64_t Timestamp
Definition: typedefs.hpp:101
io::AsyncManager::running_
std::atomic< bool > running_
Definition: async_manager.hpp:151
ConcurrentQueue< std::shared_ptr< Telegram > >
io::AsyncManager::buf_
std::array< uint8_t, 1 > buf_
Definition: async_manager.hpp:157
io::AsyncManager::ioInterface_
IoType ioInterface_
Definition: async_manager.hpp:150
io::AsyncManager::write
void write(const std::string &cmd)
Definition: async_manager.hpp:298
io::AsyncManagerBase::send
virtual void send(const std::string &cmd)=0
Sends commands to the receiver.
crc.hpp
Declares the functions to compute and validate the CRC of a buffer.
io::AsyncManager
This is the central interface between ROSaic and the Rx(s), managing I/O operations such as reading m...
Definition: async_manager.hpp:111
ROSaicNodeBase::log
void log(log_level::LogLevel logLevel, const std::string &s) const
Log function to provide abstraction of ROS loggers.
Definition: typedefs.hpp:286
io::AsyncManager::ioThread_
std::thread ioThread_
Definition: async_manager.hpp:152
io::AsyncManagerBase
Interface (in C++ terms), that could be used for any I/O manager, synchronous and asynchronous alike.
Definition: async_manager.hpp:90
io::AsyncManager::watchdogThread_
std::thread watchdogThread_
Definition: async_manager.hpp:153
io::AsyncManager::node_
ROSaicNodeBase * node_
Pointer to the node.
Definition: async_manager.hpp:148
io::AsyncManager::readString
void readString()
Definition: async_manager.hpp:591
io::AsyncManager::send
void send(const std::string &cmd)
Sends commands to the receiver.
Definition: async_manager.hpp:221
io::AsyncManager::runIoContext
void runIoContext()
Definition: async_manager.hpp:252
boost
telegram.hpp
io::AsyncManager::readSbfHeader
void readSbfHeader()
Definition: async_manager.hpp:501
io::AsyncManager::close
void close()
Definition: async_manager.hpp:198
ROSaicNodeBase
This class is the base class for abstraction.
Definition: typedefs.hpp:192
io::AsyncManager::connected
bool connected()
Definition: async_manager.hpp:235
io::AsyncManager::telegram_
std::shared_ptr< Telegram > telegram_
Telegram.
Definition: async_manager.hpp:161
io::AsyncManager::readUnknown
void readUnknown()
Definition: async_manager.hpp:583
io::AsyncManagerBase::close
virtual void close()=0
io.hpp
io::AsyncManager::AsyncManager
AsyncManager(ROSaicNodeBase *node, TelegramQueue *telegramQueue)
Class constructor.
Definition: async_manager.hpp:167
io::AsyncManager::resync
void resync()
Definition: async_manager.hpp:320
io::AsyncManager::readSync
void readSync()
Definition: async_manager.hpp:328
io::AsyncManagerBase::connected
bool connected()
Definition: async_manager.hpp:100
parsing_utilities.hpp
Declares utility functions used when parsing messages.
io::AsyncManager::telegramQueue_
TelegramQueue * telegramQueue_
TelegramQueue.
Definition: async_manager.hpp:163
io
Definition: async_manager.hpp:83
io::AsyncManagerBase::connect
virtual bool connect()=0
Connects the stream.
io::AsyncManager::connected_
bool connected_
Definition: async_manager.hpp:155
SBF_HEADER_SIZE
static const uint16_t SBF_HEADER_SIZE
Definition: telegram.hpp:91
io::AsyncManager::ioContext_
std::shared_ptr< boost::asio::io_context > ioContext_
Definition: async_manager.hpp:149
log_level::ERROR
@ ERROR
Definition: typedefs.hpp:183
io::AsyncManager::recvStamp_
Timestamp recvStamp_
Timestamp of receiving buffer.
Definition: async_manager.hpp:159
std
log_level::DEBUG
@ DEBUG
Definition: typedefs.hpp:180
io::AsyncManager::connect
bool connect()
Connects the stream.
Definition: async_manager.hpp:183
length
TF2SIMD_FORCE_INLINE tf2Scalar length(const Quaternion &q)
io::AsyncManagerBase::~AsyncManagerBase
virtual ~AsyncManagerBase()
Definition: async_manager.hpp:93
io::AsyncManager::readStringElements
void readStringElements()
Definition: async_manager.hpp:599
io::AsyncManager::receive
void receive()
Definition: async_manager.hpp:241
io::AsyncManager::setPort
void setPort(const std::string &port)
Definition: async_manager.hpp:215
io::AsyncManager::~AsyncManager
~AsyncManager()
Definition: async_manager.hpp:176
io::AsyncManager::runWatchdog
void runWatchdog()
Definition: async_manager.hpp:260
log_level::INFO
@ INFO
Definition: typedefs.hpp:181


septentrio_gnss_driver
Author(s): Tibor Dome, Thomas Emter
autogenerated on Sat May 10 2025 03:03:10