1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34   
 35  """Internal use: common TCPROS libraries""" 
 36   
 37   
 38  try: 
 39      from cStringIO import StringIO  
 40      python3 = 0 
 41  except ImportError: 
 42      from io import StringIO, BytesIO  
 43      python3 = 1 
 44  import socket 
 45  import logging 
 46   
 47  import threading 
 48  import time 
 49  import traceback 
 50  import select 
 51   
 52  import rosgraph 
 53  import rosgraph.network 
 54  from genpy import DeserializationError, Message 
 55  from rosgraph.network import read_ros_handshake_header, write_ros_handshake_header 
 56   
 57   
 58  from rospy.core import * 
 59  from rospy.core import logwarn, loginfo, logerr, logdebug, rospydebug, rospyerr, rospywarn 
 60  from rospy.exceptions import ROSInternalException, TransportException, TransportTerminated, TransportInitError 
 61  from rospy.msg import deserialize_messages, serialize_message 
 62  from rospy.service import ServiceException 
 63   
 64  from rospy.impl.transport import Transport, BIDIRECTIONAL 
 65   
 66  logger = logging.getLogger('rospy.tcpros') 
 67   
 68   
 69  DEFAULT_BUFF_SIZE = 65536 
 70   
 71   
 72  TCPROS = "TCPROS"  
 73   
 74  _PARAM_TCP_KEEPALIVE = '/tcp_keepalive' 
 75  _use_tcp_keepalive = None 
 76  _use_tcp_keepalive_lock = threading.Lock() 
 90   
 92      """ 
 93      Read data from socket into buffer. 
 94      @param sock: socket to read from 
 95      @type  sock: socket.socket 
 96      @param b: buffer to receive into 
 97      @type  b: StringIO 
 98      @param buff_size: recv read size 
 99      @type  buff_size: int 
100      @return: number of bytes read 
101      @rtype: int 
102      """ 
103      d = sock.recv(buff_size) 
104      if d: 
105          b.write(d) 
106          return len(d) 
107      else:  
108          raise TransportTerminated("unable to receive data from sender, check sender's logs for details") 
 109   
111      """ 
112      Simple server that accepts inbound TCP/IP connections and hands 
113      them off to a handler function. TCPServer obeys the 
114      ROS_IP/ROS_HOSTNAME environment variables 
115      """ 
116   
117 -    def __init__(self, inbound_handler, port=0): 
 118          """ 
119          Setup a server socket listening on the specified port. If the 
120          port is omitted, will choose any open port. 
121          @param inbound_handler: handler to invoke with 
122          new connection 
123          @type  inbound_handler: fn(sock, addr) 
124          @param port: port to bind to, omit/0 to bind to any 
125          @type  port: int 
126          """ 
127          self.port = port  
128          self.addr = None  
129          self.is_shutdown = False 
130          self.inbound_handler = inbound_handler 
131          try: 
132              self.server_sock = self._create_server_sock() 
133          except: 
134              self.server_sock = None 
135              raise 
 136   
138          """Runs the run() loop in a separate thread""" 
139          t = threading.Thread(target=self.run, args=()) 
140          t.setDaemon(True) 
141          t.start() 
 142   
144          """ 
145          Main TCP receive loop. Should be run in a separate thread -- use start() 
146          to do this automatically. 
147          """ 
148          self.is_shutdown = False 
149          if not self.server_sock: 
150              raise ROSInternalException("%s did not connect"%self.__class__.__name__) 
151          while not self.is_shutdown: 
152              try: 
153                  (client_sock, client_addr) = self.server_sock.accept() 
154              except socket.timeout: 
155                  continue 
156              except IOError as e: 
157                  (errno, msg) = e.args 
158                  if errno == 4:  
159                      continue 
160                  if not self.is_shutdown: 
161                      raise 
162              if self.is_shutdown: 
163                  break 
164              try: 
165                   
166                  self.inbound_handler(client_sock, client_addr) 
167              except socket.error as e: 
168                  if not self.is_shutdown: 
169                      traceback.print_exc() 
170                      logwarn("Failed to handle inbound connection due to socket error: %s"%e) 
171          logdebug("TCPServer[%s] shutting down", self.port) 
 172   
