40 #include <boost/bind.hpp> 42 #include <sys/socket.h> 46 #if defined(__APPLE__) 48 #include <sys/types.h> 51 #elif defined(__ANDROID__) 54 #elif defined(_POSIX_VERSION) 65 , expecting_read_(false)
66 , expecting_write_(false)
116 if((events & POLLERR) ||
117 (events & POLLHUP) ||
146 std::stringstream str;
156 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
165 sockaddr_in sin = {};
166 sin.sin_family = AF_INET;
167 if (inet_addr(host.c_str()) == INADDR_NONE)
169 struct addrinfo* addr;
170 struct addrinfo hints;
171 memset(&hints, 0,
sizeof(hints));
172 hints.ai_family = AF_UNSPEC;
174 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
177 ROS_ERROR(
"couldn't resolve host [%s]", host.c_str());
182 struct addrinfo* it = addr;
183 for (; it; it = it->ai_next)
185 if (it->ai_family == AF_INET)
187 memcpy(&sin, it->ai_addr, it->ai_addrlen);
188 sin.sin_family = it->ai_family;
189 sin.sin_port = htons(port);
200 ROS_ERROR(
"Couldn't find an AF_INET address for [%s]\n", host.c_str());
204 ROSCPP_LOG_DEBUG(
"Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
208 sin.sin_addr.s_addr = inet_addr(host.c_str());
211 sin.sin_port = htons(port);
228 std::stringstream ss;
229 ss << host <<
":" << port <<
" on socket " <<
sock_;
246 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
257 htonl(INADDR_LOOPBACK) :
266 getsockname(
sock_, (sockaddr *)&server_address_, &len);
288 ROS_ERROR(
"setting socket [%d] as non_blocking failed with error [%d]",
sock_, result);
347 disconnect_cb(shared_from_this());
364 uint32_t bytes_read = 0;
366 while (bytes_read < size)
373 uint32_t copy_bytes = 0;
374 bool from_previous =
false;
379 from_previous =
true;
393 SSIZE_T num_bytes = 0;
394 DWORD received_bytes = 0;
397 iov[0].buf =
reinterpret_cast<char*
>(&header);
398 iov[0].len =
sizeof(header);
401 int rc = WSARecv(
sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
402 if ( rc == SOCKET_ERROR) {
405 num_bytes = received_bytes;
410 iov[0].iov_base = &header;
411 iov[0].iov_len =
sizeof(header);
415 num_bytes = readv(
sock_, iov, 2);
431 else if (num_bytes == 0)
437 else if (num_bytes < (
unsigned)
sizeof(header))
444 num_bytes -=
sizeof(header);
450 from_previous =
true;
456 memcpy(buffer + bytes_read,
data_start_, copy_bytes);
466 bytes_read += copy_bytes;
517 bytes_read += copy_bytes;
546 uint32_t bytes_sent = 0;
547 uint32_t this_block = 0;
550 while (bytes_sent < size)
558 header.
block_ = (size + max_payload_size - 1) / max_payload_size;
563 header.
block_ = this_block;
569 SSIZE_T num_bytes = 0;
572 iov[0].buf =
reinterpret_cast<char*
>(&header);
573 iov[0].len =
sizeof(header);
574 iov[1].buf =
reinterpret_cast<char*
>(buffer + bytes_sent);
575 iov[1].len = std::min(max_payload_size, size - bytes_sent);
576 rc = WSASend(
sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
577 num_bytes = sent_bytes;
578 if (rc == SOCKET_ERROR) {
583 iov[0].iov_base = &header;
584 iov[0].iov_len =
sizeof(header);
585 iov[1].iov_base = buffer + bytes_sent;
586 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
587 ssize_t num_bytes = writev(
sock_, iov, 2);
604 else if (num_bytes < (
unsigned)
sizeof(header))
612 num_bytes -=
sizeof(header);
614 bytes_sent += num_bytes;
699 if (!transport->connect(host, port, connection_id))
701 ROS_ERROR(
"Failed to create outgoing connection");
712 sockaddr_storage sas;
713 socklen_t sas_len =
sizeof(sas);
714 getpeername(
sock_, (sockaddr *)&sas, &sas_len);
716 sockaddr_in *sin = (sockaddr_in *)&sas;
718 char namebuf[128] = {};
719 int port = ntohs(sin->sin_port);
720 strncpy(namebuf, inet_ntoa(sin->sin_addr),
sizeof(namebuf)-1);
722 std::string ip = namebuf;
723 std::stringstream uri;
724 uri << ip <<
":" << port;
boost::mutex close_mutex_
std::string getClientURI()
Returns the URI of the remote host.
TransportUDPPtr createOutgoing(std::string host, int port, int conn_id, int max_datagram_size)
Create a connection to a server socket.
bool connect(const std::string &host, int port, int conn_id)
Connect to a remote host.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
TransportUDP(PollSet *poll_set, int flags=0, int max_datagram_size=0)
bool createIncoming(int port, bool is_server)
Start a server socket and listen on a port.
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
Manages a set of sockets being polled through the poll() function call.
void socketUpdate(int events)
sockaddr_in local_address_
uint8_t * reorder_buffer_
ROSCPP_DECL const char * last_socket_error_string()
boost::shared_ptr< TransportUDP > TransportUDPPtr
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to...
std::string cached_remote_host_
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
bool setSocket(int sock)
Set the socket to be used by this transport.
#define ROSCPP_LOG_DEBUG(...)
uint8_t current_message_id_
ROSCPP_DECL bool last_socket_error_is_would_block()
#define ROS_ASSERT_MSG(cond,...)
struct ros::TransportUDPHeader TransportUDPHeader
TransportUDPHeader reorder_header_
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
bool delSocket(int sock)
Delete a socket.
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
sockaddr_in server_address_
boost::function< void(const TransportPtr &)> Callback
uint32_t max_datagram_size_
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
#define ROS_INVALID_SOCKET
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.