1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37
38 try:
39 from cStringIO import StringIO
40 python3 = 0
41 except ImportError:
42 from io import StringIO, BytesIO
43 python3 = 1
44 import socket
45 import logging
46
47 try:
48 import _thread
49 except ImportError:
50 import thread as _thread
51
52 import threading
53 import time
54 import traceback
55 import select
56
57 import rosgraph
58 import rosgraph.network
59 from genpy import DeserializationError, Message
60 from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header
61
62
63 from rospy.core import *
64 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
65 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
66 from rospy.msg import deserialize_messages, serialize_message
67 from rospy.service import ServiceException
68
69 from rospy.impl.transport import Transport, BIDIRECTIONAL
70
71 logger = logging.getLogger('rospy.tcpros')
72
73
74 DEFAULT_BUFF_SIZE = 65536
75
76
77 TCPROS = "TCPROS"
78
79 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
80 _use_tcp_keepalive = None
81 _use_tcp_keepalive_lock = threading.Lock()
95
97 """
98 Read data from socket into buffer.
99 @param sock: socket to read from
100 @type sock: socket.socket
101 @param b: buffer to receive into
102 @type b: StringIO
103 @param buff_size: recv read size
104 @type buff_size: int
105 @return: number of bytes read
106 @rtype: int
107 """
108 d = sock.recv(buff_size)
109 if d:
110 b.write(d)
111 return len(d)
112 else:
113 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
114
116 """
117 Simple server that accepts inbound TCP/IP connections and hands
118 them off to a handler function. TCPServer obeys the
119 ROS_IP/ROS_HOSTNAME environment variables
120 """
121
122 - def __init__(self, inbound_handler, port=0):
123 """
124 Setup a server socket listening on the specified port. If the
125 port is omitted, will choose any open port.
126 @param inbound_handler: handler to invoke with
127 new connection
128 @type inbound_handler: fn(sock, addr)
129 @param port: port to bind to, omit/0 to bind to any
130 @type port: int
131 """
132 self.port = port
133 self.addr = None
134 self.is_shutdown = False
135 self.inbound_handler = inbound_handler
136 try:
137 self.server_sock = self._create_server_sock()
138 except:
139 self.server_sock = None
140 raise
141
143 """Runs the run() loop in a separate thread"""
144 _thread.start_new_thread(self.run, ())
145
147 """
148 Main TCP receive loop. Should be run in a separate thread -- use start()
149 to do this automatically.
150 """
151 self.is_shutdown = False
152 if not self.server_sock:
153 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
154 while not self.is_shutdown:
155 try:
156 (client_sock, client_addr) = self.server_sock.accept()
157 except socket.timeout:
158 continue
159 except IOError as e:
160 (errno, msg) = e.args
161 if errno == 4:
162 continue
163 raise
164 if self.is_shutdown:
165 break
166 try:
167
168 self.inbound_handler(client_sock, client_addr)
169 except socket.error as e:
170 if not self.is_shutdown:
171 traceback.print_exc()
172 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
173 logdebug("TCPServer[%s] shutting down", self.port)
174
175
177 """
178 @return: (ip address, port) of server socket binding
179 @rtype: (str, int)
180 """
181
182
183 return (rosgraph.network.get_host_name(), self.port)
184
186 """
187 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
188 binding to loopback interface.
189 """
190 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
191 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
192 server_sock.bind((rosgraph.network.get_bind_address(), self.port))
193 (self.addr, self.port) = server_sock.getsockname()
194 server_sock.listen(5)
195 return server_sock
196
198 """shutdown I/O resources uses by this server"""
199 if not self.is_shutdown:
200 self.is_shutdown = True
201 self.server_sock.close()
202
203
204
205
206 _tcpros_server = None
207
215
223
224
225
227 """
228 get the address of the tcpros server.
229 @raise Exception: if tcpros server has not been started or created
230 """
231 return _tcpros_server.get_address()
232
234 """
235 utility handler that does nothing more than provide a rejection header
236 @param sock: socket connection
237 @type sock: socket.socket
238 @param client_addr: client address
239 @type client_addr: str
240 @param header: request header
241 @type header: dict
242 """
243 return {'error': "unhandled connection"}
244
246 """
247 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
248 connections as well as ROS service connections over TCP. TCP server
249 socket is run once start_server() is called -- this is implicitly
250 called during init_publisher().
251 """
252
261
263 """
264 Starts the TCP socket server if one is not already running
265 @param port: port number to bind to (default 0/any)
266 @type port: int
267 """
268 if self.tcp_ros_server:
269 return
270 with self.lock:
271 try:
272 if not self.tcp_ros_server:
273 self.tcp_ros_server = TCPServer(self._tcp_server_callback, port)
274 self.tcp_ros_server.start()
275 except Exception as e:
276 self.tcp_ros_server = None
277 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
278 return 0, "unable to establish TCPROS server: %s"%e, []
279
281 """
282 @return: address and port of TCP server socket for accepting
283 inbound connections
284 @rtype: str, int
285 """
286 if self.tcp_ros_server is not None:
287 return self.tcp_ros_server.get_full_addr()
288 return None, None
289
291 """stops the TCP/IP server responsible for receiving inbound connections"""
292 if self.tcp_ros_server:
293 self.tcp_ros_server.shutdown()
294
296 """
297 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
298
299 @param sock: socket connection
300 @type sock: socket.socket
301 @param client_addr: client address
302 @type client_addr: (str, int)
303 @raise TransportInitError: If transport cannot be succesfully initialized
304 """
305
306
307 try:
308 buff_size = 4096
309 if python3 == 0:
310
311 header = read_ros_handshake_header(sock, StringIO(), buff_size)
312 else:
313 header = read_ros_handshake_header(sock, BytesIO(), buff_size)
314
315 if 'topic' in header:
316 err_msg = self.topic_connection_handler(sock, client_addr, header)
317 elif 'service' in header:
318 err_msg = self.service_connection_handler(sock, client_addr, header)
319 else:
320 err_msg = 'no topic or service name detected'
321 if err_msg:
322
323
324
325
326
327
328 if not rospy.core.is_shutdown_requested():
329 write_ros_handshake_header(sock, {'error' : err_msg})
330 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
331 else:
332 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
333 return
334 except rospy.exceptions.TransportInitError as e:
335 logwarn(str(e))
336 if sock is not None:
337 sock.close()
338 except Exception as e:
339
340 if not rospy.core.is_shutdown_requested():
341 logwarn("Inbound TCP/IP connection failed: %s", e)
342 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e))
343 if sock is not None:
344 sock.close()
345
347 """
348 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
349 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
350 well as which fields to send when creating a new connection (get_header_fields()).
351 """
352
354 """
355 ctor
356 @param resolved_name: resolved service or topic name
357 @type resolved_name: str
358 @param recv_data_class: message class for deserializing inbound messages
359 @type recv_data_class: Class
360 @param queue_size: maximum number of inbound messages to maintain
361 @type queue_size: int
362 @param buff_size: receive buffer size (in bytes) for reading from the connection.
363 @type buff_size: int
364 """
365 if recv_data_class and not issubclass(recv_data_class, Message):
366 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
367 self.resolved_name = resolved_name
368 self.recv_data_class = recv_data_class
369 self.queue_size = queue_size
370 self.buff_size = buff_size
371 self.direction = BIDIRECTIONAL
372
373
375 """
376 @param b StringIO: read buffer
377 @param msg_queue [Message]: queue of deserialized messages
378 @type msg_queue: [Message]
379 @param sock socket: protocol can optionally read more data from
380 the socket, but in most cases the required data will already be
381 in b
382 """
383
384 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
385
387 """
388 Header fields that should be sent over the connection. The header fields
389 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
390 @return: {str : str}: header fields to send over connection
391 @rtype: dict
392 """
393 return {}
394
395
396
397
398
399
400
401
402
403
404
406 """
407 Generic implementation of TCPROS exchange routines for both topics and services
408 """
409 transport_type = 'TCPROS'
410
411 - def __init__(self, protocol, name, header=None):
412 """
413 ctor
414 @param name str: identifier
415 @param protocol TCPROSTransportProtocol protocol implementation
416 @param header dict: (optional) handshake header if transport handshake header was
417 already read off of transport.
418 @raise TransportInitError if transport cannot be initialized according to arguments
419 """
420 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
421 if not name:
422 raise TransportInitError("Unable to initialize transport: name is not set")
423
424 self.protocol = protocol
425
426 self.socket = None
427 self.endpoint_id = 'unknown'
428 self.dest_address = None
429
430 if python3 == 0:
431 self.read_buff = StringIO()
432 self.write_buff = StringIO()
433 else:
434 self.read_buff = BytesIO()
435 self.write_buff = BytesIO()
436
437
438 self.header = header
439
440
441 self.is_latched = False
442 self.latch = None
443
444
445
446 self._fileno = None
447
448
449
450
451 self.md5sum = None
452 self.type = None
453
455 """
456 Get descriptor for select
457 """
458 return self._fileno
459
461 """
462 Set the socket for this transport
463 @param sock: socket
464 @type sock: socket.socket
465 @param endpoint_id: identifier for connection endpoint
466 @type endpoint_id: str
467 @raise TransportInitError: if socket has already been set
468 """
469 if self.socket is not None:
470 raise TransportInitError("socket already initialized")
471 self.socket = sock
472 self.endpoint_id = endpoint_id
473 self._fileno = sock.fileno()
474
475 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
476 """
477 Establish TCP connection to the specified
478 address/port. connect() always calls L{write_header()} and
479 L{read_header()} after the connection is made
480 @param dest_addr: destination IP address
481 @type dest_addr: str
482 @param dest_port: destination port
483 @type dest_port: int
484 @param endpoint_id: string identifier for connection (for statistics)
485 @type endpoint_id: str
486 @param timeout: (optional keyword) timeout in seconds
487 @type timeout: float
488 @raise TransportInitError: if unable to create connection
489 """
490 try:
491 self.endpoint_id = endpoint_id
492 self.dest_address = (dest_addr, dest_port)
493
494 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
495 if _is_use_tcp_keepalive():
496
497 if hasattr(socket, 'TCP_KEEPCNT') and \
498 hasattr(socket, 'TCP_KEEPIDLE') and \
499 hasattr(socket, 'TCP_KEEPINTVL'):
500
501 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
502
503 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
504
505 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
506
507 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
508 if timeout is not None:
509 s.settimeout(timeout)
510 self.socket = s
511 self.socket.connect((dest_addr, dest_port))
512 self.write_header()
513 self.read_header()
514 except TransportInitError as tie:
515 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
516 raise
517 except Exception as e:
518
519 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
520
521
522 self.done = True
523 if self.socket:
524 try:
525 self.socket.shutdown(socket.SHUT_RDWR)
526 except:
527 pass
528 finally:
529 self.socket.close()
530 self.socket = None
531
532 raise TransportInitError(str(e))
533
535 """
536 Validate header and initialize fields accordingly
537 @param header: header fields from publisher
538 @type header: dict
539 @raise TransportInitError: if header fails to validate
540 """
541 self.header = header
542 if 'error' in header:
543 raise TransportInitError("remote error reported: %s"%header['error'])
544 for required in ['md5sum', 'type']:
545 if not required in header:
546 raise TransportInitError("header missing required field [%s]"%required)
547 self.md5sum = header['md5sum']
548 self.type = header['type']
549 if header.get('latching', '0') == '1':
550 self.is_latched = True
551
553 """Writes the TCPROS header to the active connection."""
554
555 sock = self.socket
556 protocol = self.protocol
557
558
559
560 if sock is None or protocol is None:
561 return
562 fileno = sock.fileno()
563 ready = None
564 while not ready:
565 _, ready, _ = select.select([], [fileno], [])
566 logger.debug("[%s]: writing header", self.name)
567 sock.setblocking(1)
568 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
569
571 """
572 Read TCPROS header from active socket
573 @raise TransportInitError if header fails to validate
574 """
575 sock = self.socket
576 if sock is None:
577 return
578 sock.setblocking(1)
579 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
580
582 """
583 Convenience routine for services to send a message across a
584 particular connection. NOTE: write_data is much more efficient
585 if same message is being sent to multiple connections. Not
586 threadsafe.
587 @param msg: message to send
588 @type msg: Msg
589 @param seq: sequence number for message
590 @type seq: int
591 @raise TransportException: if error occurred sending message
592 """
593
594 serialize_message(self.write_buff, seq, msg)
595 self.write_data(self.write_buff.getvalue())
596 self.write_buff.truncate(0)
597
599 """
600 Write raw data to transport
601 @raise TransportInitialiationError: could not be initialized
602 @raise TransportTerminated: no longer open for publishing
603 """
604 if not self.socket:
605 raise TransportInitError("TCPROS transport was not successfully initialized")
606 if self.done:
607 raise TransportTerminated("connection closed")
608 try:
609
610 self.socket.sendall(data)
611 self.stat_bytes += len(data)
612 self.stat_num_msg += 1
613 except IOError as ioe:
614
615 (errno, msg) = ioe.args
616 if errno == 32:
617 logdebug("ERROR: Broken Pipe")
618 self.close()
619 raise TransportTerminated(str(errno)+msg)
620 raise
621 except socket.error as se:
622
623 (errno, msg) = se.args
624 if errno == 32:
625 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
626 self.close()
627 raise TransportTerminated(msg)
628 elif errno == 104:
629 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
630 self.close()
631 raise TransportTerminated(msg)
632 else:
633 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
634 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
635 self.close()
636 raise TransportTerminated(str(errno)+' '+msg)
637 return True
638
640 """
641 block until messages are read off of socket
642 @return: list of newly received messages
643 @rtype: [Msg]
644 @raise TransportException: if unable to receive message due to error
645 """
646 sock = self.socket
647 if sock is None:
648 raise TransportException("connection not initialized")
649 b = self.read_buff
650 msg_queue = []
651 p = self.protocol
652 try:
653 sock.setblocking(1)
654 while not msg_queue and not self.done and not is_shutdown():
655 if b.tell() >= 4:
656 p.read_messages(b, msg_queue, sock)
657 if not msg_queue:
658 recv_buff(sock, b, p.buff_size)
659 self.stat_num_msg += len(msg_queue)
660
661 for m in msg_queue:
662 m._connection_header = self.header
663
664
665 if self.is_latched and msg_queue:
666 self.latch = msg_queue[-1]
667
668 return msg_queue
669
670 except DeserializationError as e:
671 rospyerr(traceback.format_exc())
672 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
673 except TransportTerminated as e:
674 raise
675 except ServiceException as e:
676 raise
677 except Exception as e:
678 rospyerr(traceback.format_exc())
679 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
680 return retval
681
683
684
685
686 if self.dest_address is None:
687 raise ROSInitException("internal error with reconnection state: address not stored")
688
689 interval = 0.5
690 while self.socket is None and not self.done and not rospy.is_shutdown():
691 try:
692
693
694
695
696 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
697 except TransportInitError:
698 self.socket = None
699
700 if self.socket is None:
701
702 interval = interval * 2
703
704 time.sleep(interval)
705
707 """
708 Receive messages until shutdown
709 @param msgs_callback: callback to invoke for new messages received
710 @type msgs_callback: fn([msg])
711 """
712
713 logger.debug("receive_loop for [%s]", self.name)
714 try:
715 while not self.done and not is_shutdown():
716 try:
717 if self.socket is not None:
718 msgs = self.receive_once()
719 if not self.done and not is_shutdown():
720 msgs_callback(msgs)
721 else:
722 self._reconnect()
723
724 except TransportException as e:
725
726 try:
727 if self.socket is not None:
728 try:
729 self.socket.shutdown()
730 except:
731 pass
732 finally:
733 self.socket.close()
734 except:
735 pass
736 self.socket = None
737
738 except DeserializationError as e:
739
740
741 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
742 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
743 except:
744
745 try:
746
747
748
749 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
750 except: pass
751
752 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
753 finally:
754 if not self.done:
755 self.close()
756
758 """close i/o and release resources"""
759 if not self.done:
760 try:
761 if self.socket is not None:
762 try:
763 self.socket.shutdown(socket.SHUT_RDWR)
764 except:
765 pass
766 finally:
767 self.socket.close()
768 finally:
769 self.socket = self.read_buff = self.write_buff = self.protocol = None
770 super(TCPROSTransport, self).close()
771