173       
175          """ 
176          @return: (ip address, port) of server socket binding 
177          @rtype: (str, int) 
178          """ 
179           
180           
181          return (rosgraph.network.get_host_name(), self.port) 
 182       
184          """ 
185          binds the server socket. ROS_IP/ROS_HOSTNAME may restrict 
186          binding to loopback interface. 
187          """ 
188          if rosgraph.network.use_ipv6(): 
189              server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 
190          else: 
191              server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
192          server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
193          logdebug('binding to ' + str(rosgraph.network.get_bind_address()) + ' ' + str(self.port)) 
194          server_sock.bind((rosgraph.network.get_bind_address(), self.port)) 
195          (self.addr, self.port) = server_sock.getsockname()[0:2] 
196          logdebug('bound to ' + str(self.addr) + ' ' + str(self.port)) 
197          server_sock.listen(5) 
198          return server_sock 
 199   
201          """shutdown I/O resources uses by this server""" 
202          if not self.is_shutdown: 
203              self.is_shutdown = True 
204              self.server_sock.close() 
  205   
206   
207   
208   
209  _tcpros_server = None 
210   
223       
231   
232   
233   
235      """ 
236      get the address of the tcpros server. 
237      @raise Exception: if tcpros server has not been started or created 
238      """ 
239      return _tcpros_server.get_address() 
 240   
242      """ 
243      utility handler that does nothing more than provide a rejection header 
244      @param sock: socket connection 
245      @type  sock: socket.socket 
246      @param client_addr: client address 
247      @type  client_addr: str 
248      @param header: request header 
249      @type  header: dict 
250      """ 
251      return {'error': "unhandled connection"} 
 252   
254      """ 
255      ROS Protocol handler for TCPROS. Accepts both TCPROS topic 
256      connections as well as ROS service connections over TCP. TCP server 
257      socket is run once start_server() is called -- this is implicitly 
258      called during init_publisher(). 
259      """ 
260       
262          """ 
263          Constructur 
264          @param port: port number to bind to (default 0/any) 
265          @type  port: int 
266          """ 
267          self.port = port 
268          self.tcp_ros_server = None  
269          self.lock = threading.Lock() 
270           
271          self.topic_connection_handler = _error_connection_handler 
272           
273          self.service_connection_handler = _error_connection_handler 
 274           
276          """ 
277          Starts the TCP socket server if one is not already running 
278          """ 
279          if self.tcp_ros_server: 
280              return 
281          with self.lock: 
282              try: 
283                  if not self.tcp_ros_server: 
284                      self.tcp_ros_server = TCPServer(self._tcp_server_callback, self.port)  
285                      self.tcp_ros_server.start() 
286              except Exception as e: 
287                  self.tcp_ros_server = None 
288                  logerr("unable to start TCPROS server: %s\n%s"%(e, traceback.format_exc())) 
289                  return 0, "unable to establish TCPROS server: %s"%e, [] 
 290       
292          """ 
293          @return: address and port of TCP server socket for accepting 
294          inbound connections 
295          @rtype: str, int 
296          """ 
297          if self.tcp_ros_server is not None: 
298              return self.tcp_ros_server.get_full_addr() 
299          return None, None 
 300       
302          """stops the TCP/IP server responsible for receiving inbound connections""" 
303          if self.tcp_ros_server: 
304              self.tcp_ros_server.shutdown() 
 305   
