Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO #Python 2.x 
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO #Python 3.x 
 43      python3 = 1 
 44  import errno 
 45  import socket 
 46  import logging 
 47   
 48  import threading 
 49  import time 
 50  import traceback 
 51  import select 
 52   
 53  import rosgraph 
 54  import rosgraph.network 
 55  from genpy import DeserializationError, Message 
 56  from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header 
 57   
 58  # TODO: remove * import from core 
 59  from rospy.core import * 
 60  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 61  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 62  from rospy.msg import deserialize_messages, serialize_message 
 63  from rospy.service import ServiceException 
 64   
 65  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 66  from errno import EAGAIN, EWOULDBLOCK 
 67   
 68  logger = logging.getLogger('rospy.tcpros') 
 69   
 70  # Receive buffer size for topics/services (in bytes) 
 71  DEFAULT_BUFF_SIZE = 65536 
 72   
 73  ## name of our customized TCP protocol for accepting flows over server socket 
 74  TCPROS = "TCPROS"  
 75   
 76  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 77  _use_tcp_keepalive = None 
 78  _use_tcp_keepalive_lock = threading.Lock() 
79 -def _is_use_tcp_keepalive():
80 global _use_tcp_keepalive 81 if _use_tcp_keepalive is not None: 82 return _use_tcp_keepalive 83 with _use_tcp_keepalive_lock: 84 if _use_tcp_keepalive is not None: 85 return _use_tcp_keepalive 86 # in order to prevent circular dependencies, this does not use the 87 # builtin libraries for interacting with the parameter server 88 m = rospy.core.xmlrpcapi(rosgraph.get_master_uri()) 89 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 90 _use_tcp_keepalive = val if code == 1 else True 91 return _use_tcp_keepalive
92
93 -def recv_buff(sock, b, buff_size):
94 """ 95 Read data from socket into buffer. 96 @param sock: socket to read from 97 @type sock: socket.socket 98 @param b: buffer to receive into 99 @type b: StringIO 100 @param buff_size: recv read size 101 @type buff_size: int 102 @return: number of bytes read 103 @rtype: int 104 """ 105 try: 106 d = sock.recv(buff_size) 107 if d: 108 b.write(d) 109 return len(d) 110 else: #bomb out 111 raise TransportTerminated("unable to receive data from sender, check sender's logs for details") 112 except socket.error as ex: 113 if ex.errno not in (EAGAIN, EWOULDBLOCK): 114 raise TransportTerminated("unable to receive data from sender, check sender's logs for details") 115 else: 116 return 0
117
118 -class TCPServer(object):
119 """ 120 Simple server that accepts inbound TCP/IP connections and hands 121 them off to a handler function. TCPServer obeys the 122 ROS_IP/ROS_HOSTNAME environment variables 123 """ 124
125 - def __init__(self, inbound_handler, port=0):
126 """ 127 Setup a server socket listening on the specified port. If the 128 port is omitted, will choose any open port. 129 @param inbound_handler: handler to invoke with 130 new connection 131 @type inbound_handler: fn(sock, addr) 132 @param port: port to bind to, omit/0 to bind to any 133 @type port: int 134 """ 135 self.port = port #will get overwritten if port=0 136 self.addr = None #set at socket bind 137 self.is_shutdown = False 138 self.inbound_handler = inbound_handler 139 try: 140 self.server_sock = self._create_server_sock() 141 except: 142 self.server_sock = None 143 raise
144
145 - def start(self):
146 """Runs the run() loop in a separate thread""" 147 t = threading.Thread(target=self.run, args=()) 148 t.daemon = True 149 t.start()
150
151 - def run(self):
152 """ 153 Main TCP receive loop. Should be run in a separate thread -- use start() 154 to do this automatically. 155 """ 156 self.is_shutdown = False 157 if not self.server_sock: 158 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 159 while not self.is_shutdown: 160 try: 161 (client_sock, client_addr) = self.server_sock.accept() 162 except socket.timeout: 163 continue 164 except ConnectionAbortedError: 165 continue 166 except IOError as e: 167 (e_errno, msg) = e.args 168 if e_errno == errno.EINTR: #interrupted system call 169 continue 170 if not self.is_shutdown: 171 raise 172 if self.is_shutdown: 173 break 174 try: 175 #leave threading decisions up to inbound_handler 176 self.inbound_handler(client_sock, client_addr) 177 except socket.error as e: 178 if not self.is_shutdown: 179 traceback.print_exc() 180 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 181 logdebug("TCPServer[%s] shutting down", self.port)
182 183
184 - def get_full_addr(self):
185 """ 186 @return: (ip address, port) of server socket binding 187 @rtype: (str, int) 188 """ 189 # return rosgraph.network.get_host_name() instead of address so that it 190 # obeys ROS_IP/ROS_HOSTNAME behavior 191 return (rosgraph.network.get_host_name(), self.port)
192
193 - def _create_server_sock(self):
194 """ 195 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 196 binding to loopback interface. 197 """ 198 if rosgraph.network.use_ipv6(): 199 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 200 else: 201 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 202 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 203 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port)) 204 server_sock.bind((rosgraph.network.get_bind_address(), self.port)) 205 (self.addr, self.port) = server_sock.getsockname()[0:2] 206 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port)) 207 server_sock.listen(5) 208 return server_sock
209
210 - def shutdown(self):
211 """shutdown I/O resources uses by this server""" 212 if not self.is_shutdown: 213 self.is_shutdown = True 214 self.server_sock.close()
215 216 # base maintains a tcpros_server singleton that is shared between 217 # services and topics for inbound connections. This global is set in 218 # the tcprosserver constructor. Constructor is called by init_tcpros() 219 _tcpros_server = None 220
221 -def init_tcpros_server(port=0):
222 """ 223 starts the TCPROS server socket for inbound connections 224 @param port: listen on the provided port. If the port number is 0, the port will 225 be chosen randomly 226 @type port: int 227 """ 228 global _tcpros_server 229 if _tcpros_server is None: 230 _tcpros_server = TCPROSServer(port=port) 231 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 232 return _tcpros_server
233
234 -def start_tcpros_server():
235 """ 236 start the TCPROS server if it has not started already 237 """ 238 if _tcpros_server is None: 239 init_tcpros_server() 240 return _tcpros_server.start_server()
241 242 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 243
244 -def get_tcpros_server_address():
245 """ 246 get the address of the tcpros server. 247 @raise Exception: if tcpros server has not been started or created 248 """ 249 return _tcpros_server.get_address()
250
251 -def _error_connection_handler(sock, client_addr, header):
252 """ 253 utility handler that does nothing more than provide a rejection header 254 @param sock: socket connection 255 @type sock: socket.socket 256 @param client_addr: client address 257 @type client_addr: str 258 @param header: request header 259 @type header: dict 260 """ 261 return {'error': "unhandled connection"}
262
263 -class TCPROSServer(object):
264 """ 265 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 266 connections as well as ROS service connections over TCP. TCP server 267 socket is run once start_server() is called -- this is implicitly 268 called during init_publisher(). 269 """ 270
271 - def __init__(self, port=0):
272 """ 273 Constructor 274 @param port: port number to bind to (default 0/any) 275 @type port: int 276 """ 277 self.port = port 278 self.tcp_ros_server = None #: server for receiving tcp conn 279 self.lock = threading.Lock() 280 # should be set to fn(sock, client_addr, header) for topic connections 281 self.topic_connection_handler = _error_connection_handler 282 # should be set to fn(sock, client_addr, header) for service connections 283 self.service_connection_handler = _error_connection_handler
284
285 - def start_server(self):
286 """ 287 Starts the TCP socket server if one is not already running 288 """ 289 if self.tcp_ros_server: 290 return 291 with self.lock: 292 try: 293 if not self.tcp_ros_server: 294 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port) 295 self.tcp_ros_server.start() 296 except Exception as e: 297 self.tcp_ros_server = None 298 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 299 return 0, "unable to establish TCPROS server: %s"%e, []
300
301 - def get_address(self):
302 """ 303 @return: address and port of TCP server socket for accepting 304 inbound connections 305 @rtype: str, int 306 """ 307 if self.tcp_ros_server is not None: 308 return self.tcp_ros_server.get_full_addr() 309 return None, None
310
311 - def shutdown(self, reason=''):
312 """stops the TCP/IP server responsible for receiving inbound connections""" 313 if self.tcp_ros_server: 314 self.tcp_ros_server.shutdown()
315
316 - def _tcp_server_callback(self, sock, client_addr):
317 """ 318 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 319 320 @param sock: socket connection 321 @type sock: socket.socket 322 @param client_addr: client address 323 @type client_addr: (str, int) 324 @raise TransportInitError: If transport cannot be successfully initialized 325 """ 326 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 327 #and then use that to do the writing 328 try: 329 buff_size = 4096 # size of read buffer 330 if python3 == 0: 331 #initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray()) 332 header = read_ros_handshake_header(sock, StringIO(), buff_size) 333 else: 334 header = read_ros_handshake_header(sock, BytesIO(), buff_size) 335 336 if 'topic' in header: 337 err_msg = self.topic_connection_handler(sock, client_addr, header) 338 elif 'service' in header: 339 err_msg = self.service_connection_handler(sock, client_addr, header) 340 else: 341 err_msg = 'no topic or service name detected' 342 if err_msg: 343 # shutdown race condition: nodes that come up and down 344 # quickly can receive connections during teardown. 345 346 # We use is_shutdown_requested() because we can get 347 # into bad connection states during client shutdown 348 # hooks. 349 if not rospy.core.is_shutdown_requested(): 350 write_ros_handshake_header(sock, {'error' : err_msg}) 351 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 352 else: 353 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 354 return 355 except rospy.exceptions.TransportInitError as e: 356 logwarn(str(e)) 357 if sock is not None: 358 sock.close() 359 except Exception as e: 360 # collect stack trace separately in local log file 361 if not rospy.core.is_shutdown_requested(): 362 logwarn("Inbound TCP/IP connection failed: %s", e) 363 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc()) 364 if sock is not None: 365 sock.close()
366
367 -class TCPROSTransportProtocol(object):
368 """ 369 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 370 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 371 well as which fields to send when creating a new connection (get_header_fields()). 372 """ 373
374 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
375 """ 376 ctor 377 @param resolved_name: resolved service or topic name 378 @type resolved_name: str 379 @param recv_data_class: message class for deserializing inbound messages 380 @type recv_data_class: Class 381 @param queue_size: maximum number of inbound messages to maintain 382 @type queue_size: int 383 @param buff_size: receive buffer size (in bytes) for reading from the connection. 384 @type buff_size: int 385 """ 386 if recv_data_class and not issubclass(recv_data_class, Message): 387 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 388 self.resolved_name = resolved_name 389 self.recv_data_class = recv_data_class 390 self.queue_size = queue_size 391 self.buff_size = buff_size 392 self.direction = BIDIRECTIONAL
393 394
395 - def read_messages(self, b, msg_queue, sock):
396 """ 397 @param b StringIO: read buffer 398 @param msg_queue [Message]: queue of deserialized messages 399 @type msg_queue: [Message] 400 @param sock socket: protocol can optionally read more data from 401 the socket, but in most cases the required data will already be 402 in b 403 """ 404 # default implementation 405 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
406
407 - def get_header_fields(self):
408 """ 409 Header fields that should be sent over the connection. The header fields 410 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 411 @return: {str : str}: header fields to send over connection 412 @rtype: dict 413 """ 414 return {}
415 416 # TODO: this still isn't as clean and seamless as I want it to 417 # be. This code came from the merger of publisher, subscriber, and 418 # service code into a common TCPROS transport class. The transport is 419 # customized by a 'protocol' class, which is how the different 420 # pub/sub/service behaviors are achieved. More behavior needs to be 421 # transferred from the transport class into the protocol class, 422 # e.g. deserialization as the state each maintains is somewhat 423 # duplicative. I would also come up with a better name than 424 # protocol. 425
426 -class TCPROSTransport(Transport):
427 """ 428 Generic implementation of TCPROS exchange routines for both topics and services 429 """ 430 transport_type = 'TCPROS' 431
432 - def __init__(self, protocol, name, header=None):
433 """ 434 ctor 435 @param name str: identifier 436 @param protocol TCPROSTransportProtocol protocol implementation 437 @param header dict: (optional) handshake header if transport handshake header was 438 already read off of transport. 439 @raise TransportInitError if transport cannot be initialized according to arguments 440 """ 441 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 442 if not name: 443 raise TransportInitError("Unable to initialize transport: name is not set") 444 445 self.protocol = protocol 446 447 self.socket = None 448 self.endpoint_id = 'unknown' 449 self.callerid_pub = 'unknown' 450 self.dest_address = None # for reconnection 451 452 if python3 == 0: # Python 2.x 453 self.read_buff = StringIO() 454 self.write_buff = StringIO() 455 else: # Python 3.x 456 self.read_buff = BytesIO() 457 self.write_buff = BytesIO() 458 459 #self.write_buff = StringIO() 460 self.header = header 461 462 # #1852 have to hold onto latched messages on subscriber side 463 self.is_latched = False 464 self.latch = None 465 466 # save the fileno separately so we can garbage collect the 467 # socket but still unregister will poll objects 468 self._fileno = None 469 470 # these fields are actually set by the remote 471 # publisher/service. they are set for tools that connect 472 # without knowing the actual field name 473 self.md5sum = None 474 self.type = None 475 476 # Endpoint Details (IP, Port) 477 self.local_endpoint = (None, None) 478 self.remote_endpoint = (None, None)
479
480 - def get_transport_info(self):
481 """ 482 Get detailed connection information. 483 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp' 484 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6] 485 """ 486 # Pattern matching this output in tools/rosnode/src/rosnode/__init__.py CONNECTION_PATTERN 487 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
488
489 - def fileno(self):
490 """ 491 Get descriptor for select 492 """ 493 return self._fileno
494
495 - def set_endpoint_id(self, endpoint_id):
496 """ 497 Set the endpoint_id of this transport. 498 Allows the endpoint_id to be set before the socket is initialized. 499 """ 500 self.endpoint_id = endpoint_id
501
502 - def set_socket(self, sock, endpoint_id):
503 """ 504 Set the socket for this transport 505 @param sock: socket 506 @type sock: socket.socket 507 @param endpoint_id: identifier for connection endpoint 508 @type endpoint_id: str 509 @raise TransportInitError: if socket has already been set 510 """ 511 if self.socket is not None: 512 raise TransportInitError("socket already initialized") 513 self.socket = sock 514 self.endpoint_id = endpoint_id 515 self._fileno = sock.fileno() 516 self.local_endpoint = self.socket.getsockname()
517
518 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
519 """ 520 Establish TCP connection to the specified 521 address/port. connect() always calls L{write_header()} and 522 L{read_header()} after the connection is made 523 @param dest_addr: destination IP address 524 @type dest_addr: str 525 @param dest_port: destination port 526 @type dest_port: int 527 @param endpoint_id: string identifier for connection (for statistics) 528 @type endpoint_id: str 529 @param timeout: (optional keyword) timeout in seconds 530 @type timeout: float 531 @raise TransportInitError: if unable to create connection 532 """ 533 # first make sure that if ROS_HOSTNAME=localhost, we will not attempt 534 # to connect to anything other than localhost 535 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"): 536 if not rosgraph.network.is_local_address(dest_addr): 537 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr) 538 logwarn(msg) 539 self.close() 540 raise TransportInitError(msg) # bubble up 541 542 # now we can proceed with trying to connect. 543 try: 544 self.endpoint_id = endpoint_id 545 self.dest_address = (dest_addr, dest_port) 546 if rosgraph.network.use_ipv6(): 547 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 548 else: 549 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 550 if _is_use_tcp_keepalive(): 551 # OSX (among others) does not define these options 552 if hasattr(socket, 'TCP_KEEPCNT') and \ 553 hasattr(socket, 'TCP_KEEPIDLE') and \ 554 hasattr(socket, 'TCP_KEEPINTVL'): 555 # turn on KEEPALIVE 556 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 557 # - # keepalive failures before actual connection failure 558 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 559 # - timeout before starting KEEPALIVE process 560 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 561 # - interval to send KEEPALIVE after IDLE timeout 562 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 563 if timeout is not None: 564 s.settimeout(timeout) 565 self.socket = s 566 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port)) 567 self.socket.connect((dest_addr, dest_port)) 568 self.write_header() 569 self.read_header() 570 self.local_endpoint = self.socket.getsockname() 571 self.remote_endpoint = (dest_addr, dest_port) 572 except TransportInitError as tie: 573 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 574 raise 575 except Exception as e: 576 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 577 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 578 # check for error type and reason. On unknown errors the socket will be closed 579 # to avoid reconnection and error reproduction 580 if not isinstance(e, socket.error): 581 # FATAL: no reconnection as error is unknown 582 self.close() 583 elif not isinstance(e, socket.timeout) and e.errno not in [ 584 errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET, 585 errno.ECONNABORTED, errno.ETIMEDOUT, errno.EHOSTDOWN, errno.EHOSTUNREACH]: 586 # reconnect in follow cases, otherwise close the socket: 587 # 1. socket.timeout: on timeouts caused by delays on wireless links 588 # 2. ENETDOWN (100), ENETUNREACH (101), ENETRESET (102), ECONNABORTED (103): 589 # while using ROS_HOSTNAME ros binds to a specific interface. Theses errors 590 # are thrown on interface shutdown e.g. on reconnection in LTE networks 591 # 3. ETIMEDOUT (110): same like 1. (for completeness) 592 # 4. EHOSTDOWN (112), EHOSTUNREACH (113): while network and/or DNS-server is not reachable 593 # 594 # no reconnection as error is not 1.-4. 595 self.close() 596 raise TransportInitError(str(e)) #re-raise i/o error
597
598 - def _validate_header(self, header):
599 """ 600 Validate header and initialize fields accordingly 601 @param header: header fields from publisher 602 @type header: dict 603 @raise TransportInitError: if header fails to validate 604 """ 605 self.header = header 606 if 'error' in header: 607 raise TransportInitError("remote error reported: %s"%header['error']) 608 for required in ['md5sum', 'type']: 609 if not required in header: 610 raise TransportInitError("header missing required field [%s]"%required) 611 self.type = header['type'] 612 self.md5sum = header['md5sum'] 613 if 'callerid' in header: 614 self.callerid_pub = header['callerid'] 615 if header.get('latching', '0') == '1': 616 self.is_latched = True
617
618 - def write_header(self):
619 """Writes the TCPROS header to the active connection.""" 620 # socket may still be getting spun up, so wait for it to be writable 621 sock = self.socket 622 protocol = self.protocol 623 # race condition on close, better fix is to pass these in, 624 # functional style, but right now trying to cause minimal 625 # perturbance to codebase. 626 if sock is None or protocol is None: 627 return 628 fileno = sock.fileno() 629 ready = None 630 poller = None 631 if hasattr(select, 'poll'): 632 poller = select.poll() 633 poller.register(fileno, select.POLLOUT) 634 while not ready: 635 events = poller.poll() 636 for _, flag in events: 637 if flag & select.POLLOUT: 638 ready = True 639 else: 640 while not ready: 641 try: 642 _, ready, _ = select.select([], [fileno], []) 643 except ValueError as e: 644 logger.error("[%s]: select fileno '%s': %s", self.name, str(fileno), str(e)) 645 raise 646 647 logger.debug("[%s]: writing header", self.name) 648 sock.setblocking(1) 649 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields()) 650 if poller: 651 poller.unregister(fileno)
652 653
654 - def read_header(self):
655 """ 656 Read TCPROS header from active socket 657 @raise TransportInitError if header fails to validate 658 """ 659 sock = self.socket 660 if sock is None: 661 return 662 sock.setblocking(1) 663 # TODO: add bytes received to self.stat_bytes 664 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
665
666 - def send_message(self, msg, seq):
667 """ 668 Convenience routine for services to send a message across a 669 particular connection. NOTE: write_data is much more efficient 670 if same message is being sent to multiple connections. Not 671 threadsafe. 672 @param msg: message to send 673 @type msg: Msg 674 @param seq: sequence number for message 675 @type seq: int 676 @raise TransportException: if error occurred sending message 677 """ 678 # this will call write_data(), so no need to keep track of stats 679 serialize_message(self.write_buff, seq, msg) 680 self.write_data(self.write_buff.getvalue()) 681 self.write_buff.truncate(0) 682 self.write_buff.seek(0)
683
684 - def write_data(self, data):
685 """ 686 Write raw data to transport 687 @raise TransportInitialiationError: could not be initialized 688 @raise TransportTerminated: no longer open for publishing 689 """ 690 if not self.socket: 691 raise TransportInitError("TCPROS transport was not successfully initialized") 692 if self.done: 693 raise TransportTerminated("connection closed") 694 try: 695 #TODO: get rid of sendalls and replace with async-style publishing 696 self.socket.sendall(data) 697 self.stat_bytes += len(data) 698 self.stat_num_msg += 1 699 except IOError as ioe: 700 #for now, just document common errno's in code 701 (ioe_errno, msg) = ioe.args 702 if ioe_errno == errno.EPIPE: #broken pipe 703 logdebug("ERROR: Broken Pipe") 704 self.close() 705 raise TransportTerminated(str(ioe_errno)+msg) 706 raise #re-raise 707 except socket.error as se: 708 #for now, just document common errno's in code 709 (se_errno, msg) = se.args 710 if se_errno == errno.EPIPE: #broken pipe 711 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 712 self.close() 713 raise TransportTerminated(msg) 714 elif se_errno == errno.ECONNRESET: #connection reset by peer 715 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 716 self.close() 717 raise TransportTerminated(msg) 718 else: 719 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 720 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 721 self.close() 722 raise TransportTerminated(str(se_errno)+' '+msg) 723 return True
724
725 - def receive_once(self):
726 """ 727 block until messages are read off of socket 728 @return: list of newly received messages 729 @rtype: [Msg] 730 @raise TransportException: if unable to receive message due to error 731 """ 732 sock = self.socket 733 if sock is None: 734 raise TransportException("connection not initialized") 735 b = self.read_buff 736 msg_queue = [] 737 p = self.protocol 738 try: 739 sock.setblocking(1) 740 while not msg_queue and not self.done and not is_shutdown(): 741 if b.tell() >= 4: 742 p.read_messages(b, msg_queue, sock) 743 if not msg_queue: 744 self.stat_bytes += recv_buff(sock, b, p.buff_size) 745 self.stat_num_msg += len(msg_queue) #STATS 746 # set the _connection_header field 747 for m in msg_queue: 748 m._connection_header = self.header 749 750 # #1852: keep track of last latched message 751 if self.is_latched and msg_queue: 752 self.latch = msg_queue[-1] 753 754 return msg_queue 755 756 except DeserializationError as e: 757 rospyerr(traceback.format_exc()) 758 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 759 except TransportTerminated as e: 760 raise #reraise 761 except ServiceException as e: 762 raise 763 except Exception as e: 764 rospyerr(traceback.format_exc()) 765 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 766 return retval
767
768 - def _reconnect(self):
769 # This reconnection logic is very hacky right now. I need to 770 # rewrite the I/O core so that this is handled more centrally. 771 772 if self.dest_address is None: 773 raise ROSInitException("internal error with reconnection state: address not stored") 774 775 interval = 0.5 # seconds 776 while self.socket is None and not self.done and not rospy.is_shutdown(): 777 try: 778 # set a timeout so that we can continue polling for 779 # exit. 30. is a bit high, but I'm concerned about 780 # embedded platforms. To do this properly, we'd have 781 # to move to non-blocking routines. 782 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 783 except TransportInitError: 784 self.socket = None 785 786 if self.socket is None and interval < 30.: 787 # exponential backoff (maximum 32 seconds) 788 interval = interval * 2 789 790 time.sleep(interval)
791
792 - def receive_loop(self, msgs_callback):
793 """ 794 Receive messages until shutdown 795 @param msgs_callback: callback to invoke for new messages received 796 @type msgs_callback: fn([msg]) 797 """ 798 # - use assert here as this would be an internal error, aka bug 799 logger.debug("receive_loop for [%s]", self.name) 800 try: 801 while not self.done and not is_shutdown(): 802 try: 803 if self.socket is not None: 804 msgs = self.receive_once() 805 if not self.done and not is_shutdown(): 806 msgs_callback(msgs, self) 807 else: 808 self._reconnect() 809 810 except TransportTerminated as e: 811 logdebug("[%s] failed to receive incoming message : %s" % (self.name, str(e))) 812 rospydebug("[%s] failed to receive incoming message: %s" % (self.name, traceback.format_exc())) 813 break 814 815 except TransportException as e: 816 # set socket to None so we reconnect 817 try: 818 if self.socket is not None: 819 try: 820 self.socket.shutdown() 821 except: 822 pass 823 finally: 824 self.socket.close() 825 except: 826 pass 827 self.socket = None 828 829 except DeserializationError as e: 830 #TODO: how should we handle reconnect in this case? 831 832 logerr("[%s] error deserializing incoming request: %s"%(self.name, str(e))) 833 rospyerr("[%s] error deserializing incoming request: %s"%(self.name, traceback.format_exc())) 834 except: 835 # in many cases this will be a normal hangup, but log internally 836 try: 837 #1467 sometimes we get exceptions due to 838 #interpreter shutdown, so blanket ignore those if 839 #the reporting fails 840 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 841 except: pass 842 843 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 844 finally: 845 if not self.done: 846 self.close()
847
848 - def close(self):
849 """close i/o and release resources""" 850 if not self.done: 851 try: 852 if self.socket is not None: 853 try: 854 self.socket.shutdown(socket.SHUT_RDWR) 855 except: 856 pass 857 finally: 858 self.socket.close() 859 finally: 860 self.socket = self.read_buff = self.write_buff = self.protocol = None 861 super(TCPROSTransport, self).close()
862