tcp.cpp
Go to the documentation of this file.
00001 
00009 /*
00010  * Copyright 2014 Vladimir Ermakov.
00011  *
00012  * This program is free software; you can redistribute it and/or modify
00013  * it under the terms of the GNU General Public License as published by
00014  * the Free Software Foundation; either version 3 of the License, or
00015  * (at your option) any later version.
00016  *
00017  * This program is distributed in the hope that it will be useful, but
00018  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
00019  * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
00020  * for more details.
00021  *
00022  * You should have received a copy of the GNU General Public License along
00023  * with this program; if not, write to the Free Software Foundation, Inc.,
00024  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
00025  */
00026 
00027 #include <cassert>
00028 #include <console_bridge/console.h>
00029 
00030 #include <mavconn/thread_utils.h>
00031 #include <mavconn/tcp.h>
00032 
00033 namespace mavconn {
00034 using boost::system::error_code;
00035 using boost::asio::io_service;
00036 using boost::asio::ip::tcp;
00037 using boost::asio::buffer;
00038 using mavutils::to_string_cs;
00039 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00040 
00041 
00042 static bool resolve_address_tcp(io_service &io, std::string host, unsigned short port, tcp::endpoint &ep)
00043 {
00044         bool result = false;
00045         tcp::resolver resolver(io);
00046         error_code ec;
00047 
00048         tcp::resolver::query query(host, "");
00049         std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(),
00050                 [&](const tcp::endpoint &q_ep) {
00051                         ep = q_ep;
00052                         ep.port(port);
00053                         result = true;
00054                         logDebug("tcp: host %s resolved as %s", host.c_str(), to_string_cs(ep));
00055                 });
00056 
00057         if (ec) {
00058                 logWarn("tcp: resolve error: %s", ec.message().c_str());
00059                 result = false;
00060         }
00061 
00062         return result;
00063 }
00064 
00065 
00066 /* -*- TCP client variant -*- */
00067 
00068 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00069                 std::string server_host, unsigned short server_port) :
00070         MAVConnInterface(system_id, component_id),
00071         tx_in_progress(false),
00072         io_service(),
00073         io_work(new io_service::work(io_service)),
00074         socket(io_service)
00075 {
00076         if (!resolve_address_tcp(io_service, server_host, server_port, server_ep))
00077                 throw DeviceError("tcp: resolve", "Bind address resolve failed");
00078 
00079         logInform("tcp%d: Server address: %s", channel, to_string_cs(server_ep));
00080 
00081         try {
00082                 socket.open(tcp::v4());
00083                 socket.connect(server_ep);
00084         }
00085         catch (boost::system::system_error &err) {
00086                 throw DeviceError("tcp", err);
00087         }
00088 
00089         // give some work to io_service before start
00090         io_service.post(boost::bind(&MAVConnTCPClient::do_recv, this));
00091 
00092         // run io_service for async io
00093         std::thread t(boost::bind(&io_service::run, &this->io_service));
00094         mavutils::set_thread_name(t, "MAVConnTCPc%d", channel);
00095         io_thread.swap(t);
00096 }
00097 
00098 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00099                 boost::asio::io_service &server_io) :
00100         MAVConnInterface(system_id, component_id),
00101         socket(server_io)
00102 {
00103         // waiting when server call client_connected()
00104 }
00105 
00106 void MAVConnTCPClient::client_connected(int server_channel) {
00107         logInform("tcp-l%d: Got client, channel: %d, address: %s",
00108                         server_channel, channel, to_string_cs(server_ep));
00109 
00110         // start recv
00111         socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_recv, this));
00112 }
00113 
00114 MAVConnTCPClient::~MAVConnTCPClient() {
00115         close();
00116 }
00117 
00118 void MAVConnTCPClient::close() {
00119         lock_guard lock(mutex);
00120         if (!is_open())
00121                 return;
00122 
00123         io_work.reset();
00124         io_service.stop();
00125         socket.close();
00126 
00127         // clear tx queue
00128         std::for_each(tx_q.begin(), tx_q.end(),
00129                         [](MsgBuffer *p) { delete p; });
00130         tx_q.clear();
00131 
00132         if (io_thread.joinable())
00133                 io_thread.join();
00134 
00135         /* emit */ port_closed();
00136 }
00137 
00138 void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)
00139 {
00140         if (!is_open()) {
00141                 logError("tcp%d:send: channel closed!", channel);
00142                 return;
00143         }
00144 
00145         MsgBuffer *buf = new MsgBuffer(bytes, length);
00146         {
00147                 lock_guard lock(mutex);
00148                 tx_q.push_back(buf);
00149         }
00150         socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00151 }
00152 
00153 void MAVConnTCPClient::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00154 {
00155         assert(message != nullptr);
00156 
00157         if (!is_open()) {
00158                 logError("tcp%d:send: channel closed!", channel);
00159                 return;
00160         }
00161 
00162         logDebug("tcp%d:send: Message-ID: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00163                         channel, message->msgid, message->len, sysid, compid);
00164 
00165         MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00166         {
00167                 lock_guard lock(mutex);
00168                 tx_q.push_back(buf);
00169         }
00170         socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00171 }
00172 
00173 void MAVConnTCPClient::do_recv()
00174 {
00175         socket.async_receive(
00176                         buffer(rx_buf, sizeof(rx_buf)),
00177                         boost::bind(&MAVConnTCPClient::async_receive_end,
00178                                 this,
00179                                 boost::asio::placeholders::error,
00180                                 boost::asio::placeholders::bytes_transferred));
00181 }
00182 
00183 void MAVConnTCPClient::async_receive_end(error_code error, size_t bytes_transferred)
00184 {
00185         mavlink_message_t message;
00186         mavlink_status_t status;
00187 
00188         if (error) {
00189                 logError("tcp%d:receive: %s", channel, error.message().c_str());
00190                 close();
00191                 return;
00192         }
00193 
00194         for (ssize_t i = 0; i < bytes_transferred; i++) {
00195                 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00196                         logDebug("tcp%d:recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00197                                         channel, message.msgid, message.len, message.sysid, message.compid);
00198 
00199                         /* emit */ message_received(&message, message.sysid, message.compid);
00200                 }
00201         }
00202 
00203         do_recv();
00204 }
00205 
00206 void MAVConnTCPClient::do_send(bool check_tx_state)
00207 {
00208         if (check_tx_state && tx_in_progress)
00209                 return;
00210 
00211         lock_guard lock(mutex);
00212         if (tx_q.empty())
00213                 return;
00214 
00215         tx_in_progress = true;
00216         MsgBuffer *buf = tx_q.front();
00217         socket.async_send(
00218                         buffer(buf->dpos(), buf->nbytes()),
00219                         boost::bind(&MAVConnTCPClient::async_send_end,
00220                                 this,
00221                                 boost::asio::placeholders::error,
00222                                 boost::asio::placeholders::bytes_transferred));
00223 }
00224 
00225 void MAVConnTCPClient::async_send_end(error_code error, size_t bytes_transferred)
00226 {
00227         if (error) {
00228                 logError("tcp%d:sendto: %s", channel, error.message().c_str());
00229                 close();
00230                 return;
00231         }
00232 
00233         lock_guard lock(mutex);
00234         if (tx_q.empty()) {
00235                 tx_in_progress = false;
00236                 return;
00237         }
00238 
00239         MsgBuffer *buf = tx_q.front();
00240         buf->pos += bytes_transferred;
00241         if (buf->nbytes() == 0) {
00242                 tx_q.pop_front();
00243                 delete buf;
00244         }
00245 
00246         if (!tx_q.empty())
00247                 do_send(false);
00248         else
00249                 tx_in_progress = false;
00250 }
00251 
00252 
00253 /* -*- TCP server variant -*- */
00254 
00255 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
00256                 std::string server_host, unsigned short server_port) :
00257         MAVConnInterface(system_id, component_id),
00258         io_service(),
00259         acceptor(io_service)
00260 {
00261         if (!resolve_address_tcp(io_service, server_host, server_port, bind_ep))
00262                 throw DeviceError("tcp-l: resolve", "Bind address resolve failed");
00263 
00264         logInform("tcp-l%d: Bind address: %s", channel, to_string_cs(bind_ep));
00265 
00266         try {
00267                 acceptor.open(tcp::v4());
00268                 acceptor.set_option(tcp::acceptor::reuse_address(true));
00269                 acceptor.bind(bind_ep);
00270                 acceptor.listen(channes_available());
00271         }
00272         catch (boost::system::system_error &err) {
00273                 throw DeviceError("tcp-l", err);
00274         }
00275 
00276         // give some work to io_service before start
00277         io_service.post(boost::bind(&MAVConnTCPServer::do_accept, this));
00278 
00279         // run io_service for async io
00280         std::thread t(boost::bind(&io_service::run, &this->io_service));
00281         mavutils::set_thread_name(t, "MAVConnTCPs%d", channel);
00282         io_thread.swap(t);
00283 }
00284 
00285 MAVConnTCPServer::~MAVConnTCPServer() {
00286         close();
00287 }
00288 
00289 void MAVConnTCPServer::close() {
00290         lock_guard lock(mutex);
00291         if (!is_open())
00292                 return;
00293 
00294         logInform("tcp-l%d: Terminating server. "
00295                         "All connections will be closed.", channel);
00296 
00297         io_service.stop();
00298         acceptor.close();
00299 
00300         if (io_thread.joinable())
00301                 io_thread.join();
00302 
00303         /* emit */ port_closed();
00304 }
00305 
00306 void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
00307 {
00308         lock_guard lock(mutex);
00309         std::for_each(client_list.begin(), client_list.end(),
00310                         [&](boost::shared_ptr<MAVConnTCPClient> instp) {
00311                 instp->send_bytes(bytes, length);
00312         });
00313 }
00314 
00315 void MAVConnTCPServer::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00316 {
00317         lock_guard lock(mutex);
00318         std::for_each(client_list.begin(), client_list.end(),
00319                         [&](boost::shared_ptr<MAVConnTCPClient> instp) {
00320                 instp->send_message(message, sysid, compid);
00321         });
00322 }
00323 
00324 void MAVConnTCPServer::do_accept()
00325 {
00326         acceptor_client.reset();
00327         acceptor_client = boost::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
00328         acceptor.async_accept(
00329                         acceptor_client->socket,
00330                         acceptor_client->server_ep,
00331                         boost::bind(&MAVConnTCPServer::async_accept_end,
00332                                 this,
00333                                 boost::asio::placeholders::error));
00334 }
00335 
00336 void MAVConnTCPServer::async_accept_end(error_code error)
00337 {
00338         if (error) {
00339                 logError("tcp-l%d:accept: ", channel, error.message().c_str());
00340                 close();
00341                 return;
00342         }
00343 
00344         // NOTE: i want create client class *after* connection accept,
00345         //       but ASIO 1.43 does not support std::move() for sockets.
00346         //       Need find way how to limit channel alloc.
00347         //if (channes_available() <= 0) {
00348         //      ROS_ERROR_NAMED("mavconn", "tcp-l:accept_cb: all channels in use, drop connection");
00349         //      client_sock.close();
00350         //      return;
00351         //}
00352 
00353         lock_guard lock(mutex);
00354         acceptor_client->client_connected(channel);
00355         acceptor_client->message_received.connect(boost::bind(&MAVConnTCPServer::recv_message, this, _1, _2, _3));
00356         acceptor_client->port_closed.connect(boost::bind(&MAVConnTCPServer::client_closed, this,
00357                                 boost::weak_ptr<MAVConnTCPClient>(acceptor_client)));
00358 
00359         client_list.push_back(acceptor_client);
00360         do_accept();
00361 }
00362 
00363 void MAVConnTCPServer::client_closed(boost::weak_ptr<MAVConnTCPClient> weak_instp)
00364 {
00365         if (auto instp = weak_instp.lock()) {
00366                 bool locked = mutex.try_lock();
00367                 logInform("tcp-l%d: Client connection closed, channel: %d, address: %s",
00368                                 channel, instp->channel, to_string_cs(instp->server_ep));
00369 
00370                 client_list.remove(instp);
00371 
00372                 if (locked)
00373                         mutex.unlock();
00374         }
00375 }
00376 
00377 void MAVConnTCPServer::recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00378 {
00379         message_received(message, sysid, compid);
00380 }
00381 
00382 }; // namespace mavconn


libmavconn
Author(s): Vladimir Ermakov
autogenerated on Wed Aug 26 2015 12:29:08