1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37
38 try:
39 from cStringIO import StringIO
40 python3 = 0
41 except ImportError:
42 from io import StringIO, BytesIO
43 python3 = 1
44 import errno
45 import socket
46 import logging
47
48 import threading
49 import time
50 import traceback
51 import select
52
53 import rosgraph
54 import rosgraph.network
55 from genpy import DeserializationError, Message
56 from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header
57
58
59 from rospy.core import *
60 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
61 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
62 from rospy.msg import deserialize_messages, serialize_message
63 from rospy.service import ServiceException
64
65 from rospy.impl.transport import Transport, BIDIRECTIONAL
66 from errno import EAGAIN, EWOULDBLOCK
67
68 logger = logging.getLogger('rospy.tcpros')
69
70
71 DEFAULT_BUFF_SIZE = 65536
72
73
74 TCPROS = "TCPROS"
75
76 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
77 _use_tcp_keepalive = None
78 _use_tcp_keepalive_lock = threading.Lock()
92
94 """
95 Read data from socket into buffer.
96 @param sock: socket to read from
97 @type sock: socket.socket
98 @param b: buffer to receive into
99 @type b: StringIO
100 @param buff_size: recv read size
101 @type buff_size: int
102 @return: number of bytes read
103 @rtype: int
104 """
105 try:
106 d = sock.recv(buff_size)
107 if d:
108 b.write(d)
109 return len(d)
110 else:
111 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
112 except socket.error as ex:
113 if ex.errno not in (EAGAIN, EWOULDBLOCK):
114 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
115 else:
116 return 0
117
119 """
120 Simple server that accepts inbound TCP/IP connections and hands
121 them off to a handler function. TCPServer obeys the
122 ROS_IP/ROS_HOSTNAME environment variables
123 """
124
125 - def __init__(self, inbound_handler, port=0):
126 """
127 Setup a server socket listening on the specified port. If the
128 port is omitted, will choose any open port.
129 @param inbound_handler: handler to invoke with
130 new connection
131 @type inbound_handler: fn(sock, addr)
132 @param port: port to bind to, omit/0 to bind to any
133 @type port: int
134 """
135 self.port = port
136 self.addr = None
137 self.is_shutdown = False
138 self.inbound_handler = inbound_handler
139 try:
140 self.server_sock = self._create_server_sock()
141 except:
142 self.server_sock = None
143 raise
144
146 """Runs the run() loop in a separate thread"""
147 t = threading.Thread(target=self.run, args=())
148 t.daemon = True
149 t.start()
150
152 """
153 Main TCP receive loop. Should be run in a separate thread -- use start()
154 to do this automatically.
155 """
156 self.is_shutdown = False
157 if not self.server_sock:
158 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
159 while not self.is_shutdown:
160 try:
161 (client_sock, client_addr) = self.server_sock.accept()
162 except socket.timeout:
163 continue
164 except ConnectionAbortedError:
165 continue
166 except IOError as e:
167 (e_errno, msg) = e.args
168 if e_errno == errno.EINTR:
169 continue
170 if not self.is_shutdown:
171 raise
172 if self.is_shutdown:
173 break
174 try:
175
176 self.inbound_handler(client_sock, client_addr)
177 except socket.error as e:
178 if not self.is_shutdown:
179 traceback.print_exc()
180 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
181 logdebug("TCPServer[%s] shutting down", self.port)
182
183
185 """
186 @return: (ip address, port) of server socket binding
187 @rtype: (str, int)
188 """
189
190
191 return (rosgraph.network.get_host_name(), self.port)
192
194 """
195 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
196 binding to loopback interface.
197 """
198 if rosgraph.network.use_ipv6():
199 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
200 else:
201 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
202 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
203 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port))
204 server_sock.bind((rosgraph.network.get_bind_address(), self.port))
205 (self.addr, self.port) = server_sock.getsockname()[0:2]
206 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port))
207 server_sock.listen(5)
208 return server_sock
209
211 """shutdown I/O resources uses by this server"""
212 if not self.is_shutdown:
213 self.is_shutdown = True
214 self.server_sock.close()
215
216
217
218
219 _tcpros_server = None
220
233
241
242
243
245 """
246 get the address of the tcpros server.
247 @raise Exception: if tcpros server has not been started or created
248 """
249 return _tcpros_server.get_address()
250
252 """
253 utility handler that does nothing more than provide a rejection header
254 @param sock: socket connection
255 @type sock: socket.socket
256 @param client_addr: client address
257 @type client_addr: str
258 @param header: request header
259 @type header: dict
260 """
261 return {'error': "unhandled connection"}
262
264 """
265 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
266 connections as well as ROS service connections over TCP. TCP server
267 socket is run once start_server() is called -- this is implicitly
268 called during init_publisher().
269 """
270
272 """
273 Constructor
274 @param port: port number to bind to (default 0/any)
275 @type port: int
276 """
277 self.port = port
278 self.tcp_ros_server = None
279 self.lock = threading.Lock()
280
281 self.topic_connection_handler = _error_connection_handler
282
283 self.service_connection_handler = _error_connection_handler
284
286 """
287 Starts the TCP socket server if one is not already running
288 """
289 if self.tcp_ros_server:
290 return
291 with self.lock:
292 try:
293 if not self.tcp_ros_server:
294 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port)
295 self.tcp_ros_server.start()
296 except Exception as e:
297 self.tcp_ros_server = None
298 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
299 return 0, "unable to establish TCPROS server: %s"%e, []
300
302 """
303 @return: address and port of TCP server socket for accepting
304 inbound connections
305 @rtype: str, int
306 """
307 if self.tcp_ros_server is not None:
308 return self.tcp_ros_server.get_full_addr()
309 return None, None
310
312 """stops the TCP/IP server responsible for receiving inbound connections"""
313 if self.tcp_ros_server:
314 self.tcp_ros_server.shutdown()
315
317 """
318 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
319
320 @param sock: socket connection
321 @type sock: socket.socket
322 @param client_addr: client address
323 @type client_addr: (str, int)
324 @raise TransportInitError: If transport cannot be successfully initialized
325 """
326
327
328 try:
329 buff_size = 4096
330 if python3 == 0:
331
332 header = read_ros_handshake_header(sock, StringIO(), buff_size)
333 else:
334 header = read_ros_handshake_header(sock, BytesIO(), buff_size)
335
336 if 'topic' in header:
337 err_msg = self.topic_connection_handler(sock, client_addr, header)
338 elif 'service' in header:
339 err_msg = self.service_connection_handler(sock, client_addr, header)
340 else:
341 err_msg = 'no topic or service name detected'
342 if err_msg:
343
344
345
346
347
348
349 if not rospy.core.is_shutdown_requested():
350 write_ros_handshake_header(sock, {'error' : err_msg})
351 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
352 else:
353 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
354 return
355 except rospy.exceptions.TransportInitError as e:
356 logwarn(str(e))
357 if sock is not None:
358 sock.close()
359 except Exception as e:
360
361 if not rospy.core.is_shutdown_requested():
362 logwarn("Inbound TCP/IP connection failed: %s", e)
363 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc())
364 if sock is not None:
365 sock.close()
366
368 """
369 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
370 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
371 well as which fields to send when creating a new connection (get_header_fields()).
372 """
373
375 """
376 ctor
377 @param resolved_name: resolved service or topic name
378 @type resolved_name: str
379 @param recv_data_class: message class for deserializing inbound messages
380 @type recv_data_class: Class
381 @param queue_size: maximum number of inbound messages to maintain
382 @type queue_size: int
383 @param buff_size: receive buffer size (in bytes) for reading from the connection.
384 @type buff_size: int
385 """
386 if recv_data_class and not issubclass(recv_data_class, Message):
387 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
388 self.resolved_name = resolved_name
389 self.recv_data_class = recv_data_class
390 self.queue_size = queue_size
391 self.buff_size = buff_size
392 self.direction = BIDIRECTIONAL
393
394
396 """
397 @param b StringIO: read buffer
398 @param msg_queue [Message]: queue of deserialized messages
399 @type msg_queue: [Message]
400 @param sock socket: protocol can optionally read more data from
401 the socket, but in most cases the required data will already be
402 in b
403 """
404
405 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
406
408 """
409 Header fields that should be sent over the connection. The header fields
410 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
411 @return: {str : str}: header fields to send over connection
412 @rtype: dict
413 """
414 return {}
415
416
417
418
419
420
421
422
423
424
425
427 """
428 Generic implementation of TCPROS exchange routines for both topics and services
429 """
430 transport_type = 'TCPROS'
431
432 - def __init__(self, protocol, name, header=None):
433 """
434 ctor
435 @param name str: identifier
436 @param protocol TCPROSTransportProtocol protocol implementation
437 @param header dict: (optional) handshake header if transport handshake header was
438 already read off of transport.
439 @raise TransportInitError if transport cannot be initialized according to arguments
440 """
441 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
442 if not name:
443 raise TransportInitError("Unable to initialize transport: name is not set")
444
445 self.protocol = protocol
446
447 self.socket = None
448 self.endpoint_id = 'unknown'
449 self.callerid_pub = 'unknown'
450 self.dest_address = None
451
452 if python3 == 0:
453 self.read_buff = StringIO()
454 self.write_buff = StringIO()
455 else:
456 self.read_buff = BytesIO()
457 self.write_buff = BytesIO()
458
459
460 self.header = header
461
462
463 self.is_latched = False
464 self.latch = None
465
466
467
468 self._fileno = None
469
470
471
472
473 self.md5sum = None
474 self.type = None
475
476
477 self.local_endpoint = (None, None)
478 self.remote_endpoint = (None, None)
479
481 """
482 Get detailed connection information.
483 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp'
484 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6]
485 """
486
487 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
488
490 """
491 Get descriptor for select
492 """
493 return self._fileno
494
496 """
497 Set the endpoint_id of this transport.
498 Allows the endpoint_id to be set before the socket is initialized.
499 """
500 self.endpoint_id = endpoint_id
501
503 """
504 Set the socket for this transport
505 @param sock: socket
506 @type sock: socket.socket
507 @param endpoint_id: identifier for connection endpoint
508 @type endpoint_id: str
509 @raise TransportInitError: if socket has already been set
510 """
511 if self.socket is not None:
512 raise TransportInitError("socket already initialized")
513 self.socket = sock
514 self.endpoint_id = endpoint_id
515 self._fileno = sock.fileno()
516 self.local_endpoint = self.socket.getsockname()
517
518 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
519 """
520 Establish TCP connection to the specified
521 address/port. connect() always calls L{write_header()} and
522 L{read_header()} after the connection is made
523 @param dest_addr: destination IP address
524 @type dest_addr: str
525 @param dest_port: destination port
526 @type dest_port: int
527 @param endpoint_id: string identifier for connection (for statistics)
528 @type endpoint_id: str
529 @param timeout: (optional keyword) timeout in seconds
530 @type timeout: float
531 @raise TransportInitError: if unable to create connection
532 """
533
534
535 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"):
536 if not rosgraph.network.is_local_address(dest_addr):
537 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr)
538 logwarn(msg)
539 self.close()
540 raise TransportInitError(msg)
541
542
543 try:
544 self.endpoint_id = endpoint_id
545 self.dest_address = (dest_addr, dest_port)
546 if rosgraph.network.use_ipv6():
547 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
548 else:
549 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
550 if _is_use_tcp_keepalive():
551
552 if hasattr(socket, 'TCP_KEEPCNT') and \
553 hasattr(socket, 'TCP_KEEPIDLE') and \
554 hasattr(socket, 'TCP_KEEPINTVL'):
555
556 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
557
558 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
559
560 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
561
562 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
563 if timeout is not None:
564 s.settimeout(timeout)
565 self.socket = s
566 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port))
567 self.socket.connect((dest_addr, dest_port))
568 self.write_header()
569 self.read_header()
570 self.local_endpoint = self.socket.getsockname()
571 self.remote_endpoint = (dest_addr, dest_port)
572 except TransportInitError as tie:
573 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
574 raise
575 except Exception as e:
576
577 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
578
579
580 if not isinstance(e, socket.error):
581
582 self.close()
583 elif not isinstance(e, socket.timeout) and e.errno not in [
584 errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET,
585 errno.ECONNABORTED, errno.ETIMEDOUT, errno.EHOSTDOWN, errno.EHOSTUNREACH]:
586
587
588
589
590
591
592
593
594
595 self.close()
596 raise TransportInitError(str(e))
597
599 """
600 Validate header and initialize fields accordingly
601 @param header: header fields from publisher
602 @type header: dict
603 @raise TransportInitError: if header fails to validate
604 """
605 self.header = header
606 if 'error' in header:
607 raise TransportInitError("remote error reported: %s"%header['error'])
608 for required in ['md5sum', 'type']:
609 if not required in header:
610 raise TransportInitError("header missing required field [%s]"%required)
611 self.type = header['type']
612 self.md5sum = header['md5sum']
613 if 'callerid' in header:
614 self.callerid_pub = header['callerid']
615 if header.get('latching', '0') == '1':
616 self.is_latched = True
617
619 """Writes the TCPROS header to the active connection."""
620
621 sock = self.socket
622 protocol = self.protocol
623
624
625
626 if sock is None or protocol is None:
627 return
628 fileno = sock.fileno()
629 ready = None
630 poller = None
631 if hasattr(select, 'poll'):
632 poller = select.poll()
633 poller.register(fileno, select.POLLOUT)
634 while not ready:
635 events = poller.poll()
636 for _, flag in events:
637 if flag & select.POLLOUT:
638 ready = True
639 else:
640 while not ready:
641 try:
642 _, ready, _ = select.select([], [fileno], [])
643 except ValueError as e:
644 logger.error("[%s]: select fileno '%s': %s", self.name, str(fileno), str(e))
645 raise
646
647 logger.debug("[%s]: writing header", self.name)
648 sock.setblocking(1)
649 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
650 if poller:
651 poller.unregister(fileno)
652
653
655 """
656 Read TCPROS header from active socket
657 @raise TransportInitError if header fails to validate
658 """
659 sock = self.socket
660 if sock is None:
661 return
662 sock.setblocking(1)
663
664 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
665
667 """
668 Convenience routine for services to send a message across a
669 particular connection. NOTE: write_data is much more efficient
670 if same message is being sent to multiple connections. Not
671 threadsafe.
672 @param msg: message to send
673 @type msg: Msg
674 @param seq: sequence number for message
675 @type seq: int
676 @raise TransportException: if error occurred sending message
677 """
678
679 serialize_message(self.write_buff, seq, msg)
680 self.write_data(self.write_buff.getvalue())
681 self.write_buff.truncate(0)
682 self.write_buff.seek(0)
683
685 """
686 Write raw data to transport
687 @raise TransportInitialiationError: could not be initialized
688 @raise TransportTerminated: no longer open for publishing
689 """
690 if not self.socket:
691 raise TransportInitError("TCPROS transport was not successfully initialized")
692 if self.done:
693 raise TransportTerminated("connection closed")
694 try:
695
696 self.socket.sendall(data)
697 self.stat_bytes += len(data)
698 self.stat_num_msg += 1
699 except IOError as ioe:
700
701 (ioe_errno, msg) = ioe.args
702 if ioe_errno == errno.EPIPE:
703 logdebug("ERROR: Broken Pipe")
704 self.close()
705 raise TransportTerminated(str(ioe_errno)+msg)
706 raise
707 except socket.error as se:
708
709 (se_errno, msg) = se.args
710 if se_errno == errno.EPIPE:
711 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
712 self.close()
713 raise TransportTerminated(msg)
714 elif se_errno == errno.ECONNRESET:
715 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
716 self.close()
717 raise TransportTerminated(msg)
718 else:
719 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
720 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
721 self.close()
722 raise TransportTerminated(str(se_errno)+' '+msg)
723 return True
724
726 """
727 block until messages are read off of socket
728 @return: list of newly received messages
729 @rtype: [Msg]
730 @raise TransportException: if unable to receive message due to error
731 """
732 sock = self.socket
733 if sock is None:
734 raise TransportException("connection not initialized")
735 b = self.read_buff
736 msg_queue = []
737 p = self.protocol
738 try:
739 sock.setblocking(1)
740 while not msg_queue and not self.done and not is_shutdown():
741 if b.tell() >= 4:
742 p.read_messages(b, msg_queue, sock)
743 if not msg_queue:
744 self.stat_bytes += recv_buff(sock, b, p.buff_size)
745 self.stat_num_msg += len(msg_queue)
746
747 for m in msg_queue:
748 m._connection_header = self.header
749
750
751 if self.is_latched and msg_queue:
752 self.latch = msg_queue[-1]
753
754 return msg_queue
755
756 except DeserializationError as e:
757 rospyerr(traceback.format_exc())
758 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
759 except TransportTerminated as e:
760 raise
761 except ServiceException as e:
762 raise
763 except Exception as e:
764 rospyerr(traceback.format_exc())
765 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
766 return retval
767
769
770
771
772 if self.dest_address is None:
773 raise ROSInitException("internal error with reconnection state: address not stored")
774
775 interval = 0.5
776 while self.socket is None and not self.done and not rospy.is_shutdown():
777 try:
778
779
780
781
782 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
783 except TransportInitError:
784 self.socket = None
785
786 if self.socket is None and interval < 30.:
787
788 interval = interval * 2
789
790 time.sleep(interval)
791
793 """
794 Receive messages until shutdown
795 @param msgs_callback: callback to invoke for new messages received
796 @type msgs_callback: fn([msg])
797 """
798
799 logger.debug("receive_loop for [%s]", self.name)
800 try:
801 while not self.done and not is_shutdown():
802 try:
803 if self.socket is not None:
804 msgs = self.receive_once()
805 if not self.done and not is_shutdown():
806 msgs_callback(msgs, self)
807 else:
808 self._reconnect()
809
810 except TransportTerminated as e:
811 logdebug("[%s] failed to receive incoming message : %s" % (self.name, str(e)))
812 rospydebug("[%s] failed to receive incoming message: %s" % (self.name, traceback.format_exc()))
813 break
814
815 except TransportException as e:
816
817 try:
818 if self.socket is not None:
819 try:
820 self.socket.shutdown()
821 except:
822 pass
823 finally:
824 self.socket.close()
825 except:
826 pass
827 self.socket = None
828
829 except DeserializationError as e:
830
831
832 logerr("[%s] error deserializing incoming request: %s"%(self.name, str(e)))
833 rospyerr("[%s] error deserializing incoming request: %s"%(self.name, traceback.format_exc()))
834 except:
835
836 try:
837
838
839
840 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
841 except: pass
842
843 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
844 finally:
845 if not self.done:
846 self.close()
847
849 """close i/o and release resources"""
850 if not self.done:
851 try:
852 if self.socket is not None:
853 try:
854 self.socket.shutdown(socket.SHUT_RDWR)
855 except:
856 pass
857 finally:
858 self.socket.close()
859 finally:
860 self.socket = self.read_buff = self.write_buff = self.protocol = None
861 super(TCPROSTransport, self).close()
862