00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/transport/transport_tcp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/header.h"
00038 #include "ros/file_log.h"
00039
00040 #include <ros/assert.h>
00041
00042 #include <sstream>
00043
00044 #include <boost/bind.hpp>
00045
00046 #include <sys/socket.h>
00047 #include <netinet/tcp.h>
00048
00049 #include <sys/poll.h>
00050 #include <arpa/inet.h>
00051 #include <netdb.h>
00052 #include <fcntl.h>
00053 #include <errno.h>
00054
00055 namespace ros
00056 {
00057
00058 bool TransportTCP::s_use_keepalive_ = true;
00059
00060 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
00061 : sock_(-1)
00062 , closed_(false)
00063 , expecting_read_(false)
00064 , expecting_write_(false)
00065 , is_server_(false)
00066 , server_port_(-1)
00067 , poll_set_(poll_set)
00068 , flags_(flags)
00069 {
00070
00071 }
00072
00073 TransportTCP::~TransportTCP()
00074 {
00075 ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
00076 }
00077
00078 bool TransportTCP::setSocket(int sock)
00079 {
00080 sock_ = sock;
00081 return initializeSocket();
00082 }
00083
00084 bool TransportTCP::setNonBlocking()
00085 {
00086 if (!(flags_ & SYNCHRONOUS))
00087 {
00088
00089 if(fcntl(sock_, F_SETFL, O_NONBLOCK) == -1)
00090 {
00091 ROS_ERROR("fcntl (non-blocking) to socket [%d] failed with error [%s]", sock_, strerror(errno));
00092
00093 close();
00094 return false;
00095 }
00096 }
00097
00098 return true;
00099 }
00100
00101 bool TransportTCP::initializeSocket()
00102 {
00103 ROS_ASSERT(sock_ != -1);
00104
00105 if (!setNonBlocking())
00106 {
00107 return false;
00108 }
00109
00110 setKeepAlive(s_use_keepalive_, 60, 10, 9);
00111
00112
00113 if (cached_remote_host_.empty())
00114 {
00115 if (is_server_)
00116 {
00117 cached_remote_host_ = "TCPServer Socket";
00118 }
00119 else
00120 {
00121 std::stringstream ss;
00122 ss << getClientURI() << " on socket " << sock_;
00123 cached_remote_host_ = ss.str();
00124 }
00125 }
00126
00127 #ifdef ROSCPP_USE_TCP_NODELAY
00128 setNoDelay(true);
00129 #endif
00130
00131 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00132 if (poll_set_)
00133 {
00134 ROS_DEBUG("Adding socket [%d] to pollset", sock_);
00135 poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
00136 }
00137
00138 if (!(flags_ & SYNCHRONOUS))
00139 {
00140
00141 }
00142
00143 return true;
00144 }
00145
00146 void TransportTCP::parseHeader(const Header& header)
00147 {
00148 std::string nodelay;
00149 if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
00150 {
00151 ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
00152 setNoDelay(true);
00153 }
00154 }
00155
00156 void TransportTCP::setNoDelay(bool nodelay)
00157 {
00158 int flag = nodelay ? 1 : 0;
00159 int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
00160 if (result < 0)
00161 {
00162 ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00163 }
00164 }
00165
00166 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
00167 {
00168 if (use)
00169 {
00170 int val = 1;
00171 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0)
00172 {
00173 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00174 }
00175
00176
00177 #if defined(SOL_TCP) && !defined(__CYGWIN__)
00178 val = idle;
00179 if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
00180 {
00181 ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00182 }
00183
00184 val = interval;
00185 if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
00186 {
00187 ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00188 }
00189
00190 val = count;
00191 if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
00192 {
00193 ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00194 }
00195 #endif
00196 }
00197 else
00198 {
00199 int val = 0;
00200 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0)
00201 {
00202 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00203 }
00204 }
00205 }
00206
00207 bool TransportTCP::connect(const std::string& host, int port)
00208 {
00209 sock_ = socket(AF_INET, SOCK_STREAM, 0);
00210 connected_host_ = host;
00211 connected_port_ = port;
00212
00213 if (sock_ == -1)
00214 {
00215 ROS_ERROR("socket() failed with error [%s]", strerror(errno));
00216 return false;
00217 }
00218
00219 setNonBlocking();
00220
00221 sockaddr_in sin;
00222 sin.sin_family = AF_INET;
00223 if (inet_addr(host.c_str()) == INADDR_NONE)
00224 {
00225 struct addrinfo* addr;
00226 if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0)
00227 {
00228 close();
00229 ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
00230 return false;
00231 }
00232
00233 bool found = false;
00234 struct addrinfo* it = addr;
00235 for (; it; it = it->ai_next)
00236 {
00237 if (it->ai_family == AF_INET)
00238 {
00239 memcpy(&sin, it->ai_addr, it->ai_addrlen);
00240 sin.sin_family = it->ai_family;
00241 sin.sin_port = htons(port);
00242
00243 found = true;
00244 break;
00245 }
00246 }
00247
00248 freeaddrinfo(addr);
00249
00250 if (!found)
00251 {
00252 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00253 return false;
00254 }
00255
00256 ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), inet_ntoa(sin.sin_addr), sock_);
00257 }
00258 else
00259 {
00260 sin.sin_addr.s_addr = inet_addr(host.c_str());
00261 }
00262
00263 sin.sin_port = htons(port);
00264
00265 int ret = ::connect(sock_, (sockaddr *)&sin, sizeof(sin));
00266 ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
00267 if (((flags_ & SYNCHRONOUS) && ret != 0) ||
00268 (!(flags_ & SYNCHRONOUS) && errno != EINPROGRESS))
00269 {
00270 ROSCPP_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, strerror(errno));
00271 close();
00272
00273 return false;
00274 }
00275
00276 std::stringstream ss;
00277 ss << host << ":" << port << " on socket " << sock_;
00278 cached_remote_host_ = ss.str();
00279
00280 if (!initializeSocket())
00281 {
00282 return false;
00283 }
00284
00285 if (flags_ & SYNCHRONOUS)
00286 {
00287 ROSCPP_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00288 }
00289 else
00290 {
00291 ROSCPP_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00292 }
00293
00294 return true;
00295 }
00296
00297 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
00298 {
00299 is_server_ = true;
00300 accept_cb_ = accept_cb;
00301
00302 sock_ = socket(AF_INET, SOCK_STREAM, 0);
00303
00304 if (sock_ <= 0)
00305 {
00306 ROS_ERROR("socket() failed with error [%s]", strerror(errno));
00307 return false;
00308 }
00309
00310 server_address_.sin_family = AF_INET;
00311 server_address_.sin_port = htons(port);
00312 server_address_.sin_addr.s_addr = INADDR_ANY;
00313 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00314 {
00315 ROS_ERROR("bind() failed with error [%s]", strerror(errno));
00316 return false;
00317 }
00318
00319 ::listen(sock_, backlog);
00320 socklen_t len = sizeof(server_address_);
00321 getsockname(sock_, (sockaddr *)&server_address_, &len);
00322 server_port_ = ntohs(server_address_.sin_port);
00323
00324 if (!initializeSocket())
00325 {
00326 return false;
00327 }
00328
00329 if (!(flags_ & SYNCHRONOUS))
00330 {
00331 enableRead();
00332 }
00333
00334 return true;
00335 }
00336
00337 void TransportTCP::close()
00338 {
00339 Callback disconnect_cb;
00340
00341 if (!closed_)
00342 {
00343 {
00344 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00345
00346 if (!closed_)
00347 {
00348 closed_ = true;
00349
00350 ROS_ASSERT(sock_ != -1);
00351
00352 if (poll_set_)
00353 {
00354 poll_set_->delSocket(sock_);
00355 }
00356
00357 ::shutdown(sock_, SHUT_RDWR);
00358 if (::close(sock_) < 0)
00359 {
00360 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, strerror(errno));
00361 }
00362 else
00363 {
00364 ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
00365 }
00366
00367 sock_ = -1;
00368
00369 disconnect_cb = disconnect_cb_;
00370
00371 disconnect_cb_ = Callback();
00372 read_cb_ = Callback();
00373 write_cb_ = Callback();
00374 accept_cb_ = AcceptCallback();
00375 }
00376 }
00377 }
00378
00379 if (disconnect_cb)
00380 {
00381 disconnect_cb(shared_from_this());
00382 }
00383 }
00384
00385 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
00386 {
00387 {
00388 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00389 if (closed_)
00390 {
00391 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00392 return -1;
00393 }
00394 }
00395
00396 ROS_ASSERT((int32_t)size > 0);
00397
00398 int num_bytes = ::recv(sock_, buffer, size, 0);
00399 if (num_bytes < 0)
00400 {
00401 if (errno != EAGAIN && errno != EWOULDBLOCK)
00402 {
00403 ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, strerror(errno));
00404 close();
00405 }
00406 else
00407 {
00408 num_bytes = 0;
00409 }
00410 }
00411 else if (num_bytes == 0)
00412 {
00413 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00414 close();
00415 return -1;
00416 }
00417
00418 return num_bytes;
00419 }
00420
00421 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
00422 {
00423 {
00424 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00425
00426 if (closed_)
00427 {
00428 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00429 return -1;
00430 }
00431 }
00432
00433 ROS_ASSERT((int32_t)size > 0);
00434
00435 int num_bytes = ::send(sock_, buffer, size, 0);
00436 if (num_bytes < 0)
00437 {
00438 if(errno != EAGAIN)
00439 {
00440 ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, strerror(errno));
00441
00442 close();
00443 }
00444 else
00445 {
00446 num_bytes = 0;
00447 }
00448 }
00449
00450 return num_bytes;
00451 }
00452
00453 void TransportTCP::enableRead()
00454 {
00455 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00456
00457 {
00458 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00459
00460 if (closed_)
00461 {
00462 return;
00463 }
00464 }
00465
00466 if (!expecting_read_)
00467 {
00468 poll_set_->addEvents(sock_, POLLIN);
00469 expecting_read_ = true;
00470 }
00471 }
00472
00473 void TransportTCP::disableRead()
00474 {
00475 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00476
00477 {
00478 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00479
00480 if (closed_)
00481 {
00482 return;
00483 }
00484 }
00485
00486 if (expecting_read_)
00487 {
00488 poll_set_->delEvents(sock_, POLLIN);
00489 expecting_read_ = false;
00490 }
00491 }
00492
00493 void TransportTCP::enableWrite()
00494 {
00495 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00496
00497 {
00498 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00499
00500 if (closed_)
00501 {
00502 return;
00503 }
00504 }
00505
00506 if (!expecting_write_)
00507 {
00508 poll_set_->addEvents(sock_, POLLOUT);
00509 expecting_write_ = true;
00510 }
00511 }
00512
00513 void TransportTCP::disableWrite()
00514 {
00515 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00516
00517 {
00518 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00519
00520 if (closed_)
00521 {
00522 return;
00523 }
00524 }
00525
00526 if (expecting_write_)
00527 {
00528 poll_set_->delEvents(sock_, POLLOUT);
00529 expecting_write_ = false;
00530 }
00531 }
00532
00533 TransportTCPPtr TransportTCP::accept()
00534 {
00535 ROS_ASSERT(is_server_);
00536
00537 sockaddr client_address;
00538 socklen_t len = sizeof(client_address);
00539 int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
00540 if (new_sock >= 0)
00541 {
00542 ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
00543
00544 TransportTCPPtr transport(new TransportTCP(poll_set_, flags_));
00545 if (!transport->setSocket(new_sock))
00546 {
00547 ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
00548 }
00549
00550 return transport;
00551 }
00552 else
00553 {
00554 ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, strerror(errno));
00555 }
00556
00557 return TransportTCPPtr();
00558 }
00559
00560 void TransportTCP::socketUpdate(int events)
00561 {
00562 boost::recursive_mutex::scoped_lock lock(close_mutex_);
00563
00564 if (closed_)
00565 {
00566 return;
00567 }
00568
00569
00570 if ((events & POLLIN) && expecting_read_)
00571 {
00572 if (is_server_)
00573 {
00574
00575
00576 TransportTCPPtr transport = accept();
00577 if (transport)
00578 {
00579 ROS_ASSERT(accept_cb_);
00580 accept_cb_(transport);
00581 }
00582 }
00583 else
00584 {
00585 if (read_cb_)
00586 {
00587 read_cb_(shared_from_this());
00588 }
00589 }
00590 }
00591
00592 if (closed_)
00593 {
00594 return;
00595 }
00596
00597 if ((events & POLLOUT) && expecting_write_)
00598 {
00599 if (write_cb_)
00600 {
00601 write_cb_(shared_from_this());
00602 }
00603 }
00604
00605 if (closed_)
00606 {
00607 return;
00608 }
00609
00610 if((events & POLLERR) ||
00611 (events & POLLHUP) ||
00612 (events & POLLNVAL))
00613 {
00614 uint32_t error = -1;
00615 socklen_t len = sizeof(error);
00616 if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
00617 {
00618 ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
00619 }
00620
00621 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
00622 close();
00623 }
00624 }
00625
00626 std::string TransportTCP::getTransportInfo()
00627 {
00628 return "TCPROS connection to [" + cached_remote_host_ + "]";
00629 }
00630
00631 std::string TransportTCP::getClientURI()
00632 {
00633 ROS_ASSERT(!is_server_);
00634
00635 sockaddr_in addr;
00636 socklen_t len = sizeof(addr);
00637 getpeername(sock_, (sockaddr *)&addr, &len);
00638 int port = ntohs(addr.sin_port);
00639 std::string ip = inet_ntoa(addr.sin_addr);
00640
00641 std::stringstream uri;
00642 uri << ip << ":" << port;
00643
00644 return uri.str();
00645 }
00646
00647 }