$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "ros/transport/transport_udp.h" 00036 #include "ros/poll_set.h" 00037 #include "ros/file_log.h" 00038 00039 #include <ros/assert.h> 00040 #include <boost/bind.hpp> 00041 00042 #include <fcntl.h> 00043 #if defined(__APPLE__) 00044 // For readv() and writev() 00045 #include <sys/types.h> 00046 #include <sys/uio.h> 00047 #include <unistd.h> 00048 #endif 00049 00050 namespace ros 00051 { 00052 00053 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size) 00054 : sock_(-1) 00055 , closed_(false) 00056 , expecting_read_(false) 00057 , expecting_write_(false) 00058 , is_server_(false) 00059 , server_port_(-1) 00060 , poll_set_(poll_set) 00061 , flags_(flags) 00062 , connection_id_(0) 00063 , current_message_id_(0) 00064 , total_blocks_(0) 00065 , last_block_(0) 00066 , max_datagram_size_(max_datagram_size) 00067 , data_filled_(0) 00068 , reorder_buffer_(0) 00069 , reorder_bytes_(0) 00070 { 00071 // This may eventually be machine dependent 00072 if (max_datagram_size_ == 0) 00073 max_datagram_size_ = 1500; 00074 reorder_buffer_ = new uint8_t[max_datagram_size_]; 00075 reorder_start_ = reorder_buffer_; 00076 data_buffer_ = new uint8_t[max_datagram_size_]; 00077 data_start_ = data_buffer_; 00078 } 00079 00080 TransportUDP::~TransportUDP() 00081 { 00082 ROS_ASSERT_MSG(sock_ == ROS_INVALID_SOCKET, "TransportUDP socket [%d] was never closed", sock_); 00083 delete [] reorder_buffer_; 00084 delete [] data_buffer_; 00085 } 00086 00087 bool TransportUDP::setSocket(int sock) 00088 { 00089 sock_ = sock; 00090 return initializeSocket(); 00091 } 00092 00093 void TransportUDP::socketUpdate(int events) 00094 { 00095 { 00096 boost::mutex::scoped_lock lock(close_mutex_); 00097 00098 if (closed_) 00099 { 00100 return; 00101 } 00102 } 00103 00104 if((events & POLLERR) || 00105 (events & POLLHUP) || 00106 (events & POLLNVAL)) 00107 { 00108 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events); 00109 close(); 00110 } 00111 else 00112 { 00113 if ((events & POLLIN) && expecting_read_) 00114 { 00115 if (read_cb_) 00116 { 00117 read_cb_(shared_from_this()); 00118 } 00119 } 00120 00121 if ((events & POLLOUT) && expecting_write_) 00122 { 00123 if (write_cb_) 00124 { 00125 write_cb_(shared_from_this()); 00126 } 00127 } 00128 } 00129 00130 } 00131 00132 std::string TransportUDP::getTransportInfo() 00133 { 00134 return "UDPROS connection to [" + cached_remote_host_ + "]"; 00135 } 00136 00137 bool TransportUDP::connect(const std::string& host, int port, int connection_id) 00138 { 00139 sock_ = socket(AF_INET, SOCK_DGRAM, 0); 00140 connection_id_ = connection_id; 00141 00142 if (sock_ == ROS_INVALID_SOCKET) 00143 { 00144 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string()); 00145 return false; 00146 } 00147 00148 sockaddr_in sin; 00149 sin.sin_family = AF_INET; 00150 if (inet_addr(host.c_str()) == INADDR_NONE) 00151 { 00152 struct addrinfo* addr; 00153 if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0) 00154 { 00155 close(); 00156 ROS_ERROR("couldn't resolve host [%s]", host.c_str()); 00157 return false; 00158 } 00159 00160 bool found = false; 00161 struct addrinfo* it = addr; 00162 for (; it; it = it->ai_next) 00163 { 00164 if (it->ai_family == AF_INET) 00165 { 00166 memcpy(&sin, it->ai_addr, it->ai_addrlen); 00167 sin.sin_family = it->ai_family; 00168 sin.sin_port = htons(port); 00169 00170 found = true; 00171 break; 00172 } 00173 } 00174 00175 freeaddrinfo(addr); 00176 00177 if (!found) 00178 { 00179 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str()); 00180 return false; 00181 } 00182 00183 ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr)); 00184 } 00185 else 00186 { 00187 sin.sin_addr.s_addr = inet_addr(host.c_str()); // already an IP addr 00188 } 00189 00190 sin.sin_port = htons(port); 00191 00192 if (::connect(sock_, (sockaddr *)&sin, sizeof(sin))) 00193 { 00194 ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port, last_socket_error_string()); 00195 close(); 00196 00197 return false; 00198 } 00199 00200 // from daniel stonier: 00201 #ifdef WIN32 00202 // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless) 00203 // recv() needs to check if its connected or not when its asynchronous? 00204 Sleep(100); 00205 #endif 00206 00207 if (!initializeSocket()) 00208 { 00209 return false; 00210 } 00211 00212 ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_); 00213 00214 return true; 00215 } 00216 00217 bool TransportUDP::createIncoming(int port, bool is_server) 00218 { 00219 is_server_ = is_server; 00220 00221 sock_ = socket(AF_INET, SOCK_DGRAM, 0); 00222 00223 if (sock_ <= 0) 00224 { 00225 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string()); 00226 return false; 00227 } 00228 00229 server_address_.sin_family = AF_INET; 00230 server_address_.sin_port = htons(port); 00231 server_address_.sin_addr.s_addr = INADDR_ANY; 00232 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0) 00233 { 00234 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string()); 00235 return false; 00236 } 00237 00238 socklen_t len = sizeof(server_address_); 00239 getsockname(sock_, (sockaddr *)&server_address_, &len); 00240 server_port_ = ntohs(server_address_.sin_port); 00241 ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_); 00242 00243 if (!initializeSocket()) 00244 { 00245 return false; 00246 } 00247 00248 enableRead(); 00249 00250 return true; 00251 } 00252 00253 bool TransportUDP::initializeSocket() 00254 { 00255 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET); 00256 00257 if (!(flags_ & SYNCHRONOUS)) 00258 { 00259 int result = set_non_blocking(sock_); 00260 if ( result != 0 ) { 00261 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result); 00262 close(); 00263 return false; 00264 } 00265 } 00266 00267 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS)); 00268 if (poll_set_) 00269 { 00270 poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this()); 00271 } 00272 00273 return true; 00274 } 00275 00276 void TransportUDP::close() 00277 { 00278 Callback disconnect_cb; 00279 00280 if (!closed_) 00281 { 00282 { 00283 boost::mutex::scoped_lock lock(close_mutex_); 00284 00285 if (!closed_) 00286 { 00287 closed_ = true; 00288 00289 ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_); 00290 00291 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET); 00292 00293 if (poll_set_) 00294 { 00295 poll_set_->delSocket(sock_); 00296 } 00297 00298 if ( close_socket(sock_) != 0 ) 00299 { 00300 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string()); 00301 } 00302 00303 sock_ = ROS_INVALID_SOCKET; 00304 00305 disconnect_cb = disconnect_cb_; 00306 00307 disconnect_cb_ = Callback(); 00308 read_cb_ = Callback(); 00309 write_cb_ = Callback(); 00310 } 00311 } 00312 } 00313 00314 if (disconnect_cb) 00315 { 00316 disconnect_cb(shared_from_this()); 00317 } 00318 } 00319 00320 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size) 00321 { 00322 { 00323 boost::mutex::scoped_lock lock(close_mutex_); 00324 if (closed_) 00325 { 00326 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_); 00327 return -1; 00328 } 00329 } 00330 00331 ROS_ASSERT((int32_t)size > 0); 00332 00333 uint32_t bytes_read = 0; 00334 00335 while (bytes_read < size) 00336 { 00337 TransportUDPHeader header; 00338 00339 uint32_t copy_bytes = 0; 00340 bool from_previous = false; 00341 if (reorder_bytes_) 00342 { 00343 if (reorder_start_ != reorder_buffer_) 00344 { 00345 from_previous = true; 00346 } 00347 00348 copy_bytes = std::min(size - bytes_read, reorder_bytes_); 00349 header = reorder_header_; 00350 memcpy(buffer + bytes_read, reorder_start_, copy_bytes); 00351 reorder_bytes_ -= copy_bytes; 00352 reorder_start_ += copy_bytes; 00353 } 00354 else 00355 { 00356 if (data_filled_ == 0) 00357 { 00358 #if defined(WIN32) 00359 SSIZE_T num_bytes = 0; 00360 DWORD received_bytes = 0; 00361 DWORD flags = 0; 00362 WSABUF iov[2]; 00363 iov[0].buf = reinterpret_cast<char*>(&header); 00364 iov[0].len = sizeof(header); 00365 iov[1].buf = reinterpret_cast<char*>(data_buffer_); 00366 iov[1].len = max_datagram_size_ - sizeof(header); 00367 int rc = WSARecv(sock_, iov, 2, &received_bytes, &flags, NULL, NULL); 00368 if ( rc == SOCKET_ERROR) { 00369 num_bytes = -1; 00370 } else { 00371 num_bytes = received_bytes; 00372 } 00373 #else 00374 ssize_t num_bytes; 00375 struct iovec iov[2]; 00376 iov[0].iov_base = &header; 00377 iov[0].iov_len = sizeof(header); 00378 iov[1].iov_base = data_buffer_; 00379 iov[1].iov_len = max_datagram_size_ - sizeof(header); 00380 // Read a datagram with header 00381 num_bytes = readv(sock_, iov, 2); 00382 #endif 00383 if (num_bytes < 0) 00384 { 00385 if ( last_socket_error_is_would_block() ) 00386 { 00387 num_bytes = 0; 00388 break; 00389 } 00390 else 00391 { 00392 ROSCPP_LOG_DEBUG("readv() failed with error [%s]", last_socket_error_string()); 00393 close(); 00394 break; 00395 } 00396 } 00397 else if (num_bytes == 0) 00398 { 00399 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size); 00400 close(); 00401 return -1; 00402 } 00403 else if (num_bytes < (unsigned) sizeof(header)) 00404 { 00405 ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes), last_socket_error_string()); 00406 close(); 00407 return -1; 00408 } 00409 00410 num_bytes -= sizeof(header); 00411 data_filled_ = num_bytes; 00412 data_start_ = data_buffer_; 00413 } 00414 else 00415 { 00416 from_previous = true; 00417 } 00418 00419 copy_bytes = std::min(size - bytes_read, data_filled_); 00420 // Copy from the data buffer, whether it has data left in it from a previous datagram or 00421 // was just filled by readv() 00422 memcpy(buffer + bytes_read, data_start_, copy_bytes); 00423 data_filled_ = std::max((int64_t)0, (int64_t)data_filled_ - (int64_t)size); 00424 data_start_ += copy_bytes; 00425 } 00426 00427 00428 if (from_previous) 00429 { 00430 bytes_read += copy_bytes; 00431 } 00432 else 00433 { 00434 // Process header 00435 switch (header.op_) 00436 { 00437 case ROS_UDP_DATA0: 00438 if (current_message_id_) 00439 { 00440 ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_); 00441 reorder_header_ = header; 00442 reorder_bytes_ = copy_bytes; 00443 memcpy(reorder_buffer_, buffer + bytes_read, copy_bytes); 00444 reorder_start_ = reorder_buffer_; 00445 current_message_id_ = 0; 00446 total_blocks_ = 0; 00447 last_block_ = 0; 00448 00449 data_filled_ = 0; 00450 data_start_ = data_buffer_; 00451 return -1; 00452 } 00453 total_blocks_ = header.block_; 00454 last_block_ = 0; 00455 current_message_id_ = header.message_id_; 00456 break; 00457 case ROS_UDP_DATAN: 00458 if (header.message_id_ != current_message_id_) 00459 { 00460 ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_); 00461 return 0; 00462 } 00463 if (header.block_ != last_block_ + 1) 00464 { 00465 ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_); 00466 return 0; 00467 } 00468 last_block_ = header.block_; 00469 00470 break; 00471 default: 00472 ROS_ERROR("Unexpected UDP header OP [%d]", header.op_); 00473 return -1; 00474 } 00475 00476 bytes_read += copy_bytes; 00477 00478 if (last_block_ == (total_blocks_ - 1)) 00479 { 00480 current_message_id_ = 0; 00481 break; 00482 } 00483 } 00484 } 00485 00486 return bytes_read; 00487 } 00488 00489 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size) 00490 { 00491 { 00492 boost::mutex::scoped_lock lock(close_mutex_); 00493 00494 if (closed_) 00495 { 00496 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_); 00497 return -1; 00498 } 00499 } 00500 00501 ROS_ASSERT((int32_t)size > 0); 00502 00503 const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader); 00504 00505 uint32_t bytes_sent = 0; 00506 uint32_t this_block = 0; 00507 if (++current_message_id_ == 0) 00508 ++current_message_id_; 00509 while (bytes_sent < size) 00510 { 00511 TransportUDPHeader header; 00512 header.connection_id_ = connection_id_; 00513 header.message_id_ = current_message_id_; 00514 if (this_block == 0) 00515 { 00516 header.op_ = ROS_UDP_DATA0; 00517 header.block_ = (size + max_payload_size - 1) / max_payload_size; 00518 } 00519 else 00520 { 00521 header.op_ = ROS_UDP_DATAN; 00522 header.block_ = this_block; 00523 } 00524 ++this_block; 00525 #if defined(WIN32) 00526 WSABUF iov[2]; 00527 DWORD sent_bytes; 00528 SSIZE_T num_bytes = 0; 00529 DWORD flags = 0; 00530 int rc; 00531 iov[0].buf = reinterpret_cast<char*>(&header); 00532 iov[0].len = sizeof(header); 00533 iov[1].buf = reinterpret_cast<char*>(buffer + bytes_sent); 00534 iov[1].len = std::min(max_payload_size, size - bytes_sent); 00535 rc = WSASend(sock_, iov, 2, &sent_bytes, flags, NULL, NULL); 00536 num_bytes = sent_bytes; 00537 if (rc == SOCKET_ERROR) { 00538 num_bytes = -1; 00539 } 00540 #else 00541 struct iovec iov[2]; 00542 iov[0].iov_base = &header; 00543 iov[0].iov_len = sizeof(header); 00544 iov[1].iov_base = buffer + bytes_sent; 00545 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent); 00546 ssize_t num_bytes = writev(sock_, iov, 2); 00547 #endif 00548 //usleep(100); 00549 if (num_bytes < 0) 00550 { 00551 if( !last_socket_error_is_would_block() ) // Actually EAGAIN or EWOULDBLOCK on posix 00552 { 00553 ROSCPP_LOG_DEBUG("writev() failed with error [%s]", last_socket_error_string()); 00554 close(); 00555 break; 00556 } 00557 else 00558 { 00559 num_bytes = 0; 00560 } 00561 } 00562 else if (num_bytes < (unsigned) sizeof(header)) 00563 { 00564 ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes)); 00565 close(); 00566 break; 00567 } 00568 else 00569 { 00570 num_bytes -= sizeof(header); 00571 } 00572 bytes_sent += num_bytes; 00573 } 00574 00575 return bytes_sent; 00576 } 00577 00578 void TransportUDP::enableRead() 00579 { 00580 { 00581 boost::mutex::scoped_lock lock(close_mutex_); 00582 00583 if (closed_) 00584 { 00585 return; 00586 } 00587 } 00588 00589 if (!expecting_read_) 00590 { 00591 poll_set_->addEvents(sock_, POLLIN); 00592 expecting_read_ = true; 00593 } 00594 } 00595 00596 void TransportUDP::disableRead() 00597 { 00598 ROS_ASSERT(!(flags_ & SYNCHRONOUS)); 00599 00600 { 00601 boost::mutex::scoped_lock lock(close_mutex_); 00602 00603 if (closed_) 00604 { 00605 return; 00606 } 00607 } 00608 00609 if (expecting_read_) 00610 { 00611 poll_set_->delEvents(sock_, POLLIN); 00612 expecting_read_ = false; 00613 } 00614 } 00615 00616 void TransportUDP::enableWrite() 00617 { 00618 { 00619 boost::mutex::scoped_lock lock(close_mutex_); 00620 00621 if (closed_) 00622 { 00623 return; 00624 } 00625 } 00626 00627 if (!expecting_write_) 00628 { 00629 poll_set_->addEvents(sock_, POLLOUT); 00630 expecting_write_ = true; 00631 } 00632 } 00633 00634 void TransportUDP::disableWrite() 00635 { 00636 { 00637 boost::mutex::scoped_lock lock(close_mutex_); 00638 00639 if (closed_) 00640 { 00641 return; 00642 } 00643 } 00644 00645 if (expecting_write_) 00646 { 00647 poll_set_->delEvents(sock_, POLLOUT); 00648 expecting_write_ = false; 00649 } 00650 } 00651 00652 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size) 00653 { 00654 ROS_ASSERT(is_server_); 00655 00656 TransportUDPPtr transport(new TransportUDP(poll_set_, flags_, max_datagram_size)); 00657 if (!transport->connect(host, port, connection_id)) 00658 { 00659 ROS_ERROR("Failed to create outgoing connection"); 00660 return TransportUDPPtr(); 00661 } 00662 return transport; 00663 00664 } 00665 00666 }