udp.cpp
Go to the documentation of this file.
1 
9 /*
10  * libmavconn
11  * Copyright 2013,2014,2015,2016 Vladimir Ermakov, All rights reserved.
12  *
13  * This file is part of the mavros package and subject to the license terms
14  * in the top-level LICENSE file of the mavros repository.
15  * https://github.com/mavlink/mavros/tree/master/LICENSE.md
16  */
17 
18 #include <cassert>
19 
21 #include <mavconn/thread_utils.h>
22 #include <mavconn/udp.h>
23 
24 namespace mavconn {
25 
26 using boost::system::error_code;
27 using boost::asio::io_service;
28 using boost::asio::ip::udp;
29 using boost::asio::buffer;
31 using utils::operator"" _KiB;
32 using mavlink::mavlink_message_t;
33 
34 #define PFX "mavconn: udp"
35 #define PFXd PFX "%zu: "
36 
37 
38 static bool resolve_address_udp(io_service &io, size_t chan, std::string host, unsigned short port, udp::endpoint &ep)
39 {
40  bool result = false;
41  udp::resolver resolver(io);
42  error_code ec;
43 
44  udp::resolver::query query(host, "");
45 
46  auto fn = [&](const udp::endpoint & q_ep) {
47  ep = q_ep;
48  ep.port(port);
49  result = true;
50  CONSOLE_BRIDGE_logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
51  };
52 
53 #if BOOST_ASIO_VERSION >= 101200
54  for (auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
55 #else
56  std::for_each(resolver.resolve(query, ec), udp::resolver::iterator(), fn);
57 #endif
58 
59  if (ec) {
60  CONSOLE_BRIDGE_logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
61  result = false;
62  }
63 
64  return result;
65 }
66 
67 
68 MAVConnUDP::MAVConnUDP(uint8_t system_id, uint8_t component_id,
69  std::string bind_host, unsigned short bind_port,
70  std::string remote_host, unsigned short remote_port) :
71  MAVConnInterface(system_id, component_id),
72  io_service(),
73  io_work(new io_service::work(io_service)),
74  permanent_broadcast(false),
75  remote_exists(false),
76  socket(io_service),
77  tx_in_progress(false),
78  tx_q {},
79  rx_buf {}
80 {
81  using udps = boost::asio::ip::udp::socket;
82 
83  if (!resolve_address_udp(io_service, conn_id, bind_host, bind_port, bind_ep))
84  throw DeviceError("udp: resolve", "Bind address resolve failed");
85 
86  CONSOLE_BRIDGE_logInform(PFXd "Bind address: %s", conn_id, to_string_ss(bind_ep).c_str());
87 
88  if (remote_host != "") {
89  if (remote_host != BROADCAST_REMOTE_HOST && remote_host != PERMANENT_BROADCAST_REMOTE_HOST)
90  remote_exists = resolve_address_udp(io_service, conn_id, remote_host, remote_port, remote_ep);
91  else {
92  remote_exists = true;
93  remote_ep = udp::endpoint(boost::asio::ip::address_v4::broadcast(), remote_port);
94  }
95 
96  if (remote_exists)
97  CONSOLE_BRIDGE_logInform(PFXd "Remote address: %s", conn_id, to_string_ss(remote_ep).c_str());
98  else
99  CONSOLE_BRIDGE_logWarn(PFXd "Remote address resolve failed.", conn_id);
100  }
101 
102  try {
103  socket.open(udp::v4());
104 
105  // set buffer opt. size from QGC
106  socket.set_option(udps::reuse_address(true));
107  socket.set_option(udps::send_buffer_size(256_KiB));
108  socket.set_option(udps::receive_buffer_size(512_KiB));
109 
110  socket.bind(bind_ep);
111 
112  if (remote_host == BROADCAST_REMOTE_HOST) {
113  socket.set_option(udps::broadcast(true));
114  } else if (remote_host == PERMANENT_BROADCAST_REMOTE_HOST) {
115  socket.set_option(udps::broadcast(true));
116  permanent_broadcast = true;
117  }
118  }
119  catch (boost::system::system_error &err) {
120  throw DeviceError("udp", err);
121  }
122 
123  // NOTE: shared_from_this() should not be used in constructors
124 
125  // give some work to io_service before start
126  io_service.post(std::bind(&MAVConnUDP::do_recvfrom, this));
127 
128  // run io_service for async io
129  io_thread = std::thread([this] () {
131  io_service.run();
132  });
133 }
134 
136 {
137  close();
138 }
139 
141 {
142  lock_guard lock(mutex);
143  if (!is_open())
144  return;
145 
146  socket.cancel();
147  socket.close();
148 
149  io_work.reset();
150  io_service.stop();
151 
152  if (io_thread.joinable())
153  io_thread.join();
154 
155  io_service.reset();
156 
157  if (port_closed_cb)
158  port_closed_cb();
159 }
160 
161 void MAVConnUDP::send_bytes(const uint8_t *bytes, size_t length)
162 {
163  if (!is_open()) {
164  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
165  return;
166  }
167 
168  if (!remote_exists) {
169  CONSOLE_BRIDGE_logDebug(PFXd "send: Remote not known, message dropped.", conn_id);
170  return;
171  }
172 
173  {
174  lock_guard lock(mutex);
175 
176  if (tx_q.size() >= MAX_TXQ_SIZE)
177  throw std::length_error("MAVConnUDP::send_bytes: TX queue overflow");
178 
179  tx_q.emplace_back(bytes, length);
180  }
181  io_service.post(std::bind(&MAVConnUDP::do_sendto, shared_from_this(), true));
182 }
183 
184 void MAVConnUDP::send_message(const mavlink_message_t *message)
185 {
186  assert(message != nullptr);
187 
188  if (!is_open()) {
189  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
190  return;
191  }
192 
193  if (!remote_exists) {
194  CONSOLE_BRIDGE_logDebug(PFXd "send: Remote not known, message dropped.", conn_id);
195  return;
196  }
197 
198  log_send(PFX, message);
199 
200  {
201  lock_guard lock(mutex);
202 
203  if (tx_q.size() >= MAX_TXQ_SIZE)
204  throw std::length_error("MAVConnUDP::send_message: TX queue overflow");
205 
206  tx_q.emplace_back(message);
207  }
208  io_service.post(std::bind(&MAVConnUDP::do_sendto, shared_from_this(), true));
209 }
210 
211 void MAVConnUDP::send_message(const mavlink::Message &message, const uint8_t source_compid)
212 {
213  if (!is_open()) {
214  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
215  return;
216  }
217 
218  if (!remote_exists) {
219  CONSOLE_BRIDGE_logDebug(PFXd "send: Remote not known, message dropped.", conn_id);
220  return;
221  }
222 
223  log_send_obj(PFX, message);
224 
225  {
226  lock_guard lock(mutex);
227 
228  if (tx_q.size() >= MAX_TXQ_SIZE)
229  throw std::length_error("MAVConnUDP::send_message: TX queue overflow");
230 
231  tx_q.emplace_back(message, get_status_p(), sys_id, source_compid);
232  }
233  io_service.post(std::bind(&MAVConnUDP::do_sendto, shared_from_this(), true));
234 }
235 
237 {
238  auto sthis = shared_from_this();
239  socket.async_receive_from(
240  buffer(rx_buf),
242  [sthis] (error_code error, size_t bytes_transferred) {
243  if (error) {
244  CONSOLE_BRIDGE_logError(PFXd "receive: %s", sthis->conn_id, error.message().c_str());
245  sthis->close();
246  return;
247  }
248 
249  if (!sthis->permanent_broadcast && sthis->remote_ep != sthis->last_remote_ep) {
250  CONSOLE_BRIDGE_logInform(PFXd "Remote address: %s", sthis->conn_id, to_string_ss(sthis->remote_ep).c_str());
251  sthis->remote_exists = true;
252  sthis->last_remote_ep = sthis->remote_ep;
253  }
254 
255  sthis->parse_buffer(PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
256  sthis->do_recvfrom();
257  });
258 }
259 
260 void MAVConnUDP::do_sendto(bool check_tx_state)
261 {
262  if (check_tx_state && tx_in_progress)
263  return;
264 
265  lock_guard lock(mutex);
266  if (tx_q.empty())
267  return;
268 
269  tx_in_progress = true;
270  auto sthis = shared_from_this();
271  auto &buf_ref = tx_q.front();
272  socket.async_send_to(
273  buffer(buf_ref.dpos(), buf_ref.nbytes()),
274  remote_ep,
275  [sthis, &buf_ref] (error_code error, size_t bytes_transferred) {
276  assert(bytes_transferred <= buf_ref.len);
277 
278  if (error == boost::asio::error::network_unreachable) {
279  CONSOLE_BRIDGE_logWarn(PFXd "sendto: %s, retrying", sthis->conn_id, error.message().c_str());
280  // do not return, try to resend
281  }
282  else if (error) {
283  CONSOLE_BRIDGE_logError(PFXd "sendto: %s", sthis->conn_id, error.message().c_str());
284  sthis->close();
285  return;
286  }
287 
288  sthis->iostat_tx_add(bytes_transferred);
289  lock_guard lock(sthis->mutex);
290 
291  if (sthis->tx_q.empty()) {
292  sthis->tx_in_progress = false;
293  return;
294  }
295 
296  buf_ref.pos += bytes_transferred;
297  if (buf_ref.nbytes() == 0) {
298  sthis->tx_q.pop_front();
299  }
300 
301  if (!sthis->tx_q.empty())
302  sthis->do_sendto(false);
303  else
304  sthis->tx_in_progress = false;
305  });
306 }
307 
309 {
310  return to_string_ss(remote_ep);
311 }
312 
313 } // namespace mavconn
void log_send(const char *pfx, const mavlink::mavlink_message_t *msg)
Definition: interface.cpp:132
ClosedCb port_closed_cb
Port closed notification callback.
Definition: interface.h:195
std::lock_guard< std::recursive_mutex > lock_guard
Definition: interface.h:42
std::string get_remote_endpoint() const
Definition: udp.cpp:308
#define CONSOLE_BRIDGE_logInform(fmt,...)
boost::asio::ip::udp::endpoint bind_ep
Definition: udp.h:76
#define CONSOLE_BRIDGE_logWarn(fmt,...)
static constexpr size_t MAX_TXQ_SIZE
Maximum count of transmission buffers.
Definition: interface.h:249
Common exception for communication error.
Definition: interface.h:64
const std::string to_string_ss(T &obj)
Convert to string objects with operator <<.
Definition: thread_utils.h:71
MAVConnUDP(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string bind_host=DEFAULT_BIND_HOST, unsigned short bind_port=DEFAULT_BIND_PORT, std::string remote_host=DEFAULT_REMOTE_HOST, unsigned short remote_port=DEFAULT_REMOTE_PORT)
Definition: udp.cpp:68
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: udp.cpp:161
bool set_this_thread_name(const std::string &name, Args &&...args)
Set name to current thread, printf-like.
Definition: thread_utils.h:55
#define CONSOLE_BRIDGE_logDebug(fmt,...)
static bool resolve_address_udp(io_service &io, size_t chan, std::string host, unsigned short port, udp::endpoint &ep)
Definition: udp.cpp:38
void log_send_obj(const char *pfx, const mavlink::Message &msg)
Definition: interface.cpp:142
bool permanent_broadcast
Definition: udp.h:69
boost::asio::ip::udp::endpoint remote_ep
Definition: udp.h:73
Generic mavlink interface.
Definition: interface.h:97
#define PFXd
Definition: udp.cpp:35
std::deque< MsgBuffer > tx_q
Definition: udp.h:79
std::array< uint8_t, MsgBuffer::MAX_SIZE > rx_buf
Definition: udp.h:80
mavlink::mavlink_status_t * get_status_p()
Definition: interface.h:257
size_t conn_id
Channel number used for logging.
Definition: interface.h:255
#define CONSOLE_BRIDGE_logError(fmt,...)
static constexpr auto BROADCAST_REMOTE_HOST
Markers for broadcast modes. Not valid domain names.
Definition: udp.h:39
void close() override
Close connection.
Definition: udp.cpp:140
some useful utils
MAVConn console-bridge compatibility header.
bool is_open() override
Definition: udp.h:59
static constexpr auto PERMANENT_BROADCAST_REMOTE_HOST
Definition: udp.h:40
boost::asio::ip::udp::socket socket
Definition: udp.h:72
std::atomic< bool > remote_exists
Definition: udp.h:71
std::atomic< bool > tx_in_progress
Definition: udp.h:78
#define PFX
Definition: udp.cpp:34
void do_sendto(bool check_tx_state)
Definition: udp.cpp:260
std::thread io_thread
Definition: udp.h:68
std::unique_ptr< boost::asio::io_service::work > io_work
Definition: udp.h:67
boost::asio::io_service io_service
Definition: udp.h:66
virtual ~MAVConnUDP()
Definition: udp.cpp:135
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
boost::asio::ip::udp::endpoint recv_ep
Definition: udp.h:74
uint8_t sys_id
Connection System Id.
Definition: interface.h:243
std::recursive_mutex mutex
Definition: udp.h:81
void do_recvfrom()
Definition: udp.cpp:236


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Tue Jun 1 2021 02:36:21