1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37
38 try:
39 from cStringIO import StringIO
40 python3 = 0
41 except ImportError:
42 from io import StringIO, BytesIO
43 python3 = 1
44 import socket
45 import logging
46
47 import threading
48 import time
49 import traceback
50 import select
51
52 import rosgraph
53 import rosgraph.network
54 from genpy import DeserializationError, Message
55 from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header
56
57
58 from rospy.core import *
59 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
60 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
61 from rospy.msg import deserialize_messages, serialize_message
62 from rospy.service import ServiceException
63
64 from rospy.impl.transport import Transport, BIDIRECTIONAL
65
66 logger = logging.getLogger('rospy.tcpros')
67
68
69 DEFAULT_BUFF_SIZE = 65536
70
71
72 TCPROS = "TCPROS"
73
74 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
75 _use_tcp_keepalive = None
76 _use_tcp_keepalive_lock = threading.Lock()
90
92 """
93 Read data from socket into buffer.
94 @param sock: socket to read from
95 @type sock: socket.socket
96 @param b: buffer to receive into
97 @type b: StringIO
98 @param buff_size: recv read size
99 @type buff_size: int
100 @return: number of bytes read
101 @rtype: int
102 """
103 d = sock.recv(buff_size)
104 if d:
105 b.write(d)
106 return len(d)
107 else:
108 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
109
111 """
112 Simple server that accepts inbound TCP/IP connections and hands
113 them off to a handler function. TCPServer obeys the
114 ROS_IP/ROS_HOSTNAME environment variables
115 """
116
117 - def __init__(self, inbound_handler, port=0):
118 """
119 Setup a server socket listening on the specified port. If the
120 port is omitted, will choose any open port.
121 @param inbound_handler: handler to invoke with
122 new connection
123 @type inbound_handler: fn(sock, addr)
124 @param port: port to bind to, omit/0 to bind to any
125 @type port: int
126 """
127 self.port = port
128 self.addr = None
129 self.is_shutdown = False
130 self.inbound_handler = inbound_handler
131 try:
132 self.server_sock = self._create_server_sock()
133 except:
134 self.server_sock = None
135 raise
136
138 """Runs the run() loop in a separate thread"""
139 t = threading.Thread(target=self.run, args=())
140 t.setDaemon(True)
141 t.start()
142
144 """
145 Main TCP receive loop. Should be run in a separate thread -- use start()
146 to do this automatically.
147 """
148 self.is_shutdown = False
149 if not self.server_sock:
150 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
151 while not self.is_shutdown:
152 try:
153 (client_sock, client_addr) = self.server_sock.accept()
154 except socket.timeout:
155 continue
156 except IOError as e:
157 (errno, msg) = e.args
158 if errno == 4:
159 continue
160 if not self.is_shutdown:
161 raise
162 if self.is_shutdown:
163 break
164 try:
165
166 self.inbound_handler(client_sock, client_addr)
167 except socket.error as e:
168 if not self.is_shutdown:
169 traceback.print_exc()
170 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
171 logdebug("TCPServer[%s] shutting down", self.port)
172
173
175 """
176 @return: (ip address, port) of server socket binding
177 @rtype: (str, int)
178 """
179
180
181 return (rosgraph.network.get_host_name(), self.port)
182
184 """
185 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
186 binding to loopback interface.
187 """
188 if rosgraph.network.use_ipv6():
189 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
190 else:
191 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
192 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
193 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port))
194 server_sock.bind((rosgraph.network.get_bind_address(), self.port))
195 (self.addr, self.port) = server_sock.getsockname()[0:2]
196 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port))
197 server_sock.listen(5)
198 return server_sock
199
201 """shutdown I/O resources uses by this server"""
202 if not self.is_shutdown:
203 self.is_shutdown = True
204 self.server_sock.close()
205
206
207
208
209 _tcpros_server = None
210
223
231
232
233
235 """
236 get the address of the tcpros server.
237 @raise Exception: if tcpros server has not been started or created
238 """
239 return _tcpros_server.get_address()
240
242 """
243 utility handler that does nothing more than provide a rejection header
244 @param sock: socket connection
245 @type sock: socket.socket
246 @param client_addr: client address
247 @type client_addr: str
248 @param header: request header
249 @type header: dict
250 """
251 return {'error': "unhandled connection"}
252
254 """
255 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
256 connections as well as ROS service connections over TCP. TCP server
257 socket is run once start_server() is called -- this is implicitly
258 called during init_publisher().
259 """
260
262 """
263 Constructur
264 @param port: port number to bind to (default 0/any)
265 @type port: int
266 """
267 self.port = port
268 self.tcp_ros_server = None
269 self.lock = threading.Lock()
270
271 self.topic_connection_handler = _error_connection_handler
272
273 self.service_connection_handler = _error_connection_handler
274
276 """
277 Starts the TCP socket server if one is not already running
278 """
279 if self.tcp_ros_server:
280 return
281 with self.lock:
282 try:
283 if not self.tcp_ros_server:
284 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port)
285 self.tcp_ros_server.start()
286 except Exception as e:
287 self.tcp_ros_server = None
288 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
289 return 0, "unable to establish TCPROS server: %s"%e, []
290
292 """
293 @return: address and port of TCP server socket for accepting
294 inbound connections
295 @rtype: str, int
296 """
297 if self.tcp_ros_server is not None:
298 return self.tcp_ros_server.get_full_addr()
299 return None, None
300
302 """stops the TCP/IP server responsible for receiving inbound connections"""
303 if self.tcp_ros_server:
304 self.tcp_ros_server.shutdown()
305
307 """
308 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
309
310 @param sock: socket connection
311 @type sock: socket.socket
312 @param client_addr: client address
313 @type client_addr: (str, int)
314 @raise TransportInitError: If transport cannot be succesfully initialized
315 """
316
317
318 try:
319 buff_size = 4096
320 if python3 == 0:
321
322 header = read_ros_handshake_header(sock, StringIO(), buff_size)
323 else:
324 header = read_ros_handshake_header(sock, BytesIO(), buff_size)
325
326 if 'topic' in header:
327 err_msg = self.topic_connection_handler(sock, client_addr, header)
328 elif 'service' in header:
329 err_msg = self.service_connection_handler(sock, client_addr, header)
330 else:
331 err_msg = 'no topic or service name detected'
332 if err_msg:
333
334
335
336
337
338
339 if not rospy.core.is_shutdown_requested():
340 write_ros_handshake_header(sock, {'error' : err_msg})
341 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
342 else:
343 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
344 return
345 except rospy.exceptions.TransportInitError as e:
346 logwarn(str(e))
347 if sock is not None:
348 sock.close()
349 except Exception as e:
350
351 if not rospy.core.is_shutdown_requested():
352 logwarn("Inbound TCP/IP connection failed: %s", e)
353 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc())
354 if sock is not None:
355 sock.close()
356
358 """
359 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
360 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
361 well as which fields to send when creating a new connection (get_header_fields()).
362 """
363
365 """
366 ctor
367 @param resolved_name: resolved service or topic name
368 @type resolved_name: str
369 @param recv_data_class: message class for deserializing inbound messages
370 @type recv_data_class: Class
371 @param queue_size: maximum number of inbound messages to maintain
372 @type queue_size: int
373 @param buff_size: receive buffer size (in bytes) for reading from the connection.
374 @type buff_size: int
375 """
376 if recv_data_class and not issubclass(recv_data_class, Message):
377 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
378 self.resolved_name = resolved_name
379 self.recv_data_class = recv_data_class
380 self.queue_size = queue_size
381 self.buff_size = buff_size
382 self.direction = BIDIRECTIONAL
383
384
386 """
387 @param b StringIO: read buffer
388 @param msg_queue [Message]: queue of deserialized messages
389 @type msg_queue: [Message]
390 @param sock socket: protocol can optionally read more data from
391 the socket, but in most cases the required data will already be
392 in b
393 """
394
395 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
396
398 """
399 Header fields that should be sent over the connection. The header fields
400 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
401 @return: {str : str}: header fields to send over connection
402 @rtype: dict
403 """
404 return {}
405
406
407
408
409
410
411
412
413
414
415
417 """
418 Generic implementation of TCPROS exchange routines for both topics and services
419 """
420 transport_type = 'TCPROS'
421
422 - def __init__(self, protocol, name, header=None):
423 """
424 ctor
425 @param name str: identifier
426 @param protocol TCPROSTransportProtocol protocol implementation
427 @param header dict: (optional) handshake header if transport handshake header was
428 already read off of transport.
429 @raise TransportInitError if transport cannot be initialized according to arguments
430 """
431 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
432 if not name:
433 raise TransportInitError("Unable to initialize transport: name is not set")
434
435 self.protocol = protocol
436
437 self.socket = None
438 self.endpoint_id = 'unknown'
439 self.callerid_pub = 'unknown'
440 self.dest_address = None
441
442 if python3 == 0:
443 self.read_buff = StringIO()
444 self.write_buff = StringIO()
445 else:
446 self.read_buff = BytesIO()
447 self.write_buff = BytesIO()
448
449
450 self.header = header
451
452
453 self.is_latched = False
454 self.latch = None
455
456
457
458 self._fileno = None
459
460
461
462
463 self.md5sum = None
464 self.type = None
465
466
467 self.local_endpoint = (None, None)
468 self.remote_endpoint = (None, None)
469
471 """
472 Get detailed connection information.
473 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp'
474 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6]
475 """
476 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
477
479 """
480 Get descriptor for select
481 """
482 return self._fileno
483
485 """
486 Set the endpoint_id of this transport.
487 Allows the endpoint_id to be set before the socket is initialized.
488 """
489 self.endpoint_id = endpoint_id
490
492 """
493 Set the socket for this transport
494 @param sock: socket
495 @type sock: socket.socket
496 @param endpoint_id: identifier for connection endpoint
497 @type endpoint_id: str
498 @raise TransportInitError: if socket has already been set
499 """
500 if self.socket is not None:
501 raise TransportInitError("socket already initialized")
502 self.socket = sock
503 self.endpoint_id = endpoint_id
504 self._fileno = sock.fileno()
505 self.local_endpoint = self.socket.getsockname()
506
507 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
508 """
509 Establish TCP connection to the specified
510 address/port. connect() always calls L{write_header()} and
511 L{read_header()} after the connection is made
512 @param dest_addr: destination IP address
513 @type dest_addr: str
514 @param dest_port: destination port
515 @type dest_port: int
516 @param endpoint_id: string identifier for connection (for statistics)
517 @type endpoint_id: str
518 @param timeout: (optional keyword) timeout in seconds
519 @type timeout: float
520 @raise TransportInitError: if unable to create connection
521 """
522
523
524 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"):
525 if not rosgraph.network.is_local_address(dest_addr):
526 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr)
527 logwarn(msg)
528 self.close()
529 raise TransportInitError(msg)
530
531
532 try:
533 self.endpoint_id = endpoint_id
534 self.dest_address = (dest_addr, dest_port)
535 if rosgraph.network.use_ipv6():
536 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
537 else:
538 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
539 if _is_use_tcp_keepalive():
540
541 if hasattr(socket, 'TCP_KEEPCNT') and \
542 hasattr(socket, 'TCP_KEEPIDLE') and \
543 hasattr(socket, 'TCP_KEEPINTVL'):
544
545 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
546
547 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
548
549 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
550
551 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
552 if timeout is not None:
553 s.settimeout(timeout)
554 self.socket = s
555 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port))
556 self.socket.connect((dest_addr, dest_port))
557 self.write_header()
558 self.read_header()
559 self.local_endpoint = self.socket.getsockname()
560 self.remote_endpoint = (dest_addr, dest_port)
561 except TransportInitError as tie:
562 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
563 raise
564 except Exception as e:
565
566 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
567
568
569 if not isinstance(e, socket.error):
570
571 self.close()
572 elif not isinstance(e, socket.timeout) and e.errno not in [100, 101, 102, 103, 110, 112, 113]:
573
574
575
576
577
578
579
580
581
582 self.close()
583 raise TransportInitError(str(e))
584
586 """
587 Validate header and initialize fields accordingly
588 @param header: header fields from publisher
589 @type header: dict
590 @raise TransportInitError: if header fails to validate
591 """
592 self.header = header
593 if 'error' in header:
594 raise TransportInitError("remote error reported: %s"%header['error'])
595 for required in ['md5sum', 'type']:
596 if not required in header:
597 raise TransportInitError("header missing required field [%s]"%required)
598 self.type = header['type']
599 self.md5sum = header['md5sum']
600 if 'callerid' in header:
601 self.callerid_pub = header['callerid']
602 if header.get('latching', '0') == '1':
603 self.is_latched = True
604
606 """Writes the TCPROS header to the active connection."""
607
608 sock = self.socket
609 protocol = self.protocol
610
611
612
613 if sock is None or protocol is None:
614 return
615 fileno = sock.fileno()
616 ready = None
617 poller = None
618 if hasattr(select, 'poll'):
619 poller = select.poll()
620 poller.register(fileno, select.POLLOUT)
621 while not ready:
622 events = poller.poll()
623 for _, flag in events:
624 if flag & select.POLLOUT:
625 ready = True
626 else:
627 while not ready:
628 try:
629 _, ready, _ = select.select([], [fileno], [])
630 except ValueError as e:
631 logger.error("[%s]: select fileno '%s': %s", self.name, str(fileno), str(e))
632 raise
633
634 logger.debug("[%s]: writing header", self.name)
635 sock.setblocking(1)
636 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
637 if poller:
638 poller.unregister(fileno)
639
640
642 """
643 Read TCPROS header from active socket
644 @raise TransportInitError if header fails to validate
645 """
646 sock = self.socket
647 if sock is None:
648 return
649 sock.setblocking(1)
650
651 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
652
654 """
655 Convenience routine for services to send a message across a
656 particular connection. NOTE: write_data is much more efficient
657 if same message is being sent to multiple connections. Not
658 threadsafe.
659 @param msg: message to send
660 @type msg: Msg
661 @param seq: sequence number for message
662 @type seq: int
663 @raise TransportException: if error occurred sending message
664 """
665
666 serialize_message(self.write_buff, seq, msg)
667 self.write_data(self.write_buff.getvalue())
668 self.write_buff.truncate(0)
669
671 """
672 Write raw data to transport
673 @raise TransportInitialiationError: could not be initialized
674 @raise TransportTerminated: no longer open for publishing
675 """
676 if not self.socket:
677 raise TransportInitError("TCPROS transport was not successfully initialized")
678 if self.done:
679 raise TransportTerminated("connection closed")
680 try:
681
682 self.socket.sendall(data)
683 self.stat_bytes += len(data)
684 self.stat_num_msg += 1
685 except IOError as ioe:
686
687 (errno, msg) = ioe.args
688 if errno == 32:
689 logdebug("ERROR: Broken Pipe")
690 self.close()
691 raise TransportTerminated(str(errno)+msg)
692 raise
693 except socket.error as se:
694
695 (errno, msg) = se.args
696 if errno == 32:
697 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
698 self.close()
699 raise TransportTerminated(msg)
700 elif errno == 104:
701 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
702 self.close()
703 raise TransportTerminated(msg)
704 else:
705 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
706 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
707 self.close()
708 raise TransportTerminated(str(errno)+' '+msg)
709 return True
710
712 """
713 block until messages are read off of socket
714 @return: list of newly received messages
715 @rtype: [Msg]
716 @raise TransportException: if unable to receive message due to error
717 """
718 sock = self.socket
719 if sock is None:
720 raise TransportException("connection not initialized")
721 b = self.read_buff
722 msg_queue = []
723 p = self.protocol
724 try:
725 sock.setblocking(1)
726 while not msg_queue and not self.done and not is_shutdown():
727 if b.tell() >= 4:
728 p.read_messages(b, msg_queue, sock)
729 if not msg_queue:
730 self.stat_bytes += recv_buff(sock, b, p.buff_size)
731 self.stat_num_msg += len(msg_queue)
732
733 for m in msg_queue:
734 m._connection_header = self.header
735
736
737 if self.is_latched and msg_queue:
738 self.latch = msg_queue[-1]
739
740 return msg_queue
741
742 except DeserializationError as e:
743 rospyerr(traceback.format_exc())
744 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
745 except TransportTerminated as e:
746 raise
747 except ServiceException as e:
748 raise
749 except Exception as e:
750 rospyerr(traceback.format_exc())
751 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
752 return retval
753
755
756
757
758 if self.dest_address is None:
759 raise ROSInitException("internal error with reconnection state: address not stored")
760
761 interval = 0.5
762 while self.socket is None and not self.done and not rospy.is_shutdown():
763 try:
764
765
766
767
768 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
769 except TransportInitError:
770 self.socket = None
771
772 if self.socket is None and interval < 30.:
773
774 interval = interval * 2
775
776 time.sleep(interval)
777
779 """
780 Receive messages until shutdown
781 @param msgs_callback: callback to invoke for new messages received
782 @type msgs_callback: fn([msg])
783 """
784
785 logger.debug("receive_loop for [%s]", self.name)
786 try:
787 while not self.done and not is_shutdown():
788 try:
789 if self.socket is not None:
790 msgs = self.receive_once()
791 if not self.done and not is_shutdown():
792 msgs_callback(msgs, self)
793 else:
794 self._reconnect()
795
796 except TransportException as e:
797
798 try:
799 if self.socket is not None:
800 try:
801 self.socket.shutdown()
802 except:
803 pass
804 finally:
805 self.socket.close()
806 except:
807 pass
808 self.socket = None
809
810 except DeserializationError as e:
811
812
813 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
814 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
815 except:
816
817 try:
818
819
820
821 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
822 except: pass
823
824 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
825 finally:
826 if not self.done:
827 self.close()
828
830 """close i/o and release resources"""
831 if not self.done:
832 try:
833 if self.socket is not None:
834 try:
835 self.socket.shutdown(socket.SHUT_RDWR)
836 except:
837 pass
838 finally:
839 self.socket.close()
840 finally:
841 self.socket = self.read_buff = self.write_buff = self.protocol = None
842 super(TCPROSTransport, self).close()
843