1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34   
 35  """Internal use: Topic-specific extensions for TCPROS support""" 
 36   
 37  import socket 
 38  import threading 
 39  import time 
 40   
 41  try: 
 42      from xmlrpc.client import ServerProxy   
 43  except ImportError: 
 44      from xmlrpclib import ServerProxy   
 45   
 46  from rospy.core import logwarn, logerr, logdebug, rospyerr 
 47  import rospy.exceptions 
 48  import rospy.names 
 49   
 50  import rospy.impl.registration 
 51  import rospy.impl.transport 
 52   
 53  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \ 
 54      get_tcpros_server_address, start_tcpros_server,\ 
 55      DEFAULT_BUFF_SIZE, TCPROS 
 56   
 58      """ 
 59      Subscription transport implementation for receiving topic data via 
 60      peer-to-peer TCP/IP sockets 
 61      """ 
 62   
 65          """ 
 66          ctor. 
 67   
 68          @param resolved_name: resolved subscription name 
 69          @type  resolved_name: str 
 70   
 71          @param recv_data_class: class to instantiate to receive 
 72          messages 
 73          @type recv_data_class: L{rospy.Message} 
 74   
 75          @param queue_size: maximum number of messages to 
 76          deserialize from newly read data off socket 
 77          @type queue_size: int 
 78   
 79          @param buff_size: recv buffer size 
 80          @type buff_size: int 
 81   
 82          @param tcp_nodelay: If True, request TCP_NODELAY from publisher 
 83          @type tcp_nodelay: bool 
 84          """ 
 85          super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size) 
 86          self.direction = rospy.impl.transport.INBOUND 
 87          self.tcp_nodelay = tcp_nodelay 
  88           
 90          """ 
 91          @return: dictionary of subscriber fields 
 92          @rtype: dict 
 93          """ 
 94          return {'topic': self.resolved_name, 
 95                  'message_definition': self.recv_data_class._full_text, 
 96                  'tcp_nodelay': '1' if self.tcp_nodelay else '0', 
 97                  'md5sum': self.recv_data_class._md5sum, 
 98                  'type': self.recv_data_class._type, 
 99                  'callerid': rospy.names.get_caller_id()}         
  100   
101   
116   
117   
118   
120      """ 
121      Publisher transport implementation for publishing topic data via 
122      peer-to-peer TCP/IP sockets.  
123      """ 
124       
125 -    def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None): 
 126          """ 
127          ctor. 
128          @param resolved_name: resolved topic name 
129          @type  resolved_name: str 
130          @param pub_data_class: class to instance to receive messages 
131          @type  pub_data_class: L{rospy.Message} class 
132          @param is_latch: If True, Publisher is latching 
133          @type  is_latch: bool 
134          """ 
135           
136          super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128) 
137          self.pub_data_class = pub_data_class 
138          self.direction = rospy.impl.transport.OUTBOUND 
139          self.is_latch = is_latch 
140          self.headers = headers if headers else {} 
 141           
143          base = {'topic': self.resolved_name, 
144                  'type': self.pub_data_class._type, 
145                  'latching': '1' if self.is_latch else '0', 
146                  'message_definition': self.pub_data_class._full_text, 
147                  'md5sum': self.pub_data_class._md5sum,  
148                  'callerid': rospy.names.get_caller_id() } 
149           
150           
151           
152           
153          if self.headers: 
154              base.update(self.headers) 
155          return base 
  156   
158      """ 
159      Keeps trying to create connection for subscriber.  Then passes off to receive_loop once connected. 
160      """ 
161       
162       
163       
164       
165      interval = 0.5 
166      while conn.socket is None and not conn.done and not rospy.is_shutdown(): 
167          try: 
168              conn.connect(dest_addr, dest_port, pub_uri, timeout=60.) 
169          except rospy.exceptions.TransportInitError as e: 
170               
171               
172              if conn.protocol is None: 
173                  conn.done = True 
174                  break 
175              rospyerr("unable to create subscriber transport: %s.  Will try again in %ss", e, interval) 
176              if interval < 30.0: 
177                 
178                interval = interval * 2 
179              time.sleep(interval) 
180               
181               
182              conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri) 
183           
184      if not conn.done: 
185          conn.receive_loop(receive_cb)        
 186   
