00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/transport/transport_udp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/file_log.h"
00038
00039 #include <ros/assert.h>
00040 #include <boost/bind.hpp>
00041
00042 #include <fcntl.h>
00043 #if defined(__APPLE__)
00044
00045 #include <sys/types.h>
00046 #include <sys/uio.h>
00047 #include <unistd.h>
00048 #endif
00049
00050 namespace ros
00051 {
00052
00053 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
00054 : sock_(-1)
00055 , closed_(false)
00056 , expecting_read_(false)
00057 , expecting_write_(false)
00058 , is_server_(false)
00059 , server_port_(-1)
00060 , poll_set_(poll_set)
00061 , flags_(flags)
00062 , connection_id_(0)
00063 , current_message_id_(0)
00064 , total_blocks_(0)
00065 , last_block_(0)
00066 , max_datagram_size_(max_datagram_size)
00067 , data_filled_(0)
00068 , reorder_buffer_(0)
00069 , reorder_bytes_(0)
00070 {
00071
00072 if (max_datagram_size_ == 0)
00073 max_datagram_size_ = 1500;
00074 reorder_buffer_ = new uint8_t[max_datagram_size_];
00075 reorder_start_ = reorder_buffer_;
00076 data_buffer_ = new uint8_t[max_datagram_size_];
00077 data_start_ = data_buffer_;
00078 }
00079
00080 TransportUDP::~TransportUDP()
00081 {
00082 ROS_ASSERT_MSG(sock_ == ROS_INVALID_SOCKET, "TransportUDP socket [%d] was never closed", sock_);
00083 delete [] reorder_buffer_;
00084 delete [] data_buffer_;
00085 }
00086
00087 bool TransportUDP::setSocket(int sock)
00088 {
00089 sock_ = sock;
00090 return initializeSocket();
00091 }
00092
00093 void TransportUDP::socketUpdate(int events)
00094 {
00095 {
00096 boost::mutex::scoped_lock lock(close_mutex_);
00097
00098 if (closed_)
00099 {
00100 return;
00101 }
00102 }
00103
00104 if((events & POLLERR) ||
00105 (events & POLLHUP) ||
00106 (events & POLLNVAL))
00107 {
00108 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events);
00109 close();
00110 }
00111 else
00112 {
00113 if ((events & POLLIN) && expecting_read_)
00114 {
00115 if (read_cb_)
00116 {
00117 read_cb_(shared_from_this());
00118 }
00119 }
00120
00121 if ((events & POLLOUT) && expecting_write_)
00122 {
00123 if (write_cb_)
00124 {
00125 write_cb_(shared_from_this());
00126 }
00127 }
00128 }
00129
00130 }
00131
00132 std::string TransportUDP::getTransportInfo()
00133 {
00134 return "UDPROS connection to [" + cached_remote_host_ + "]";
00135 }
00136
00137 bool TransportUDP::connect(const std::string& host, int port, int connection_id)
00138 {
00139 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00140 connection_id_ = connection_id;
00141
00142 if (sock_ == ROS_INVALID_SOCKET)
00143 {
00144 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00145 return false;
00146 }
00147
00148 sockaddr_in sin;
00149 sin.sin_family = AF_INET;
00150 if (inet_addr(host.c_str()) == INADDR_NONE)
00151 {
00152 struct addrinfo* addr;
00153 struct addrinfo hints;
00154 memset(&hints, 0, sizeof(hints));
00155 hints.ai_family = AF_UNSPEC;
00156
00157 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
00158 {
00159 close();
00160 ROS_ERROR("couldn't resolve host [%s]", host.c_str());
00161 return false;
00162 }
00163
00164 bool found = false;
00165 struct addrinfo* it = addr;
00166 for (; it; it = it->ai_next)
00167 {
00168 if (it->ai_family == AF_INET)
00169 {
00170 memcpy(&sin, it->ai_addr, it->ai_addrlen);
00171 sin.sin_family = it->ai_family;
00172 sin.sin_port = htons(port);
00173
00174 found = true;
00175 break;
00176 }
00177 }
00178
00179 freeaddrinfo(addr);
00180
00181 if (!found)
00182 {
00183 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00184 return false;
00185 }
00186
00187 ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
00188 }
00189 else
00190 {
00191 sin.sin_addr.s_addr = inet_addr(host.c_str());
00192 }
00193
00194 sin.sin_port = htons(port);
00195
00196 if (::connect(sock_, (sockaddr *)&sin, sizeof(sin)))
00197 {
00198 ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port, last_socket_error_string());
00199 close();
00200
00201 return false;
00202 }
00203
00204
00205 #ifdef WIN32
00206
00207
00208 Sleep(100);
00209 #endif
00210
00211 if (!initializeSocket())
00212 {
00213 return false;
00214 }
00215
00216 ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00217
00218 return true;
00219 }
00220
00221 bool TransportUDP::createIncoming(int port, bool is_server)
00222 {
00223 is_server_ = is_server;
00224
00225 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00226
00227 if (sock_ <= 0)
00228 {
00229 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00230 return false;
00231 }
00232
00233 server_address_.sin_family = AF_INET;
00234 server_address_.sin_port = htons(port);
00235 server_address_.sin_addr.s_addr = INADDR_ANY;
00236 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00237 {
00238 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00239 return false;
00240 }
00241
00242 socklen_t len = sizeof(server_address_);
00243 getsockname(sock_, (sockaddr *)&server_address_, &len);
00244 server_port_ = ntohs(server_address_.sin_port);
00245 ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_);
00246
00247 if (!initializeSocket())
00248 {
00249 return false;
00250 }
00251
00252 enableRead();
00253
00254 return true;
00255 }
00256
00257 bool TransportUDP::initializeSocket()
00258 {
00259 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00260
00261 if (!(flags_ & SYNCHRONOUS))
00262 {
00263 int result = set_non_blocking(sock_);
00264 if ( result != 0 ) {
00265 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00266 close();
00267 return false;
00268 }
00269 }
00270
00271 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00272 if (poll_set_)
00273 {
00274 poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this());
00275 }
00276
00277 return true;
00278 }
00279
00280 void TransportUDP::close()
00281 {
00282 Callback disconnect_cb;
00283
00284 if (!closed_)
00285 {
00286 {
00287 boost::mutex::scoped_lock lock(close_mutex_);
00288
00289 if (!closed_)
00290 {
00291 closed_ = true;
00292
00293 ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_);
00294
00295 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00296
00297 if (poll_set_)
00298 {
00299 poll_set_->delSocket(sock_);
00300 }
00301
00302 if ( close_socket(sock_) != 0 )
00303 {
00304 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00305 }
00306
00307 sock_ = ROS_INVALID_SOCKET;
00308
00309 disconnect_cb = disconnect_cb_;
00310
00311 disconnect_cb_ = Callback();
00312 read_cb_ = Callback();
00313 write_cb_ = Callback();
00314 }
00315 }
00316 }
00317
00318 if (disconnect_cb)
00319 {
00320 disconnect_cb(shared_from_this());
00321 }
00322 }
00323
00324 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
00325 {
00326 {
00327 boost::mutex::scoped_lock lock(close_mutex_);
00328 if (closed_)
00329 {
00330 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00331 return -1;
00332 }
00333 }
00334
00335 ROS_ASSERT((int32_t)size > 0);
00336
00337 uint32_t bytes_read = 0;
00338
00339 while (bytes_read < size)
00340 {
00341 TransportUDPHeader header;
00342
00343
00344
00345
00346 uint32_t copy_bytes = 0;
00347 bool from_previous = false;
00348 if (reorder_bytes_)
00349 {
00350 if (reorder_start_ != reorder_buffer_)
00351 {
00352 from_previous = true;
00353 }
00354
00355 copy_bytes = std::min(size - bytes_read, reorder_bytes_);
00356 header = reorder_header_;
00357 memcpy(buffer + bytes_read, reorder_start_, copy_bytes);
00358 reorder_bytes_ -= copy_bytes;
00359 reorder_start_ += copy_bytes;
00360 }
00361 else
00362 {
00363 if (data_filled_ == 0)
00364 {
00365 #if defined(WIN32)
00366 SSIZE_T num_bytes = 0;
00367 DWORD received_bytes = 0;
00368 DWORD flags = 0;
00369 WSABUF iov[2];
00370 iov[0].buf = reinterpret_cast<char*>(&header);
00371 iov[0].len = sizeof(header);
00372 iov[1].buf = reinterpret_cast<char*>(data_buffer_);
00373 iov[1].len = max_datagram_size_ - sizeof(header);
00374 int rc = WSARecv(sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
00375 if ( rc == SOCKET_ERROR) {
00376 num_bytes = -1;
00377 } else {
00378 num_bytes = received_bytes;
00379 }
00380 #else
00381 ssize_t num_bytes;
00382 struct iovec iov[2];
00383 iov[0].iov_base = &header;
00384 iov[0].iov_len = sizeof(header);
00385 iov[1].iov_base = data_buffer_;
00386 iov[1].iov_len = max_datagram_size_ - sizeof(header);
00387
00388 num_bytes = readv(sock_, iov, 2);
00389 #endif
00390 if (num_bytes < 0)
00391 {
00392 if ( last_socket_error_is_would_block() )
00393 {
00394 num_bytes = 0;
00395 break;
00396 }
00397 else
00398 {
00399 ROSCPP_LOG_DEBUG("readv() failed with error [%s]", last_socket_error_string());
00400 close();
00401 break;
00402 }
00403 }
00404 else if (num_bytes == 0)
00405 {
00406 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00407 close();
00408 return -1;
00409 }
00410 else if (num_bytes < (unsigned) sizeof(header))
00411 {
00412 ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes), last_socket_error_string());
00413 close();
00414 return -1;
00415 }
00416
00417 num_bytes -= sizeof(header);
00418 data_filled_ = num_bytes;
00419 data_start_ = data_buffer_;
00420 }
00421 else
00422 {
00423 from_previous = true;
00424 }
00425
00426 copy_bytes = std::min(size - bytes_read, data_filled_);
00427
00428
00429 memcpy(buffer + bytes_read, data_start_, copy_bytes);
00430 data_filled_ -= copy_bytes;
00431 data_start_ += copy_bytes;
00432 }
00433
00434
00435 if (from_previous)
00436 {
00437
00438
00439 bytes_read += copy_bytes;
00440 }
00441 else
00442 {
00443
00444 switch (header.op_)
00445 {
00446 case ROS_UDP_DATA0:
00447 if (current_message_id_)
00448 {
00449 ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_);
00450 reorder_header_ = header;
00451
00452
00453
00454 reorder_bytes_ = data_filled_ + (data_start_ - data_buffer_);
00455 memcpy(reorder_buffer_, data_buffer_, reorder_bytes_);
00456 reorder_start_ = reorder_buffer_;
00457 current_message_id_ = 0;
00458 total_blocks_ = 0;
00459 last_block_ = 0;
00460
00461 data_filled_ = 0;
00462 data_start_ = data_buffer_;
00463 return -1;
00464 }
00465 total_blocks_ = header.block_;
00466 last_block_ = 0;
00467 current_message_id_ = header.message_id_;
00468 break;
00469 case ROS_UDP_DATAN:
00470 if (header.message_id_ != current_message_id_)
00471 {
00472 ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_);
00473 data_filled_ = 0;
00474 return 0;
00475 }
00476 if (header.block_ != last_block_ + 1)
00477 {
00478 ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_);
00479 data_filled_ = 0;
00480 return 0;
00481 }
00482 last_block_ = header.block_;
00483
00484 break;
00485 default:
00486 ROS_ERROR("Unexpected UDP header OP [%d]", header.op_);
00487 return -1;
00488 }
00489
00490 bytes_read += copy_bytes;
00491
00492 if (last_block_ == (total_blocks_ - 1))
00493 {
00494 current_message_id_ = 0;
00495 break;
00496 }
00497 }
00498 }
00499
00500 return bytes_read;
00501 }
00502
00503 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size)
00504 {
00505 {
00506 boost::mutex::scoped_lock lock(close_mutex_);
00507
00508 if (closed_)
00509 {
00510 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00511 return -1;
00512 }
00513 }
00514
00515 ROS_ASSERT((int32_t)size > 0);
00516
00517 const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader);
00518
00519 uint32_t bytes_sent = 0;
00520 uint32_t this_block = 0;
00521 if (++current_message_id_ == 0)
00522 ++current_message_id_;
00523 while (bytes_sent < size)
00524 {
00525 TransportUDPHeader header;
00526 header.connection_id_ = connection_id_;
00527 header.message_id_ = current_message_id_;
00528 if (this_block == 0)
00529 {
00530 header.op_ = ROS_UDP_DATA0;
00531 header.block_ = (size + max_payload_size - 1) / max_payload_size;
00532 }
00533 else
00534 {
00535 header.op_ = ROS_UDP_DATAN;
00536 header.block_ = this_block;
00537 }
00538 ++this_block;
00539 #if defined(WIN32)
00540 WSABUF iov[2];
00541 DWORD sent_bytes;
00542 SSIZE_T num_bytes = 0;
00543 DWORD flags = 0;
00544 int rc;
00545 iov[0].buf = reinterpret_cast<char*>(&header);
00546 iov[0].len = sizeof(header);
00547 iov[1].buf = reinterpret_cast<char*>(buffer + bytes_sent);
00548 iov[1].len = std::min(max_payload_size, size - bytes_sent);
00549 rc = WSASend(sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
00550 num_bytes = sent_bytes;
00551 if (rc == SOCKET_ERROR) {
00552 num_bytes = -1;
00553 }
00554 #else
00555 struct iovec iov[2];
00556 iov[0].iov_base = &header;
00557 iov[0].iov_len = sizeof(header);
00558 iov[1].iov_base = buffer + bytes_sent;
00559 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
00560 ssize_t num_bytes = writev(sock_, iov, 2);
00561 #endif
00562
00563 if (num_bytes < 0)
00564 {
00565 if( !last_socket_error_is_would_block() )
00566 {
00567 ROSCPP_LOG_DEBUG("writev() failed with error [%s]", last_socket_error_string());
00568 close();
00569 break;
00570 }
00571 else
00572 {
00573 num_bytes = 0;
00574 }
00575 }
00576 else if (num_bytes < (unsigned) sizeof(header))
00577 {
00578 ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes));
00579 close();
00580 break;
00581 }
00582 else
00583 {
00584 num_bytes -= sizeof(header);
00585 }
00586 bytes_sent += num_bytes;
00587 }
00588
00589 return bytes_sent;
00590 }
00591
00592 void TransportUDP::enableRead()
00593 {
00594 {
00595 boost::mutex::scoped_lock lock(close_mutex_);
00596
00597 if (closed_)
00598 {
00599 return;
00600 }
00601 }
00602
00603 if (!expecting_read_)
00604 {
00605 poll_set_->addEvents(sock_, POLLIN);
00606 expecting_read_ = true;
00607 }
00608 }
00609
00610 void TransportUDP::disableRead()
00611 {
00612 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00613
00614 {
00615 boost::mutex::scoped_lock lock(close_mutex_);
00616
00617 if (closed_)
00618 {
00619 return;
00620 }
00621 }
00622
00623 if (expecting_read_)
00624 {
00625 poll_set_->delEvents(sock_, POLLIN);
00626 expecting_read_ = false;
00627 }
00628 }
00629
00630 void TransportUDP::enableWrite()
00631 {
00632 {
00633 boost::mutex::scoped_lock lock(close_mutex_);
00634
00635 if (closed_)
00636 {
00637 return;
00638 }
00639 }
00640
00641 if (!expecting_write_)
00642 {
00643 poll_set_->addEvents(sock_, POLLOUT);
00644 expecting_write_ = true;
00645 }
00646 }
00647
00648 void TransportUDP::disableWrite()
00649 {
00650 {
00651 boost::mutex::scoped_lock lock(close_mutex_);
00652
00653 if (closed_)
00654 {
00655 return;
00656 }
00657 }
00658
00659 if (expecting_write_)
00660 {
00661 poll_set_->delEvents(sock_, POLLOUT);
00662 expecting_write_ = false;
00663 }
00664 }
00665
00666 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size)
00667 {
00668 ROS_ASSERT(is_server_);
00669
00670 TransportUDPPtr transport(new TransportUDP(poll_set_, flags_, max_datagram_size));
00671 if (!transport->connect(host, port, connection_id))
00672 {
00673 ROS_ERROR("Failed to create outgoing connection");
00674 return TransportUDPPtr();
00675 }
00676 return transport;
00677
00678 }
00679
00680 }