1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37
38 try:
39 from cStringIO import StringIO
40 python3 = 0
41 except ImportError:
42 from io import StringIO, BytesIO
43 python3 = 1
44 import socket
45 import logging
46
47 import threading
48 import time
49 import traceback
50 import select
51
52 import rosgraph
53 import rosgraph.network
54 from genpy import DeserializationError, Message
55 from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header
56
57
58 from rospy.core import *
59 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
60 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
61 from rospy.msg import deserialize_messages, serialize_message
62 from rospy.service import ServiceException
63
64 from rospy.impl.transport import Transport, BIDIRECTIONAL
65
66 logger = logging.getLogger('rospy.tcpros')
67
68
69 DEFAULT_BUFF_SIZE = 65536
70
71
72 TCPROS = "TCPROS"
73
74 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
75 _use_tcp_keepalive = None
76 _use_tcp_keepalive_lock = threading.Lock()
90
92 """
93 Read data from socket into buffer.
94 @param sock: socket to read from
95 @type sock: socket.socket
96 @param b: buffer to receive into
97 @type b: StringIO
98 @param buff_size: recv read size
99 @type buff_size: int
100 @return: number of bytes read
101 @rtype: int
102 """
103 d = sock.recv(buff_size)
104 if d:
105 b.write(d)
106 return len(d)
107 else:
108 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
109
111 """
112 Simple server that accepts inbound TCP/IP connections and hands
113 them off to a handler function. TCPServer obeys the
114 ROS_IP/ROS_HOSTNAME environment variables
115 """
116
117 - def __init__(self, inbound_handler, port=0):
118 """
119 Setup a server socket listening on the specified port. If the
120 port is omitted, will choose any open port.
121 @param inbound_handler: handler to invoke with
122 new connection
123 @type inbound_handler: fn(sock, addr)
124 @param port: port to bind to, omit/0 to bind to any
125 @type port: int
126 """
127 self.port = port
128 self.addr = None
129 self.is_shutdown = False
130 self.inbound_handler = inbound_handler
131 try:
132 self.server_sock = self._create_server_sock()
133 except:
134 self.server_sock = None
135 raise
136
138 """Runs the run() loop in a separate thread"""
139 t = threading.Thread(target=self.run, args=())
140 t.setDaemon(True)
141 t.start()
142
144 """
145 Main TCP receive loop. Should be run in a separate thread -- use start()
146 to do this automatically.
147 """
148 self.is_shutdown = False
149 if not self.server_sock:
150 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
151 while not self.is_shutdown:
152 try:
153 (client_sock, client_addr) = self.server_sock.accept()
154 except socket.timeout:
155 continue
156 except IOError as e:
157 (errno, msg) = e.args
158 if errno == 4:
159 continue
160 raise
161 if self.is_shutdown:
162 break
163 try:
164
165 self.inbound_handler(client_sock, client_addr)
166 except socket.error as e:
167 if not self.is_shutdown:
168 traceback.print_exc()
169 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
170 logdebug("TCPServer[%s] shutting down", self.port)
171
172
174 """
175 @return: (ip address, port) of server socket binding
176 @rtype: (str, int)
177 """
178
179
180 return (rosgraph.network.get_host_name(), self.port)
181
183 """
184 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
185 binding to loopback interface.
186 """
187 if rosgraph.network.use_ipv6():
188 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
189 else:
190 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
191 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
192 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port))
193 server_sock.bind((rosgraph.network.get_bind_address(), self.port))
194 (self.addr, self.port) = server_sock.getsockname()[0:2]
195 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port))
196 server_sock.listen(5)
197 return server_sock
198
200 """shutdown I/O resources uses by this server"""
201 if not self.is_shutdown:
202 self.is_shutdown = True
203 self.server_sock.close()
204
205
206
207
208 _tcpros_server = None
209
222
230
231
232
234 """
235 get the address of the tcpros server.
236 @raise Exception: if tcpros server has not been started or created
237 """
238 return _tcpros_server.get_address()
239
241 """
242 utility handler that does nothing more than provide a rejection header
243 @param sock: socket connection
244 @type sock: socket.socket
245 @param client_addr: client address
246 @type client_addr: str
247 @param header: request header
248 @type header: dict
249 """
250 return {'error': "unhandled connection"}
251
253 """
254 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
255 connections as well as ROS service connections over TCP. TCP server
256 socket is run once start_server() is called -- this is implicitly
257 called during init_publisher().
258 """
259
261 """
262 Constructur
263 @param port: port number to bind to (default 0/any)
264 @type port: int
265 """
266 self.port = port
267 self.tcp_ros_server = None
268 self.lock = threading.Lock()
269
270 self.topic_connection_handler = _error_connection_handler
271
272 self.service_connection_handler = _error_connection_handler
273
275 """
276 Starts the TCP socket server if one is not already running
277 """
278 if self.tcp_ros_server:
279 return
280 with self.lock:
281 try:
282 if not self.tcp_ros_server:
283 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port)
284 self.tcp_ros_server.start()
285 except Exception as e:
286 self.tcp_ros_server = None
287 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
288 return 0, "unable to establish TCPROS server: %s"%e, []
289
291 """
292 @return: address and port of TCP server socket for accepting
293 inbound connections
294 @rtype: str, int
295 """
296 if self.tcp_ros_server is not None:
297 return self.tcp_ros_server.get_full_addr()
298 return None, None
299
301 """stops the TCP/IP server responsible for receiving inbound connections"""
302 if self.tcp_ros_server:
303 self.tcp_ros_server.shutdown()
304
306 """
307 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
308
309 @param sock: socket connection
310 @type sock: socket.socket
311 @param client_addr: client address
312 @type client_addr: (str, int)
313 @raise TransportInitError: If transport cannot be succesfully initialized
314 """
315
316
317 try:
318 buff_size = 4096
319 if python3 == 0:
320
321 header = read_ros_handshake_header(sock, StringIO(), buff_size)
322 else:
323 header = read_ros_handshake_header(sock, BytesIO(), buff_size)
324
325 if 'topic' in header:
326 err_msg = self.topic_connection_handler(sock, client_addr, header)
327 elif 'service' in header:
328 err_msg = self.service_connection_handler(sock, client_addr, header)
329 else:
330 err_msg = 'no topic or service name detected'
331 if err_msg:
332
333
334
335
336
337
338 if not rospy.core.is_shutdown_requested():
339 write_ros_handshake_header(sock, {'error' : err_msg})
340 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
341 else:
342 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
343 return
344 except rospy.exceptions.TransportInitError as e:
345 logwarn(str(e))
346 if sock is not None:
347 sock.close()
348 except Exception as e:
349
350 if not rospy.core.is_shutdown_requested():
351 logwarn("Inbound TCP/IP connection failed: %s", e)
352 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc())
353 if sock is not None:
354 sock.close()
355
357 """
358 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
359 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
360 well as which fields to send when creating a new connection (get_header_fields()).
361 """
362
364 """
365 ctor
366 @param resolved_name: resolved service or topic name
367 @type resolved_name: str
368 @param recv_data_class: message class for deserializing inbound messages
369 @type recv_data_class: Class
370 @param queue_size: maximum number of inbound messages to maintain
371 @type queue_size: int
372 @param buff_size: receive buffer size (in bytes) for reading from the connection.
373 @type buff_size: int
374 """
375 if recv_data_class and not issubclass(recv_data_class, Message):
376 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
377 self.resolved_name = resolved_name
378 self.recv_data_class = recv_data_class
379 self.queue_size = queue_size
380 self.buff_size = buff_size
381 self.direction = BIDIRECTIONAL
382
383
385 """
386 @param b StringIO: read buffer
387 @param msg_queue [Message]: queue of deserialized messages
388 @type msg_queue: [Message]
389 @param sock socket: protocol can optionally read more data from
390 the socket, but in most cases the required data will already be
391 in b
392 """
393
394 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
395
397 """
398 Header fields that should be sent over the connection. The header fields
399 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
400 @return: {str : str}: header fields to send over connection
401 @rtype: dict
402 """
403 return {}
404
405
406
407
408
409
410
411
412
413
414
416 """
417 Generic implementation of TCPROS exchange routines for both topics and services
418 """
419 transport_type = 'TCPROS'
420
421 - def __init__(self, protocol, name, header=None):
422 """
423 ctor
424 @param name str: identifier
425 @param protocol TCPROSTransportProtocol protocol implementation
426 @param header dict: (optional) handshake header if transport handshake header was
427 already read off of transport.
428 @raise TransportInitError if transport cannot be initialized according to arguments
429 """
430 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
431 if not name:
432 raise TransportInitError("Unable to initialize transport: name is not set")
433
434 self.protocol = protocol
435
436 self.socket = None
437 self.endpoint_id = 'unknown'
438 self.callerid_pub = 'unknown'
439 self.dest_address = None
440
441 if python3 == 0:
442 self.read_buff = StringIO()
443 self.write_buff = StringIO()
444 else:
445 self.read_buff = BytesIO()
446 self.write_buff = BytesIO()
447
448
449 self.header = header
450
451
452 self.is_latched = False
453 self.latch = None
454
455
456
457 self._fileno = None
458
459
460
461
462 self.md5sum = None
463 self.type = None
464
465
466 self.local_endpoint = (None, None)
467 self.remote_endpoint = (None, None)
468
470 """
471 Get detailed connection information.
472 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp'
473 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6]
474 """
475 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
476
478 """
479 Get descriptor for select
480 """
481 return self._fileno
482
484 """
485 Set the endpoint_id of this transport.
486 Allows the endpoint_id to be set before the socket is initialized.
487 """
488 self.endpoint_id = endpoint_id
489
491 """
492 Set the socket for this transport
493 @param sock: socket
494 @type sock: socket.socket
495 @param endpoint_id: identifier for connection endpoint
496 @type endpoint_id: str
497 @raise TransportInitError: if socket has already been set
498 """
499 if self.socket is not None:
500 raise TransportInitError("socket already initialized")
501 self.socket = sock
502 self.endpoint_id = endpoint_id
503 self._fileno = sock.fileno()
504 self.local_endpoint = self.socket.getsockname()
505
506 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
507 """
508 Establish TCP connection to the specified
509 address/port. connect() always calls L{write_header()} and
510 L{read_header()} after the connection is made
511 @param dest_addr: destination IP address
512 @type dest_addr: str
513 @param dest_port: destination port
514 @type dest_port: int
515 @param endpoint_id: string identifier for connection (for statistics)
516 @type endpoint_id: str
517 @param timeout: (optional keyword) timeout in seconds
518 @type timeout: float
519 @raise TransportInitError: if unable to create connection
520 """
521
522
523 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"):
524 if not rosgraph.network.is_local_address(dest_addr):
525 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr)
526 logwarn(msg)
527 self.close()
528 raise TransportInitError(msg)
529
530
531 try:
532 self.endpoint_id = endpoint_id
533 self.dest_address = (dest_addr, dest_port)
534 if rosgraph.network.use_ipv6():
535 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
536 else:
537 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
538 if _is_use_tcp_keepalive():
539
540 if hasattr(socket, 'TCP_KEEPCNT') and \
541 hasattr(socket, 'TCP_KEEPIDLE') and \
542 hasattr(socket, 'TCP_KEEPINTVL'):
543
544 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
545
546 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
547
548 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
549
550 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
551 if timeout is not None:
552 s.settimeout(timeout)
553 self.socket = s
554 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port))
555 self.socket.connect((dest_addr, dest_port))
556 self.write_header()
557 self.read_header()
558 self.local_endpoint = self.socket.getsockname()
559 self.remote_endpoint = (dest_addr, dest_port)
560 except TransportInitError as tie:
561 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
562 raise
563 except Exception as e:
564
565 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
566
567
568 self.close()
569 raise TransportInitError(str(e))
570
572 """
573 Validate header and initialize fields accordingly
574 @param header: header fields from publisher
575 @type header: dict
576 @raise TransportInitError: if header fails to validate
577 """
578 self.header = header
579 if 'error' in header:
580 raise TransportInitError("remote error reported: %s"%header['error'])
581 for required in ['md5sum', 'type']:
582 if not required in header:
583 raise TransportInitError("header missing required field [%s]"%required)
584 self.type = header['type']
585 self.md5sum = header['md5sum']
586 if 'callerid' in header:
587 self.callerid_pub = header['callerid']
588 if header.get('latching', '0') == '1':
589 self.is_latched = True
590
592 """Writes the TCPROS header to the active connection."""
593
594 sock = self.socket
595 protocol = self.protocol
596
597
598
599 if sock is None or protocol is None:
600 return
601 fileno = sock.fileno()
602 ready = None
603 while not ready:
604 _, ready, _ = select.select([], [fileno], [])
605 logger.debug("[%s]: writing header", self.name)
606 sock.setblocking(1)
607 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
608
610 """
611 Read TCPROS header from active socket
612 @raise TransportInitError if header fails to validate
613 """
614 sock = self.socket
615 if sock is None:
616 return
617 sock.setblocking(1)
618
619 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
620
622 """
623 Convenience routine for services to send a message across a
624 particular connection. NOTE: write_data is much more efficient
625 if same message is being sent to multiple connections. Not
626 threadsafe.
627 @param msg: message to send
628 @type msg: Msg
629 @param seq: sequence number for message
630 @type seq: int
631 @raise TransportException: if error occurred sending message
632 """
633
634 serialize_message(self.write_buff, seq, msg)
635 self.write_data(self.write_buff.getvalue())
636 self.write_buff.truncate(0)
637
639 """
640 Write raw data to transport
641 @raise TransportInitialiationError: could not be initialized
642 @raise TransportTerminated: no longer open for publishing
643 """
644 if not self.socket:
645 raise TransportInitError("TCPROS transport was not successfully initialized")
646 if self.done:
647 raise TransportTerminated("connection closed")
648 try:
649
650 self.socket.sendall(data)
651 self.stat_bytes += len(data)
652 self.stat_num_msg += 1
653 except IOError as ioe:
654
655 (errno, msg) = ioe.args
656 if errno == 32:
657 logdebug("ERROR: Broken Pipe")
658 self.close()
659 raise TransportTerminated(str(errno)+msg)
660 raise
661 except socket.error as se:
662
663 (errno, msg) = se.args
664 if errno == 32:
665 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
666 self.close()
667 raise TransportTerminated(msg)
668 elif errno == 104:
669 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
670 self.close()
671 raise TransportTerminated(msg)
672 else:
673 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
674 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
675 self.close()
676 raise TransportTerminated(str(errno)+' '+msg)
677 return True
678
680 """
681 block until messages are read off of socket
682 @return: list of newly received messages
683 @rtype: [Msg]
684 @raise TransportException: if unable to receive message due to error
685 """
686 sock = self.socket
687 if sock is None:
688 raise TransportException("connection not initialized")
689 b = self.read_buff
690 msg_queue = []
691 p = self.protocol
692 try:
693 sock.setblocking(1)
694 while not msg_queue and not self.done and not is_shutdown():
695 if b.tell() >= 4:
696 p.read_messages(b, msg_queue, sock)
697 if not msg_queue:
698 self.stat_bytes += recv_buff(sock, b, p.buff_size)
699 self.stat_num_msg += len(msg_queue)
700
701 for m in msg_queue:
702 m._connection_header = self.header
703
704
705 if self.is_latched and msg_queue:
706 self.latch = msg_queue[-1]
707
708 return msg_queue
709
710 except DeserializationError as e:
711 rospyerr(traceback.format_exc())
712 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
713 except TransportTerminated as e:
714 raise
715 except ServiceException as e:
716 raise
717 except Exception as e:
718 rospyerr(traceback.format_exc())
719 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
720 return retval
721
723
724
725
726 if self.dest_address is None:
727 raise ROSInitException("internal error with reconnection state: address not stored")
728
729 interval = 0.5
730 while self.socket is None and not self.done and not rospy.is_shutdown():
731 try:
732
733
734
735
736 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
737 except TransportInitError:
738 self.socket = None
739
740 if self.socket is None:
741
742 interval = interval * 2
743
744 time.sleep(interval)
745
747 """
748 Receive messages until shutdown
749 @param msgs_callback: callback to invoke for new messages received
750 @type msgs_callback: fn([msg])
751 """
752
753 logger.debug("receive_loop for [%s]", self.name)
754 try:
755 while not self.done and not is_shutdown():
756 try:
757 if self.socket is not None:
758 msgs = self.receive_once()
759 if not self.done and not is_shutdown():
760 msgs_callback(msgs, self)
761 else:
762 self._reconnect()
763
764 except TransportException as e:
765
766 try:
767 if self.socket is not None:
768 try:
769 self.socket.shutdown()
770 except:
771 pass
772 finally:
773 self.socket.close()
774 except:
775 pass
776 self.socket = None
777
778 except DeserializationError as e:
779
780
781 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
782 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
783 except:
784
785 try:
786
787
788
789 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
790 except: pass
791
792 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
793 finally:
794 if not self.done:
795 self.close()
796
798 """close i/o and release resources"""
799 if not self.done:
800 try:
801 if self.socket is not None:
802 try:
803 self.socket.shutdown(socket.SHUT_RDWR)
804 except:
805 pass
806 finally:
807 self.socket.close()
808 finally:
809 self.socket = self.read_buff = self.write_buff = self.protocol = None
810 super(TCPROSTransport, self).close()
811