00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/io.h"
00036 #include "ros/transport/transport_tcp.h"
00037 #include "ros/poll_set.h"
00038 #include "ros/header.h"
00039 #include "ros/file_log.h"
00040 #include <ros/assert.h>
00041 #include <sstream>
00042 #include <boost/bind.hpp>
00043 #include <fcntl.h>
00044 #include <errno.h>
00045 namespace ros
00046 {
00047
00048 bool TransportTCP::s_use_keepalive_ = true;
00049 bool TransportTCP::s_use_ipv6_ = false;
00050
00051 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
00052 : sock_(ROS_INVALID_SOCKET)
00053 , closed_(false)
00054 , expecting_read_(false)
00055 , expecting_write_(false)
00056 , is_server_(false)
00057 , server_port_(-1)
00058 , poll_set_(poll_set)
00059 , flags_(flags)
00060 {
00061
00062 }
00063
00064 TransportTCP::~TransportTCP()
00065 {
00066 ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
00067 }
00068
00069 bool TransportTCP::setSocket(int sock)
00070 {
00071 sock_ = sock;
00072 return initializeSocket();
00073 }
00074
00075 bool TransportTCP::setNonBlocking()
00076 {
00077 if (!(flags_ & SYNCHRONOUS))
00078 {
00079 int result = set_non_blocking(sock_);
00080 if ( result != 0 ) {
00081 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00082 close();
00083 return false;
00084 }
00085 }
00086
00087 return true;
00088 }
00089
00090 bool TransportTCP::initializeSocket()
00091 {
00092 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00093
00094 if (!setNonBlocking())
00095 {
00096 return false;
00097 }
00098
00099 setKeepAlive(s_use_keepalive_, 60, 10, 9);
00100
00101
00102 if (cached_remote_host_.empty())
00103 {
00104 if (is_server_)
00105 {
00106 cached_remote_host_ = "TCPServer Socket";
00107 }
00108 else
00109 {
00110 std::stringstream ss;
00111 ss << getClientURI() << " on socket " << sock_;
00112 cached_remote_host_ = ss.str();
00113 }
00114 }
00115
00116 #ifdef ROSCPP_USE_TCP_NODELAY
00117 setNoDelay(true);
00118 #endif
00119
00120 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00121 if (poll_set_)
00122 {
00123 ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
00124 poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
00125 }
00126
00127 if (!(flags_ & SYNCHRONOUS))
00128 {
00129
00130 }
00131
00132 return true;
00133 }
00134
00135 void TransportTCP::parseHeader(const Header& header)
00136 {
00137 std::string nodelay;
00138 if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
00139 {
00140 ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
00141 setNoDelay(true);
00142 }
00143 }
00144
00145 void TransportTCP::setNoDelay(bool nodelay)
00146 {
00147 int flag = nodelay ? 1 : 0;
00148 int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
00149 if (result < 0)
00150 {
00151 ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00152 }
00153 }
00154
00155 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
00156 {
00157 if (use)
00158 {
00159 int val = 1;
00160 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00161 {
00162 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00163 }
00164
00165
00166 #if defined(SOL_TCP) && !defined(__CYGWIN__)
00167 val = idle;
00168 if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
00169 {
00170 ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00171 }
00172
00173 val = interval;
00174 if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
00175 {
00176 ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00177 }
00178
00179 val = count;
00180 if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
00181 {
00182 ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00183 }
00184 #endif
00185 }
00186 else
00187 {
00188 int val = 0;
00189 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00190 {
00191 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00192 }
00193 }
00194 }
00195
00196 bool TransportTCP::connect(const std::string& host, int port)
00197 {
00198 sock_ = socket(s_use_ipv6_ ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
00199 connected_host_ = host;
00200 connected_port_ = port;
00201
00202 if (sock_ == ROS_INVALID_SOCKET)
00203 {
00204 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00205 return false;
00206 }
00207
00208 setNonBlocking();
00209
00210 sockaddr_storage sas;
00211 socklen_t sas_len;
00212
00213 in_addr ina;
00214 in6_addr in6a;
00215 if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
00216 {
00217 sockaddr_in *address = (sockaddr_in*) &sas;
00218 sas_len = sizeof(sockaddr_in);
00219
00220 address->sin_family = AF_INET;
00221 address->sin_port = htons(port);
00222 address->sin_addr.s_addr = ina.s_addr;
00223 }
00224 else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
00225 {
00226 sockaddr_in6 *address = (sockaddr_in6*) &sas;
00227 sas_len = sizeof(sockaddr_in6);
00228 address->sin6_family = AF_INET6;
00229 address->sin6_port = htons(port);
00230 memcpy(address->sin6_addr.s6_addr, in6a.s6_addr, sizeof(in6a.s6_addr));
00231 }
00232 else
00233 {
00234 struct addrinfo* addr;
00235 struct addrinfo hints;
00236 memset(&hints, 0, sizeof(hints));
00237 hints.ai_family = AF_UNSPEC;
00238
00239 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
00240 {
00241 close();
00242 ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
00243 return false;
00244 }
00245
00246 bool found = false;
00247 struct addrinfo* it = addr;
00248 char namebuf[128];
00249 for (; it; it = it->ai_next)
00250 {
00251 if (!s_use_ipv6_ && it->ai_family == AF_INET)
00252 {
00253 sockaddr_in *address = (sockaddr_in*) &sas;
00254 sas_len = sizeof(*address);
00255
00256 memcpy(address, it->ai_addr, it->ai_addrlen);
00257 address->sin_family = it->ai_family;
00258 address->sin_port = htons(port);
00259
00260 strcpy(namebuf, inet_ntoa(address->sin_addr));
00261 found = true;
00262 break;
00263 }
00264 if (s_use_ipv6_ && it->ai_family == AF_INET6)
00265 {
00266 sockaddr_in6 *address = (sockaddr_in6*) &sas;
00267 sas_len = sizeof(*address);
00268
00269 memcpy(address, it->ai_addr, it->ai_addrlen);
00270 address->sin6_family = it->ai_family;
00271 address->sin6_port = htons((u_short) port);
00272
00273
00274 inet_ntop(AF_INET6, (void*)&(address->sin6_addr), namebuf, sizeof(namebuf));
00275 found = true;
00276 break;
00277 }
00278 }
00279
00280 freeaddrinfo(addr);
00281
00282 if (!found)
00283 {
00284 ROS_ERROR("Couldn't resolve an address for [%s]\n", host.c_str());
00285 return false;
00286 }
00287
00288 ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf, sock_);
00289 }
00290
00291 int ret = ::connect(sock_, (sockaddr*) &sas, sas_len);
00292
00293 ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
00294 if (((flags_ & SYNCHRONOUS) && ret != 0) ||
00295 (!(flags_ & SYNCHRONOUS) && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN))
00296 {
00297 ROSCPP_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
00298 close();
00299
00300 return false;
00301 }
00302
00303
00304 #ifdef WIN32
00305
00306
00307 Sleep(100);
00308 #endif
00309
00310
00311 std::stringstream ss;
00312 ss << host << ":" << port << " on socket " << sock_;
00313 cached_remote_host_ = ss.str();
00314
00315 if (!initializeSocket())
00316 {
00317 return false;
00318 }
00319
00320 if (flags_ & SYNCHRONOUS)
00321 {
00322 ROSCPP_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00323 }
00324 else
00325 {
00326 ROSCPP_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00327 }
00328
00329 return true;
00330 }
00331
00332 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
00333 {
00334 is_server_ = true;
00335 accept_cb_ = accept_cb;
00336
00337 if (s_use_ipv6_)
00338 {
00339 sock_ = socket(AF_INET6, SOCK_STREAM, 0);
00340 sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
00341 address->sin6_family = AF_INET6;
00342 address->sin6_addr = in6addr_any;
00343 address->sin6_port = htons(port);
00344 sa_len_ = sizeof(sockaddr_in6);
00345 }
00346 else
00347 {
00348 sock_ = socket(AF_INET, SOCK_STREAM, 0);
00349 sockaddr_in *address = (sockaddr_in *)&server_address_;
00350 address->sin_family = AF_INET;
00351 address->sin_addr.s_addr = INADDR_ANY;
00352 address->sin_port = htons(port);
00353 sa_len_ = sizeof(sockaddr_in);
00354 }
00355
00356 if (sock_ <= 0)
00357 {
00358 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00359 return false;
00360 }
00361
00362
00363 if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
00364 {
00365 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00366 return false;
00367 }
00368
00369 ::listen(sock_, backlog);
00370 getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);
00371
00372 switch (server_address_.ss_family)
00373 {
00374 case AF_INET:
00375 server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
00376 break;
00377 case AF_INET6:
00378 server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
00379 break;
00380 }
00381
00382 if (!initializeSocket())
00383 {
00384 return false;
00385 }
00386
00387 if (!(flags_ & SYNCHRONOUS))
00388 {
00389 enableRead();
00390 }
00391
00392 return true;
00393 }
00394
00395 void TransportTCP::close()
00396 {
00397 Callback disconnect_cb;
00398
00399 if (!closed_)
00400 {
00401 {
00402 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00403
00404 if (!closed_)
00405 {
00406 closed_ = true;
00407
00408 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00409
00410 if (poll_set_)
00411 {
00412 poll_set_->delSocket(sock_);
00413 }
00414
00415 ::shutdown(sock_, ROS_SOCKETS_SHUT_RDWR);
00416 if ( close_socket(sock_) != 0 )
00417 {
00418 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00419 } else
00420 {
00421 ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
00422 }
00423 sock_ = ROS_INVALID_SOCKET;
00424
00425 disconnect_cb = disconnect_cb_;
00426
00427 disconnect_cb_ = Callback();
00428 read_cb_ = Callback();
00429 write_cb_ = Callback();
00430 accept_cb_ = AcceptCallback();
00431 }
00432 }
00433 }
00434
00435 if (disconnect_cb)
00436 {
00437 disconnect_cb(shared_from_this());
00438 }
00439 }
00440
00441 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
00442 {
00443 {
00444 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00445
00446 if (closed_)
00447 {
00448 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00449 return -1;
00450 }
00451 }
00452
00453 ROS_ASSERT(size > 0);
00454
00455
00456 uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
00457 int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
00458 if (num_bytes < 0)
00459 {
00460 if ( !last_socket_error_is_would_block() )
00461 {
00462 ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00463 close();
00464 }
00465 else
00466 {
00467 num_bytes = 0;
00468 }
00469 }
00470 else if (num_bytes == 0)
00471 {
00472 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
00473 close();
00474 return -1;
00475 }
00476
00477 return num_bytes;
00478 }
00479
00480 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
00481 {
00482 {
00483 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00484
00485 if (closed_)
00486 {
00487 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00488 return -1;
00489 }
00490 }
00491
00492 ROS_ASSERT(size > 0);
00493
00494
00495 uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
00496 int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
00497 if (num_bytes < 0)
00498 {
00499 if ( !last_socket_error_is_would_block() )
00500 {
00501 ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00502 close();
00503 }
00504 else
00505 {
00506 num_bytes = 0;
00507 }
00508 }
00509
00510 return num_bytes;
00511 }
00512
00513 void TransportTCP::enableRead()
00514 {
00515 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00516
00517 {
00518 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00519
00520 if (closed_)
00521 {
00522 return;
00523 }
00524 }
00525
00526 if (!expecting_read_)
00527 {
00528 poll_set_->addEvents(sock_, POLLIN);
00529 expecting_read_ = true;
00530 }
00531 }
00532
00533 void TransportTCP::disableRead()
00534 {
00535 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00536
00537 {
00538 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00539
00540 if (closed_)
00541 {
00542 return;
00543 }
00544 }
00545
00546 if (expecting_read_)
00547 {
00548 poll_set_->delEvents(sock_, POLLIN);
00549 expecting_read_ = false;
00550 }
00551 }
00552
00553 void TransportTCP::enableWrite()
00554 {
00555 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00556
00557 {
00558 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00559
00560 if (closed_)
00561 {
00562 return;
00563 }
00564 }
00565
00566 if (!expecting_write_)
00567 {
00568 poll_set_->addEvents(sock_, POLLOUT);
00569 expecting_write_ = true;
00570 }
00571 }
00572
00573 void TransportTCP::disableWrite()
00574 {
00575 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00576
00577 {
00578 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00579
00580 if (closed_)
00581 {
00582 return;
00583 }
00584 }
00585
00586 if (expecting_write_)
00587 {
00588 poll_set_->delEvents(sock_, POLLOUT);
00589 expecting_write_ = false;
00590 }
00591 }
00592
00593 TransportTCPPtr TransportTCP::accept()
00594 {
00595 ROS_ASSERT(is_server_);
00596
00597 sockaddr client_address;
00598 socklen_t len = sizeof(client_address);
00599 int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
00600 if (new_sock >= 0)
00601 {
00602 ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
00603
00604 TransportTCPPtr transport(new TransportTCP(poll_set_, flags_));
00605 if (!transport->setSocket(new_sock))
00606 {
00607 ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
00608 }
00609
00610 return transport;
00611 }
00612 else
00613 {
00614 ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00615 }
00616
00617 return TransportTCPPtr();
00618 }
00619
00620 void TransportTCP::socketUpdate(int events)
00621 {
00622 {
00623 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00624 if (closed_)
00625 {
00626 return;
00627 }
00628
00629
00630 if ((events & POLLIN) && expecting_read_)
00631 {
00632 if (is_server_)
00633 {
00634
00635
00636 TransportTCPPtr transport = accept();
00637 if (transport)
00638 {
00639 ROS_ASSERT(accept_cb_);
00640 accept_cb_(transport);
00641 }
00642 }
00643 else
00644 {
00645 if (read_cb_)
00646 {
00647 read_cb_(shared_from_this());
00648 }
00649 }
00650 }
00651
00652 if ((events & POLLOUT) && expecting_write_)
00653 {
00654 if (write_cb_)
00655 {
00656 write_cb_(shared_from_this());
00657 }
00658 }
00659 }
00660
00661 if((events & POLLERR) ||
00662 (events & POLLHUP) ||
00663 (events & POLLNVAL))
00664 {
00665 uint32_t error = -1;
00666 socklen_t len = sizeof(error);
00667 if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
00668 {
00669 ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
00670 }
00671 #ifdef _MSC_VER
00672 char err[60];
00673 strerror_s(err,60,error);
00674 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
00675 #else
00676 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
00677 #endif
00678
00679 close();
00680 }
00681 }
00682
00683 std::string TransportTCP::getTransportInfo()
00684 {
00685 return "TCPROS connection to [" + cached_remote_host_ + "]";
00686 }
00687
00688 std::string TransportTCP::getClientURI()
00689 {
00690 ROS_ASSERT(!is_server_);
00691
00692 sockaddr_storage sas;
00693 socklen_t sas_len = sizeof(sas);
00694 getpeername(sock_, (sockaddr *)&sas, &sas_len);
00695
00696 sockaddr_in *sin = (sockaddr_in *)&sas;
00697 sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
00698
00699 char namebuf[128];
00700 int port;
00701
00702 switch (sas.ss_family)
00703 {
00704 case AF_INET:
00705 port = ntohs(sin->sin_port);
00706 strcpy(namebuf, inet_ntoa(sin->sin_addr));
00707 break;
00708 case AF_INET6:
00709 port = ntohs(sin6->sin6_port);
00710 inet_ntop(AF_INET6, (void*)&(sin6->sin6_addr), namebuf, sizeof(namebuf));
00711 break;
00712 default:
00713 namebuf[0] = 0;
00714 port = 0;
00715 break;
00716 }
00717
00718 std::string ip = namebuf;
00719 std::stringstream uri;
00720 uri << ip << ":" << port;
00721
00722 return uri.str();
00723 }
00724
00725 }