udp.cpp
Go to the documentation of this file.
00001 
00009 /*
00010  * libmavconn
00011  * Copyright 2013,2014,2015 Vladimir Ermakov, All rights reserved.
00012  *
00013  * This file is part of the mavros package and subject to the license terms
00014  * in the top-level LICENSE file of the mavros repository.
00015  * https://github.com/mavlink/mavros/tree/master/LICENSE.md
00016  */
00017 
00018 #include <cassert>
00019 #include <console_bridge/console.h>
00020 
00021 #include <mavconn/thread_utils.h>
00022 #include <mavconn/udp.h>
00023 
00024 namespace mavconn {
00025 using boost::system::error_code;
00026 using boost::asio::io_service;
00027 using boost::asio::ip::udp;
00028 using boost::asio::buffer;
00029 using mavutils::to_string_ss;
00030 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00031 
00032 #define PFXd    "mavconn: udp%d: "
00033 
00034 
00035 static bool resolve_address_udp(io_service &io, int chan, std::string host, unsigned short port, udp::endpoint &ep)
00036 {
00037         bool result = false;
00038         udp::resolver resolver(io);
00039         error_code ec;
00040 
00041         udp::resolver::query query(host, "");
00042         std::for_each(resolver.resolve(query, ec), udp::resolver::iterator(),
00043                 [&](const udp::endpoint &q_ep) {
00044                         ep = q_ep;
00045                         ep.port(port);
00046                         result = true;
00047                         logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
00048                 });
00049 
00050         if (ec) {
00051                 logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
00052                 result = false;
00053         }
00054 
00055         return result;
00056 }
00057 
00058 
00059 MAVConnUDP::MAVConnUDP(uint8_t system_id, uint8_t component_id,
00060                 std::string bind_host, unsigned short bind_port,
00061                 std::string remote_host, unsigned short remote_port) :
00062         MAVConnInterface(system_id, component_id),
00063         remote_exists(false),
00064         tx_in_progress(false),
00065         io_service(),
00066         io_work(new io_service::work(io_service)),
00067         socket(io_service)
00068 {
00069         if (!resolve_address_udp(io_service, channel, bind_host, bind_port, bind_ep))
00070                 throw DeviceError("udp: resolve", "Bind address resolve failed");
00071 
00072         logInform(PFXd "Bind address: %s", channel, to_string_ss(bind_ep).c_str());
00073 
00074         if (remote_host != "") {
00075                 remote_exists = resolve_address_udp(io_service, channel, remote_host, remote_port, remote_ep);
00076 
00077                 if (remote_exists)
00078                         logInform(PFXd "Remote address: %s", channel, to_string_ss(remote_ep).c_str());
00079                 else
00080                         logWarn(PFXd "Remote address resolve failed.", channel);
00081         }
00082 
00083         try {
00084                 socket.open(udp::v4());
00085                 socket.bind(bind_ep);
00086         }
00087         catch (boost::system::system_error &err) {
00088                 throw DeviceError("udp", err);
00089         }
00090 
00091         // give some work to io_service before start
00092         io_service.post(boost::bind(&MAVConnUDP::do_recvfrom, this));
00093 
00094         // run io_service for async io
00095         std::thread t(boost::bind(&io_service::run, &this->io_service));
00096         mavutils::set_thread_name(t, "MAVConnUDP%d", channel);
00097         io_thread.swap(t);
00098 }
00099 
00100 MAVConnUDP::~MAVConnUDP() {
00101         close();
00102 }
00103 
00104 void MAVConnUDP::close() {
00105         lock_guard lock(mutex);
00106         if (!is_open())
00107                 return;
00108 
00109         io_work.reset();
00110         io_service.stop();
00111         socket.close();
00112 
00113         // clear tx queue
00114         for (auto &p : tx_q)
00115                 delete p;
00116         tx_q.clear();
00117 
00118         if (io_thread.joinable())
00119                 io_thread.join();
00120 
00121         /* emit */ port_closed();
00122 }
00123 
00124 void MAVConnUDP::send_bytes(const uint8_t *bytes, size_t length)
00125 {
00126         if (!is_open()) {
00127                 logError(PFXd "send: channel closed!", channel);
00128                 return;
00129         }
00130 
00131         if (!remote_exists) {
00132                 logDebug(PFXd "send: Remote not known, message dropped.", channel);
00133                 return;
00134         }
00135 
00136         MsgBuffer *buf = new MsgBuffer(bytes, length);
00137         {
00138                 lock_guard lock(mutex);
00139                 tx_q.push_back(buf);
00140         }
00141         io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00142 }
00143 
00144 void MAVConnUDP::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00145 {
00146         assert(message != nullptr);
00147 
00148         if (!is_open()) {
00149                 logError(PFXd "send: channel closed!", channel);
00150                 return;
00151         }
00152 
00153         if (!remote_exists) {
00154                 logDebug(PFXd "send: Remote not known, message dropped.", channel);
00155                 return;
00156         }
00157 
00158         logDebug(PFXd "send: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00159                         channel, message->msgid, message->len, sysid, compid, message->seq);
00160 
00161         MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00162         {
00163                 lock_guard lock(mutex);
00164                 tx_q.push_back(buf);
00165         }
00166         io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00167 }
00168 
00169 void MAVConnUDP::do_recvfrom()
00170 {
00171         socket.async_receive_from(
00172                         buffer(rx_buf, sizeof(rx_buf)),
00173                         remote_ep,
00174                         boost::bind(&MAVConnUDP::async_receive_end,
00175                                 this,
00176                                 boost::asio::placeholders::error,
00177                                 boost::asio::placeholders::bytes_transferred));
00178 }
00179 
00180 void MAVConnUDP::async_receive_end(error_code error, size_t bytes_transferred)
00181 {
00182         mavlink_message_t message;
00183         mavlink_status_t status;
00184 
00185         if (error) {
00186                 logError(PFXd "receive: %s", channel, error.message().c_str());
00187                 close();
00188                 return;
00189         }
00190 
00191         if (remote_ep != last_remote_ep) {
00192                 logInform(PFXd "Remote address: %s", channel, to_string_ss(remote_ep).c_str());
00193                 remote_exists = true;
00194                 last_remote_ep = remote_ep;
00195         }
00196 
00197         iostat_rx_add(bytes_transferred);
00198         for (size_t i = 0; i < bytes_transferred; i++) {
00199                 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00200                         logDebug(PFXd "recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00201                                         channel, message.msgid, message.len, message.sysid, message.compid, message.seq);
00202 
00203                         /* emit */ message_received(&message, message.sysid, message.compid);
00204                 }
00205         }
00206 
00207         do_recvfrom();
00208 }
00209 
00210 void MAVConnUDP::do_sendto(bool check_tx_state)
00211 {
00212         if (check_tx_state && tx_in_progress)
00213                 return;
00214 
00215         lock_guard lock(mutex);
00216         if (tx_q.empty())
00217                 return;
00218 
00219         tx_in_progress = true;
00220         MsgBuffer *buf = tx_q.front();
00221         socket.async_send_to(
00222                         buffer(buf->dpos(), buf->nbytes()),
00223                         remote_ep,
00224                         boost::bind(&MAVConnUDP::async_sendto_end,
00225                                 this,
00226                                 boost::asio::placeholders::error,
00227                                 boost::asio::placeholders::bytes_transferred));
00228 }
00229 
00230 void MAVConnUDP::async_sendto_end(error_code error, size_t bytes_transferred)
00231 {
00232         if (error == boost::asio::error::network_unreachable) {
00233                 logWarn(PFXd "sendto: %s, retrying", channel, error.message().c_str());
00234                 // do not return, try to resend
00235         }
00236         else if (error) {
00237                 logError(PFXd "sendto: %s", channel, error.message().c_str());
00238                 close();
00239                 return;
00240         }
00241 
00242         iostat_tx_add(bytes_transferred);
00243         lock_guard lock(mutex);
00244         if (tx_q.empty()) {
00245                 tx_in_progress = false;
00246                 return;
00247         }
00248 
00249         MsgBuffer *buf = tx_q.front();
00250         buf->pos += bytes_transferred;
00251         if (buf->nbytes() == 0) {
00252                 tx_q.pop_front();
00253                 delete buf;
00254         }
00255 
00256         if (!tx_q.empty())
00257                 do_sendto(false);
00258         else
00259                 tx_in_progress = false;
00260 }
00261 
00262 }; // namespace mavconn


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Thu Feb 9 2017 04:00:13