Program Listing for File io.hpp
↰ Return to documentation for file (/tmp/ws/src/septentrio_gnss_driver/include/septentrio_gnss_driver/communication/io.hpp
)
// *****************************************************************************
//
// © Copyright 2020, Septentrio NV/SA.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// 3. Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
// *****************************************************************************
#pragma once
// C++
#include <thread>
// Linux
#include <linux/input.h>
#include <linux/serial.h>
// Boost
#include <boost/asio.hpp>
// pcap
#include <pcap.h>
// ROSaic
#include <septentrio_gnss_driver/abstraction/typedefs.hpp>
#include <septentrio_gnss_driver/communication/telegram.hpp>
const static std::array<uint32_t, 21> baudrates = {
1200, 2400, 4800, 9600, 19200, 38400, 57600,
115200, 230400, 460800, 500000, 576000, 921600, 1000000,
1152000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000};
namespace io {
class UdpClient
{
public:
UdpClient(ROSaicNodeBase* node, int16_t port, TelegramQueue* telegramQueue) :
node_(node), running_(true), port_(port), telegramQueue_(telegramQueue)
{
connect();
watchdogThread_ =
std::thread(boost::bind(&UdpClient::runWatchdog, this));
}
~UdpClient()
{
running_ = false;
node_->log(log_level::INFO, "UDP client shutting down threads");
ioService_.stop();
ioThread_.join();
watchdogThread_.join();
node_->log(log_level::INFO, " UDP client threads stopped");
}
private:
void connect()
{
socket_.reset(new boost::asio::ip::udp::socket(
ioService_,
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_)));
asyncReceive();
ioThread_ = std::thread(boost::bind(&UdpClient::runIoService, this));
node_->log(log_level::INFO,
"Listening on UDP port " + std::to_string(port_));
}
void asyncReceive()
{
socket_->async_receive_from(
boost::asio::buffer(buffer_, MAX_UDP_PACKET_SIZE), eP_,
boost::bind(&UdpClient::handleReceive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handleReceive(const boost::system::error_code& error,
size_t bytes_recvd)
{
Timestamp stamp = node_->getTime();
size_t idx = 0;
if (!error && (bytes_recvd > 0))
{
while ((bytes_recvd - idx) > 2)
{
std::shared_ptr<Telegram> telegram(new Telegram);
telegram->stamp = stamp;
/*node_->log(log_level::DEBUG,
"Buffer: " + std::string(telegram->message.begin(),
telegram->message.end()));*/
if (buffer_[idx] == SYNC_BYTE_1)
{
if (buffer_[idx + 1] == SBF_SYNC_BYTE_2)
{
if ((bytes_recvd - idx) > SBF_HEADER_SIZE)
{
uint16_t length = parsing_utilities::parseUInt16(
&buffer_[idx + 6]);
telegram->message.assign(&buffer_[idx],
&buffer_[idx + length]);
if (crc::isValid(telegram->message))
{
telegram->type = telegram_type::SBF;
telegramQueue_->push(telegram);
} else
node_->log(
log_level::DEBUG,
"AsyncManager crc failed for SBF " +
std::to_string(parsing_utilities::getId(
telegram->message)) +
".");
idx += length;
}
} else if ((buffer_[idx + 1] == NMEA_SYNC_BYTE_2) &&
(buffer_[idx + 2] == NMEA_SYNC_BYTE_3))
{
size_t idx_end = findNmeaEnd(idx, bytes_recvd);
telegram->message.assign(&buffer_[idx],
&buffer_[idx_end + 1]);
telegram->type = telegram_type::NMEA;
telegramQueue_->push(telegram);
idx = idx_end + 1;
} else if ((buffer_[idx + 1] == NMEA_INS_SYNC_BYTE_2) &&
(buffer_[idx + 2] == NMEA_INS_SYNC_BYTE_3))
{
size_t idx_end = findNmeaEnd(idx, bytes_recvd);
telegram->message.assign(&buffer_[idx],
&buffer_[idx_end + 1]);
telegram->type = telegram_type::NMEA_INS;
telegramQueue_->push(telegram);
idx = idx_end + 1;
} else
{
node_->log(log_level::DEBUG,
"head: " +
std::string(std::string(
telegram->message.begin(),
telegram->message.begin() + 2)));
}
} else
{
node_->log(log_level::DEBUG, "UDP msg resync.");
++idx;
}
}
} else
{
node_->log(log_level::ERROR,
"UDP client receive error: " + error.message());
}
asyncReceive();
}
void runIoService()
{
ioService_.run();
node_->log(log_level::INFO, "UDP client ioService terminated.");
}
void runWatchdog()
{
while (running_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (running_ && ioService_.stopped())
{
node_->log(log_level::ERROR,
"UDP client connection lost. Trying to reconnect.");
ioService_.reset();
ioThread_.join();
connect();
}
}
}
private:
size_t findNmeaEnd(size_t idx, size_t bytes_recvd)
{
size_t idx_end = idx + 2;
while (idx_end < bytes_recvd)
{
if ((buffer_[idx_end] == LF) && (buffer_[idx_end - 1] == CR))
break;
++idx_end;
}
return idx_end;
}
ROSaicNodeBase* node_;
std::atomic<bool> running_;
int16_t port_;
boost::asio::io_service ioService_;
std::thread ioThread_;
std::thread watchdogThread_;
boost::asio::ip::udp::endpoint eP_;
std::unique_ptr<boost::asio::ip::udp::socket> socket_;
std::array<uint8_t, MAX_UDP_PACKET_SIZE> buffer_;
TelegramQueue* telegramQueue_;
};
class TcpIo
{
public:
TcpIo(ROSaicNodeBase* node,
std::shared_ptr<boost::asio::io_service> ioService) :
node_(node),
ioService_(ioService)
{
port_ = node_->settings()->device_tcp_port;
}
~TcpIo() { stream_->close(); }
void close() { stream_->close(); }
void setPort(const std::string& port) { port_ = port; }
[[nodiscard]] bool connect()
{
boost::asio::ip::tcp::resolver::iterator endpointIterator;
try
{
boost::asio::ip::tcp::resolver resolver(*ioService_);
boost::asio::ip::tcp::resolver::query query(
node_->settings()->device_tcp_ip, port_);
endpointIterator = resolver.resolve(query);
} catch (std::runtime_error& e)
{
node_->log(log_level::ERROR,
"Could not resolve " + node_->settings()->device_tcp_ip +
" on port " + port_ + ": " + e.what());
return false;
}
stream_.reset(new boost::asio::ip::tcp::socket(*ioService_));
node_->log(log_level::INFO, "Connecting to tcp://" +
node_->settings()->device_tcp_ip + ":" +
port_ + "...");
try
{
stream_->connect(*endpointIterator);
stream_->set_option(boost::asio::ip::tcp::no_delay(true));
node_->log(log_level::INFO,
"Connected to " + endpointIterator->host_name() + ":" +
endpointIterator->service_name() + ".");
} catch (std::runtime_error& e)
{
node_->log(log_level::ERROR,
"Could not connect to " + endpointIterator->host_name() +
": " + endpointIterator->service_name() + ": " +
e.what());
return false;
}
return true;
}
private:
ROSaicNodeBase* node_;
std::shared_ptr<boost::asio::io_service> ioService_;
std::string port_;
public:
std::unique_ptr<boost::asio::ip::tcp::socket> stream_;
};
class SerialIo
{
public:
SerialIo(ROSaicNodeBase* node,
std::shared_ptr<boost::asio::io_service> ioService) :
node_(node),
ioService_(ioService), flowcontrol_(node->settings()->hw_flow_control),
baudrate_(node->settings()->baudrate)
{
stream_.reset(new boost::asio::serial_port(*ioService_));
}
~SerialIo() { stream_->close(); }
void close() { stream_->close(); }
[[nodiscard]] bool connect()
{
if (stream_->is_open())
{
stream_->close();
}
bool opened = false;
while (!opened)
{
try
{
node_->log(log_level::INFO,
"Connecting serially to device " +
node_->settings()->device +
", targeted baudrate: " +
std::to_string(node_->settings()->baudrate));
stream_->open(node_->settings()->device);
opened = true;
} catch (const boost::system::system_error& err)
{
node_->log(log_level::ERROR, "Could not open serial port " +
node_->settings()->device +
". Error: " + err.what() +
". Will retry every second.");
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
}
}
// No Parity, 8bits data, 1 stop Bit
stream_->set_option(boost::asio::serial_port_base::baud_rate(baudrate_));
stream_->set_option(boost::asio::serial_port_base::parity(
boost::asio::serial_port_base::parity::none));
stream_->set_option(boost::asio::serial_port_base::character_size(8));
stream_->set_option(boost::asio::serial_port_base::stop_bits(
boost::asio::serial_port_base::stop_bits::one));
// Hardware flow control settings
if (flowcontrol_ == "RTS|CTS")
{
stream_->set_option(boost::asio::serial_port_base::flow_control(
boost::asio::serial_port_base::flow_control::hardware));
} else
{
stream_->set_option(boost::asio::serial_port_base::flow_control(
boost::asio::serial_port_base::flow_control::none));
}
// Set low latency
int fd = stream_->native_handle();
struct serial_struct serialInfo;
ioctl(fd, TIOCGSERIAL, &serialInfo);
serialInfo.flags |= ASYNC_LOW_LATENCY;
ioctl(fd, TIOCSSERIAL, &serialInfo);
return setBaudrate();
}
[[nodiscard]] bool setBaudrate()
{
// Setting the baudrate, incrementally..
node_->log(log_level::DEBUG,
"Gradually increasing the baudrate to the desired value...");
boost::asio::serial_port_base::baud_rate current_baudrate;
node_->log(log_level::DEBUG, "Initiated current_baudrate object...");
try
{
stream_->get_option(current_baudrate); // Note that this sets
// current_baudrate.value()
// often to 115200, since by
// default, all Rx COM ports,
// at least for mosaic Rxs, are set to a baudrate of 115200 baud,
// using 8 data-bits, no parity and 1 stop-bit.
} catch (boost::system::system_error& e)
{
node_->log(log_level::ERROR, "get_option failed due to " +
static_cast<std::string>(e.what()));
node_->log(log_level::INFO, "Additional info about error is " +
static_cast<std::string>(e.what()));
/*
boost::system::error_code e_loop;
do // Caution: Might cause infinite loop..
{
stream_->get_option(current_baudrate, e_loop);
} while(e_loop);
*/
return false;
}
// Gradually increase the baudrate to the desired value
// The desired baudrate can be lower or larger than the
// current baudrate; the for loop takes care of both scenarios.
node_->log(log_level::DEBUG,
"Current baudrate is " +
std::to_string(current_baudrate.value()));
for (uint8_t i = 0; i < baudrates.size(); i++)
{
if (current_baudrate.value() == baudrate_)
{
break; // Break if the desired baudrate has been reached.
}
if (current_baudrate.value() >= baudrates[i] &&
baudrate_ > baudrates[i])
{
continue;
}
// Increment until Baudrate[i] matches current_baudrate.
try
{
stream_->set_option(
boost::asio::serial_port_base::baud_rate(baudrates[i]));
} catch (boost::system::system_error& e)
{
node_->log(log_level::ERROR,
"set_option failed due to " +
static_cast<std::string>(e.what()));
node_->log(log_level::INFO,
"Additional info about error is " +
static_cast<std::string>(e.what()));
return false;
}
using namespace std::chrono_literals;
std::this_thread::sleep_for(500ms);
try
{
stream_->get_option(current_baudrate);
} catch (boost::system::system_error& e)
{
node_->log(log_level::ERROR,
"get_option failed due to " +
static_cast<std::string>(e.what()));
node_->log(log_level::INFO,
"Additional info about error is " +
static_cast<std::string>(e.what()));
/*
boost::system::error_code e_loop;
do // Caution: Might cause infinite loop..
{
stream_->get_option(current_baudrate, e_loop);
} while(e_loop);
*/
return false;
}
node_->log(log_level::DEBUG,
"Set ASIO baudrate to " +
std::to_string(current_baudrate.value()));
}
node_->log(log_level::INFO,
"Set ASIO baudrate to " +
std::to_string(current_baudrate.value()) +
", leaving InitializeSerial() method");
// clear io
::tcflush(stream_->native_handle(), TCIOFLUSH);
return true;
}
private:
ROSaicNodeBase* node_;
std::shared_ptr<boost::asio::io_service> ioService_;
std::string flowcontrol_;
uint32_t baudrate_;
public:
std::unique_ptr<boost::asio::serial_port> stream_;
};
class SbfFileIo
{
public:
SbfFileIo(ROSaicNodeBase* node,
std::shared_ptr<boost::asio::io_service> ioService) :
node_(node),
ioService_(ioService)
{
}
~SbfFileIo() { stream_->close(); }
void close() { stream_->close(); }
[[nodiscard]] bool connect()
{
node_->log(log_level::INFO, "Opening SBF file stream" +
node_->settings()->device + "...");
int fd = open(node_->settings()->device.c_str(), O_RDONLY);
if (fd == -1)
{
node_->log(log_level::ERROR, "open SBF file failed.");
return false;
}
try
{
stream_.reset(
new boost::asio::posix::stream_descriptor(*ioService_));
stream_->assign(fd);
} catch (std::runtime_error& e)
{
node_->log(log_level::ERROR, "assigning SBF file failed due to " +
static_cast<std::string>(e.what()));
return false;
}
return true;
}
private:
ROSaicNodeBase* node_;
std::shared_ptr<boost::asio::io_service> ioService_;
public:
std::unique_ptr<boost::asio::posix::stream_descriptor> stream_;
};
class PcapFileIo
{
public:
PcapFileIo(ROSaicNodeBase* node,
std::shared_ptr<boost::asio::io_service> ioService) :
node_(node),
ioService_(ioService)
{
}
~PcapFileIo()
{
pcap_close(pcap_);
stream_->close();
}
void close()
{
pcap_close(pcap_);
stream_->close();
}
[[nodiscard]] bool connect()
{
try
{
node_->log(log_level::INFO, "Opening pcap file stream" +
node_->settings()->device + "...");
stream_.reset(
new boost::asio::posix::stream_descriptor(*ioService_));
pcap_ = pcap_open_offline(node_->settings()->device.c_str(),
errBuff_.data());
stream_->assign(pcap_get_selectable_fd(pcap_));
} catch (std::runtime_error& e)
{
node_->log(log_level::ERROR, "assigning PCAP file failed due to " +
static_cast<std::string>(e.what()));
return false;
}
return true;
}
private:
ROSaicNodeBase* node_;
std::shared_ptr<boost::asio::io_service> ioService_;
std::array<char, 100> errBuff_;
pcap_t* pcap_;
public:
std::unique_ptr<boost::asio::posix::stream_descriptor> stream_;
};
} // namespace io