udp.cpp
Go to the documentation of this file.
1 
9 /*
10  * libmavconn
11  * Copyright 2013,2014,2015,2016 Vladimir Ermakov, All rights reserved.
12  *
13  * This file is part of the mavros package and subject to the license terms
14  * in the top-level LICENSE file of the mavros repository.
15  * https://github.com/mavlink/mavros/tree/master/LICENSE.md
16  */
17 
18 #include <cassert>
19 
21 #include <mavconn/thread_utils.h>
22 #include <mavconn/udp.h>
23 
24 namespace mavconn {
25 
26 using boost::system::error_code;
27 using boost::asio::io_service;
28 using boost::asio::ip::udp;
29 using boost::asio::buffer;
31 using utils::operator"" _KiB;
32 using mavlink::mavlink_message_t;
33 
34 #define PFX "mavconn: udp"
35 #define PFXd PFX "%zu: "
36 
37 
38 static bool resolve_address_udp(io_service &io, size_t chan, std::string host, unsigned short port, udp::endpoint &ep)
39 {
40  bool result = false;
41  udp::resolver resolver(io);
42  error_code ec;
43 
44  udp::resolver::query query(host, "");
45 
46  auto fn = [&](const udp::endpoint & q_ep) {
47  ep = q_ep;
48  ep.port(port);
49  result = true;
50  CONSOLE_BRIDGE_logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
51  };
52 
53 #if BOOST_ASIO_VERSION >= 101200
54  for (auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
55 #else
56  std::for_each(resolver.resolve(query, ec), udp::resolver::iterator(), fn);
57 #endif
58 
59  if (ec) {
60  CONSOLE_BRIDGE_logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
61  result = false;
62  }
63 
64  return result;
65 }
66 
67 
68 MAVConnUDP::MAVConnUDP(uint8_t system_id, uint8_t component_id,
69  std::string bind_host, unsigned short bind_port,
70  std::string remote_host, unsigned short remote_port) :
71  MAVConnInterface(system_id, component_id),
72  io_service(),
73  io_work(new io_service::work(io_service)),
74  permanent_broadcast(false),
75  remote_exists(false),
76  socket(io_service),
77  tx_in_progress(false),
78  tx_q {},
79  rx_buf {}
80 {
81  using udps = boost::asio::ip::udp::socket;
82 
83  if (!resolve_address_udp(io_service, conn_id, bind_host, bind_port, bind_ep))
84  throw DeviceError("udp: resolve", "Bind address resolve failed");
85 
86  CONSOLE_BRIDGE_logInform(PFXd "Bind address: %s", conn_id, to_string_ss(bind_ep).c_str());
87 
88  if (remote_host != "") {
89  if (remote_host != BROADCAST_REMOTE_HOST && remote_host != PERMANENT_BROADCAST_REMOTE_HOST)
90  remote_exists = resolve_address_udp(io_service, conn_id, remote_host, remote_port, remote_ep);
91  else {
92  remote_exists = true;
93  remote_ep = udp::endpoint(boost::asio::ip::address_v4::broadcast(), remote_port);
94  }
95 
96  if (remote_exists)
97  CONSOLE_BRIDGE_logInform(PFXd "Remote address: %s", conn_id, to_string_ss(remote_ep).c_str());
98  else
99  CONSOLE_BRIDGE_logWarn(PFXd "Remote address resolve failed.", conn_id);
100  }
101 
102  try {
103  socket.open(udp::v4());
104 
105  // set buffer opt. size from QGC
106  socket.set_option(udps::reuse_address(true));
107  socket.set_option(udps::send_buffer_size(256_KiB));
108  socket.set_option(udps::receive_buffer_size(512_KiB));
109 
110  socket.bind(bind_ep);
111 
112  if (remote_host == BROADCAST_REMOTE_HOST) {
113  socket.set_option(udps::broadcast(true));
114  } else if (remote_host == PERMANENT_BROADCAST_REMOTE_HOST) {
115  socket.set_option(udps::broadcast(true));
116  permanent_broadcast = true;
117  }
118  }
119  catch (boost::system::system_error &err) {
120  throw DeviceError("udp", err);
121  }
122 }
123 
125 {
126  close();
127 }
128 
130  const ReceivedCb &cb_handle_message,
131  const ClosedCb &cb_handle_closed_port)
132 {
133  message_received_cb = cb_handle_message;
134  port_closed_cb = cb_handle_closed_port;
135 
136  // give some work to io_service before start
137  io_service.post(std::bind(&MAVConnUDP::do_recvfrom, this));
138 
139  // run io_service for async io
140  io_thread = std::thread([this] () {
142  io_service.run();
143  });
144 }
145 
147 {
148  lock_guard lock(mutex);
149  if (!is_open())
150  return;
151 
152  socket.cancel();
153  socket.close();
154 
155  io_work.reset();
156  io_service.stop();
157 
158  if (io_thread.joinable())
159  io_thread.join();
160 
161  io_service.reset();
162 
163  if (port_closed_cb)
164  port_closed_cb();
165 }
166 
167 void MAVConnUDP::send_bytes(const uint8_t *bytes, size_t length)
168 {
169  if (!is_open()) {
170  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
171  return;
172  }
173 
174  if (!remote_exists) {
175  CONSOLE_BRIDGE_logDebug(PFXd "send: Remote not known, message dropped.", conn_id);
176  return;
177  }
178 
179  {
180  lock_guard lock(mutex);
181 
182  if (tx_q.size() >= MAX_TXQ_SIZE)
183  throw std::length_error("MAVConnUDP::send_bytes: TX queue overflow");
184 
185  tx_q.emplace_back(bytes, length);
186  }
187  io_service.post(std::bind(&MAVConnUDP::do_sendto, shared_from_this(), true));
188 }
189 
190 void MAVConnUDP::send_message(const mavlink_message_t *message)
191 {
192  assert(message != nullptr);
193 
194  if (!is_open()) {
195  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
196  return;
197  }
198 
199  if (!remote_exists) {
200  CONSOLE_BRIDGE_logDebug(PFXd "send: Remote not known, message dropped.", conn_id);
201  return;
202  }
203 
204  log_send(PFX, message);
205 
206  {
207  lock_guard lock(mutex);
208 
209  if (tx_q.size() >= MAX_TXQ_SIZE)
210  throw std::length_error("MAVConnUDP::send_message: TX queue overflow");
211 
212  tx_q.emplace_back(message);
213  }
214  io_service.post(std::bind(&MAVConnUDP::do_sendto, shared_from_this(), true));
215 }
216 
217 void MAVConnUDP::send_message(const mavlink::Message &message, const uint8_t source_compid)
218 {
219  if (!is_open()) {
220  CONSOLE_BRIDGE_logError(PFXd "send: channel closed!", conn_id);
221  return;
222  }
223 
224  if (!remote_exists) {
225  CONSOLE_BRIDGE_logDebug(PFXd "send: Remote not known, message dropped.", conn_id);
226  return;
227  }
228 
229  log_send_obj(PFX, message);
230 
231  {
232  lock_guard lock(mutex);
233 
234  if (tx_q.size() >= MAX_TXQ_SIZE)
235  throw std::length_error("MAVConnUDP::send_message: TX queue overflow");
236 
237  tx_q.emplace_back(message, get_status_p(), sys_id, source_compid);
238  }
239  io_service.post(std::bind(&MAVConnUDP::do_sendto, shared_from_this(), true));
240 }
241 
243 {
244  auto sthis = shared_from_this();
245  socket.async_receive_from(
246  buffer(rx_buf),
248  [sthis] (error_code error, size_t bytes_transferred) {
249  if (error) {
250  CONSOLE_BRIDGE_logError(PFXd "receive: %s", sthis->conn_id, error.message().c_str());
251  sthis->close();
252  return;
253  }
254 
255  if (!sthis->permanent_broadcast && sthis->remote_ep != sthis->last_remote_ep) {
256  CONSOLE_BRIDGE_logInform(PFXd "Remote address: %s", sthis->conn_id, to_string_ss(sthis->remote_ep).c_str());
257  sthis->remote_exists = true;
258  sthis->last_remote_ep = sthis->remote_ep;
259  }
260 
261  sthis->parse_buffer(PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
262  sthis->do_recvfrom();
263  });
264 }
265 
266 void MAVConnUDP::do_sendto(bool check_tx_state)
267 {
268  if (check_tx_state && tx_in_progress)
269  return;
270 
271  lock_guard lock(mutex);
272  if (tx_q.empty())
273  return;
274 
275  tx_in_progress = true;
276  auto sthis = shared_from_this();
277  auto &buf_ref = tx_q.front();
278  socket.async_send_to(
279  buffer(buf_ref.dpos(), buf_ref.nbytes()),
280  remote_ep,
281  [sthis, &buf_ref] (error_code error, size_t bytes_transferred) {
282  assert(bytes_transferred <= buf_ref.len);
283 
284  if (error == boost::asio::error::network_unreachable) {
285  CONSOLE_BRIDGE_logWarn(PFXd "sendto: %s, retrying", sthis->conn_id, error.message().c_str());
286  // do not return, try to resend
287  }
288  else if (error) {
289  CONSOLE_BRIDGE_logError(PFXd "sendto: %s", sthis->conn_id, error.message().c_str());
290  sthis->close();
291  return;
292  }
293 
294  sthis->iostat_tx_add(bytes_transferred);
295  lock_guard lock(sthis->mutex);
296 
297  if (sthis->tx_q.empty()) {
298  sthis->tx_in_progress = false;
299  return;
300  }
301 
302  buf_ref.pos += bytes_transferred;
303  if (buf_ref.nbytes() == 0) {
304  sthis->tx_q.pop_front();
305  }
306 
307  if (!sthis->tx_q.empty())
308  sthis->do_sendto(false);
309  else
310  sthis->tx_in_progress = false;
311  });
312 }
313 
315 {
316  return to_string_ss(remote_ep);
317 }
318 
319 } // namespace mavconn
void log_send(const char *pfx, const mavlink::mavlink_message_t *msg)
Definition: interface.cpp:132
ClosedCb port_closed_cb
Port closed notification callback.
Definition: interface.h:202
std::lock_guard< std::recursive_mutex > lock_guard
Definition: interface.h:42
std::function< void(const mavlink::mavlink_message_t *message, const Framing framing)> ReceivedCb
Definition: interface.h:102
#define CONSOLE_BRIDGE_logInform(fmt,...)
boost::asio::ip::udp::endpoint bind_ep
Definition: udp.h:79
#define CONSOLE_BRIDGE_logWarn(fmt,...)
void connect(const ReceivedCb &cb_handle_message, const ClosedCb &cb_handle_closed_port=ClosedCb()) override
Establish connection, automatically called by open_url()
Definition: udp.cpp:129
static constexpr size_t MAX_TXQ_SIZE
Maximum count of transmission buffers.
Definition: interface.h:268
Common exception for communication error.
Definition: interface.h:64
const std::string to_string_ss(T &obj)
Convert to string objects with operator <<.
Definition: thread_utils.h:71
MAVConnUDP(uint8_t system_id=1, uint8_t component_id=MAV_COMP_ID_UDP_BRIDGE, std::string bind_host=DEFAULT_BIND_HOST, unsigned short bind_port=DEFAULT_BIND_PORT, std::string remote_host=DEFAULT_REMOTE_HOST, unsigned short remote_port=DEFAULT_REMOTE_PORT)
Definition: udp.cpp:68
void send_bytes(const uint8_t *bytes, size_t length) override
Send raw bytes (for some quirks)
Definition: udp.cpp:167
#define CONSOLE_BRIDGE_logDebug(fmt,...)
static bool resolve_address_udp(io_service &io, size_t chan, std::string host, unsigned short port, udp::endpoint &ep)
Definition: udp.cpp:38
void log_send_obj(const char *pfx, const mavlink::Message &msg)
Definition: interface.cpp:142
ReceivedCb message_received_cb
Message receive callback.
Definition: interface.h:200
bool permanent_broadcast
Definition: udp.h:72
boost::asio::ip::udp::endpoint remote_ep
Definition: udp.h:76
std::string get_remote_endpoint() const
Definition: udp.cpp:314
Generic mavlink interface.
Definition: interface.h:97
#define PFXd
Definition: udp.cpp:35
std::deque< MsgBuffer > tx_q
Definition: udp.h:82
std::array< uint8_t, MsgBuffer::MAX_SIZE > rx_buf
Definition: udp.h:83
mavlink::mavlink_status_t * get_status_p()
Definition: interface.h:276
std::function< void(void)> ClosedCb
Definition: interface.h:103
size_t conn_id
Channel number used for logging.
Definition: interface.h:274
#define CONSOLE_BRIDGE_logError(fmt,...)
static constexpr auto BROADCAST_REMOTE_HOST
Markers for broadcast modes. Not valid domain names.
Definition: udp.h:39
void close() override
Close connection.
Definition: udp.cpp:146
some useful utils
MAVConn console-bridge compatibility header.
bool is_open() override
Definition: udp.h:62
static constexpr auto PERMANENT_BROADCAST_REMOTE_HOST
Definition: udp.h:40
bool set_this_thread_name(const std::string &name, Args &&... args)
Set name to current thread, printf-like.
Definition: thread_utils.h:55
boost::asio::ip::udp::socket socket
Definition: udp.h:75
std::atomic< bool > remote_exists
Definition: udp.h:74
std::atomic< bool > tx_in_progress
Definition: udp.h:81
#define PFX
Definition: udp.cpp:34
void do_sendto(bool check_tx_state)
Definition: udp.cpp:266
std::thread io_thread
Definition: udp.h:71
std::unique_ptr< boost::asio::io_service::work > io_work
Definition: udp.h:70
boost::asio::io_service io_service
Definition: udp.h:69
virtual ~MAVConnUDP()
Definition: udp.cpp:124
void send_message(const mavlink::mavlink_message_t *message) override
Send message (mavlink_message_t)
boost::asio::ip::udp::endpoint recv_ep
Definition: udp.h:77
uint8_t sys_id
Connection System Id.
Definition: interface.h:262
std::recursive_mutex mutex
Definition: udp.h:84
void do_recvfrom()
Definition: udp.cpp:242


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Tue Jun 13 2023 02:17:45