307          """ 
308          TCPServer callback: detects incoming topic or service connection and passes connection accordingly 
309       
310          @param sock: socket connection 
311          @type  sock: socket.socket 
312          @param client_addr: client address 
313          @type  client_addr: (str, int) 
314          @raise TransportInitError: If transport cannot be succesfully initialized 
315          """ 
316           
317           
318          try: 
319              buff_size = 4096  
320              if python3 == 0: 
321                   
322                  header = read_ros_handshake_header(sock, StringIO(), buff_size) 
323              else: 
324                  header = read_ros_handshake_header(sock, BytesIO(), buff_size) 
325               
326              if 'topic' in header: 
327                  err_msg = self.topic_connection_handler(sock, client_addr, header) 
328              elif 'service' in header: 
329                  err_msg = self.service_connection_handler(sock, client_addr, header) 
330              else: 
331                  err_msg = 'no topic or service name detected' 
332              if err_msg: 
333                   
334                   
335                   
336                   
337                   
338                   
339                  if not rospy.core.is_shutdown_requested(): 
340                      write_ros_handshake_header(sock, {'error' : err_msg}) 
341                      raise TransportInitError("Could not process inbound connection: "+err_msg+str(header)) 
342                  else: 
343                      write_ros_handshake_header(sock, {'error' : 'node shutting down'}) 
344                      return 
345          except rospy.exceptions.TransportInitError as e: 
346              logwarn(str(e)) 
347              if sock is not None: 
348                  sock.close() 
349          except Exception as e: 
350               
351              if not rospy.core.is_shutdown_requested(): 
352                  logwarn("Inbound TCP/IP connection failed: %s", e) 
353                  rospyerr("Inbound TCP/IP connection failed:\n%s", traceback.format_exc()) 
354              if sock is not None: 
355                  sock.close() 
  356   
358      """ 
359      Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this 
360      protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as 
361      well as which fields to send when creating a new connection (get_header_fields()). 
362      """ 
363   
365          """ 
366          ctor 
367          @param resolved_name: resolved service or topic name 
368          @type  resolved_name: str 
369          @param recv_data_class: message class for deserializing inbound messages 
370          @type  recv_data_class: Class 
371          @param queue_size: maximum number of inbound messages to maintain 
372          @type  queue_size: int 
373          @param buff_size: receive buffer size (in bytes) for reading from the connection. 
374          @type  buff_size: int 
375          """ 
376          if recv_data_class and not issubclass(recv_data_class, Message): 
377              raise TransportInitError("Unable to initialize transport: data class is not a message data class") 
378          self.resolved_name = resolved_name 
379          self.recv_data_class = recv_data_class 
380          self.queue_size = queue_size 
381          self.buff_size = buff_size 
382          self.direction = BIDIRECTIONAL 
 383           
384       
386          """ 
387          @param b StringIO: read buffer         
388          @param msg_queue [Message]: queue of deserialized messages 
389          @type  msg_queue: [Message] 
390          @param sock socket: protocol can optionally read more data from 
391          the socket, but in most cases the required data will already be 
392          in b 
393          """ 
394           
395          deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size) 
 396           
398          """ 
399          Header fields that should be sent over the connection. The header fields 
400          are protocol specific (i.e. service vs. topic, publisher vs. subscriber). 
401          @return: {str : str}: header fields to send over connection 
402          @rtype: dict 
403          """ 
404          return {} 
  405   
406   
407   
408   
409   
410   
411   
412   
413   
414   
415   
417      """ 
418      Generic implementation of TCPROS exchange routines for both topics and services 
419      """ 
420      transport_type = 'TCPROS' 
421       
422 -    def __init__(self, protocol, name, header=None): 
 423          """ 
424          ctor 
425          @param name str: identifier 
426          @param protocol TCPROSTransportProtocol protocol implementation     
427          @param header dict: (optional) handshake header if transport handshake header was 
428          already read off of transport. 
429          @raise TransportInitError if transport cannot be initialized according to arguments 
430          """ 
431          super(TCPROSTransport, self).__init__(protocol.direction, name=name) 
432          if not name: 
433              raise TransportInitError("Unable to initialize transport: name is not set") 
434   
435          self.protocol = protocol 
436   
437          self.socket = None 
438          self.endpoint_id = 'unknown' 
439          self.callerid_pub = 'unknown' 
440          self.dest_address = None  
441           
442          if python3 == 0:  
443              self.read_buff = StringIO() 
444              self.write_buff = StringIO() 
445          else:  
446              self.read_buff = BytesIO() 
447              self.write_buff = BytesIO() 
448                               
449           
450          self.header = header 
451   
452           
453          self.is_latched = False 
454          self.latch = None 
455   
456           
457           
458          self._fileno = None 
459           
460           
461           
462           
463          self.md5sum = None 
464          self.type = None  
465   
466           
467          self.local_endpoint = (None, None) 
468          self.remote_endpoint = (None, None) 
 469   
