62 #include <boost/asio.hpp>
63 #include <boost/bind.hpp>
64 #include <boost/regex.hpp>
95 [[nodiscard]]
virtual bool connect() = 0;
97 virtual void send(
const std::string& cmd) = 0;
107 template <
typename IoType>
122 void setPort(
const std::string& port);
124 void send(
const std::string& cmd);
131 void write(
const std::string& cmd);
133 template <u
int8_t index>
158 template <
typename IoType>
162 ioService_(new
boost::asio::io_service), ioInterface_(node, ioService_),
163 telegramQueue_(telegramQueue)
168 template <
typename IoType>
176 watchdogThread_.join();
180 template <
typename IoType>
185 if (!ioInterface_.connect())
194 template <
typename IoType>
197 ioInterface_.setPort(port);
200 template <
typename IoType>
206 "AsyncManager message size to be sent to the Rx would be 0");
213 template <
typename IoType>
219 if (!watchdogThread_.joinable())
224 template <
typename IoType>
227 ioService_->post([
this]() { ioInterface_.close(); });
230 template <
typename IoType>
237 template <
typename IoType>
242 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
243 if (running_ && ioService_->stopped())
245 if (node_->settings()->read_from_sbf_log ||
246 node_->settings()->read_from_pcap)
250 "AsyncManager finished reading file. Node will continue to publish queued messages.");
255 "AsyncManager connection lost. Trying to reconnect.");
258 while (!ioInterface_.connect())
259 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
262 }
else if (running_ && std::is_same<TcpIo, IoType>::value)
265 std::string empty =
" ";
266 boost::asio::async_write(
267 *(ioInterface_.stream_), boost::asio::buffer(empty.data(), 1),
268 [](boost::system::error_code ec, std::size_t ) {});
273 template <
typename IoType>
276 boost::asio::async_write(
277 *(ioInterface_.stream_), boost::asio::buffer(cmd.data(), cmd.size()),
278 [
this, cmd](boost::system::error_code ec, std::size_t ) {
282 node_->log(log_level::DEBUG,
"AsyncManager sent the following " +
283 std::to_string(cmd.size()) +
284 " bytes to the Rx: " + cmd);
287 node_->log(log_level::ERROR,
288 "AsyncManager was unable to send the following " +
289 std::to_string(cmd.size()) +
290 " bytes to the Rx: " + cmd);
295 template <
typename IoType>
302 template <
typename IoType>
303 template <u
int8_t index>
306 static_assert(index < 3);
308 boost::asio::async_read(
309 *(ioInterface_.stream_),
310 boost::asio::buffer(telegram_->message.data() + index, 1),
311 [
this](boost::system::error_code ec, std::size_t numBytes) {
312 Timestamp stamp = node_->getTime();
318 uint8_t& currByte = telegram_->message[index];
320 if (currByte == SYNC_BYTE_1)
322 telegram_->stamp = stamp;
330 telegram_->type = telegram_type::UNKNOWN;
338 case SBF_SYNC_BYTE_2:
340 telegram_->type = telegram_type::SBF;
344 case NMEA_SYNC_BYTE_2:
346 telegram_->type = telegram_type::NMEA;
350 case NMEA_INS_SYNC_BYTE_2:
352 telegram_->type = telegram_type::NMEA_INS;
356 case RESPONSE_SYNC_BYTE_2:
358 telegram_->type = telegram_type::RESPONSE;
364 std::stringstream ss;
365 ss << std::hex << currByte;
368 "AsyncManager sync byte 2 read fault, should never come here.. Received byte was " +
380 case NMEA_SYNC_BYTE_3:
382 if (telegram_->type == telegram_type::NMEA)
388 case NMEA_INS_SYNC_BYTE_3:
390 if (telegram_->type == telegram_type::NMEA_INS)
396 case RESPONSE_SYNC_BYTE_3:
398 if (telegram_->type == telegram_type::RESPONSE)
404 case RESPONSE_SYNC_BYTE_3a:
406 if (telegram_->type == telegram_type::RESPONSE)
412 case ERROR_SYNC_BYTE_3:
414 if (telegram_->type == telegram_type::RESPONSE)
417 telegram_type::ERROR_RESPONSE;
425 std::stringstream ss;
426 ss << std::hex << currByte;
429 "AsyncManager sync byte 3 read fault, should never come here. Received byte was " +
441 "AsyncManager sync read fault, should never come here.");
451 "AsyncManager sync read fault, wrong number of bytes read: " +
452 std::to_string(numBytes));
457 node_->log(log_level::DEBUG,
458 "AsyncManager sync read error: " + ec.message());
463 template <
typename IoType>
468 boost::asio::async_read(
469 *(ioInterface_.stream_),
470 boost::asio::buffer(telegram_->message.data() + 2,
SBF_HEADER_SIZE - 2),
471 [
this](boost::system::error_code ec, std::size_t numBytes) {
474 if (numBytes == (SBF_HEADER_SIZE - 2))
477 parsing_utilities::getLength(telegram_->message);
478 if (length > MAX_SBF_SIZE)
482 "AsyncManager SBF header read fault, length of block exceeds " +
483 std::to_string(MAX_SBF_SIZE) +
": " +
484 std::to_string(length));
491 "AsyncManager SBF header read fault, wrong number of bytes read: " +
492 std::to_string(numBytes));
497 node_->log(log_level::DEBUG,
498 "AsyncManager SBF header read error: " +
504 template <
typename IoType>
507 telegram_->message.resize(
length);
509 boost::asio::async_read(
510 *(ioInterface_.stream_),
513 [
this,
length](boost::system::error_code ec, std::size_t numBytes) {
516 if (numBytes == (length - SBF_HEADER_SIZE))
518 if (crc::isValid(telegram_->message))
520 telegramQueue_->push(telegram_);
522 node_->log(log_level::DEBUG,
523 "AsyncManager crc failed for SBF " +
524 std::to_string(parsing_utilities::getId(
525 telegram_->message)) +
531 "AsyncManager SBF read fault, wrong number of bytes read: " +
532 std::to_string(numBytes));
537 node_->log(log_level::DEBUG,
538 "AsyncManager SBF read error: " + ec.message());
543 template <
typename IoType>
546 telegram_->message.resize(1);
547 telegram_->message.reserve(256);
548 readStringElements();
551 template <
typename IoType>
554 telegram_->message.resize(3);
555 telegram_->message.reserve(256);
556 readStringElements();
559 template <
typename IoType>
562 boost::asio::async_read(
563 *(ioInterface_.stream_), boost::asio::buffer(buf_.data(), 1),
564 [
this](boost::system::error_code ec, std::size_t numBytes) {
569 telegram_->message.push_back(buf_[0]);
579 telegram_.reset(new Telegram);
580 telegram_->message[0] = buf_[0];
581 telegram_->stamp = node_->getTime();
584 "AsyncManager string read fault, sync 1 found.");
590 if (telegram_->message[telegram_->message.size() - 2] ==
592 telegramQueue_->push(telegram_);
597 std::string(telegram_->message.begin(),
598 telegram_->message.end()));
602 case CONNECTION_DESCRIPTOR_FOOTER:
604 telegram_->type = telegram_type::CONNECTION_DESCRIPTOR;
605 telegramQueue_->push(telegram_);
611 readStringElements();
619 "AsyncManager string read fault, wrong number of bytes read: " +
620 std::to_string(numBytes));
625 node_->log(log_level::DEBUG,
626 "AsyncManager string read error: " + ec.message());