00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/transport/transport_udp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/file_log.h"
00038
00039 #include <ros/assert.h>
00040 #include <boost/bind.hpp>
00041
00042 #include <fcntl.h>
00043 #if defined(__APPLE__)
00044
00045 #include <sys/types.h>
00046 #include <sys/uio.h>
00047 #include <unistd.h>
00048 #endif
00049
00050 namespace ros
00051 {
00052
00053 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
00054 : sock_(-1)
00055 , closed_(false)
00056 , expecting_read_(false)
00057 , expecting_write_(false)
00058 , is_server_(false)
00059 , server_port_(-1)
00060 , poll_set_(poll_set)
00061 , flags_(flags)
00062 , connection_id_(0)
00063 , current_message_id_(0)
00064 , total_blocks_(0)
00065 , last_block_(0)
00066 , max_datagram_size_(max_datagram_size)
00067 , data_filled_(0)
00068 , reorder_buffer_(0)
00069 , reorder_bytes_(0)
00070 {
00071
00072 if (max_datagram_size_ == 0)
00073 max_datagram_size_ = 1500;
00074 reorder_buffer_ = new uint8_t[max_datagram_size_];
00075 reorder_start_ = reorder_buffer_;
00076 data_buffer_ = new uint8_t[max_datagram_size_];
00077 data_start_ = data_buffer_;
00078 }
00079
00080 TransportUDP::~TransportUDP()
00081 {
00082 ROS_ASSERT_MSG(sock_ == ROS_INVALID_SOCKET, "TransportUDP socket [%d] was never closed", sock_);
00083 delete [] reorder_buffer_;
00084 delete [] data_buffer_;
00085 }
00086
00087 bool TransportUDP::setSocket(int sock)
00088 {
00089 sock_ = sock;
00090 return initializeSocket();
00091 }
00092
00093 void TransportUDP::socketUpdate(int events)
00094 {
00095 {
00096 boost::mutex::scoped_lock lock(close_mutex_);
00097
00098 if (closed_)
00099 {
00100 return;
00101 }
00102 }
00103
00104 if((events & POLLERR) ||
00105 (events & POLLHUP) ||
00106 (events & POLLNVAL))
00107 {
00108 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events);
00109 close();
00110 }
00111 else
00112 {
00113 if ((events & POLLIN) && expecting_read_)
00114 {
00115 if (read_cb_)
00116 {
00117 read_cb_(shared_from_this());
00118 }
00119 }
00120
00121 if ((events & POLLOUT) && expecting_write_)
00122 {
00123 if (write_cb_)
00124 {
00125 write_cb_(shared_from_this());
00126 }
00127 }
00128 }
00129
00130 }
00131
00132 std::string TransportUDP::getTransportInfo()
00133 {
00134 return "UDPROS connection to [" + cached_remote_host_ + "]";
00135 }
00136
00137 bool TransportUDP::connect(const std::string& host, int port, int connection_id)
00138 {
00139 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00140 connection_id_ = connection_id;
00141
00142 if (sock_ == ROS_INVALID_SOCKET)
00143 {
00144 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00145 return false;
00146 }
00147
00148 sockaddr_in sin;
00149 sin.sin_family = AF_INET;
00150 if (inet_addr(host.c_str()) == INADDR_NONE)
00151 {
00152 struct addrinfo* addr;
00153 if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0)
00154 {
00155 close();
00156 ROS_ERROR("couldn't resolve host [%s]", host.c_str());
00157 return false;
00158 }
00159
00160 bool found = false;
00161 struct addrinfo* it = addr;
00162 for (; it; it = it->ai_next)
00163 {
00164 if (it->ai_family == AF_INET)
00165 {
00166 memcpy(&sin, it->ai_addr, it->ai_addrlen);
00167 sin.sin_family = it->ai_family;
00168 sin.sin_port = htons(port);
00169
00170 found = true;
00171 break;
00172 }
00173 }
00174
00175 freeaddrinfo(addr);
00176
00177 if (!found)
00178 {
00179 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00180 return false;
00181 }
00182
00183 ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
00184 }
00185 else
00186 {
00187 sin.sin_addr.s_addr = inet_addr(host.c_str());
00188 }
00189
00190 sin.sin_port = htons(port);
00191
00192 if (::connect(sock_, (sockaddr *)&sin, sizeof(sin)))
00193 {
00194 ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port, last_socket_error_string());
00195 close();
00196
00197 return false;
00198 }
00199
00200
00201 #ifdef WIN32
00202
00203
00204 Sleep(100);
00205 #endif
00206
00207 if (!initializeSocket())
00208 {
00209 return false;
00210 }
00211
00212 ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00213
00214 return true;
00215 }
00216
00217 bool TransportUDP::createIncoming(int port, bool is_server)
00218 {
00219 is_server_ = is_server;
00220
00221 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00222
00223 if (sock_ <= 0)
00224 {
00225 ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00226 return false;
00227 }
00228
00229 server_address_.sin_family = AF_INET;
00230 server_address_.sin_port = htons(port);
00231 server_address_.sin_addr.s_addr = INADDR_ANY;
00232 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00233 {
00234 ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00235 return false;
00236 }
00237
00238 socklen_t len = sizeof(server_address_);
00239 getsockname(sock_, (sockaddr *)&server_address_, &len);
00240 server_port_ = ntohs(server_address_.sin_port);
00241 ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_);
00242
00243 if (!initializeSocket())
00244 {
00245 return false;
00246 }
00247
00248 enableRead();
00249
00250 return true;
00251 }
00252
00253 bool TransportUDP::initializeSocket()
00254 {
00255 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00256
00257 if (!(flags_ & SYNCHRONOUS))
00258 {
00259 int result = set_non_blocking(sock_);
00260 if ( result != 0 ) {
00261 ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00262 close();
00263 return false;
00264 }
00265 }
00266
00267 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00268 if (poll_set_)
00269 {
00270 poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this());
00271 }
00272
00273 return true;
00274 }
00275
00276 void TransportUDP::close()
00277 {
00278 Callback disconnect_cb;
00279
00280 if (!closed_)
00281 {
00282 {
00283 boost::mutex::scoped_lock lock(close_mutex_);
00284
00285 if (!closed_)
00286 {
00287 closed_ = true;
00288
00289 ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_);
00290
00291 ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00292
00293 if (poll_set_)
00294 {
00295 poll_set_->delSocket(sock_);
00296 }
00297
00298 if ( close_socket(sock_) != 0 )
00299 {
00300 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00301 }
00302
00303 sock_ = ROS_INVALID_SOCKET;
00304
00305 disconnect_cb = disconnect_cb_;
00306
00307 disconnect_cb_ = Callback();
00308 read_cb_ = Callback();
00309 write_cb_ = Callback();
00310 }
00311 }
00312 }
00313
00314 if (disconnect_cb)
00315 {
00316 disconnect_cb(shared_from_this());
00317 }
00318 }
00319
00320 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
00321 {
00322 {
00323 boost::mutex::scoped_lock lock(close_mutex_);
00324 if (closed_)
00325 {
00326 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00327 return -1;
00328 }
00329 }
00330
00331 ROS_ASSERT((int32_t)size > 0);
00332
00333 uint32_t bytes_read = 0;
00334
00335 while (bytes_read < size)
00336 {
00337 TransportUDPHeader header;
00338
00339 uint32_t copy_bytes = 0;
00340 bool from_previous = false;
00341 if (reorder_bytes_)
00342 {
00343 if (reorder_start_ != reorder_buffer_)
00344 {
00345 from_previous = true;
00346 }
00347
00348 copy_bytes = std::min(size - bytes_read, reorder_bytes_);
00349 header = reorder_header_;
00350 memcpy(buffer + bytes_read, reorder_start_, copy_bytes);
00351 reorder_bytes_ -= copy_bytes;
00352 reorder_start_ += copy_bytes;
00353 }
00354 else
00355 {
00356 if (data_filled_ == 0)
00357 {
00358 #if defined(WIN32)
00359 SSIZE_T num_bytes = 0;
00360 DWORD received_bytes = 0;
00361 DWORD flags = 0;
00362 WSABUF iov[2];
00363 iov[0].buf = reinterpret_cast<char*>(&header);
00364 iov[0].len = sizeof(header);
00365 iov[1].buf = reinterpret_cast<char*>(data_buffer_);
00366 iov[1].len = max_datagram_size_ - sizeof(header);
00367 int rc = WSARecv(sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
00368 if ( rc == SOCKET_ERROR) {
00369 num_bytes = -1;
00370 } else {
00371 num_bytes = received_bytes;
00372 }
00373 #else
00374 ssize_t num_bytes;
00375 struct iovec iov[2];
00376 iov[0].iov_base = &header;
00377 iov[0].iov_len = sizeof(header);
00378 iov[1].iov_base = data_buffer_;
00379 iov[1].iov_len = max_datagram_size_ - sizeof(header);
00380
00381 num_bytes = readv(sock_, iov, 2);
00382 #endif
00383 if (num_bytes < 0)
00384 {
00385 if ( last_socket_error_is_would_block() )
00386 {
00387 num_bytes = 0;
00388 break;
00389 }
00390 else
00391 {
00392 ROSCPP_LOG_DEBUG("readv() failed with error [%s]", last_socket_error_string());
00393 close();
00394 break;
00395 }
00396 }
00397 else if (num_bytes == 0)
00398 {
00399 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00400 close();
00401 return -1;
00402 }
00403 else if (num_bytes < (unsigned) sizeof(header))
00404 {
00405 ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes), last_socket_error_string());
00406 close();
00407 return -1;
00408 }
00409
00410 num_bytes -= sizeof(header);
00411 data_filled_ = num_bytes;
00412 data_start_ = data_buffer_;
00413 }
00414 else
00415 {
00416 from_previous = true;
00417 }
00418
00419 copy_bytes = std::min(size - bytes_read, data_filled_);
00420
00421
00422 memcpy(buffer + bytes_read, data_start_, copy_bytes);
00423 data_filled_ = std::max((int64_t)0, (int64_t)data_filled_ - (int64_t)size);
00424 data_start_ += copy_bytes;
00425 }
00426
00427
00428 if (from_previous)
00429 {
00430 bytes_read += copy_bytes;
00431 }
00432 else
00433 {
00434
00435 switch (header.op_)
00436 {
00437 case ROS_UDP_DATA0:
00438 if (current_message_id_)
00439 {
00440 ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_);
00441 reorder_header_ = header;
00442 reorder_bytes_ = copy_bytes;
00443 memcpy(reorder_buffer_, buffer + bytes_read, copy_bytes);
00444 reorder_start_ = reorder_buffer_;
00445 current_message_id_ = 0;
00446 total_blocks_ = 0;
00447 last_block_ = 0;
00448
00449 data_filled_ = 0;
00450 data_start_ = data_buffer_;
00451 return -1;
00452 }
00453 total_blocks_ = header.block_;
00454 last_block_ = 0;
00455 current_message_id_ = header.message_id_;
00456 break;
00457 case ROS_UDP_DATAN:
00458 if (header.message_id_ != current_message_id_)
00459 {
00460 ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_);
00461 return 0;
00462 }
00463 if (header.block_ != last_block_ + 1)
00464 {
00465 ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_);
00466 return 0;
00467 }
00468 last_block_ = header.block_;
00469
00470 break;
00471 default:
00472 ROS_ERROR("Unexpected UDP header OP [%d]", header.op_);
00473 return -1;
00474 }
00475
00476 bytes_read += copy_bytes;
00477
00478 if (last_block_ == (total_blocks_ - 1))
00479 {
00480 current_message_id_ = 0;
00481 break;
00482 }
00483 }
00484 }
00485
00486 return bytes_read;
00487 }
00488
00489 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size)
00490 {
00491 {
00492 boost::mutex::scoped_lock lock(close_mutex_);
00493
00494 if (closed_)
00495 {
00496 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00497 return -1;
00498 }
00499 }
00500
00501 ROS_ASSERT((int32_t)size > 0);
00502
00503 const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader);
00504
00505 uint32_t bytes_sent = 0;
00506 uint32_t this_block = 0;
00507 if (++current_message_id_ == 0)
00508 ++current_message_id_;
00509 while (bytes_sent < size)
00510 {
00511 TransportUDPHeader header;
00512 header.connection_id_ = connection_id_;
00513 header.message_id_ = current_message_id_;
00514 if (this_block == 0)
00515 {
00516 header.op_ = ROS_UDP_DATA0;
00517 header.block_ = (size + max_payload_size - 1) / max_payload_size;
00518 }
00519 else
00520 {
00521 header.op_ = ROS_UDP_DATAN;
00522 header.block_ = this_block;
00523 }
00524 ++this_block;
00525 #if defined(WIN32)
00526 WSABUF iov[2];
00527 DWORD sent_bytes;
00528 SSIZE_T num_bytes = 0;
00529 DWORD flags = 0;
00530 int rc;
00531 iov[0].buf = reinterpret_cast<char*>(&header);
00532 iov[0].len = sizeof(header);
00533 iov[1].buf = reinterpret_cast<char*>(buffer + bytes_sent);
00534 iov[1].len = std::min(max_payload_size, size - bytes_sent);
00535 rc = WSASend(sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
00536 num_bytes = sent_bytes;
00537 if (rc == SOCKET_ERROR) {
00538 num_bytes = -1;
00539 }
00540 #else
00541 struct iovec iov[2];
00542 iov[0].iov_base = &header;
00543 iov[0].iov_len = sizeof(header);
00544 iov[1].iov_base = buffer + bytes_sent;
00545 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
00546 ssize_t num_bytes = writev(sock_, iov, 2);
00547 #endif
00548
00549 if (num_bytes < 0)
00550 {
00551 if( !last_socket_error_is_would_block() )
00552 {
00553 ROSCPP_LOG_DEBUG("writev() failed with error [%s]", last_socket_error_string());
00554 close();
00555 break;
00556 }
00557 else
00558 {
00559 num_bytes = 0;
00560 }
00561 }
00562 else if (num_bytes < (unsigned) sizeof(header))
00563 {
00564 ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes));
00565 close();
00566 break;
00567 }
00568 else
00569 {
00570 num_bytes -= sizeof(header);
00571 }
00572 bytes_sent += num_bytes;
00573 }
00574
00575 return bytes_sent;
00576 }
00577
00578 void TransportUDP::enableRead()
00579 {
00580 {
00581 boost::mutex::scoped_lock lock(close_mutex_);
00582
00583 if (closed_)
00584 {
00585 return;
00586 }
00587 }
00588
00589 if (!expecting_read_)
00590 {
00591 poll_set_->addEvents(sock_, POLLIN);
00592 expecting_read_ = true;
00593 }
00594 }
00595
00596 void TransportUDP::disableRead()
00597 {
00598 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00599
00600 {
00601 boost::mutex::scoped_lock lock(close_mutex_);
00602
00603 if (closed_)
00604 {
00605 return;
00606 }
00607 }
00608
00609 if (expecting_read_)
00610 {
00611 poll_set_->delEvents(sock_, POLLIN);
00612 expecting_read_ = false;
00613 }
00614 }
00615
00616 void TransportUDP::enableWrite()
00617 {
00618 {
00619 boost::mutex::scoped_lock lock(close_mutex_);
00620
00621 if (closed_)
00622 {
00623 return;
00624 }
00625 }
00626
00627 if (!expecting_write_)
00628 {
00629 poll_set_->addEvents(sock_, POLLOUT);
00630 expecting_write_ = true;
00631 }
00632 }
00633
00634 void TransportUDP::disableWrite()
00635 {
00636 {
00637 boost::mutex::scoped_lock lock(close_mutex_);
00638
00639 if (closed_)
00640 {
00641 return;
00642 }
00643 }
00644
00645 if (expecting_write_)
00646 {
00647 poll_set_->delEvents(sock_, POLLOUT);
00648 expecting_write_ = false;
00649 }
00650 }
00651
00652 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size)
00653 {
00654 ROS_ASSERT(is_server_);
00655
00656 TransportUDPPtr transport(new TransportUDP(poll_set_, flags_, max_datagram_size));
00657 if (!transport->connect(host, port, connection_id))
00658 {
00659 ROS_ERROR("Failed to create outgoing connection");
00660 return TransportUDPPtr();
00661 }
00662 return transport;
00663
00664 }
00665
00666 }