00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/transport/transport_udp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/file_log.h"
00038
00039 #include <ros/assert.h>
00040
00041 #include <boost/bind.hpp>
00042
00043 #include <sys/uio.h>
00044 #include <sys/socket.h>
00045 #include <netinet/in.h>
00046 #include <arpa/inet.h>
00047 #include <netdb.h>
00048 #include <fcntl.h>
00049
00050 namespace ros
00051 {
00052
00053 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
00054 : sock_(-1)
00055 , closed_(false)
00056 , expecting_read_(false)
00057 , expecting_write_(false)
00058 , is_server_(false)
00059 , server_port_(-1)
00060 , poll_set_(poll_set)
00061 , flags_(flags)
00062 , connection_id_(0)
00063 , current_message_id_(0)
00064 , total_blocks_(0)
00065 , last_block_(0)
00066 , max_datagram_size_(max_datagram_size)
00067 , data_filled_(0)
00068 , reorder_buffer_(0)
00069 , reorder_bytes_(0)
00070 {
00071
00072 if (max_datagram_size_ == 0)
00073 max_datagram_size_ = 1500;
00074 reorder_buffer_ = new uint8_t[max_datagram_size_];
00075 reorder_start_ = reorder_buffer_;
00076 data_buffer_ = new uint8_t[max_datagram_size_];
00077 data_start_ = data_buffer_;
00078 }
00079
00080 TransportUDP::~TransportUDP()
00081 {
00082 ROS_ASSERT_MSG(sock_ == -1, "TransportUDP socket [%d] was never closed", sock_);
00083 delete [] reorder_buffer_;
00084 delete [] data_buffer_;
00085 }
00086
00087 bool TransportUDP::setSocket(int sock)
00088 {
00089 sock_ = sock;
00090 return initializeSocket();
00091 }
00092
00093 void TransportUDP::socketUpdate(int events)
00094 {
00095 {
00096 boost::mutex::scoped_lock lock(close_mutex_);
00097
00098 if (closed_)
00099 {
00100 return;
00101 }
00102 }
00103
00104 if((events & POLLERR) ||
00105 (events & POLLHUP) ||
00106 (events & POLLNVAL))
00107 {
00108 ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events);
00109 close();
00110 }
00111 else
00112 {
00113 if ((events & POLLIN) && expecting_read_)
00114 {
00115 if (read_cb_)
00116 {
00117 read_cb_(shared_from_this());
00118 }
00119 }
00120
00121 if ((events & POLLOUT) && expecting_write_)
00122 {
00123 if (write_cb_)
00124 {
00125 write_cb_(shared_from_this());
00126 }
00127 }
00128 }
00129
00130 }
00131
00132 std::string TransportUDP::getTransportInfo()
00133 {
00134 return "UDPROS connection to [" + cached_remote_host_ + "]";
00135 }
00136
00137 bool TransportUDP::connect(const std::string& host, int port, int connection_id)
00138 {
00139 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00140 connection_id_ = connection_id;
00141
00142 if (sock_ == -1)
00143 {
00144 ROS_ERROR("socket() failed with error [%s]", strerror(errno));
00145 return false;
00146 }
00147
00148 sockaddr_in sin;
00149 sin.sin_family = AF_INET;
00150 if (inet_addr(host.c_str()) == INADDR_NONE)
00151 {
00152 struct addrinfo* addr;
00153 if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0)
00154 {
00155 close();
00156 ROS_ERROR("couldn't resolve host [%s]", host.c_str());
00157 return false;
00158 }
00159
00160 bool found = false;
00161 struct addrinfo* it = addr;
00162 for (; it; it = it->ai_next)
00163 {
00164 if (it->ai_family == AF_INET)
00165 {
00166 memcpy(&sin, it->ai_addr, it->ai_addrlen);
00167 sin.sin_family = it->ai_family;
00168 sin.sin_port = htons(port);
00169
00170 found = true;
00171 break;
00172 }
00173 }
00174
00175 freeaddrinfo(addr);
00176
00177 if (!found)
00178 {
00179 ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00180 return false;
00181 }
00182
00183 ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
00184 }
00185 else
00186 {
00187 sin.sin_addr.s_addr = inet_addr(host.c_str());
00188 }
00189
00190 sin.sin_port = htons(port);
00191
00192 if (::connect(sock_, (sockaddr *)&sin, sizeof(sin)))
00193 {
00194 ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port, strerror(errno));
00195 close();
00196
00197 return false;
00198 }
00199
00200 if (!initializeSocket())
00201 {
00202 return false;
00203 }
00204
00205 ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00206
00207 return true;
00208 }
00209
00210 bool TransportUDP::createIncoming(int port, bool is_server)
00211 {
00212 is_server_ = is_server;
00213
00214 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00215
00216 if (sock_ <= 0)
00217 {
00218 ROS_ERROR("socket() failed with error [%s]", strerror(errno));
00219 return false;
00220 }
00221
00222 server_address_.sin_family = AF_INET;
00223 server_address_.sin_port = htons(port);
00224 server_address_.sin_addr.s_addr = INADDR_ANY;
00225 if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00226 {
00227 ROS_ERROR("bind() failed with error [%s]", strerror(errno));
00228 return false;
00229 }
00230
00231 socklen_t len = sizeof(server_address_);
00232 getsockname(sock_, (sockaddr *)&server_address_, &len);
00233 server_port_ = ntohs(server_address_.sin_port);
00234 ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_);
00235
00236 if (!initializeSocket())
00237 {
00238 return false;
00239 }
00240
00241 enableRead();
00242
00243 return true;
00244 }
00245
00246 bool TransportUDP::initializeSocket()
00247 {
00248 ROS_ASSERT(sock_ != -1);
00249
00250 if (!(flags_ & SYNCHRONOUS))
00251 {
00252
00253 if(fcntl(sock_, F_SETFL, O_NONBLOCK) == -1)
00254 {
00255 ROS_ERROR("fcntl (non-blocking) to socket [%d] failed with error [%s]", sock_, strerror(errno));
00256
00257 close();
00258 return false;
00259 }
00260 }
00261
00262 ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00263 if (poll_set_)
00264 {
00265 poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this());
00266 }
00267
00268 return true;
00269 }
00270
00271 void TransportUDP::close()
00272 {
00273 Callback disconnect_cb;
00274
00275 if (!closed_)
00276 {
00277 {
00278 boost::mutex::scoped_lock lock(close_mutex_);
00279
00280 if (!closed_)
00281 {
00282 closed_ = true;
00283
00284 ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_);
00285
00286 ROS_ASSERT(sock_ != -1);
00287
00288 if (poll_set_)
00289 {
00290 poll_set_->delSocket(sock_);
00291 }
00292
00293 if (::close(sock_) < 0)
00294 {
00295 ROS_ERROR("Error closing socket [%d]: [%s]", sock_, strerror(errno));
00296 }
00297
00298 sock_ = -1;
00299
00300 disconnect_cb = disconnect_cb_;
00301
00302 disconnect_cb_ = Callback();
00303 read_cb_ = Callback();
00304 write_cb_ = Callback();
00305 }
00306 }
00307 }
00308
00309 if (disconnect_cb)
00310 {
00311 disconnect_cb(shared_from_this());
00312 }
00313 }
00314
00315 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
00316 {
00317 {
00318 boost::mutex::scoped_lock lock(close_mutex_);
00319 if (closed_)
00320 {
00321 ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00322 return -1;
00323 }
00324 }
00325
00326 ROS_ASSERT((int32_t)size > 0);
00327
00328 uint32_t bytes_read = 0;
00329
00330 while (bytes_read < size)
00331 {
00332 TransportUDPHeader header;
00333
00334 ssize_t num_bytes = 0;
00335 bool from_previous = false;
00336 if (reorder_bytes_)
00337 {
00338 if (reorder_start_ != reorder_buffer_)
00339 {
00340 from_previous = true;
00341 }
00342
00343 num_bytes = std::min(size - bytes_read, reorder_bytes_);
00344 header = reorder_header_;
00345 memcpy(buffer + bytes_read, reorder_start_, num_bytes);
00346 reorder_bytes_ -= num_bytes;
00347 reorder_start_ += num_bytes;
00348 }
00349 else
00350 {
00351 if (data_filled_ == 0)
00352 {
00353 struct iovec iov[2];
00354 iov[0].iov_base = &header;
00355 iov[0].iov_len = sizeof(header);
00356 iov[1].iov_base = data_buffer_;
00357 iov[1].iov_len = max_datagram_size_ - sizeof(header);
00358
00359
00360 num_bytes = readv(sock_, iov, 2);
00361
00362 if (num_bytes < 0)
00363 {
00364 if (errno == EAGAIN || errno == EWOULDBLOCK)
00365 {
00366 num_bytes = 0;
00367 break;
00368 }
00369 else
00370 {
00371 ROSCPP_LOG_DEBUG("readv() failed with error [%s]", strerror(errno));
00372 close();
00373 break;
00374 }
00375 }
00376 else if (num_bytes == 0)
00377 {
00378 ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00379 close();
00380 return -1;
00381 }
00382 else if (num_bytes < (ssize_t)sizeof(header))
00383 {
00384 ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes), strerror(errno));
00385 close();
00386 return -1;
00387 }
00388
00389 num_bytes -= sizeof(header);
00390 data_filled_ = num_bytes;
00391 data_start_ = data_buffer_;
00392 }
00393 else
00394 {
00395 from_previous = true;
00396 }
00397
00398 num_bytes = std::min(size - bytes_read, data_filled_);
00399
00400
00401 memcpy(buffer + bytes_read, data_start_, num_bytes);
00402 data_filled_ = std::max((int64_t)0, (int64_t)data_filled_ - (int64_t)size);
00403 data_start_ += num_bytes;
00404 }
00405
00406
00407 if (from_previous)
00408 {
00409 bytes_read += num_bytes;
00410 }
00411 else
00412 {
00413
00414 switch (header.op_)
00415 {
00416 case ROS_UDP_DATA0:
00417 if (current_message_id_)
00418 {
00419 ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_);
00420 reorder_header_ = header;
00421 reorder_bytes_ = num_bytes;
00422 memcpy(reorder_buffer_, buffer + bytes_read, num_bytes);
00423 reorder_start_ = reorder_buffer_;
00424 current_message_id_ = 0;
00425 total_blocks_ = 0;
00426 last_block_ = 0;
00427
00428 data_filled_ = 0;
00429 data_start_ = data_buffer_;
00430 return -1;
00431 }
00432 total_blocks_ = header.block_;
00433 last_block_ = 0;
00434 current_message_id_ = header.message_id_;
00435 break;
00436 case ROS_UDP_DATAN:
00437 if (header.message_id_ != current_message_id_)
00438 {
00439 ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_);
00440 return 0;
00441 }
00442 if (header.block_ != last_block_ + 1)
00443 {
00444 ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_);
00445 return 0;
00446 }
00447 last_block_ = header.block_;
00448
00449 break;
00450 default:
00451 ROS_ERROR("Unexpected UDP header OP [%d]", header.op_);
00452 return -1;
00453 }
00454
00455 bytes_read += num_bytes;
00456
00457 if (last_block_ == (total_blocks_ - 1))
00458 {
00459 current_message_id_ = 0;
00460 break;
00461 }
00462 }
00463 }
00464
00465 return bytes_read;
00466 }
00467
00468 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size)
00469 {
00470 {
00471 boost::mutex::scoped_lock lock(close_mutex_);
00472
00473 if (closed_)
00474 {
00475 ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00476 return -1;
00477 }
00478 }
00479
00480 ROS_ASSERT((int32_t)size > 0);
00481
00482 const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader);
00483
00484 uint32_t bytes_sent = 0;
00485 uint32_t this_block = 0;
00486 if (++current_message_id_ == 0)
00487 ++current_message_id_;
00488 while (bytes_sent < size)
00489 {
00490 TransportUDPHeader header;
00491 header.connection_id_ = connection_id_;
00492 header.message_id_ = current_message_id_;
00493 if (this_block == 0)
00494 {
00495 header.op_ = ROS_UDP_DATA0;
00496 header.block_ = (size + max_payload_size - 1) / max_payload_size;
00497 }
00498 else
00499 {
00500 header.op_ = ROS_UDP_DATAN;
00501 header.block_ = this_block;
00502 }
00503 ++this_block;
00504 struct iovec iov[2];
00505 iov[0].iov_base = &header;
00506 iov[0].iov_len = sizeof(header);
00507 iov[1].iov_base = buffer + bytes_sent;
00508 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
00509 ssize_t num_bytes = writev(sock_, iov, 2);
00510
00511 if (num_bytes < 0)
00512 {
00513 if(errno != EAGAIN)
00514 {
00515 ROSCPP_LOG_DEBUG("writev() failed with error [%s]", strerror(errno));
00516 close();
00517 break;
00518 }
00519 else
00520 {
00521 num_bytes = 0;
00522 }
00523 }
00524 else if (num_bytes < ssize_t(sizeof(header)))
00525 {
00526 ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes));
00527 close();
00528 break;
00529 }
00530 else
00531 {
00532 num_bytes -= sizeof(header);
00533 }
00534 bytes_sent += num_bytes;
00535 }
00536
00537 return bytes_sent;
00538 }
00539
00540 void TransportUDP::enableRead()
00541 {
00542 {
00543 boost::mutex::scoped_lock lock(close_mutex_);
00544
00545 if (closed_)
00546 {
00547 return;
00548 }
00549 }
00550
00551 if (!expecting_read_)
00552 {
00553 poll_set_->addEvents(sock_, POLLIN);
00554 expecting_read_ = true;
00555 }
00556 }
00557
00558 void TransportUDP::disableRead()
00559 {
00560 ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00561
00562 {
00563 boost::mutex::scoped_lock lock(close_mutex_);
00564
00565 if (closed_)
00566 {
00567 return;
00568 }
00569 }
00570
00571 if (expecting_read_)
00572 {
00573 poll_set_->delEvents(sock_, POLLIN);
00574 expecting_read_ = false;
00575 }
00576 }
00577
00578 void TransportUDP::enableWrite()
00579 {
00580 {
00581 boost::mutex::scoped_lock lock(close_mutex_);
00582
00583 if (closed_)
00584 {
00585 return;
00586 }
00587 }
00588
00589 if (!expecting_write_)
00590 {
00591 poll_set_->addEvents(sock_, POLLOUT);
00592 expecting_write_ = true;
00593 }
00594 }
00595
00596 void TransportUDP::disableWrite()
00597 {
00598 {
00599 boost::mutex::scoped_lock lock(close_mutex_);
00600
00601 if (closed_)
00602 {
00603 return;
00604 }
00605 }
00606
00607 if (expecting_write_)
00608 {
00609 poll_set_->delEvents(sock_, POLLOUT);
00610 expecting_write_ = false;
00611 }
00612 }
00613
00614 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size)
00615 {
00616 ROS_ASSERT(is_server_);
00617
00618 TransportUDPPtr transport(new TransportUDP(poll_set_, flags_, max_datagram_size));
00619 if (!transport->connect(host, port, connection_id))
00620 {
00621 ROS_ERROR("Failed to create outgoing connection");
00622 return TransportUDPPtr();
00623 }
00624 return transport;
00625
00626 }
00627
00628 }