42 #include <boost/bind.hpp> 54 , expecting_read_(false)
55 , expecting_write_(false)
82 ROS_ERROR(
"setting socket [%d] as non_blocking failed with error [%d]",
sock_, result);
111 std::stringstream ss;
132 #ifdef ROSCPP_USE_TCP_NODELAY 141 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX! 147 if (!(
flags_ & SYNCHRONOUS))
158 if (header.
getValue(
"tcp_nodelay", nodelay) && nodelay ==
"1")
167 int flag = nodelay ? 1 : 0;
168 int result = setsockopt(
sock_, IPPROTO_TCP, TCP_NODELAY, (
char *) &flag,
sizeof(
int));
180 if (setsockopt(
sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val),
sizeof(val)) != 0)
186 #if defined(SOL_TCP) && defined(TCP_KEEPIDLE) 188 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPIDLE, &val,
sizeof(val)) != 0)
194 #if defined(SOL_TCP) && defined(TCP_KEEPINTVL) 196 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPINTVL, &val,
sizeof(val)) != 0)
202 #if defined(SOL_TCP) && defined(TCP_KEEPCNT) 204 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPCNT, &val,
sizeof(val)) != 0)
213 if (setsockopt(
sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val),
sizeof(val)) != 0)
237 sockaddr_storage sas;
242 if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
244 sockaddr_in *address = (sockaddr_in*) &sas;
245 sas_len =
sizeof(sockaddr_in);
248 address->sin_family = AF_INET;
249 address->sin_port = htons(port);
250 address->sin_addr.s_addr = ina.s_addr;
252 else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
254 sockaddr_in6 *address = (sockaddr_in6*) &sas;
255 sas_len =
sizeof(sockaddr_in6);
256 la_len_ =
sizeof(sockaddr_in6);
257 address->sin6_family = AF_INET6;
258 address->sin6_port = htons(port);
259 memcpy(address->sin6_addr.s6_addr, in6a.s6_addr,
sizeof(in6a.s6_addr));
263 struct addrinfo* addr;
264 struct addrinfo hints;
265 memset(&hints, 0,
sizeof(hints));
266 hints.ai_family = AF_UNSPEC;
268 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
271 ROS_ERROR(
"couldn't resolve publisher host [%s]", host.c_str());
276 struct addrinfo* it = addr;
277 char namebuf[128] = {};
278 for (; it; it = it->ai_next)
282 sockaddr_in *address = (sockaddr_in*) &sas;
283 sas_len =
sizeof(*address);
285 memcpy(address, it->ai_addr, it->ai_addrlen);
286 address->sin_family = it->ai_family;
287 address->sin_port = htons(port);
289 strncpy(namebuf, inet_ntoa(address->sin_addr),
sizeof(namebuf)-1);
295 sockaddr_in6 *address = (sockaddr_in6*) &sas;
296 sas_len =
sizeof(*address);
298 memcpy(address, it->ai_addr, it->ai_addrlen);
299 address->sin6_family = it->ai_family;
300 address->sin6_port = htons((u_short) port);
303 inet_ntop(AF_INET6, (
void*)&(address->sin6_addr), namebuf,
sizeof(namebuf));
313 ROS_ERROR(
"Couldn't resolve an address for [%s]\n", host.c_str());
317 ROSCPP_LOG_DEBUG(
"Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf,
sock_);
341 std::stringstream ss;
342 ss << host <<
":" << port <<
" on socket " <<
sock_;
369 sock_ = socket(AF_INET6, SOCK_STREAM, 0);
371 address->sin6_family = AF_INET6;
375 address->sin6_port = htons(port);
376 sa_len_ =
sizeof(sockaddr_in6);
380 sock_ = socket(AF_INET, SOCK_STREAM, 0);
382 address->sin_family = AF_INET;
384 htonl(INADDR_LOOPBACK) :
386 address->sin_port = htons(port);
471 disconnect_cb(shared_from_this());
490 uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
491 int num_bytes = ::recv(
sock_, reinterpret_cast<char*>(buffer), read_size, 0);
504 else if (num_bytes == 0)
529 uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
530 int num_bytes = ::send(
sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
631 sockaddr client_address;
632 socklen_t len =
sizeof(client_address);
633 int new_sock =
::accept(
sock_, (sockaddr *)&client_address, &len);
639 if (!transport->setSocket(new_sock))
641 ROS_ERROR(
"Failed to set socket on transport for socket %d", new_sock);
695 if((events & POLLERR) ||
696 (events & POLLHUP) ||
697 #
if defined(POLLRDHUP)
698 (events & POLLRDHUP) ||
703 socklen_t len =
sizeof(error);
704 if (getsockopt(
sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
710 strerror_s(err,60,error);
713 ROSCPP_LOG_DEBUG(
"Socket %d closed with (ERR|HUP|NVAL) events %d: %s",
sock_, events, strerror(error));
722 std::stringstream str;
731 sockaddr_storage sas;
732 socklen_t sas_len =
sizeof(sas);
733 getpeername(
sock_, (sockaddr *)&sas, &sas_len);
735 sockaddr_in *sin = (sockaddr_in *)&sas;
736 sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
738 char namebuf[128] = {};
741 switch (sas.ss_family)
744 port = ntohs(sin->sin_port);
745 strncpy(namebuf, inet_ntoa(sin->sin_addr),
sizeof(namebuf)-1);
748 port = ntohs(sin6->sin6_port);
749 inet_ntop(AF_INET6, (
void*)&(sin6->sin6_addr), namebuf,
sizeof(namebuf));
757 std::string ip = namebuf;
758 std::stringstream uri;
759 uri << ip <<
":" << port;
boost::function< void(const TransportTCPPtr &)> AcceptCallback
virtual void parseHeader(const Header &header)
Provides an opportunity for transport-specific options to come in through the header.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
std::string connected_host_
Manages a set of sockets being polled through the poll() function call.
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
std::string cached_remote_host_
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
ROSCPP_DECL const char * last_socket_error_string()
void setNoDelay(bool nodelay)
#define ROSCPP_LOG_DEBUG(...)
AcceptCallback accept_cb_
bool setSocket(int sock)
Set the socket to be used by this transport.
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
ROSCPP_DECL bool last_socket_error_is_would_block()
#define ROS_ASSERT_MSG(cond,...)
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to...
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
bool delSocket(int sock)
Delete a socket.
boost::recursive_mutex close_mutex_
#define ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
sockaddr_storage server_address_
TransportTCPPtr accept()
Accept a connection on a server socket. Blocks until a connection is available.
sockaddr_storage local_address_
bool listen(int port, int backlog, const AcceptCallback &accept_cb)
Start a server socket and listen on a port.
boost::function< void(const TransportPtr &)> Callback
static bool s_use_keepalive_
void socketUpdate(int events)
boost::shared_ptr< TransportTCP > TransportTCPPtr
#define ROS_INVALID_SOCKET
#define ROSCPP_CONN_LOG_DEBUG(...)
TransportTCP(PollSet *poll_set, int flags=0)
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
#define ROS_SOCKETS_SHUT_RDWR
std::string getClientURI()
Returns the URI of the remote host.
bool connect(const std::string &host, int port)
Connect to a remote host.
ROSCPP_DECL int last_socket_error()
void setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.