Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO #Python 2.x 
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO #Python 3.x 
 43      python3 = 1 
 44  import socket 
 45  import logging 
 46   
 47  import threading 
 48  import time 
 49  import traceback 
 50  import select 
 51   
 52  import rosgraph 
 53  import rosgraph.network 
 54  from genpy import DeserializationError, Message 
 55  from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header 
 56   
 57  # TODO: remove * import from core 
 58  from rospy.core import * 
 59  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 60  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 61  from rospy.msg import deserialize_messages, serialize_message 
 62  from rospy.service import ServiceException 
 63   
 64  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 65   
 66  logger = logging.getLogger('rospy.tcpros') 
 67   
 68  # Receive buffer size for topics/services (in bytes) 
 69  DEFAULT_BUFF_SIZE = 65536 
 70   
 71  ## name of our customized TCP protocol for accepting flows over server socket 
 72  TCPROS = "TCPROS"  
 73   
 74  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 75  _use_tcp_keepalive = None 
 76  _use_tcp_keepalive_lock = threading.Lock() 
77 -def _is_use_tcp_keepalive():
78 global _use_tcp_keepalive 79 if _use_tcp_keepalive is not None: 80 return _use_tcp_keepalive 81 with _use_tcp_keepalive_lock: 82 if _use_tcp_keepalive is not None: 83 return _use_tcp_keepalive 84 # in order to prevent circular dependencies, this does not use the 85 # builtin libraries for interacting with the parameter server 86 m = rospy.core.xmlrpcapi(rosgraph.get_master_uri()) 87 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 88 _use_tcp_keepalive = val if code == 1 else True 89 return _use_tcp_keepalive
90
91 -def recv_buff(sock, b, buff_size):
92 """ 93 Read data from socket into buffer. 94 @param sock: socket to read from 95 @type sock: socket.socket 96 @param b: buffer to receive into 97 @type b: StringIO 98 @param buff_size: recv read size 99 @type buff_size: int 100 @return: number of bytes read 101 @rtype: int 102 """ 103 d = sock.recv(buff_size) 104 if d: 105 b.write(d) 106 return len(d) 107 else: #bomb out 108 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
109
110 -class TCPServer(object):
111 """ 112 Simple server that accepts inbound TCP/IP connections and hands 113 them off to a handler function. TCPServer obeys the 114 ROS_IP/ROS_HOSTNAME environment variables 115 """ 116
117 - def __init__(self, inbound_handler, port=0):
118 """ 119 Setup a server socket listening on the specified port. If the 120 port is omitted, will choose any open port. 121 @param inbound_handler: handler to invoke with 122 new connection 123 @type inbound_handler: fn(sock, addr) 124 @param port: port to bind to, omit/0 to bind to any 125 @type port: int 126 """ 127 self.port = port #will get overwritten if port=0 128 self.addr = None #set at socket bind 129 self.is_shutdown = False 130 self.inbound_handler = inbound_handler 131 try: 132 self.server_sock = self._create_server_sock() 133 except: 134 self.server_sock = None 135 raise
136
137 - def start(self):
138 """Runs the run() loop in a separate thread""" 139 t = threading.Thread(target=self.run, args=()) 140 t.setDaemon(True) 141 t.start()
142
143 - def run(self):
144 """ 145 Main TCP receive loop. Should be run in a separate thread -- use start() 146 to do this automatically. 147 """ 148 self.is_shutdown = False 149 if not self.server_sock: 150 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 151 while not self.is_shutdown: 152 try: 153 (client_sock, client_addr) = self.server_sock.accept() 154 except socket.timeout: 155 continue 156 except IOError as e: 157 (errno, msg) = e.args 158 if errno == 4: #interrupted system call 159 continue 160 raise 161 if self.is_shutdown: 162 break 163 try: 164 #leave threading decisions up to inbound_handler 165 self.inbound_handler(client_sock, client_addr) 166 except socket.error as e: 167 if not self.is_shutdown: 168 traceback.print_exc() 169 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 170 logdebug("TCPServer[%s] shutting down", self.port)
171 172
173 - def get_full_addr(self):
174 """ 175 @return: (ip address, port) of server socket binding 176 @rtype: (str, int) 177 """ 178 # return rosgraph.network.get_host_name() instead of address so that it 179 # obeys ROS_IP/ROS_HOSTNAME behavior 180 return (rosgraph.network.get_host_name(), self.port)
181
182 - def _create_server_sock(self):
183 """ 184 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 185 binding to loopback interface. 186 """ 187 if rosgraph.network.use_ipv6(): 188 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 189 else: 190 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 191 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 192 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port)) 193 server_sock.bind((rosgraph.network.get_bind_address(), self.port)) 194 (self.addr, self.port) = server_sock.getsockname()[0:2] 195 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port)) 196 server_sock.listen(5) 197 return server_sock
198
199 - def shutdown(self):
200 """shutdown I/O resources uses by this server""" 201 if not self.is_shutdown: 202 self.is_shutdown = True 203 self.server_sock.close()
204 205 # base maintains a tcpros_server singleton that is shared between 206 # services and topics for inbound connections. This global is set in 207 # the tcprosserver constructor. Constructor is called by init_tcpros() 208 _tcpros_server = None 209
210 -def init_tcpros_server(port=0):
211 """ 212 starts the TCPROS server socket for inbound connections 213 @param port: listen on the provided port. If the port number is 0, the port will 214 be chosen randomly 215 @type port: int 216 """ 217 global _tcpros_server 218 if _tcpros_server is None: 219 _tcpros_server = TCPROSServer(port=port) 220 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 221 return _tcpros_server
222
223 -def start_tcpros_server():
224 """ 225 start the TCPROS server if it has not started already 226 """ 227 if _tcpros_server is None: 228 init_tcpros_server() 229 return _tcpros_server.start_server()
230 231 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 232
233 -def get_tcpros_server_address():
234 """ 235 get the address of the tcpros server. 236 @raise Exception: if tcpros server has not been started or created 237 """ 238 return _tcpros_server.get_address()
239
240 -def _error_connection_handler(sock, client_addr, header):
241 """ 242 utility handler that does nothing more than provide a rejection header 243 @param sock: socket connection 244 @type sock: socket.socket 245 @param client_addr: client address 246 @type client_addr: str 247 @param header: request header 248 @type header: dict 249 """ 250 return {'error': "unhandled connection"}
251
252 -class TCPROSServer(object):
253 """ 254 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 255 connections as well as ROS service connections over TCP. TCP server 256 socket is run once start_server() is called -- this is implicitly 257 called during init_publisher(). 258 """ 259
260 - def __init__(self, port=0):
261 """ 262 Constructur 263 @param port: port number to bind to (default 0/any) 264 @type port: int 265 """ 266 self.port = port 267 self.tcp_ros_server = None #: server for receiving tcp conn 268 self.lock = threading.Lock() 269 # should be set to fn(sock, client_addr, header) for topic connections 270 self.topic_connection_handler = _error_connection_handler 271 # should be set to fn(sock, client_addr, header) for service connections 272 self.service_connection_handler = _error_connection_handler
273
274 - def start_server(self):
275 """ 276 Starts the TCP socket server if one is not already running 277 """ 278 if self.tcp_ros_server: 279 return 280 with self.lock: 281 try: 282 if not self.tcp_ros_server: 283 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port) 284 self.tcp_ros_server.start() 285 except Exception as e: 286 self.tcp_ros_server = None 287 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 288 return 0, "unable to establish TCPROS server: %s"%e, []
289
290 - def get_address(self):
291 """ 292 @return: address and port of TCP server socket for accepting 293 inbound connections 294 @rtype: str, int 295 """ 296 if self.tcp_ros_server is not None: 297 return self.tcp_ros_server.get_full_addr() 298 return None, None
299
300 - def shutdown(self, reason=''):
301 """stops the TCP/IP server responsible for receiving inbound connections""" 302 if self.tcp_ros_server: 303 self.tcp_ros_server.shutdown()
304
305 - def _tcp_server_callback(self, sock, client_addr):
306 """ 307 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 308 309 @param sock: socket connection 310 @type sock: socket.socket 311 @param client_addr: client address 312 @type client_addr: (str, int) 313 @raise TransportInitError: If transport cannot be succesfully initialized 314 """ 315 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 316 #and then use that to do the writing 317 try: 318 buff_size = 4096 # size of read buffer 319 if python3 == 0: 320 #initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray()) 321 header = read_ros_handshake_header(sock, StringIO(), buff_size) 322 else: 323 header = read_ros_handshake_header(sock, BytesIO(), buff_size) 324 325 if 'topic' in header: 326 err_msg = self.topic_connection_handler(sock, client_addr, header) 327 elif 'service' in header: 328 err_msg = self.service_connection_handler(sock, client_addr, header) 329 else: 330 err_msg = 'no topic or service name detected' 331 if err_msg: 332 # shutdown race condition: nodes that come up and down 333 # quickly can receive connections during teardown. 334 335 # We use is_shutdown_requested() because we can get 336 # into bad connection states during client shutdown 337 # hooks. 338 if not rospy.core.is_shutdown_requested(): 339 write_ros_handshake_header(sock, {'error' : err_msg}) 340 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 341 else: 342 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 343 return 344 except rospy.exceptions.TransportInitError as e: 345 logwarn(str(e)) 346 if sock is not None: 347 sock.close() 348 except Exception as e: 349 # collect stack trace separately in local log file 350 if not rospy.core.is_shutdown_requested(): 351 logwarn("Inbound TCP/IP connection failed: %s", e) 352 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc()) 353 if sock is not None: 354 sock.close()
355
356 -class TCPROSTransportProtocol(object):
357 """ 358 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 359 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 360 well as which fields to send when creating a new connection (get_header_fields()). 361 """ 362
363 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
364 """ 365 ctor 366 @param resolved_name: resolved service or topic name 367 @type resolved_name: str 368 @param recv_data_class: message class for deserializing inbound messages 369 @type recv_data_class: Class 370 @param queue_size: maximum number of inbound messages to maintain 371 @type queue_size: int 372 @param buff_size: receive buffer size (in bytes) for reading from the connection. 373 @type buff_size: int 374 """ 375 if recv_data_class and not issubclass(recv_data_class, Message): 376 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 377 self.resolved_name = resolved_name 378 self.recv_data_class = recv_data_class 379 self.queue_size = queue_size 380 self.buff_size = buff_size 381 self.direction = BIDIRECTIONAL
382 383
384 - def read_messages(self, b, msg_queue, sock):
385 """ 386 @param b StringIO: read buffer 387 @param msg_queue [Message]: queue of deserialized messages 388 @type msg_queue: [Message] 389 @param sock socket: protocol can optionally read more data from 390 the socket, but in most cases the required data will already be 391 in b 392 """ 393 # default implementation 394 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
395
396 - def get_header_fields(self):
397 """ 398 Header fields that should be sent over the connection. The header fields 399 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 400 @return: {str : str}: header fields to send over connection 401 @rtype: dict 402 """ 403 return {}
404 405 # TODO: this still isn't as clean and seamless as I want it to 406 # be. This code came from the merger of publisher, subscriber, and 407 # service code into a common TCPROS transport class. The transport is 408 # customized by a 'protocol' class, which is how the different 409 # pub/sub/service behaviors are achieved. More behavior needs to be 410 # transferred from the transport class into the protocol class, 411 # e.g. deserialization as the state each maintains is somewhat 412 # duplicative. I would also come up with a better name than 413 # protocol. 414
415 -class TCPROSTransport(Transport):
416 """ 417 Generic implementation of TCPROS exchange routines for both topics and services 418 """ 419 transport_type = 'TCPROS' 420
421 - def __init__(self, protocol, name, header=None):
422 """ 423 ctor 424 @param name str: identifier 425 @param protocol TCPROSTransportProtocol protocol implementation 426 @param header dict: (optional) handshake header if transport handshake header was 427 already read off of transport. 428 @raise TransportInitError if transport cannot be initialized according to arguments 429 """ 430 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 431 if not name: 432 raise TransportInitError("Unable to initialize transport: name is not set") 433 434 self.protocol = protocol 435 436 self.socket = None 437 self.endpoint_id = 'unknown' 438 self.callerid_pub = 'unknown' 439 self.dest_address = None # for reconnection 440 441 if python3 == 0: # Python 2.x 442 self.read_buff = StringIO() 443 self.write_buff = StringIO() 444 else: # Python 3.x 445 self.read_buff = BytesIO() 446 self.write_buff = BytesIO() 447 448 #self.write_buff = StringIO() 449 self.header = header 450 451 # #1852 have to hold onto latched messages on subscriber side 452 self.is_latched = False 453 self.latch = None 454 455 # save the fileno separately so we can garbage collect the 456 # socket but still unregister will poll objects 457 self._fileno = None 458 459 # these fields are actually set by the remote 460 # publisher/service. they are set for tools that connect 461 # without knowing the actual field name 462 self.md5sum = None 463 self.type = None 464 465 # Endpoint Details (IP, Port) 466 self.local_endpoint = (None, None) 467 self.remote_endpoint = (None, None)
468
469 - def get_transport_info(self):
470 """ 471 Get detailed connection information. 472 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp' 473 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6] 474 """ 475 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
476
477 - def fileno(self):
478 """ 479 Get descriptor for select 480 """ 481 return self._fileno
482
483 - def set_endpoint_id(self, endpoint_id):
484 """ 485 Set the endpoint_id of this transport. 486 Allows the endpoint_id to be set before the socket is initialized. 487 """ 488 self.endpoint_id = endpoint_id
489
490 - def set_socket(self, sock, endpoint_id):
491 """ 492 Set the socket for this transport 493 @param sock: socket 494 @type sock: socket.socket 495 @param endpoint_id: identifier for connection endpoint 496 @type endpoint_id: str 497 @raise TransportInitError: if socket has already been set 498 """ 499 if self.socket is not None: 500 raise TransportInitError("socket already initialized") 501 self.socket = sock 502 self.endpoint_id = endpoint_id 503 self._fileno = sock.fileno() 504 self.local_endpoint = self.socket.getsockname()
505
506 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
507 """ 508 Establish TCP connection to the specified 509 address/port. connect() always calls L{write_header()} and 510 L{read_header()} after the connection is made 511 @param dest_addr: destination IP address 512 @type dest_addr: str 513 @param dest_port: destination port 514 @type dest_port: int 515 @param endpoint_id: string identifier for connection (for statistics) 516 @type endpoint_id: str 517 @param timeout: (optional keyword) timeout in seconds 518 @type timeout: float 519 @raise TransportInitError: if unable to create connection 520 """ 521 # first make sure that if ROS_HOSTNAME=localhost, we will not attempt 522 # to connect to anything other than localhost 523 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"): 524 if not rosgraph.network.is_local_address(dest_addr): 525 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr) 526 logwarn(msg) 527 self.close() 528 raise TransportInitError(msg) # bubble up 529 530 # now we can proceed with trying to connect. 531 try: 532 self.endpoint_id = endpoint_id 533 self.dest_address = (dest_addr, dest_port) 534 if rosgraph.network.use_ipv6(): 535 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 536 else: 537 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 538 if _is_use_tcp_keepalive(): 539 # OSX (among others) does not define these options 540 if hasattr(socket, 'TCP_KEEPCNT') and \ 541 hasattr(socket, 'TCP_KEEPIDLE') and \ 542 hasattr(socket, 'TCP_KEEPINTVL'): 543 # turn on KEEPALIVE 544 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 545 # - # keepalive failures before actual connection failure 546 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 547 # - timeout before starting KEEPALIVE process 548 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 549 # - interval to send KEEPALIVE after IDLE timeout 550 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 551 if timeout is not None: 552 s.settimeout(timeout) 553 self.socket = s 554 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port)) 555 self.socket.connect((dest_addr, dest_port)) 556 self.write_header() 557 self.read_header() 558 self.local_endpoint = self.socket.getsockname() 559 self.remote_endpoint = (dest_addr, dest_port) 560 except TransportInitError as tie: 561 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 562 raise 563 except Exception as e: 564 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 565 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 566 567 # FATAL: no reconnection as error is unknown 568 self.close() 569 raise TransportInitError(str(e)) #re-raise i/o error
570
571 - def _validate_header(self, header):
572 """ 573 Validate header and initialize fields accordingly 574 @param header: header fields from publisher 575 @type header: dict 576 @raise TransportInitError: if header fails to validate 577 """ 578 self.header = header 579 if 'error' in header: 580 raise TransportInitError("remote error reported: %s"%header['error']) 581 for required in ['md5sum', 'type']: 582 if not required in header: 583 raise TransportInitError("header missing required field [%s]"%required) 584 self.type = header['type'] 585 self.md5sum = header['md5sum'] 586 if 'callerid' in header: 587 self.callerid_pub = header['callerid'] 588 if header.get('latching', '0') == '1': 589 self.is_latched = True
590
591 - def write_header(self):
592 """Writes the TCPROS header to the active connection.""" 593 # socket may still be getting spun up, so wait for it to be writable 594 sock = self.socket 595 protocol = self.protocol 596 # race condition on close, better fix is to pass these in, 597 # functional style, but right now trying to cause minimal 598 # perturbance to codebase. 599 if sock is None or protocol is None: 600 return 601 fileno = sock.fileno() 602 ready = None 603 while not ready: 604 _, ready, _ = select.select([], [fileno], []) 605 logger.debug("[%s]: writing header", self.name) 606 sock.setblocking(1) 607 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields())
608
609 - def read_header(self):
610 """ 611 Read TCPROS header from active socket 612 @raise TransportInitError if header fails to validate 613 """ 614 sock = self.socket 615 if sock is None: 616 return 617 sock.setblocking(1) 618 # TODO: add bytes received to self.stat_bytes 619 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
620
621 - def send_message(self, msg, seq):
622 """ 623 Convenience routine for services to send a message across a 624 particular connection. NOTE: write_data is much more efficient 625 if same message is being sent to multiple connections. Not 626 threadsafe. 627 @param msg: message to send 628 @type msg: Msg 629 @param seq: sequence number for message 630 @type seq: int 631 @raise TransportException: if error occurred sending message 632 """ 633 # this will call write_data(), so no need to keep track of stats 634 serialize_message(self.write_buff, seq, msg) 635 self.write_data(self.write_buff.getvalue()) 636 self.write_buff.truncate(0)
637
638 - def write_data(self, data):
639 """ 640 Write raw data to transport 641 @raise TransportInitialiationError: could not be initialized 642 @raise TransportTerminated: no longer open for publishing 643 """ 644 if not self.socket: 645 raise TransportInitError("TCPROS transport was not successfully initialized") 646 if self.done: 647 raise TransportTerminated("connection closed") 648 try: 649 #TODO: get rid of sendalls and replace with async-style publishing 650 self.socket.sendall(data) 651 self.stat_bytes += len(data) 652 self.stat_num_msg += 1 653 except IOError as ioe: 654 #for now, just document common errno's in code 655 (errno, msg) = ioe.args 656 if errno == 32: #broken pipe 657 logdebug("ERROR: Broken Pipe") 658 self.close() 659 raise TransportTerminated(str(errno)+msg) 660 raise #re-raise 661 except socket.error as se: 662 #for now, just document common errno's in code 663 (errno, msg) = se.args 664 if errno == 32: #broken pipe 665 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 666 self.close() 667 raise TransportTerminated(msg) 668 elif errno == 104: #connection reset by peer 669 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 670 self.close() 671 raise TransportTerminated(msg) 672 else: 673 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 674 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 675 self.close() 676 raise TransportTerminated(str(errno)+' '+msg) 677 return True
678
679 - def receive_once(self):
680 """ 681 block until messages are read off of socket 682 @return: list of newly received messages 683 @rtype: [Msg] 684 @raise TransportException: if unable to receive message due to error 685 """ 686 sock = self.socket 687 if sock is None: 688 raise TransportException("connection not initialized") 689 b = self.read_buff 690 msg_queue = [] 691 p = self.protocol 692 try: 693 sock.setblocking(1) 694 while not msg_queue and not self.done and not is_shutdown(): 695 if b.tell() >= 4: 696 p.read_messages(b, msg_queue, sock) 697 if not msg_queue: 698 self.stat_bytes += recv_buff(sock, b, p.buff_size) 699 self.stat_num_msg += len(msg_queue) #STATS 700 # set the _connection_header field 701 for m in msg_queue: 702 m._connection_header = self.header 703 704 # #1852: keep track of last latched message 705 if self.is_latched and msg_queue: 706 self.latch = msg_queue[-1] 707 708 return msg_queue 709 710 except DeserializationError as e: 711 rospyerr(traceback.format_exc()) 712 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 713 except TransportTerminated as e: 714 raise #reraise 715 except ServiceException as e: 716 raise 717 except Exception as e: 718 rospyerr(traceback.format_exc()) 719 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 720 return retval
721
722 - def _reconnect(self):
723 # This reconnection logic is very hacky right now. I need to 724 # rewrite the I/O core so that this is handled more centrally. 725 726 if self.dest_address is None: 727 raise ROSInitException("internal error with reconnection state: address not stored") 728 729 interval = 0.5 # seconds 730 while self.socket is None and not self.done and not rospy.is_shutdown(): 731 try: 732 # set a timeout so that we can continue polling for 733 # exit. 30. is a bit high, but I'm concerned about 734 # embedded platforms. To do this properly, we'd have 735 # to move to non-blocking routines. 736 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 737 except TransportInitError: 738 self.socket = None 739 740 if self.socket is None: 741 # exponential backoff 742 interval = interval * 2 743 744 time.sleep(interval)
745
746 - def receive_loop(self, msgs_callback):
747 """ 748 Receive messages until shutdown 749 @param msgs_callback: callback to invoke for new messages received 750 @type msgs_callback: fn([msg]) 751 """ 752 # - use assert here as this would be an internal error, aka bug 753 logger.debug("receive_loop for [%s]", self.name) 754 try: 755 while not self.done and not is_shutdown(): 756 try: 757 if self.socket is not None: 758 msgs = self.receive_once() 759 if not self.done and not is_shutdown(): 760 msgs_callback(msgs, self) 761 else: 762 self._reconnect() 763 764 except TransportException as e: 765 # set socket to None so we reconnect 766 try: 767 if self.socket is not None: 768 try: 769 self.socket.shutdown() 770 except: 771 pass 772 finally: 773 self.socket.close() 774 except: 775 pass 776 self.socket = None 777 778 except DeserializationError as e: 779 #TODO: how should we handle reconnect in this case? 780 781 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 782 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 783 except: 784 # in many cases this will be a normal hangup, but log internally 785 try: 786 #1467 sometimes we get exceptions due to 787 #interpreter shutdown, so blanket ignore those if 788 #the reporting fails 789 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 790 except: pass 791 792 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 793 finally: 794 if not self.done: 795 self.close()
796
797 - def close(self):
798 """close i/o and release resources""" 799 if not self.done: 800 try: 801 if self.socket is not None: 802 try: 803 self.socket.shutdown(socket.SHUT_RDWR) 804 except: 805 pass 806 finally: 807 self.socket.close() 808 finally: 809 self.socket = self.read_buff = self.write_buff = self.protocol = None 810 super(TCPROSTransport, self).close()
811