40 #include <boost/bind.hpp> 43 #if defined(__APPLE__) 45 #include <sys/types.h> 48 #elif defined(__ANDROID__) 51 #elif defined(_POSIX_VERSION) 62 , expecting_read_(false)
63 , expecting_write_(false)
70 , current_message_id_(0)
73 , max_datagram_size_(max_datagram_size)
111 if((events & POLLERR) ||
112 (events & POLLHUP) ||
141 std::stringstream str;
151 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
161 sin.sin_family = AF_INET;
162 if (inet_addr(host.c_str()) == INADDR_NONE)
164 struct addrinfo* addr;
165 struct addrinfo hints;
166 memset(&hints, 0,
sizeof(hints));
167 hints.ai_family = AF_UNSPEC;
169 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
172 ROS_ERROR(
"couldn't resolve host [%s]", host.c_str());
177 struct addrinfo* it = addr;
178 for (; it; it = it->ai_next)
180 if (it->ai_family == AF_INET)
182 memcpy(&sin, it->ai_addr, it->ai_addrlen);
183 sin.sin_family = it->ai_family;
184 sin.sin_port = htons(port);
195 ROS_ERROR(
"Couldn't find an AF_INET address for [%s]\n", host.c_str());
199 ROSCPP_LOG_DEBUG(
"Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
203 sin.sin_addr.s_addr = inet_addr(host.c_str());
206 sin.sin_port = htons(port);
223 std::stringstream ss;
224 ss << host <<
":" << port <<
" on socket " <<
sock_;
241 sock_ = socket(AF_INET, SOCK_DGRAM, 0);
252 htonl(INADDR_LOOPBACK) :
283 ROS_ERROR(
"setting socket [%d] as non_blocking failed with error [%d]",
sock_, result);
342 disconnect_cb(shared_from_this());
359 uint32_t bytes_read = 0;
361 while (bytes_read < size)
368 uint32_t copy_bytes = 0;
369 bool from_previous =
false;
374 from_previous =
true;
388 SSIZE_T num_bytes = 0;
389 DWORD received_bytes = 0;
392 iov[0].buf =
reinterpret_cast<char*
>(&header);
393 iov[0].len =
sizeof(header);
396 int rc = WSARecv(
sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
397 if ( rc == SOCKET_ERROR) {
400 num_bytes = received_bytes;
405 iov[0].iov_base = &header;
406 iov[0].iov_len =
sizeof(header);
410 num_bytes = readv(
sock_, iov, 2);
426 else if (num_bytes == 0)
432 else if (num_bytes < (
unsigned)
sizeof(header))
439 num_bytes -=
sizeof(header);
445 from_previous =
true;
451 memcpy(buffer + bytes_read,
data_start_, copy_bytes);
461 bytes_read += copy_bytes;
512 bytes_read += copy_bytes;
541 uint32_t bytes_sent = 0;
542 uint32_t this_block = 0;
545 while (bytes_sent < size)
553 header.
block_ = (size + max_payload_size - 1) / max_payload_size;
558 header.
block_ = this_block;
564 SSIZE_T num_bytes = 0;
567 iov[0].buf =
reinterpret_cast<char*
>(&header);
568 iov[0].len =
sizeof(header);
569 iov[1].buf =
reinterpret_cast<char*
>(buffer + bytes_sent);
570 iov[1].len = std::min(max_payload_size, size - bytes_sent);
571 rc = WSASend(
sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
572 num_bytes = sent_bytes;
573 if (rc == SOCKET_ERROR) {
578 iov[0].iov_base = &header;
579 iov[0].iov_len =
sizeof(header);
580 iov[1].iov_base = buffer + bytes_sent;
581 iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
582 ssize_t num_bytes = writev(
sock_, iov, 2);
599 else if (num_bytes < (
unsigned)
sizeof(header))
607 num_bytes -=
sizeof(header);
609 bytes_sent += num_bytes;
694 if (!transport->connect(host, port, connection_id))
696 ROS_ERROR(
"Failed to create outgoing connection");
707 sockaddr_storage sas;
708 socklen_t sas_len =
sizeof(sas);
709 getpeername(
sock_, (sockaddr *)&sas, &sas_len);
711 sockaddr_in *sin = (sockaddr_in *)&sas;
714 int port = ntohs(sin->sin_port);
715 strcpy(namebuf, inet_ntoa(sin->sin_addr));
717 std::string ip = namebuf;
718 std::stringstream uri;
719 uri << ip <<
":" << port;
boost::mutex close_mutex_
std::string getClientURI()
Returns the URI of the remote host.
TransportUDPPtr createOutgoing(std::string host, int port, int conn_id, int max_datagram_size)
Create a connection to a server socket.
bool connect(const std::string &host, int port, int conn_id)
Connect to a remote host.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
TransportUDP(PollSet *poll_set, int flags=0, int max_datagram_size=0)
bool createIncoming(int port, bool is_server)
Start a server socket and listen on a port.
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
Manages a set of sockets being polled through the poll() function call.
void socketUpdate(int events)
sockaddr_in local_address_
uint8_t * reorder_buffer_
ROSCPP_DECL const char * last_socket_error_string()
boost::shared_ptr< TransportUDP > TransportUDPPtr
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to...
std::string cached_remote_host_
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
bool setSocket(int sock)
Set the socket to be used by this transport.
#define ROSCPP_LOG_DEBUG(...)
uint8_t current_message_id_
ROSCPP_DECL bool last_socket_error_is_would_block()
#define ROS_ASSERT_MSG(cond,...)
struct ros::TransportUDPHeader TransportUDPHeader
TransportUDPHeader reorder_header_
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
bool delSocket(int sock)
Delete a socket.
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
sockaddr_in server_address_
boost::function< void(const TransportPtr &)> Callback
uint32_t max_datagram_size_
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
#define ROS_INVALID_SOCKET
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.