00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/io.h"
00036 #include "ros/transport/transport_tcp.h"
00037 #include "ros/poll_set.h"
00038 #include "ros/header.h"
00039 #include "ros/file_log.h"
00040 #include <ros/assert.h>
00041 #include <sstream>
00042 #include <boost/bind.hpp>
00043 #include <fcntl.h>
00044 #include <errno.h>
00045 namespace ros
00046 {
00047
00048 bool TransportTCP::s_use_keepalive_ = true;
00049 bool TransportTCP::s_use_ipv6_ = false;
00050
00051 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
00052 : sock_(ROS_INVALID_SOCKET)
00053 , closed_(false)
00054 , expecting_read_(false)
00055 , expecting_write_(false)
00056 , is_server_(false)
00057 , server_port_(-1)
00058 , local_port_(-1)
00059 , poll_set_(poll_set)
00060 , flags_(flags)
00061 {
00062
00063 }
00064
00065 TransportTCP::~TransportTCP()
00066 {
00067 ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
00068 }
00069
00070 bool TransportTCP::setSocket(int sock)
00071 {
00072 sock_ = sock;
00073 return initializeSocket();
00074 }
00075
00076 bool TransportTCP::setNonBlocking()
00077 {
00078 if (!(flags_ & SYNCHRONOUS))
00079 {
00080 int result = set_non_blocking(sock_);
00081 if ( result != 0 ) {
00082 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00083 close();
00084 return false;
00085 }
00086 }
00087
00088 return true;
00089 }
00090
00091 bool TransportTCP::initializeSocket()
00092 {
00093 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00094
00095 if (!setNonBlocking())
00096 {
00097 return false;
00098 }
00099
00100 setKeepAlive(s_use_keepalive_, 60, 10, 9);
00101
00102
00103 if (cached_remote_host_.empty())
00104 {
00105 if (is_server_)
00106 {
00107 cached_remote_host_ = "TCPServer Socket";
00108 }
00109 else
00110 {
00111 std::stringstream ss;
00112 ss << getClientURI() << " on socket " << sock_;
00113 cached_remote_host_ = ss.str();
00114 }
00115 }
00116
00117 if (local_port_ < 0)
00118 {
00119 la_len_ = s_use_ipv6_ ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
00120 getsockname(sock_, (sockaddr *)&local_address_, &la_len_);
00121 switch (local_address_.ss_family)
00122 {
00123 case AF_INET:
00124 local_port_ = ntohs(((sockaddr_in *)&local_address_)->sin_port);
00125 break;
00126 case AF_INET6:
00127 local_port_ = ntohs(((sockaddr_in6 *)&local_address_)->sin6_port);
00128 break;
00129 }
00130 }
00131
00132 #ifdef ROSCPP_USE_TCP_NODELAY
00133 setNoDelay(true);
00134 #endif
00135
00136 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00137 if (poll_set_)
00138 {
00139 ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
00140 poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
00141 }
00142
00143 if (!(flags_ & SYNCHRONOUS))
00144 {
00145
00146 }
00147
00148 return true;
00149 }
00150
00151 void TransportTCP::parseHeader(const Header& header)
00152 {
00153 std::string nodelay;
00154 if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
00155 {
00156 ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
00157 setNoDelay(true);
00158 }
00159 }
00160
00161 void TransportTCP::setNoDelay(bool nodelay)
00162 {
00163 int flag = nodelay ? 1 : 0;
00164 int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
00165 if (result < 0)
00166 {
00167 ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00168 }
00169 }
00170
00171 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
00172 {
00173 if (use)
00174 {
00175 int val = 1;
00176 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00177 {
00178 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00179 }
00180
00181
00182 #if defined(SOL_TCP) && !defined(__CYGWIN__)
00183 val = idle;
00184 if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
00185 {
00186 ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00187 }
00188
00189 val = interval;
00190 if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
00191 {
00192 ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00193 }
00194
00195 val = count;
00196 if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
00197 {
00198 ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00199 }
00200 #endif
00201 }
00202 else
00203 {
00204 int val = 0;
00205 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00206 {
00207 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00208 }
00209 }
00210 }
00211
00212 bool TransportTCP::connect(const std::string& host, int port)
00213 {
00214 if (!isHostAllowed(host))
00215 return false;
00216
00217 sock_ = socket(s_use_ipv6_ ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
00218 connected_host_ = host;
00219 connected_port_ = port;
00220
00221 if (sock_ == ROS_INVALID_SOCKET)
00222 {
00223 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00224 return false;
00225 }
00226
00227 setNonBlocking();
00228
00229 sockaddr_storage sas;
00230 socklen_t sas_len;
00231
00232 in_addr ina;
00233 in6_addr in6a;
00234 if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
00235 {
00236 sockaddr_in *address = (sockaddr_in*) &sas;
00237 sas_len = sizeof(sockaddr_in);
00238
00239 la_len_ = sizeof(sockaddr_in);
00240 address->sin_family = AF_INET;
00241 address->sin_port = htons(port);
00242 address->sin_addr.s_addr = ina.s_addr;
00243 }
00244 else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
00245 {
00246 sockaddr_in6 *address = (sockaddr_in6*) &sas;
00247 sas_len = sizeof(sockaddr_in6);
00248 la_len_ = sizeof(sockaddr_in6);
00249 address->sin6_family = AF_INET6;
00250 address->sin6_port = htons(port);
00251 memcpy(address->sin6_addr.s6_addr, in6a.s6_addr, sizeof(in6a.s6_addr));
00252 }
00253 else
00254 {
00255 struct addrinfo* addr;
00256 struct addrinfo hints;
00257 memset(&hints, 0, sizeof(hints));
00258 hints.ai_family = AF_UNSPEC;
00259
00260 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
00261 {
00262 close();
00263 ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
00264 return false;
00265 }
00266
00267 bool found = false;
00268 struct addrinfo* it = addr;
00269 char namebuf[128];
00270 for (; it; it = it->ai_next)
00271 {
00272 if (!s_use_ipv6_ && it->ai_family == AF_INET)
00273 {
00274 sockaddr_in *address = (sockaddr_in*) &sas;
00275 sas_len = sizeof(*address);
00276
00277 memcpy(address, it->ai_addr, it->ai_addrlen);
00278 address->sin_family = it->ai_family;
00279 address->sin_port = htons(port);
00280
00281 strcpy(namebuf, inet_ntoa(address->sin_addr));
00282 found = true;
00283 break;
00284 }
00285 if (s_use_ipv6_ && it->ai_family == AF_INET6)
00286 {
00287 sockaddr_in6 *address = (sockaddr_in6*) &sas;
00288 sas_len = sizeof(*address);
00289
00290 memcpy(address, it->ai_addr, it->ai_addrlen);
00291 address->sin6_family = it->ai_family;
00292 address->sin6_port = htons((u_short) port);
00293
00294
00295 inet_ntop(AF_INET6, (void*)&(address->sin6_addr), namebuf, sizeof(namebuf));
00296 found = true;
00297 break;
00298 }
00299 }
00300
00301 freeaddrinfo(addr);
00302
00303 if (!found)
00304 {
00305 ROS_ERROR("Couldn't resolve an address for [%s]\n", host.c_str());
00306 return false;
00307 }
00308
00309 ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf, sock_);
00310 }
00311
00312 int ret = ::connect(sock_, (sockaddr*) &sas, sas_len);
00313
00314 ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
00315 if (((flags_ & SYNCHRONOUS) && ret != 0) ||
00316 (!(flags_ & SYNCHRONOUS) && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN))
00317 {
00318 ROSCPP_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
00319 close();
00320
00321 return false;
00322 }
00323
00324
00325 #ifdef WIN32
00326
00327
00328 Sleep(100);
00329 #endif
00330
00331
00332 std::stringstream ss;
00333 ss << host << ":" << port << " on socket " << sock_;
00334 cached_remote_host_ = ss.str();
00335
00336 if (!initializeSocket())
00337 {
00338 return false;
00339 }
00340
00341 if (flags_ & SYNCHRONOUS)
00342 {
00343 ROSCPP_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00344 }
00345 else
00346 {
00347 ROSCPP_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00348 }
00349
00350 return true;
00351 }
00352
00353 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
00354 {
00355 is_server_ = true;
00356 accept_cb_ = accept_cb;
00357
00358 if (s_use_ipv6_)
00359 {
00360 sock_ = socket(AF_INET6, SOCK_STREAM, 0);
00361 sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
00362 address->sin6_family = AF_INET6;
00363 address->sin6_addr = isOnlyLocalhostAllowed() ?
00364 in6addr_loopback :
00365 in6addr_any;
00366 address->sin6_port = htons(port);
00367 sa_len_ = sizeof(sockaddr_in6);
00368 }
00369 else
00370 {
00371 sock_ = socket(AF_INET, SOCK_STREAM, 0);
00372 sockaddr_in *address = (sockaddr_in *)&server_address_;
00373 address->sin_family = AF_INET;
00374 address->sin_addr.s_addr = isOnlyLocalhostAllowed() ?
00375 htonl(INADDR_LOOPBACK) :
00376 INADDR_ANY;
00377 address->sin_port = htons(port);
00378 sa_len_ = sizeof(sockaddr_in);
00379 }
00380
00381 if (sock_ <= 0)
00382 {
00383 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00384 return false;
00385 }
00386
00387
00388 if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
00389 {
00390 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00391 return false;
00392 }
00393
00394 ::listen(sock_, backlog);
00395 getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);
00396
00397 switch (server_address_.ss_family)
00398 {
00399 case AF_INET:
00400 server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
00401 break;
00402 case AF_INET6:
00403 server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
00404 break;
00405 }
00406
00407 if (!initializeSocket())
00408 {
00409 return false;
00410 }
00411
00412 if (!(flags_ & SYNCHRONOUS))
00413 {
00414 enableRead();
00415 }
00416
00417 return true;
00418 }
00419
00420 void TransportTCP::close()
00421 {
00422 Callback disconnect_cb;
00423
00424 if (!closed_)
00425 {
00426 {
00427 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00428
00429 if (!closed_)
00430 {
00431 closed_ = true;
00432
00433 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00434
00435 if (poll_set_)
00436 {
00437 poll_set_->delSocket(sock_);
00438 }
00439
00440 ::shutdown(sock_, ROS_SOCKETS_SHUT_RDWR);
00441 if ( close_socket(sock_) != 0 )
00442 {
00443 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00444 } else
00445 {
00446 ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
00447 }
00448 sock_ = ROS_INVALID_SOCKET;
00449
00450 disconnect_cb = disconnect_cb_;
00451
00452 disconnect_cb_ = Callback();
00453 read_cb_ = Callback();
00454 write_cb_ = Callback();
00455 accept_cb_ = AcceptCallback();
00456 }
00457 }
00458 }
00459
00460 if (disconnect_cb)
00461 {
00462 disconnect_cb(shared_from_this());
00463 }
00464 }
00465
00466 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
00467 {
00468 {
00469 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00470
00471 if (closed_)
00472 {
00473 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00474 return -1;
00475 }
00476 }
00477
00478 ROS_ASSERT(size > 0);
00479
00480
00481 uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
00482 int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
00483 if (num_bytes < 0)
00484 {
00485 if ( !last_socket_error_is_would_block() )
00486 {
00487 ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00488 close();
00489 }
00490 else
00491 {
00492 num_bytes = 0;
00493 }
00494 }
00495 else if (num_bytes == 0)
00496 {
00497 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
00498 close();
00499 return -1;
00500 }
00501
00502 return num_bytes;
00503 }
00504
00505 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
00506 {
00507 {
00508 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00509
00510 if (closed_)
00511 {
00512 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00513 return -1;
00514 }
00515 }
00516
00517 ROS_ASSERT(size > 0);
00518
00519
00520 uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
00521 int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
00522 if (num_bytes < 0)
00523 {
00524 if ( !last_socket_error_is_would_block() )
00525 {
00526 ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00527 close();
00528 }
00529 else
00530 {
00531 num_bytes = 0;
00532 }
00533 }
00534
00535 return num_bytes;
00536 }
00537
00538 void TransportTCP::enableRead()
00539 {
00540 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00541
00542 {
00543 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00544
00545 if (closed_)
00546 {
00547 return;
00548 }
00549 }
00550
00551 if (!expecting_read_)
00552 {
00553 poll_set_->addEvents(sock_, POLLIN);
00554 expecting_read_ = true;
00555 }
00556 }
00557
00558 void TransportTCP::disableRead()
00559 {
00560 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00561
00562 {
00563 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00564
00565 if (closed_)
00566 {
00567 return;
00568 }
00569 }
00570
00571 if (expecting_read_)
00572 {
00573 poll_set_->delEvents(sock_, POLLIN);
00574 expecting_read_ = false;
00575 }
00576 }
00577
00578 void TransportTCP::enableWrite()
00579 {
00580 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00581
00582 {
00583 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00584
00585 if (closed_)
00586 {
00587 return;
00588 }
00589 }
00590
00591 if (!expecting_write_)
00592 {
00593 poll_set_->addEvents(sock_, POLLOUT);
00594 expecting_write_ = true;
00595 }
00596 }
00597
00598 void TransportTCP::disableWrite()
00599 {
00600 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00601
00602 {
00603 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00604
00605 if (closed_)
00606 {
00607 return;
00608 }
00609 }
00610
00611 if (expecting_write_)
00612 {
00613 poll_set_->delEvents(sock_, POLLOUT);
00614 expecting_write_ = false;
00615 }
00616 }
00617
00618 TransportTCPPtr TransportTCP::accept()
00619 {
00620 ROS_ASSERT(is_server_);
00621
00622 sockaddr client_address;
00623 socklen_t len = sizeof(client_address);
00624 int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
00625 if (new_sock >= 0)
00626 {
00627 ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
00628
00629 TransportTCPPtr transport(boost::make_shared<TransportTCP>(poll_set_, flags_));
00630 if (!transport->setSocket(new_sock))
00631 {
00632 ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
00633 }
00634
00635 return transport;
00636 }
00637 else
00638 {
00639 ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00640 }
00641
00642 return TransportTCPPtr();
00643 }
00644
00645 void TransportTCP::socketUpdate(int events)
00646 {
00647 {
00648 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00649 if (closed_)
00650 {
00651 return;
00652 }
00653
00654
00655 if ((events & POLLIN) && expecting_read_)
00656 {
00657 if (is_server_)
00658 {
00659
00660
00661 TransportTCPPtr transport = accept();
00662 if (transport)
00663 {
00664 ROS_ASSERT(accept_cb_);
00665 accept_cb_(transport);
00666 }
00667 }
00668 else
00669 {
00670 if (read_cb_)
00671 {
00672 read_cb_(shared_from_this());
00673 }
00674 }
00675 }
00676
00677 if ((events & POLLOUT) && expecting_write_)
00678 {
00679 if (write_cb_)
00680 {
00681 write_cb_(shared_from_this());
00682 }
00683 }
00684 }
00685
00686 if((events & POLLERR) ||
00687 (events & POLLHUP) ||
00688 (events & POLLNVAL))
00689 {
00690 uint32_t error = -1;
00691 socklen_t len = sizeof(error);
00692 if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
00693 {
00694 ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
00695 }
00696 #ifdef _MSC_VER
00697 char err[60];
00698 strerror_s(err,60,error);
00699 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
00700 #else
00701 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
00702 #endif
00703
00704 close();
00705 }
00706 }
00707
00708 std::string TransportTCP::getTransportInfo()
00709 {
00710 std::stringstream str;
00711 str << "TCPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
00712 return str.str();
00713 }
00714
00715 std::string TransportTCP::getClientURI()
00716 {
00717 ROS_ASSERT(!is_server_);
00718
00719 sockaddr_storage sas;
00720 socklen_t sas_len = sizeof(sas);
00721 getpeername(sock_, (sockaddr *)&sas, &sas_len);
00722
00723 sockaddr_in *sin = (sockaddr_in *)&sas;
00724 sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
00725
00726 char namebuf[128];
00727 int port;
00728
00729 switch (sas.ss_family)
00730 {
00731 case AF_INET:
00732 port = ntohs(sin->sin_port);
00733 strcpy(namebuf, inet_ntoa(sin->sin_addr));
00734 break;
00735 case AF_INET6:
00736 port = ntohs(sin6->sin6_port);
00737 inet_ntop(AF_INET6, (void*)&(sin6->sin6_addr), namebuf, sizeof(namebuf));
00738 break;
00739 default:
00740 namebuf[0] = 0;
00741 port = 0;
00742 break;
00743 }
00744
00745 std::string ip = namebuf;
00746 std::stringstream uri;
00747 uri << ip << ":" << port;
00748
00749 return uri.str();
00750 }
00751
00752 }