25 #if BOOST_VERSION >= 107000 26 #define GET_IO_SERVICE(s) ((boost::asio::io_context&)(s).get_executor().context()) 28 #define GET_IO_SERVICE(s) ((s).get_io_service()) 33 using boost::system::error_code;
34 using boost::asio::io_service;
35 using boost::asio::ip::tcp;
36 using boost::asio::buffer;
38 using mavlink::mavlink_message_t;
39 using mavlink::mavlink_status_t;
41 #define PFX "mavconn: tcp" 42 #define PFXd PFX "%zu: " 45 static bool resolve_address_tcp(io_service &io,
size_t chan, std::string host,
unsigned short port, tcp::endpoint &ep)
48 tcp::resolver resolver(io);
51 tcp::resolver::query query(host,
"");
53 auto fn = [&](
const tcp::endpoint & q_ep) {
60 #if BOOST_ASIO_VERSION >= 101200 61 for (
auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
63 std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(), fn);
78 std::string server_host,
unsigned short server_port) :
81 io_work(new io_service::work(io_service)),
84 tx_in_progress(false),
89 throw DeviceError(
"tcp: resolve",
"Bind address resolve failed");
97 catch (boost::system::system_error &err) {
114 boost::asio::io_service &server_io) :
145 socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
172 throw std::length_error(
"MAVConnTCPClient::send_bytes: TX queue overflow");
174 tx_q.emplace_back(bytes, length);
181 assert(message !=
nullptr);
194 throw std::length_error(
"MAVConnTCPClient::send_message: TX queue overflow");
196 tx_q.emplace_back(message);
214 throw std::length_error(
"MAVConnTCPClient::send_message: TX queue overflow");
226 auto sthis = shared_from_this();
229 [sthis] (error_code error,
size_t bytes_transferred) {
236 sthis->parse_buffer(
PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
251 auto sthis = shared_from_this();
252 auto &buf_ref =
tx_q.front();
254 buffer(buf_ref.dpos(), buf_ref.nbytes()),
255 [sthis, &buf_ref] (error_code error,
size_t bytes_transferred) {
256 assert(bytes_transferred <= buf_ref.len);
264 sthis->iostat_tx_add(bytes_transferred);
267 if (sthis->tx_q.empty()) {
268 sthis->tx_in_progress =
false;
272 buf_ref.pos += bytes_transferred;
273 if (buf_ref.nbytes() == 0) {
274 sthis->tx_q.pop_front();
277 if (!sthis->tx_q.empty())
278 sthis->do_send(
false);
280 sthis->tx_in_progress =
false;
288 std::string server_host,
unsigned short server_port) :
295 throw DeviceError(
"tcp-l: resolve",
"Bind address resolve failed");
301 acceptor.set_option(tcp::acceptor::reuse_address(
true));
305 catch (boost::system::system_error &err) {
332 "All connections will be closed.",
conn_id);
350 auto inst_status = instp->get_status();
356 status.packet_rx_success_count += inst_status.packet_rx_success_count;
357 status.packet_rx_drop_count += inst_status.packet_rx_drop_count;
358 status.buffer_overrun += inst_status.buffer_overrun;
359 status.parse_error += inst_status.parse_error;
374 auto inst_iostat = instp->get_iostat();
381 iostat.tx_total_bytes += inst_iostat.tx_total_bytes;
382 iostat.tx_speed += inst_iostat.tx_speed;
383 iostat.rx_total_bytes += inst_iostat.rx_total_bytes;
384 iostat.rx_speed += inst_iostat.rx_speed;
395 instp->send_bytes(bytes, length);
403 instp->send_message(message);
411 instp->send_message(message, source_compid);
420 auto sthis = shared_from_this();
423 acceptor_client->socket,
424 acceptor_client->server_ep,
425 [sthis, acceptor_client] (error_code error) {
427 CONSOLE_BRIDGE_logError(PFXd
"accept: %s", sthis->conn_id, error.message().c_str());
434 std::weak_ptr<MAVConnTCPClient> weak_client{acceptor_client};
435 acceptor_client->client_connected(sthis->conn_id);
437 acceptor_client->port_closed_cb = [weak_client, sthis] () { sthis->client_closed(weak_client); };
439 sthis->client_list.push_back(acceptor_client);
446 if (
auto instp = weak_instp.lock()) {
447 bool locked =
mutex.try_lock();
void log_send(const char *pfx, const mavlink::mavlink_message_t *msg)
virtual ~MAVConnTCPClient()
ClosedCb port_closed_cb
Port closed notification callback.
MAVConnTCPClient(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string server_host=DEFAULT_SERVER_HOST, unsigned short server_port=DEFAULT_SERVER_PORT)
MAVConnTCPServer(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string bind_host=DEFAULT_BIND_HOST, unsigned short bind_port=DEFAULT_BIND_PORT)
std::lock_guard< std::recursive_mutex > lock_guard
boost::asio::ip::tcp::acceptor acceptor
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
#define CONSOLE_BRIDGE_logInform(fmt,...)
#define CONSOLE_BRIDGE_logWarn(fmt,...)
boost::asio::io_service io_service
void close() override
Close connection.
void close() override
Close connection.
static constexpr size_t MAX_TXQ_SIZE
Maximum count of transmission buffers.
std::atomic< bool > tx_in_progress
Common exception for communication errors.
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
const std::string to_string_ss(T &obj)
Convert an object that supports operator << to a string.
bool set_this_thread_name(const std::string &name, Args &&...args)
Set the name of the current thread (printf-like formatting).
#define CONSOLE_BRIDGE_logDebug(fmt,...)
void log_send_obj(const char *pfx, const mavlink::Message &msg)
ReceivedCb message_received_cb
Message receive callback.
std::recursive_mutex mutex
boost::asio::ip::tcp::endpoint bind_ep
std::recursive_mutex mutex
virtual ~MAVConnTCPServer()
Generic mavlink interface.
mavlink::mavlink_status_t get_status() override
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
uint8_t comp_id
Connection Component Id.
void client_connected(size_t server_channel)
void recv_message(const mavlink::mavlink_message_t *message, const Framing framing)
mavlink::mavlink_status_t * get_status_p()
size_t conn_id
Channel number used for logging.
#define CONSOLE_BRIDGE_logError(fmt,...)
static bool resolve_address_tcp(io_service &io, size_t chan, std::string host, unsigned short port, tcp::endpoint &ep)
std::array< uint8_t, MsgBuffer::MAX_SIZE > rx_buf
MAVConn console-bridge compatibility header.
struct __mavlink_status mavlink_status_t
std::atomic< bool > is_destroying
struct __mavlink_message mavlink_message_t
void client_closed(std::weak_ptr< MAVConnTCPClient > weak_instp)
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
#define GET_IO_SERVICE(s)
std::atomic< bool > is_destroying
IOStat get_iostat() override
boost::asio::ip::tcp::socket socket
std::list< std::shared_ptr< MAVConnTCPClient > > client_list
std::unique_ptr< boost::asio::io_service::work > io_work
boost::asio::ip::tcp::endpoint server_ep
uint8_t sys_id
Connection System Id.
std::deque< MsgBuffer > tx_q
boost::asio::io_service io_service
Framing
Rx packet framing status. (same as mavlink::mavlink_framing_t)
void do_send(bool check_tx_state)