00001
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include <cassert>
00028 #include <console_bridge/console.h>
00029
00030 #include <mavconn/thread_utils.h>
00031 #include <mavconn/tcp.h>
00032
00033 namespace mavconn {
00034 using boost::system::error_code;
00035 using boost::asio::io_service;
00036 using boost::asio::ip::tcp;
00037 using boost::asio::buffer;
00038 using mavutils::to_string_cs;
00039 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00040
00041
00042 static bool resolve_address_tcp(io_service &io, std::string host, unsigned short port, tcp::endpoint &ep)
00043 {
00044 bool result = false;
00045 tcp::resolver resolver(io);
00046 error_code ec;
00047
00048 tcp::resolver::query query(host, "");
00049 std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(),
00050 [&](const tcp::endpoint &q_ep) {
00051 ep = q_ep;
00052 ep.port(port);
00053 result = true;
00054 logDebug("tcp: host %s resolved as %s", host.c_str(), to_string_cs(ep));
00055 });
00056
00057 if (ec) {
00058 logWarn("tcp: resolve error: %s", ec.message().c_str());
00059 result = false;
00060 }
00061
00062 return result;
00063 }
00064
00065
00066
00067
00068 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00069 std::string server_host, unsigned short server_port) :
00070 MAVConnInterface(system_id, component_id),
00071 tx_in_progress(false),
00072 io_service(),
00073 io_work(new io_service::work(io_service)),
00074 socket(io_service)
00075 {
00076 if (!resolve_address_tcp(io_service, server_host, server_port, server_ep))
00077 throw DeviceError("tcp: resolve", "Bind address resolve failed");
00078
00079 logInform("tcp%d: Server address: %s", channel, to_string_cs(server_ep));
00080
00081 try {
00082 socket.open(tcp::v4());
00083 socket.connect(server_ep);
00084 }
00085 catch (boost::system::system_error &err) {
00086 throw DeviceError("tcp", err);
00087 }
00088
00089
00090 io_service.post(boost::bind(&MAVConnTCPClient::do_recv, this));
00091
00092
00093 std::thread t(boost::bind(&io_service::run, &this->io_service));
00094 mavutils::set_thread_name(t, "MAVConnTCPc%d", channel);
00095 io_thread.swap(t);
00096 }
00097
00098 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00099 boost::asio::io_service &server_io) :
00100 MAVConnInterface(system_id, component_id),
00101 socket(server_io)
00102 {
00103
00104 }
00105
00106 void MAVConnTCPClient::client_connected(int server_channel) {
00107 logInform("tcp-l%d: Got client, channel: %d, address: %s",
00108 server_channel, channel, to_string_cs(server_ep));
00109
00110
00111 socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_recv, this));
00112 }
00113
00114 MAVConnTCPClient::~MAVConnTCPClient() {
00115 close();
00116 }
00117
00118 void MAVConnTCPClient::close() {
00119 lock_guard lock(mutex);
00120 if (!is_open())
00121 return;
00122
00123 io_work.reset();
00124 io_service.stop();
00125 socket.close();
00126
00127
00128 std::for_each(tx_q.begin(), tx_q.end(),
00129 [](MsgBuffer *p) { delete p; });
00130 tx_q.clear();
00131
00132 if (io_thread.joinable())
00133 io_thread.join();
00134
00135 port_closed();
00136 }
00137
00138 void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)
00139 {
00140 if (!is_open()) {
00141 logError("tcp%d:send: channel closed!", channel);
00142 return;
00143 }
00144
00145 MsgBuffer *buf = new MsgBuffer(bytes, length);
00146 {
00147 lock_guard lock(mutex);
00148 tx_q.push_back(buf);
00149 }
00150 socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00151 }
00152
00153 void MAVConnTCPClient::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00154 {
00155 assert(message != nullptr);
00156
00157 if (!is_open()) {
00158 logError("tcp%d:send: channel closed!", channel);
00159 return;
00160 }
00161
00162 logDebug("tcp%d:send: Message-ID: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00163 channel, message->msgid, message->len, sysid, compid);
00164
00165 MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00166 {
00167 lock_guard lock(mutex);
00168 tx_q.push_back(buf);
00169 }
00170 socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00171 }
00172
00173 void MAVConnTCPClient::do_recv()
00174 {
00175 socket.async_receive(
00176 buffer(rx_buf, sizeof(rx_buf)),
00177 boost::bind(&MAVConnTCPClient::async_receive_end,
00178 this,
00179 boost::asio::placeholders::error,
00180 boost::asio::placeholders::bytes_transferred));
00181 }
00182
00183 void MAVConnTCPClient::async_receive_end(error_code error, size_t bytes_transferred)
00184 {
00185 mavlink_message_t message;
00186 mavlink_status_t status;
00187
00188 if (error) {
00189 logError("tcp%d:receive: %s", channel, error.message().c_str());
00190 close();
00191 return;
00192 }
00193
00194 for (ssize_t i = 0; i < bytes_transferred; i++) {
00195 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00196 logDebug("tcp%d:recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d",
00197 channel, message.msgid, message.len, message.sysid, message.compid);
00198
00199 message_received(&message, message.sysid, message.compid);
00200 }
00201 }
00202
00203 do_recv();
00204 }
00205
00206 void MAVConnTCPClient::do_send(bool check_tx_state)
00207 {
00208 if (check_tx_state && tx_in_progress)
00209 return;
00210
00211 lock_guard lock(mutex);
00212 if (tx_q.empty())
00213 return;
00214
00215 tx_in_progress = true;
00216 MsgBuffer *buf = tx_q.front();
00217 socket.async_send(
00218 buffer(buf->dpos(), buf->nbytes()),
00219 boost::bind(&MAVConnTCPClient::async_send_end,
00220 this,
00221 boost::asio::placeholders::error,
00222 boost::asio::placeholders::bytes_transferred));
00223 }
00224
00225 void MAVConnTCPClient::async_send_end(error_code error, size_t bytes_transferred)
00226 {
00227 if (error) {
00228 logError("tcp%d:sendto: %s", channel, error.message().c_str());
00229 close();
00230 return;
00231 }
00232
00233 lock_guard lock(mutex);
00234 if (tx_q.empty()) {
00235 tx_in_progress = false;
00236 return;
00237 }
00238
00239 MsgBuffer *buf = tx_q.front();
00240 buf->pos += bytes_transferred;
00241 if (buf->nbytes() == 0) {
00242 tx_q.pop_front();
00243 delete buf;
00244 }
00245
00246 if (!tx_q.empty())
00247 do_send(false);
00248 else
00249 tx_in_progress = false;
00250 }
00251
00252
00253
00254
00255 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
00256 std::string server_host, unsigned short server_port) :
00257 MAVConnInterface(system_id, component_id),
00258 io_service(),
00259 acceptor(io_service)
00260 {
00261 if (!resolve_address_tcp(io_service, server_host, server_port, bind_ep))
00262 throw DeviceError("tcp-l: resolve", "Bind address resolve failed");
00263
00264 logInform("tcp-l%d: Bind address: %s", channel, to_string_cs(bind_ep));
00265
00266 try {
00267 acceptor.open(tcp::v4());
00268 acceptor.set_option(tcp::acceptor::reuse_address(true));
00269 acceptor.bind(bind_ep);
00270 acceptor.listen(channes_available());
00271 }
00272 catch (boost::system::system_error &err) {
00273 throw DeviceError("tcp-l", err);
00274 }
00275
00276
00277 io_service.post(boost::bind(&MAVConnTCPServer::do_accept, this));
00278
00279
00280 std::thread t(boost::bind(&io_service::run, &this->io_service));
00281 mavutils::set_thread_name(t, "MAVConnTCPs%d", channel);
00282 io_thread.swap(t);
00283 }
00284
00285 MAVConnTCPServer::~MAVConnTCPServer() {
00286 close();
00287 }
00288
00289 void MAVConnTCPServer::close() {
00290 lock_guard lock(mutex);
00291 if (!is_open())
00292 return;
00293
00294 logInform("tcp-l%d: Terminating server. "
00295 "All connections will be closed.", channel);
00296
00297 io_service.stop();
00298 acceptor.close();
00299
00300 if (io_thread.joinable())
00301 io_thread.join();
00302
00303 port_closed();
00304 }
00305
00306 void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
00307 {
00308 lock_guard lock(mutex);
00309 std::for_each(client_list.begin(), client_list.end(),
00310 [&](boost::shared_ptr<MAVConnTCPClient> instp) {
00311 instp->send_bytes(bytes, length);
00312 });
00313 }
00314
00315 void MAVConnTCPServer::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00316 {
00317 lock_guard lock(mutex);
00318 std::for_each(client_list.begin(), client_list.end(),
00319 [&](boost::shared_ptr<MAVConnTCPClient> instp) {
00320 instp->send_message(message, sysid, compid);
00321 });
00322 }
00323
00324 void MAVConnTCPServer::do_accept()
00325 {
00326 acceptor_client.reset();
00327 acceptor_client = boost::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
00328 acceptor.async_accept(
00329 acceptor_client->socket,
00330 acceptor_client->server_ep,
00331 boost::bind(&MAVConnTCPServer::async_accept_end,
00332 this,
00333 boost::asio::placeholders::error));
00334 }
00335
00336 void MAVConnTCPServer::async_accept_end(error_code error)
00337 {
00338 if (error) {
00339 logError("tcp-l%d:accept: ", channel, error.message().c_str());
00340 close();
00341 return;
00342 }
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353 lock_guard lock(mutex);
00354 acceptor_client->client_connected(channel);
00355 acceptor_client->message_received.connect(boost::bind(&MAVConnTCPServer::recv_message, this, _1, _2, _3));
00356 acceptor_client->port_closed.connect(boost::bind(&MAVConnTCPServer::client_closed, this,
00357 boost::weak_ptr<MAVConnTCPClient>(acceptor_client)));
00358
00359 client_list.push_back(acceptor_client);
00360 do_accept();
00361 }
00362
00363 void MAVConnTCPServer::client_closed(boost::weak_ptr<MAVConnTCPClient> weak_instp)
00364 {
00365 if (auto instp = weak_instp.lock()) {
00366 bool locked = mutex.try_lock();
00367 logInform("tcp-l%d: Client connection closed, channel: %d, address: %s",
00368 channel, instp->channel, to_string_cs(instp->server_ep));
00369
00370 client_list.remove(instp);
00371
00372 if (locked)
00373 mutex.unlock();
00374 }
00375 }
00376
00377 void MAVConnTCPServer::recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00378 {
00379 message_received(message, sysid, compid);
00380 }
00381
00382 };