tcp.cpp
Go to the documentation of this file.
1 
9 /*
10  * libmavconn
11  * Copyright 2014,2015,2016 Vladimir Ermakov, All rights reserved.
12  *
13  * This file is part of the mavros package and subject to the license terms
14  * in the top-level LICENSE file of the mavros repository.
15  * https://github.com/mavlink/mavros/tree/master/LICENSE.md
16  */
17 
18 #include <cassert>
19 
21 #include <mavconn/thread_utils.h>
22 #include <mavconn/tcp.h>
23 
24 namespace mavconn {
25 
26 using boost::system::error_code;
27 using boost::asio::io_service;
28 using boost::asio::ip::tcp;
29 using boost::asio::buffer;
31 using mavlink::mavlink_message_t;
32 using mavlink::mavlink_status_t;
33 
34 #define PFX "mavconn: tcp"
35 #define PFXd PFX "%zu: "
36 
37 
38 static bool resolve_address_tcp(io_service &io, size_t chan, std::string host, unsigned short port, tcp::endpoint &ep)
39 {
40  bool result = false;
41  tcp::resolver resolver(io);
42  error_code ec;
43 
44  tcp::resolver::query query(host, "");
45 
46  auto fn = [&](const tcp::endpoint & q_ep) {
47  ep = q_ep;
48  ep.port(port);
49  result = true;
50  CONSOLE_BRIDGE_logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
51  };
52 
53 #if BOOST_ASIO_VERSION >= 101200
54  for (auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
55 #else
56  std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(), fn);
57 #endif
58 
59  if (ec) {
60  CONSOLE_BRIDGE_logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
61  result = false;
62  }
63 
64  return result;
65 }
66 
67 
68 /* -*- TCP client variant -*- */
69 
70 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
71  std::string server_host, unsigned short server_port) :
72  MAVConnInterface(system_id, component_id),
73  is_destroying(false),
74  tx_in_progress(false),
75  tx_q {},
76  rx_buf {},
77  io_service(),
78  io_work(new io_service::work(io_service)),
80 {
81  if (!resolve_address_tcp(io_service, conn_id, server_host, server_port, server_ep))
82  throw DeviceError("tcp: resolve", "Bind address resolve failed");
83 
84  CONSOLE_BRIDGE_logInform(PFXd "Server address: %s", conn_id, to_string_ss(server_ep).c_str());
85 
86  try {
87  socket.open(tcp::v4());
88  socket.connect(server_ep);
89  }
90  catch (boost::system::system_error &err) {
91  throw DeviceError("tcp", err);
92  }
93 
94  // NOTE: shared_from_this() should not be used in constructors
95 
96  // give some work to io_service before start
97  io_service.post(std::bind(&MAVConnTCPClient::do_recv, this));
98 
99  // run io_service for async io
100  io_thread = std::thread([this] () {
102  io_service.run();
103  });
104 }
105 
106 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
107  boost::asio::io_service &server_io) :
108  MAVConnInterface(system_id, component_id),
109  tx_in_progress(false),
110  tx_q {},
111  rx_buf {},
112  socket(server_io)
113 {
114  // waiting when server call client_connected()
115 }
116 
117 void MAVConnTCPClient::client_connected(size_t server_channel)
118 {
119  CONSOLE_BRIDGE_logInform(PFXd "Got client, id: %zu, address: %s",
120  server_channel, conn_id, to_string_ss(server_ep).c_str());
121 
122  // start recv
123  socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_recv, shared_from_this()));
124 }
125 
127 {
128  is_destroying = true;
129  close();
130 }
131 
133 {
134  lock_guard lock(mutex);
135  if (!is_open())
136  return;
137 
138  socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
139  socket.cancel();
140  socket.close();
141 
142  io_work.reset();
143  io_service.stop();
144 
145  if (io_thread.joinable())
146  io_thread.join();
147 
148  io_service.reset();
149 
150  if (port_closed_cb)
151  port_closed_cb();
152 }
153 
154 void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)
155 {
156  if (!is_open()) {
157  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
158  return;
159  }
160 
161  {
162  lock_guard lock(mutex);
163 
164  if (tx_q.size() >= MAX_TXQ_SIZE)
165  throw std::length_error("MAVConnTCPClient::send_bytes: TX queue overflow");
166 
167  tx_q.emplace_back(bytes, length);
168  }
169  socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
170 }
171 
173 {
174  assert(message != nullptr);
175 
176  if (!is_open()) {
177  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
178  return;
179  }
180 
181  log_send(PFX, message);
182 
183  {
184  lock_guard lock(mutex);
185 
186  if (tx_q.size() >= MAX_TXQ_SIZE)
187  throw std::length_error("MAVConnTCPClient::send_message: TX queue overflow");
188 
189  tx_q.emplace_back(message);
190  }
191  socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
192 }
193 
194 void MAVConnTCPClient::send_message(const mavlink::Message &message, const uint8_t source_compid)
195 {
196  if (!is_open()) {
197  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
198  return;
199  }
200 
201  log_send_obj(PFX, message);
202 
203  {
204  lock_guard lock(mutex);
205 
206  if (tx_q.size() >= MAX_TXQ_SIZE)
207  throw std::length_error("MAVConnTCPClient::send_message: TX queue overflow");
208 
209  tx_q.emplace_back(message, get_status_p(), sys_id, source_compid);
210  }
211  socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
212 }
213 
215 {
216  if (is_destroying) {
217  return;
218  }
219  auto sthis = shared_from_this();
220  socket.async_receive(
221  buffer(rx_buf),
222  [sthis] (error_code error, size_t bytes_transferred) {
223  if (error) {
224  CONSOLE_BRIDGE_logError(PFXd "receive: %s", sthis->conn_id, error.message().c_str());
225  sthis->close();
226  return;
227  }
228 
229  sthis->parse_buffer(PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
230  sthis->do_recv();
231  });
232 }
233 
234 void MAVConnTCPClient::do_send(bool check_tx_state)
235 {
236  if (check_tx_state && tx_in_progress)
237  return;
238 
239  lock_guard lock(mutex);
240  if (tx_q.empty())
241  return;
242 
243  tx_in_progress = true;
244  auto sthis = shared_from_this();
245  auto &buf_ref = tx_q.front();
246  socket.async_send(
247  buffer(buf_ref.dpos(), buf_ref.nbytes()),
248  [sthis, &buf_ref] (error_code error, size_t bytes_transferred) {
249  assert(bytes_transferred <= buf_ref.len);
250 
251  if (error) {
252  CONSOLE_BRIDGE_logError(PFXd "send: %s", sthis->conn_id, error.message().c_str());
253  sthis->close();
254  return;
255  }
256 
257  sthis->iostat_tx_add(bytes_transferred);
258  lock_guard lock(sthis->mutex);
259 
260  if (sthis->tx_q.empty()) {
261  sthis->tx_in_progress = false;
262  return;
263  }
264 
265  buf_ref.pos += bytes_transferred;
266  if (buf_ref.nbytes() == 0) {
267  sthis->tx_q.pop_front();
268  }
269 
270  if (!sthis->tx_q.empty())
271  sthis->do_send(false);
272  else
273  sthis->tx_in_progress = false;
274  });
275 }
276 
277 
278 /* -*- TCP server variant -*- */
279 
280 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
281  std::string server_host, unsigned short server_port) :
282  MAVConnInterface(system_id, component_id),
283  io_service(),
284  acceptor(io_service),
285  is_destroying(false)
286 {
287  if (!resolve_address_tcp(io_service, conn_id, server_host, server_port, bind_ep))
288  throw DeviceError("tcp-l: resolve", "Bind address resolve failed");
289 
290  CONSOLE_BRIDGE_logInform(PFXd "Bind address: %s", conn_id, to_string_ss(bind_ep).c_str());
291 
292  try {
293  acceptor.open(tcp::v4());
294  acceptor.set_option(tcp::acceptor::reuse_address(true));
295  acceptor.bind(bind_ep);
296  acceptor.listen();
297  }
298  catch (boost::system::system_error &err) {
299  throw DeviceError("tcp-l", err);
300  }
301 
302  // give some work to io_service before start
303  io_service.post(std::bind(&MAVConnTCPServer::do_accept, this));
304 
305  // run io_service for async io
306  io_thread = std::thread([this] () {
308  io_service.run();
309  });
310 }
311 
313 {
314  is_destroying = true;
315  close();
316 }
317 
319 {
320  lock_guard lock(mutex);
321  if (!is_open())
322  return;
323 
324  CONSOLE_BRIDGE_logInform(PFXd "Terminating server. "
325  "All connections will be closed.", conn_id);
326 
327  io_service.stop();
328  acceptor.close();
329 
330  if (io_thread.joinable())
331  io_thread.join();
332 
333  if (port_closed_cb)
334  port_closed_cb();
335 }
336 
338 {
339  mavlink_status_t status {};
340 
341  lock_guard lock(mutex);
342  for (auto &instp : client_list) {
343  auto inst_status = instp->get_status();
344 
345  // [[[cog:
346  // for f in ('packet_rx_success_count', 'packet_rx_drop_count', 'buffer_overrun', 'parse_error'):
347  // cog.outl("status.{f:23s} += inst_status.{f};".format(**locals()))
348  // ]]]
349  status.packet_rx_success_count += inst_status.packet_rx_success_count;
350  status.packet_rx_drop_count += inst_status.packet_rx_drop_count;
351  status.buffer_overrun += inst_status.buffer_overrun;
352  status.parse_error += inst_status.parse_error;
353  // [[[end]]] (checksum: a6186246ed026f1cf2b4ffc7407e893b)
354 
355  /* seq counters always 0 for this connection type */
356  }
357 
358  return status;
359 }
360 
362 {
363  MAVConnInterface::IOStat iostat {};
364 
365  lock_guard lock(mutex);
366  for (auto &instp : client_list) {
367  auto inst_iostat = instp->get_iostat();
368 
369  // [[[cog:
370  // for p in ('tx', 'rx'):
371  // for f in ('total_bytes', 'speed'):
372  // cog.outl("iostat.{p}_{f:11s} += inst_iostat.{p}_{f};".format(**locals()))
373  // ]]]
374  iostat.tx_total_bytes += inst_iostat.tx_total_bytes;
375  iostat.tx_speed += inst_iostat.tx_speed;
376  iostat.rx_total_bytes += inst_iostat.rx_total_bytes;
377  iostat.rx_speed += inst_iostat.rx_speed;
378  // [[[end]]] (checksum: fb4fe06794471d9b068ce0c129ee7673)
379  }
380 
381  return iostat;
382 }
383 
384 void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
385 {
386  lock_guard lock(mutex);
387  for (auto &instp : client_list) {
388  instp->send_bytes(bytes, length);
389  }
390 }
391 
393 {
394  lock_guard lock(mutex);
395  for (auto &instp : client_list) {
396  instp->send_message(message);
397  }
398 }
399 
400 void MAVConnTCPServer::send_message(const mavlink::Message &message, const uint8_t source_compid)
401 {
402  lock_guard lock(mutex);
403  for (auto &instp : client_list) {
404  instp->send_message(message, source_compid);
405  }
406 }
407 
409 {
410  if (is_destroying) {
411  return;
412  }
413  auto sthis = shared_from_this();
414  auto acceptor_client = std::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
415  acceptor.async_accept(
416  acceptor_client->socket,
417  acceptor_client->server_ep,
418  [sthis, acceptor_client] (error_code error) {
419  if (error) {
420  CONSOLE_BRIDGE_logError(PFXd "accept: %s", sthis->conn_id, error.message().c_str());
421  sthis->close();
422  return;
423  }
424 
425  lock_guard lock(sthis->mutex);
426 
427  std::weak_ptr<MAVConnTCPClient> weak_client{acceptor_client};
428  acceptor_client->client_connected(sthis->conn_id);
429  acceptor_client->message_received_cb = std::bind(&MAVConnTCPServer::recv_message, sthis, std::placeholders::_1, std::placeholders::_2);
430  acceptor_client->port_closed_cb = [weak_client, sthis] () { sthis->client_closed(weak_client); };
431 
432  sthis->client_list.push_back(acceptor_client);
433  sthis->do_accept();
434  });
435 }
436 
437 void MAVConnTCPServer::client_closed(std::weak_ptr<MAVConnTCPClient> weak_instp)
438 {
439  if (auto instp = weak_instp.lock()) {
440  bool locked = mutex.try_lock();
441  CONSOLE_BRIDGE_logInform(PFXd "Client connection closed, id: %p, address: %s",
442  conn_id, instp.get(), to_string_ss(instp->server_ep).c_str());
443 
444  client_list.remove(instp);
445 
446  if (locked)
447  mutex.unlock();
448  }
449 }
450 
451 void MAVConnTCPServer::recv_message(const mavlink_message_t *message, const Framing framing)
452 {
454  message_received_cb(message, framing);
455 }
456 } // namespace mavconn
void log_send(const char *pfx, const mavlink::mavlink_message_t *msg)
Definition: interface.cpp:143
ClosedCb port_closed_cb
Port closed notification callback.
Definition: interface.h:195
MAVConnTCPClient(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string server_host=DEFAULT_SERVER_HOST, unsigned short server_port=DEFAULT_SERVER_PORT)
Definition: tcp.cpp:70
MAVConnTCPServer(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string bind_host=DEFAULT_BIND_HOST, unsigned short bind_port=DEFAULT_BIND_PORT)
Definition: tcp.cpp:280
std::lock_guard< std::recursive_mutex > lock_guard
Definition: interface.h:42
boost::asio::ip::tcp::acceptor acceptor
Definition: tcp.h:125
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: tcp.cpp:384
#define CONSOLE_BRIDGE_logInform(fmt,...)
#define CONSOLE_BRIDGE_logWarn(fmt,...)
boost::asio::io_service io_service
Definition: tcp.h:121
void close() override
Close connection.
Definition: tcp.cpp:318
void close() override
Close connection.
Definition: tcp.cpp:132
static constexpr size_t MAX_TXQ_SIZE
Maximum count of transmission buffers.
Definition: interface.h:249
std::atomic< bool > tx_in_progress
Definition: tcp.h:75
Common exception for communication error.
Definition: interface.h:64
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
const std::string to_string_ss(T &obj)
Convert to string objects with operator <<.
Definition: thread_utils.h:71
bool set_this_thread_name(const std::string &name, Args &&...args)
Set name to current thread, printf-like.
Definition: thread_utils.h:55
#define CONSOLE_BRIDGE_logDebug(fmt,...)
void log_send_obj(const char *pfx, const mavlink::Message &msg)
Definition: interface.cpp:153
ReceivedCb message_received_cb
Message receive callback.
Definition: interface.h:193
std::recursive_mutex mutex
Definition: tcp.h:131
boost::asio::ip::tcp::endpoint bind_ep
Definition: tcp.h:126
std::recursive_mutex mutex
Definition: tcp.h:78
Generic mavlink interface.
Definition: interface.h:97
mavlink::mavlink_status_t get_status() override
Definition: tcp.cpp:337
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
#define PFXd
Definition: tcp.cpp:35
uint8_t comp_id
Connection Component Id.
Definition: interface.h:244
void client_connected(size_t server_channel)
Definition: tcp.cpp:117
void recv_message(const mavlink::mavlink_message_t *message, const Framing framing)
Definition: tcp.cpp:451
mavlink::mavlink_status_t * get_status_p()
Definition: interface.h:257
size_t conn_id
Channel number used for logging.
Definition: interface.h:255
#define CONSOLE_BRIDGE_logError(fmt,...)
static bool resolve_address_tcp(io_service &io, size_t chan, std::string host, unsigned short port, tcp::endpoint &ep)
Definition: tcp.cpp:38
some useful utils
bool is_open() override
Definition: tcp.h:60
std::array< uint8_t, MsgBuffer::MAX_SIZE > rx_buf
Definition: tcp.h:77
MAVConn console-bridge compatibility header.
std::atomic< bool > is_destroying
Definition: tcp.h:128
void client_closed(std::weak_ptr< MAVConnTCPClient > weak_instp)
Definition: tcp.cpp:437
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: tcp.cpp:154
std::atomic< bool > is_destroying
Definition: tcp.h:73
#define PFX
Definition: tcp.cpp:34
IOStat get_iostat() override
Definition: tcp.cpp:361
bool is_open() override
Definition: tcp.h:116
boost::asio::ip::tcp::socket socket
Definition: tcp.h:70
std::list< std::shared_ptr< MAVConnTCPClient > > client_list
Definition: tcp.h:130
std::unique_ptr< boost::asio::io_service::work > io_work
Definition: tcp.h:67
boost::asio::ip::tcp::endpoint server_ep
Definition: tcp.h:71
std::thread io_thread
Definition: tcp.h:68
uint8_t sys_id
Connection System Id.
Definition: interface.h:243
std::deque< MsgBuffer > tx_q
Definition: tcp.h:76
boost::asio::io_service io_service
Definition: tcp.h:66
std::thread io_thread
Definition: tcp.h:123
Framing
Rx packer framing status. (same as mavlink::mavlink_framing_t)
Definition: interface.h:48
void do_send(bool check_tx_state)
Definition: tcp.cpp:234


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Mon Jul 8 2019 03:20:07