1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37
38 try:
39 from cStringIO import StringIO
40 python3 = 0
41 except ImportError:
42 from io import StringIO, BytesIO
43 python3 = 1
44 import socket
45 import logging
46
47 import threading
48 import time
49 import traceback
50 import select
51
52 import rosgraph
53 import rosgraph.network
54 from genpy import DeserializationError, Message
55 from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header
56
57
58 from rospy.core import *
59 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
60 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
61 from rospy.msg import deserialize_messages, serialize_message
62 from rospy.service import ServiceException
63
64 from rospy.impl.transport import Transport, BIDIRECTIONAL
65
66 logger = logging.getLogger('rospy.tcpros')
67
68
69 DEFAULT_BUFF_SIZE = 65536
70
71
72 TCPROS = "TCPROS"
73
74 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
75 _use_tcp_keepalive = None
76 _use_tcp_keepalive_lock = threading.Lock()
90
92 """
93 Read data from socket into buffer.
94 @param sock: socket to read from
95 @type sock: socket.socket
96 @param b: buffer to receive into
97 @type b: StringIO
98 @param buff_size: recv read size
99 @type buff_size: int
100 @return: number of bytes read
101 @rtype: int
102 """
103 d = sock.recv(buff_size)
104 if d:
105 b.write(d)
106 return len(d)
107 else:
108 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
109
111 """
112 Simple server that accepts inbound TCP/IP connections and hands
113 them off to a handler function. TCPServer obeys the
114 ROS_IP/ROS_HOSTNAME environment variables
115 """
116
117 - def __init__(self, inbound_handler, port=0):
118 """
119 Setup a server socket listening on the specified port. If the
120 port is omitted, will choose any open port.
121 @param inbound_handler: handler to invoke with
122 new connection
123 @type inbound_handler: fn(sock, addr)
124 @param port: port to bind to, omit/0 to bind to any
125 @type port: int
126 """
127 self.port = port
128 self.addr = None
129 self.is_shutdown = False
130 self.inbound_handler = inbound_handler
131 try:
132 self.server_sock = self._create_server_sock()
133 except:
134 self.server_sock = None
135 raise
136
138 """Runs the run() loop in a separate thread"""
139 t = threading.Thread(target=self.run, args=())
140 t.setDaemon(True)
141 t.start()
142
144 """
145 Main TCP receive loop. Should be run in a separate thread -- use start()
146 to do this automatically.
147 """
148 self.is_shutdown = False
149 if not self.server_sock:
150 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
151 while not self.is_shutdown:
152 try:
153 (client_sock, client_addr) = self.server_sock.accept()
154 except socket.timeout:
155 continue
156 except IOError as e:
157 (errno, msg) = e.args
158 if errno == 4:
159 continue
160 raise
161 if self.is_shutdown:
162 break
163 try:
164
165 self.inbound_handler(client_sock, client_addr)
166 except socket.error as e:
167 if not self.is_shutdown:
168 traceback.print_exc()
169 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
170 logdebug("TCPServer[%s] shutting down", self.port)
171
172
174 """
175 @return: (ip address, port) of server socket binding
176 @rtype: (str, int)
177 """
178
179
180 return (rosgraph.network.get_host_name(), self.port)
181
183 """
184 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
185 binding to loopback interface.
186 """
187 if rosgraph.network.use_ipv6():
188 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
189 else:
190 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
191 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
192 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port))
193 server_sock.bind((rosgraph.network.get_bind_address(), self.port))
194 (self.addr, self.port) = server_sock.getsockname()[0:2]
195 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port))
196 server_sock.listen(5)
197 return server_sock
198
200 """shutdown I/O resources uses by this server"""
201 if not self.is_shutdown:
202 self.is_shutdown = True
203 self.server_sock.close()
204
205
206
207
208 _tcpros_server = None
209
222
230
231
232
234 """
235 get the address of the tcpros server.
236 @raise Exception: if tcpros server has not been started or created
237 """
238 return _tcpros_server.get_address()
239
241 """
242 utility handler that does nothing more than provide a rejection header
243 @param sock: socket connection
244 @type sock: socket.socket
245 @param client_addr: client address
246 @type client_addr: str
247 @param header: request header
248 @type header: dict
249 """
250 return {'error': "unhandled connection"}
251
253 """
254 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
255 connections as well as ROS service connections over TCP. TCP server
256 socket is run once start_server() is called -- this is implicitly
257 called during init_publisher().
258 """
259
261 """
262 Constructur
263 @param port: port number to bind to (default 0/any)
264 @type port: int
265 """
266 self.port = port
267 self.tcp_ros_server = None
268 self.lock = threading.Lock()
269
270 self.topic_connection_handler = _error_connection_handler
271
272 self.service_connection_handler = _error_connection_handler
273
275 """
276 Starts the TCP socket server if one is not already running
277 """
278 if self.tcp_ros_server:
279 return
280 with self.lock:
281 try:
282 if not self.tcp_ros_server:
283 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port)
284 self.tcp_ros_server.start()
285 except Exception as e:
286 self.tcp_ros_server = None
287 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
288 return 0, "unable to establish TCPROS server: %s"%e, []
289
291 """
292 @return: address and port of TCP server socket for accepting
293 inbound connections
294 @rtype: str, int
295 """
296 if self.tcp_ros_server is not None:
297 return self.tcp_ros_server.get_full_addr()
298 return None, None
299
301 """stops the TCP/IP server responsible for receiving inbound connections"""
302 if self.tcp_ros_server:
303 self.tcp_ros_server.shutdown()
304
306 """
307 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
308
309 @param sock: socket connection
310 @type sock: socket.socket
311 @param client_addr: client address
312 @type client_addr: (str, int)
313 @raise TransportInitError: If transport cannot be succesfully initialized
314 """
315
316
317 try:
318 buff_size = 4096
319 if python3 == 0:
320
321 header = read_ros_handshake_header(sock, StringIO(), buff_size)
322 else:
323 header = read_ros_handshake_header(sock, BytesIO(), buff_size)
324
325 if 'topic' in header:
326 err_msg = self.topic_connection_handler(sock, client_addr, header)
327 elif 'service' in header:
328 err_msg = self.service_connection_handler(sock, client_addr, header)
329 else:
330 err_msg = 'no topic or service name detected'
331 if err_msg:
332
333
334
335
336
337
338 if not rospy.core.is_shutdown_requested():
339 write_ros_handshake_header(sock, {'error' : err_msg})
340 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
341 else:
342 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
343 return
344 except rospy.exceptions.TransportInitError as e:
345 logwarn(str(e))
346 if sock is not None:
347 sock.close()
348 except Exception as e:
349
350 if not rospy.core.is_shutdown_requested():
351 logwarn("Inbound TCP/IP connection failed: %s", e)
352 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e))
353 if sock is not None:
354 sock.close()
355
357 """
358 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
359 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
360 well as which fields to send when creating a new connection (get_header_fields()).
361 """
362
364 """
365 ctor
366 @param resolved_name: resolved service or topic name
367 @type resolved_name: str
368 @param recv_data_class: message class for deserializing inbound messages
369 @type recv_data_class: Class
370 @param queue_size: maximum number of inbound messages to maintain
371 @type queue_size: int
372 @param buff_size: receive buffer size (in bytes) for reading from the connection.
373 @type buff_size: int
374 """
375 if recv_data_class and not issubclass(recv_data_class, Message):
376 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
377 self.resolved_name = resolved_name
378 self.recv_data_class = recv_data_class
379 self.queue_size = queue_size
380 self.buff_size = buff_size
381 self.direction = BIDIRECTIONAL
382
383
385 """
386 @param b StringIO: read buffer
387 @param msg_queue [Message]: queue of deserialized messages
388 @type msg_queue: [Message]
389 @param sock socket: protocol can optionally read more data from
390 the socket, but in most cases the required data will already be
391 in b
392 """
393
394 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
395
397 """
398 Header fields that should be sent over the connection. The header fields
399 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
400 @return: {str : str}: header fields to send over connection
401 @rtype: dict
402 """
403 return {}
404
405
406
407
408
409
410
411
412
413
414
416 """
417 Generic implementation of TCPROS exchange routines for both topics and services
418 """
419 transport_type = 'TCPROS'
420
421 - def __init__(self, protocol, name, header=None):
422 """
423 ctor
424 @param name str: identifier
425 @param protocol TCPROSTransportProtocol protocol implementation
426 @param header dict: (optional) handshake header if transport handshake header was
427 already read off of transport.
428 @raise TransportInitError if transport cannot be initialized according to arguments
429 """
430 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
431 if not name:
432 raise TransportInitError("Unable to initialize transport: name is not set")
433
434 self.protocol = protocol
435
436 self.socket = None
437 self.endpoint_id = 'unknown'
438 self.dest_address = None
439
440 if python3 == 0:
441 self.read_buff = StringIO()
442 self.write_buff = StringIO()
443 else:
444 self.read_buff = BytesIO()
445 self.write_buff = BytesIO()
446
447
448 self.header = header
449
450
451 self.is_latched = False
452 self.latch = None
453
454
455
456 self._fileno = None
457
458
459
460
461 self.md5sum = None
462 self.type = None
463
465 """
466 Get descriptor for select
467 """
468 return self._fileno
469
471 """
472 Set the endpoint_id of this transport.
473 Allows the endpoint_id to be set before the socket is initialized.
474 """
475 self.endpoint_id = endpoint_id
476
478 """
479 Set the socket for this transport
480 @param sock: socket
481 @type sock: socket.socket
482 @param endpoint_id: identifier for connection endpoint
483 @type endpoint_id: str
484 @raise TransportInitError: if socket has already been set
485 """
486 if self.socket is not None:
487 raise TransportInitError("socket already initialized")
488 self.socket = sock
489 self.endpoint_id = endpoint_id
490 self._fileno = sock.fileno()
491
492 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
493 """
494 Establish TCP connection to the specified
495 address/port. connect() always calls L{write_header()} and
496 L{read_header()} after the connection is made
497 @param dest_addr: destination IP address
498 @type dest_addr: str
499 @param dest_port: destination port
500 @type dest_port: int
501 @param endpoint_id: string identifier for connection (for statistics)
502 @type endpoint_id: str
503 @param timeout: (optional keyword) timeout in seconds
504 @type timeout: float
505 @raise TransportInitError: if unable to create connection
506 """
507 try:
508 self.endpoint_id = endpoint_id
509 self.dest_address = (dest_addr, dest_port)
510 if rosgraph.network.use_ipv6():
511 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
512 else:
513 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
514 if _is_use_tcp_keepalive():
515
516 if hasattr(socket, 'TCP_KEEPCNT') and \
517 hasattr(socket, 'TCP_KEEPIDLE') and \
518 hasattr(socket, 'TCP_KEEPINTVL'):
519
520 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
521
522 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
523
524 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
525
526 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
527 if timeout is not None:
528 s.settimeout(timeout)
529 self.socket = s
530 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port))
531 self.socket.connect((dest_addr, dest_port))
532 self.write_header()
533 self.read_header()
534 except TransportInitError as tie:
535 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
536 raise
537 except Exception as e:
538
539 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
540
541
542 self.close()
543 raise TransportInitError(str(e))
544
546 """
547 Validate header and initialize fields accordingly
548 @param header: header fields from publisher
549 @type header: dict
550 @raise TransportInitError: if header fails to validate
551 """
552 self.header = header
553 if 'error' in header:
554 raise TransportInitError("remote error reported: %s"%header['error'])
555 for required in ['md5sum', 'type']:
556 if not required in header:
557 raise TransportInitError("header missing required field [%s]"%required)
558 self.md5sum = header['md5sum']
559 self.type = header['type']
560 if header.get('latching', '0') == '1':
561 self.is_latched = True
562
564 """Writes the TCPROS header to the active connection."""
565
566 sock = self.socket
567 protocol = self.protocol
568
569
570
571 if sock is None or protocol is None:
572 return
573 fileno = sock.fileno()
574 ready = None
575 while not ready:
576 _, ready, _ = select.select([], [fileno], [])
577 logger.debug("[%s]: writing header", self.name)
578 sock.setblocking(1)
579 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
580
582 """
583 Read TCPROS header from active socket
584 @raise TransportInitError if header fails to validate
585 """
586 sock = self.socket
587 if sock is None:
588 return
589 sock.setblocking(1)
590 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
591
593 """
594 Convenience routine for services to send a message across a
595 particular connection. NOTE: write_data is much more efficient
596 if same message is being sent to multiple connections. Not
597 threadsafe.
598 @param msg: message to send
599 @type msg: Msg
600 @param seq: sequence number for message
601 @type seq: int
602 @raise TransportException: if error occurred sending message
603 """
604
605 serialize_message(self.write_buff, seq, msg)
606 self.write_data(self.write_buff.getvalue())
607 self.write_buff.truncate(0)
608
610 """
611 Write raw data to transport
612 @raise TransportInitialiationError: could not be initialized
613 @raise TransportTerminated: no longer open for publishing
614 """
615 if not self.socket:
616 raise TransportInitError("TCPROS transport was not successfully initialized")
617 if self.done:
618 raise TransportTerminated("connection closed")
619 try:
620
621 self.socket.sendall(data)
622 self.stat_bytes += len(data)
623 self.stat_num_msg += 1
624 except IOError as ioe:
625
626 (errno, msg) = ioe.args
627 if errno == 32:
628 logdebug("ERROR: Broken Pipe")
629 self.close()
630 raise TransportTerminated(str(errno)+msg)
631 raise
632 except socket.error as se:
633
634 (errno, msg) = se.args
635 if errno == 32:
636 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
637 self.close()
638 raise TransportTerminated(msg)
639 elif errno == 104:
640 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
641 self.close()
642 raise TransportTerminated(msg)
643 else:
644 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
645 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
646 self.close()
647 raise TransportTerminated(str(errno)+' '+msg)
648 return True
649
651 """
652 block until messages are read off of socket
653 @return: list of newly received messages
654 @rtype: [Msg]
655 @raise TransportException: if unable to receive message due to error
656 """
657 sock = self.socket
658 if sock is None:
659 raise TransportException("connection not initialized")
660 b = self.read_buff
661 msg_queue = []
662 p = self.protocol
663 try:
664 sock.setblocking(1)
665 while not msg_queue and not self.done and not is_shutdown():
666 if b.tell() >= 4:
667 p.read_messages(b, msg_queue, sock)
668 if not msg_queue:
669 recv_buff(sock, b, p.buff_size)
670 self.stat_num_msg += len(msg_queue)
671
672 for m in msg_queue:
673 m._connection_header = self.header
674
675
676 if self.is_latched and msg_queue:
677 self.latch = msg_queue[-1]
678
679 return msg_queue
680
681 except DeserializationError as e:
682 rospyerr(traceback.format_exc())
683 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
684 except TransportTerminated as e:
685 raise
686 except ServiceException as e:
687 raise
688 except Exception as e:
689 rospyerr(traceback.format_exc())
690 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
691 return retval
692
694
695
696
697 if self.dest_address is None:
698 raise ROSInitException("internal error with reconnection state: address not stored")
699
700 interval = 0.5
701 while self.socket is None and not self.done and not rospy.is_shutdown():
702 try:
703
704
705
706
707 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
708 except TransportInitError:
709 self.socket = None
710
711 if self.socket is None:
712
713 interval = interval * 2
714
715 time.sleep(interval)
716
718 """
719 Receive messages until shutdown
720 @param msgs_callback: callback to invoke for new messages received
721 @type msgs_callback: fn([msg])
722 """
723
724 logger.debug("receive_loop for [%s]", self.name)
725 try:
726 while not self.done and not is_shutdown():
727 try:
728 if self.socket is not None:
729 msgs = self.receive_once()
730 if not self.done and not is_shutdown():
731 msgs_callback(msgs)
732 else:
733 self._reconnect()
734
735 except TransportException as e:
736
737 try:
738 if self.socket is not None:
739 try:
740 self.socket.shutdown()
741 except:
742 pass
743 finally:
744 self.socket.close()
745 except:
746 pass
747 self.socket = None
748
749 except DeserializationError as e:
750
751
752 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
753 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
754 except:
755
756 try:
757
758
759
760 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
761 except: pass
762
763 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
764 finally:
765 if not self.done:
766 self.close()
767
769 """close i/o and release resources"""
770 if not self.done:
771 try:
772 if self.socket is not None:
773 try:
774 self.socket.shutdown(socket.SHUT_RDWR)
775 except:
776 pass
777 finally:
778 self.socket.close()
779 finally:
780 self.socket = self.read_buff = self.write_buff = self.protocol = None
781 super(TCPROSTransport, self).close()
782