00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/io.h"
00036 #include "ros/transport/transport_tcp.h"
00037 #include "ros/poll_set.h"
00038 #include "ros/header.h"
00039 #include "ros/file_log.h"
00040 #include <ros/assert.h>
00041 #include <sstream>
00042 #include <boost/bind.hpp>
00043 #include <fcntl.h>
00044 #include <errno.h>
00045 namespace ros
00046 {
00047
00048 bool TransportTCP::s_use_keepalive_ = true;
00049
00050 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
00051 : sock_(ROS_INVALID_SOCKET)
00052 , closed_(false)
00053 , expecting_read_(false)
00054 , expecting_write_(false)
00055 , is_server_(false)
00056 , server_port_(-1)
00057 , poll_set_(poll_set)
00058 , flags_(flags)
00059 {
00060
00061 }
00062
00063 TransportTCP::~TransportTCP()
00064 {
00065 ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
00066 }
00067
00068 bool TransportTCP::setSocket(int sock)
00069 {
00070 sock_ = sock;
00071 return initializeSocket();
00072 }
00073
00074 bool TransportTCP::setNonBlocking()
00075 {
00076 if (!(flags_ & SYNCHRONOUS))
00077 {
00078 int result = set_non_blocking(sock_);
00079 if ( result != 0 ) {
00080 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00081 close();
00082 return false;
00083 }
00084 }
00085
00086 return true;
00087 }
00088
00089 bool TransportTCP::initializeSocket()
00090 {
00091 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00092
00093 if (!setNonBlocking())
00094 {
00095 return false;
00096 }
00097
00098 setKeepAlive(s_use_keepalive_, 60, 10, 9);
00099
00100
00101 if (cached_remote_host_.empty())
00102 {
00103 if (is_server_)
00104 {
00105 cached_remote_host_ = "TCPServer Socket";
00106 }
00107 else
00108 {
00109 std::stringstream ss;
00110 ss << getClientURI() << " on socket " << sock_;
00111 cached_remote_host_ = ss.str();
00112 }
00113 }
00114
00115 #ifdef ROSCPP_USE_TCP_NODELAY
00116 setNoDelay(true);
00117 #endif
00118
00119 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00120 if (poll_set_)
00121 {
00122 ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
00123 poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
00124 }
00125
00126 if (!(flags_ & SYNCHRONOUS))
00127 {
00128
00129 }
00130
00131 return true;
00132 }
00133
00134 void TransportTCP::parseHeader(const Header& header)
00135 {
00136 std::string nodelay;
00137 if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
00138 {
00139 ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
00140 setNoDelay(true);
00141 }
00142 }
00143
00144 void TransportTCP::setNoDelay(bool nodelay)
00145 {
00146 int flag = nodelay ? 1 : 0;
00147 int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
00148 if (result < 0)
00149 {
00150 ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00151 }
00152 }
00153
00154 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
00155 {
00156 if (use)
00157 {
00158 int val = 1;
00159 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00160 {
00161 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00162 }
00163
00164
00165 #if defined(SOL_TCP) && !defined(__CYGWIN__)
00166 val = idle;
00167 if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
00168 {
00169 ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00170 }
00171
00172 val = interval;
00173 if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
00174 {
00175 ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00176 }
00177
00178 val = count;
00179 if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
00180 {
00181 ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00182 }
00183 #endif
00184 }
00185 else
00186 {
00187 int val = 0;
00188 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00189 {
00190 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00191 }
00192 }
00193 }
00194
00195 bool TransportTCP::connect(const std::string& host, int port)
00196 {
00197 sock_ = socket(AF_INET, SOCK_STREAM, 0);
00198 connected_host_ = host;
00199 connected_port_ = port;
00200
00201 if (sock_ == ROS_INVALID_SOCKET)
00202 {
00203 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00204 return false;
00205 }
00206
00207 setNonBlocking();
00208
00209 sockaddr_in sin;
00210 sin.sin_family = AF_INET;
00211 if (inet_addr(host.c_str()) == INADDR_NONE)
00212 {
00213 struct addrinfo* addr;
00214 if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0)
00215 {
00216 close();
00217 ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
00218 return false;
00219 }
00220
00221 bool found = false;
00222 struct addrinfo* it = addr;
00223 for (; it; it = it->ai_next)
00224 {
00225 if (it->ai_family == AF_INET)
00226 {
00227 memcpy(&sin, it->ai_addr, it->ai_addrlen);
00228 sin.sin_family = it->ai_family;
00229 sin.sin_port = htons(port);
00230
00231 found = true;
00232 break;
00233 }
00234 }
00235
00236 freeaddrinfo(addr);
00237
00238 if (!found)
00239 {
00240 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00241 return false;
00242 }
00243
00244 ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), inet_ntoa(sin.sin_addr), sock_);
00245 }
00246 else
00247 {
00248 sin.sin_addr.s_addr = inet_addr(host.c_str());
00249 }
00250
00251 sin.sin_port = htons(port);
00252
00253 int ret = ::connect(sock_, (sockaddr *)&sin, sizeof(sin));
00254
00255 ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
00256 if (((flags_ & SYNCHRONOUS) && ret != 0) ||
00257 (!(flags_ & SYNCHRONOUS) && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN))
00258 {
00259 ROSCPP_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
00260 close();
00261
00262 return false;
00263 }
00264
00265
00266 #ifdef WIN32
00267
00268
00269 Sleep(100);
00270 #endif
00271
00272
00273 std::stringstream ss;
00274 ss << host << ":" << port << " on socket " << sock_;
00275 cached_remote_host_ = ss.str();
00276
00277 if (!initializeSocket())
00278 {
00279 return false;
00280 }
00281
00282 if (flags_ & SYNCHRONOUS)
00283 {
00284 ROSCPP_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00285 }
00286 else
00287 {
00288 ROSCPP_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00289 }
00290
00291 return true;
00292 }
00293
00294 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
00295 {
00296 is_server_ = true;
00297 accept_cb_ = accept_cb;
00298
00299 sock_ = socket(AF_INET, SOCK_STREAM, 0);
00300
00301 if (sock_ <= 0)
00302 {
00303 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00304 return false;
00305 }
00306
00307 server_address_.sin_family = AF_INET;
00308 server_address_.sin_port = htons(port);
00309 server_address_.sin_addr.s_addr = INADDR_ANY;
00310 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00311 {
00312 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00313 return false;
00314 }
00315
00316 ::listen(sock_, backlog);
00317 socklen_t len = sizeof(server_address_);
00318 getsockname(sock_, (sockaddr *)&server_address_, &len);
00319 server_port_ = ntohs(server_address_.sin_port);
00320
00321 if (!initializeSocket())
00322 {
00323 return false;
00324 }
00325
00326 if (!(flags_ & SYNCHRONOUS))
00327 {
00328 enableRead();
00329 }
00330
00331 return true;
00332 }
00333
00334 void TransportTCP::close()
00335 {
00336 Callback disconnect_cb;
00337
00338 if (!closed_)
00339 {
00340 {
00341 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00342
00343 if (!closed_)
00344 {
00345 closed_ = true;
00346
00347 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00348
00349 if (poll_set_)
00350 {
00351 poll_set_->delSocket(sock_);
00352 }
00353
00354 ::shutdown(sock_, ROS_SOCKETS_SHUT_RDWR);
00355 if ( close_socket(sock_) != 0 )
00356 {
00357 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00358 } else
00359 {
00360 ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
00361 }
00362 sock_ = ROS_INVALID_SOCKET;
00363
00364 disconnect_cb = disconnect_cb_;
00365
00366 disconnect_cb_ = Callback();
00367 read_cb_ = Callback();
00368 write_cb_ = Callback();
00369 accept_cb_ = AcceptCallback();
00370 }
00371 }
00372 }
00373
00374 if (disconnect_cb)
00375 {
00376 disconnect_cb(shared_from_this());
00377 }
00378 }
00379
00380 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
00381 {
00382 {
00383 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00384
00385 if (closed_)
00386 {
00387 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00388 return -1;
00389 }
00390 }
00391
00392 ROS_ASSERT((int32_t)size > 0);
00393
00394 int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), size, 0);
00395 if (num_bytes < 0)
00396 {
00397 if ( !last_socket_error_is_would_block() )
00398 {
00399 ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00400 close();
00401 }
00402 else
00403 {
00404 num_bytes = 0;
00405 }
00406 }
00407 else if (num_bytes == 0)
00408 {
00409 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00410 close();
00411 return -1;
00412 }
00413
00414 return num_bytes;
00415 }
00416
00417 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
00418 {
00419 {
00420 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00421
00422 if (closed_)
00423 {
00424 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00425 return -1;
00426 }
00427 }
00428
00429 ROS_ASSERT((int32_t)size > 0);
00430
00431 int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), size, 0);
00432 if (num_bytes < 0)
00433 {
00434 if ( !last_socket_error_is_would_block() )
00435 {
00436 ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00437 close();
00438 }
00439 else
00440 {
00441 num_bytes = 0;
00442 }
00443 }
00444
00445 return num_bytes;
00446 }
00447
00448 void TransportTCP::enableRead()
00449 {
00450 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00451
00452 {
00453 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00454
00455 if (closed_)
00456 {
00457 return;
00458 }
00459 }
00460
00461 if (!expecting_read_)
00462 {
00463 poll_set_->addEvents(sock_, POLLIN);
00464 expecting_read_ = true;
00465 }
00466 }
00467
00468 void TransportTCP::disableRead()
00469 {
00470 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00471
00472 {
00473 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00474
00475 if (closed_)
00476 {
00477 return;
00478 }
00479 }
00480
00481 if (expecting_read_)
00482 {
00483 poll_set_->delEvents(sock_, POLLIN);
00484 expecting_read_ = false;
00485 }
00486 }
00487
00488 void TransportTCP::enableWrite()
00489 {
00490 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00491
00492 {
00493 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00494
00495 if (closed_)
00496 {
00497 return;
00498 }
00499 }
00500
00501 if (!expecting_write_)
00502 {
00503 poll_set_->addEvents(sock_, POLLOUT);
00504 expecting_write_ = true;
00505 }
00506 }
00507
00508 void TransportTCP::disableWrite()
00509 {
00510 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00511
00512 {
00513 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00514
00515 if (closed_)
00516 {
00517 return;
00518 }
00519 }
00520
00521 if (expecting_write_)
00522 {
00523 poll_set_->delEvents(sock_, POLLOUT);
00524 expecting_write_ = false;
00525 }
00526 }
00527
00528 TransportTCPPtr TransportTCP::accept()
00529 {
00530 ROS_ASSERT(is_server_);
00531
00532 sockaddr client_address;
00533 socklen_t len = sizeof(client_address);
00534 int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
00535 if (new_sock >= 0)
00536 {
00537 ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
00538
00539 TransportTCPPtr transport(new TransportTCP(poll_set_, flags_));
00540 if (!transport->setSocket(new_sock))
00541 {
00542 ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
00543 }
00544
00545 return transport;
00546 }
00547 else
00548 {
00549 ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00550 }
00551
00552 return TransportTCPPtr();
00553 }
00554
00555 void TransportTCP::socketUpdate(int events)
00556 {
00557 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00558 {
00559 if (closed_)
00560 {
00561 return;
00562 }
00563 }
00564
00565
00566 if ((events & POLLIN) && expecting_read_)
00567 {
00568 if (is_server_)
00569 {
00570
00571
00572 TransportTCPPtr transport = accept();
00573 if (transport)
00574 {
00575 ROS_ASSERT(accept_cb_);
00576 accept_cb_(transport);
00577 }
00578 }
00579 else
00580 {
00581 if (read_cb_)
00582 {
00583 read_cb_(shared_from_this());
00584 }
00585 }
00586 }
00587
00588 {
00589 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00590
00591 if (closed_)
00592 {
00593 return;
00594 }
00595 }
00596
00597 if ((events & POLLOUT) && expecting_write_)
00598 {
00599 if (write_cb_)
00600 {
00601 write_cb_(shared_from_this());
00602 }
00603 }
00604
00605 {
00606 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00607
00608 if (closed_)
00609 {
00610 return;
00611 }
00612 }
00613
00614 if((events & POLLERR) ||
00615 (events & POLLHUP) ||
00616 (events & POLLNVAL))
00617 {
00618 uint32_t error = -1;
00619 socklen_t len = sizeof(error);
00620 if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
00621 {
00622 ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
00623 }
00624 #ifdef _MSC_VER
00625 char err[60];
00626 strerror_s(err,60,error);
00627 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
00628 #else
00629 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
00630 #endif
00631
00632 close();
00633 }
00634 }
00635
00636 std::string TransportTCP::getTransportInfo()
00637 {
00638 return "TCPROS connection to [" + cached_remote_host_ + "]";
00639 }
00640
00641 std::string TransportTCP::getClientURI()
00642 {
00643 ROS_ASSERT(!is_server_);
00644
00645 sockaddr_in addr;
00646 socklen_t len = sizeof(addr);
00647 getpeername(sock_, (sockaddr *)&addr, &len);
00648 int port = ntohs(addr.sin_port);
00649 std::string ip = inet_ntoa(addr.sin_addr);
00650
00651 std::stringstream uri;
00652 uri << ip << ":" << port;
00653
00654 return uri.str();
00655 }
00656
00657 }