188      try: 
189          s = ServerProxy(pub_uri) 
190          code, msg, val = s.getPublications(rospy.names.get_name()) 
191          if code == 1: 
192              return len([t for t in val if t[0] == resolved_topic_name]) > 0 
193          else: 
194              return False 
195      except: 
196          return False 
 197   
199      """ 
200      ROS Protocol handler for TCPROS. Accepts both TCPROS topic 
201      connections as well as ROS service connections over TCP. TCP server 
202      socket is run once start_server() is called -- this is implicitly 
203      called during init_publisher(). 
204      """ 
205   
207          """ctor""" 
208          self.tcp_nodelay_map = {}  
 209       
211          """ 
212          @param resolved_name: resolved topic name 
213          @type  resolved_name: str 
214   
215          @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's 
216          socket (disables Nagle algorithm). This results in lower 
217          latency publishing at the cost of efficiency. 
218          @type  tcp_nodelay: bool 
219          """ 
220          self.tcp_nodelay_map[resolved_name] = tcp_nodelay 
 221   
223          """ 
224          stops the TCP/IP server responsible for receiving inbound connections         
225          """ 
226          pass 
 227   
229          """ 
230          Connect to topic resolved_name on Publisher pub_uri using TCPROS. 
231          @param resolved_name str: resolved topic name 
232          @type  resolved_name: str 
233          @param pub_uri: XML-RPC URI of publisher  
234          @type  pub_uri: str 
235          @param protocol_params: protocol parameters to use for connecting 
236          @type protocol_params: [XmlRpcLegal] 
237          @return: code, message, debug 
238          @rtype: (int, str, int) 
239          """ 
240           
241           
242          if type(protocol_params) != list or len(protocol_params) != 3: 
243              return 0, "ERROR: invalid TCPROS parameters", 0 
244          if protocol_params[0] != TCPROS: 
245              return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%protocol_params[0], 0 
246          id, dest_addr, dest_port = protocol_params 
247   
248          sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name) 
249   
250           
251          protocol = TCPROSSub(resolved_name, sub.data_class, \ 
252                               queue_size=sub.queue_size, buff_size=sub.buff_size, 
253                               tcp_nodelay=sub.tcp_nodelay) 
254          conn = TCPROSTransport(protocol, resolved_name) 
255          conn.set_endpoint_id(pub_uri); 
256   
257          t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,resolved_name)) 
258           
259           
260   
261           
262          if sub.add_connection(conn):  
263               
264               
265               
266              t.start() 
267              return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port 
268          else: 
269               
270              conn.close() 
271              return 0, "ERROR: Race condition failure creating topic subscriber [%s]"%(resolved_name), 0 
 272   
274          """ 
275          @param protocol: name of protocol 
276          @type protocol: str 
277          @return: True if protocol is supported 
278          @rtype: bool 
279          """ 
280          return protocol == TCPROS 
 281       
283          """ 
284          Get supported protocols 
285          """ 
286          return [[TCPROS]] 
 287           
289          """ 
290          Initialize this node to receive an inbound TCP connection, 
291          i.e. startup a TCP server if one is not already running. 
292           
293          @param resolved_name: topic name 
294          @type  resolved__name: str 
295           
296          @param protocol: negotiated protocol 
297            parameters. protocol[0] must be the string 'TCPROS' 
298          @type  protocol: [str, value*] 
299          @return: (code, msg, [TCPROS, addr, port]) 
300          @rtype: (int, str, list) 
301          """ 
302          if protocol[0] != TCPROS: 
303              return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, [] 
304          start_tcpros_server() 
305          addr, port = get_tcpros_server_address() 
306          return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port] 
 307   