471          """ 
472          Get detailed connection information. 
473          Similar to getTransportInfo() in 'libros/transport/transport_tcp.cpp' 
474          e.g. TCPROS connection on port 41374 to [127.0.0.1:40623 on socket 6] 
475          """ 
476          return "%s connection on port %s to [%s:%s on socket %s]" % (self.transport_type, self.local_endpoint[1], self.remote_endpoint[0], self.remote_endpoint[1], self._fileno) 
 477   
479          """ 
480          Get descriptor for select 
481          """ 
482          return self._fileno 
 483           
485          """ 
486          Set the endpoint_id of this transport. 
487          Allows the endpoint_id to be set before the socket is initialized. 
488          """ 
489          self.endpoint_id = endpoint_id 
 490   
492          """ 
493          Set the socket for this transport 
494          @param sock: socket 
495          @type  sock: socket.socket 
496          @param endpoint_id: identifier for connection endpoint 
497          @type  endpoint_id: str 
498          @raise TransportInitError: if socket has already been set 
499          """ 
500          if self.socket is not None: 
501              raise TransportInitError("socket already initialized") 
502          self.socket = sock 
503          self.endpoint_id = endpoint_id 
504          self._fileno = sock.fileno() 
505          self.local_endpoint = self.socket.getsockname() 
 506   
507 -    def connect(self, dest_addr, dest_port, endpoint_id, timeout=None): 
 508          """ 
509          Establish TCP connection to the specified 
510          address/port. connect() always calls L{write_header()} and 
511          L{read_header()} after the connection is made 
512          @param dest_addr: destination IP address 
513          @type  dest_addr: str 
514          @param dest_port: destination port 
515          @type  dest_port: int                 
516          @param endpoint_id: string identifier for connection (for statistics) 
517          @type  endpoint_id: str 
518          @param timeout: (optional keyword) timeout in seconds 
519          @type  timeout: float 
520          @raise TransportInitError: if unable to create connection 
521          """ 
522           
523           
524          if ("ROS_HOSTNAME" in os.environ) and (os.environ["ROS_HOSTNAME"] == "localhost"): 
525            if not rosgraph.network.is_local_address(dest_addr): 
526              msg = "attempted to connect to non-local host [%s] from a node launched with ROS_HOSTNAME=localhost" % (dest_addr) 
527              logwarn(msg) 
528              self.close() 
529              raise TransportInitError(msg)   
530    
531           
532          try: 
533              self.endpoint_id = endpoint_id 
534              self.dest_address = (dest_addr, dest_port) 
535              if rosgraph.network.use_ipv6(): 
536                  s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 
537              else: 
538                  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
539              if _is_use_tcp_keepalive(): 
540                   
541                  if hasattr(socket, 'TCP_KEEPCNT') and \ 
542                     hasattr(socket, 'TCP_KEEPIDLE') and \ 
543                     hasattr(socket, 'TCP_KEEPINTVL'): 
544                       
545                      s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 
546                       
547                      s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 9) 
548                       
549                      s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) 
550                       
551                      s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10) 
552              if timeout is not None: 
553                  s.settimeout(timeout) 
554              self.socket = s 
555              logdebug('connecting to ' + str(dest_addr)+ ' ' + str(dest_port)) 
556              self.socket.connect((dest_addr, dest_port)) 
557              self.write_header() 
558              self.read_header() 
559              self.local_endpoint = self.socket.getsockname() 
560              self.remote_endpoint = (dest_addr, dest_port) 
561          except TransportInitError as tie: 
562              rospyerr("Unable to initiate TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))             
563              raise 
564          except Exception as e: 
565               
566              rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) 
567               
568               
569              if not isinstance(e, socket.error): 
570                   
571                  self.close() 
572              elif not isinstance(e, socket.timeout) and e.errno not in [100, 101, 102, 103, 110, 112, 113]: 
573                   
574                   
575                   
576                   
577                   
578                   
579                   
580                   
581                   
582                  self.close() 
583              raise TransportInitError(str(e))  
 584                   
