25 #if BOOST_VERSION >= 107000
26 #define GET_IO_SERVICE(s) ((boost::asio::io_context&)(s).get_executor().context())
28 #define GET_IO_SERVICE(s) ((s).get_io_service())
33 using boost::system::error_code;
34 using boost::asio::io_service;
35 using boost::asio::ip::tcp;
36 using boost::asio::buffer;
38 using mavlink::mavlink_message_t;
39 using mavlink::mavlink_status_t;
41 #define PFX "mavconn: tcp"
42 #define PFXd PFX "%zu: "
45 static bool resolve_address_tcp(io_service &io,
size_t chan, std::string host,
unsigned short port, tcp::endpoint &ep)
48 tcp::resolver resolver(io);
51 tcp::resolver::query query(host,
"");
53 auto fn = [&](
const tcp::endpoint & q_ep) {
60 #if BOOST_ASIO_VERSION >= 101200
61 for (
auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
63 std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(), fn);
78 std::string server_host,
unsigned short server_port) :
81 io_work(new io_service::work(io_service)),
84 tx_in_progress(false),
89 throw DeviceError(
"tcp: resolve",
"Bind address resolve failed");
94 socket.open(tcp::v4());
95 socket.connect(server_ep);
97 catch (boost::system::system_error &err) {
98 throw DeviceError(
"tcp", err);
103 boost::asio::io_service &server_io) :
106 is_destroying(false),
107 tx_in_progress(false),
130 const ReceivedCb &cb_handle_message,
131 const ClosedCb &cb_handle_closed_port)
152 boost::system::error_code ec;
153 socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
182 throw std::length_error(
"MAVConnTCPClient::send_bytes: TX queue overflow");
184 tx_q.emplace_back(bytes, length);
191 assert(message !=
nullptr);
204 throw std::length_error(
"MAVConnTCPClient::send_message: TX queue overflow");
206 tx_q.emplace_back(message);
224 throw std::length_error(
"MAVConnTCPClient::send_message: TX queue overflow");
236 auto sthis = shared_from_this();
239 [sthis] (error_code error,
size_t bytes_transferred) {
246 sthis->parse_buffer(
PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
261 auto sthis = shared_from_this();
262 auto &buf_ref =
tx_q.front();
264 buffer(buf_ref.dpos(), buf_ref.nbytes()),
265 [sthis, &buf_ref] (error_code error,
size_t bytes_transferred) {
266 assert(bytes_transferred <= buf_ref.len);
269 CONSOLE_BRIDGE_logError(PFXd
"send: %s", sthis->conn_id, error.message().c_str());
274 sthis->iostat_tx_add(bytes_transferred);
277 if (sthis->tx_q.empty()) {
278 sthis->tx_in_progress = false;
282 buf_ref.pos += bytes_transferred;
283 if (buf_ref.nbytes() == 0) {
284 sthis->tx_q.pop_front();
287 if (!sthis->tx_q.empty())
288 sthis->do_send(
false);
290 sthis->tx_in_progress =
false;
297 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
298 std::string server_host,
unsigned short server_port) :
301 acceptor(io_service),
305 throw DeviceError(
"tcp-l: resolve",
"Bind address resolve failed");
311 acceptor.set_option(tcp::acceptor::reuse_address(
true));
315 catch (boost::system::system_error &err) {
327 const ReceivedCb &cb_handle_message,
328 const ClosedCb &cb_handle_closed_port)
350 "All connections will be closed.",
conn_id);
364 mavlink_status_t status {};
368 auto inst_status = instp->get_status();
374 status.packet_rx_success_count += inst_status.packet_rx_success_count;
375 status.packet_rx_drop_count += inst_status.packet_rx_drop_count;
376 status.buffer_overrun += inst_status.buffer_overrun;
377 status.parse_error += inst_status.parse_error;
392 auto inst_iostat = instp->get_iostat();
399 iostat.tx_total_bytes += inst_iostat.tx_total_bytes;
400 iostat.tx_speed += inst_iostat.tx_speed;
401 iostat.rx_total_bytes += inst_iostat.rx_total_bytes;
402 iostat.rx_speed += inst_iostat.rx_speed;
413 instp->send_bytes(bytes, length);
421 instp->send_message(message);
429 instp->send_message(message, source_compid);
438 auto sthis = shared_from_this();
441 acceptor_client->socket,
442 acceptor_client->server_ep,
443 [sthis, acceptor_client] (error_code error) {
445 CONSOLE_BRIDGE_logError(PFXd
"accept: %s", sthis->conn_id, error.message().c_str());
451 lock_guard lock(sthis->mutex);
453 std::weak_ptr<MAVConnTCPClient> weak_client{acceptor_client};
454 acceptor_client->message_received_cb = sthis->message_received_cb;
455 acceptor_client->port_closed_cb = [weak_client, sthis] () { sthis->client_closed(weak_client); };
456 acceptor_client->client_connected(sthis->conn_id);
458 sthis->client_list.push_back(acceptor_client);
466 if (
auto instp = weak_instp.lock()) {