62 #include <boost/asio.hpp>
63 #include <boost/bind/bind.hpp>
64 #include <boost/regex.hpp>
95 [[nodiscard]]
virtual bool connect() = 0;
97 virtual void close() = 0;
99 virtual void send(
const std::string& cmd) = 0;
110 template <
typename IoType>
127 void setPort(
const std::string& port);
129 void send(
const std::string& cmd);
137 void write(
const std::string& cmd);
139 template <u
int8_t index>
166 template <
typename IoType>
169 node_(node), ioContext_(
std::make_shared<
boost::asio::io_context>()),
170 ioInterface_(node, ioContext_), telegramQueue_(telegramQueue)
175 template <
typename IoType>
182 template <
typename IoType>
187 if (!ioInterface_.connect())
197 template <
typename IoType>
202 ioInterface_.close();
204 if (ioThread_.joinable())
209 if (watchdogThread_.joinable())
210 watchdogThread_.join();
214 template <
typename IoType>
217 ioInterface_.setPort(port);
220 template <
typename IoType>
226 "AsyncManager message size to be sent to the Rx would be 0");
230 boost::asio::post(*ioContext_,
234 template <
typename IoType>
240 template <
typename IoType>
246 if (!watchdogThread_.joinable())
251 template <
typename IoType>
254 ioContext_->restart();
259 template <
typename IoType>
264 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
265 if (running_ && ioContext_->stopped())
267 if (node_->settings()->read_from_sbf_log ||
268 node_->settings()->read_from_pcap)
272 "AsyncManager finished reading file. Node will continue to publish queued messages.");
277 "AsyncManager connection lost. Trying to reconnect.");
279 connected_ = ioInterface_.connect();
283 }
else if (running_ && std::is_same<TcpIo, IoType>::value)
286 std::string empty =
" ";
287 boost::asio::async_write(
288 *(ioInterface_.stream_), boost::asio::buffer(empty.data(), 1),
289 [
this](boost::system::error_code ec, std::size_t ) {
297 template <
typename IoType>
300 boost::asio::async_write(
301 *(ioInterface_.stream_), boost::asio::buffer(cmd.data(), cmd.size()),
302 [
this, cmd](boost::system::error_code ec, std::size_t ) {
306 node_->log(log_level::DEBUG,
"AsyncManager sent the following " +
307 std::to_string(cmd.size()) +
308 " bytes to the Rx: " + cmd);
311 node_->log(log_level::ERROR,
312 "AsyncManager was unable to send the following " +
313 std::to_string(cmd.size()) +
314 " bytes to the Rx: " + cmd);
319 template <
typename IoType>
322 telegram_ = std::make_shared<Telegram>();
326 template <
typename IoType>
327 template <u
int8_t index>
330 static_assert(index < 3);
332 boost::asio::async_read(
333 *(ioInterface_.stream_),
334 boost::asio::buffer(telegram_->message.data() + index, 1),
335 [
this](boost::system::error_code ec, std::size_t numBytes) {
336 Timestamp stamp = node_->getTime();
342 uint8_t& currByte = telegram_->message[index];
344 if (currByte == SYNC_BYTE_1)
346 telegram_->stamp = stamp;
354 telegram_->type = telegram_type::UNKNOWN;
362 case SBF_SYNC_BYTE_2:
364 telegram_->type = telegram_type::SBF;
368 case NMEA_SYNC_BYTE_2:
370 telegram_->type = telegram_type::NMEA;
374 case NMEA_INS_SYNC_BYTE_2:
376 telegram_->type = telegram_type::NMEA_INS;
380 case RESPONSE_SYNC_BYTE_2:
382 telegram_->type = telegram_type::RESPONSE;
388 std::stringstream ss;
389 ss << std::hex << currByte;
392 "AsyncManager sync byte 2 read fault, should never come here.. Received byte was " +
404 case NMEA_SYNC_BYTE_3:
406 if (telegram_->type == telegram_type::NMEA)
412 case NMEA_INS_SYNC_BYTE_3:
414 if (telegram_->type == telegram_type::NMEA_INS)
420 case RESPONSE_SYNC_BYTE_3:
422 if (telegram_->type == telegram_type::RESPONSE)
428 case RESPONSE_SYNC_BYTE_3a:
430 if (telegram_->type == telegram_type::RESPONSE)
436 case ERROR_SYNC_BYTE_3:
438 if (telegram_->type == telegram_type::RESPONSE)
441 telegram_type::ERROR_RESPONSE;
449 std::stringstream ss;
450 ss << std::hex << currByte;
453 "AsyncManager sync byte 3 read fault, should never come here. Received byte was " +
465 "AsyncManager sync read fault, unknown sync byte 2 found.");
475 "AsyncManager sync read fault, wrong number of bytes read: " +
476 std::to_string(numBytes));
481 node_->log(log_level::DEBUG,
482 "AsyncManager sync read error: " + ec.message());
484 if ((boost::asio::error::eof == ec) ||
485 (boost::asio::error::network_unreachable == ec) ||
486 (boost::asio::error::interrupted == ec) ||
487 (boost::asio::error::bad_descriptor == ec) ||
488 (boost::asio::error::connection_reset == ec))
500 template <
typename IoType>
505 boost::asio::async_read(
506 *(ioInterface_.stream_),
507 boost::asio::buffer(telegram_->message.data() + 2,
SBF_HEADER_SIZE - 2),
508 [
this](boost::system::error_code ec, std::size_t numBytes) {
511 if (numBytes == (SBF_HEADER_SIZE - 2))
514 parsing_utilities::getLength(telegram_->message);
515 if (length > MAX_SBF_SIZE)
519 "AsyncManager SBF header read fault, length of block exceeds " +
520 std::to_string(MAX_SBF_SIZE) +
": " +
521 std::to_string(length));
528 "AsyncManager SBF header read fault, wrong number of bytes read: " +
529 std::to_string(numBytes));
534 node_->log(log_level::DEBUG,
535 "AsyncManager SBF header read error: " +
542 template <
typename IoType>
545 telegram_->message.resize(
length);
547 boost::asio::async_read(
548 *(ioInterface_.stream_),
551 [
this,
length](boost::system::error_code ec, std::size_t numBytes) {
554 if (numBytes == (length - SBF_HEADER_SIZE))
556 if (crc::isValid(telegram_->message))
558 telegramQueue_->push(telegram_);
560 node_->log(log_level::DEBUG,
561 "AsyncManager crc failed for SBF " +
562 std::to_string(parsing_utilities::getId(
563 telegram_->message)) +
569 "AsyncManager SBF read fault, wrong number of bytes read: " +
570 std::to_string(numBytes));
575 node_->log(log_level::DEBUG,
576 "AsyncManager SBF read error: " + ec.message());
582 template <
typename IoType>
585 telegram_->message.resize(1);
586 telegram_->message.reserve(256);
587 readStringElements();
590 template <
typename IoType>
593 telegram_->message.resize(3);
594 telegram_->message.reserve(256);
595 readStringElements();
598 template <
typename IoType>
601 boost::asio::async_read(
602 *(ioInterface_.stream_), boost::asio::buffer(buf_.data(), 1),
603 [
this](boost::system::error_code ec, std::size_t numBytes) {
608 telegram_->message.push_back(buf_[0]);
618 telegram_ = std::make_shared<Telegram>();
619 telegram_->message[0] = buf_[0];
620 telegram_->stamp = node_->getTime();
623 "AsyncManager string read fault, sync 1 found.");
629 if (telegram_->message[telegram_->message.size() - 2] ==
631 telegramQueue_->push(telegram_);
636 std::string(telegram_->message.begin(),
637 telegram_->message.end()));
641 case CONNECTION_DESCRIPTOR_FOOTER:
643 telegram_->type = telegram_type::CONNECTION_DESCRIPTOR;
644 telegramQueue_->push(telegram_);
650 readStringElements();
658 "AsyncManager string read fault, wrong number of bytes read: " +
659 std::to_string(numBytes));
664 node_->log(log_level::DEBUG,
665 "AsyncManager string read error: " + ec.message());