00001
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <cassert>
00019 #include <console_bridge/console.h>
00020
00021 #include <mavconn/thread_utils.h>
00022 #include <mavconn/tcp.h>
00023
00024 namespace mavconn {
00025 using boost::system::error_code;
00026 using boost::asio::io_service;
00027 using boost::asio::ip::tcp;
00028 using boost::asio::buffer;
00029 using mavutils::to_string_ss;
00030 typedef std::lock_guard<std::recursive_mutex> lock_guard;
00031
00032 #define PFXd "mavconn: tcp%d: "
00033
00034
00035 static bool resolve_address_tcp(io_service &io, int chan, std::string host, unsigned short port, tcp::endpoint &ep)
00036 {
00037 bool result = false;
00038 tcp::resolver resolver(io);
00039 error_code ec;
00040
00041 tcp::resolver::query query(host, "");
00042 std::for_each(resolver.resolve(query, ec), tcp::resolver::iterator(),
00043 [&](const tcp::endpoint &q_ep) {
00044 ep = q_ep;
00045 ep.port(port);
00046 result = true;
00047 logDebug(PFXd "host %s resolved as %s", chan, host.c_str(), to_string_ss(ep).c_str());
00048 });
00049
00050 if (ec) {
00051 logWarn(PFXd "resolve error: %s", chan, ec.message().c_str());
00052 result = false;
00053 }
00054
00055 return result;
00056 }
00057
00058
00059
00060
00061 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00062 std::string server_host, unsigned short server_port) :
00063 MAVConnInterface(system_id, component_id),
00064 tx_in_progress(false),
00065 io_service(),
00066 io_work(new io_service::work(io_service)),
00067 socket(io_service)
00068 {
00069 if (!resolve_address_tcp(io_service, channel, server_host, server_port, server_ep))
00070 throw DeviceError("tcp: resolve", "Bind address resolve failed");
00071
00072 logInform(PFXd "Server address: %s", channel, to_string_ss(server_ep).c_str());
00073
00074 try {
00075 socket.open(tcp::v4());
00076 socket.connect(server_ep);
00077 }
00078 catch (boost::system::system_error &err) {
00079 throw DeviceError("tcp", err);
00080 }
00081
00082
00083 io_service.post(boost::bind(&MAVConnTCPClient::do_recv, this));
00084
00085
00086 std::thread t(boost::bind(&io_service::run, &this->io_service));
00087 mavutils::set_thread_name(t, "MAVConnTCPc%d", channel);
00088 io_thread.swap(t);
00089 }
00090
00091 MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
00092 boost::asio::io_service &server_io) :
00093 MAVConnInterface(system_id, component_id),
00094 socket(server_io)
00095 {
00096
00097 }
00098
00099 void MAVConnTCPClient::client_connected(int server_channel) {
00100 logInform(PFXd "Got client, channel: %d, address: %s",
00101 server_channel, channel, to_string_ss(server_ep).c_str());
00102
00103
00104 socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_recv, this));
00105 }
00106
00107 MAVConnTCPClient::~MAVConnTCPClient() {
00108 close();
00109 }
00110
00111 void MAVConnTCPClient::close() {
00112 lock_guard lock(mutex);
00113 if (!is_open())
00114 return;
00115
00116 io_work.reset();
00117 io_service.stop();
00118 socket.close();
00119
00120
00121 for (auto &p : tx_q)
00122 delete p;
00123 tx_q.clear();
00124
00125 if (io_thread.joinable())
00126 io_thread.join();
00127
00128 port_closed();
00129 }
00130
00131 void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)
00132 {
00133 if (!is_open()) {
00134 logError(PFXd "send: channel closed!", channel);
00135 return;
00136 }
00137
00138 MsgBuffer *buf = new MsgBuffer(bytes, length);
00139 {
00140 lock_guard lock(mutex);
00141 tx_q.push_back(buf);
00142 }
00143 socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00144 }
00145
00146 void MAVConnTCPClient::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00147 {
00148 assert(message != nullptr);
00149
00150 if (!is_open()) {
00151 logError(PFXd "send: channel closed!", channel);
00152 return;
00153 }
00154
00155 logDebug(PFXd "send: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00156 channel, message->msgid, message->len, sysid, compid, message->seq);
00157
00158 MsgBuffer *buf = new_msgbuffer(message, sysid, compid);
00159 {
00160 lock_guard lock(mutex);
00161 tx_q.push_back(buf);
00162 }
00163 socket.get_io_service().post(boost::bind(&MAVConnTCPClient::do_send, this, true));
00164 }
00165
00166 void MAVConnTCPClient::do_recv()
00167 {
00168 socket.async_receive(
00169 buffer(rx_buf, sizeof(rx_buf)),
00170 boost::bind(&MAVConnTCPClient::async_receive_end,
00171 this,
00172 boost::asio::placeholders::error,
00173 boost::asio::placeholders::bytes_transferred));
00174 }
00175
00176 void MAVConnTCPClient::async_receive_end(error_code error, size_t bytes_transferred)
00177 {
00178 mavlink_message_t message;
00179 mavlink_status_t status;
00180
00181 if (error) {
00182 logError(PFXd "receive: %s", channel, error.message().c_str());
00183 close();
00184 return;
00185 }
00186
00187 iostat_rx_add(bytes_transferred);
00188 for (size_t i = 0; i < bytes_transferred; i++) {
00189 if (mavlink_parse_char(channel, rx_buf[i], &message, &status)) {
00190 logDebug(PFXd "recv: Message-Id: %d [%d bytes] Sys-Id: %d Comp-Id: %d Seq: %d",
00191 channel, message.msgid, message.len, message.sysid, message.compid, message.seq);
00192
00193 message_received(&message, message.sysid, message.compid);
00194 }
00195 }
00196
00197 do_recv();
00198 }
00199
00200 void MAVConnTCPClient::do_send(bool check_tx_state)
00201 {
00202 if (check_tx_state && tx_in_progress)
00203 return;
00204
00205 lock_guard lock(mutex);
00206 if (tx_q.empty())
00207 return;
00208
00209 tx_in_progress = true;
00210 MsgBuffer *buf = tx_q.front();
00211 socket.async_send(
00212 buffer(buf->dpos(), buf->nbytes()),
00213 boost::bind(&MAVConnTCPClient::async_send_end,
00214 this,
00215 boost::asio::placeholders::error,
00216 boost::asio::placeholders::bytes_transferred));
00217 }
00218
00219 void MAVConnTCPClient::async_send_end(error_code error, size_t bytes_transferred)
00220 {
00221 if (error) {
00222 logError(PFXd "send: %s", channel, error.message().c_str());
00223 close();
00224 return;
00225 }
00226
00227 iostat_tx_add(bytes_transferred);
00228 lock_guard lock(mutex);
00229 if (tx_q.empty()) {
00230 tx_in_progress = false;
00231 return;
00232 }
00233
00234 MsgBuffer *buf = tx_q.front();
00235 buf->pos += bytes_transferred;
00236 if (buf->nbytes() == 0) {
00237 tx_q.pop_front();
00238 delete buf;
00239 }
00240
00241 if (!tx_q.empty())
00242 do_send(false);
00243 else
00244 tx_in_progress = false;
00245 }
00246
00247
00248
00249
00250 MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
00251 std::string server_host, unsigned short server_port) :
00252 MAVConnInterface(system_id, component_id),
00253 io_service(),
00254 acceptor(io_service)
00255 {
00256 if (!resolve_address_tcp(io_service, channel, server_host, server_port, bind_ep))
00257 throw DeviceError("tcp-l: resolve", "Bind address resolve failed");
00258
00259 logInform(PFXd "Bind address: %s", channel, to_string_ss(bind_ep).c_str());
00260
00261 try {
00262 acceptor.open(tcp::v4());
00263 acceptor.set_option(tcp::acceptor::reuse_address(true));
00264 acceptor.bind(bind_ep);
00265 acceptor.listen(channes_available());
00266 }
00267 catch (boost::system::system_error &err) {
00268 throw DeviceError("tcp-l", err);
00269 }
00270
00271
00272 io_service.post(boost::bind(&MAVConnTCPServer::do_accept, this));
00273
00274
00275 std::thread t(boost::bind(&io_service::run, &this->io_service));
00276 mavutils::set_thread_name(t, "MAVConnTCPs%d", channel);
00277 io_thread.swap(t);
00278 }
00279
00280 MAVConnTCPServer::~MAVConnTCPServer() {
00281 close();
00282 }
00283
00284 void MAVConnTCPServer::close() {
00285 lock_guard lock(mutex);
00286 if (!is_open())
00287 return;
00288
00289 logInform(PFXd "Terminating server. "
00290 "All connections will be closed.", channel);
00291
00292 io_service.stop();
00293 acceptor.close();
00294
00295 if (io_thread.joinable())
00296 io_thread.join();
00297
00298 port_closed();
00299 }
00300
00301 mavlink_status_t MAVConnTCPServer::get_status()
00302 {
00303 mavlink_status_t status{};
00304
00305 lock_guard lock(mutex);
00306 for (auto &instp : client_list) {
00307 auto inst_status = instp->get_status();
00308
00309 #define ADD_STATUS(_field) \
00310 status._field += inst_status._field
00311
00312 ADD_STATUS(packet_rx_success_count);
00313 ADD_STATUS(packet_rx_drop_count);
00314 ADD_STATUS(buffer_overrun);
00315 ADD_STATUS(parse_error);
00316
00317
00318 #undef ADD_STATUS
00319 };
00320
00321 return status;
00322 }
00323
00324 MAVConnInterface::IOStat MAVConnTCPServer::get_iostat()
00325 {
00326 MAVConnInterface::IOStat iostat{};
00327
00328 lock_guard lock(mutex);
00329 for (auto &instp : client_list) {
00330 auto inst_iostat = instp->get_iostat();
00331
00332 #define ADD_IOSTAT(_field) \
00333 iostat._field += inst_iostat._field
00334
00335 ADD_IOSTAT(tx_total_bytes);
00336 ADD_IOSTAT(rx_total_bytes);
00337 ADD_IOSTAT(tx_speed);
00338 ADD_IOSTAT(rx_speed);
00339
00340 #undef ADD_IOSTAT
00341 };
00342
00343 return iostat;
00344 }
00345
00346 void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
00347 {
00348 lock_guard lock(mutex);
00349 for (auto &instp : client_list) {
00350 instp->send_bytes(bytes, length);
00351 }
00352 }
00353
00354 void MAVConnTCPServer::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00355 {
00356 lock_guard lock(mutex);
00357 for (auto &instp : client_list) {
00358 instp->send_message(message, sysid, compid);
00359 }
00360 }
00361
00362 void MAVConnTCPServer::do_accept()
00363 {
00364 acceptor_client.reset();
00365 acceptor_client = boost::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
00366 acceptor.async_accept(
00367 acceptor_client->socket,
00368 acceptor_client->server_ep,
00369 boost::bind(&MAVConnTCPServer::async_accept_end,
00370 this,
00371 boost::asio::placeholders::error));
00372 }
00373
00374 void MAVConnTCPServer::async_accept_end(error_code error)
00375 {
00376 if (error) {
00377 logError(PFXd "accept: %s", channel, error.message().c_str());
00378 close();
00379 return;
00380 }
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391 lock_guard lock(mutex);
00392 acceptor_client->client_connected(channel);
00393 acceptor_client->message_received.connect(boost::bind(&MAVConnTCPServer::recv_message, this, _1, _2, _3));
00394 acceptor_client->port_closed.connect(boost::bind(&MAVConnTCPServer::client_closed, this,
00395 boost::weak_ptr<MAVConnTCPClient>(acceptor_client)));
00396
00397 client_list.push_back(acceptor_client);
00398 do_accept();
00399 }
00400
00401 void MAVConnTCPServer::client_closed(boost::weak_ptr<MAVConnTCPClient> weak_instp)
00402 {
00403 if (auto instp = weak_instp.lock()) {
00404 bool locked = mutex.try_lock();
00405 logInform(PFXd "Client connection closed, channel: %d, address: %s",
00406 channel, instp->channel, to_string_ss(instp->server_ep).c_str());
00407
00408 client_list.remove(instp);
00409
00410 if (locked)
00411 mutex.unlock();
00412 }
00413 }
00414
00415 void MAVConnTCPServer::recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
00416 {
00417
00418 message_received(message, sysid, compid);
00419 }
00420
00421 };