udp.cpp
Go to the documentation of this file.
00001 
00009 /*
00010  * Copyright 2013,2014 Vladimir Ermakov.
00011  *
00012  * This program is free software; you can redistribute it and/or modify
00013  * it under the terms of the GNU General Public License as published by
00014  * the Free Software Foundation; either version 3 of the License, or
00015  * (at your option) any later version.
00016  *
00017  * This program is distributed in the hope that it will be useful, but
00018  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
00019  * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
00020  * for more details.
00021  *
00022  * You should have received a copy of the GNU General Public License along
00023  * with this program; if not, write to the Free Software Foundation, Inc.,
00024  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
00025  */
00026 
00027 #include <cassert>
00028 #include <console_bridge/console.h>
00029 
00030 #include <mavconn/thread_utils.h>
00031 #include <mavconn/udp.h>
00032 
00033 namespace mavconn {
00034 using boost::system::error_code;
00035 using boost::asio::io_service;
00036 using boost::asio::ip::udp;
00037 using boost::asio::buffer;
00038 using mavutils::to_string_cs;
00039 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00040 
00041 
00042 static bool resolve_address_udp(io_service &io, std::string host, unsigned short port, udp::endpoint &ep)
00043 {
00044         bool result = false;
00045         udp::resolver resolver(io);
00046         error_code ec;
00047 
00048         udp::resolver::query query(host, "");
00049         std::for_each(resolver.resolve(query, ec), udp::resolver::iterator(),
00050                 [&](const udp::endpoint &q_ep) {
00051                         ep = q_ep;
00052                         ep.port(port);
00053                         result = true;
00054                         logDebug("udp: host %s resolved as %s", host.c_str(), to_string_cs(ep));
00055                 });
00056 
00057         if (ec) {
00058                 logWarn("udp: resolve error: %s", ec.message().c_str());
00059                 result = false;
00060         }
00061 
00062         return result;
00063 }
00064 
00065 
00066 MAVConnUDP::MAVConnUDP(uint8_t system_id, uint8_t component_id,
00067                 std::string bind_host, unsigned short bind_port,
00068                 std::string remote_host, unsigned short remote_port) :
00069         MAVConnInterface(system_id, component_id),
00070         remote_exists(false),
00071         tx_in_progress(false),
00072         io_service(),
00073         io_work(new io_service::work(io_service)),
00074         socket(io_service)
00075 {
00076         if (!resolve_address_udp(io_service, bind_host, bind_port, bind_ep))
00077                 throw DeviceError("udp: resolve", "Bind address resolve failed");
00078 
00079         logInform("udp%d: Bind address: %s", channel, to_string_cs(bind_ep));
00080 
00081         if (remote_host != "") {
00082                 remote_exists = resolve_address_udp(io_service, remote_host, remote_port, remote_ep);
00083 
00084                 if (remote_exists)
00085                         logInform("udp%d: Remote address: %s", channel, to_string_cs(remote_ep));
00086                 else
00087                         logWarn("udp%d: Remote address resolve failed.", channel);
00088         }
00089 
00090         try {
00091                 socket.open(udp::v4());
00092                 socket.bind(bind_ep);
00093         }
00094         catch (boost::system::system_error &err) {
00095                 throw DeviceError("udp", err);
00096         }
00097 
00098         // give some work to io_service before start
00099         io_service.post(boost::bind(&MAVConnUDP::do_recvfrom, this));
00100 
00101         // run io_service for async io
00102         std::thread t(boost::bind(&io_service::run, &this->io_service));
00103         mavutils::set_thread_name(t, "MAVConnUDP%d", channel);
00104         io_thread.swap(t);
00105 }
00106 
00107 MAVConnUDP::~MAVConnUDP() {
00108         close();
00109 }
00110 
00111 void MAVConnUDP::close() {
00112         lock_guard lock(mutex);
00113         if (!is_open())
00114                 return;
00115 
00116         io_work.reset();
00117         io_service.stop();
00118         socket.close();
00119 
00120         // clear tx queue
00121         std::for_each(tx_q.begin(), tx_q.end(),
00122                         [](MsgBuffer *p) { delete p; });
00123         tx_q.clear();
00124 
00125         if (io_thread.joinable())
00126                 io_thread.join();
00127 
00128         /* emit */ port_closed();
00129 }
00130 
00131 void MAVConnUDP::send_bytes(const uint8_t *bytes, size_t length)
00132 {
00133         if (!is_open()) {
00134                 logError("udp%d:send: channel closed!", channel);
00135                 return;
00136         }
00137 
00138         if (!remote_exists) {
00139                 logDebug("udp%d:send: Remote not known, message dropped.", channel);
00140                 return;
00141         }
00142 
00143         MsgBuffer *buf = new MsgBuffer(bytes, length);
00144         {
00145                 lock_guard lock(mutex);
00146                 tx_q.push_back(buf);
00147         }
00148         io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00149 }
00150 
00151 void MAVConnUDP::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00152 {
00153         assert(message != nullptr);
00154 
00155         if (!is_open()) {
00156                 logError("udp%d:send: channel closed!", channel);
00157                 return;
00158         }
00159 
00160         if (!remote_exists) {
00161                 logDebug("udp%d:send: Remote not known, message dropped.", channel);
00162                 return;
00163         }
00164 
00165         logDebug("udp%d:send: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00166                         channel, message->msgid, message->len, sysid, compid);
00167 
00168         MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00169         {
00170                 lock_guard lock(mutex);
00171                 tx_q.push_back(buf);
00172         }
00173         io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00174 }
00175 
00176 void MAVConnUDP::do_recvfrom()
00177 {
00178         socket.async_receive_from(
00179                         buffer(rx_buf, sizeof(rx_buf)),
00180                         remote_ep,
00181                         boost::bind(&MAVConnUDP::async_receive_end,
00182                                 this,
00183                                 boost::asio::placeholders::error,
00184                                 boost::asio::placeholders::bytes_transferred));
00185 }
00186 
00187 void MAVConnUDP::async_receive_end(error_code error, size_t bytes_transferred)
00188 {
00189         mavlink_message_t message;
00190         mavlink_status_t status;
00191 
00192         if (error) {
00193                 logError("udp%d:receive: %s", channel, error.message().c_str());
00194                 close();
00195                 return;
00196         }
00197 
00198         if (remote_ep != last_remote_ep) {
00199                 logInform("udp%d: Remote address: %s", channel, to_string_cs(remote_ep));
00200                 remote_exists = true;
00201                 last_remote_ep = remote_ep;
00202         }
00203 
00204         for (ssize_t i = 0; i < bytes_transferred; i++) {
00205                 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00206                         logDebug("udp%d:recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00207                                         channel, message.msgid, message.len, message.sysid, message.compid);
00208 
00209                         /* emit */ message_received(&message, message.sysid, message.compid);
00210                 }
00211         }
00212 
00213         do_recvfrom();
00214 }
00215 
00216 void MAVConnUDP::do_sendto(bool check_tx_state)
00217 {
00218         if (check_tx_state && tx_in_progress)
00219                 return;
00220 
00221         lock_guard lock(mutex);
00222         if (tx_q.empty())
00223                 return;
00224 
00225         tx_in_progress = true;
00226         MsgBuffer *buf = tx_q.front();
00227         socket.async_send_to(
00228                         buffer(buf->dpos(), buf->nbytes()),
00229                         remote_ep,
00230                         boost::bind(&MAVConnUDP::async_sendto_end,
00231                                 this,
00232                                 boost::asio::placeholders::error,
00233                                 boost::asio::placeholders::bytes_transferred));
00234 }
00235 
00236 void MAVConnUDP::async_sendto_end(error_code error, size_t bytes_transferred)
00237 {
00238         if (error) {
00239                 logError("udp%d:sendto: %s", channel, error.message().c_str());
00240                 close();
00241                 return;
00242         }
00243 
00244         lock_guard lock(mutex);
00245         if (tx_q.empty()) {
00246                 tx_in_progress = false;
00247                 return;
00248         }
00249 
00250         MsgBuffer *buf = tx_q.front();
00251         buf->pos += bytes_transferred;
00252         if (buf->nbytes() == 0) {
00253                 tx_q.pop_front();
00254                 delete buf;
00255         }
00256 
00257         if (!tx_q.empty())
00258                 do_sendto(false);
00259         else
00260                 tx_in_progress = false;
00261 }
00262 
00263 }; // namespace mavconn


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Wed Aug 26 2015 12:29:08