309          """ 
310          Process incoming topic connection. Reads in topic name from 
311          handshake and creates the appropriate L{TCPROSPub} handler for the 
312          connection. 
313          @param sock: socket connection 
314          @type sock: socket.socket 
315          @param client_addr: client address 
316          @type client_addr: (str, int) 
317          @param header: key/value pairs from handshake header 
318          @type header: dict 
319          @return: error string or None  
320          @rtype: str 
321          """ 
322          if rospy.core.is_shutdown_requested(): 
323              return "Node is shutting down" 
324          for required in ['topic', 'md5sum', 'callerid']: 
325              if not required in header: 
326                  return "Missing required '%s' field"%required 
327          else: 
328              resolved_topic_name = header['topic'] 
329              md5sum = header['md5sum'] 
330              tm = rospy.impl.registration.get_topic_manager() 
331              topic = tm.get_publisher_impl(resolved_topic_name) 
332              if not topic: 
333                  return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 
334              elif not topic.data_class or topic.closed: 
335                  return "Internal error processing topic [%s]"%(resolved_topic_name) 
336              elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 
337                  data_class = topic.data_class 
338                  actual_type = data_class._type 
339   
340                   
341                   
342                  if 'type' in header: 
343                      requested_type = header['type'] 
344                      if requested_type != actual_type: 
345                          return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 
346                  else: 
347                       
348                      requested_type = actual_type 
349   
350                  return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum) 
351   
352              else: 
353                   
354   
355                   
356                  if 'tcp_nodelay' in header: 
357                      tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 
358                  else: 
359                      tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 
360   
361                  _configure_pub_socket(sock, tcp_nodelay) 
362                  protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 
363                  transport = TCPROSTransport(protocol, resolved_topic_name) 
364                  transport.set_socket(sock, header['callerid']) 
365                  transport.remote_endpoint = client_addr 
366                  transport.write_header() 
367                  topic.add_connection(transport) 
  368               
369   
371      """ 
372      It wraps a Transport instance and behaves like one 
373      but it queues the data written to it and relays them 
374      asynchronously to the wrapped instance. 
375      """ 
376   
377 -    def __init__(self, connection, queue_size): 
 378          """ 
379          ctor. 
380          @param connection: the wrapped transport instance 
381          @type  connection: Transport 
382          @param queue_size: the maximum size of the queue, zero means infinite 
383          @type  queue_size: int 
384          """ 
385          super(QueuedConnection, self).__init__() 
386          self._connection = connection 
387          self._queue_size = queue_size 
388   
389          self._lock = threading.Lock() 
390          self._cond_data_available = threading.Condition(self._lock) 
391          self._connection.set_cleanup_callback(self._closed_connection_callback) 
392          self._queue = [] 
393          self._error = None 
394   
395          self._thread = threading.Thread(target=self._run) 
396          self._thread.start() 
 397   
399          with self._lock: 
400              self._cond_data_available.notify() 
 401   
403          if name.startswith('__'): 
404              raise AttributeError(name) 
405          return getattr(self._connection, name) 
 406   
408          with self._lock: 
409               
410              if self._error: 
411                  error = self._error 
412                  self._error = None 
413                  raise error 
414               
415              if self._queue_size > 0 and len(self._queue) == self._queue_size: 
416                  del self._queue[0] 
417              self._queue.append(data) 
418              self._cond_data_available.notify() 
419           
420          time.sleep(0) 
421          return True 
 422   
424          while not self._connection.done: 
425              queue = [] 
426              with self._lock: 
427                   
428                  while not self._queue and not self._connection.done: 
429                      self._cond_data_available.wait() 
430                   
431                  if self._queue: 
432                      queue = self._queue 
433                      self._queue = [] 
434               
435              for data in queue: 
436                  try: 
437                      self._connection.write_data(data) 
438                  except Exception as e: 
439                      with self._lock: 
440                          self._error = e 
  441