Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO #Python 2.x 
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO #Python 3.x 
 43      python3 = 1 
 44  import socket 
 45  import logging 
 46   
 47  import threading 
 48  import time 
 49  import traceback 
 50  import select 
 51   
 52  import rosgraph 
 53  import rosgraph.network 
 54  from genpy import DeserializationError, Message 
 55  from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header 
 56   
 57  # TODO: remove * import from core 
 58  from rospy.core import * 
 59  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 60  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 61  from rospy.msg import deserialize_messages, serialize_message 
 62  from rospy.service import ServiceException 
 63   
 64  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 65   
 66  logger = logging.getLogger('rospy.tcpros') 
 67   
 68  # Receive buffer size for topics/services (in bytes) 
 69  DEFAULT_BUFF_SIZE = 65536 
 70   
 71  ## name of our customized TCP protocol for accepting flows over server socket 
 72  TCPROS = "TCPROS"  
 73   
 74  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 75  _use_tcp_keepalive = None 
 76  _use_tcp_keepalive_lock = threading.Lock() 
77 -def _is_use_tcp_keepalive():
78 global _use_tcp_keepalive 79 if _use_tcp_keepalive is not None: 80 return _use_tcp_keepalive 81 with _use_tcp_keepalive_lock: 82 if _use_tcp_keepalive is not None: 83 return _use_tcp_keepalive 84 # in order to prevent circular dependencies, this does not use the 85 # builtin libraries for interacting with the parameter server 86 m = rospy.core.xmlrpcapi(rosgraph.get_master_uri()) 87 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 88 _use_tcp_keepalive = val if code == 1 else True 89 return _use_tcp_keepalive
90
91 -def recv_buff(sock, b, buff_size):
92 """ 93 Read data from socket into buffer. 94 @param sock: socket to read from 95 @type sock: socket.socket 96 @param b: buffer to receive into 97 @type b: StringIO 98 @param buff_size: recv read size 99 @type buff_size: int 100 @return: number of bytes read 101 @rtype: int 102 """ 103 d = sock.recv(buff_size) 104 if d: 105 b.write(d) 106 return len(d) 107 else: #bomb out 108 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
109
110 -class TCPServer(object):
111 """ 112 Simple server that accepts inbound TCP/IP connections and hands 113 them off to a handler function. TCPServer obeys the 114 ROS_IP/ROS_HOSTNAME environment variables 115 """ 116
117 - def __init__(self, inbound_handler, port=0):
118 """ 119 Setup a server socket listening on the specified port. If the 120 port is omitted, will choose any open port. 121 @param inbound_handler: handler to invoke with 122 new connection 123 @type inbound_handler: fn(sock, addr) 124 @param port: port to bind to, omit/0 to bind to any 125 @type port: int 126 """ 127 self.port = port #will get overwritten if port=0 128 self.addr = None #set at socket bind 129 self.is_shutdown = False 130 self.inbound_handler = inbound_handler 131 try: 132 self.server_sock = self._create_server_sock() 133 except: 134 self.server_sock = None 135 raise
136
137 - def start(self):
138 """Runs the run() loop in a separate thread""" 139 t = threading.Thread(target=self.run, args=()) 140 t.setDaemon(True) 141 t.start()
142
143 - def run(self):
144 """ 145 Main TCP receive loop. Should be run in a separate thread -- use start() 146 to do this automatically. 147 """ 148 self.is_shutdown = False 149 if not self.server_sock: 150 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 151 while not self.is_shutdown: 152 try: 153 (client_sock, client_addr) = self.server_sock.accept() 154 except socket.timeout: 155 continue 156 except IOError as e: 157 (errno, msg) = e.args 158 if errno == 4: #interrupted system call 159 continue 160 raise 161 if self.is_shutdown: 162 break 163 try: 164 #leave threading decisions up to inbound_handler 165 self.inbound_handler(client_sock, client_addr) 166 except socket.error as e: 167 if not self.is_shutdown: 168 traceback.print_exc() 169 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 170 logdebug("TCPServer[%s] shutting down", self.port)
171 172
173 - def get_full_addr(self):
174 """ 175 @return: (ip address, port) of server socket binding 176 @rtype: (str, int) 177 """ 178 # return rosgraph.network.get_host_name() instead of address so that it 179 # obeys ROS_IP/ROS_HOSTNAME behavior 180 return (rosgraph.network.get_host_name(), self.port)
181
182 - def _create_server_sock(self):
183 """ 184 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 185 binding to loopback interface. 186 """ 187 if rosgraph.network.use_ipv6(): 188 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 189 else: 190 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 191 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 192 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port)) 193 server_sock.bind((rosgraph.network.get_bind_address(), self.port)) 194 (self.addr, self.port) = server_sock.getsockname()[0:2] 195 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port)) 196 server_sock.listen(5) 197 return server_sock
198
199 - def shutdown(self):
200 """shutdown I/O resources uses by this server""" 201 if not self.is_shutdown: 202 self.is_shutdown = True 203 self.server_sock.close()
204 205 # base maintains a tcpros_server singleton that is shared between 206 # services and topics for inbound connections. This global is set in 207 # the tcprosserver constructor. Constructor is called by init_tcpros() 208 _tcpros_server = None 209
210 -def init_tcpros_server(port=0):
211 """ 212 starts the TCPROS server socket for inbound connections 213 @param port: listen on the provided port. If the port number is 0, the port will 214 be chosen randomly 215 @type port: int 216 """ 217 global _tcpros_server 218 if _tcpros_server is None: 219 _tcpros_server = TCPROSServer(port=port) 220 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 221 return _tcpros_server
222
223 -def start_tcpros_server():
224 """ 225 start the TCPROS server if it has not started already 226 """ 227 if _tcpros_server is None: 228 init_tcpros_server() 229 return _tcpros_server.start_server()
230 231 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 232
233 -def get_tcpros_server_address():
234 """ 235 get the address of the tcpros server. 236 @raise Exception: if tcpros server has not been started or created 237 """ 238 return _tcpros_server.get_address()
239
240 -def _error_connection_handler(sock, client_addr, header):
241 """ 242 utility handler that does nothing more than provide a rejection header 243 @param sock: socket connection 244 @type sock: socket.socket 245 @param client_addr: client address 246 @type client_addr: str 247 @param header: request header 248 @type header: dict 249 """ 250 return {'error': "unhandled connection"}
251
252 -class TCPROSServer(object):
253 """ 254 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 255 connections as well as ROS service connections over TCP. TCP server 256 socket is run once start_server() is called -- this is implicitly 257 called during init_publisher(). 258 """ 259
260 - def __init__(self, port=0):
261 """ 262 Constructur 263 @param port: port number to bind to (default 0/any) 264 @type port: int 265 """ 266 self.port = port 267 self.tcp_ros_server = None #: server for receiving tcp conn 268 self.lock = threading.Lock() 269 # should be set to fn(sock, client_addr, header) for topic connections 270 self.topic_connection_handler = _error_connection_handler 271 # should be set to fn(sock, client_addr, header) for service connections 272 self.service_connection_handler = _error_connection_handler
273
274 - def start_server(self):
275 """ 276 Starts the TCP socket server if one is not already running 277 """ 278 if self.tcp_ros_server: 279 return 280 with self.lock: 281 try: 282 if not self.tcp_ros_server: 283 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port) 284 self.tcp_ros_server.start() 285 except Exception as e: 286 self.tcp_ros_server = None 287 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 288 return 0, "unable to establish TCPROS server: %s"%e, []
289
290 - def get_address(self):
291 """ 292 @return: address and port of TCP server socket for accepting 293 inbound connections 294 @rtype: str, int 295 """ 296 if self.tcp_ros_server is not None: 297 return self.tcp_ros_server.get_full_addr() 298 return None, None
299
300 - def shutdown(self, reason=''):
301 """stops the TCP/IP server responsible for receiving inbound connections""" 302 if self.tcp_ros_server: 303 self.tcp_ros_server.shutdown()
304
305 - def _tcp_server_callback(self, sock, client_addr):
306 """ 307 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 308 309 @param sock: socket connection 310 @type sock: socket.socket 311 @param client_addr: client address 312 @type client_addr: (str, int) 313 @raise TransportInitError: If transport cannot be succesfully initialized 314 """ 315 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 316 #and then use that to do the writing 317 try: 318 buff_size = 4096 # size of read buffer 319 if python3 == 0: 320 #initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray()) 321 header = read_ros_handshake_header(sock, StringIO(), buff_size) 322 else: 323 header = read_ros_handshake_header(sock, BytesIO(), buff_size) 324 325 if 'topic' in header: 326 err_msg = self.topic_connection_handler(sock, client_addr, header) 327 elif 'service' in header: 328 err_msg = self.service_connection_handler(sock, client_addr, header) 329 else: 330 err_msg = 'no topic or service name detected' 331 if err_msg: 332 # shutdown race condition: nodes that come up and down 333 # quickly can receive connections during teardown. 334 335 # We use is_shutdown_requested() because we can get 336 # into bad connection states during client shutdown 337 # hooks. 338 if not rospy.core.is_shutdown_requested(): 339 write_ros_handshake_header(sock, {'error' : err_msg}) 340 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 341 else: 342 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 343 return 344 except rospy.exceptions.TransportInitError as e: 345 logwarn(str(e)) 346 if sock is not None: 347 sock.close() 348 except Exception as e: 349 # collect stack trace separately in local log file 350 if not rospy.core.is_shutdown_requested(): 351 logwarn("Inbound TCP/IP connection failed: %s", e) 352 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc(e)) 353 if sock is not None: 354 sock.close()
355
356 -class TCPROSTransportProtocol(object):
357 """ 358 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 359 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 360 well as which fields to send when creating a new connection (get_header_fields()). 361 """ 362
363 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
364 """ 365 ctor 366 @param resolved_name: resolved service or topic name 367 @type resolved_name: str 368 @param recv_data_class: message class for deserializing inbound messages 369 @type recv_data_class: Class 370 @param queue_size: maximum number of inbound messages to maintain 371 @type queue_size: int 372 @param buff_size: receive buffer size (in bytes) for reading from the connection. 373 @type buff_size: int 374 """ 375 if recv_data_class and not issubclass(recv_data_class, Message): 376 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 377 self.resolved_name = resolved_name 378 self.recv_data_class = recv_data_class 379 self.queue_size = queue_size 380 self.buff_size = buff_size 381 self.direction = BIDIRECTIONAL
382 383
384 - def read_messages(self, b, msg_queue, sock):
385 """ 386 @param b StringIO: read buffer 387 @param msg_queue [Message]: queue of deserialized messages 388 @type msg_queue: [Message] 389 @param sock socket: protocol can optionally read more data from 390 the socket, but in most cases the required data will already be 391 in b 392 """ 393 # default implementation 394 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
395
396 - def get_header_fields(self):
397 """ 398 Header fields that should be sent over the connection. The header fields 399 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 400 @return: {str : str}: header fields to send over connection 401 @rtype: dict 402 """ 403 return {}
404 405 # TODO: this still isn't as clean and seamless as I want it to 406 # be. This code came from the merger of publisher, subscriber, and 407 # service code into a common TCPROS transport class. The transport is 408 # customized by a 'protocol' class, which is how the different 409 # pub/sub/service behaviors are achieved. More behavior needs to be 410 # transferred from the transport class into the protocol class, 411 # e.g. deserialization as the state each maintains is somewhat 412 # duplicative. I would also come up with a better name than 413 # protocol. 414
415 -class TCPROSTransport(Transport):
416 """ 417 Generic implementation of TCPROS exchange routines for both topics and services 418 """ 419 transport_type = 'TCPROS' 420
421 - def __init__(self, protocol, name, header=None):
422 """ 423 ctor 424 @param name str: identifier 425 @param protocol TCPROSTransportProtocol protocol implementation 426 @param header dict: (optional) handshake header if transport handshake header was 427 already read off of transport. 428 @raise TransportInitError if transport cannot be initialized according to arguments 429 """ 430 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 431 if not name: 432 raise TransportInitError("Unable to initialize transport: name is not set") 433 434 self.protocol = protocol 435 436 self.socket = None 437 self.endpoint_id = 'unknown' 438 self.dest_address = None # for reconnection 439 440 if python3 == 0: # Python 2.x 441 self.read_buff = StringIO() 442 self.write_buff = StringIO() 443 else: # Python 3.x 444 self.read_buff = BytesIO() 445 self.write_buff = BytesIO() 446 447 #self.write_buff = StringIO() 448 self.header = header 449 450 # #1852 have to hold onto latched messages on subscriber side 451 self.is_latched = False 452 self.latch = None 453 454 # save the fileno separately so we can garbage collect the 455 # socket but still unregister will poll objects 456 self._fileno = None 457 458 # these fields are actually set by the remote 459 # publisher/service. they are set for tools that connect 460 # without knowing the actual field name 461 self.md5sum = None 462 self.type = None
463
464 - def fileno(self):
465 """ 466 Get descriptor for select 467 """ 468 return self._fileno
469
470 - def set_endpoint_id(self, endpoint_id):
471 """ 472 Set the endpoint_id of this transport. 473 Allows the endpoint_id to be set before the socket is initialized. 474 """ 475 self.endpoint_id = endpoint_id
476
477 - def set_socket(self, sock, endpoint_id):
478 """ 479 Set the socket for this transport 480 @param sock: socket 481 @type sock: socket.socket 482 @param endpoint_id: identifier for connection endpoint 483 @type endpoint_id: str 484 @raise TransportInitError: if socket has already been set 485 """ 486 if self.socket is not None: 487 raise TransportInitError("socket already initialized") 488 self.socket = sock 489 self.endpoint_id = endpoint_id 490 self._fileno = sock.fileno()
491
492 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
493 """ 494 Establish TCP connection to the specified 495 address/port. connect() always calls L{write_header()} and 496 L{read_header()} after the connection is made 497 @param dest_addr: destination IP address 498 @type dest_addr: str 499 @param dest_port: destination port 500 @type dest_port: int 501 @param endpoint_id: string identifier for connection (for statistics) 502 @type endpoint_id: str 503 @param timeout: (optional keyword) timeout in seconds 504 @type timeout: float 505 @raise TransportInitError: if unable to create connection 506 """ 507 try: 508 self.endpoint_id = endpoint_id 509 self.dest_address = (dest_addr, dest_port) 510 if rosgraph.network.use_ipv6(): 511 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 512 else: 513 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 514 if _is_use_tcp_keepalive(): 515 # OSX (among others) does not define these options 516 if hasattr(socket, 'TCP_KEEPCNT') and \ 517 hasattr(socket, 'TCP_KEEPIDLE') and \ 518 hasattr(socket, 'TCP_KEEPINTVL'): 519 # turn on KEEPALIVE 520 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 521 # - # keepalive failures before actual connection failure 522 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 523 # - timeout before starting KEEPALIVE process 524 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 525 # - interval to send KEEPALIVE after IDLE timeout 526 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 527 if timeout is not None: 528 s.settimeout(timeout) 529 self.socket = s 530 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port)) 531 self.socket.connect((dest_addr, dest_port)) 532 self.write_header() 533 self.read_header() 534 except TransportInitError as tie: 535 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 536 raise 537 except Exception as e: 538 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 539 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 540 541 # FATAL: no reconnection as error is unknown 542 self.close() 543 raise TransportInitError(str(e)) #re-raise i/o error
544
545 - def _validate_header(self, header):
546 """ 547 Validate header and initialize fields accordingly 548 @param header: header fields from publisher 549 @type header: dict 550 @raise TransportInitError: if header fails to validate 551 """ 552 self.header = header 553 if 'error' in header: 554 raise TransportInitError("remote error reported: %s"%header['error']) 555 for required in ['md5sum', 'type']: 556 if not required in header: 557 raise TransportInitError("header missing required field [%s]"%required) 558 self.md5sum = header['md5sum'] 559 self.type = header['type'] 560 if header.get('latching', '0') == '1': 561 self.is_latched = True
562
563 - def write_header(self):
564 """Writes the TCPROS header to the active connection.""" 565 # socket may still be getting spun up, so wait for it to be writable 566 sock = self.socket 567 protocol = self.protocol 568 # race condition on close, better fix is to pass these in, 569 # functional style, but right now trying to cause minimal 570 # perturbance to codebase. 571 if sock is None or protocol is None: 572 return 573 fileno = sock.fileno() 574 ready = None 575 while not ready: 576 _, ready, _ = select.select([], [fileno], []) 577 logger.debug("[%s]: writing header", self.name) 578 sock.setblocking(1) 579 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
580
581 - def read_header(self):
582 """ 583 Read TCPROS header from active socket 584 @raise TransportInitError if header fails to validate 585 """ 586 sock = self.socket 587 if sock is None: 588 return 589 sock.setblocking(1) 590 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
591
592 - def send_message(self, msg, seq):
593 """ 594 Convenience routine for services to send a message across a 595 particular connection. NOTE: write_data is much more efficient 596 if same message is being sent to multiple connections. Not 597 threadsafe. 598 @param msg: message to send 599 @type msg: Msg 600 @param seq: sequence number for message 601 @type seq: int 602 @raise TransportException: if error occurred sending message 603 """ 604 # this will call write_data(), so no need to keep track of stats 605 serialize_message(self.write_buff, seq, msg) 606 self.write_data(self.write_buff.getvalue()) 607 self.write_buff.truncate(0)
608
609 - def write_data(self, data):
610 """ 611 Write raw data to transport 612 @raise TransportInitialiationError: could not be initialized 613 @raise TransportTerminated: no longer open for publishing 614 """ 615 if not self.socket: 616 raise TransportInitError("TCPROS transport was not successfully initialized") 617 if self.done: 618 raise TransportTerminated("connection closed") 619 try: 620 #TODO: get rid of sendalls and replace with async-style publishing 621 self.socket.sendall(data) 622 self.stat_bytes += len(data) 623 self.stat_num_msg += 1 624 except IOError as ioe: 625 #for now, just document common errno's in code 626 (errno, msg) = ioe.args 627 if errno == 32: #broken pipe 628 logdebug("ERROR: Broken Pipe") 629 self.close() 630 raise TransportTerminated(str(errno)+msg) 631 raise #re-raise 632 except socket.error as se: 633 #for now, just document common errno's in code 634 (errno, msg) = se.args 635 if errno == 32: #broken pipe 636 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 637 self.close() 638 raise TransportTerminated(msg) 639 elif errno == 104: #connection reset by peer 640 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 641 self.close() 642 raise TransportTerminated(msg) 643 else: 644 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 645 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 646 self.close() 647 raise TransportTerminated(str(errno)+' '+msg) 648 return True
649
650 - def receive_once(self):
651 """ 652 block until messages are read off of socket 653 @return: list of newly received messages 654 @rtype: [Msg] 655 @raise TransportException: if unable to receive message due to error 656 """ 657 sock = self.socket 658 if sock is None: 659 raise TransportException("connection not initialized") 660 b = self.read_buff 661 msg_queue = [] 662 p = self.protocol 663 try: 664 sock.setblocking(1) 665 while not msg_queue and not self.done and not is_shutdown(): 666 if b.tell() >= 4: 667 p.read_messages(b, msg_queue, sock) 668 if not msg_queue: 669 recv_buff(sock, b, p.buff_size) 670 self.stat_num_msg += len(msg_queue) #STATS 671 # set the _connection_header field 672 for m in msg_queue: 673 m._connection_header = self.header 674 675 # #1852: keep track of last latched message 676 if self.is_latched and msg_queue: 677 self.latch = msg_queue[-1] 678 679 return msg_queue 680 681 except DeserializationError as e: 682 rospyerr(traceback.format_exc()) 683 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 684 except TransportTerminated as e: 685 raise #reraise 686 except ServiceException as e: 687 raise 688 except Exception as e: 689 rospyerr(traceback.format_exc()) 690 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 691 return retval
692
693 - def _reconnect(self):
694 # This reconnection logic is very hacky right now. I need to 695 # rewrite the I/O core so that this is handled more centrally. 696 697 if self.dest_address is None: 698 raise ROSInitException("internal error with reconnection state: address not stored") 699 700 interval = 0.5 # seconds 701 while self.socket is None and not self.done and not rospy.is_shutdown(): 702 try: 703 # set a timeout so that we can continue polling for 704 # exit. 30. is a bit high, but I'm concerned about 705 # embedded platforms. To do this properly, we'd have 706 # to move to non-blocking routines. 707 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 708 except TransportInitError: 709 self.socket = None 710 711 if self.socket is None: 712 # exponential backoff 713 interval = interval * 2 714 715 time.sleep(interval)
716
717 - def receive_loop(self, msgs_callback):
718 """ 719 Receive messages until shutdown 720 @param msgs_callback: callback to invoke for new messages received 721 @type msgs_callback: fn([msg]) 722 """ 723 # - use assert here as this would be an internal error, aka bug 724 logger.debug("receive_loop for [%s]", self.name) 725 try: 726 while not self.done and not is_shutdown(): 727 try: 728 if self.socket is not None: 729 msgs = self.receive_once() 730 if not self.done and not is_shutdown(): 731 msgs_callback(msgs) 732 else: 733 self._reconnect() 734 735 except TransportException as e: 736 # set socket to None so we reconnect 737 try: 738 if self.socket is not None: 739 try: 740 self.socket.shutdown() 741 except: 742 pass 743 finally: 744 self.socket.close() 745 except: 746 pass 747 self.socket = None 748 749 except DeserializationError as e: 750 #TODO: how should we handle reconnect in this case? 751 752 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 753 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 754 except: 755 # in many cases this will be a normal hangup, but log internally 756 try: 757 #1467 sometimes we get exceptions due to 758 #interpreter shutdown, so blanket ignore those if 759 #the reporting fails 760 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 761 except: pass 762 763 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 764 finally: 765 if not self.done: 766 self.close()
767
768 - def close(self):
769 """close i/o and release resources""" 770 if not self.done: 771 try: 772 if self.socket is not None: 773 try: 774 self.socket.shutdown(socket.SHUT_RDWR) 775 except: 776 pass 777 finally: 778 self.socket.close() 779 finally: 780 self.socket = self.read_buff = self.write_buff = self.protocol = None 781 super(TCPROSTransport, self).close()
782