586          """ 
587          Validate header and initialize fields accordingly 
588          @param header: header fields from publisher 
589          @type  header: dict 
590          @raise TransportInitError: if header fails to validate 
591          """ 
592          self.header = header 
593          if 'error' in header: 
594              raise TransportInitError("remote error reported: %s"%header['error']) 
595          for required in ['md5sum', 'type']: 
596              if not required in header: 
597                  raise TransportInitError("header missing required field [%s]"%required) 
598          self.type = header['type'] 
599          self.md5sum = header['md5sum'] 
600          if 'callerid' in header: 
601              self.callerid_pub = header['callerid'] 
602          if header.get('latching', '0') == '1': 
603              self.is_latched = True 
 604   
606          """Writes the TCPROS header to the active connection.""" 
607           
608          sock = self.socket 
609          protocol = self.protocol 
610           
611           
612           
613          if sock is None or protocol is None: 
614              return 
615          fileno = sock.fileno() 
616          ready = None 
617          poller = None 
618          if hasattr(select, 'poll'): 
619              poller = select.poll() 
620              poller.register(fileno, select.POLLOUT) 
621              while not ready: 
622                  events = poller.poll() 
623                  for _, flag in events: 
624                    if flag & select.POLLOUT: 
625                          ready = True 
626          else: 
627              while not ready: 
628                  try: 
629                      _, ready, _ = select.select([], [fileno], []) 
630                  except ValueError as e: 
631                      logger.error("[%s]: select fileno '%s': %s", self.name, str(fileno), str(e)) 
632                      raise 
633   
634          logger.debug("[%s]: writing header", self.name) 
635          sock.setblocking(1) 
636          self.stat_bytes += write_ros_handshake_header(sock, protocol.get_header_fields()) 
637          if poller: 
638              poller.unregister(fileno) 
 639   
640   
642          """ 
643          Read TCPROS header from active socket 
644          @raise TransportInitError if header fails to validate 
645          """ 
646          sock = self.socket 
647          if sock is None: 
648              return 
649          sock.setblocking(1) 
650           
651          self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size)) 
 652                   
654          """ 
655          Convenience routine for services to send a message across a 
656          particular connection. NOTE: write_data is much more efficient 
657          if same message is being sent to multiple connections. Not 
658          threadsafe. 
659          @param msg: message to send 
660          @type  msg: Msg 
661          @param seq: sequence number for message 
662          @type  seq: int 
663          @raise TransportException: if error occurred sending message 
664          """ 
665           
666          serialize_message(self.write_buff, seq, msg) 
667          self.write_data(self.write_buff.getvalue()) 
668          self.write_buff.truncate(0) 
 669   
671          """ 
672          Write raw data to transport 
673          @raise TransportInitialiationError: could not be initialized 
674          @raise TransportTerminated: no longer open for publishing 
675          """ 
676          if not self.socket: 
677              raise TransportInitError("TCPROS transport was not successfully initialized") 
678          if self.done: 
679              raise TransportTerminated("connection closed") 
680          try: 
681               
682              self.socket.sendall(data) 
683              self.stat_bytes  += len(data) 
684              self.stat_num_msg += 1 
685          except IOError as ioe: 
686               
687              (errno, msg) = ioe.args 
688              if errno == 32:  
689                  logdebug("ERROR: Broken Pipe") 
690                  self.close() 
691                  raise TransportTerminated(str(errno)+msg) 
692              raise  
693          except socket.error as se: 
694               
695              (errno, msg) = se.args 
696              if errno == 32:  
697                  logdebug("[%s]: Closing connection [%s] due to broken pipe", self.name, self.endpoint_id) 
698                  self.close() 
699                  raise TransportTerminated(msg) 
700              elif errno == 104:  
701                  logdebug("[%s]: Peer [%s] has closed connection", self.name, self.endpoint_id)  
702                  self.close() 
703                  raise TransportTerminated(msg) 
704              else: 
705                  rospydebug("unknown socket error writing data: %s",traceback.format_exc()) 
706                  logdebug("[%s]: closing connection [%s] due to unknown socket error: %s", self.name, self.endpoint_id, msg)  
707                  self.close() 
708                  raise TransportTerminated(str(errno)+' '+msg)                 
709          return True 
 710       
