00001
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include <cassert>
00028 #include <console_bridge/console.h>
00029
00030 #include <mavconn/thread_utils.h>
00031 #include <mavconn/udp.h>
00032
00033 namespace mavconn {
00034 using boost::system::error_code;
00035 using boost::asio::io_service;
00036 using boost::asio::ip::udp;
00037 using boost::asio::buffer;
00038 using mavutils::to_string_cs;
00039 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00040
00041
00042 static bool resolve_address_udp(io_service &io, std::string host, unsigned short port, udp::endpoint &ep)
00043 {
00044 bool result = false;
00045 udp::resolver resolver(io);
00046 error_code ec;
00047
00048 udp::resolver::query query(host, "");
00049 std::for_each(resolver.resolve(query, ec), udp::resolver::iterator(),
00050 [&](const udp::endpoint &q_ep) {
00051 ep = q_ep;
00052 ep.port(port);
00053 result = true;
00054 logDebug("udp: host %s resolved as %s", host.c_str(), to_string_cs(ep));
00055 });
00056
00057 if (ec) {
00058 logWarn("udp: resolve error: %s", ec.message().c_str());
00059 result = false;
00060 }
00061
00062 return result;
00063 }
00064
00065
00066 MAVConnUDP::MAVConnUDP(uint8_t system_id, uint8_t component_id,
00067 std::string bind_host, unsigned short bind_port,
00068 std::string remote_host, unsigned short remote_port) :
00069 MAVConnInterface(system_id, component_id),
00070 remote_exists(false),
00071 tx_in_progress(false),
00072 io_service(),
00073 io_work(new io_service::work(io_service)),
00074 socket(io_service)
00075 {
00076 if (!resolve_address_udp(io_service, bind_host, bind_port, bind_ep))
00077 throw DeviceError("udp: resolve", "Bind address resolve failed");
00078
00079 logInform("udp%d: Bind address: %s", channel, to_string_cs(bind_ep));
00080
00081 if (remote_host != "") {
00082 remote_exists = resolve_address_udp(io_service, remote_host, remote_port, remote_ep);
00083
00084 if (remote_exists)
00085 logInform("udp%d: Remote address: %s", channel, to_string_cs(remote_ep));
00086 else
00087 logWarn("udp%d: Remote address resolve failed.", channel);
00088 }
00089
00090 try {
00091 socket.open(udp::v4());
00092 socket.bind(bind_ep);
00093 }
00094 catch (boost::system::system_error &err) {
00095 throw DeviceError("udp", err);
00096 }
00097
00098
00099 io_service.post(boost::bind(&MAVConnUDP::do_recvfrom, this));
00100
00101
00102 std::thread t(boost::bind(&io_service::run, &this->io_service));
00103 mavutils::set_thread_name(t, "MAVConnUDP%d", channel);
00104 io_thread.swap(t);
00105 }
00106
00107 MAVConnUDP::~MAVConnUDP() {
00108 close();
00109 }
00110
00111 void MAVConnUDP::close() {
00112 lock_guard lock(mutex);
00113 if (!is_open())
00114 return;
00115
00116 io_work.reset();
00117 io_service.stop();
00118 socket.close();
00119
00120
00121 std::for_each(tx_q.begin(), tx_q.end(),
00122 [](MsgBuffer *p) { delete p; });
00123 tx_q.clear();
00124
00125 if (io_thread.joinable())
00126 io_thread.join();
00127
00128 port_closed();
00129 }
00130
00131 void MAVConnUDP::send_bytes(const uint8_t *bytes, size_t length)
00132 {
00133 if (!is_open()) {
00134 logError("udp%d:send: channel closed!", channel);
00135 return;
00136 }
00137
00138 if (!remote_exists) {
00139 logDebug("udp%d:send: Remote not known, message dropped.", channel);
00140 return;
00141 }
00142
00143 MsgBuffer *buf = new MsgBuffer(bytes, length);
00144 {
00145 lock_guard lock(mutex);
00146 tx_q.push_back(buf);
00147 }
00148 io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00149 }
00150
00151 void MAVConnUDP::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00152 {
00153 assert(message != nullptr);
00154
00155 if (!is_open()) {
00156 logError("udp%d:send: channel closed!", channel);
00157 return;
00158 }
00159
00160 if (!remote_exists) {
00161 logDebug("udp%d:send: Remote not known, message dropped.", channel);
00162 return;
00163 }
00164
00165 logDebug("udp%d:send: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00166 channel, message->msgid, message->len, sysid, compid);
00167
00168 MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00169 {
00170 lock_guard lock(mutex);
00171 tx_q.push_back(buf);
00172 }
00173 io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00174 }
00175
00176 void MAVConnUDP::do_recvfrom()
00177 {
00178 socket.async_receive_from(
00179 buffer(rx_buf, sizeof(rx_buf)),
00180 remote_ep,
00181 boost::bind(&MAVConnUDP::async_receive_end,
00182 this,
00183 boost::asio::placeholders::error,
00184 boost::asio::placeholders::bytes_transferred));
00185 }
00186
00187 void MAVConnUDP::async_receive_end(error_code error, size_t bytes_transferred)
00188 {
00189 mavlink_message_t message;
00190 mavlink_status_t status;
00191
00192 if (error) {
00193 logError("udp%d:receive: %s", channel, error.message().c_str());
00194 close();
00195 return;
00196 }
00197
00198 if (remote_ep != last_remote_ep) {
00199 logInform("udp%d: Remote address: %s", channel, to_string_cs(remote_ep));
00200 remote_exists = true;
00201 last_remote_ep = remote_ep;
00202 }
00203
00204 for (ssize_t i = 0; i < bytes_transferred; i++) {
00205 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00206 logDebug("udp%d:recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00207 channel, message.msgid, message.len, message.sysid, message.compid);
00208
00209 message_received(&message, message.sysid, message.compid);
00210 }
00211 }
00212
00213 do_recvfrom();
00214 }
00215
00216 void MAVConnUDP::do_sendto(bool check_tx_state)
00217 {
00218 if (check_tx_state && tx_in_progress)
00219 return;
00220
00221 lock_guard lock(mutex);
00222 if (tx_q.empty())
00223 return;
00224
00225 tx_in_progress = true;
00226 MsgBuffer *buf = tx_q.front();
00227 socket.async_send_to(
00228 buffer(buf->dpos(), buf->nbytes()),
00229 remote_ep,
00230 boost::bind(&MAVConnUDP::async_sendto_end,
00231 this,
00232 boost::asio::placeholders::error,
00233 boost::asio::placeholders::bytes_transferred));
00234 }
00235
00236 void MAVConnUDP::async_sendto_end(error_code error, size_t bytes_transferred)
00237 {
00238 if (error) {
00239 logError("udp%d:sendto: %s", channel, error.message().c_str());
00240 close();
00241 return;
00242 }
00243
00244 lock_guard lock(mutex);
00245 if (tx_q.empty()) {
00246 tx_in_progress = false;
00247 return;
00248 }
00249
00250 MsgBuffer *buf = tx_q.front();
00251 buf->pos += bytes_transferred;
00252 if (buf->nbytes() == 0) {
00253 tx_q.pop_front();
00254 delete buf;
00255 }
00256
00257 if (!tx_q.empty())
00258 do_sendto(false);
00259 else
00260 tx_in_progress = false;
00261 }
00262
00263 };