00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/transport/transport_udp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/file_log.h"
00038
00039 #include <ros/assert.h>
00040 #include <boost/bind.hpp>
00041
00042 #include <fcntl.h>
00043 #if defined(__APPLE__)
00044
00045 #include <sys/types.h>
00046 #include <sys/uio.h>
00047 #include <unistd.h>
00048 #elif defined(__ANDROID__)
00049
00050 #include <sys/uio.h>
00051 #endif
00052
00053 namespace ros
00054 {
00055
00056 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
00057 : sock_(-1)
00058 , closed_(false)
00059 , expecting_read_(false)
00060 , expecting_write_(false)
00061 , is_server_(false)
00062 , server_port_(-1)
00063 , local_port_(-1)
00064 , poll_set_(poll_set)
00065 , flags_(flags)
00066 , connection_id_(0)
00067 , current_message_id_(0)
00068 , total_blocks_(0)
00069 , last_block_(0)
00070 , max_datagram_size_(max_datagram_size)
00071 , data_filled_(0)
00072 , reorder_buffer_(0)
00073 , reorder_bytes_(0)
00074 {
00075
00076 if (max_datagram_size_ == 0)
00077 max_datagram_size_ = 1500;
00078 reorder_buffer_ = new uint8_t[max_datagram_size_];
00079 reorder_start_ = reorder_buffer_;
00080 data_buffer_ = new uint8_t[max_datagram_size_];
00081 data_start_ = data_buffer_;
00082 }
00083
00084 TransportUDP::~TransportUDP()
00085 {
00086 ROS_ASSERT_MSG(sock_ == ROS_INVALID_SOCKET, "TransportUDP socket [%d] was never closed", sock_);
00087 delete [] reorder_buffer_;
00088 delete [] data_buffer_;
00089 }
00090
00091 bool TransportUDP::setSocket(int sock)
00092 {
00093 sock_ = sock;
00094 return initializeSocket();
00095 }
00096
00097 void TransportUDP::socketUpdate(int events)
00098 {
00099 {
00100 boost::mutex::scoped_lock lock(close_mutex_);
00101
00102 if (closed_)
00103 {
00104 return;
00105 }
00106 }
00107
00108 if((events & POLLERR) ||
00109 (events & POLLHUP) ||
00110 (events & POLLNVAL))
00111 {
00112 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events);
00113 close();
00114 }
00115 else
00116 {
00117 if ((events & POLLIN) && expecting_read_)
00118 {
00119 if (read_cb_)
00120 {
00121 read_cb_(shared_from_this());
00122 }
00123 }
00124
00125 if ((events & POLLOUT) && expecting_write_)
00126 {
00127 if (write_cb_)
00128 {
00129 write_cb_(shared_from_this());
00130 }
00131 }
00132 }
00133
00134 }
00135
00136 std::string TransportUDP::getTransportInfo()
00137 {
00138 std::stringstream str;
00139 str << "UDPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
00140 return str.str();
00141 }
00142
00143 bool TransportUDP::connect(const std::string& host, int port, int connection_id)
00144 {
00145 if (!isHostAllowed(host))
00146 return false;
00147
00148 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00149 connection_id_ = connection_id;
00150
00151 if (sock_ == ROS_INVALID_SOCKET)
00152 {
00153 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00154 return false;
00155 }
00156
00157 sockaddr_in sin;
00158 sin.sin_family = AF_INET;
00159 if (inet_addr(host.c_str()) == INADDR_NONE)
00160 {
00161 struct addrinfo* addr;
00162 struct addrinfo hints;
00163 memset(&hints, 0, sizeof(hints));
00164 hints.ai_family = AF_UNSPEC;
00165
00166 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
00167 {
00168 close();
00169 ROS_ERROR("couldn't resolve host [%s]", host.c_str());
00170 return false;
00171 }
00172
00173 bool found = false;
00174 struct addrinfo* it = addr;
00175 for (; it; it = it->ai_next)
00176 {
00177 if (it->ai_family == AF_INET)
00178 {
00179 memcpy(&sin, it->ai_addr, it->ai_addrlen);
00180 sin.sin_family = it->ai_family;
00181 sin.sin_port = htons(port);
00182
00183 found = true;
00184 break;
00185 }
00186 }
00187
00188 freeaddrinfo(addr);
00189
00190 if (!found)
00191 {
00192 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00193 return false;
00194 }
00195
00196 ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
00197 }
00198 else
00199 {
00200 sin.sin_addr.s_addr = inet_addr(host.c_str());
00201 }
00202
00203 sin.sin_port = htons(port);
00204
00205 if (::connect(sock_, (sockaddr *)&sin, sizeof(sin)))
00206 {
00207 ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port, last_socket_error_string());
00208 close();
00209
00210 return false;
00211 }
00212
00213
00214 #ifdef WIN32
00215
00216
00217 Sleep(100);
00218 #endif
00219
00220 std::stringstream ss;
00221 ss << host << ":" << port << " on socket " << sock_;
00222 cached_remote_host_ = ss.str();
00223
00224 if (!initializeSocket())
00225 {
00226 return false;
00227 }
00228
00229 ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00230
00231 return true;
00232 }
00233
00234 bool TransportUDP::createIncoming(int port, bool is_server)
00235 {
00236 is_server_ = is_server;
00237
00238 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00239
00240 if (sock_ <= 0)
00241 {
00242 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00243 return false;
00244 }
00245
00246 server_address_.sin_family = AF_INET;
00247 server_address_.sin_port = htons(port);
00248 server_address_.sin_addr.s_addr = isOnlyLocalhostAllowed() ?
00249 htonl(INADDR_LOOPBACK) :
00250 INADDR_ANY;
00251 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00252 {
00253 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00254 return false;
00255 }
00256
00257 socklen_t len = sizeof(server_address_);
00258 getsockname(sock_, (sockaddr *)&server_address_, &len);
00259 server_port_ = ntohs(server_address_.sin_port);
00260 ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_);
00261
00262 if (!initializeSocket())
00263 {
00264 return false;
00265 }
00266
00267 enableRead();
00268
00269 return true;
00270 }
00271
00272 bool TransportUDP::initializeSocket()
00273 {
00274 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00275
00276 if (!(flags_ & SYNCHRONOUS))
00277 {
00278 int result = set_non_blocking(sock_);
00279 if ( result != 0 ) {
00280 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00281 close();
00282 return false;
00283 }
00284 }
00285
00286 socklen_t len = sizeof(local_address_);
00287 getsockname(sock_, (sockaddr *)&local_address_, &len);
00288 local_port_ = ntohs(local_address_.sin_port);
00289
00290 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00291 if (poll_set_)
00292 {
00293 poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this());
00294 }
00295
00296 return true;
00297 }
00298
00299 void TransportUDP::close()
00300 {
00301 Callback disconnect_cb;
00302
00303 if (!closed_)
00304 {
00305 {
00306 boost::mutex::scoped_lock lock(close_mutex_);
00307
00308 if (!closed_)
00309 {
00310 closed_ = true;
00311
00312 ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_);
00313
00314 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00315
00316 if (poll_set_)
00317 {
00318 poll_set_->delSocket(sock_);
00319 }
00320
00321 if ( close_socket(sock_) != 0 )
00322 {
00323 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00324 }
00325
00326 sock_ = ROS_INVALID_SOCKET;
00327
00328 disconnect_cb = disconnect_cb_;
00329
00330 disconnect_cb_ = Callback();
00331 read_cb_ = Callback();
00332 write_cb_ = Callback();
00333 }
00334 }
00335 }
00336
00337 if (disconnect_cb)
00338 {
00339 disconnect_cb(shared_from_this());
00340 }
00341 }
00342
00343 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
00344 {
00345 {
00346 boost::mutex::scoped_lock lock(close_mutex_);
00347 if (closed_)
00348 {
00349 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00350 return -1;
00351 }
00352 }
00353
00354 ROS_ASSERT((int32_t)size > 0);
00355
00356 uint32_t bytes_read = 0;
00357
00358 while (bytes_read < size)
00359 {
00360 TransportUDPHeader header;
00361
00362
00363
00364
00365 uint32_t copy_bytes = 0;
00366 bool from_previous = false;
00367 if (reorder_bytes_)
00368 {
00369 if (reorder_start_ != reorder_buffer_)
00370 {
00371 from_previous = true;
00372 }
00373
00374 copy_bytes = std::min(size - bytes_read, reorder_bytes_);
00375 header = reorder_header_;
00376 memcpy(buffer + bytes_read, reorder_start_, copy_bytes);
00377 reorder_bytes_ -= copy_bytes;
00378 reorder_start_ += copy_bytes;
00379 }
00380 else
00381 {
00382 if (data_filled_ == 0)
00383 {
00384 #if defined(WIN32)
00385 SSIZE_T num_bytes = 0;
00386 DWORD received_bytes = 0;
00387 DWORD flags = 0;
00388 WSABUF iov[2];
00389 iov[0].buf = reinterpret_cast<char*>(&header);
00390 iov[0].len = sizeof(header);
00391 iov[1].buf = reinterpret_cast<char*>(data_buffer_);
00392 iov[1].len = max_datagram_size_ - sizeof(header);
00393 int rc = WSARecv(sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
00394 if ( rc == SOCKET_ERROR) {
00395 num_bytes = -1;
00396 } else {
00397 num_bytes = received_bytes;
00398 }
00399 #else
00400 ssize_t num_bytes;
00401 struct iovec iov[2];
00402 iov[0].iov_base = &header;
00403 iov[0].iov_len = sizeof(header);
00404 iov[1].iov_base = data_buffer_;
00405 iov[1].iov_len = max_datagram_size_ - sizeof(header);
00406
00407 num_bytes = readv(sock_, iov, 2);
00408 #endif
00409 if (num_bytes < 0)
00410 {
00411 if ( last_socket_error_is_would_block() )
00412 {
00413 num_bytes = 0;
00414 break;
00415 }
00416 else
00417 {
00418 ROSCPP_LOG_DEBUG("readv() failed with error [%s]", last_socket_error_string());
00419 close();
00420 break;
00421 }
00422 }
00423 else if (num_bytes == 0)
00424 {
00425 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00426 close();
00427 return -1;
00428 }
00429 else if (num_bytes < (unsigned) sizeof(header))
00430 {
00431 ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes), last_socket_error_string());
00432 close();
00433 return -1;
00434 }
00435
00436 num_bytes -= sizeof(header);
00437 data_filled_ = num_bytes;
00438 data_start_ = data_buffer_;
00439 }
00440 else
00441 {
00442 from_previous = true;
00443 }
00444
00445 copy_bytes = std::min(size - bytes_read, data_filled_);
00446
00447
00448 memcpy(buffer + bytes_read, data_start_, copy_bytes);
00449 data_filled_ -= copy_bytes;
00450 data_start_ += copy_bytes;
00451 }
00452
00453
00454 if (from_previous)
00455 {
00456
00457
00458 bytes_read += copy_bytes;
00459 }
00460 else
00461 {
00462
00463 switch (header.op_)
00464 {
00465 case ROS_UDP_DATA0:
00466 if (current_message_id_)
00467 {
00468 ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_);
00469 reorder_header_ = header;
00470
00471
00472
00473 reorder_bytes_ = data_filled_ + (data_start_ - data_buffer_);
00474 memcpy(reorder_buffer_, data_buffer_, reorder_bytes_);
00475 reorder_start_ = reorder_buffer_;
00476 current_message_id_ = 0;
00477 total_blocks_ = 0;
00478 last_block_ = 0;
00479
00480 data_filled_ = 0;
00481 data_start_ = data_buffer_;
00482 return -1;
00483 }
00484 total_blocks_ = header.block_;
00485 last_block_ = 0;
00486 current_message_id_ = header.message_id_;
00487 break;
00488 case ROS_UDP_DATAN:
00489 if (header.message_id_ != current_message_id_)
00490 {
00491 ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_);
00492 data_filled_ = 0;
00493 return 0;
00494 }
00495 if (header.block_ != last_block_ + 1)
00496 {
00497 ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_);
00498 data_filled_ = 0;
00499 return 0;
00500 }
00501 last_block_ = header.block_;
00502
00503 break;
00504 default:
00505 ROS_ERROR("Unexpected UDP header OP [%d]", header.op_);
00506 return -1;
00507 }
00508
00509 bytes_read += copy_bytes;
00510
00511 if (last_block_ == (total_blocks_ - 1))
00512 {
00513 current_message_id_ = 0;
00514 break;
00515 }
00516 }
00517 }
00518
00519 return bytes_read;
00520 }
00521
00522 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size)
00523 {
00524 {
00525 boost::mutex::scoped_lock lock(close_mutex_);
00526
00527 if (closed_)
00528 {
00529 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00530 return -1;
00531 }
00532 }
00533
00534 ROS_ASSERT((int32_t)size > 0);
00535
00536 const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader);
00537
00538 uint32_t bytes_sent = 0;
00539 uint32_t this_block = 0;
00540 if (++current_message_id_ == 0)
00541 ++current_message_id_;
00542 while (bytes_sent < size)
00543 {
00544 TransportUDPHeader header;
00545 header.connection_id_ = connection_id_;
00546 header.message_id_ = current_message_id_;
00547 if (this_block == 0)
00548 {
00549 header.op_ = ROS_UDP_DATA0;
00550 header.block_ = (size + max_payload_size - 1) / max_payload_size;
00551 }
00552 else
00553 {
00554 header.op_ = ROS_UDP_DATAN;
00555 header.block_ = this_block;
00556 }
00557 ++this_block;
00558 #if defined(WIN32)
00559 WSABUF iov[2];
00560 DWORD sent_bytes;
00561 SSIZE_T num_bytes = 0;
00562 DWORD flags = 0;
00563 int rc;
00564 iov[0].buf = reinterpret_cast<char*>(&header);
00565 iov[0].len = sizeof(header);
00566 iov[1].buf = reinterpret_cast<char*>(buffer + bytes_sent);
00567 iov[1].len = std::min(max_payload_size, size - bytes_sent);
00568 rc = WSASend(sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
00569 num_bytes = sent_bytes;
00570 if (rc == SOCKET_ERROR) {
00571 num_bytes = -1;
00572 }
00573 #else
00574 struct iovec iov[2];
00575 iov[0].iov_base = &header;
00576 iov[0].iov_len = sizeof(header);
00577 iov[1].iov_base = buffer + bytes_sent;
00578 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
00579 ssize_t num_bytes = writev(sock_, iov, 2);
00580 #endif
00581
00582 if (num_bytes < 0)
00583 {
00584 if( !last_socket_error_is_would_block() )
00585 {
00586 ROSCPP_LOG_DEBUG("writev() failed with error [%s]", last_socket_error_string());
00587 close();
00588 break;
00589 }
00590 else
00591 {
00592 num_bytes = 0;
00593 --this_block;
00594 }
00595 }
00596 else if (num_bytes < (unsigned) sizeof(header))
00597 {
00598 ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes));
00599 close();
00600 break;
00601 }
00602 else
00603 {
00604 num_bytes -= sizeof(header);
00605 }
00606 bytes_sent += num_bytes;
00607 }
00608
00609 return bytes_sent;
00610 }
00611
00612 void TransportUDP::enableRead()
00613 {
00614 {
00615 boost::mutex::scoped_lock lock(close_mutex_);
00616
00617 if (closed_)
00618 {
00619 return;
00620 }
00621 }
00622
00623 if (!expecting_read_)
00624 {
00625 poll_set_->addEvents(sock_, POLLIN);
00626 expecting_read_ = true;
00627 }
00628 }
00629
00630 void TransportUDP::disableRead()
00631 {
00632 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00633
00634 {
00635 boost::mutex::scoped_lock lock(close_mutex_);
00636
00637 if (closed_)
00638 {
00639 return;
00640 }
00641 }
00642
00643 if (expecting_read_)
00644 {
00645 poll_set_->delEvents(sock_, POLLIN);
00646 expecting_read_ = false;
00647 }
00648 }
00649
00650 void TransportUDP::enableWrite()
00651 {
00652 {
00653 boost::mutex::scoped_lock lock(close_mutex_);
00654
00655 if (closed_)
00656 {
00657 return;
00658 }
00659 }
00660
00661 if (!expecting_write_)
00662 {
00663 poll_set_->addEvents(sock_, POLLOUT);
00664 expecting_write_ = true;
00665 }
00666 }
00667
00668 void TransportUDP::disableWrite()
00669 {
00670 {
00671 boost::mutex::scoped_lock lock(close_mutex_);
00672
00673 if (closed_)
00674 {
00675 return;
00676 }
00677 }
00678
00679 if (expecting_write_)
00680 {
00681 poll_set_->delEvents(sock_, POLLOUT);
00682 expecting_write_ = false;
00683 }
00684 }
00685
00686 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size)
00687 {
00688 ROS_ASSERT(is_server_);
00689
00690 TransportUDPPtr transport(boost::make_shared<TransportUDP>(poll_set_, flags_, max_datagram_size));
00691 if (!transport->connect(host, port, connection_id))
00692 {
00693 ROS_ERROR("Failed to create outgoing connection");
00694 return TransportUDPPtr();
00695 }
00696 return transport;
00697
00698 }
00699
00700 std::string TransportUDP::getClientURI()
00701 {
00702 ROS_ASSERT(!is_server_);
00703
00704 sockaddr_storage sas;
00705 socklen_t sas_len = sizeof(sas);
00706 getpeername(sock_, (sockaddr *)&sas, &sas_len);
00707
00708 sockaddr_in *sin = (sockaddr_in *)&sas;
00709
00710 char namebuf[128];
00711 int port = ntohs(sin->sin_port);
00712 strcpy(namebuf, inet_ntoa(sin->sin_addr));
00713
00714 std::string ip = namebuf;
00715 std::stringstream uri;
00716 uri << ip << ":" << port;
00717
00718 return uri.str();
00719 }
00720
00721 }