Package rospy :: Package impl :: Module tcpros_pubsub

Source Code for Module rospy.impl.tcpros_pubsub

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: Topic-specific extensions for TCPROS support""" 
 36   
 37  import socket 
 38  import threading 
 39  import time 
 40   
 41  try: 
 42      from xmlrpc.client import ServerProxy  # Python 3.x 
 43  except ImportError: 
 44      from xmlrpclib import ServerProxy  # Python 2.x 
 45   
 46  from rospy.core import logwarn, logerr, logdebug, rospyerr 
 47  import rospy.exceptions 
 48  import rospy.names 
 49   
 50  import rospy.impl.registration 
 51  import rospy.impl.transport 
 52   
 53  from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \ 
 54      get_tcpros_server_address, start_tcpros_server,\ 
 55      DEFAULT_BUFF_SIZE, TCPROS 
 56   
57 -class TCPROSSub(TCPROSTransportProtocol):
58 """ 59 Subscription transport implementation for receiving topic data via 60 peer-to-peer TCP/IP sockets 61 """ 62
63 - def __init__(self, resolved_name, recv_data_class, queue_size=None, \ 64 buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
65 """ 66 ctor. 67 68 @param resolved_name: resolved subscription name 69 @type resolved_name: str 70 71 @param recv_data_class: class to instantiate to receive 72 messages 73 @type recv_data_class: L{rospy.Message} 74 75 @param queue_size: maximum number of messages to 76 deserialize from newly read data off socket 77 @type queue_size: int 78 79 @param buff_size: recv buffer size 80 @type buff_size: int 81 82 @param tcp_nodelay: If True, request TCP_NODELAY from publisher 83 @type tcp_nodelay: bool 84 """ 85 super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size) 86 self.direction = rospy.impl.transport.INBOUND 87 self.tcp_nodelay = tcp_nodelay
88
89 - def get_header_fields(self):
90 """ 91 @return: dictionary of subscriber fields 92 @rtype: dict 93 """ 94 return {'topic': self.resolved_name, 95 'message_definition': self.recv_data_class._full_text, 96 'tcp_nodelay': '1' if self.tcp_nodelay else '0', 97 'md5sum': self.recv_data_class._md5sum, 98 'type': self.recv_data_class._type, 99 'callerid': rospy.names.get_caller_id()}
100 101 # Separate method for easier testing
102 -def _configure_pub_socket(sock, is_tcp_nodelay):
103 """ 104 Configure socket options on a new publisher socket. 105 @param sock: socket.socket 106 @type sock: socket.socket 107 @param is_tcp_nodelay: if True, TCP_NODELAY will be set on outgoing socket if available 108 @param is_tcp_nodelay: bool 109 """ 110 # #956: low latency, TCP_NODELAY support 111 if is_tcp_nodelay: 112 if hasattr(socket, 'TCP_NODELAY'): 113 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 114 else: 115 logwarn("WARNING: cannot enable TCP_NODELAY as its not supported on this platform")
116 117 #TODO:POLLING: TCPROSPub currently doesn't actually do anything -- not until polling is implemented 118
119 -class TCPROSPub(TCPROSTransportProtocol):
120 """ 121 Publisher transport implementation for publishing topic data via 122 peer-to-peer TCP/IP sockets. 123 """ 124
125 - def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None):
126 """ 127 ctor. 128 @param resolved_name: resolved topic name 129 @type resolved_name: str 130 @param pub_data_class: class to instance to receive messages 131 @type pub_data_class: L{rospy.Message} class 132 @param is_latch: If True, Publisher is latching 133 @type is_latch: bool 134 """ 135 # very small buffer size for publishers as the messages they receive are very small 136 super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128) 137 self.pub_data_class = pub_data_class 138 self.direction = rospy.impl.transport.OUTBOUND 139 self.is_latch = is_latch 140 self.headers = headers if headers else {}
141
142 - def get_header_fields(self):
143 base = {'topic': self.resolved_name, 144 'type': self.pub_data_class._type, 145 'latching': '1' if self.is_latch else '0', 146 'message_definition': self.pub_data_class._full_text, 147 'md5sum': self.pub_data_class._md5sum, 148 'callerid': rospy.names.get_caller_id() } 149 150 # this implementation allows the user to override builtin 151 # fields. this could potentially enable some interesting 152 # features... or it could be really bad. 153 if self.headers: 154 base.update(self.headers) 155 return base
156
157 -def robust_connect_subscriber(conn, dest_addr, dest_port, pub_uri, receive_cb, resolved_topic_name):
158 """ 159 Keeps trying to create connection for subscriber. Then passes off to receive_loop once connected. 160 """ 161 # kwc: this logic is not very elegant. I am waiting to rewrite 162 # the I/O loop with async i/o to clean this up. 163 164 # timeout is really generous. for now just choosing one that is large but not infinite 165 interval = 0.5 166 while conn.socket is None and not conn.done and not rospy.is_shutdown(): 167 try: 168 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.) 169 except rospy.exceptions.TransportInitError as e: 170 # if the connection was closed intentionally 171 # because of an unknown error, stop trying 172 if conn.protocol is None: 173 conn.done = True 174 break 175 rospyerr("unable to create subscriber transport: %s. Will try again in %ss", e, interval) 176 if interval < 30.0: 177 # exponential backoff (maximum 32 seconds) 178 interval = interval * 2 179 time.sleep(interval) 180 181 # check to see if publisher state has changed 182 conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri) 183 184 if not conn.done: 185 conn.receive_loop(receive_cb)
186
187 -def check_if_still_publisher(resolved_topic_name, pub_uri):
188 try: 189 s = ServerProxy(pub_uri) 190 code, msg, val = s.getPublications(rospy.names.get_name()) 191 if code == 1: 192 return len([t for t in val if t[0] == resolved_topic_name]) > 0 193 else: 194 return False 195 except: 196 return False
197
198 -class TCPROSHandler(rospy.impl.transport.ProtocolHandler):
199 """ 200 ROS Protocol handler for TCPROS. Accepts both TCPROS topic 201 connections as well as ROS service connections over TCP. TCP server 202 socket is run once start_server() is called -- this is implicitly 203 called during init_publisher(). 204 """ 205
206 - def __init__(self):
207 """ctor""" 208 self.tcp_nodelay_map = {} # { topic : tcp_nodelay}
209
210 - def set_tcp_nodelay(self, resolved_name, tcp_nodelay):
211 """ 212 @param resolved_name: resolved topic name 213 @type resolved_name: str 214 215 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's 216 socket (disables Nagle algorithm). This results in lower 217 latency publishing at the cost of efficiency. 218 @type tcp_nodelay: bool 219 """ 220 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
221
222 - def shutdown(self):
223 """ 224 stops the TCP/IP server responsible for receiving inbound connections 225 """ 226 pass
227
228 - def create_transport(self, resolved_name, pub_uri, protocol_params):
229 """ 230 Connect to topic resolved_name on Publisher pub_uri using TCPROS. 231 @param resolved_name str: resolved topic name 232 @type resolved_name: str 233 @param pub_uri: XML-RPC URI of publisher 234 @type pub_uri: str 235 @param protocol_params: protocol parameters to use for connecting 236 @type protocol_params: [XmlRpcLegal] 237 @return: code, message, debug 238 @rtype: (int, str, int) 239 """ 240 241 #Validate protocol params = [TCPROS, address, port] 242 if type(protocol_params) != list or len(protocol_params) != 3: 243 return 0, "ERROR: invalid TCPROS parameters", 0 244 if protocol_params[0] != TCPROS: 245 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%protocol_params[0], 0 246 id, dest_addr, dest_port = protocol_params 247 248 sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name) 249 250 #Create connection 251 protocol = TCPROSSub(resolved_name, sub.data_class, \ 252 queue_size=sub.queue_size, buff_size=sub.buff_size, 253 tcp_nodelay=sub.tcp_nodelay) 254 conn = TCPROSTransport(protocol, resolved_name) 255 conn.set_endpoint_id(pub_uri); 256 257 t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,resolved_name)) 258 # don't enable this just yet, need to work on this logic 259 #rospy.core._add_shutdown_thread(t) 260 261 # Attach connection to _SubscriberImpl 262 if sub.add_connection(conn): #pass tcp connection to handler 263 # since the thread might cause the connection to close 264 # it should only be started after the connection has been added to the subscriber 265 # https://github.com/ros/ros_comm/issues/544 266 t.start() 267 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port 268 else: 269 # _SubscriberImpl already closed or duplicate subscriber created 270 conn.close() 271 return 0, "ERROR: Race condition failure creating topic subscriber [%s]"%(resolved_name), 0
272
273 - def supports(self, protocol):
274 """ 275 @param protocol: name of protocol 276 @type protocol: str 277 @return: True if protocol is supported 278 @rtype: bool 279 """ 280 return protocol == TCPROS
281
282 - def get_supported(self):
283 """ 284 Get supported protocols 285 """ 286 return [[TCPROS]]
287
288 - def init_publisher(self, resolved_name, protocol):
289 """ 290 Initialize this node to receive an inbound TCP connection, 291 i.e. startup a TCP server if one is not already running. 292 293 @param resolved_name: topic name 294 @type resolved__name: str 295 296 @param protocol: negotiated protocol 297 parameters. protocol[0] must be the string 'TCPROS' 298 @type protocol: [str, value*] 299 @return: (code, msg, [TCPROS, addr, port]) 300 @rtype: (int, str, list) 301 """ 302 if protocol[0] != TCPROS: 303 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, [] 304 start_tcpros_server() 305 addr, port = get_tcpros_server_address() 306 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
307
308 - def topic_connection_handler(self, sock, client_addr, header):
309 """ 310 Process incoming topic connection. Reads in topic name from 311 handshake and creates the appropriate L{TCPROSPub} handler for the 312 connection. 313 @param sock: socket connection 314 @type sock: socket.socket 315 @param client_addr: client address 316 @type client_addr: (str, int) 317 @param header: key/value pairs from handshake header 318 @type header: dict 319 @return: error string or None 320 @rtype: str 321 """ 322 if rospy.core.is_shutdown_requested(): 323 return "Node is shutting down" 324 for required in ['topic', 'md5sum', 'callerid']: 325 if not required in header: 326 return "Missing required '%s' field"%required 327 else: 328 resolved_topic_name = header['topic'] 329 md5sum = header['md5sum'] 330 tm = rospy.impl.registration.get_topic_manager() 331 topic = tm.get_publisher_impl(resolved_topic_name) 332 if not topic: 333 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 334 elif not topic.data_class or topic.closed: 335 return "Internal error processing topic [%s]"%(resolved_topic_name) 336 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 337 data_class = topic.data_class 338 actual_type = data_class._type 339 340 # check to see if subscriber sent 'type' header. If they did, check that 341 # types are same first as this provides a better debugging message 342 if 'type' in header: 343 requested_type = header['type'] 344 if requested_type != actual_type: 345 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 346 else: 347 # defaults to actual type 348 requested_type = actual_type 349 350 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum) 351 352 else: 353 #TODO:POLLING if polling header is present, have to spin up receive loop as well 354 355 # #1334: tcp_nodelay support from subscriber option 356 if 'tcp_nodelay' in header: 357 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 358 else: 359 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 360 361 _configure_pub_socket(sock, tcp_nodelay) 362 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 363 transport = TCPROSTransport(protocol, resolved_topic_name) 364 transport.set_socket(sock, header['callerid']) 365 transport.remote_endpoint = client_addr 366 transport.write_header() 367 topic.add_connection(transport)
368 369
370 -class QueuedConnection(object):
371 """ 372 It wraps a Transport instance and behaves like one 373 but it queues the data written to it and relays them 374 asynchronously to the wrapped instance. 375 """ 376
377 - def __init__(self, connection, queue_size):
378 """ 379 ctor. 380 @param connection: the wrapped transport instance 381 @type connection: Transport 382 @param queue_size: the maximum size of the queue, zero means infinite 383 @type queue_size: int 384 """ 385 super(QueuedConnection, self).__init__() 386 self._connection = connection 387 self._queue_size = queue_size 388 389 self._lock = threading.Lock() 390 self._cond_data_available = threading.Condition(self._lock) 391 self._connection.set_cleanup_callback(self._closed_connection_callback) 392 self._queue = [] 393 self._error = None 394 395 self._thread = threading.Thread(target=self._run) 396 self._thread.start()
397
398 - def _closed_connection_callback(self, connection):
399 with self._lock: 400 self._cond_data_available.notify()
401
402 - def __getattr__(self, name):
403 if name.startswith('__'): 404 raise AttributeError(name) 405 return getattr(self._connection, name)
406
407 - def write_data(self, data):
408 with self._lock: 409 # if there was previously an error within the dispatch thread raise it 410 if self._error: 411 error = self._error 412 self._error = None 413 raise error 414 # pop oldest data if queue limit is reached 415 if self._queue_size > 0 and len(self._queue) == self._queue_size: 416 del self._queue[0] 417 self._queue.append(data) 418 self._cond_data_available.notify() 419 # effectively yields the rest of the thread quantum 420 time.sleep(0) 421 return True
422
423 - def _run(self):
424 while not self._connection.done: 425 queue = [] 426 with self._lock: 427 # wait for available data 428 while not self._queue and not self._connection.done: 429 self._cond_data_available.wait() 430 # take all data from queue for processing outside of the lock 431 if self._queue: 432 queue = self._queue 433 self._queue = [] 434 # relay all data 435 for data in queue: 436 try: 437 self._connection.write_data(data) 438 except Exception as e: 439 with self._lock: 440 self._error = e
441