Package rospy :: Package impl :: Module tcpros_base

Source Code for Module rospy.impl.tcpros_base

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO #Python 2.x 
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO #Python 3.x 
 43      python3 = 1 
 44  import errno 
 45  import socket 
 46  import logging 
 47   
 48  import threading 
 49  import time 
 50  import traceback 
 51  import select 
 52   
 53  import rosgraph 
 54  import rosgraph.network 
 55  from genpy import DeserializationError, Message 
 56  from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header 
 57   
 58  # TODO: remove * import from core 
 59  from rospy.core import * 
 60  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 61  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 62  from rospy.msg import deserialize_messages, serialize_message 
 63  from rospy.service import ServiceException 
 64   
 65  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 66   
 67  logger = logging.getLogger('rospy.tcpros') 
 68   
 69  # Receive buffer size for topics/services (in bytes) 
 70  DEFAULT_BUFF_SIZE = 65536 
 71   
 72  ## name of our customized TCP protocol for accepting flows over server socket 
 73  TCPROS = "TCPROS"  
 74   
 75  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 76  _use_tcp_keepalive = None 
 77  _use_tcp_keepalive_lock = threading.Lock() 
78 -def _is_use_tcp_keepalive():
79 global _use_tcp_keepalive 80 if _use_tcp_keepalive is not None: 81 return _use_tcp_keepalive 82 with _use_tcp_keepalive_lock: 83 if _use_tcp_keepalive is not None: 84 return _use_tcp_keepalive 85 # in order to prevent circular dependencies, this does not use the 86 # builtin libraries for interacting with the parameter server 87 m = rospy.core.xmlrpcapi(rosgraph.get_master_uri()) 88 code, msg, val = m.getParam(rospy.names.get_caller_id(), _PARAM_TCP_KEEPALIVE) 89 _use_tcp_keepalive = val if code == 1 else True 90 return _use_tcp_keepalive
91
92 -def recv_buff(sock, b, buff_size):
93 """ 94 Read data from socket into buffer. 95 @param sock: socket to read from 96 @type sock: socket.socket 97 @param b: buffer to receive into 98 @type b: StringIO 99 @param buff_size: recv read size 100 @type buff_size: int 101 @return: number of bytes read 102 @rtype: int 103 """ 104 d = sock.recv(buff_size) 105 if d: 106 b.write(d) 107 return len(d) 108 else: #bomb out 109 raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
110
111 -class TCPServer(object):
112 """ 113 Simple server that accepts inbound TCP/IP connections and hands 114 them off to a handler function. TCPServer obeys the 115 ROS_IP/ROS_HOSTNAME environment variables 116 """ 117
118 - def __init__(self, inbound_handler, port=0):
119 """ 120 Setup a server socket listening on the specified port. If the 121 port is omitted, will choose any open port. 122 @param inbound_handler: handler to invoke with 123 new connection 124 @type inbound_handler: fn(sock, addr) 125 @param port: port to bind to, omit/0 to bind to any 126 @type port: int 127 """ 128 self.port = port #will get overwritten if port=0 129 self.addr = None #set at socket bind 130 self.is_shutdown = False 131 self.inbound_handler = inbound_handler 132 try: 133 self.server_sock = self._create_server_sock() 134 except: 135 self.server_sock = None 136 raise
137
138 - def start(self):
139 """Runs the run() loop in a separate thread""" 140 t = threading.Thread(target=self.run, args=()) 141 t.daemon = True 142 t.start()
143
144 - def run(self):
145 """ 146 Main TCP receive loop. Should be run in a separate thread -- use start() 147 to do this automatically. 148 """ 149 self.is_shutdown = False 150 if not self.server_sock: 151 raise ROSInternalException("%s did not connect"%self.__class__.__name__) 152 while not self.is_shutdown: 153 try: 154 (client_sock, client_addr) = self.server_sock.accept() 155 except socket.timeout: 156 continue 157 except IOError as e: 158 (e_errno, msg) = e.args 159 if e_errno == errno.EINTR: #interrupted system call 160 continue 161 raise 162 if self.is_shutdown: 163 break 164 try: 165 #leave threading decisions up to inbound_handler 166 self.inbound_handler(client_sock, client_addr) 167 except socket.error as e: 168 if not self.is_shutdown: 169 traceback.print_exc() 170 logwarn("Failed to handle inbound connection due to socket error: %s"%e) 171 logdebug("TCPServer[%s] shutting down", self.port)
172 173
174 - def get_full_addr(self):
175 """ 176 @return: (ip address, port) of server socket binding 177 @rtype: (str, int) 178 """ 179 # return rosgraph.network.get_host_name() instead of address so that it 180 # obeys ROS_IP/ROS_HOSTNAME behavior 181 return (rosgraph.network.get_host_name(), self.port)
182
183 - def _create_server_sock(self):
184 """ 185 binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 186 binding to loopback interface. 187 """ 188 if rosgraph.network.use_ipv6(): 189 server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 190 else: 191 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 192 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 193 logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port)) 194 server_sock.bind((rosgraph.network.get_bind_address(), self.port)) 195 (self.addr, self.port) = server_sock.getsockname()[0:2] 196 logdebug('bound to ' + str(self.addr) + ' ' + str(self.port)) 197 server_sock.listen(5) 198 return server_sock
199
200 - def shutdown(self):
201 """shutdown I/O resources uses by this server""" 202 if not self.is_shutdown: 203 self.is_shutdown = True 204 self.server_sock.close()
205 206 # base maintains a tcpros_server singleton that is shared between 207 # services and topics for inbound connections. This global is set in 208 # the tcprosserver constructor. Constructor is called by init_tcpros() 209 _tcpros_server = None 210
211 -def init_tcpros_server(port=0):
212 """ 213 starts the TCPROS server socket for inbound connections 214 @param port: listen on the provided port. If the port number is 0, the port will 215 be chosen randomly 216 @type port: int 217 """ 218 global _tcpros_server 219 if _tcpros_server is None: 220 _tcpros_server = TCPROSServer(port=port) 221 rospy.core.add_shutdown_hook(_tcpros_server.shutdown) 222 return _tcpros_server
223
224 -def start_tcpros_server():
225 """ 226 start the TCPROS server if it has not started already 227 """ 228 if _tcpros_server is None: 229 init_tcpros_server() 230 return _tcpros_server.start_server()
231 232 # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers 233
234 -def get_tcpros_server_address():
235 """ 236 get the address of the tcpros server. 237 @raise Exception: if tcpros server has not been started or created 238 """ 239 return _tcpros_server.get_address()
240
241 -def _error_connection_handler(sock, client_addr, header):
242 """ 243 utility handler that does nothing more than provide a rejection header 244 @param sock: socket connection 245 @type sock: socket.socket 246 @param client_addr: client address 247 @type client_addr: str 248 @param header: request header 249 @type header: dict 250 """ 251 return {'error': "unhandled connection"}
252
253 -class TCPROSServer(object):
254 """ 255 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 256 connections as well as ROS service connections over TCP. TCP server 257 socket is run once start_server() is called -- this is implicitly 258 called during init_publisher(). 259 """ 260
261 - def __init__(self, port=0):
262 """ 263 Constructur 264 @param port: port number to bind to (default 0/any) 265 @type port: int 266 """ 267 self.port = port 268 self.tcp_ros_server = None #: server for receiving tcp conn 269 self.lock = threading.Lock() 270 # should be set to fn(sock, client_addr, header) for topic connections 271 self.topic_connection_handler = _error_connection_handler 272 # should be set to fn(sock, client_addr, header) for service connections 273 self.service_connection_handler = _error_connection_handler
274
275 - def start_server(self):
276 """ 277 Starts the TCP socket server if one is not already running 278 """ 279 if self.tcp_ros_server: 280 return 281 with self.lock: 282 try: 283 if not self.tcp_ros_server: 284 self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port) 285 self.tcp_ros_server.start() 286 except Exception as e: 287 self.tcp_ros_server = None 288 logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 289 return 0, "unable to establish TCPROS server: %s"%e, []
290
291 - def get_address(self):
292 """ 293 @return: address and port of TCP server socket for accepting 294 inbound connections 295 @rtype: str, int 296 """ 297 if self.tcp_ros_server is not None: 298 return self.tcp_ros_server.get_full_addr() 299 return None, None
300
301 - def shutdown(self, reason=''):
302 """stops the TCP/IP server responsible for receiving inbound connections""" 303 if self.tcp_ros_server: 304 self.tcp_ros_server.shutdown()
305
306 - def _tcp_server_callback(self, sock, client_addr):
307 """ 308 TCPServer callback: detects incoming topic or service connection and passes connection accordingly 309 310 @param sock: socket connection 311 @type sock: socket.socket 312 @param client_addr: client address 313 @type client_addr: (str, int) 314 @raise TransportInitError: If transport cannot be succesfully initialized 315 """ 316 #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, 317 #and then use that to do the writing 318 try: 319 buff_size = 4096 # size of read buffer 320 if python3 == 0: 321 #initialize read_ros_handshake_header with BytesIO for Python 3 (instead of bytesarray()) 322 header = read_ros_handshake_header(sock, StringIO(), buff_size) 323 else: 324 header = read_ros_handshake_header(sock, BytesIO(), buff_size) 325 326 if 'topic' in header: 327 err_msg = self.topic_connection_handler(sock, client_addr, header) 328 elif 'service' in header: 329 err_msg = self.service_connection_handler(sock, client_addr, header) 330 else: 331 err_msg = 'no topic or service name detected' 332 if err_msg: 333 # shutdown race condition: nodes that come up and down 334 # quickly can receive connections during teardown. 335 336 # We use is_shutdown_requested() because we can get 337 # into bad connection states during client shutdown 338 # hooks. 339 if not rospy.core.is_shutdown_requested(): 340 write_ros_handshake_header(sock, {'error' : err_msg}) 341 raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 342 else: 343 write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 344 return 345 except rospy.exceptions.TransportInitError as e: 346 logwarn(str(e)) 347 if sock is not None: 348 sock.close() 349 except Exception as e: 350 # collect stack trace separately in local log file 351 if not rospy.core.is_shutdown_requested(): 352 logwarn("Inbound TCP/IP connection failed: %s", e) 353 rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc()) 354 if sock is not None: 355 sock.close()
356
357 -class TCPROSTransportProtocol(object):
358 """ 359 Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 360 protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 361 well as which fields to send when creating a new connection (get_header_fields()). 362 """ 363
364 - def __init__(self, resolved_name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
365 """ 366 ctor 367 @param resolved_name: resolved service or topic name 368 @type resolved_name: str 369 @param recv_data_class: message class for deserializing inbound messages 370 @type recv_data_class: Class 371 @param queue_size: maximum number of inbound messages to maintain 372 @type queue_size: int 373 @param buff_size: receive buffer size (in bytes) for reading from the connection. 374 @type buff_size: int 375 """ 376 if recv_data_class and not issubclass(recv_data_class, Message): 377 raise TransportInitError("Unable to initialize transport: data class is not a message data class") 378 self.resolved_name = resolved_name 379 self.recv_data_class = recv_data_class 380 self.queue_size = queue_size 381 self.buff_size = buff_size 382 self.direction = BIDIRECTIONAL
383 384
385 - def read_messages(self, b, msg_queue, sock):
386 """ 387 @param b StringIO: read buffer 388 @param msg_queue [Message]: queue of deserialized messages 389 @type msg_queue: [Message] 390 @param sock socket: protocol can optionally read more data from 391 the socket, but in most cases the required data will already be 392 in b 393 """ 394 # default implementation 395 deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
396
397 - def get_header_fields(self):
398 """ 399 Header fields that should be sent over the connection. The header fields 400 are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 401 @return: {str : str}: header fields to send over connection 402 @rtype: dict 403 """ 404 return {}
405 406 # TODO: this still isn't as clean and seamless as I want it to 407 # be. This code came from the merger of publisher, subscriber, and 408 # service code into a common TCPROS transport class. The transport is 409 # customized by a 'protocol' class, which is how the different 410 # pub/sub/service behaviors are achieved. More behavior needs to be 411 # transferred from the transport class into the protocol class, 412 # e.g. deserialization as the state each maintains is somewhat 413 # duplicative. I would also come up with a better name than 414 # protocol. 415
416 -class TCPROSTransport(Transport):
417 """ 418 Generic implementation of TCPROS exchange routines for both topics and services 419 """ 420 transport_type = 'TCPROS' 421
422 - def __init__(self, protocol, name, header=None):
423 """ 424 ctor 425 @param name str: identifier 426 @param protocol TCPROSTransportProtocol protocol implementation 427 @param header dict: (optional) handshake header if transport handshake header was 428 already read off of transport. 429 @raise TransportInitError if transport cannot be initialized according to arguments 430 """ 431 super(TCPROSTransport, self).__init__(protocol.direction, name=name) 432 if not name: 433 raise TransportInitError("Unable to initialize transport: name is not set") 434 435 self.protocol = protocol 436 437 self.socket = None 438 self.endpoint_id = 'unknown' 439 self.callerid_pub = 'unknown' 440 self.dest_address = None # for reconnection 441 442 if python3 == 0: # Python 2.x 443 self.read_buff = StringIO() 444 self.write_buff = StringIO() 445 else: # Python 3.x 446 self.read_buff = BytesIO() 447 self.write_buff = BytesIO() 448 449 #self.write_buff = StringIO() 450 self.header = header 451 452 # #1852 have to hold onto latched messages on subscriber side 453 self.is_latched = False 454 self.latch = None 455 456 # save the fileno separately so we can garbage collect the 457 # socket but still unregister will poll objects 458 self._fileno = None 459 460 # these fields are actually set by the remote 461 # publisher/service. they are set for tools that connect 462 # without knowing the actual field name 463 self.md5sum = None 464 self.type = None 465 466 # Endpoint Details (IP, Port) 467 self.local_endpoint = (None, None) 468 self.remote_endpoint = (None, None)
469
470 - def get_transport_info(self):
471 """ 472 Get detailed connection information. 473 Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp' 474 e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6] 475 """ 476 return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno)
477
478 - def fileno(self):
479 """ 480 Get descriptor for select 481 """ 482 return self._fileno
483
484 - def set_endpoint_id(self, endpoint_id):
485 """ 486 Set the endpoint_id of this transport. 487 Allows the endpoint_id to be set before the socket is initialized. 488 """ 489 self.endpoint_id = endpoint_id
490
491 - def set_socket(self, sock, endpoint_id):
492 """ 493 Set the socket for this transport 494 @param sock: socket 495 @type sock: socket.socket 496 @param endpoint_id: identifier for connection endpoint 497 @type endpoint_id: str 498 @raise TransportInitError: if socket has already been set 499 """ 500 if self.socket is not None: 501 raise TransportInitError("socket already initialized") 502 self.socket = sock 503 self.endpoint_id = endpoint_id 504 self._fileno = sock.fileno() 505 self.local_endpoint = self.socket.getsockname()
506
507 - def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
508 """ 509 Establish TCP connection to the specified 510 address/port. connect() always calls L{write_header()} and 511 L{read_header()} after the connection is made 512 @param dest_addr: destination IP address 513 @type dest_addr: str 514 @param dest_port: destination port 515 @type dest_port: int 516 @param endpoint_id: string identifier for connection (for statistics) 517 @type endpoint_id: str 518 @param timeout: (optional keyword) timeout in seconds 519 @type timeout: float 520 @raise TransportInitError: if unable to create connection 521 """ 522 # first make sure that if ROS_HOSTNAME=localhost, we will not attempt 523 # to connect to anything other than localhost 524 if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"): 525 if not rosgraph.network.is_local_address(dest_addr): 526 msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr) 527 logwarn(msg) 528 self.close() 529 raise TransportInitError(msg) # bubble up 530 531 # now we can proceed with trying to connect. 532 try: 533 self.endpoint_id = endpoint_id 534 self.dest_address = (dest_addr, dest_port) 535 if rosgraph.network.use_ipv6(): 536 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 537 else: 538 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 539 if _is_use_tcp_keepalive(): 540 # OSX (among others) does not define these options 541 if hasattr(socket, 'TCP_KEEPCNT') and \ 542 hasattr(socket, 'TCP_KEEPIDLE') and \ 543 hasattr(socket, 'TCP_KEEPINTVL'): 544 # turn on KEEPALIVE 545 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 546 # - # keepalive failures before actual connection failure 547 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 548 # - timeout before starting KEEPALIVE process 549 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 550 # - interval to send KEEPALIVE after IDLE timeout 551 s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 552 if timeout is not None: 553 s.settimeout(timeout) 554 self.socket = s 555 logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port)) 556 self.socket.connect((dest_addr, dest_port)) 557 self.write_header() 558 self.read_header() 559 self.local_endpoint = self.socket.getsockname() 560 self.remote_endpoint = (dest_addr, dest_port) 561 except TransportInitError as tie: 562 rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 563 raise 564 except Exception as e: 565 #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e))) 566 rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 567 # check for error type and reason. On unknown errors the socket will be closed 568 # to avoid reconnection and error reproduction 569 if not isinstance(e, socket.error): 570 # FATAL: no reconnection as error is unknown 571 self.close() 572 elif not isinstance(e, socket.timeout) and e.errno not in [ 573 errno.ENETDOWN, errno.ENETUNREACH, errno.ENETRESET, 574 errno.ECONNABORTED, errno.ETIMEDOUT, errno.EHOSTDOWN, errno.EHOSTUNREACH]: 575 # reconnect in follow cases, otherwise close the socket: 576 # 1. socket.timeout: on timeouts caused by delays on wireless links 577 # 2. ENETDOWN (100), ENETUNREACH (101), ENETRESET (102), ECONNABORTED (103): 578 # while using ROS_HOSTNAME ros binds to a specific interface. Theses errors 579 # are thrown on interface shutdown e.g. on reconnection in LTE networks 580 # 3. ETIMEDOUT (110): same like 1. (for completeness) 581 # 4. EHOSTDOWN (112), EHOSTUNREACH (113): while network and/or DNS-server is not reachable 582 # 583 # no reconnection as error is not 1.-4. 584 self.close() 585 raise TransportInitError(str(e)) #re-raise i/o error
586
587 - def _validate_header(self, header):
588 """ 589 Validate header and initialize fields accordingly 590 @param header: header fields from publisher 591 @type header: dict 592 @raise TransportInitError: if header fails to validate 593 """ 594 self.header = header 595 if 'error' in header: 596 raise TransportInitError("remote error reported: %s"%header['error']) 597 for required in ['md5sum', 'type']: 598 if not required in header: 599 raise TransportInitError("header missing required field [%s]"%required) 600 self.type = header['type'] 601 self.md5sum = header['md5sum'] 602 if 'callerid' in header: 603 self.callerid_pub = header['callerid'] 604 if header.get('latching', '0') == '1': 605 self.is_latched = True
606
607 - def write_header(self):
608 """Writes the TCPROS header to the active connection.""" 609 # socket may still be getting spun up, so wait for it to be writable 610 sock = self.socket 611 protocol = self.protocol 612 # race condition on close, better fix is to pass these in, 613 # functional style, but right now trying to cause minimal 614 # perturbance to codebase. 615 if sock is None or protocol is None: 616 return 617 fileno = sock.fileno() 618 ready = None 619 poller = None 620 if hasattr(select, 'poll'): 621 poller = select.poll() 622 poller.register(fileno, select.POLLOUT) 623 while not ready: 624 events = poller.poll() 625 for _, flag in events: 626 if flag & select.POLLOUT: 627 ready = True 628 else: 629 while not ready: 630 try: 631 _, ready, _ = select.select([], [fileno], []) 632 except ValueError as e: 633 logger.error("[%s]: select fileno '%s': %s", self.name, str(fileno), str(e)) 634 raise 635 636 logger.debug("[%s]: writing header", self.name) 637 sock.setblocking(1) 638 self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields()) 639 if poller: 640 poller.unregister(fileno)
641 642
643 - def read_header(self):
644 """ 645 Read TCPROS header from active socket 646 @raise TransportInitError if header fails to validate 647 """ 648 sock = self.socket 649 if sock is None: 650 return 651 sock.setblocking(1) 652 # TODO: add bytes received to self.stat_bytes 653 self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
654
655 - def send_message(self, msg, seq):
656 """ 657 Convenience routine for services to send a message across a 658 particular connection. NOTE: write_data is much more efficient 659 if same message is being sent to multiple connections. Not 660 threadsafe. 661 @param msg: message to send 662 @type msg: Msg 663 @param seq: sequence number for message 664 @type seq: int 665 @raise TransportException: if error occurred sending message 666 """ 667 # this will call write_data(), so no need to keep track of stats 668 serialize_message(self.write_buff, seq, msg) 669 self.write_data(self.write_buff.getvalue()) 670 self.write_buff.truncate(0)
671
672 - def write_data(self, data):
673 """ 674 Write raw data to transport 675 @raise TransportInitialiationError: could not be initialized 676 @raise TransportTerminated: no longer open for publishing 677 """ 678 if not self.socket: 679 raise TransportInitError("TCPROS transport was not successfully initialized") 680 if self.done: 681 raise TransportTerminated("connection closed") 682 try: 683 #TODO: get rid of sendalls and replace with async-style publishing 684 self.socket.sendall(data) 685 self.stat_bytes += len(data) 686 self.stat_num_msg += 1 687 except IOError as ioe: 688 #for now, just document common errno's in code 689 (ioe_errno, msg) = ioe.args 690 if ioe_errno == errno.EPIPE: #broken pipe 691 logdebug("ERROR: Broken Pipe") 692 self.close() 693 raise TransportTerminated(str(ioe_errno)+msg) 694 raise #re-raise 695 except socket.error as se: 696 #for now, just document common errno's in code 697 (se_errno, msg) = se.args 698 if se_errno == errno.EPIPE: #broken pipe 699 logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 700 self.close() 701 raise TransportTerminated(msg) 702 elif se_errno == errno.ECONNRESET: #connection reset by peer 703 logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id) 704 self.close() 705 raise TransportTerminated(msg) 706 else: 707 rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 708 logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg) 709 self.close() 710 raise TransportTerminated(str(se_errno)+' '+msg) 711 return True
712
713 - def receive_once(self):
714 """ 715 block until messages are read off of socket 716 @return: list of newly received messages 717 @rtype: [Msg] 718 @raise TransportException: if unable to receive message due to error 719 """ 720 sock = self.socket 721 if sock is None: 722 raise TransportException("connection not initialized") 723 b = self.read_buff 724 msg_queue = [] 725 p = self.protocol 726 try: 727 sock.setblocking(1) 728 while not msg_queue and not self.done and not is_shutdown(): 729 if b.tell() >= 4: 730 p.read_messages(b, msg_queue, sock) 731 if not msg_queue: 732 self.stat_bytes += recv_buff(sock, b, p.buff_size) 733 self.stat_num_msg += len(msg_queue) #STATS 734 # set the _connection_header field 735 for m in msg_queue: 736 m._connection_header = self.header 737 738 # #1852: keep track of last latched message 739 if self.is_latched and msg_queue: 740 self.latch = msg_queue[-1] 741 742 return msg_queue 743 744 except DeserializationError as e: 745 rospyerr(traceback.format_exc()) 746 raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 747 except TransportTerminated as e: 748 raise #reraise 749 except ServiceException as e: 750 raise 751 except Exception as e: 752 rospyerr(traceback.format_exc()) 753 raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 754 return retval
755
756 - def _reconnect(self):
757 # This reconnection logic is very hacky right now. I need to 758 # rewrite the I/O core so that this is handled more centrally. 759 760 if self.dest_address is None: 761 raise ROSInitException("internal error with reconnection state: address not stored") 762 763 interval = 0.5 # seconds 764 while self.socket is None and not self.done and not rospy.is_shutdown(): 765 try: 766 # set a timeout so that we can continue polling for 767 # exit. 30. is a bit high, but I'm concerned about 768 # embedded platforms. To do this properly, we'd have 769 # to move to non-blocking routines. 770 self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 771 except TransportInitError: 772 self.socket = None 773 774 if self.socket is None and interval < 30.: 775 # exponential backoff (maximum 32 seconds) 776 interval = interval * 2 777 778 time.sleep(interval)
779
780 - def receive_loop(self, msgs_callback):
781 """ 782 Receive messages until shutdown 783 @param msgs_callback: callback to invoke for new messages received 784 @type msgs_callback: fn([msg]) 785 """ 786 # - use assert here as this would be an internal error, aka bug 787 logger.debug("receive_loop for [%s]", self.name) 788 try: 789 while not self.done and not is_shutdown(): 790 try: 791 if self.socket is not None: 792 msgs = self.receive_once() 793 if not self.done and not is_shutdown(): 794 msgs_callback(msgs, self) 795 else: 796 self._reconnect() 797 798 except TransportException as e: 799 # set socket to None so we reconnect 800 try: 801 if self.socket is not None: 802 try: 803 self.socket.shutdown() 804 except: 805 pass 806 finally: 807 self.socket.close() 808 except: 809 pass 810 self.socket = None 811 812 except DeserializationError as e: 813 #TODO: how should we handle reconnect in this case? 814 815 logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 816 rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 817 except: 818 # in many cases this will be a normal hangup, but log internally 819 try: 820 #1467 sometimes we get exceptions due to 821 #interpreter shutdown, so blanket ignore those if 822 #the reporting fails 823 rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc()) 824 except: pass 825 826 rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name) 827 finally: 828 if not self.done: 829 self.close()
830
831 - def close(self):
832 """close i/o and release resources""" 833 if not self.done: 834 try: 835 if self.socket is not None: 836 try: 837 self.socket.shutdown(socket.SHUT_RDWR) 838 except: 839 pass 840 finally: 841 self.socket.close() 842 finally: 843 self.socket = self.read_buff = self.write_buff = self.protocol = None 844 super(TCPROSTransport, self).close()
845