00001
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <cassert>
00019 #include <console_bridge/console.h>
00020
00021 #include <mavconn/thread_utils.h>
00022 #include <mavconn/udp.h>
00023
00024 namespace mavconn {
00025 using boost::system::error_code;
00026 using boost::asio::io_service;
00027 using boost::asio::ip::udp;
00028 using boost::asio::buffer;
00029 using mavutils::to_string_ss;
00030 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00031
00032 #define PFXd "mavconn: udp%d: "
00033
00034
00035 static bool resolve_address_udp(io_service &io, int chan, std::string host, unsigned short port, udp::endpoint &ep)
00036 {
00037 bool result = false;
00038 udp::resolver resolver(io);
00039 error_code ec;
00040
00041 udp::resolver::query query(host, "");
00042 std::for_each(resolver.resolve(query, ec), udp::resolver::iterator(),
00043 [&](const udp::endpoint &q_ep) {
00044 ep = q_ep;
00045 ep.port(port);
00046 result = true;
00047 logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
00048 });
00049
00050 if (ec) {
00051 logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
00052 result = false;
00053 }
00054
00055 return result;
00056 }
00057
00058
00059 MAVConnUDP::MAVConnUDP(uint8_t system_id, uint8_t component_id,
00060 std::string bind_host, unsigned short bind_port,
00061 std::string remote_host, unsigned short remote_port) :
00062 MAVConnInterface(system_id, component_id),
00063 remote_exists(false),
00064 tx_in_progress(false),
00065 io_service(),
00066 io_work(new io_service::work(io_service)),
00067 socket(io_service)
00068 {
00069 if (!resolve_address_udp(io_service, channel, bind_host, bind_port, bind_ep))
00070 throw DeviceError("udp: resolve", "Bind address resolve failed");
00071
00072 logInform(PFXd "Bind address: %s", channel, to_string_ss(bind_ep).c_str());
00073
00074 if (remote_host != "") {
00075 remote_exists = resolve_address_udp(io_service, channel, remote_host, remote_port, remote_ep);
00076
00077 if (remote_exists)
00078 logInform(PFXd "Remote address: %s", channel, to_string_ss(remote_ep).c_str());
00079 else
00080 logWarn(PFXd "Remote address resolve failed.", channel);
00081 }
00082
00083 try {
00084 socket.open(udp::v4());
00085 socket.bind(bind_ep);
00086 }
00087 catch (boost::system::system_error &err) {
00088 throw DeviceError("udp", err);
00089 }
00090
00091
00092 io_service.post(boost::bind(&MAVConnUDP::do_recvfrom, this));
00093
00094
00095 std::thread t(boost::bind(&io_service::run, &this->io_service));
00096 mavutils::set_thread_name(t, "MAVConnUDP%d", channel);
00097 io_thread.swap(t);
00098 }
00099
00100 MAVConnUDP::~MAVConnUDP() {
00101 close();
00102 }
00103
00104 void MAVConnUDP::close() {
00105 lock_guard lock(mutex);
00106 if (!is_open())
00107 return;
00108
00109 io_work.reset();
00110 io_service.stop();
00111 socket.close();
00112
00113
00114 for (auto &p : tx_q)
00115 delete p;
00116 tx_q.clear();
00117
00118 if (io_thread.joinable())
00119 io_thread.join();
00120
00121 port_closed();
00122 }
00123
00124 void MAVConnUDP::send_bytes(const uint8_t *bytes, size_t length)
00125 {
00126 if (!is_open()) {
00127 logError(PFXd "send: channel closed!", channel);
00128 return;
00129 }
00130
00131 if (!remote_exists) {
00132 logDebug(PFXd "send: Remote not known, message dropped.", channel);
00133 return;
00134 }
00135
00136 MsgBuffer *buf = new MsgBuffer(bytes, length);
00137 {
00138 lock_guard lock(mutex);
00139 tx_q.push_back(buf);
00140 }
00141 io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00142 }
00143
00144 void MAVConnUDP::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00145 {
00146 assert(message != nullptr);
00147
00148 if (!is_open()) {
00149 logError(PFXd "send: channel closed!", channel);
00150 return;
00151 }
00152
00153 if (!remote_exists) {
00154 logDebug(PFXd "send: Remote not known, message dropped.", channel);
00155 return;
00156 }
00157
00158 logDebug(PFXd "send: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00159 channel, message->msgid, message->len, sysid, compid, message->seq);
00160
00161 MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00162 {
00163 lock_guard lock(mutex);
00164 tx_q.push_back(buf);
00165 }
00166 io_service.post(boost::bind(&MAVConnUDP::do_sendto, this, true));
00167 }
00168
00169 void MAVConnUDP::do_recvfrom()
00170 {
00171 socket.async_receive_from(
00172 buffer(rx_buf, sizeof(rx_buf)),
00173 remote_ep,
00174 boost::bind(&MAVConnUDP::async_receive_end,
00175 this,
00176 boost::asio::placeholders::error,
00177 boost::asio::placeholders::bytes_transferred));
00178 }
00179
00180 void MAVConnUDP::async_receive_end(error_code error, size_t bytes_transferred)
00181 {
00182 mavlink_message_t message;
00183 mavlink_status_t status;
00184
00185 if (error) {
00186 logError(PFXd "receive: %s", channel, error.message().c_str());
00187 close();
00188 return;
00189 }
00190
00191 if (remote_ep != last_remote_ep) {
00192 logInform(PFXd "Remote address: %s", channel, to_string_ss(remote_ep).c_str());
00193 remote_exists = true;
00194 last_remote_ep = remote_ep;
00195 }
00196
00197 iostat_rx_add(bytes_transferred);
00198 for (size_t i = 0; i < bytes_transferred; i++) {
00199 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00200 logDebug(PFXd "recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00201 channel, message.msgid, message.len, message.sysid, message.compid, message.seq);
00202
00203 message_received(&message, message.sysid, message.compid);
00204 }
00205 }
00206
00207 do_recvfrom();
00208 }
00209
00210 void MAVConnUDP::do_sendto(bool check_tx_state)
00211 {
00212 if (check_tx_state && tx_in_progress)
00213 return;
00214
00215 lock_guard lock(mutex);
00216 if (tx_q.empty())
00217 return;
00218
00219 tx_in_progress = true;
00220 MsgBuffer *buf = tx_q.front();
00221 socket.async_send_to(
00222 buffer(buf->dpos(), buf->nbytes()),
00223 remote_ep,
00224 boost::bind(&MAVConnUDP::async_sendto_end,
00225 this,
00226 boost::asio::placeholders::error,
00227 boost::asio::placeholders::bytes_transferred));
00228 }
00229
00230 void MAVConnUDP::async_sendto_end(error_code error, size_t bytes_transferred)
00231 {
00232 if (error == boost::asio::error::network_unreachable) {
00233 logWarn(PFXd "sendto: %s, retrying", channel, error.message().c_str());
00234
00235 }
00236 else if (error) {
00237 logError(PFXd "sendto: %s", channel, error.message().c_str());
00238 close();
00239 return;
00240 }
00241
00242 iostat_tx_add(bytes_transferred);
00243 lock_guard lock(mutex);
00244 if (tx_q.empty()) {
00245 tx_in_progress = false;
00246 return;
00247 }
00248
00249 MsgBuffer *buf = tx_q.front();
00250 buf->pos += bytes_transferred;
00251 if (buf->nbytes() == 0) {
00252 tx_q.pop_front();
00253 delete buf;
00254 }
00255
00256 if (!tx_q.empty())
00257 do_sendto(false);
00258 else
00259 tx_in_progress = false;
00260 }
00261
00262 };