712          """ 
713          block until messages are read off of socket 
714          @return: list of newly received messages 
715          @rtype: [Msg] 
716          @raise TransportException: if unable to receive message due to error 
717          """ 
718          sock = self.socket 
719          if sock is None: 
720              raise TransportException("connection not initialized") 
721          b = self.read_buff 
722          msg_queue = [] 
723          p = self.protocol 
724          try: 
725              sock.setblocking(1)                 
726              while not msg_queue and not self.done and not is_shutdown(): 
727                  if b.tell() >= 4: 
728                      p.read_messages(b, msg_queue, sock)  
729                  if not msg_queue: 
730                      self.stat_bytes += recv_buff(sock, b, p.buff_size) 
731              self.stat_num_msg += len(msg_queue)  
732               
733              for m in msg_queue: 
734                  m._connection_header = self.header 
735                   
736               
737              if self.is_latched and msg_queue: 
738                  self.latch = msg_queue[-1] 
739               
740              return msg_queue 
741   
742          except DeserializationError as e: 
743              rospyerr(traceback.format_exc())             
744              raise TransportException("receive_once[%s]: DeserializationError %s"%(self.name, str(e))) 
745          except TransportTerminated as e: 
746              raise  
747          except ServiceException as e: 
748              raise 
749          except Exception as e: 
750              rospyerr(traceback.format_exc()) 
751              raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) 
752          return retval 
 753           
755           
756           
757   
758          if self.dest_address is None: 
759              raise ROSInitException("internal error with reconnection state: address not stored") 
760   
761          interval = 0.5  
762          while self.socket is None and not self.done and not rospy.is_shutdown(): 
763              try: 
764                   
765                   
766                   
767                   
768                  self.connect(self.dest_address[0], self.dest_address[1], self.endpoint_id, timeout=30.) 
769              except TransportInitError: 
770                  self.socket = None 
771                   
772              if self.socket is None and interval < 30.: 
773                   
774                  interval = interval * 2 
775                   
776              time.sleep(interval) 
 777   
779          """ 
780          Receive messages until shutdown 
781          @param msgs_callback: callback to invoke for new messages received     
782          @type  msgs_callback: fn([msg]) 
783          """ 
784           
785          logger.debug("receive_loop for [%s]", self.name) 
786          try: 
787              while not self.done and not is_shutdown(): 
788                  try: 
789                      if self.socket is not None: 
790                          msgs = self.receive_once() 
791                          if not self.done and not is_shutdown(): 
792                              msgs_callback(msgs, self) 
793                      else: 
794                          self._reconnect() 
795   
796                  except TransportException as e: 
797                       
798                      try: 
799                          if self.socket is not None: 
800                              try: 
801                                  self.socket.shutdown() 
802                              except: 
803                                  pass 
804                              finally: 
805                                  self.socket.close() 
806                      except: 
807                          pass 
808                      self.socket = None 
809   
810                  except DeserializationError as e: 
811                       
812                       
813                      logerr("[%s] error deserializing incoming request: %s"%self.name, str(e)) 
814                      rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc()) 
815                  except: 
816                       
817                      try: 
818                           
819                           
820                           
821                          rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())                     
822                      except: pass 
823                       
824              rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)                     
825          finally: 
826              if not self.done: 
827                  self.close() 
 828   
830          """close i/o and release resources""" 
831          if not self.done: 
832              try: 
833                  if self.socket is not None: 
834                      try: 
835                          self.socket.shutdown(socket.SHUT_RDWR) 
836                      except: 
837                          pass 
838                      finally: 
839                          self.socket.close() 
840              finally: 
841                  self.socket = self.read_buff = self.write_buff = self.protocol = None 
842                  super(TCPROSTransport, self).close() 
  843