42 #include <boost/bind.hpp> 54 , expecting_read_(false)
55 , expecting_write_(false)
82 ROS_ERROR(
"setting socket [%d] as non_blocking failed with error [%d]",
sock_, result);
111 std::stringstream ss;
132 #ifdef ROSCPP_USE_TCP_NODELAY 143 if (!(
flags_ & SYNCHRONOUS))
154 if (header.
getValue(
"tcp_nodelay", nodelay) && nodelay ==
"1")
163 int flag = nodelay ? 1 : 0;
164 int result = setsockopt(
sock_, IPPROTO_TCP, TCP_NODELAY, (
char *) &flag,
sizeof(
int));
176 if (setsockopt(
sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val),
sizeof(val)) != 0)
182 #if defined(SOL_TCP) && defined(TCP_KEEPIDLE) 184 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPIDLE, &val,
sizeof(val)) != 0)
192 #if defined(SOL_TCP) && defined(TCP_KEEPINTVL) 194 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPINTVL, &val,
sizeof(val)) != 0)
202 #if defined(SOL_TCP) && defined(TCP_KEEPCNT) 204 if (setsockopt(
sock_, SOL_TCP, TCP_KEEPCNT, &val,
sizeof(val)) != 0)
215 if (setsockopt(
sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val),
sizeof(val)) != 0)
239 sockaddr_storage sas;
244 if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
246 sockaddr_in *address = (sockaddr_in*) &sas;
247 sas_len =
sizeof(sockaddr_in);
250 address->sin_family = AF_INET;
251 address->sin_port = htons(port);
252 address->sin_addr.s_addr = ina.s_addr;
254 else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
256 sockaddr_in6 *address = (sockaddr_in6*) &sas;
257 sas_len =
sizeof(sockaddr_in6);
258 la_len_ =
sizeof(sockaddr_in6);
259 address->sin6_family = AF_INET6;
260 address->sin6_port = htons(port);
261 memcpy(address->sin6_addr.s6_addr, in6a.s6_addr,
sizeof(in6a.s6_addr));
265 struct addrinfo* addr;
266 struct addrinfo hints;
267 memset(&hints, 0,
sizeof(hints));
268 hints.ai_family = AF_UNSPEC;
270 if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
273 ROS_ERROR(
"couldn't resolve publisher host [%s]", host.c_str());
278 struct addrinfo* it = addr;
280 for (; it; it = it->ai_next)
284 sockaddr_in *address = (sockaddr_in*) &sas;
285 sas_len =
sizeof(*address);
287 memcpy(address, it->ai_addr, it->ai_addrlen);
288 address->sin_family = it->ai_family;
289 address->sin_port = htons(port);
291 strcpy(namebuf, inet_ntoa(address->sin_addr));
297 sockaddr_in6 *address = (sockaddr_in6*) &sas;
298 sas_len =
sizeof(*address);
300 memcpy(address, it->ai_addr, it->ai_addrlen);
301 address->sin6_family = it->ai_family;
302 address->sin6_port = htons((u_short) port);
305 inet_ntop(AF_INET6, (
void*)&(address->sin6_addr), namebuf,
sizeof(namebuf));
315 ROS_ERROR(
"Couldn't resolve an address for [%s]\n", host.c_str());
319 ROSCPP_LOG_DEBUG(
"Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf,
sock_);
343 std::stringstream ss;
344 ss << host <<
":" << port <<
" on socket " <<
sock_;
371 sock_ = socket(AF_INET6, SOCK_STREAM, 0);
373 address->sin6_family = AF_INET6;
377 address->sin6_port = htons(port);
378 sa_len_ =
sizeof(sockaddr_in6);
382 sock_ = socket(AF_INET, SOCK_STREAM, 0);
384 address->sin_family = AF_INET;
386 htonl(INADDR_LOOPBACK) :
388 address->sin_port = htons(port);
473 disconnect_cb(shared_from_this());
492 uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
493 int num_bytes = ::recv(
sock_, reinterpret_cast<char*>(buffer), read_size, 0);
506 else if (num_bytes == 0)
531 uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
532 int num_bytes = ::send(
sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
633 sockaddr client_address;
634 socklen_t len =
sizeof(client_address);
635 int new_sock =
::accept(
sock_, (sockaddr *)&client_address, &len);
641 if (!transport->setSocket(new_sock))
643 ROS_ERROR(
"Failed to set socket on transport for socket %d", new_sock);
697 if((events & POLLERR) ||
698 (events & POLLHUP) ||
702 socklen_t len =
sizeof(error);
703 if (getsockopt(
sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
709 strerror_s(err,60,error);
712 ROSCPP_LOG_DEBUG(
"Socket %d closed with (ERR|HUP|NVAL) events %d: %s",
sock_, events, strerror(error));
721 std::stringstream str;
730 sockaddr_storage sas;
731 socklen_t sas_len =
sizeof(sas);
732 getpeername(
sock_, (sockaddr *)&sas, &sas_len);
734 sockaddr_in *sin = (sockaddr_in *)&sas;
735 sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
740 switch (sas.ss_family)
743 port = ntohs(sin->sin_port);
744 strcpy(namebuf, inet_ntoa(sin->sin_addr));
747 port = ntohs(sin6->sin6_port);
748 inet_ntop(AF_INET6, (
void*)&(sin6->sin6_addr), namebuf,
sizeof(namebuf));
756 std::string ip = namebuf;
757 std::stringstream uri;
758 uri << ip <<
":" << port;
boost::function< void(const TransportTCPPtr &)> AcceptCallback
virtual void parseHeader(const Header &header)
Provides an opportunity for transport-specific options to come in through the header.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of bytes.
bool isOnlyLocalhostAllowed() const
Returns true if this transport is only allowed to talk to localhost.
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for asynchronous sockets.
std::string connected_host_
Manages a set of sockets being polled through the poll() function call.
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asynchronous sockets.
std::string cached_remote_host_
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for asynchronous sockets.
ROSCPP_DECL const char * last_socket_error_string()
void setNoDelay(bool nodelay)
#define ROSCPP_LOG_DEBUG(...)
AcceptCallback accept_cb_
bool setSocket(int sock)
Set the socket to be used by this transport.
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for asynchronous sockets.
ROSCPP_DECL bool last_socket_error_is_would_block()
#define ROS_ASSERT_MSG(cond,...)
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an error.
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to.
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
bool delSocket(int sock)
Delete a socket.
boost::recursive_mutex close_mutex_
#define ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes.
sockaddr_storage server_address_
TransportTCPPtr accept()
Accept a connection on a server socket. Blocks until a connection is available.
sockaddr_storage local_address_
bool listen(int port, int backlog, const AcceptCallback &accept_cb)
Start a server socket and listen on a port.
boost::function< void(const TransportPtr &)> Callback
static bool s_use_keepalive_
void socketUpdate(int events)
boost::shared_ptr< TransportTCP > TransportTCPPtr
#define ROS_INVALID_SOCKET
#define ROSCPP_CONN_LOG_DEBUG(...)
TransportTCP(PollSet *poll_set, int flags=0)
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this function.
bool isHostAllowed(const std::string &host) const
Returns true if the transport is allowed to connect to the host passed to it.
#define ROS_SOCKETS_SHUT_RDWR
std::string getClientURI()
Returns the URI of the remote host.
bool connect(const std::string &host, int port)
Connect to a remote host.
ROSCPP_DECL int last_socket_error()
void setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.