tcp.cpp
Go to the documentation of this file.
1 
9 /*
10  * libmavconn
11  * Copyright 2014,2015,2016 Vladimir Ermakov, All rights reserved.
12  *
13  * This file is part of the mavros package and subject to the license terms
14  * in the top-level LICENSE file of the mavros repository.
15  * https://github.com/mavlink/mavros/tree/master/LICENSE.md
16  */
17 
18 #include <cassert>
19 
21 #include <mavconn/thread_utils.h>
22 #include <mavconn/tcp.h>
23 
24 // Ensure the correct io_service() is called based on boost version
25 #if BOOST_VERSION >= 107000
26 #define GET_IO_SERVICE(s) ((boost::asio::io_context&)(s).get_executor().context())
27 #else
28 #define GET_IO_SERVICE(s) ((s).get_io_service())
29 #endif
30 
31 namespace mavconn {
32 
33 using boost::system::error_code;
34 using boost::asio::io_service;
35 using boost::asio::ip::tcp;
36 using boost::asio::buffer;
38 using mavlink::mavlink_message_t;
39 using mavlink::mavlink_status_t;
40 
41 #define PFX "mavconn: tcp"
42 #define PFXd PFX "%zu: "
43 
44 
45 static bool resolve_address_tcp(io_service &io, size_t chan, std::string host, unsigned short port, tcp::endpoint &ep)
46 {
47  bool result = false;
48  tcp::resolver resolver(io);
49  error_code ec;
50 
51  tcp::resolver::query query(host, "");
52 
53  auto fn = [&](const tcp::endpoint & q_ep) {
54  ep = q_ep;
55  ep.port(port);
56  result = true;
57  CONSOLE_BRIDGE_logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
58  };
59 
60 #if BOOST_ASIO_VERSION >= 101200
61  for (auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
62 #else
63  std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(), fn);
64 #endif
65 
66  if (ec) {
67  CONSOLE_BRIDGE_logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
68  result = false;
69  }
70 
71  return result;
72 }
73 
74 
75 /* -*- TCP client variant -*- */
76 
77 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
78  std::string server_host, unsigned short server_port) :
79  MAVConnInterface(system_id, component_id),
80  io_service(),
81  io_work(new io_service::work(io_service)),
82  socket(io_service),
83  is_destroying(false),
84  tx_in_progress(false),
85  tx_q {},
86  rx_buf {}
87 {
88  if (!resolve_address_tcp(io_service, conn_id, server_host, server_port, server_ep))
89  throw DeviceError("tcp: resolve", "Bind address resolve failed");
90 
91  CONSOLE_BRIDGE_logInform(PFXd "Server address: %s", conn_id, to_string_ss(server_ep).c_str());
92 
93  try {
94  socket.open(tcp::v4());
95  socket.connect(server_ep);
96  }
97  catch (boost::system::system_error &err) {
98  throw DeviceError("tcp", err);
99  }
100 
101  // NOTE: shared_from_this() should not be used in constructors
102 
103  // give some work to io_service before start
104  io_service.post(std::bind(&MAVConnTCPClient::do_recv, this));
105 
106  // run io_service for async io
107  io_thread = std::thread([this] () {
109  io_service.run();
110  });
111 }
112 
113 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
114  boost::asio::io_service &server_io) :
115  MAVConnInterface(system_id, component_id),
116  socket(server_io),
117  tx_in_progress(false),
118  tx_q {},
119  rx_buf {}
120 {
121  // waiting when server call client_connected()
122 }
123 
124 void MAVConnTCPClient::client_connected(size_t server_channel)
125 {
126  CONSOLE_BRIDGE_logInform(PFXd "Got client, id: %zu, address: %s",
127  server_channel, conn_id, to_string_ss(server_ep).c_str());
128 
129  // start recv
130  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_recv, shared_from_this()));
131 }
132 
134 {
135  is_destroying = true;
136  close();
137 }
138 
140 {
141  lock_guard lock(mutex);
142  if (!is_open())
143  return;
144 
145  socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
146  socket.cancel();
147  socket.close();
148 
149  io_work.reset();
150  io_service.stop();
151 
152  if (io_thread.joinable())
153  io_thread.join();
154 
155  io_service.reset();
156 
157  if (port_closed_cb)
158  port_closed_cb();
159 }
160 
161 void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)
162 {
163  if (!is_open()) {
164  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
165  return;
166  }
167 
168  {
169  lock_guard lock(mutex);
170 
171  if (tx_q.size() >= MAX_TXQ_SIZE)
172  throw std::length_error("MAVConnTCPClient::send_bytes: TX queue overflow");
173 
174  tx_q.emplace_back(bytes, length);
175  }
176  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
177 }
178 
180 {
181  assert(message != nullptr);
182 
183  if (!is_open()) {
184  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
185  return;
186  }
187 
188  log_send(PFX, message);
189 
190  {
191  lock_guard lock(mutex);
192 
193  if (tx_q.size() >= MAX_TXQ_SIZE)
194  throw std::length_error("MAVConnTCPClient::send_message: TX queue overflow");
195 
196  tx_q.emplace_back(message);
197  }
198  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
199 }
200 
201 void MAVConnTCPClient::send_message(const mavlink::Message &message, const uint8_t source_compid)
202 {
203  if (!is_open()) {
204  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
205  return;
206  }
207 
208  log_send_obj(PFX, message);
209 
210  {
211  lock_guard lock(mutex);
212 
213  if (tx_q.size() >= MAX_TXQ_SIZE)
214  throw std::length_error("MAVConnTCPClient::send_message: TX queue overflow");
215 
216  tx_q.emplace_back(message, get_status_p(), sys_id, source_compid);
217  }
218  GET_IO_SERVICE(socket).post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
219 }
220 
222 {
223  if (is_destroying) {
224  return;
225  }
226  auto sthis = shared_from_this();
227  socket.async_receive(
228  buffer(rx_buf),
229  [sthis] (error_code error, size_t bytes_transferred) {
230  if (error) {
231  CONSOLE_BRIDGE_logError(PFXd "receive: %s", sthis->conn_id, error.message().c_str());
232  sthis->close();
233  return;
234  }
235 
236  sthis->parse_buffer(PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
237  sthis->do_recv();
238  });
239 }
240 
241 void MAVConnTCPClient::do_send(bool check_tx_state)
242 {
243  if (check_tx_state && tx_in_progress)
244  return;
245 
246  lock_guard lock(mutex);
247  if (tx_q.empty())
248  return;
249 
250  tx_in_progress = true;
251  auto sthis = shared_from_this();
252  auto &buf_ref = tx_q.front();
253  socket.async_send(
254  buffer(buf_ref.dpos(), buf_ref.nbytes()),
255  [sthis, &buf_ref] (error_code error, size_t bytes_transferred) {
256  assert(bytes_transferred <= buf_ref.len);
257 
258  if (error) {
259  CONSOLE_BRIDGE_logError(PFXd "send: %s", sthis->conn_id, error.message().c_str());
260  sthis->close();
261  return;
262  }
263 
264  sthis->iostat_tx_add(bytes_transferred);
265  lock_guard lock(sthis->mutex);
266 
267  if (sthis->tx_q.empty()) {
268  sthis->tx_in_progress = false;
269  return;
270  }
271 
272  buf_ref.pos += bytes_transferred;
273  if (buf_ref.nbytes() == 0) {
274  sthis->tx_q.pop_front();
275  }
276 
277  if (!sthis->tx_q.empty())
278  sthis->do_send(false);
279  else
280  sthis->tx_in_progress = false;
281  });
282 }
283 
284 
285 /* -*- TCP server variant -*- */
286 
287 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
288  std::string server_host, unsigned short server_port) :
289  MAVConnInterface(system_id, component_id),
290  io_service(),
291  acceptor(io_service),
292  is_destroying(false)
293 {
294  if (!resolve_address_tcp(io_service, conn_id, server_host, server_port, bind_ep))
295  throw DeviceError("tcp-l: resolve", "Bind address resolve failed");
296 
297  CONSOLE_BRIDGE_logInform(PFXd "Bind address: %s", conn_id, to_string_ss(bind_ep).c_str());
298 
299  try {
300  acceptor.open(tcp::v4());
301  acceptor.set_option(tcp::acceptor::reuse_address(true));
302  acceptor.bind(bind_ep);
303  acceptor.listen();
304  }
305  catch (boost::system::system_error &err) {
306  throw DeviceError("tcp-l", err);
307  }
308 
309  // give some work to io_service before start
310  io_service.post(std::bind(&MAVConnTCPServer::do_accept, this));
311 
312  // run io_service for async io
313  io_thread = std::thread([this] () {
315  io_service.run();
316  });
317 }
318 
320 {
321  is_destroying = true;
322  close();
323 }
324 
326 {
327  lock_guard lock(mutex);
328  if (!is_open())
329  return;
330 
331  CONSOLE_BRIDGE_logInform(PFXd "Terminating server. "
332  "All connections will be closed.", conn_id);
333 
334  io_service.stop();
335  acceptor.close();
336 
337  if (io_thread.joinable())
338  io_thread.join();
339 
340  if (port_closed_cb)
341  port_closed_cb();
342 }
343 
345 {
346  mavlink_status_t status {};
347 
348  lock_guard lock(mutex);
349  for (auto &instp : client_list) {
350  auto inst_status = instp->get_status();
351 
352  // [[[cog:
353  // for f in ('packet_rx_success_count', 'packet_rx_drop_count', 'buffer_overrun', 'parse_error'):
354  // cog.outl("status.{f:23s} += inst_status.{f};".format(**locals()))
355  // ]]]
356  status.packet_rx_success_count += inst_status.packet_rx_success_count;
357  status.packet_rx_drop_count += inst_status.packet_rx_drop_count;
358  status.buffer_overrun += inst_status.buffer_overrun;
359  status.parse_error += inst_status.parse_error;
360  // [[[end]]] (checksum: a6186246ed026f1cf2b4ffc7407e893b)
361 
362  /* seq counters always 0 for this connection type */
363  }
364 
365  return status;
366 }
367 
369 {
370  MAVConnInterface::IOStat iostat {};
371 
372  lock_guard lock(mutex);
373  for (auto &instp : client_list) {
374  auto inst_iostat = instp->get_iostat();
375 
376  // [[[cog:
377  // for p in ('tx', 'rx'):
378  // for f in ('total_bytes', 'speed'):
379  // cog.outl("iostat.{p}_{f:11s} += inst_iostat.{p}_{f};".format(**locals()))
380  // ]]]
381  iostat.tx_total_bytes += inst_iostat.tx_total_bytes;
382  iostat.tx_speed += inst_iostat.tx_speed;
383  iostat.rx_total_bytes += inst_iostat.rx_total_bytes;
384  iostat.rx_speed += inst_iostat.rx_speed;
385  // [[[end]]] (checksum: fb4fe06794471d9b068ce0c129ee7673)
386  }
387 
388  return iostat;
389 }
390 
391 void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
392 {
393  lock_guard lock(mutex);
394  for (auto &instp : client_list) {
395  instp->send_bytes(bytes, length);
396  }
397 }
398 
400 {
401  lock_guard lock(mutex);
402  for (auto &instp : client_list) {
403  instp->send_message(message);
404  }
405 }
406 
407 void MAVConnTCPServer::send_message(const mavlink::Message &message, const uint8_t source_compid)
408 {
409  lock_guard lock(mutex);
410  for (auto &instp : client_list) {
411  instp->send_message(message, source_compid);
412  }
413 }
414 
416 {
417  if (is_destroying) {
418  return;
419  }
420  auto sthis = shared_from_this();
421  auto acceptor_client = std::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
422  acceptor.async_accept(
423  acceptor_client->socket,
424  acceptor_client->server_ep,
425  [sthis, acceptor_client] (error_code error) {
426  if (error) {
427  CONSOLE_BRIDGE_logError(PFXd "accept: %s", sthis->conn_id, error.message().c_str());
428  sthis->close();
429  return;
430  }
431 
432  lock_guard lock(sthis->mutex);
433 
434  std::weak_ptr<MAVConnTCPClient> weak_client{acceptor_client};
435  acceptor_client->client_connected(sthis->conn_id);
436  acceptor_client->message_received_cb = std::bind(&MAVConnTCPServer::recv_message, sthis, std::placeholders::_1, std::placeholders::_2);
437  acceptor_client->port_closed_cb = [weak_client, sthis] () { sthis->client_closed(weak_client); };
438 
439  sthis->client_list.push_back(acceptor_client);
440  sthis->do_accept();
441  });
442 }
443 
444 void MAVConnTCPServer::client_closed(std::weak_ptr<MAVConnTCPClient> weak_instp)
445 {
446  if (auto instp = weak_instp.lock()) {
447  bool locked = mutex.try_lock();
448  CONSOLE_BRIDGE_logInform(PFXd "Client connection closed, id: %p, address: %s",
449  conn_id, instp.get(), to_string_ss(instp->server_ep).c_str());
450 
451  client_list.remove(instp);
452 
453  if (locked)
454  mutex.unlock();
455  }
456 }
457 
458 void MAVConnTCPServer::recv_message(const mavlink_message_t *message, const Framing framing)
459 {
461  message_received_cb(message, framing);
462 }
463 } // namespace mavconn
void log_send(const char *pfx, const mavlink::mavlink_message_t *msg)
Definition: interface.cpp:132
virtual ~MAVConnTCPClient()
Definition: tcp.cpp:133
ClosedCb port_closed_cb
Port closed notification callback.
Definition: interface.h:195
MAVConnTCPClient(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string server_host=DEFAULT_SERVER_HOST, unsigned short server_port=DEFAULT_SERVER_PORT)
Definition: tcp.cpp:77
MAVConnTCPServer(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string bind_host=DEFAULT_BIND_HOST, unsigned short bind_port=DEFAULT_BIND_PORT)
Definition: tcp.cpp:287
std::lock_guard< std::recursive_mutex > lock_guard
Definition: interface.h:42
boost::asio::ip::tcp::acceptor acceptor
Definition: tcp.h:125
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: tcp.cpp:391
#define CONSOLE_BRIDGE_logInform(fmt,...)
#define CONSOLE_BRIDGE_logWarn(fmt,...)
boost::asio::io_service io_service
Definition: tcp.h:121
void close() override
Close connection.
Definition: tcp.cpp:325
void close() override
Close connection.
Definition: tcp.cpp:139
static constexpr size_t MAX_TXQ_SIZE
Maximum count of transmission buffers.
Definition: interface.h:249
std::atomic< bool > tx_in_progress
Definition: tcp.h:75
Common exception for communication error.
Definition: interface.h:64
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
const std::string to_string_ss(T &obj)
Convert to string objects with operator <<.
Definition: thread_utils.h:71
bool set_this_thread_name(const std::string &name, Args &&...args)
Set name to current thread, printf-like.
Definition: thread_utils.h:55
#define CONSOLE_BRIDGE_logDebug(fmt,...)
void log_send_obj(const char *pfx, const mavlink::Message &msg)
Definition: interface.cpp:142
ReceivedCb message_received_cb
Message receive callback.
Definition: interface.h:193
std::recursive_mutex mutex
Definition: tcp.h:131
boost::asio::ip::tcp::endpoint bind_ep
Definition: tcp.h:126
std::recursive_mutex mutex
Definition: tcp.h:78
virtual ~MAVConnTCPServer()
Definition: tcp.cpp:319
Generic mavlink interface.
Definition: interface.h:97
mavlink::mavlink_status_t get_status() override
Definition: tcp.cpp:344
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
#define PFXd
Definition: tcp.cpp:42
uint8_t comp_id
Connection Component Id.
Definition: interface.h:244
void client_connected(size_t server_channel)
Definition: tcp.cpp:124
void recv_message(const mavlink::mavlink_message_t *message, const Framing framing)
Definition: tcp.cpp:458
mavlink::mavlink_status_t * get_status_p()
Definition: interface.h:257
size_t conn_id
Channel number used for logging.
Definition: interface.h:255
#define CONSOLE_BRIDGE_logError(fmt,...)
static bool resolve_address_tcp(io_service &io, size_t chan, std::string host, unsigned short port, tcp::endpoint &ep)
Definition: tcp.cpp:45
some useful utils
bool is_open() override
Definition: tcp.h:60
std::array< uint8_t, MsgBuffer::MAX_SIZE > rx_buf
Definition: tcp.h:77
MAVConn console-bridge compatibility header.
std::atomic< bool > is_destroying
Definition: tcp.h:128
void client_closed(std::weak_ptr< MAVConnTCPClient > weak_instp)
Definition: tcp.cpp:444
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: tcp.cpp:161
#define GET_IO_SERVICE(s)
Definition: tcp.cpp:28
std::atomic< bool > is_destroying
Definition: tcp.h:73
#define PFX
Definition: tcp.cpp:41
IOStat get_iostat() override
Definition: tcp.cpp:368
bool is_open() override
Definition: tcp.h:116
boost::asio::ip::tcp::socket socket
Definition: tcp.h:70
std::list< std::shared_ptr< MAVConnTCPClient > > client_list
Definition: tcp.h:130
std::unique_ptr< boost::asio::io_service::work > io_work
Definition: tcp.h:67
boost::asio::ip::tcp::endpoint server_ep
Definition: tcp.h:71
std::thread io_thread
Definition: tcp.h:68
uint8_t sys_id
Connection System Id.
Definition: interface.h:243
std::deque< MsgBuffer > tx_q
Definition: tcp.h:76
boost::asio::io_service io_service
Definition: tcp.h:66
std::thread io_thread
Definition: tcp.h:123
Framing
Rx packer framing status. (same as mavlink::mavlink_framing_t)
Definition: interface.h:48
void do_send(bool check_tx_state)
Definition: tcp.cpp:241


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Tue Jun 1 2021 02:36:21