$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "ros/io.h" 00036 #include "ros/transport/transport_tcp.h" 00037 #include "ros/poll_set.h" 00038 #include "ros/header.h" 00039 #include "ros/file_log.h" 00040 #include <ros/assert.h> 00041 #include <sstream> 00042 #include <boost/bind.hpp> 00043 #include <fcntl.h> 00044 #include <errno.h> 00045 namespace ros 00046 { 00047 00048 bool TransportTCP::s_use_keepalive_ = true; 00049 00050 TransportTCP::TransportTCP(PollSet* poll_set, int flags) 00051 : sock_(ROS_INVALID_SOCKET) 00052 , closed_(false) 00053 , expecting_read_(false) 00054 , expecting_write_(false) 00055 , is_server_(false) 00056 , server_port_(-1) 00057 , poll_set_(poll_set) 00058 , flags_(flags) 00059 { 00060 00061 } 00062 00063 TransportTCP::~TransportTCP() 00064 { 00065 ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_); 00066 } 00067 00068 bool TransportTCP::setSocket(int sock) 00069 { 00070 sock_ = sock; 00071 return initializeSocket(); 00072 } 00073 00074 bool TransportTCP::setNonBlocking() 00075 { 00076 if (!(flags_ & SYNCHRONOUS)) 00077 { 00078 int result = set_non_blocking(sock_); 00079 if ( result != 0 ) { 00080 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result); 00081 close(); 00082 return false; 00083 } 00084 } 00085 00086 return true; 00087 } 00088 00089 bool TransportTCP::initializeSocket() 00090 { 00091 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET); 00092 00093 if (!setNonBlocking()) 00094 { 00095 return false; 00096 } 00097 00098 setKeepAlive(s_use_keepalive_, 60, 10, 9); 00099 00100 // connect() will set cached_remote_host_ because it already has the host/port available 00101 if (cached_remote_host_.empty()) 00102 { 00103 if (is_server_) 00104 { 00105 cached_remote_host_ = "TCPServer Socket"; 00106 } 00107 else 00108 { 00109 std::stringstream ss; 00110 ss << getClientURI() << " on socket " << sock_; 00111 cached_remote_host_ = ss.str(); 00112 } 00113 } 00114 00115 #ifdef ROSCPP_USE_TCP_NODELAY 00116 setNoDelay(true); 00117 #endif 00118 00119 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS)); 00120 if (poll_set_) 00121 { 00122 ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_); 00123 poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this()); 00124 } 00125 00126 if (!(flags_ & SYNCHRONOUS)) 00127 { 00128 //enableRead(); 00129 } 00130 00131 return true; 00132 } 00133 00134 void TransportTCP::parseHeader(const Header& header) 00135 { 00136 std::string nodelay; 00137 if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1") 00138 { 00139 ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_); 00140 setNoDelay(true); 00141 } 00142 } 00143 00144 void TransportTCP::setNoDelay(bool nodelay) 00145 { 00146 int flag = nodelay ? 1 : 0; 00147 int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); 00148 if (result < 0) 00149 { 00150 ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str()); 00151 } 00152 } 00153 00154 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count) 00155 { 00156 if (use) 00157 { 00158 int val = 1; 00159 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0) 00160 { 00161 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str()); 00162 } 00163 00164 /* cygwin SOL_TCP does not seem to support TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT */ 00165 #if defined(SOL_TCP) && !defined(__CYGWIN__) 00166 val = idle; 00167 if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0) 00168 { 00169 ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str()); 00170 } 00171 00172 val = interval; 00173 if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0) 00174 { 00175 ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str()); 00176 } 00177 00178 val = count; 00179 if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0) 00180 { 00181 ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str()); 00182 } 00183 #endif 00184 } 00185 else 00186 { 00187 int val = 0; 00188 if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0) 00189 { 00190 ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str()); 00191 } 00192 } 00193 } 00194 00195 bool TransportTCP::connect(const std::string& host, int port) 00196 { 00197 sock_ = socket(AF_INET, SOCK_STREAM, 0); 00198 connected_host_ = host; 00199 connected_port_ = port; 00200 00201 if (sock_ == ROS_INVALID_SOCKET) 00202 { 00203 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string()); 00204 return false; 00205 } 00206 00207 setNonBlocking(); 00208 00209 sockaddr_in sin; 00210 sin.sin_family = AF_INET; 00211 if (inet_addr(host.c_str()) == INADDR_NONE) 00212 { 00213 struct addrinfo* addr; 00214 if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0) 00215 { 00216 close(); 00217 ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str()); 00218 return false; 00219 } 00220 00221 bool found = false; 00222 struct addrinfo* it = addr; 00223 for (; it; it = it->ai_next) 00224 { 00225 if (it->ai_family == AF_INET) 00226 { 00227 memcpy(&sin, it->ai_addr, it->ai_addrlen); 00228 sin.sin_family = it->ai_family; 00229 sin.sin_port = htons(port); 00230 00231 found = true; 00232 break; 00233 } 00234 } 00235 00236 freeaddrinfo(addr); 00237 00238 if (!found) 00239 { 00240 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str()); 00241 return false; 00242 } 00243 00244 ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), inet_ntoa(sin.sin_addr), sock_); 00245 } 00246 else 00247 { 00248 sin.sin_addr.s_addr = inet_addr(host.c_str()); // already an IP addr 00249 } 00250 00251 sin.sin_port = htons(port); 00252 00253 int ret = ::connect(sock_, (sockaddr *)&sin, sizeof(sin)); 00254 // windows might need some time to sleep (input from service robotics hack) add this if testing proves it is necessary. 00255 ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0); 00256 if (((flags_ & SYNCHRONOUS) && ret != 0) || // synchronous, connect() should return 0 00257 (!(flags_ & SYNCHRONOUS) && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN)) // asynchronous, connect() should return -1 and WSAGetLastError()=WSAEWOULDBLOCK/errno=EINPROGRESS 00258 { 00259 ROSCPP_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string()); 00260 close(); 00261 00262 return false; 00263 } 00264 00265 // from daniel stonier: 00266 #ifdef WIN32 00267 // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless) 00268 // recv() needs to check if its connected or not when its asynchronous? 00269 Sleep(100); 00270 #endif 00271 00272 00273 std::stringstream ss; 00274 ss << host << ":" << port << " on socket " << sock_; 00275 cached_remote_host_ = ss.str(); 00276 00277 if (!initializeSocket()) 00278 { 00279 return false; 00280 } 00281 00282 if (flags_ & SYNCHRONOUS) 00283 { 00284 ROSCPP_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_); 00285 } 00286 else 00287 { 00288 ROSCPP_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_); 00289 } 00290 00291 return true; 00292 } 00293 00294 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb) 00295 { 00296 is_server_ = true; 00297 accept_cb_ = accept_cb; 00298 00299 sock_ = socket(AF_INET, SOCK_STREAM, 0); 00300 00301 if (sock_ <= 0) 00302 { 00303 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string()); 00304 return false; 00305 } 00306 00307 server_address_.sin_family = AF_INET; 00308 server_address_.sin_port = htons(port); 00309 server_address_.sin_addr.s_addr = INADDR_ANY; 00310 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0) 00311 { 00312 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string()); 00313 return false; 00314 } 00315 00316 ::listen(sock_, backlog); 00317 socklen_t len = sizeof(server_address_); 00318 getsockname(sock_, (sockaddr *)&server_address_, &len); 00319 server_port_ = ntohs(server_address_.sin_port); 00320 00321 if (!initializeSocket()) 00322 { 00323 return false; 00324 } 00325 00326 if (!(flags_ & SYNCHRONOUS)) 00327 { 00328 enableRead(); 00329 } 00330 00331 return true; 00332 } 00333 00334 void TransportTCP::close() 00335 { 00336 Callback disconnect_cb; 00337 00338 if (!closed_) 00339 { 00340 { 00341 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00342 00343 if (!closed_) 00344 { 00345 closed_ = true; 00346 00347 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET); 00348 00349 if (poll_set_) 00350 { 00351 poll_set_->delSocket(sock_); 00352 } 00353 00354 ::shutdown(sock_, ROS_SOCKETS_SHUT_RDWR); 00355 if ( close_socket(sock_) != 0 ) 00356 { 00357 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string()); 00358 } else 00359 { 00360 ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_); 00361 } 00362 sock_ = ROS_INVALID_SOCKET; 00363 00364 disconnect_cb = disconnect_cb_; 00365 00366 disconnect_cb_ = Callback(); 00367 read_cb_ = Callback(); 00368 write_cb_ = Callback(); 00369 accept_cb_ = AcceptCallback(); 00370 } 00371 } 00372 } 00373 00374 if (disconnect_cb) 00375 { 00376 disconnect_cb(shared_from_this()); 00377 } 00378 } 00379 00380 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size) 00381 { 00382 { 00383 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00384 00385 if (closed_) 00386 { 00387 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_); 00388 return -1; 00389 } 00390 } 00391 00392 ROS_ASSERT((int32_t)size > 0); 00393 00394 int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), size, 0); 00395 if (num_bytes < 0) 00396 { 00397 if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK 00398 { 00399 ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string()); 00400 close(); 00401 } 00402 else 00403 { 00404 num_bytes = 0; 00405 } 00406 } 00407 else if (num_bytes == 0) 00408 { 00409 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size); 00410 close(); 00411 return -1; 00412 } 00413 00414 return num_bytes; 00415 } 00416 00417 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size) 00418 { 00419 { 00420 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00421 00422 if (closed_) 00423 { 00424 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_); 00425 return -1; 00426 } 00427 } 00428 00429 ROS_ASSERT((int32_t)size > 0); 00430 00431 int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), size, 0); 00432 if (num_bytes < 0) 00433 { 00434 if ( !last_socket_error_is_would_block() ) 00435 { 00436 ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string()); 00437 close(); 00438 } 00439 else 00440 { 00441 num_bytes = 0; 00442 } 00443 } 00444 00445 return num_bytes; 00446 } 00447 00448 void TransportTCP::enableRead() 00449 { 00450 ROS_ASSERT(!(flags_ & SYNCHRONOUS)); 00451 00452 { 00453 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00454 00455 if (closed_) 00456 { 00457 return; 00458 } 00459 } 00460 00461 if (!expecting_read_) 00462 { 00463 poll_set_->addEvents(sock_, POLLIN); 00464 expecting_read_ = true; 00465 } 00466 } 00467 00468 void TransportTCP::disableRead() 00469 { 00470 ROS_ASSERT(!(flags_ & SYNCHRONOUS)); 00471 00472 { 00473 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00474 00475 if (closed_) 00476 { 00477 return; 00478 } 00479 } 00480 00481 if (expecting_read_) 00482 { 00483 poll_set_->delEvents(sock_, POLLIN); 00484 expecting_read_ = false; 00485 } 00486 } 00487 00488 void TransportTCP::enableWrite() 00489 { 00490 ROS_ASSERT(!(flags_ & SYNCHRONOUS)); 00491 00492 { 00493 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00494 00495 if (closed_) 00496 { 00497 return; 00498 } 00499 } 00500 00501 if (!expecting_write_) 00502 { 00503 poll_set_->addEvents(sock_, POLLOUT); 00504 expecting_write_ = true; 00505 } 00506 } 00507 00508 void TransportTCP::disableWrite() 00509 { 00510 ROS_ASSERT(!(flags_ & SYNCHRONOUS)); 00511 00512 { 00513 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00514 00515 if (closed_) 00516 { 00517 return; 00518 } 00519 } 00520 00521 if (expecting_write_) 00522 { 00523 poll_set_->delEvents(sock_, POLLOUT); 00524 expecting_write_ = false; 00525 } 00526 } 00527 00528 TransportTCPPtr TransportTCP::accept() 00529 { 00530 ROS_ASSERT(is_server_); 00531 00532 sockaddr client_address; 00533 socklen_t len = sizeof(client_address); 00534 int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len); 00535 if (new_sock >= 0) 00536 { 00537 ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock); 00538 00539 TransportTCPPtr transport(new TransportTCP(poll_set_, flags_)); 00540 if (!transport->setSocket(new_sock)) 00541 { 00542 ROS_ERROR("Failed to set socket on transport for socket %d", new_sock); 00543 } 00544 00545 return transport; 00546 } 00547 else 00548 { 00549 ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string()); 00550 } 00551 00552 return TransportTCPPtr(); 00553 } 00554 00555 void TransportTCP::socketUpdate(int events) 00556 { 00557 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00558 { 00559 if (closed_) 00560 { 00561 return; 00562 } 00563 } 00564 00565 // Handle read events before err/hup/nval, since there may be data left on the wire 00566 if ((events & POLLIN) && expecting_read_) 00567 { 00568 if (is_server_) 00569 { 00570 // Should not block here, because poll() said that it's ready 00571 // for reading 00572 TransportTCPPtr transport = accept(); 00573 if (transport) 00574 { 00575 ROS_ASSERT(accept_cb_); 00576 accept_cb_(transport); 00577 } 00578 } 00579 else 00580 { 00581 if (read_cb_) 00582 { 00583 read_cb_(shared_from_this()); 00584 } 00585 } 00586 } 00587 00588 { 00589 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00590 00591 if (closed_) 00592 { 00593 return; 00594 } 00595 } 00596 00597 if ((events & POLLOUT) && expecting_write_) 00598 { 00599 if (write_cb_) 00600 { 00601 write_cb_(shared_from_this()); 00602 } 00603 } 00604 00605 { 00606 boost::recursive_mutex::scoped_lock lock(close_mutex_); 00607 00608 if (closed_) 00609 { 00610 return; 00611 } 00612 } 00613 00614 if((events & POLLERR) || 00615 (events & POLLHUP) || 00616 (events & POLLNVAL)) 00617 { 00618 uint32_t error = -1; 00619 socklen_t len = sizeof(error); 00620 if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0) 00621 { 00622 ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_); 00623 } 00624 #ifdef _MSC_VER 00625 char err[60]; 00626 strerror_s(err,60,error); 00627 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err); 00628 #else 00629 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error)); 00630 #endif 00631 00632 close(); 00633 } 00634 } 00635 00636 std::string TransportTCP::getTransportInfo() 00637 { 00638 return "TCPROS connection to [" + cached_remote_host_ + "]"; 00639 } 00640 00641 std::string TransportTCP::getClientURI() 00642 { 00643 ROS_ASSERT(!is_server_); 00644 00645 sockaddr_in addr; 00646 socklen_t len = sizeof(addr); 00647 getpeername(sock_, (sockaddr *)&addr, &len); 00648 int port = ntohs(addr.sin_port); 00649 std::string ip = inet_ntoa(addr.sin_addr); 00650 00651 std::stringstream uri; 00652 uri << ip << ":" << port; 00653 00654 return uri.str(); 00655 } 00656 00657 } // namespace ros