tcp.cpp
Go to the documentation of this file.
00001 
00009 /*
00010  * libmavconn
00011  * Copyright 2014,2015 Vladimir Ermakov, All rights reserved.
00012  *
00013  * This file is part of the mavros package and subject to the license terms
00014  * in the top-level LICENSE file of the mavros repository.
00015  * https://github.com/mavlink/mavros/tree/master/LICENSE.md
00016  */
00017 
00018 #include <cassert>
00019 #include <console_bridge/console.h>
00020 
00021 #include <mavconn/thread_utils.h>
00022 #include <mavconn/tcp.h>
00023 
00024 namespace mavconn {
00025 using boost::system::error_code;
00026 using boost::asio::io_service;
00027 using boost::asio::ip::tcp;
00028 using boost::asio::buffer;
00029 using mavutils::to_string_ss;
00030 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00031 
00032 #define PFXd    "mavconn: tcp%d: "
00033 
00034 
00035 static bool resolve_address_tcp(io_service &io, int chan, std::string host, unsigned short port, tcp::endpoint &ep)
00036 {
00037         bool result = false;
00038         tcp::resolver resolver(io);
00039         error_code ec;
00040 
00041         tcp::resolver::query query(host, "");
00042         std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(),
00043                 [&](const tcp::endpoint &q_ep) {
00044                         ep = q_ep;
00045                         ep.port(port);
00046                         result = true;
00047                         logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
00048                 });
00049 
00050         if (ec) {
00051                 logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
00052                 result = false;
00053         }
00054 
00055         return result;
00056 }
00057 
00058 
00059 /* -*- TCP client variant -*- */
00060 
00061 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00062                 std::string server_host, unsigned short server_port) :
00063         MAVConnInterface(system_id, component_id),
00064         tx_in_progress(false),
00065         io_service(),
00066         io_work(new io_service::work(io_service)),
00067         socket(io_service)
00068 {
00069         if (!resolve_address_tcp(io_service, channel, server_host, server_port, server_ep))
00070                 throw DeviceError("tcp: resolve", "Bind address resolve failed");
00071 
00072         logInform(PFXd "Server address: %s", channel, to_string_ss(server_ep).c_str());
00073 
00074         try {
00075                 socket.open(tcp::v4());
00076                 socket.connect(server_ep);
00077         }
00078         catch (boost::system::system_error &err) {
00079                 throw DeviceError("tcp", err);
00080         }
00081 
00082         // give some work to io_service before start
00083         io_service.post(boost::bind(&MAVConnTCPClient::do_recv, this));
00084 
00085         // run io_service for async io
00086         std::thread t(boost::bind(&io_service::run, &this->io_service));
00087         mavutils::set_thread_name(t, "MAVConnTCPc%d", channel);
00088         io_thread.swap(t);
00089 }
00090 
00091 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00092                 boost::asio::io_service &server_io) :
00093         MAVConnInterface(system_id, component_id),
00094         socket(server_io)
00095 {
00096         // waiting when server call client_connected()
00097 }
00098 
00099 void MAVConnTCPClient::client_connected(int server_channel) {
00100         logInform(PFXd "Got client, channel: %d, address: %s",
00101                         server_channel, channel, to_string_ss(server_ep).c_str());
00102 
00103         // start recv
00104         socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_recv, this));
00105 }
00106 
00107 MAVConnTCPClient::~MAVConnTCPClient() {
00108         close();
00109 }
00110 
00111 void MAVConnTCPClient::close() {
00112         lock_guard lock(mutex);
00113         if (!is_open())
00114                 return;
00115 
00116         io_work.reset();
00117         io_service.stop();
00118         socket.close();
00119 
00120         // clear tx queue
00121         for (auto &p : tx_q)
00122                 delete p;
00123         tx_q.clear();
00124 
00125         if (io_thread.joinable())
00126                 io_thread.join();
00127 
00128         /* emit */ port_closed();
00129 }
00130 
00131 void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)
00132 {
00133         if (!is_open()) {
00134                 logError(PFXd "send: channel closed!", channel);
00135                 return;
00136         }
00137 
00138         MsgBuffer *buf = new MsgBuffer(bytes, length);
00139         {
00140                 lock_guard lock(mutex);
00141                 tx_q.push_back(buf);
00142         }
00143         socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00144 }
00145 
00146 void MAVConnTCPClient::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00147 {
00148         assert(message != nullptr);
00149 
00150         if (!is_open()) {
00151                 logError(PFXd "send: channel closed!", channel);
00152                 return;
00153         }
00154 
00155         logDebug(PFXd "send: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00156                         channel, message->msgid, message->len, sysid, compid, message->seq);
00157 
00158         MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00159         {
00160                 lock_guard lock(mutex);
00161                 tx_q.push_back(buf);
00162         }
00163         socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00164 }
00165 
00166 void MAVConnTCPClient::do_recv()
00167 {
00168         socket.async_receive(
00169                         buffer(rx_buf, sizeof(rx_buf)),
00170                         boost::bind(&MAVConnTCPClient::async_receive_end,
00171                                 this,
00172                                 boost::asio::placeholders::error,
00173                                 boost::asio::placeholders::bytes_transferred));
00174 }
00175 
00176 void MAVConnTCPClient::async_receive_end(error_code error, size_t bytes_transferred)
00177 {
00178         mavlink_message_t message;
00179         mavlink_status_t status;
00180 
00181         if (error) {
00182                 logError(PFXd "receive: %s", channel, error.message().c_str());
00183                 close();
00184                 return;
00185         }
00186 
00187         iostat_rx_add(bytes_transferred);
00188         for (size_t i = 0; i < bytes_transferred; i++) {
00189                 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00190                         logDebug(PFXd "recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00191                                         channel, message.msgid, message.len, message.sysid, message.compid, message.seq);
00192 
00193                         /* emit */ message_received(&message, message.sysid, message.compid);
00194                 }
00195         }
00196 
00197         do_recv();
00198 }
00199 
00200 void MAVConnTCPClient::do_send(bool check_tx_state)
00201 {
00202         if (check_tx_state && tx_in_progress)
00203                 return;
00204 
00205         lock_guard lock(mutex);
00206         if (tx_q.empty())
00207                 return;
00208 
00209         tx_in_progress = true;
00210         MsgBuffer *buf = tx_q.front();
00211         socket.async_send(
00212                         buffer(buf->dpos(), buf->nbytes()),
00213                         boost::bind(&MAVConnTCPClient::async_send_end,
00214                                 this,
00215                                 boost::asio::placeholders::error,
00216                                 boost::asio::placeholders::bytes_transferred));
00217 }
00218 
00219 void MAVConnTCPClient::async_send_end(error_code error, size_t bytes_transferred)
00220 {
00221         if (error) {
00222                 logError(PFXd "send: %s", channel, error.message().c_str());
00223                 close();
00224                 return;
00225         }
00226 
00227         iostat_tx_add(bytes_transferred);
00228         lock_guard lock(mutex);
00229         if (tx_q.empty()) {
00230                 tx_in_progress = false;
00231                 return;
00232         }
00233 
00234         MsgBuffer *buf = tx_q.front();
00235         buf->pos += bytes_transferred;
00236         if (buf->nbytes() == 0) {
00237                 tx_q.pop_front();
00238                 delete buf;
00239         }
00240 
00241         if (!tx_q.empty())
00242                 do_send(false);
00243         else
00244                 tx_in_progress = false;
00245 }
00246 
00247 
00248 /* -*- TCP server variant -*- */
00249 
00250 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
00251                 std::string server_host, unsigned short server_port) :
00252         MAVConnInterface(system_id, component_id),
00253         io_service(),
00254         acceptor(io_service)
00255 {
00256         if (!resolve_address_tcp(io_service, channel, server_host, server_port, bind_ep))
00257                 throw DeviceError("tcp-l: resolve", "Bind address resolve failed");
00258 
00259         logInform(PFXd "Bind address: %s", channel, to_string_ss(bind_ep).c_str());
00260 
00261         try {
00262                 acceptor.open(tcp::v4());
00263                 acceptor.set_option(tcp::acceptor::reuse_address(true));
00264                 acceptor.bind(bind_ep);
00265                 acceptor.listen(channes_available());
00266         }
00267         catch (boost::system::system_error &err) {
00268                 throw DeviceError("tcp-l", err);
00269         }
00270 
00271         // give some work to io_service before start
00272         io_service.post(boost::bind(&MAVConnTCPServer::do_accept, this));
00273 
00274         // run io_service for async io
00275         std::thread t(boost::bind(&io_service::run, &this->io_service));
00276         mavutils::set_thread_name(t, "MAVConnTCPs%d", channel);
00277         io_thread.swap(t);
00278 }
00279 
00280 MAVConnTCPServer::~MAVConnTCPServer() {
00281         close();
00282 }
00283 
00284 void MAVConnTCPServer::close() {
00285         lock_guard lock(mutex);
00286         if (!is_open())
00287                 return;
00288 
00289         logInform(PFXd "Terminating server. "
00290                         "All connections will be closed.", channel);
00291 
00292         io_service.stop();
00293         acceptor.close();
00294 
00295         if (io_thread.joinable())
00296                 io_thread.join();
00297 
00298         /* emit */ port_closed();
00299 }
00300 
00301 mavlink_status_t MAVConnTCPServer::get_status()
00302 {
00303         mavlink_status_t status{};
00304 
00305         lock_guard lock(mutex);
00306         for (auto &instp : client_list) {
00307                 auto inst_status = instp->get_status();
00308 
00309 #define ADD_STATUS(_field)      \
00310                 status._field += inst_status._field
00311 
00312                 ADD_STATUS(packet_rx_success_count);
00313                 ADD_STATUS(packet_rx_drop_count);
00314                 ADD_STATUS(buffer_overrun);
00315                 ADD_STATUS(parse_error);
00316                 /* seq counters always 0 for this connection type */
00317 
00318 #undef ADD_STATUS
00319         };
00320 
00321         return status;
00322 }
00323 
00324 MAVConnInterface::IOStat MAVConnTCPServer::get_iostat()
00325 {
00326         MAVConnInterface::IOStat iostat{};
00327 
00328         lock_guard lock(mutex);
00329         for (auto &instp : client_list) {
00330                 auto inst_iostat = instp->get_iostat();
00331 
00332 #define ADD_IOSTAT(_field)      \
00333                 iostat._field += inst_iostat._field
00334 
00335                 ADD_IOSTAT(tx_total_bytes);
00336                 ADD_IOSTAT(rx_total_bytes);
00337                 ADD_IOSTAT(tx_speed);
00338                 ADD_IOSTAT(rx_speed);
00339 
00340 #undef ADD_IOSTAT
00341         };
00342 
00343         return iostat;
00344 }
00345 
00346 void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
00347 {
00348         lock_guard lock(mutex);
00349         for (auto &instp : client_list) {
00350                 instp->send_bytes(bytes, length);
00351         }
00352 }
00353 
00354 void MAVConnTCPServer::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00355 {
00356         lock_guard lock(mutex);
00357         for (auto &instp : client_list) {
00358                 instp->send_message(message, sysid, compid);
00359         }
00360 }
00361 
00362 void MAVConnTCPServer::do_accept()
00363 {
00364         acceptor_client.reset();
00365         acceptor_client = boost::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
00366         acceptor.async_accept(
00367                         acceptor_client->socket,
00368                         acceptor_client->server_ep,
00369                         boost::bind(&MAVConnTCPServer::async_accept_end,
00370                                 this,
00371                                 boost::asio::placeholders::error));
00372 }
00373 
00374 void MAVConnTCPServer::async_accept_end(error_code error)
00375 {
00376         if (error) {
00377                 logError(PFXd "accept: %s", channel, error.message().c_str());
00378                 close();
00379                 return;
00380         }
00381 
00382         // NOTE: i want create client class *after* connection accept,
00383         //       but ASIO 1.43 does not support std::move() for sockets.
00384         //       Need find way how to limit channel alloc.
00385         //if (channes_available() <= 0) {
00386         //      ROS_ERROR_NAMED("mavconn", "tcp-l:accept_cb: all channels in use, drop connection");
00387         //      client_sock.close();
00388         //      return;
00389         //}
00390 
00391         lock_guard lock(mutex);
00392         acceptor_client->client_connected(channel);
00393         acceptor_client->message_received.connect(boost::bind(&MAVConnTCPServer::recv_message, this, _1, _2, _3));
00394         acceptor_client->port_closed.connect(boost::bind(&MAVConnTCPServer::client_closed, this,
00395                                 boost::weak_ptr<MAVConnTCPClient>(acceptor_client)));
00396 
00397         client_list.push_back(acceptor_client);
00398         do_accept();
00399 }
00400 
00401 void MAVConnTCPServer::client_closed(boost::weak_ptr<MAVConnTCPClient> weak_instp)
00402 {
00403         if (auto instp = weak_instp.lock()) {
00404                 bool locked = mutex.try_lock();
00405                 logInform(PFXd "Client connection closed, channel: %d, address: %s",
00406                                 channel, instp->channel, to_string_ss(instp->server_ep).c_str());
00407 
00408                 client_list.remove(instp);
00409 
00410                 if (locked)
00411                         mutex.unlock();
00412         }
00413 }
00414 
00415 void MAVConnTCPServer::recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00416 {
00417         /* retranslate message */
00418         message_received(message, sysid, compid);
00419 }
00420 
00421 }; // namespace mavconn


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Sat Jun 8 2019 19:55:13