42 #include <boost/bind.hpp> 46 #include <sys/socket.h> 57 , expecting_read_(false)
58 , expecting_write_(false)
85 ROS_ERROR(
"setting socket [%d] as non_blocking failed with error [%d]",
sock_, result);
114 std::stringstream ss;
124 switch (local_address_.ss_family)
127 local_port_ = ntohs(((sockaddr_in *)&local_address_)->sin_port);
130 local_port_ = ntohs(((sockaddr_in6 *)&local_address_)->sin6_port);
135 #ifdef ROSCPP_USE_TCP_NODELAY 144 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX! 150 if (!(
flags_ & SYNCHRONOUS))
161 if (header.
getValue(
"tcp_nodelay", nodelay) && nodelay ==
"1")
170 int flag = nodelay ? 1 : 0;
171 int result = setsockopt(
sock_, IPPROTO_TCP, TCP_NODELAY, (
char *) &flag,
sizeof(
int));
183 if (setsockopt(
sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val),
sizeof(val)) != 0)
189 #if defined(SOL_TCP) && defined(TCP_KEEPIDLE) 191 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPIDLE, &val,
sizeof(val)) != 0)
199 #if defined(SOL_TCP) && defined(TCP_KEEPINTVL) 201 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPINTVL, &val,
sizeof(val)) != 0)
209 #if defined(SOL_TCP) && defined(TCP_KEEPCNT) 211 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPCNT, &val,
sizeof(val)) != 0)
222 if (setsockopt(
sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val),
sizeof(val)) != 0)
246 sockaddr_storage sas;
251 if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
253 sockaddr_in *address = (sockaddr_in*) &sas;
254 sas_len =
sizeof(sockaddr_in);
257 address->sin_family = AF_INET;
258 address->sin_port = htons(port);
259 address->sin_addr.s_addr = ina.s_addr;
261 else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
263 sockaddr_in6 *address = (sockaddr_in6*) &sas;
264 sas_len =
sizeof(sockaddr_in6);
265 la_len_ =
sizeof(sockaddr_in6);
266 address->sin6_family = AF_INET6;
267 address->sin6_port = htons(port);
268 memcpy(address->sin6_addr.s6_addr, in6a.s6_addr,
sizeof(in6a.s6_addr));
272 struct addrinfo* addr;
273 struct addrinfo hints;
274 memset(&hints, 0,
sizeof(hints));
275 hints.ai_family = AF_UNSPEC;
277 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
280 ROS_ERROR(
"couldn't resolve publisher host [%s]", host.c_str());
285 struct addrinfo* it = addr;
286 char namebuf[128] = {};
287 for (; it; it = it->ai_next)
291 sockaddr_in *address = (sockaddr_in*) &sas;
292 sas_len =
sizeof(*address);
294 memcpy(address, it->ai_addr, it->ai_addrlen);
295 address->sin_family = it->ai_family;
296 address->sin_port = htons(port);
298 strncpy(namebuf, inet_ntoa(address->sin_addr),
sizeof(namebuf)-1);
304 sockaddr_in6 *address = (sockaddr_in6*) &sas;
305 sas_len =
sizeof(*address);
307 memcpy(address, it->ai_addr, it->ai_addrlen);
308 address->sin6_family = it->ai_family;
309 address->sin6_port = htons((u_short) port);
312 inet_ntop(AF_INET6, (
void*)&(address->sin6_addr), namebuf,
sizeof(namebuf));
322 ROS_ERROR(
"Couldn't resolve an address for [%s]\n", host.c_str());
326 ROSCPP_LOG_DEBUG(
"Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf,
sock_);
350 std::stringstream ss;
351 ss << host <<
":" << port <<
" on socket " <<
sock_;
378 sock_ = socket(AF_INET6, SOCK_STREAM, 0);
380 address->sin6_family = AF_INET6;
384 address->sin6_port = htons(port);
385 sa_len_ =
sizeof(sockaddr_in6);
389 sock_ = socket(AF_INET, SOCK_STREAM, 0);
391 address->sin_family = AF_INET;
393 htonl(INADDR_LOOPBACK) :
395 address->sin_port = htons(port);
413 getsockname(
sock_, (sockaddr *)&server_address_, &
sa_len_);
415 switch (server_address_.ss_family)
418 server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
421 server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
480 disconnect_cb(shared_from_this());
499 uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
500 int num_bytes = ::recv(
sock_, reinterpret_cast<char*>(buffer), read_size, 0);
513 else if (num_bytes == 0)
538 uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
539 int num_bytes = ::send(
sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
640 sockaddr client_address;
641 socklen_t len =
sizeof(client_address);
642 int new_sock =
::accept(
sock_, (sockaddr *)&client_address, &len);
648 if (!transport->setSocket(new_sock))
650 ROS_ERROR(
"Failed to set socket on transport for socket %d", new_sock);
704 if((events & POLLERR) ||
705 (events & POLLHUP) ||
706 #
if defined(POLLRDHUP)
707 (events & POLLRDHUP) ||
712 socklen_t len =
sizeof(error);
713 if (getsockopt(
sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
719 strerror_s(err,60,error);
722 ROSCPP_LOG_DEBUG(
"Socket %d closed with (ERR|HUP|NVAL) events %d: %s",
sock_, events, strerror(error));
731 std::stringstream str;
740 sockaddr_storage sas;
741 socklen_t sas_len =
sizeof(sas);
742 getpeername(
sock_, (sockaddr *)&sas, &sas_len);
744 sockaddr_in *sin = (sockaddr_in *)&sas;
745 sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
747 char namebuf[128] = {};
750 switch (sas.ss_family)
753 port = ntohs(sin->sin_port);
754 strncpy(namebuf, inet_ntoa(sin->sin_addr),
sizeof(namebuf)-1);
757 port = ntohs(sin6->sin6_port);
758 inet_ntop(AF_INET6, (
void*)&(sin6->sin6_addr), namebuf,
sizeof(namebuf));
766 std::string ip = namebuf;
767 std::stringstream uri;
768 uri << ip <<
":" << port;
boost::function< void(const TransportTCPPtr &)> AcceptCallback
virtual void parseHeader(const Header &header)
Provides an opportunity for transport-specific options to come in through the header.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
std::string connected_host_
Manages a set of sockets being polled through the poll() function call.
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
std::string cached_remote_host_
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
ROSCPP_DECL const char * last_socket_error_string()
void setNoDelay(bool nodelay)
#define ROSCPP_LOG_DEBUG(...)
AcceptCallback accept_cb_
bool setSocket(int sock)
Set the socket to be used by this transport.
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
ROSCPP_DECL bool last_socket_error_is_would_block()
#define ROS_ASSERT_MSG(cond,...)
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to...
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
bool delSocket(int sock)
Delete a socket.
boost::recursive_mutex close_mutex_
#define ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
sockaddr_storage server_address_
TransportTCPPtr accept()
Accept a connection on a server socket. Blocks until a connection is available.
sockaddr_storage local_address_
bool listen(int port, int backlog, const AcceptCallback &accept_cb)
Start a server socket and listen on a port.
boost::function< void(const TransportPtr &)> Callback
static bool s_use_keepalive_
void socketUpdate(int events)
boost::shared_ptr< TransportTCP > TransportTCPPtr
#define ROS_INVALID_SOCKET
#define ROSCPP_CONN_LOG_DEBUG(...)
TransportTCP(PollSet *poll_set, int flags=0)
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
#define ROS_SOCKETS_SHUT_RDWR
std::string getClientURI()
Returns the URI of the remote host.
bool connect(const std::string &host, int port)
Connect to a remote host.
ROSCPP_DECL int last_socket_error()
void setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.