Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO #Python 2.x 
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO #Python 3.x 
 43      python3 = 1 
 44  import socket 
 45  import logging 
 46   
 47  import threading 
 48  import time 
 49  import traceback 
 50  import select 
 51   
 52  import rosgraph 
 53  import rosgraph.network 
 54  from genpy import DeserializationError, Message 
 55  from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header 
 56   
 57  # TODO: remove * import from core 
 58  from rospy.core import * 
 59  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 60  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 61  from rospy.msg import deserialize_messages, serialize_message 
 62  from rospy.service import ServiceException 
 63   
 64  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 65   
 66  logger = logging.getLogger('rospy.tcpros') 
 67   
 68  # Receive buffer size for topics/services (in bytes) 
 69  DEFAULT_BUFF_SIZE = 65536 
 70   
 71  ## name of our customized TCP protocol for accepting flows over server socket 
 72  TCPROS = "TCPROS"  
 73   
 74  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 75  _use_tcp_keepalive = None 
 76  _use_tcp_keepalive_lock = threading.Lock() 
77 -def _is_use_tcp_keepalive():
78 global _use_tcp_keepalive 79 if _use_tcp_keepalive is not None: 80 return _use_tcp_keepalive 81 with _use_tcp_keepalive_lock: 82 if _use_tcp_keepalive is not None: 83 return _use_tcp_keepalive 84 # in order to prevent circular dependencies, this does not use the 85 # builtin libraries for interacting with the parameter server 86 m = rospy.core.xmlrpcapi(rosgraph.get_master_uri()) 87 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 88 _use_tcp_keepalive = val if code == 1 else True 89 return _use_tcp_keepalive
90
91 -def recv_buff(sock, b, buff_size):
92 """ 93 Read data from socket into buffer. 94 @param sock: socket to read from 95 @type sock: socket.socket 96 @param b: buffer to receive into 97 @type b: StringIO 98 @param buff_size: recv read size 99 @type buff_size: int 100 @return: number of bytes read 101 @rtype: int 102 """ 103 d = sock.recv(buff_size) 104 if d: 105 b.write(d) 106 return len(d) 107 else: #bomb out 108 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
109
110 -class TCPServer(object):
111 """ 112 Simple server that accepts inbound TCP/IP connections and hands 113 them off to a handler function. TCPServer obeys the 114 ROS_IP/ROS_HOSTNAME environment variables 115 """ 116
117 - def __init__(self, inbound_handler, port=0):
118 """ 119 Setup a server socket listening on the specified port. If the 120 port is omitted, will choose any open port. 121 @param inbound_handler: handler to invoke with 122 new connection 123 @type inbound_handler: fn(sock, addr) 124 @param port: port to bind to, omit/0 to bind to any 125 @type port: int 126 """ 127 self.port = port #will get overwritten if port=0 128 self.addr = None #set at socket bind 129 self.is_shutdown = False 130 self.inbound_handler = inbound_handler 131 try: 132 self.server_sock = self._create_server_sock() 133 except: 134 self.server_sock = None 135 raise
136
137 - def start(self):
138 """Runs the run() loop in a separate thread""" 139 t = threading.Thread(target=self.run, args=()) 140 t.setDaemon(True) 141 t.start()
142
143 - def run(self):
144 """ 145 Main TCP receive loop. Should be run in a separate thread -- use start() 146 to do this automatically. 147 """ 148 self.is_shutdown = False 149 if not self.server_sock: 150 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 151 while not self.is_shutdown: 152 try: 153 (client_sock, client_addr) = self.server_sock.accept() 154 except socket.timeout: 155 continue 156 except IOError as e: 157 (errno, msg) = e.args 158 if errno == 4: #interrupted system call 159 continue 160 if not self.is_shutdown: 161 raise 162 if self.is_shutdown: 163 break 164 try: 165 #leave threading decisions up to inbound_handler 166 self.inbound_handler(client_sock, client_addr) 167 except socket.error as e: 168 if not self.is_shutdown: 169 traceback.print_exc() 170 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 171 logdebug("TCPServer[%s] shutting down", self.port)
172 173
174 - def get_full_addr(self):
175 """ 176 @return: (ip address, port) of server socket binding 177 @rtype: (str, int) 178 """ 179 # return rosgraph.network.get_host_name() instead of address so that it 180 # obeys ROS_IP/ROS_HOSTNAME behavior 181 return (rosgraph.network.get_host_name(), self.port)
182
183 - def _create_server_sock(self):
184 """ 185 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 186 binding to loopback interface. 187 """ 188 if rosgraph.network.use_ipv6(): 189 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 190 else: 191 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 192 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 193 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port)) 194 server_sock.bind((rosgraph.network.get_bind_address(), self.port)) 195 (self.addr, self.port) = server_sock.getsockname()[0:2] 196 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port)) 197 server_sock.listen(5) 198 return server_sock
199
200 - def shutdown(self):
201 """shutdown I/O resources uses by this server""" 202 if not self.is_shutdown: 203 self.is_shutdown = True 204 self.server_sock.close()
205 206 # base maintains a tcpros_server singleton that is shared between 207 # services and topics for inbound connections. This global is set in 208 # the tcprosserver constructor. Constructor is called by init_tcpros() 209 _tcpros_server = None 210
211 -def init_tcpros_server(port=0):
212 """ 213 starts the TCPROS server socket for inbound connections 214 @param port: listen on the provided port. If the port number is 0, the port will 215 be chosen randomly 216 @type port: int 217 """ 218 global _tcpros_server 219 if _tcpros_server is None: 220 _tcpros_server = TCPROSServer(port=port) 221 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 222 return _tcpros_server
223
224 -def start_tcpros_server():
225 """ 226 start the TCPROS server if it has not started already 227 """ 228 if _tcpros_server is None: 229 init_tcpros_server() 230 return _tcpros_server.start_server()
231 232 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 233
234 -def get_tcpros_server_address():
235 """ 236 get the address of the tcpros server. 237 @raise Exception: if tcpros server has not been started or created 238 """ 239 return _tcpros_server.get_address()
240
241 -def _error_connection_handler(sock, client_addr, header):
242 """ 243 utility handler that does nothing more than provide a rejection header 244 @param sock: socket connection 245 @type sock: socket.socket 246 @param client_addr: client address 247 @type client_addr: str 248 @param header: request header 249 @type header: dict 250 """ 251 return {'error': "unhandled connection"}
252
253 -class TCPROSServer(object):
254 """ 255 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 256 connections as well as ROS service connections over TCP. TCP server 257 socket is run once start_server() is called -- this is implicitly 258 called during init_publisher(). 259 """ 260
261 - def __init__(self, port=0):
262 """ 263 Constructur 264 @param port: port number to bind to (default 0/any) 265 @type port: int 266 """ 267 self.port = port 268 self.tcp_ros_server = None #: server for receiving tcp conn 269 self.lock = threading.Lock() 270 # should be set to fn(sock, client_addr, header) for topic connections 271 self.topic_connection_handler = _error_connection_handler 272 # should be set to fn(sock, client_addr, header) for service connections 273 self.service_connection_handler = _error_connection_handler
274
275 - def start_server(self):
276 """ 277 Starts the TCP socket server if one is not already running 278 """ 279 if self.tcp_ros_server: 280 return 281 with self.lock: 282 try: 283 if not self.tcp_ros_server: 284 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port) 285 self.tcp_ros_server.start() 286 except Exception as e: 287 self.tcp_ros_server = None 288 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 289 return 0, "unable to establish TCPROS server: %s"%e, []
290
291 - def get_address(self):
292 """ 293 @return: address and port of TCP server socket for accepting 294 inbound connections 295 @rtype: str, int 296 """ 297 if self.tcp_ros_server is not None: 298 return self.tcp_ros_server.get_full_addr() 299 return None, None
300
301 - def shutdown(self, reason=''):
302 """stops the TCP/IP server responsible for receiving inbound connections""" 303 if self.tcp_ros_server: 304 self.tcp_ros_server.shutdown()
305
306 - def _tcp_server_callback(self, sock, client_addr):
307 """ 308 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 309 310 @param sock: socket connection 311 @type sock: socket.socket 312 @param client_addr: client address 313 @type client_addr: (str, int) 314 @raise TransportInitError: If transport cannot be succesfully initialized 315 """ 316 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 317 #and then use that to do the writing 318 try: 319 buff_size = 4096 # size of read buffer 320 if python3 == 0: 321 #initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray()) 322 header = read_ros_handshake_header(sock, StringIO(), buff_size) 323 else: 324 header = read_ros_handshake_header(sock, BytesIO(), buff_size) 325 326 if 'topic' in header: 327 err_msg = self.topic_connection_handler(sock, client_addr, header) 328 elif 'service' in header: 329 err_msg = self.service_connection_handler(sock, client_addr, header) 330 else: 331 err_msg = 'no topic or service name detected' 332 if err_msg: 333 # shutdown race condition: nodes that come up and down 334 # quickly can receive connections during teardown. 335 336 # We use is_shutdown_requested() because we can get 337 # into bad connection states during client shutdown 338 # hooks. 339 if not rospy.core.is_shutdown_requested(): 340 write_ros_handshake_header(sock, {'error' : err_msg}) 341 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 342 else: 343 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 344 return 345 except rospy.exceptions.TransportInitError as e: 346 logwarn(str(e)) 347 if sock is not None: 348 sock.close() 349 except Exception as e: 350 # collect stack trace separately in local log file 351 if not rospy.core.is_shutdown_requested(): 352 logwarn("Inbound TCP/IP connection failed: %s", e) 353 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc()) 354 if sock is not None: 355 sock.close()
356
357 -class TCPROSTransportProtocol(object):
358 """ 359 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 360 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 361 well as which fields to send when creating a new connection (get_header_fields()). 362 """ 363
364 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
365 """ 366 ctor 367 @param resolved_name: resolved service or topic name 368 @type resolved_name: str 369 @param recv_data_class: message class for deserializing inbound messages 370 @type recv_data_class: Class 371 @param queue_size: maximum number of inbound messages to maintain 372 @type queue_size: int 373 @param buff_size: receive buffer size (in bytes) for reading from the connection. 374 @type buff_size: int 375 """ 376 if recv_data_class and not issubclass(recv_data_class, Message): 377 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 378 self.resolved_name = resolved_name 379 self.recv_data_class = recv_data_class 380 self.queue_size = queue_size 381 self.buff_size = buff_size 382 self.direction = BIDIRECTIONAL
383 384
385 - def read_messages(self, b, msg_queue, sock):
386 """ 387 @param b StringIO: read buffer 388 @param msg_queue [Message]: queue of deserialized messages 389 @type msg_queue: [Message] 390 @param sock socket: protocol can optionally read more data from 391 the socket, but in most cases the required data will already be 392 in b 393 """ 394 # default implementation 395 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
396
397 - def get_header_fields(self):
398 """ 399 Header fields that should be sent over the connection. The header fields 400 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 401 @return: {str : str}: header fields to send over connection 402 @rtype: dict 403 """ 404 return {}
405 406 # TODO: this still isn't as clean and seamless as I want it to 407 # be. This code came from the merger of publisher, subscriber, and 408 # service code into a common TCPROS transport class. The transport is 409 # customized by a 'protocol' class, which is how the different 410 # pub/sub/service behaviors are achieved. More behavior needs to be 411 # transferred from the transport class into the protocol class, 412 # e.g. deserialization as the state each maintains is somewhat 413 # duplicative. I would also come up with a better name than 414 # protocol. 415
416 -class TCPROSTransport(Transport):
417 """ 418 Generic implementation of TCPROS exchange routines for both topics and services 419 """ 420 transport_type = 'TCPROS' 421
422 - def __init__(self, protocol, name, header=None):
423 """ 424 ctor 425 @param name str: identifier 426 @param protocol TCPROSTransportProtocol protocol implementation 427 @param header dict: (optional) handshake header if transport handshake header was 428 already read off of transport. 429 @raise TransportInitError if transport cannot be initialized according to arguments 430 """ 431 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 432 if not name: 433 raise TransportInitError("Unable to initialize transport: name is not set") 434 435 self.protocol = protocol 436 437 self.socket = None 438 self.endpoint_id = 'unknown' 439 self.callerid_pub = 'unknown' 440 self.dest_address = None # for reconnection 441 442 if python3 == 0: # Python 2.x 443 self.read_buff = StringIO() 444 self.write_buff = StringIO() 445 else: # Python 3.x 446 self.read_buff = BytesIO() 447 self.write_buff = BytesIO() 448 449 #self.write_buff = StringIO() 450 self.header = header 451 452 # #1852 have to hold onto latched messages on subscriber side 453 self.is_latched = False 454 self.latch = None 455 456 # save the fileno separately so we can garbage collect the 457 # socket but still unregister will poll objects 458 self._fileno = None 459 460 # these fields are actually set by the remote 461 # publisher/service. they are set for tools that connect 462 # without knowing the actual field name 463 self.md5sum = None 464 self.type = None 465 466 # Endpoint Details (IP, Port) 467 self.local_endpoint = (None, None) 468 self.remote_endpoint = (None, None)
469
470 - def get_transport_info(self):
471 """ 472 Get detailed connection information. 473 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp' 474 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6] 475 """ 476 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
477
478 - def fileno(self):
479 """ 480 Get descriptor for select 481 """ 482 return self._fileno
483
484 - def set_endpoint_id(self, endpoint_id):
485 """ 486 Set the endpoint_id of this transport. 487 Allows the endpoint_id to be set before the socket is initialized. 488 """ 489 self.endpoint_id = endpoint_id
490
491 - def set_socket(self, sock, endpoint_id):
492 """ 493 Set the socket for this transport 494 @param sock: socket 495 @type sock: socket.socket 496 @param endpoint_id: identifier for connection endpoint 497 @type endpoint_id: str 498 @raise TransportInitError: if socket has already been set 499 """ 500 if self.socket is not None: 501 raise TransportInitError("socket already initialized") 502 self.socket = sock 503 self.endpoint_id = endpoint_id 504 self._fileno = sock.fileno() 505 self.local_endpoint = self.socket.getsockname()
506
507 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
508 """ 509 Establish TCP connection to the specified 510 address/port. connect() always calls L{write_header()} and 511 L{read_header()} after the connection is made 512 @param dest_addr: destination IP address 513 @type dest_addr: str 514 @param dest_port: destination port 515 @type dest_port: int 516 @param endpoint_id: string identifier for connection (for statistics) 517 @type endpoint_id: str 518 @param timeout: (optional keyword) timeout in seconds 519 @type timeout: float 520 @raise TransportInitError: if unable to create connection 521 """ 522 # first make sure that if ROS_HOSTNAME=localhost, we will not attempt 523 # to connect to anything other than localhost 524 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"): 525 if not rosgraph.network.is_local_address(dest_addr): 526 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr) 527 logwarn(msg) 528 self.close() 529 raise TransportInitError(msg) # bubble up 530 531 # now we can proceed with trying to connect. 532 try: 533 self.endpoint_id = endpoint_id 534 self.dest_address = (dest_addr, dest_port) 535 if rosgraph.network.use_ipv6(): 536 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 537 else: 538 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 539 if _is_use_tcp_keepalive(): 540 # OSX (among others) does not define these options 541 if hasattr(socket, 'TCP_KEEPCNT') and \ 542 hasattr(socket, 'TCP_KEEPIDLE') and \ 543 hasattr(socket, 'TCP_KEEPINTVL'): 544 # turn on KEEPALIVE 545 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 546 # - # keepalive failures before actual connection failure 547 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 548 # - timeout before starting KEEPALIVE process 549 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 550 # - interval to send KEEPALIVE after IDLE timeout 551 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 552 if timeout is not None: 553 s.settimeout(timeout) 554 self.socket = s 555 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port)) 556 self.socket.connect((dest_addr, dest_port)) 557 self.write_header() 558 self.read_header() 559 self.local_endpoint = self.socket.getsockname() 560 self.remote_endpoint = (dest_addr, dest_port) 561 except TransportInitError as tie: 562 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 563 raise 564 except Exception as e: 565 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 566 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 567 # check for error type and reason. On unknown errors the socket will be closed 568 # to avoid reconnection and error reproduction 569 if not isinstance(e, socket.error): 570 # FATAL: no reconnection as error is unknown 571 self.close() 572 elif not isinstance(e, socket.timeout) and e.errno not in [100, 101, 102, 103, 110, 112, 113]: 573 # reconnect in follow cases, otherwise close the socket: 574 # 1. socket.timeout: on timeouts caused by delays on wireless links 575 # 2. ENETDOWN (100), ENETUNREACH (101), ENETRESET (102), ECONNABORTED (103): 576 # while using ROS_HOSTNAME ros binds to a specific interface. Theses errors 577 # are thrown on interface shutdown e.g. on reconnection in LTE networks 578 # 3. ETIMEDOUT (110): same like 1. (for completeness) 579 # 4. EHOSTDOWN (112), EHOSTUNREACH (113): while network and/or DNS-server is not reachable 580 # 581 # no reconnection as error is not 1.-4. 582 self.close() 583 raise TransportInitError(str(e)) #re-raise i/o error
584
585 - def _validate_header(self, header):
586 """ 587 Validate header and initialize fields accordingly 588 @param header: header fields from publisher 589 @type header: dict 590 @raise TransportInitError: if header fails to validate 591 """ 592 self.header = header 593 if 'error' in header: 594 raise TransportInitError("remote error reported: %s"%header['error']) 595 for required in ['md5sum', 'type']: 596 if not required in header: 597 raise TransportInitError("header missing required field [%s]"%required) 598 self.type = header['type'] 599 self.md5sum = header['md5sum'] 600 if 'callerid' in header: 601 self.callerid_pub = header['callerid'] 602 if header.get('latching', '0') == '1': 603 self.is_latched = True
604
605 - def write_header(self):
606 """Writes the TCPROS header to the active connection.""" 607 # socket may still be getting spun up, so wait for it to be writable 608 sock = self.socket 609 protocol = self.protocol 610 # race condition on close, better fix is to pass these in, 611 # functional style, but right now trying to cause minimal 612 # perturbance to codebase. 613 if sock is None or protocol is None: 614 return 615 fileno = sock.fileno() 616 ready = None 617 poller = None 618 if hasattr(select, 'poll'): 619 poller = select.poll() 620 poller.register(fileno, select.POLLOUT) 621 while not ready: 622 events = poller.poll() 623 for _, flag in events: 624 if flag & select.POLLOUT: 625 ready = True 626 else: 627 while not ready: 628 try: 629 _, ready, _ = select.select([], [fileno], []) 630 except ValueError as e: 631 logger.error("[%s]: select fileno '%s': %s", self.name, str(fileno), str(e)) 632 raise 633 634 logger.debug("[%s]: writing header", self.name) 635 sock.setblocking(1) 636 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields()) 637 if poller: 638 poller.unregister(fileno)
639 640
641 - def read_header(self):
642 """ 643 Read TCPROS header from active socket 644 @raise TransportInitError if header fails to validate 645 """ 646 sock = self.socket 647 if sock is None: 648 return 649 sock.setblocking(1) 650 # TODO: add bytes received to self.stat_bytes 651 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
652
653 - def send_message(self, msg, seq):
654 """ 655 Convenience routine for services to send a message across a 656 particular connection. NOTE: write_data is much more efficient 657 if same message is being sent to multiple connections. Not 658 threadsafe. 659 @param msg: message to send 660 @type msg: Msg 661 @param seq: sequence number for message 662 @type seq: int 663 @raise TransportException: if error occurred sending message 664 """ 665 # this will call write_data(), so no need to keep track of stats 666 serialize_message(self.write_buff, seq, msg) 667 self.write_data(self.write_buff.getvalue()) 668 self.write_buff.truncate(0)
669
670 - def write_data(self, data):
671 """ 672 Write raw data to transport 673 @raise TransportInitialiationError: could not be initialized 674 @raise TransportTerminated: no longer open for publishing 675 """ 676 if not self.socket: 677 raise TransportInitError("TCPROS transport was not successfully initialized") 678 if self.done: 679 raise TransportTerminated("connection closed") 680 try: 681 #TODO: get rid of sendalls and replace with async-style publishing 682 self.socket.sendall(data) 683 self.stat_bytes += len(data) 684 self.stat_num_msg += 1 685 except IOError as ioe: 686 #for now, just document common errno's in code 687 (errno, msg) = ioe.args 688 if errno == 32: #broken pipe 689 logdebug("ERROR: Broken Pipe") 690 self.close() 691 raise TransportTerminated(str(errno)+msg) 692 raise #re-raise 693 except socket.error as se: 694 #for now, just document common errno's in code 695 (errno, msg) = se.args 696 if errno == 32: #broken pipe 697 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 698 self.close() 699 raise TransportTerminated(msg) 700 elif errno == 104: #connection reset by peer 701 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 702 self.close() 703 raise TransportTerminated(msg) 704 else: 705 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 706 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 707 self.close() 708 raise TransportTerminated(str(errno)+' '+msg) 709 return True
710
711 - def receive_once(self):
712 """ 713 block until messages are read off of socket 714 @return: list of newly received messages 715 @rtype: [Msg] 716 @raise TransportException: if unable to receive message due to error 717 """ 718 sock = self.socket 719 if sock is None: 720 raise TransportException("connection not initialized") 721 b = self.read_buff 722 msg_queue = [] 723 p = self.protocol 724 try: 725 sock.setblocking(1) 726 while not msg_queue and not self.done and not is_shutdown(): 727 if b.tell() >= 4: 728 p.read_messages(b, msg_queue, sock) 729 if not msg_queue: 730 self.stat_bytes += recv_buff(sock, b, p.buff_size) 731 self.stat_num_msg += len(msg_queue) #STATS 732 # set the _connection_header field 733 for m in msg_queue: 734 m._connection_header = self.header 735 736 # #1852: keep track of last latched message 737 if self.is_latched and msg_queue: 738 self.latch = msg_queue[-1] 739 740 return msg_queue 741 742 except DeserializationError as e: 743 rospyerr(traceback.format_exc()) 744 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 745 except TransportTerminated as e: 746 raise #reraise 747 except ServiceException as e: 748 raise 749 except Exception as e: 750 rospyerr(traceback.format_exc()) 751 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 752 return retval
753
754 - def _reconnect(self):
755 # This reconnection logic is very hacky right now. I need to 756 # rewrite the I/O core so that this is handled more centrally. 757 758 if self.dest_address is None: 759 raise ROSInitException("internal error with reconnection state: address not stored") 760 761 interval = 0.5 # seconds 762 while self.socket is None and not self.done and not rospy.is_shutdown(): 763 try: 764 # set a timeout so that we can continue polling for 765 # exit. 30. is a bit high, but I'm concerned about 766 # embedded platforms. To do this properly, we'd have 767 # to move to non-blocking routines. 768 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 769 except TransportInitError: 770 self.socket = None 771 772 if self.socket is None and interval < 30.: 773 # exponential backoff (maximum 32 seconds) 774 interval = interval * 2 775 776 time.sleep(interval)
777
778 - def receive_loop(self, msgs_callback):
779 """ 780 Receive messages until shutdown 781 @param msgs_callback: callback to invoke for new messages received 782 @type msgs_callback: fn([msg]) 783 """ 784 # - use assert here as this would be an internal error, aka bug 785 logger.debug("receive_loop for [%s]", self.name) 786 try: 787 while not self.done and not is_shutdown(): 788 try: 789 if self.socket is not None: 790 msgs = self.receive_once() 791 if not self.done and not is_shutdown(): 792 msgs_callback(msgs, self) 793 else: 794 self._reconnect() 795 796 except TransportException as e: 797 # set socket to None so we reconnect 798 try: 799 if self.socket is not None: 800 try: 801 self.socket.shutdown() 802 except: 803 pass 804 finally: 805 self.socket.close() 806 except: 807 pass 808 self.socket = None 809 810 except DeserializationError as e: 811 #TODO: how should we handle reconnect in this case? 812 813 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 814 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 815 except: 816 # in many cases this will be a normal hangup, but log internally 817 try: 818 #1467 sometimes we get exceptions due to 819 #interpreter shutdown, so blanket ignore those if 820 #the reporting fails 821 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 822 except: pass 823 824 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 825 finally: 826 if not self.done: 827 self.close()
828
829 - def close(self):
830 """close i/o and release resources""" 831 if not self.done: 832 try: 833 if self.socket is not None: 834 try: 835 self.socket.shutdown(socket.SHUT_RDWR) 836 except: 837 pass 838 finally: 839 self.socket.close() 840 finally: 841 self.socket = self.read_buff = self.write_buff = self.protocol = None 842 super(TCPROSTransport, self).close()
843