1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37
38 try:
39 from cStringIO import StringIO
40 python3 = 0
41 except ImportError:
42 from io import StringIO, BytesIO
43 python3 = 1
44 import errno
45 import socket
46 import logging
47
48 import threading
49 import time
50 import traceback
51 import select
52
53 import rosgraph
54 import rosgraph.network
55 from genpy import DeserializationError, Message
56 from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header
57
58
59 from rospy.core import *
60 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
61 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
62 from rospy.msg import deserialize_messages, serialize_message
63 from rospy.service import ServiceException
64
65 from rospy.impl.transport import Transport, BIDIRECTIONAL
66
67 logger = logging.getLogger('rospy.tcpros')
68
69
70 DEFAULT_BUFF_SIZE = 65536
71
72
73 TCPROS = "TCPROS"
74
75 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
76 _use_tcp_keepalive = None
77 _use_tcp_keepalive_lock = threading.Lock()
91
93 """
94 Read data from socket into buffer.
95 @param sock: socket to read from
96 @type sock: socket.socket
97 @param b: buffer to receive into
98 @type b: StringIO
99 @param buff_size: recv read size
100 @type buff_size: int
101 @return: number of bytes read
102 @rtype: int
103 """
104 d = sock.recv(buff_size)
105 if d:
106 b.write(d)
107 return len(d)
108 else:
109 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
110
112 """
113 Simple server that accepts inbound TCP/IP connections and hands
114 them off to a handler function. TCPServer obeys the
115 ROS_IP/ROS_HOSTNAME environment variables
116 """
117
118 - def __init__(self, inbound_handler, port=0):
119 """
120 Setup a server socket listening on the specified port. If the
121 port is omitted, will choose any open port.
122 @param inbound_handler: handler to invoke with
123 new connection
124 @type inbound_handler: fn(sock, addr)
125 @param port: port to bind to, omit/0 to bind to any
126 @type port: int
127 """
128 self.port = port
129 self.addr = None
130 self.is_shutdown = False
131 self.inbound_handler = inbound_handler
132 try:
133 self.server_sock = self._create_server_sock()
134 except:
135 self.server_sock = None
136 raise
137
139 """Runs the run() loop in a separate thread"""
140 t = threading.Thread(target=self.run, args=())
141 t.daemon = True
142 t.start()
143
145 """
146 Main TCP receive loop. Should be run in a separate thread -- use start()
147 to do this automatically.
148 """
149 self.is_shutdown = False
150 if not self.server_sock:
151 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
152 while not self.is_shutdown:
153 try:
154 (client_sock, client_addr) = self.server_sock.accept()
155 except socket.timeout:
156 continue
157 except IOError as e:
158 (e_errno, msg) = e.args
159 if e_errno == errno.EINTR:
160 continue
161 raise
162 if self.is_shutdown:
163 break
164 try:
165
166 self.inbound_handler(client_sock, client_addr)
167 except socket.error as e:
168 if not self.is_shutdown:
169 traceback.print_exc()
170 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
171 logdebug("TCPServer[%s] shutting down", self.port)
172
173
175 """
176 @return: (ip address, port) of server socket binding
177 @rtype: (str, int)
178 """
179
180
181 return (rosgraph.network.get_host_name(), self.port)
182
184 """
185 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
186 binding to loopback interface.
187 """
188 if rosgraph.network.use_ipv6():
189 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
190 else:
191 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
192 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
193 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port))
194 server_sock.bind((rosgraph.network.get_bind_address(), self.port))
195 (self.addr, self.port) = server_sock.getsockname()[0:2]
196 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port))
197 server_sock.listen(5)
198 return server_sock
199
201 """shutdown I/O resources uses by this server"""
202 if not self.is_shutdown:
203 self.is_shutdown = True
204 self.server_sock.close()
205
206
207
208
209 _tcpros_server = None
210
223
231
232
233
235 """
236 get the address of the tcpros server.
237 @raise Exception: if tcpros server has not been started or created
238 """
239 return _tcpros_server.get_address()
240
242 """
243 utility handler that does nothing more than provide a rejection header
244 @param sock: socket connection
245 @type sock: socket.socket
246 @param client_addr: client address
247 @type client_addr: str
248 @param header: request header
249 @type header: dict
250 """
251 return {'error': "unhandled connection"}
252
254 """
255 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
256 connections as well as ROS service connections over TCP. TCP server
257 socket is run once start_server() is called -- this is implicitly
258 called during init_publisher().
259 """
260
262 """
263 Constructur
264 @param port: port number to bind to (default 0/any)
265 @type port: int
266 """
267 self.port = port
268 self.tcp_ros_server = None
269 self.lock = threading.Lock()
270
271 self.topic_connection_handler = _error_connection_handler
272
273 self.service_connection_handler = _error_connection_handler
274
276 """
277 Starts the TCP socket server if one is not already running
278 """
279 if self.tcp_ros_server:
280 return
281 with self.lock:
282 try:
283 if not self.tcp_ros_server:
284 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port)
285 self.tcp_ros_server.start()
286 except Exception as e:
287 self.tcp_ros_server = None
288 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
289 return 0, "unable to establish TCPROS server: %s"%e, []
290
292 """
293 @return: address and port of TCP server socket for accepting
294 inbound connections
295 @rtype: str, int
296 """
297 if self.tcp_ros_server is not None:
298 return self.tcp_ros_server.get_full_addr()
299 return None, None
300
302 """stops the TCP/IP server responsible for receiving inbound connections"""
303 if self.tcp_ros_server:
304 self.tcp_ros_server.shutdown()
305
307 """
308 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
309
310 @param sock: socket connection
311 @type sock: socket.socket
312 @param client_addr: client address
313 @type client_addr: (str, int)
314 @raise TransportInitError: If transport cannot be succesfully initialized
315 """
316
317
318 try:
319 buff_size = 4096
320 if python3 == 0:
321
322 header = read_ros_handshake_header(sock, StringIO(), buff_size)
323 else:
324 header = read_ros_handshake_header(sock, BytesIO(), buff_size)
325
326 if 'topic' in header:
327 err_msg = self.topic_connection_handler(sock, client_addr, header)
328 elif 'service' in header:
329 err_msg = self.service_connection_handler(sock, client_addr, header)
330 else:
331 err_msg = 'no topic or service name detected'
332 if err_msg:
333
334
335
336
337
338
339 if not rospy.core.is_shutdown_requested():
340 write_ros_handshake_header(sock, {'error' : err_msg})
341 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
342 else:
343 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
344 return
345 except rospy.exceptions.TransportInitError as e:
346 logwarn(str(e))
347 if sock is not None:
348 sock.close()
349 except Exception as e:
350
351 if not rospy.core.is_shutdown_requested():
352 logwarn("Inbound TCP/IP connection failed: %s", e)
353 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc())
354 if sock is not None:
355 sock.close()
356
358 """
359 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
360 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
361 well as which fields to send when creating a new connection (get_header_fields()).
362 """
363
365 """
366 ctor
367 @param resolved_name: resolved service or topic name
368 @type resolved_name: str
369 @param recv_data_class: message class for deserializing inbound messages
370 @type recv_data_class: Class
371 @param queue_size: maximum number of inbound messages to maintain
372 @type queue_size: int
373 @param buff_size: receive buffer size (in bytes) for reading from the connection.
374 @type buff_size: int
375 """
376 if recv_data_class and not issubclass(recv_data_class, Message):
377 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
378 self.resolved_name = resolved_name
379 self.recv_data_class = recv_data_class
380 self.queue_size = queue_size
381 self.buff_size = buff_size
382 self.direction = BIDIRECTIONAL
383
384
386 """
387 @param b StringIO: read buffer
388 @param msg_queue [Message]: queue of deserialized messages
389 @type msg_queue: [Message]
390 @param sock socket: protocol can optionally read more data from
391 the socket, but in most cases the required data will already be
392 in b
393 """
394
395 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
396
398 """
399 Header fields that should be sent over the connection. The header fields
400 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
401 @return: {str : str}: header fields to send over connection
402 @rtype: dict
403 """
404 return {}
405
406
407
408
409
410
411
412
413
414
415
417 """
418 Generic implementation of TCPROS exchange routines for both topics and services
419 """
420 transport_type = 'TCPROS'
421
422 - def __init__(self, protocol, name, header=None):
423 """
424 ctor
425 @param name str: identifier
426 @param protocol TCPROSTransportProtocol protocol implementation
427 @param header dict: (optional) handshake header if transport handshake header was
428 already read off of transport.
429 @raise TransportInitError if transport cannot be initialized according to arguments
430 """
431 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
432 if not name:
433 raise TransportInitError("Unable to initialize transport: name is not set")
434
435 self.protocol = protocol
436
437 self.socket = None
438 self.endpoint_id = 'unknown'
439 self.callerid_pub = 'unknown'
440 self.dest_address = None
441
442 if python3 == 0:
443 self.read_buff = StringIO()
444 self.write_buff = StringIO()
445 else:
446 self.read_buff = BytesIO()
447 self.write_buff = BytesIO()
448
449
450 self.header = header
451
452
453 self.is_latched = False
454 self.latch = None
455
456
457
458 self._fileno = None
459
460
461
462
463 self.md5sum = None
464 self.type = None
465
466
467 self.local_endpoint = (None, None)
468 self.remote_endpoint = (None, None)
469
471 """
472 Get detailed connection information.
473 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp'
474 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6]
475 """
476 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
477
479 """
480 Get descriptor for select
481 """
482 return self._fileno
483
485 """
486 Set the endpoint_id of this transport.
487 Allows the endpoint_id to be set before the socket is initialized.
488 """
489 self.endpoint_id = endpoint_id
490
492 """
493 Set the socket for this transport
494 @param sock: socket
495 @type sock: socket.socket
496 @param endpoint_id: identifier for connection endpoint
497 @type endpoint_id: str
498 @raise TransportInitError: if socket has already been set
499 """
500 if self.socket is not None:
501 raise TransportInitError("socket already initialized")
502 self.socket = sock
503 self.endpoint_id = endpoint_id
504 self._fileno = sock.fileno()
505 self.local_endpoint = self.socket.getsockname()
506
507 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
508 """
509 Establish TCP connection to the specified
510 address/port. connect() always calls L{write_header()} and
511 L{read_header()} after the connection is made
512 @param dest_addr: destination IP address
513 @type dest_addr: str
514 @param dest_port: destination port
515 @type dest_port: int
516 @param endpoint_id: string identifier for connection (for statistics)
517 @type endpoint_id: str
518 @param timeout: (optional keyword) timeout in seconds
519 @type timeout: float
520 @raise TransportInitError: if unable to create connection
521 """
522
523
524 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"):
525 if not rosgraph.network.is_local_address(dest_addr):
526 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr)
527 logwarn(msg)
528 self.close()
529 raise TransportInitError(msg)
530
531
532 try:
533 self.endpoint_id = endpoint_id
534 self.dest_address = (dest_addr, dest_port)
535 if rosgraph.network.use_ipv6():
536 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
537 else:
538 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
539 if _is_use_tcp_keepalive():
540
541 if hasattr(socket, 'TCP_KEEPCNT') and \
542 hasattr(socket, 'TCP_KEEPIDLE') and \
543 hasattr(socket, 'TCP_KEEPINTVL'):
544
545 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
546
547 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
548
549 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
550
551 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
552 if timeout is not None:
553 s.settimeout(timeout)
554 self.socket = s
555 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port))
556 self.socket.connect((dest_addr, dest_port))
557 self.write_header()
558 self.read_header()
559 self.local_endpoint = self.socket.getsockname()
560 self.remote_endpoint = (dest_addr, dest_port)
561 except TransportInitError as tie:
562 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
563 raise
564 except Exception as e:
565
566 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
567
568
569 if not isinstance(e, socket.error):
570
571 self.close()
572 elif not isinstance(e, socket.timeout) and e.errno not in [
573 errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET,
574 errno.ECONNABORTED, errno.ETIMEDOUT, errno.EHOSTDOWN, errno.EHOSTUNREACH]:
575
576
577
578
579
580
581
582
583
584 self.close()
585 raise TransportInitError(str(e))
586
588 """
589 Validate header and initialize fields accordingly
590 @param header: header fields from publisher
591 @type header: dict
592 @raise TransportInitError: if header fails to validate
593 """
594 self.header = header
595 if 'error' in header:
596 raise TransportInitError("remote error reported: %s"%header['error'])
597 for required in ['md5sum', 'type']:
598 if not required in header:
599 raise TransportInitError("header missing required field [%s]"%required)
600 self.type = header['type']
601 self.md5sum = header['md5sum']
602 if 'callerid' in header:
603 self.callerid_pub = header['callerid']
604 if header.get('latching', '0') == '1':
605 self.is_latched = True
606
608 """Writes the TCPROS header to the active connection."""
609
610 sock = self.socket
611 protocol = self.protocol
612
613
614
615 if sock is None or protocol is None:
616 return
617 fileno = sock.fileno()
618 ready = None
619 poller = None
620 if hasattr(select, 'poll'):
621 poller = select.poll()
622 poller.register(fileno, select.POLLOUT)
623 while not ready:
624 events = poller.poll()
625 for _, flag in events:
626 if flag & select.POLLOUT:
627 ready = True
628 else:
629 while not ready:
630 try:
631 _, ready, _ = select.select([], [fileno], [])
632 except ValueError as e:
633 logger.error("[%s]: select fileno '%s': %s", self.name, str(fileno), str(e))
634 raise
635
636 logger.debug("[%s]: writing header", self.name)
637 sock.setblocking(1)
638 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
639 if poller:
640 poller.unregister(fileno)
641
642
644 """
645 Read TCPROS header from active socket
646 @raise TransportInitError if header fails to validate
647 """
648 sock = self.socket
649 if sock is None:
650 return
651 sock.setblocking(1)
652
653 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
654
656 """
657 Convenience routine for services to send a message across a
658 particular connection. NOTE: write_data is much more efficient
659 if same message is being sent to multiple connections. Not
660 threadsafe.
661 @param msg: message to send
662 @type msg: Msg
663 @param seq: sequence number for message
664 @type seq: int
665 @raise TransportException: if error occurred sending message
666 """
667
668 serialize_message(self.write_buff, seq, msg)
669 self.write_data(self.write_buff.getvalue())
670 self.write_buff.truncate(0)
671
673 """
674 Write raw data to transport
675 @raise TransportInitialiationError: could not be initialized
676 @raise TransportTerminated: no longer open for publishing
677 """
678 if not self.socket:
679 raise TransportInitError("TCPROS transport was not successfully initialized")
680 if self.done:
681 raise TransportTerminated("connection closed")
682 try:
683
684 self.socket.sendall(data)
685 self.stat_bytes += len(data)
686 self.stat_num_msg += 1
687 except IOError as ioe:
688
689 (ioe_errno, msg) = ioe.args
690 if ioe_errno == errno.EPIPE:
691 logdebug("ERROR: Broken Pipe")
692 self.close()
693 raise TransportTerminated(str(ioe_errno)+msg)
694 raise
695 except socket.error as se:
696
697 (se_errno, msg) = se.args
698 if se_errno == errno.EPIPE:
699 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
700 self.close()
701 raise TransportTerminated(msg)
702 elif se_errno == errno.ECONNRESET:
703 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
704 self.close()
705 raise TransportTerminated(msg)
706 else:
707 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
708 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
709 self.close()
710 raise TransportTerminated(str(se_errno)+' '+msg)
711 return True
712
714 """
715 block until messages are read off of socket
716 @return: list of newly received messages
717 @rtype: [Msg]
718 @raise TransportException: if unable to receive message due to error
719 """
720 sock = self.socket
721 if sock is None:
722 raise TransportException("connection not initialized")
723 b = self.read_buff
724 msg_queue = []
725 p = self.protocol
726 try:
727 sock.setblocking(1)
728 while not msg_queue and not self.done and not is_shutdown():
729 if b.tell() >= 4:
730 p.read_messages(b, msg_queue, sock)
731 if not msg_queue:
732 self.stat_bytes += recv_buff(sock, b, p.buff_size)
733 self.stat_num_msg += len(msg_queue)
734
735 for m in msg_queue:
736 m._connection_header = self.header
737
738
739 if self.is_latched and msg_queue:
740 self.latch = msg_queue[-1]
741
742 return msg_queue
743
744 except DeserializationError as e:
745 rospyerr(traceback.format_exc())
746 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
747 except TransportTerminated as e:
748 raise
749 except ServiceException as e:
750 raise
751 except Exception as e:
752 rospyerr(traceback.format_exc())
753 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
754 return retval
755
757
758
759
760 if self.dest_address is None:
761 raise ROSInitException("internal error with reconnection state: address not stored")
762
763 interval = 0.5
764 while self.socket is None and not self.done and not rospy.is_shutdown():
765 try:
766
767
768
769
770 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
771 except TransportInitError:
772 self.socket = None
773
774 if self.socket is None and interval < 30.:
775
776 interval = interval * 2
777
778 time.sleep(interval)
779
781 """
782 Receive messages until shutdown
783 @param msgs_callback: callback to invoke for new messages received
784 @type msgs_callback: fn([msg])
785 """
786
787 logger.debug("receive_loop for [%s]", self.name)
788 try:
789 while not self.done and not is_shutdown():
790 try:
791 if self.socket is not None:
792 msgs = self.receive_once()
793 if not self.done and not is_shutdown():
794 msgs_callback(msgs, self)
795 else:
796 self._reconnect()
797
798 except TransportException as e:
799
800 try:
801 if self.socket is not None:
802 try:
803 self.socket.shutdown()
804 except:
805 pass
806 finally:
807 self.socket.close()
808 except:
809 pass
810 self.socket = None
811
812 except DeserializationError as e:
813
814
815 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
816 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
817 except:
818
819 try:
820
821
822
823 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
824 except: pass
825
826 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
827 finally:
828 if not self.done:
829 self.close()
830
832 """close i/o and release resources"""
833 if not self.done:
834 try:
835 if self.socket is not None:
836 try:
837 self.socket.shutdown(socket.SHUT_RDWR)
838 except:
839 pass
840 finally:
841 self.socket.close()
842 finally:
843 self.socket = self.read_buff = self.write_buff = self.protocol = None
844 super(TCPROSTransport, self).close()
845