26 using boost::system::error_code;
27 using boost::asio::io_service;
28 using boost::asio::ip::udp;
29 using boost::asio::buffer;
31 using utils::operator
"" _KiB;
32 using mavlink::mavlink_message_t;
34 #define PFX "mavconn: udp"
35 #define PFXd PFX "%zu: "
38 static bool resolve_address_udp(io_service &io,
size_t chan, std::string host,
unsigned short port, udp::endpoint &ep)
41 udp::resolver resolver(io);
44 udp::resolver::query query(host,
"");
46 auto fn = [&](
const udp::endpoint & q_ep) {
53 #if BOOST_ASIO_VERSION >= 101200
54 for (
auto q_ep : resolver.resolve(query, ec)) fn(q_ep);
56 std::for_each(resolver.resolve(query, ec), udp::resolver::iterator(), fn);
69 std::string bind_host,
unsigned short bind_port,
70 std::string remote_host,
unsigned short remote_port) :
73 io_work(new io_service::work(io_service)),
74 permanent_broadcast(false),
77 tx_in_progress(false),
81 using udps = boost::asio::ip::udp::socket;
84 throw DeviceError(
"udp: resolve",
"Bind address resolve failed");
88 if (remote_host !=
"") {
89 if (remote_host != BROADCAST_REMOTE_HOST && remote_host != PERMANENT_BROADCAST_REMOTE_HOST)
90 remote_exists =
resolve_address_udp(io_service, conn_id, remote_host, remote_port, remote_ep);
93 remote_ep = udp::endpoint(boost::asio::ip::address_v4::broadcast(), remote_port);
103 socket.open(udp::v4());
106 socket.set_option(udps::reuse_address(
true));
107 socket.set_option(udps::send_buffer_size(256_KiB));
108 socket.set_option(udps::receive_buffer_size(512_KiB));
110 socket.bind(bind_ep);
112 if (remote_host == BROADCAST_REMOTE_HOST) {
113 socket.set_option(udps::broadcast(
true));
114 }
else if (remote_host == PERMANENT_BROADCAST_REMOTE_HOST) {
115 socket.set_option(udps::broadcast(
true));
116 permanent_broadcast =
true;
119 catch (boost::system::system_error &err) {
120 throw DeviceError(
"udp", err);
131 const ClosedCb &cb_handle_closed_port)
183 throw std::length_error(
"MAVConnUDP::send_bytes: TX queue overflow");
185 tx_q.emplace_back(bytes, length);
192 assert(message !=
nullptr);
210 throw std::length_error(
"MAVConnUDP::send_message: TX queue overflow");
212 tx_q.emplace_back(message);
235 throw std::length_error(
"MAVConnUDP::send_message: TX queue overflow");
244 auto sthis = shared_from_this();
245 socket.async_receive_from(
248 [sthis] (error_code error,
size_t bytes_transferred) {
255 if (!sthis->permanent_broadcast && sthis->remote_ep != sthis->last_remote_ep) {
257 sthis->remote_exists =
true;
258 sthis->last_remote_ep = sthis->remote_ep;
261 sthis->parse_buffer(
PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
262 sthis->do_recvfrom();
276 auto sthis = shared_from_this();
277 auto &buf_ref =
tx_q.front();
279 buffer(buf_ref.dpos(), buf_ref.nbytes()),
281 [sthis, &buf_ref] (error_code error,
size_t bytes_transferred) {
282 assert(bytes_transferred <= buf_ref.len);
284 if (error == boost::asio::error::network_unreachable) {
285 CONSOLE_BRIDGE_logWarn(PFXd
"sendto: %s, retrying", sthis->conn_id, error.message().c_str());
294 sthis->iostat_tx_add(bytes_transferred);
297 if (sthis->tx_q.empty()) {
298 sthis->tx_in_progress =
false;
302 buf_ref.pos += bytes_transferred;
303 if (buf_ref.nbytes() == 0) {
304 sthis->tx_q.pop_front();
307 if (!sthis->tx_q.empty())
308 sthis->do_sendto(
false);
310 sthis->tx_in_progress =
false;
314 std::string MAVConnUDP::get_remote_endpoint()
const