40 #include <boost/bind/bind.hpp>
42 #include <sys/socket.h>
46 #if defined(__APPLE__)
48 #include <sys/types.h>
51 #elif defined(__ANDROID__)
54 #elif defined(_POSIX_VERSION)
65 , expecting_read_(false)
66 , expecting_write_(false)
75 , current_message_id_(0)
78 , max_datagram_size_(max_datagram_size)
84 if (max_datagram_size_ == 0)
85 max_datagram_size_ = 1500;
86 reorder_buffer_ =
new uint8_t[max_datagram_size_];
87 reorder_start_ = reorder_buffer_;
88 data_buffer_ =
new uint8_t[max_datagram_size_];
89 data_start_ = data_buffer_;
116 if((events & POLLERR) ||
117 (events & POLLHUP) ||
146 std::stringstream str;
156 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
165 sockaddr_in sin = {};
166 sin.sin_family = AF_INET;
167 if (inet_addr(host.c_str()) == INADDR_NONE)
169 struct addrinfo* addr;
170 struct addrinfo hints;
171 memset(&hints, 0,
sizeof(hints));
172 hints.ai_family = AF_UNSPEC;
174 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
177 ROS_ERROR(
"couldn't resolve host [%s]", host.c_str());
182 struct addrinfo* it = addr;
183 for (; it; it = it->ai_next)
185 if (it->ai_family == AF_INET)
187 memcpy(&sin, it->ai_addr, it->ai_addrlen);
188 sin.sin_family = it->ai_family;
189 sin.sin_port = htons(port);
200 ROS_ERROR(
"Couldn't find an AF_INET address for [%s]\n", host.c_str());
204 ROSCPP_LOG_DEBUG(
"Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
208 sin.sin_addr.s_addr = inet_addr(host.c_str());
211 sin.sin_port = htons(port);
228 std::stringstream ss;
229 ss << host <<
":" << port <<
" on socket " <<
sock_;
246 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
257 htonl(INADDR_LOOPBACK) :
288 ROS_ERROR(
"setting socket [%d] as non_blocking failed with error [%d]",
sock_, result);
347 disconnect_cb(shared_from_this());
364 uint32_t bytes_read = 0;
366 while (bytes_read < size)
373 uint32_t copy_bytes = 0;
374 bool from_previous =
false;
379 from_previous =
true;
393 SSIZE_T num_bytes = 0;
394 DWORD received_bytes = 0;
397 iov[0].buf =
reinterpret_cast<char*
>(&
header);
398 iov[0].len =
sizeof(
header);
401 int rc = WSARecv(
sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
402 if ( rc == SOCKET_ERROR) {
405 num_bytes = received_bytes;
410 iov[0].iov_base = &
header;
411 iov[0].iov_len =
sizeof(
header);
415 num_bytes = readv(
sock_, iov, 2);
431 else if (num_bytes == 0)
437 else if (num_bytes < (
unsigned)
sizeof(
header))
444 num_bytes -=
sizeof(
header);
450 from_previous =
true;
456 memcpy(buffer + bytes_read,
data_start_, copy_bytes);
466 bytes_read += copy_bytes;
517 bytes_read += copy_bytes;
546 uint32_t bytes_sent = 0;
547 uint32_t this_block = 0;
550 while (bytes_sent < size)
558 header.block_ = (size + max_payload_size - 1) / max_payload_size;
563 header.block_ = this_block;
569 SSIZE_T num_bytes = 0;
572 iov[0].buf =
reinterpret_cast<char*
>(&
header);
573 iov[0].len =
sizeof(
header);
574 iov[1].buf =
reinterpret_cast<char*
>(buffer + bytes_sent);
575 iov[1].len = std::min(max_payload_size, size - bytes_sent);
576 rc = WSASend(
sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
577 num_bytes = sent_bytes;
578 if (rc == SOCKET_ERROR) {
583 iov[0].iov_base = &
header;
584 iov[0].iov_len =
sizeof(
header);
585 iov[1].iov_base = buffer + bytes_sent;
586 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
587 ssize_t num_bytes = writev(
sock_, iov, 2);
604 else if (num_bytes < (
unsigned)
sizeof(
header))
612 num_bytes -=
sizeof(
header);
614 bytes_sent += num_bytes;
699 if (!transport->connect(host, port, connection_id))
701 ROS_ERROR(
"Failed to create outgoing connection");
712 sockaddr_storage sas;
713 socklen_t sas_len =
sizeof(sas);
714 getpeername(
sock_, (sockaddr *)&sas, &sas_len);
716 sockaddr_in *sin = (sockaddr_in *)&sas;
718 char namebuf[128] = {};
719 int port = ntohs(sin->sin_port);
720 strncpy(namebuf, inet_ntoa(sin->sin_addr),
sizeof(namebuf)-1);
722 std::string ip = namebuf;
723 std::stringstream uri;
724 uri << ip <<
":" << port;