1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: common TCPROS libraries"""
36
37 from __future__ import with_statement
38
39 import cStringIO
40 import select
41 import socket
42 import logging
43 import thread
44 import threading
45 import time
46 import traceback
47
48 import roslib.rosenv
49 from roslib.message import DeserializationError, Message
50 from roslib.network import read_ros_handshake_header, write_ros_handshake_header
51
52
53 from rospy.core import *
54 from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn
55 from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError
56 from rospy.msg import deserialize_messages, serialize_message
57 from rospy.service import ServiceException
58
59 from rospy.impl.transport import Transport, BIDIRECTIONAL
60
61 logger = logging.getLogger('rospy.tcpros')
62
63
64 DEFAULT_BUFF_SIZE = 65536
65
66
67 TCPROS = "TCPROS"
68
69 _PARAM_TCP_KEEPALIVE = '/tcp_keepalive'
70 _use_tcp_keepalive = None
71 _use_tcp_keepalive_lock = threading.Lock()
85
87 """
88 Read data from socket into buffer.
89 @param sock: socket to read from
90 @type sock: socket.socket
91 @param b: buffer to receive into
92 @type b: StringIO
93 @param buff_size: recv read size
94 @type buff_size: int
95 @return: number of bytes read
96 @rtype: int
97 """
98 d = sock.recv(buff_size)
99 if d:
100 b.write(d)
101 return len(d)
102 else:
103 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
104
106 """
107 Simple server that accepts inbound TCP/IP connections and hands
108 them off to a handler function. TCPServer obeys the
109 ROS_IP/ROS_HOSTNAME environment variables
110 """
111
112 - def __init__(self, inbound_handler, port=0):
113 """
114 Setup a server socket listening on the specified port. If the
115 port is omitted, will choose any open port.
116 @param inbound_handler: handler to invoke with
117 new connection
118 @type inbound_handler: fn(sock, addr)
119 @param port: port to bind to, omit/0 to bind to any
120 @type port: int
121 """
122 self.port = port
123 self.addr = None
124 self.is_shutdown = False
125 self.inbound_handler = inbound_handler
126 try:
127 self.server_sock = self._create_server_sock()
128 except:
129 self.server_sock = None
130 raise
131
133 """Runs the run() loop in a separate thread"""
134 thread.start_new_thread(self.run, ())
135
137 """
138 Main TCP receive loop. Should be run in a separate thread -- use start()
139 to do this automatically.
140 """
141 self.is_shutdown = False
142 if not self.server_sock:
143 raise ROSInternalException("%s did not connect"%self.__class__.__name__)
144 while not self.is_shutdown:
145 try:
146 (client_sock, client_addr) = self.server_sock.accept()
147 except socket.timeout:
148 continue
149 except IOError, (errno, msg):
150 if errno == 4:
151 continue
152 raise
153 if self.is_shutdown:
154 break
155 try:
156
157 self.inbound_handler(client_sock, client_addr)
158 except socket.error, e:
159 if not self.is_shutdown:
160 traceback.print_exc()
161 logwarn("Failed to handle inbound connection due to socket error: %s"%e)
162 logdebug("TCPServer[%s] shutting down", self.port)
163
164
166 """
167 @return: (ip address, port) of server socket binding
168 @rtype: (str, int)
169 """
170
171
172 return (roslib.network.get_host_name(), self.port)
173
175 """
176 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
177 binding to loopback interface.
178 """
179 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
180 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
181 server_sock.bind((roslib.network.get_bind_address(), self.port))
182 (self.addr, self.port) = server_sock.getsockname()
183 server_sock.listen(5)
184 return server_sock
185
187 """shutdown I/O resources uses by this server"""
188 if not self.is_shutdown:
189 self.is_shutdown = True
190 self.server_sock.close()
191
192
193
194
195 _tcpros_server = None
196
204
212
213
214
216 """
217 get the address of the tcpros server.
218 @raise Exception: if tcpros server has not been started or created
219 """
220 return _tcpros_server.get_address()
221
223 """
224 utility handler that does nothing more than provide a rejection header
225 @param sock: socket connection
226 @type sock: socket.socket
227 @param client_addr: client address
228 @type client_addr: str
229 @param header: request header
230 @type header: dict
231 """
232 return {'error': "unhandled connection"}
233
235 """
236 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
237 connections as well as ROS service connections over TCP. TCP server
238 socket is run once start_server() is called -- this is implicitly
239 called during init_publisher().
240 """
241
250
252 """
253 Starts the TCP socket server if one is not already running
254 @param port: port number to bind to (default 0/any)
255 @type port: int
256 """
257 if self.tcp_ros_server:
258 return
259 with self.lock:
260 try:
261 if not self.tcp_ros_server:
262 self.tcp_ros_server = TCPServer(self._tcp_server_callback, port)
263 self.tcp_ros_server.start()
264 except Exception, e:
265 self.tcp_ros_server = None
266 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc()))
267 return 0, "unable to establish TCPROS server: %s"%e, []
268
270 """
271 @return: address and port of TCP server socket for accepting
272 inbound connections
273 @rtype: str, int
274 """
275 if self.tcp_ros_server is not None:
276 return self.tcp_ros_server.get_full_addr()
277 return None, None
278
280 """stops the TCP/IP server responsible for receiving inbound connections"""
281 if self.tcp_ros_server:
282 self.tcp_ros_server.shutdown()
283
285 """
286 TCPServer callback: detects incoming topic or service connection and passes connection accordingly
287
288 @param sock: socket connection
289 @type sock: socket.socket
290 @param client_addr: client address
291 @type client_addr: (str, int)
292 @raise TransportInitError: If transport cannot be succesfully initialized
293 """
294
295
296 try:
297 buff_size = 4096
298 header = read_ros_handshake_header(sock, cStringIO.StringIO(), buff_size)
299 if 'topic' in header:
300 err_msg = self.topic_connection_handler(sock, client_addr, header)
301 elif 'service' in header:
302 err_msg = self.service_connection_handler(sock, client_addr, header)
303 else:
304 err_msg = 'no topic or service name detected'
305 if err_msg:
306
307
308
309
310
311
312 if not rospy.core.is_shutdown_requested():
313 write_ros_handshake_header(sock, {'error' : err_msg})
314 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header))
315 else:
316 write_ros_handshake_header(sock, {'error' : 'node shutting down'})
317 return
318 except rospy.exceptions.TransportInitError as e:
319 logwarn(str(e))
320 if sock is not None:
321 sock.close()
322 except Exception as e:
323
324 if not rospy.core.is_shutdown_requested():
325 logwarn("Inbound TCP/IP connection failed: %s", e)
326 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e))
327 if sock is not None:
328 sock.close()
329
331 """
332 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
333 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
334 well as which fields to send when creating a new connection (get_header_fields()).
335 """
336
338 """
339 ctor
340 @param resolved_name: resolved service or topic name
341 @type resolved_name: str
342 @param recv_data_class: message class for deserializing inbound messages
343 @type recv_data_class: Class
344 @param queue_size: maximum number of inbound messages to maintain
345 @type queue_size: int
346 @param buff_size: receive buffer size (in bytes) for reading from the connection.
347 @type buff_size: int
348 """
349 if recv_data_class and not issubclass(recv_data_class, Message):
350 raise TransportInitError("Unable to initialize transport: data class is not a message data class")
351 self.resolved_name = resolved_name
352 self.recv_data_class = recv_data_class
353 self.queue_size = queue_size
354 self.buff_size = buff_size
355 self.direction = BIDIRECTIONAL
356
357
359 """
360 @param b StringIO: read buffer
361 @param msg_queue [Message]: queue of deserialized messages
362 @type msg_queue: [Message]
363 @param sock socket: protocol can optionally read more data from
364 the socket, but in most cases the required data will already be
365 in b
366 """
367
368 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
369
371 """
372 Header fields that should be sent over the connection. The header fields
373 are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
374 @return: {str : str}: header fields to send over connection
375 @rtype: dict
376 """
377 return {}
378
379
380
381
382
383
384
385
386
387
388
390 """
391 Generic implementation of TCPROS exchange routines for both topics and services
392 """
393 transport_type = 'TCPROS'
394
395 - def __init__(self, protocol, name, header=None):
396 """
397 ctor
398 @param name str: identifier
399 @param protocol TCPROSTransportProtocol protocol implementation
400 @param header dict: (optional) handshake header if transport handshake header was
401 already read off of transport.
402 @raise TransportInitError if transport cannot be initialized according to arguments
403 """
404 super(TCPROSTransport, self).__init__(protocol.direction, name=name)
405 if not name:
406 raise TransportInitError("Unable to initialize transport: name is not set")
407
408 self.protocol = protocol
409
410 self.socket = None
411 self.endpoint_id = 'unknown'
412 self.dest_address = None
413 self.read_buff = cStringIO.StringIO()
414 self.write_buff = cStringIO.StringIO()
415
416 self.header = header
417
418
419 self.is_latched = False
420 self.latch = None
421
422
423
424
425 self.md5sum = None
426 self.type = None
427
429 """
430 Set the socket for this transport
431 @param sock: socket
432 @type sock: socket.socket
433 @param endpoint_id: identifier for connection endpoint
434 @type endpoint_id: str
435 @raise TransportInitError: if socket has already been set
436 """
437 if self.socket is not None:
438 raise TransportInitError("socket already initialized")
439 self.socket = sock
440 self.endpoint_id = endpoint_id
441
442 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
443 """
444 Establish TCP connection to the specified
445 address/port. connect() always calls L{write_header()} and
446 L{read_header()} after the connection is made
447 @param dest_addr: destination IP address
448 @type dest_addr: str
449 @param dest_port: destination port
450 @type dest_port: int
451 @param endpoint_id: string identifier for connection (for statistics)
452 @type endpoint_id: str
453 @param timeout: (optional keyword) timeout in seconds
454 @type timeout: float
455 @raise TransportInitError: if unable to create connection
456 """
457 try:
458 self.endpoint_id = endpoint_id
459 self.dest_address = (dest_addr, dest_port)
460
461 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
462 if _is_use_tcp_keepalive():
463
464 if hasattr(socket, 'TCP_KEEPCNT') and \
465 hasattr(socket, 'TCP_KEEPIDLE') and \
466 hasattr(socket, 'TCP_KEEPINTVL'):
467
468 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
469
470 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9)
471
472 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
473
474 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)
475 if timeout is not None:
476 s.settimeout(timeout)
477 self.socket = s
478 self.socket.connect((dest_addr, dest_port))
479 self.write_header()
480 self.read_header()
481
482 except TransportInitError, tie:
483 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
484 raise
485 except Exception, e:
486
487 self.done = True
488 if self.socket:
489 self.socket.close()
490 self.socket = None
491
492
493 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
494 raise TransportInitError(str(e))
495
497 """
498 Validate header and initialize fields accordingly
499 @param header: header fields from publisher
500 @type header: dict
501 @raise TransportInitError: if header fails to validate
502 """
503 self.header = header
504 if 'error' in header:
505 raise TransportInitError("remote error reported: %s"%header['error'])
506 for required in ['md5sum', 'type']:
507 if not required in header:
508 raise TransportInitError("header missing required field [%s]"%required)
509 self.md5sum = header['md5sum']
510 self.type = header['type']
511 if header.get('latching', '0') == '1':
512 self.is_latched = True
513
515 """Writes the TCPROS header to the active connection."""
516 sock = self.socket
517
518 fileno = sock.fileno()
519 ready = None
520 while not ready:
521 _, ready, _ = select.select([], [fileno], [])
522 logger.debug("[%s]: writing header", self.name)
523 sock.setblocking(1)
524 self.stat_bytes += write_ros_handshake_header(sock, self.protocol.get_header_fields())
525
527 """
528 Read TCPROS header from active socket
529 @raise TransportInitError if header fails to validate
530 """
531 self.socket.setblocking(1)
532 self._validate_header(read_ros_handshake_header(self.socket, self.read_buff, self.protocol.buff_size))
533
535 """
536 Convenience routine for services to send a message across a
537 particular connection. NOTE: write_data is much more efficient
538 if same message is being sent to multiple connections. Not
539 threadsafe.
540 @param msg: message to send
541 @type msg: Msg
542 @param seq: sequence number for message
543 @type seq: int
544 @raise TransportException: if error occurred sending message
545 """
546
547 serialize_message(self.write_buff, seq, msg)
548 self.write_data(self.write_buff.getvalue())
549 self.write_buff.truncate(0)
550
552 """
553 Write raw data to transport
554 @raise TransportInitialiationError: could not be initialized
555 @raise TransportTerminated: no longer open for publishing
556 """
557 if not self.socket:
558 raise TransportInitError("TCPROS transport was not successfully initialized")
559 if self.done:
560 raise TransportTerminated("connection closed")
561 try:
562
563 self.socket.sendall(data)
564 self.stat_bytes += len(data)
565 self.stat_num_msg += 1
566 except IOError, (errno, msg):
567
568 if errno == 32:
569 logdebug("ERROR: Broken Pipe")
570 self.close()
571 raise TransportTerminated(str(errno)+msg)
572 raise
573 except socket.error, (errno, msg):
574
575 if errno == 32:
576 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id)
577 self.close()
578 raise TransportTerminated(msg)
579 elif errno == 104:
580 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)
581 self.close()
582 raise TransportTerminated(msg)
583 else:
584 rospydebug("unknown socket error writing data: %s",traceback.format_exc())
585 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)
586 self.close()
587 raise TransportTerminated(str(errno)+' '+msg)
588 except:
589
590 raise
591 return True
592
594 """
595 block until messages are read off of socket
596 @return: list of newly received messages
597 @rtype: [Msg]
598 @raise TransportException: if unable to receive message due to error
599 """
600 sock = self.socket
601 if sock is None:
602 raise TransportException("connection not initialized")
603 b = self.read_buff
604 msg_queue = []
605 p = self.protocol
606 try:
607 sock.setblocking(1)
608 while not msg_queue and not self.done and not is_shutdown():
609 if b.tell() >= 4:
610 p.read_messages(b, msg_queue, sock)
611 if not msg_queue:
612 recv_buff(sock, b, p.buff_size)
613 self.stat_num_msg += len(msg_queue)
614
615 for m in msg_queue:
616 m._connection_header = self.header
617
618
619 if self.is_latched and msg_queue:
620 self.latch = msg_queue[-1]
621
622 return msg_queue
623
624 except DeserializationError, e:
625 rospyerr(traceback.format_exc())
626 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e)))
627 except TransportTerminated, e:
628 raise
629 except ServiceException, e:
630 raise
631 except Exception, e:
632 rospyerr(traceback.format_exc())
633 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
634 return retval
635
637
638
639
640 if self.dest_address is None:
641 raise ROSInitException("internal error with reconnection state: address not stored")
642
643 interval = 0.5
644 while self.socket is None and not self.done and not rospy.is_shutdown():
645 try:
646
647
648
649
650 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.)
651 except TransportInitError:
652 self.socket = None
653
654 if self.socket is None:
655
656 interval = interval * 2
657
658 time.sleep(interval)
659
661 """
662 Receive messages until shutdown
663 @param msgs_callback: callback to invoke for new messages received
664 @type msgs_callback: fn([msg])
665 """
666
667 logger.debug("receive_loop for [%s]", self.name)
668 try:
669 while not self.done and not is_shutdown():
670 try:
671 if self.socket is not None:
672 msgs = self.receive_once()
673 if not self.done and not is_shutdown():
674 msgs_callback(msgs)
675 else:
676 self._reconnect()
677
678 except TransportException, e:
679
680 try:
681 if self.socket is not None:
682 self.socket.close()
683 except:
684 pass
685 self.socket = None
686
687 except DeserializationError, e:
688
689
690 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
691 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
692
693 except:
694
695 try:
696
697
698
699 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())
700 except: pass
701
702 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)
703 finally:
704 if not self.done:
705 self.close()
706
708 """close i/o and release resources"""
709 self.done = True
710 try:
711 if self.socket is not None:
712 self.socket.close()
713 finally:
714 self.socket = self.read_buff = self.write_buff = self.protocol = None
715 super(TCPROSTransport, self).close()
716