1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37
38 try:
39 from cStringIO import StringIO
40 python3 = 0
41 except ImportError:
42 from io import StringIO, BytesIO
43 python3 = 1
44 import socket
45 import logging
46
47 try:
48 import _thread
49 except ImportError:
50 import thread as _thread
51
52 import threading
53 import time
54 import traceback
55 import select
56
57 import roslib.rosenv
58 from roslib.message import DeserializationError, Message
59 from roslib.network import read_ros_handshake_header, write_ros_handshake_header
60
61
62 from rospy.core import *
63 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
64 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
65 from rospy.msg import deserialize_messages, serialize_message
66 from rospy.service import ServiceException
67
68 from rospy.impl.transport import Transport, BIDIRECTIONAL
69
70 logger = logging.getLogger('rospy.tcpros')
71
72
73 DEFAULT_BUFF_SIZE = 65536
74
75
76 TCPROS = "TCPROS"
77
78 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
79 _use_tcp_keepalive = None
80 _use_tcp_keepalive_lock = threading.Lock()
94
96 """
97 Read data from socket into buffer.
98 @param sock: socket to read from
99 @type sock: socket.socket
100 @param b: buffer to receive into
101 @type b: StringIO
102 @param buff_size: recv read size
103 @type buff_size: int
104 @return: number of bytes read
105 @rtype: int
106 """
107 d = sock.recv(buff_size)
108 if d:
109 b.write(d)
110 return len(d)
111 else:
112 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
113
115 """
116 Simple server that accepts inbound TCP/IP connections and hands
117 them off to a handler function. TCPServer obeys the
118 ROS_IP/ROS_HOSTNAME environment variables
119 """
120
121 - def __init__(self, inbound_handler, port=0):
122 """
123 Setup a server socket listening on the specified port. If the
124 port is omitted, will choose any open port.
125 @param inbound_handler: handler to invoke with
126 new connection
127 @type inbound_handler: fn(sock, addr)
128 @param port: port to bind to, omit/0 to bind to any
129 @type port: int
130 """
131 self.port = port
132 self.addr = None
133 self.is_shutdown = False
134 self.inbound_handler = inbound_handler
135 try:
136 self.server_sock = self._create_server_sock()
137 except:
138 self.server_sock = None
139 raise
140
142 """Runs the run() loop in a separate thread"""
143 _thread.start_new_thread(self.run, ())
144
146 """
147 Main TCP receive loop. Should be run in a separate thread -- use start()
148 to do this automatically.
149 """
150 self.is_shutdown = False
151 if not self.server_sock:
152 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
153 while not self.is_shutdown:
154 try:
155 (client_sock, client_addr) = self.server_sock.accept()
156 except socket.timeout:
157 continue
158 except IOError as e:
159 (errno, msg) = e.args
160 if errno == 4:
161 continue
162 raise
163 if self.is_shutdown:
164 break
165 try:
166
167 self.inbound_handler(client_sock, client_addr)
168 except socket.error as e:
169 if not self.is_shutdown:
170 traceback.print_exc()
171 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
172 logdebug("TCPServer[%s] shutting down", self.port)
173
174
176 """
177 @return: (ip address, port) of server socket binding
178 @rtype: (str, int)
179 """
180
181
182 return (roslib.network.get_host_name(), self.port)
183
185 """
186 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
187 binding to loopback interface.
188 """
189 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
190 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
191 server_sock.bind((roslib.network.get_bind_address(), self.port))
192 (self.addr, self.port) = server_sock.getsockname()
193 server_sock.listen(5)
194 return server_sock
195
197 """shutdown I/O resources uses by this server"""
198 if not self.is_shutdown:
199 self.is_shutdown = True
200 self.server_sock.close()
201
202
203
204
205 _tcpros_server = None
206
214
222
223
224
226 """
227 get the address of the tcpros server.
228 @raise Exception: if tcpros server has not been started or created
229 """
230 return _tcpros_server.get_address()
231
233 """
234 utility handler that does nothing more than provide a rejection header
235 @param sock: socket connection
236 @type sock: socket.socket
237 @param client_addr: client address
238 @type client_addr: str
239 @param header: request header
240 @type header: dict
241 """
242 return {'error': "unhandled connection"}
243
245 """
246 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
247 connections as well as ROS service connections over TCP. TCP server
248 socket is run once start_server() is called -- this is implicitly
249 called during init_publisher().
250 """
251
260
262 """
263 Starts the TCP socket server if one is not already running
264 @param port: port number to bind to (default 0/any)
265 @type port: int
266 """
267 if self.tcp_ros_server:
268 return
269 with self.lock:
270 try:
271 if not self.tcp_ros_server:
272 self.tcp_ros_server = TCPServer(self._tcp_server_callback, port)
273 self.tcp_ros_server.start()
274 except Exception as e:
275 self.tcp_ros_server = None
276 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
277 return 0, "unable to establish TCPROS server: %s"%e, []
278
280 """
281 @return: address and port of TCP server socket for accepting
282 inbound connections
283 @rtype: str, int
284 """
285 if self.tcp_ros_server is not None:
286 return self.tcp_ros_server.get_full_addr()
287 return None, None
288
290 """stops the TCP/IP server responsible for receiving inbound connections"""
291 if self.tcp_ros_server:
292 self.tcp_ros_server.shutdown()
293
295 """
296 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
297
298 @param sock: socket connection
299 @type sock: socket.socket
300 @param client_addr: client address
301 @type client_addr: (str, int)
302 @raise TransportInitError: If transport cannot be succesfully initialized
303 """
304
305
306 try:
307 buff_size = 4096
308 if python3 == 0:
309
310 header = read_ros_handshake_header(sock, StringIO(), buff_size)
311 else:
312 header = read_ros_handshake_header(sock, BytesIO(), buff_size)
313
314 if 'topic' in header:
315 err_msg = self.topic_connection_handler(sock, client_addr, header)
316 elif 'service' in header:
317 err_msg = self.service_connection_handler(sock, client_addr, header)
318 else:
319 err_msg = 'no topic or service name detected'
320 if err_msg:
321
322
323
324
325
326
327 if not rospy.core.is_shutdown_requested():
328 write_ros_handshake_header(sock, {'error' : err_msg})
329 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
330 else:
331 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
332 return
333 except rospy.exceptions.TransportInitError as e:
334 logwarn(str(e))
335 if sock is not None:
336 sock.close()
337 except Exception as e:
338
339 if not rospy.core.is_shutdown_requested():
340 logwarn("Inbound TCP/IP connection failed: %s", e)
341 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e))
342 if sock is not None:
343 sock.close()
344
346 """
347 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
348 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
349 well as which fields to send when creating a new connection (get_header_fields()).
350 """
351
353 """
354 ctor
355 @param resolved_name: resolved service or topic name
356 @type resolved_name: str
357 @param recv_data_class: message class for deserializing inbound messages
358 @type recv_data_class: Class
359 @param queue_size: maximum number of inbound messages to maintain
360 @type queue_size: int
361 @param buff_size: receive buffer size (in bytes) for reading from the connection.
362 @type buff_size: int
363 """
364 if recv_data_class and not issubclass(recv_data_class, Message):
365 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
366 self.resolved_name = resolved_name
367 self.recv_data_class = recv_data_class
368 self.queue_size = queue_size
369 self.buff_size = buff_size
370 self.direction = BIDIRECTIONAL
371
372
374 """
375 @param b StringIO: read buffer
376 @param msg_queue [Message]: queue of deserialized messages
377 @type msg_queue: [Message]
378 @param sock socket: protocol can optionally read more data from
379 the socket, but in most cases the required data will already be
380 in b
381 """
382
383 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
384
386 """
387 Header fields that should be sent over the connection. The header fields
388 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
389 @return: {str : str}: header fields to send over connection
390 @rtype: dict
391 """
392 return {}
393
394
395
396
397
398
399
400
401
402
403
405 """
406 Generic implementation of TCPROS exchange routines for both topics and services
407 """
408 transport_type = 'TCPROS'
409
410 - def __init__(self, protocol, name, header=None):
411 """
412 ctor
413 @param name str: identifier
414 @param protocol TCPROSTransportProtocol protocol implementation
415 @param header dict: (optional) handshake header if transport handshake header was
416 already read off of transport.
417 @raise TransportInitError if transport cannot be initialized according to arguments
418 """
419 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
420 if not name:
421 raise TransportInitError("Unable to initialize transport: name is not set")
422
423 self.protocol = protocol
424
425 self.socket = None
426 self.endpoint_id = 'unknown'
427 self.dest_address = None
428
429 if python3 == 0:
430 self.read_buff = StringIO()
431 self.write_buff = StringIO()
432 else:
433 self.read_buff = BytesIO()
434 self.write_buff = BytesIO()
435
436
437 self.header = header
438
439
440 self.is_latched = False
441 self.latch = None
442
443
444
445
446 self.md5sum = None
447 self.type = None
448
450 """
451 Set the socket for this transport
452 @param sock: socket
453 @type sock: socket.socket
454 @param endpoint_id: identifier for connection endpoint
455 @type endpoint_id: str
456 @raise TransportInitError: if socket has already been set
457 """
458 if self.socket is not None:
459 raise TransportInitError("socket already initialized")
460 self.socket = sock
461 self.endpoint_id = endpoint_id
462
463 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
464 """
465 Establish TCP connection to the specified
466 address/port. connect() always calls L{write_header()} and
467 L{read_header()} after the connection is made
468 @param dest_addr: destination IP address
469 @type dest_addr: str
470 @param dest_port: destination port
471 @type dest_port: int
472 @param endpoint_id: string identifier for connection (for statistics)
473 @type endpoint_id: str
474 @param timeout: (optional keyword) timeout in seconds
475 @type timeout: float
476 @raise TransportInitError: if unable to create connection
477 """
478 try:
479 self.endpoint_id = endpoint_id
480 self.dest_address = (dest_addr, dest_port)
481
482 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
483 if _is_use_tcp_keepalive():
484
485 if hasattr(socket, 'TCP_KEEPCNT') and \
486 hasattr(socket, 'TCP_KEEPIDLE') and \
487 hasattr(socket, 'TCP_KEEPINTVL'):
488
489 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
490
491 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
492
493 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
494
495 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
496 if timeout is not None:
497 s.settimeout(timeout)
498 self.socket = s
499 self.socket.connect((dest_addr, dest_port))
500 self.write_header()
501 self.read_header()
502 except TransportInitError as tie:
503 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
504 raise
505 except Exception as e:
506
507 self.done = True
508 if self.socket:
509 try:
510 self.socket.shutdown()
511 except:
512 pass
513 finally:
514 self.socket.close()
515 self.socket = None
516
517
518 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
519 raise TransportInitError(str(e))
520
522 """
523 Validate header and initialize fields accordingly
524 @param header: header fields from publisher
525 @type header: dict
526 @raise TransportInitError: if header fails to validate
527 """
528 self.header = header
529 if 'error' in header:
530 raise TransportInitError("remote error reported: %s"%header['error'])
531 for required in ['md5sum', 'type']:
532 if not required in header:
533 raise TransportInitError("header missing required field [%s]"%required)
534 self.md5sum = header['md5sum']
535 self.type = header['type']
536 if header.get('latching', '0') == '1':
537 self.is_latched = True
538
540 """Writes the TCPROS header to the active connection."""
541 sock = self.socket
542
543 fileno = sock.fileno()
544 ready = None
545 while not ready:
546 _, ready, _ = select.select([], [fileno], [])
547 logger.debug("[%s]: writing header", self.name)
548 sock.setblocking(1)
549 self.stat_bytes += write_ros_handshake_header(sock, self.protocol.get_header_fields())
550
552 """
553 Read TCPROS header from active socket
554 @raise TransportInitError if header fails to validate
555 """
556 self.socket.setblocking(1)
557 self._validate_header(read_ros_handshake_header(self.socket, self.read_buff, self.protocol.buff_size))
558
560 """
561 Convenience routine for services to send a message across a
562 particular connection. NOTE: write_data is much more efficient
563 if same message is being sent to multiple connections. Not
564 threadsafe.
565 @param msg: message to send
566 @type msg: Msg
567 @param seq: sequence number for message
568 @type seq: int
569 @raise TransportException: if error occurred sending message
570 """
571
572 serialize_message(self.write_buff, seq, msg)
573 self.write_data(self.write_buff.getvalue())
574 self.write_buff.truncate(0)
575
577 """
578 Write raw data to transport
579 @raise TransportInitialiationError: could not be initialized
580 @raise TransportTerminated: no longer open for publishing
581 """
582 if not self.socket:
583 raise TransportInitError("TCPROS transport was not successfully initialized")
584 if self.done:
585 raise TransportTerminated("connection closed")
586 try:
587
588 self.socket.sendall(data)
589 self.stat_bytes += len(data)
590 self.stat_num_msg += 1
591 except IOError as ioe:
592
593 (errno, msg) = ioe.args
594 if errno == 32:
595 logdebug("ERROR: Broken Pipe")
596 self.close()
597 raise TransportTerminated(str(errno)+msg)
598 raise
599 except socket.error as se:
600
601 (errno, msg) = se.args
602 if errno == 32:
603 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
604 self.close()
605 raise TransportTerminated(msg)
606 elif errno == 104:
607 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
608 self.close()
609 raise TransportTerminated(msg)
610 else:
611 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
612 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
613 self.close()
614 raise TransportTerminated(str(errno)+' '+msg)
615 return True
616
618 """
619 block until messages are read off of socket
620 @return: list of newly received messages
621 @rtype: [Msg]
622 @raise TransportException: if unable to receive message due to error
623 """
624 sock = self.socket
625 if sock is None:
626 raise TransportException("connection not initialized")
627 b = self.read_buff
628 msg_queue = []
629 p = self.protocol
630 try:
631 sock.setblocking(1)
632 while not msg_queue and not self.done and not is_shutdown():
633 if b.tell() >= 4:
634 p.read_messages(b, msg_queue, sock)
635 if not msg_queue:
636 recv_buff(sock, b, p.buff_size)
637 self.stat_num_msg += len(msg_queue)
638
639 for m in msg_queue:
640 m._connection_header = self.header
641
642
643 if self.is_latched and msg_queue:
644 self.latch = msg_queue[-1]
645
646 return msg_queue
647
648 except DeserializationError as e:
649 rospyerr(traceback.format_exc())
650 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
651 except TransportTerminated as e:
652 raise
653 except ServiceException as e:
654 raise
655 except Exception as e:
656 rospyerr(traceback.format_exc())
657 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
658 return retval
659
661
662
663
664 if self.dest_address is None:
665 raise ROSInitException("internal error with reconnection state: address not stored")
666
667 interval = 0.5
668 while self.socket is None and not self.done and not rospy.is_shutdown():
669 try:
670
671
672
673
674 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
675 except TransportInitError:
676 self.socket = None
677
678 if self.socket is None:
679
680 interval = interval * 2
681
682 time.sleep(interval)
683
685 """
686 Receive messages until shutdown
687 @param msgs_callback: callback to invoke for new messages received
688 @type msgs_callback: fn([msg])
689 """
690
691 logger.debug("receive_loop for [%s]", self.name)
692 try:
693 while not self.done and not is_shutdown():
694 try:
695 if self.socket is not None:
696 msgs = self.receive_once()
697 if not self.done and not is_shutdown():
698 msgs_callback(msgs)
699 else:
700 self._reconnect()
701
702 except TransportException as e:
703
704 try:
705 if self.socket is not None:
706 try:
707 self.socket.shutdown()
708 except:
709 pass
710 finally:
711 self.socket.close()
712 except:
713 pass
714 self.socket = None
715
716 except DeserializationError as e:
717
718
719 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
720 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
721 except:
722
723 try:
724
725
726
727 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
728 except: pass
729
730 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
731 finally:
732 if not self.done:
733 self.close()
734
736 """close i/o and release resources"""
737 self.done = True
738 try:
739 if self.socket is not None:
740 try:
741 self.socket.shutdown()
742 except:
743 pass
744 finally:
745 self.socket.close()
746 finally:
747 self.socket = self.read_buff = self.write_buff = self.protocol = None
748 super(TCPROSTransport, self).close()
749