tcp.cpp
Go to the documentation of this file.
1 
9 /*
10  * libmavconn
11  * Copyright 2014,2015,2016 Vladimir Ermakov, All rights reserved.
12  *
13  * This file is part of the mavros package and subject to the license terms
14  * in the top-level LICENSE file of the mavros repository.
15  * https://github.com/mavlink/mavros/tree/master/LICENSE.md
16  */
17 
18 #include <cassert>
19 
21 #include <mavconn/thread_utils.h>
22 #include <mavconn/tcp.h>
23 
24 // Ensure the correct io_service() is called based on boost version
25 #if BOOST_VERSION >= 107000
26 #define GET_IO_SERVICE(s) ((boost::asio::io_context&)(s).get_executor().context())
27 #else
28 #define GET_IO_SERVICE(s) ((s).get_io_service())
29 #endif
30 
31 namespace mavconn {
32 
33 using boost::system::error_code;
34 using boost::asio::io_service;
35 using boost::asio::ip::tcp;
36 using boost::asio::buffer;
38 using mavlink::mavlink_message_t;
39 using mavlink::mavlink_status_t;
40 
41 #define PFX "mavconn: tcp"
42 #define PFXd PFX "%zu: "
43 
44 
45 static bool resolve_address_tcp(io_service &io, size_t chan, std::string host, unsigned short port, tcp::endpoint &ep)
46 {
47  bool result = false;
48  tcp::resolver resolver(io);
49  error_code ec;
50 
51  tcp::resolver::query query(host, "");
52 
53  auto fn = [&](const tcp::endpoint & q_ep) {
54  ep = q_ep;
55  ep.port(port);
56  result = true;
57  CONSOLE_BRIDGE_logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
58  };
59 
60 #if BOOST_ASIO_VERSION >= 101200
61  for (auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
62 #else
63  std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(), fn);
64 #endif
65 
66  if (ec) {
67  CONSOLE_BRIDGE_logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
68  result = false;
69  }
70 
71  return result;
72 }
73 
74 
75 /* -*- TCP client variant -*- */
76 
77 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
78  std::string server_host, unsigned short server_port) :
79  MAVConnInterface(system_id, component_id),
80  io_service(),
81  io_work(new io_service::work(io_service)),
82  socket(io_service),
83  is_destroying(false),
84  tx_in_progress(false),
85  tx_q {},
86  rx_buf {}
87 {
88  if (!resolve_address_tcp(io_service, conn_id, server_host, server_port, server_ep))
89  throw DeviceError("tcp: resolve", "Bind address resolve failed");
90 
91  CONSOLE_BRIDGE_logInform(PFXd "Server address: %s", conn_id, to_string_ss(server_ep).c_str());
92 
93  try {
94  socket.open(tcp::v4());
95  socket.connect(server_ep);
96  }
97  catch (boost::system::system_error &err) {
98  throw DeviceError("tcp", err);
99  }
100 }
101 
102 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
103  boost::asio::io_service &server_io) :
104  MAVConnInterface(system_id, component_id),
105  socket(server_io),
106  is_destroying(false),
107  tx_in_progress(false),
108  tx_q {},
109  rx_buf {}
110 {
111  // waiting when server call client_connected()
112 }
113 
114 void MAVConnTCPClient::client_connected(size_t server_channel)
115 {
116  CONSOLE_BRIDGE_logInform(PFXd "Got client, id: %zu, address: %s",
117  server_channel, conn_id, to_string_ss(server_ep).c_str());
118 
119  // start recv
120  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_recv, shared_from_this()));
121 }
122 
124 {
125  is_destroying = true;
126  close();
127 }
128 
130  const ReceivedCb &cb_handle_message,
131  const ClosedCb &cb_handle_closed_port)
132 {
133  message_received_cb = cb_handle_message;
134  port_closed_cb = cb_handle_closed_port;
135 
136  // give some work to io_service before start
137  io_service.post(std::bind(&MAVConnTCPClient::do_recv, this));
138 
139  // run io_service for async io
140  io_thread = std::thread([this] () {
142  io_service.run();
143  });
144 }
145 
147 {
148  lock_guard lock(mutex);
149  if (!is_open())
150  return;
151 
152  boost::system::error_code ec;
153  socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
154  if (ec)
155  CONSOLE_BRIDGE_logError(PFXd "shutdown: %s", conn_id, ec.message().c_str());
156  socket.cancel();
157  socket.close();
158 
159  io_work.reset();
160  io_service.stop();
161 
162  if (io_thread.joinable())
163  io_thread.join();
164 
165  io_service.reset();
166 
167  if (port_closed_cb)
168  port_closed_cb();
169 }
170 
171 void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)
172 {
173  if (!is_open()) {
174  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
175  return;
176  }
177 
178  {
179  lock_guard lock(mutex);
180 
181  if (tx_q.size() >= MAX_TXQ_SIZE)
182  throw std::length_error("MAVConnTCPClient::send_bytes: TX queue overflow");
183 
184  tx_q.emplace_back(bytes, length);
185  }
186  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
187 }
188 
190 {
191  assert(message != nullptr);
192 
193  if (!is_open()) {
194  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
195  return;
196  }
197 
198  log_send(PFX, message);
199 
200  {
201  lock_guard lock(mutex);
202 
203  if (tx_q.size() >= MAX_TXQ_SIZE)
204  throw std::length_error("MAVConnTCPClient::send_message: TX queue overflow");
205 
206  tx_q.emplace_back(message);
207  }
208  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
209 }
210 
211 void MAVConnTCPClient::send_message(const mavlink::Message &message, const uint8_t source_compid)
212 {
213  if (!is_open()) {
214  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
215  return;
216  }
217 
218  log_send_obj(PFX, message);
219 
220  {
221  lock_guard lock(mutex);
222 
223  if (tx_q.size() >= MAX_TXQ_SIZE)
224  throw std::length_error("MAVConnTCPClient::send_message: TX queue overflow");
225 
226  tx_q.emplace_back(message, get_status_p(), sys_id, source_compid);
227  }
228  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
229 }
230 
232 {
233  if (is_destroying) {
234  return;
235  }
236  auto sthis = shared_from_this();
237  socket.async_receive(
238  buffer(rx_buf),
239  [sthis] (error_code error, size_t bytes_transferred) {
240  if (error) {
241  CONSOLE_BRIDGE_logError(PFXd "receive: %s", sthis->conn_id, error.message().c_str());
242  sthis->close();
243  return;
244  }
245 
246  sthis->parse_buffer(PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
247  sthis->do_recv();
248  });
249 }
250 
251 void MAVConnTCPClient::do_send(bool check_tx_state)
252 {
253  if (check_tx_state && tx_in_progress)
254  return;
255 
256  lock_guard lock(mutex);
257  if (tx_q.empty())
258  return;
259 
260  tx_in_progress = true;
261  auto sthis = shared_from_this();
262  auto &buf_ref = tx_q.front();
263  socket.async_send(
264  buffer(buf_ref.dpos(), buf_ref.nbytes()),
265  [sthis, &buf_ref] (error_code error, size_t bytes_transferred) {
266  assert(bytes_transferred <= buf_ref.len);
267 
268  if (error) {
269  CONSOLE_BRIDGE_logError(PFXd "send: %s", sthis->conn_id, error.message().c_str());
270  sthis->close();
271  return;
272  }
273 
274  sthis->iostat_tx_add(bytes_transferred);
275  lock_guard lock(sthis->mutex);
276 
277  if (sthis->tx_q.empty()) {
278  sthis->tx_in_progress = false;
279  return;
280  }
281 
282  buf_ref.pos += bytes_transferred;
283  if (buf_ref.nbytes() == 0) {
284  sthis->tx_q.pop_front();
285  }
286 
287  if (!sthis->tx_q.empty())
288  sthis->do_send(false);
289  else
290  sthis->tx_in_progress = false;
291  });
292 }
293 
294 
295 /* -*- TCP server variant -*- */
296 
297 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
298  std::string server_host, unsigned short server_port) :
299  MAVConnInterface(system_id, component_id),
300  io_service(),
301  acceptor(io_service),
302  is_destroying(false)
303 {
304  if (!resolve_address_tcp(io_service, conn_id, server_host, server_port, bind_ep))
305  throw DeviceError("tcp-l: resolve", "Bind address resolve failed");
306 
307  CONSOLE_BRIDGE_logInform(PFXd "Bind address: %s", conn_id, to_string_ss(bind_ep).c_str());
308 
309  try {
310  acceptor.open(tcp::v4());
311  acceptor.set_option(tcp::acceptor::reuse_address(true));
312  acceptor.bind(bind_ep);
313  acceptor.listen();
314  }
315  catch (boost::system::system_error &err) {
316  throw DeviceError("tcp-l", err);
317  }
318 }
319 
321 {
322  is_destroying = true;
323  close();
324 }
325 
327  const ReceivedCb &cb_handle_message,
328  const ClosedCb &cb_handle_closed_port)
329 {
330  message_received_cb = cb_handle_message;
331  port_closed_cb = cb_handle_closed_port;
332 
333  // give some work to io_service before start
334  io_service.post(std::bind(&MAVConnTCPServer::do_accept, this));
335 
336  // run io_service for async io
337  io_thread = std::thread([this] () {
339  io_service.run();
340  });
341 }
342 
344 {
345  lock_guard lock(mutex);
346  if (!is_open())
347  return;
348 
349  CONSOLE_BRIDGE_logInform(PFXd "Terminating server. "
350  "All connections will be closed.", conn_id);
351 
352  io_service.stop();
353  acceptor.close();
354 
355  if (io_thread.joinable())
356  io_thread.join();
357 
358  if (port_closed_cb)
359  port_closed_cb();
360 }
361 
363 {
364  mavlink_status_t status {};
365 
366  lock_guard lock(mutex);
367  for (auto &instp : client_list) {
368  auto inst_status = instp->get_status();
369 
370  // [[[cog:
371  // for f in ('packet_rx_success_count', 'packet_rx_drop_count', 'buffer_overrun', 'parse_error'):
372  // cog.outl("status.{f:23s} += inst_status.{f};".format(**locals()))
373  // ]]]
374  status.packet_rx_success_count += inst_status.packet_rx_success_count;
375  status.packet_rx_drop_count += inst_status.packet_rx_drop_count;
376  status.buffer_overrun += inst_status.buffer_overrun;
377  status.parse_error += inst_status.parse_error;
378  // [[[end]]] (checksum: a6186246ed026f1cf2b4ffc7407e893b)
379 
380  /* seq counters always 0 for this connection type */
381  }
382 
383  return status;
384 }
385 
387 {
388  MAVConnInterface::IOStat iostat {};
389 
390  lock_guard lock(mutex);
391  for (auto &instp : client_list) {
392  auto inst_iostat = instp->get_iostat();
393 
394  // [[[cog:
395  // for p in ('tx', 'rx'):
396  // for f in ('total_bytes', 'speed'):
397  // cog.outl("iostat.{p}_{f:11s} += inst_iostat.{p}_{f};".format(**locals()))
398  // ]]]
399  iostat.tx_total_bytes += inst_iostat.tx_total_bytes;
400  iostat.tx_speed += inst_iostat.tx_speed;
401  iostat.rx_total_bytes += inst_iostat.rx_total_bytes;
402  iostat.rx_speed += inst_iostat.rx_speed;
403  // [[[end]]] (checksum: fb4fe06794471d9b068ce0c129ee7673)
404  }
405 
406  return iostat;
407 }
408 
409 void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
410 {
411  lock_guard lock(mutex);
412  for (auto &instp : client_list) {
413  instp->send_bytes(bytes, length);
414  }
415 }
416 
418 {
419  lock_guard lock(mutex);
420  for (auto &instp : client_list) {
421  instp->send_message(message);
422  }
423 }
424 
425 void MAVConnTCPServer::send_message(const mavlink::Message &message, const uint8_t source_compid)
426 {
427  lock_guard lock(mutex);
428  for (auto &instp : client_list) {
429  instp->send_message(message, source_compid);
430  }
431 }
432 
434 {
435  if (is_destroying) {
436  return;
437  }
438  auto sthis = shared_from_this();
439  auto acceptor_client = std::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
440  acceptor.async_accept(
441  acceptor_client->socket,
442  acceptor_client->server_ep,
443  [sthis, acceptor_client] (error_code error) {
444  if (error) {
445  CONSOLE_BRIDGE_logError(PFXd "accept: %s", sthis->conn_id, error.message().c_str());
446  sthis->close();
447  return;
448  }
449 
450  {
451  lock_guard lock(sthis->mutex);
452 
453  std::weak_ptr<MAVConnTCPClient> weak_client{acceptor_client};
454  acceptor_client->message_received_cb = sthis->message_received_cb;
455  acceptor_client->port_closed_cb = [weak_client, sthis] () { sthis->client_closed(weak_client); };
456  acceptor_client->client_connected(sthis->conn_id);
457 
458  sthis->client_list.push_back(acceptor_client);
459  sthis->do_accept();
460  }
461  });
462 }
463 
464 void MAVConnTCPServer::client_closed(std::weak_ptr<MAVConnTCPClient> weak_instp)
465 {
466  if (auto instp = weak_instp.lock()) {
467  CONSOLE_BRIDGE_logInform(PFXd "Client connection closed, id: %p, address: %s",
468  conn_id, instp.get(), to_string_ss(instp->server_ep).c_str());
469 
470  {
471  lock_guard lock(mutex);
472  client_list.remove(instp);
473  }
474  }
475 }
476 } // namespace mavconn
void log_send(const char *pfx, const mavlink::mavlink_message_t *msg)
Definition: interface.cpp:132
virtual ~MAVConnTCPClient()
Definition: tcp.cpp:123
ClosedCb port_closed_cb
Port closed notification callback.
Definition: interface.h:202
MAVConnTCPClient(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string server_host=DEFAULT_SERVER_HOST, unsigned short server_port=DEFAULT_SERVER_PORT)
Definition: tcp.cpp:77
MAVConnTCPServer(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string bind_host=DEFAULT_BIND_HOST, unsigned short bind_port=DEFAULT_BIND_PORT)
Definition: tcp.cpp:297
std::lock_guard< std::recursive_mutex > lock_guard
Definition: interface.h:42
boost::asio::ip::tcp::acceptor acceptor
Definition: tcp.h:131
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: tcp.cpp:409
std::function< void(const mavlink::mavlink_message_t *message, const Framing framing)> ReceivedCb
Definition: interface.h:102
#define CONSOLE_BRIDGE_logInform(fmt,...)
void connect(const ReceivedCb &cb_handle_message, const ClosedCb &cb_handle_closed_port=ClosedCb()) override
Establish connection, automatically called by open_url()
Definition: tcp.cpp:129
#define CONSOLE_BRIDGE_logWarn(fmt,...)
boost::asio::io_service io_service
Definition: tcp.h:127
void close() override
Close connection.
Definition: tcp.cpp:343
void close() override
Close connection.
Definition: tcp.cpp:146
static constexpr size_t MAX_TXQ_SIZE
Maximum count of transmission buffers.
Definition: interface.h:268
std::atomic< bool > tx_in_progress
Definition: tcp.h:78
Common exception for communication error.
Definition: interface.h:64
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
const std::string to_string_ss(T &obj)
Convert to string objects with operator <<.
Definition: thread_utils.h:71
#define CONSOLE_BRIDGE_logDebug(fmt,...)
void log_send_obj(const char *pfx, const mavlink::Message &msg)
Definition: interface.cpp:142
ReceivedCb message_received_cb
Message receive callback.
Definition: interface.h:200
std::recursive_mutex mutex
Definition: tcp.h:137
boost::asio::ip::tcp::endpoint bind_ep
Definition: tcp.h:132
std::recursive_mutex mutex
Definition: tcp.h:81
virtual ~MAVConnTCPServer()
Definition: tcp.cpp:320
Generic mavlink interface.
Definition: interface.h:97
mavlink::mavlink_status_t get_status() override
Definition: tcp.cpp:362
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
#define PFXd
Definition: tcp.cpp:42
uint8_t comp_id
Connection Component Id.
Definition: interface.h:263
void client_connected(size_t server_channel)
Definition: tcp.cpp:114
mavlink::mavlink_status_t * get_status_p()
Definition: interface.h:276
std::function< void(void)> ClosedCb
Definition: interface.h:103
size_t conn_id
Channel number used for logging.
Definition: interface.h:274
#define CONSOLE_BRIDGE_logError(fmt,...)
static bool resolve_address_tcp(io_service &io, size_t chan, std::string host, unsigned short port, tcp::endpoint &ep)
Definition: tcp.cpp:45
some useful utils
bool is_open() override
Definition: tcp.h:63
std::array< uint8_t, MsgBuffer::MAX_SIZE > rx_buf
Definition: tcp.h:80
MAVConn console-bridge compatibility header.
std::atomic< bool > is_destroying
Definition: tcp.h:134
void connect(const ReceivedCb &cb_handle_message, const ClosedCb &cb_handle_closed_port=ClosedCb()) override
Establish connection, automatically called by open_url()
Definition: tcp.cpp:326
bool set_this_thread_name(const std::string &name, Args &&... args)
Set name to current thread, printf-like.
Definition: thread_utils.h:55
void client_closed(std::weak_ptr< MAVConnTCPClient > weak_instp)
Definition: tcp.cpp:464
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: tcp.cpp:171
#define GET_IO_SERVICE(s)
Definition: tcp.cpp:28
std::atomic< bool > is_destroying
Definition: tcp.h:76
#define PFX
Definition: tcp.cpp:41
IOStat get_iostat() override
Definition: tcp.cpp:386
bool is_open() override
Definition: tcp.h:122
boost::asio::ip::tcp::socket socket
Definition: tcp.h:73
std::list< std::shared_ptr< MAVConnTCPClient > > client_list
Definition: tcp.h:136
std::unique_ptr< boost::asio::io_service::work > io_work
Definition: tcp.h:70
boost::asio::ip::tcp::endpoint server_ep
Definition: tcp.h:74
std::thread io_thread
Definition: tcp.h:71
uint8_t sys_id
Connection System Id.
Definition: interface.h:262
std::deque< MsgBuffer > tx_q
Definition: tcp.h:79
boost::asio::io_service io_service
Definition: tcp.h:69
std::thread io_thread
Definition: tcp.h:129
void do_send(bool check_tx_state)
Definition: tcp.cpp:251


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Tue Jun 13 2023 02:17:45