Package rospy :: Module topics

Source Code for Module rospy.topics

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2008, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32  # 
  33  # Revision $Id$ 
  34   
  35  """ 
  36  rospy implementation of topics. 
  37   
  38  Client API 
  39  ========== 
  40   
  41  L{Publisher} and L{Subscriber} are the client API for topics. 
  42   
  43  Internal Implementation 
  44  ======================= 
  45   
  46  Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl) 
  47  are used to manage actual transport connections.  The L{_TopicManager} 
  48  is responsible for tracking the system-wide state of publications and 
  49  subscriptions as well as the L{_TopicImpl} instances. More info is below. 
  50    
  51  L{_TopicManager} 
  52  ================ 
  53   
  54  The L{_TopicManager} does the backend topic bookkeeping for the local 
  55  node.  Use L{get_topic_manager()} to access singleton. Actual topic 
  56  implementations are done through the 
  57  L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client 
  58  code generates instances of type L{Publisher}/L{Subscriber}, which 
   59  enable the client to create multiple publishers/subscribers of that 
   60  topic that get controlled access to the underlying shared connections. 
  61   
  62  Common parent classes for all rospy topics. The rospy topic autogenerators 
  63  create classes that are children of these implementations. 
  64  """ 
  65   
  66   
  67  import struct 
  68  import select 
# Python 2/3 compatibility shims: pick the appropriate StringIO implementation
# and define isstring() so the rest of the module can type-check topic names
# without caring about the str/basestring split.
try:
    from cStringIO import StringIO #Python 2.x
    python3 = 0  # flag consulted elsewhere (name encoding, buffer selection)
    def isstring(s):
        """Return True if s is a string (Python 2: str or unicode)."""
        return isinstance(s, basestring) #Python 2.x
except ImportError:
    python3 = 1
    from io import StringIO, BytesIO #Python 3.x
    def isstring(s):
        """Return True if s is a string (Python 3: str)."""
        return isinstance(s, str) #Python 3.x
79 80 import threading 81 import logging 82 import time 83 84 from itertools import chain 85 import traceback 86 87 import rosgraph.names 88 89 from rospy.core import * 90 from rospy.exceptions import ROSSerializationException, TransportTerminated 91 from rospy.msg import serialize_message, args_kwds_to_message 92 93 from rospy.impl.statistics import SubscriberStatisticsLogger 94 95 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners 96 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE 97 from rospy.impl.tcpros_pubsub import QueuedConnection 98 99 _logger = logging.getLogger('rospy.topics') 100 101 # wrap genpy implementation and map it to rospy namespace 102 import genpy 103 Message = genpy.Message 104 105 ####################################################################### 106 # Base classes for all client-API instantiated pub/sub 107 # 108 # There are two trees: Topic and _TopicImpl. Topic is the client API 109 # for interfacing with topics, while _TopicImpl implements the 110 # underlying connection details. 111
class Topic(object):
    """Base class of L{Publisher} and L{Subscriber}"""

    def __init__(self, name, data_class, reg_type):
        """
        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: message class for serialization
        @type  data_class: L{Message}
        @param reg_type Registration.PUB or Registration.SUB
        @type  reg_type: str
        @raise ValueError: if parameters are invalid
        """

        if not name or not isstring(name):
            raise ValueError("topic name is not a non-empty string")
        try:
            if python3 == 1:
                name.encode("utf-8")
            else:
                name = name.encode("utf-8")
        except UnicodeError:
            raise ValueError("topic name must be ascii/utf-8 compatible")
        if data_class is None:
            raise ValueError("topic parameter 'data_class' is not initialized")
        # isinstance() instead of an exact type() comparison so message
        # classes built with a custom metaclass are still accepted
        if not isinstance(data_class, type):
            raise ValueError("data_class [%s] is not a class"%data_class)
        if not issubclass(data_class, genpy.Message):
            # use data_class.__name__ here: data_class is itself a class, so
            # data_class.__class__.__name__ would report the metaclass name
            # (e.g. 'type') rather than the offending class
            raise ValueError("data_class [%s] is not a message data class"%data_class.__name__)
        # #2202
        if not rosgraph.names.is_legal_name(name):
            import warnings
            warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)

        # this is a bit ugly, but necessary due to the fact that we allow
        # topics and services to be initialized before the node
        if not rospy.core.is_initialized():
            self.resolved_name = rospy.names.resolve_name_without_node_name(name)
        else:
            # init_node() has been called, so we can do normal resolution
            self.resolved_name = resolve_name(name)

        self.name = self.resolved_name # #1810 for backwards compatibility

        self.data_class = data_class
        self.type = data_class._type
        self.md5sum = data_class._md5sum
        self.reg_type = reg_type
        # acquire the shared per-topic impl singleton from the topic manager
        self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)

    def get_num_connections(self):
        """
        get the number of connections to other ROS nodes for this topic. For a Publisher,
        this corresponds to the number of nodes subscribing. For a Subscriber, the number
        of publishers.
        @return: number of connections
        @rtype: int
        """
        return self.impl.get_num_connections()

    def unregister(self):
        """
        unpublish/unsubscribe from topic. Topic instance is no longer
        valid after this call. Additional calls to unregister() have no effect.
        """
        # as we don't guard unregister, have to protect value of
        # resolved_name for release_impl call
        resolved_name = self.resolved_name
        if resolved_name and self.impl:
            get_topic_manager().release_impl(self.reg_type, resolved_name)
            self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
183 184 # #3808
class Poller(object):
    """
    select.poll/kqueue abstraction to handle socket failure detection
    on multiple platforms.  NOT thread-safe.
    """
    def __init__(self):
        # Probe for the best available polling API.  Catch AttributeError
        # specifically: that is what is raised when the select module lacks
        # poll()/kqueue() on this platform.  A bare except here would also
        # swallow KeyboardInterrupt/SystemExit during startup.
        try:
            self.poller = select.poll()
            self.add_fd = self.add_poll
            self.remove_fd = self.remove_poll
            self.error_iter = self.error_poll_iter
        except AttributeError:
            try:
                # poll() not available, try kqueue
                self.poller = select.kqueue()
                self.add_fd = self.add_kqueue
                self.remove_fd = self.remove_kqueue
                self.error_iter = self.error_kqueue_iter
                self.kevents = []
            except AttributeError:
                #TODO: non-Noop impl for Windows
                self.poller = self.noop
                self.add_fd = self.noop
                self.remove_fd = self.noop
                self.error_iter = self.noop_iter

    def noop(self, *args):
        """No-op fallback used on platforms without poll/kqueue."""
        pass

    def noop_iter(self):
        """No-op error iterator: yields nothing."""
        empty_generator = (x for x in list())
        for x in empty_generator:
            yield x

    def add_poll(self, fd):
        """Register fd with the underlying poll object."""
        self.poller.register(fd)

    def remove_poll(self, fd):
        """Unregister fd from the underlying poll object."""
        self.poller.unregister(fd)

    def error_poll_iter(self):
        """Yield each registered fd that poll() reports as hung up or errored."""
        events = self.poller.poll(0)
        for fd, event in events:
            if event & (select.POLLHUP | select.POLLERR):
                yield fd

    def add_kqueue(self, fd):
        """Track fd with a kevent for the kqueue-based poller."""
        self.kevents.append(select.kevent(fd))

    def error_kqueue_iter(self):
        """Yield the ident (fd) of each kevent flagged with error/EOF."""
        events = self.poller.control(self.kevents, 0)
        for event in events:
            # kqueue.control() returns kevent objects; the status bits live in
            # event.flags -- the kevent object itself does not support '&'
            if event.flags & (select.KQ_EV_ERROR | select.KQ_EV_EOF):
                yield event.ident

    def remove_kqueue(self, fd):
        """Stop tracking fd in the kqueue kevent list."""
        e = [x for x in self.kevents if x.ident == fd]
        for x in e:
            self.kevents.remove(x)
244
class _TopicImpl(object):
    """
    Base class of internal topic implementations. Each topic has a
    singleton _TopicImpl implementation for managing the underlying
    connections.
    """

    def __init__(self, name, data_class):
        """
        Base constructor
        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: message data class
        @type  data_class: L{Message}
        """

        # #1810 made resolved/unresolved more explicit so we don't accidentally double-resolve
        self.resolved_name = resolve_name(name) #NOTE: remapping occurs here!
        self.name = self.resolved_name # for backwards compatibility

        self.data_class = data_class
        self.type = data_class._type
        self.handler = None
        self.seq = 0
        # c_lock serializes the methods that modify self.connections.
        # Because adding/removing connections is a rare event, we go through
        # the extra hassle of copying the connections/callbacks lists when
        # modifying, then swapping in the reference to the new copy.  With
        # this copy-on-write pattern, other code can read these lists
        # without having to acquire c_lock.
        self.c_lock = threading.RLock()
        self.connections = []
        self.closed = False
        # number of Topic instances using this
        self.ref_count = 0

        self.connection_poll = Poller()

    def __del__(self):
        # very similar to close(), but have to be more careful in a __del__ what we call
        if self.closed:
            return
        if self.connections is not None:
            for c in self.connections:
                try:
                    c.close()
                except:
                    # best-effort cleanup only; never raise out of __del__
                    pass
            del self.connections[:]
        self.c_lock = self.connections = self.handler = self.data_class = self.type = None

    def close(self):
        """close I/O"""
        if self.closed:
            return
        self.closed = True
        if self.c_lock is not None:
            with self.c_lock:
                for c in self.connections:
                    try:
                        if c is not None:
                            c.close()
                    except:
                        # seems more logger.error internal than external logerr
                        _logger.error(traceback.format_exc())
                del self.connections[:]
            self.handler = None

    def get_num_connections(self):
        # number of active transport connections (taken under lock)
        with self.c_lock:
            return len(self.connections)

    def has_connection(self, endpoint_id):
        """
        Query whether or not a connection with the associated \a
        endpoint has been added to this object.
        @param endpoint_id: endpoint ID associated with connection.
        @type  endpoint_id: str
        """
        # save reference to avoid lock
        conn = self.connections
        for c in conn:
            if c.endpoint_id == endpoint_id:
                return True
        return False

    def has_connections(self):
        """
        Check to see if this topic is connected to other publishers/subscribers
        @return: True if topic is connected
        @rtype: bool
        """
        if self.connections:
            return True
        return False

    def _remove_connection(self, connections, c):
        # Remove c from the poll instance as well as from the given
        # connections list (mutated in place; caller holds c_lock).
        try:
            self.connection_poll.remove_fd(c.fileno())
        except:
            pass
        try:
            # not necessarily correct from an abstraction point of
            # view, but will prevent accident connection leaks
            c.close()
        except:
            pass
        # while c might be a rospy.impl.tcpros_base.TCPROSTransport instance
        # connections might only contain the rospy.impl.tcpros_pubsub.QueuedConnection proxy
        # finding the "right" connection is more difficult then
        if c in connections:
            connections.remove(c)
        # therefore additionally check for fileno equality if available
        elif c.fileno():
            matching_connections = [
                conn for conn in connections if conn.fileno() == c.fileno()]
            if len(matching_connections) == 1:
                connections.remove(matching_connections[0])

    def add_connection(self, c):
        """
        Add a connection to this topic.  If any previous connections
        to same endpoint exist, drop them.

        @param c: connection instance
        @type  c: Transport
        @return: True if connection was added, ``bool``
        """
        rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections)))
        with self.c_lock:
            # protect against race condition adding connection to closed sub
            if self.closed:
                rospyerr(
                    "ERROR: Race condition failure adding connection to closed subscriber\n"
                    "If you run into this please comment on "
                    "https://github.com/ros/ros_comm/issues/544"
                )
                return False

            # c_lock is to make add_connection thread-safe, but we
            # still make a copy of self.connections so that the rest of the
            # code can use self.connections in an unlocked manner
            new_connections = self.connections[:]

            # if we have a connection to the same endpoint_id, drop
            # the old one.
            for oldc in self.connections:
                if oldc.endpoint_id == c.endpoint_id:
                    self._remove_connection(new_connections, oldc)

            # #3808: "garbage collect" bad sockets whenever we add new
            # connections. This allows at most one stale connection
            # per topic.  Otherwise, we only detect bad connections on
            # write.  An alternative (given the current
            # implementation) would be to start a thread that
            # regularly polls all fds, but that would create a lot of
            # synchronization events and also have a separate thread
            # to manage.  It would be desirable to move to this, but
            # this change is less impactful and keeps the codebase
            # more stable as we move towards an entirely different
            # event loop for rospy -- the heart of the problem is that
            # rospy's i/o is blocking-based, which has the obvious
            # issues.

            for fd in self.connection_poll.error_iter():
                to_remove = [x for x in new_connections if x.fileno() == fd]
                for x in to_remove:
                    rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id))
                    self._remove_connection(new_connections, x)

            # Add new connection to poller, register for all events,
            # though we only care about POLLHUP/ERR
            new_fd = c.fileno()
            if new_fd is not None:
                self.connection_poll.add_fd(new_fd)

            # add in new connection
            new_connections.append(c)

            self.connections = new_connections

            # connections make a callback when closed
            # don't clobber an existing callback
            if not c.cleanup_cb:
                c.set_cleanup_callback(self.remove_connection)
            else:
                previous_callback = c.cleanup_cb
                new_callback = self.remove_connection
                def cleanup_cb_wrapper(s):
                    new_callback(s)
                    previous_callback(s)
                c.set_cleanup_callback(cleanup_cb_wrapper)

            return True

    def remove_connection(self, c):
        """
        Remove connection from topic.
        @param c: connection instance to remove
        @type  c: Transport
        """
        rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id))
        with self.c_lock:
            # c_lock is to make remove_connection thread-safe, but we
            # still make a copy of self.connections so that the rest of the
            # code can use self.connections in an unlocked manner
            new_connections = self.connections[:]
            self._remove_connection(new_connections, c)
            self.connections = new_connections

    def get_stats_info(self): # STATS
        """
        Get the stats for this topic
        @return: stats for topic in getBusInfo() format::
          Publisher:
            ((connection_id, destination_caller_id, direction, transport, topic_name, connected, connection_info_string)*)
          Subscriber:
            ((connection_id, publisher_xmlrpc_uri, direction, transport, topic_name, connected, connection_info_string)*)
        @rtype: list
        """
        # save reference to avoid locking
        connections = self.connections
        return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True, c.get_transport_info()) for c in connections]

    def get_stats(self): # STATS
        """Get the stats for this topic (API stub)"""
        raise Exception("subclasses must override")

# Implementation note: Subscriber attaches to a _SubscriberImpl
# singleton for that topic. The underlying impl manages the
# connections for that publication and enables thread-safe access
class Subscriber(Topic):
    """
    Class for registering as a subscriber to a specified topic, where
    the messages are of a given type.
    """
    def __init__(self, name, data_class, callback=None, callback_args=None,
                 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
        """
        Register a subscription on a topic.

        NOTE: rospy does not attempt to merge queue_size and buff_size
        intelligently across multiple Subscriber instances on the same
        topic.  All instances share one underlying transport, so setting
        these parameters differently on different instances can conflict.

        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: data type class to use for messages,
          e.g. std_msgs.msg.String
        @type  data_class: L{Message} class
        @param callback: function invoked as fn(data) when data is
          received.  If callback_args is set, the function must accept
          it as a second argument, i.e. fn(data, callback_args).
          Additional callbacks can be added with add_callback().
        @type  callback: fn(msg, cb_args)
        @param callback_args: additional argument handed to the callback,
          useful for reusing one callback across several subscriptions.
        @type  callback_args: any
        @param queue_size: maximum number of messages to receive at a
          time, generally 1 or None (infinite, default).  buff_size
          should be increased when this is set, since incoming data
          still passes through the incoming buffer before being
          discarded.  Affects all subscribers to this topic in this
          process.
        @type  queue_size: int
        @param buff_size: incoming message buffer size in bytes.  If
          queue_size is set, use a value greater than queue_size times
          the average message size.  Affects all subscribers to this
          topic in this process.
        @type  buff_size: int
        @param tcp_nodelay: if True, request TCP_NODELAY from the
          publisher.  Generally not recommended; prefer relying on
          timestamps in message data.  Enables TCP_NODELAY for all
          subscribers in the same python process.
        @type  tcp_nodelay: bool
        @raise ROSException: if parameters are invalid
        """
        super(Subscriber, self).__init__(name, data_class, Registration.SUB)

        # apply settings the factory cannot pass in; since the impl is
        # shared, the last Subscriber to set a non-default value wins
        if queue_size is not None:
            self.impl.set_queue_size(queue_size)
        if buff_size != DEFAULT_BUFF_SIZE:
            self.impl.set_buff_size(buff_size)

        self.callback = None
        self.callback_args = None
        if callback is not None:
            # #1852: register through add_callback so the callback is also
            # invoked with any latched messages already on the connections
            self.impl.add_callback(callback, callback_args)
            # remember the arguments for unregister()
            self.callback = callback
            self.callback_args = callback_args
        if tcp_nodelay:
            self.impl.set_tcp_nodelay(tcp_nodelay)

    def unregister(self):
        """
        unpublish/unsubscribe from topic. Topic instance is no longer
        valid after this call. Additional calls to unregister() have no effect.
        """
        if not self.impl:
            return
        # a Subscriber instance may have been created without a callback
        if self.callback is not None:
            self.impl.remove_callback(self.callback, self.callback_args)
        self.callback = None
        self.callback_args = None
        super(Subscriber, self).unregister()
569
class _SubscriberImpl(_TopicImpl):
    """
    Underlying L{_TopicImpl} implementation for subscriptions.
    """
    def __init__(self, name, data_class):
        """
        ctor.
        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: Message data class
        @type  data_class: L{Message} class
        """
        super(_SubscriberImpl, self).__init__(name, data_class)
        # client-methods to invoke on new messages. should only modify
        # under lock. This is a list of 2-tuples (fn, args), where
        # args are additional arguments for the callback, or None
        self.callbacks = []
        self.queue_size = None
        self.buff_size = DEFAULT_BUFF_SIZE
        self.tcp_nodelay = False
        # per-topic statistics logger; None when statistics are disabled
        self.statistics_logger = SubscriberStatisticsLogger(self) \
            if SubscriberStatisticsLogger.is_enabled() \
            else None

    def close(self):
        """close I/O and release resources"""
        _TopicImpl.close(self)
        if self.callbacks:
            del self.callbacks[:]
            self.callbacks = None
        if self.statistics_logger:
            self.statistics_logger.shutdown()
            self.statistics_logger = None

    def set_tcp_nodelay(self, tcp_nodelay):
        """
        Set the value of TCP_NODELAY, which causes the Nagle algorithm
        to be disabled for future topic connections, if the publisher
        supports it.
        """
        self.tcp_nodelay = tcp_nodelay

    def set_queue_size(self, queue_size):
        """
        Set the receive queue size. If more than queue_size messages
        are waiting to be deserialized, they are discarded.

        @param queue_size int: incoming queue size. Must be positive integer or None.
        @type  queue_size: int
        """
        if queue_size == -1:
            self.queue_size = None
        elif queue_size == 0:
            raise ROSException("queue size may not be set to zero")
        elif queue_size is not None and type(queue_size) != int:
            raise ROSException("queue size must be an integer")
        else:
            self.queue_size = queue_size

    def set_buff_size(self, buff_size):
        """
        Set the receive buffer size. The exact meaning of this is
        transport dependent.
        @param buff_size: receive buffer size
        @type  buff_size: int
        """
        if type(buff_size) != int:
            raise ROSException("buffer size must be an integer")
        elif buff_size <= 0:
            raise ROSException("buffer size must be a positive integer")
        self.buff_size = buff_size

    def get_stats(self): # STATS
        """
        Get the stats for this topic subscriber
        @return: stats for topic in getBusStats() publisher format::
          (topicName, connStats)
        where connStats is::
          [connectionId, bytesReceived, numSent, dropEstimate, connected]*
        @rtype: list
        """
        # save reference to avoid locking
        conn = self.connections
        #for now drop estimate is -1
        stats = (self.resolved_name,
                 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
                  for c in conn] )
        return stats

    def add_callback(self, cb, cb_args):
        """
        Register a callback to be invoked whenever a new message is received
        @param cb: callback function to invoke with message data
          instance, i.e. fn(data). If callback args is set, they will
          be passed in as the second argument.
        @type  cb: fn(msg, cb_args)
        @param cb_cargs: additional arguments to pass to callback
        @type  cb_cargs: Any
        """
        if self.closed:
            raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
        with self.c_lock:
            # we lock in order to serialize calls to add_callback, but we
            # copy self.callbacks so readers can use it without the lock
            new_callbacks = self.callbacks[:]
            new_callbacks.append((cb, cb_args))
            self.callbacks = new_callbacks

        # #1852: invoke callback with any latched messages
        for c in self.connections:
            if c.latch is not None:
                self._invoke_callback(c.latch, cb, cb_args)

    def remove_callback(self, cb, cb_args):
        """
        Unregister a message callback.
        @param cb: callback function
        @type  cb: fn(msg, cb_args)
        @param cb_cargs: additional arguments associated with callback
        @type  cb_cargs: Any
        @raise KeyError: if no matching callback
        """
        if self.closed:
            return
        with self.c_lock:
            # we lock in order to serialize calls to add_callback, but we
            # copy self.callbacks so readers can use it without the lock
            matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
            if matches:
                new_callbacks = self.callbacks[:]
                # remove the first match
                new_callbacks.remove(matches[0])
                self.callbacks = new_callbacks
        if not matches:
            raise KeyError("no matching cb")

    def _invoke_callback(self, msg, cb, cb_args):
        """
        Invoke callback on msg. Traps and logs any exceptions raise by callback
        @param msg: message data
        @type  msg: L{Message}
        @param cb: callback
        @type  cb: fn(msg, cb_args)
        @param cb_args: callback args or None
        @type  cb_args: Any
        """
        try:
            if cb_args is not None:
                cb(msg, cb_args)
            else:
                cb(msg)
        except Exception as e:
            if not is_shutdown():
                logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
            else:
                _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))

    def receive_callback(self, msgs, connection):
        """
        Called by underlying connection transport for each new message received
        @param msgs: message data
        @type msgs: [L{Message}]
        """
        # save reference to avoid lock
        callbacks = self.callbacks
        for msg in msgs:
            if self.statistics_logger:
                self.statistics_logger.callback(msg, connection.callerid_pub, connection.stat_bytes)
            for cb, cb_args in callbacks:
                self._invoke_callback(msg, cb, cb_args)
740
class SubscribeListener(object):
    """
    Callback API to receive notifications when new subscribers
    connect and disconnect.
    """

    def peer_subscribe(self, topic_name, topic_publish, peer_publish):
        """
        Notification that a new peer has subscribed to a topic.

        Default implementation does nothing; override to react to new
        subscribers.

        @param topic_name: topic name. NOTE: topic name will be resolved/remapped
        @type  topic_name: str
        @param topic_publish: method that publishes message data to all subscribers
        @type  topic_publish: fn(data)
        @param peer_publish: method that publishes message data only to the
          new subscriber.  NOTE: behavior for the latter is
          transport-dependent as some transports may be broadcast only.
        @type  peer_publish: fn(data)
        """
        pass

    def peer_unsubscribe(self, topic_name, num_peers):
        """
        Notification that a peer has unsubscribed from a topic.

        Default implementation does nothing; override to react.

        @param topic_name: topic name. NOTE: topic name will be resolved/remapped
        @type  topic_name: str
        @param num_peers: number of remaining peers subscribed to topic
        @type  num_peers: int
        """
        pass
770 771 772 # Implementation note: Publisher attaches to a 773 # _PublisherImpl singleton for that topic. The underlying impl 774 # manages the connections for that publication and enables 775 # thread-safe access 776
class Publisher(Topic):
    """
    Class for registering as a publisher of a ROS topic.
    """

    def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
        """
        Constructor
        @param name: resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: message class for serialization
        @type  data_class: L{Message} class
        @param subscriber_listener: listener for
          subscription events. May be None.
        @type  subscriber_listener: L{SubscribeListener}
        @param tcp_nodelay: If True, sets TCP_NODELAY on
          publisher's socket (disables Nagle algorithm). This results
          in lower latency publishing at the cost of efficiency.
        @type  tcp_nodelay: bool
        @param latch: If True, the last message published is
          'latched', meaning that any future subscribers will be sent
          that message immediately upon connection.
        @type  latch: bool
        @param headers: If not None, a dictionary with additional header
          key-values being used for future connections.
        @type  headers: dict
        @param queue_size: The queue size used for asynchronously
          publishing messages from different threads.  A size of zero
          means an infinite queue, which can be dangerous.  When the
          keyword is not being used or when None is passed all
          publishing will happen synchronously and a warning message
          will be printed.
        @type  queue_size: int
        @raise ROSException: if parameters are invalid
        """
        super(Publisher, self).__init__(name, data_class, Registration.PUB)

        if subscriber_listener:
            self.impl.add_subscriber_listener(subscriber_listener)
        if tcp_nodelay:
            get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
        if latch:
            self.impl.enable_latch()
        if headers:
            self.impl.add_headers(headers)
        if queue_size is not None:
            self.impl.set_queue_size(queue_size)
        else:
            import warnings
            warnings.warn("The publisher should be created with an explicit keyword argument 'queue_size'. "
                          "Please see http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers for more information.", SyntaxWarning, stacklevel=2)

    def publish(self, *args, **kwds):
        """
        Publish message data object to this topic.
        Publish can either be called with the message instance to
        publish or with the constructor args for a new Message
        instance, i.e.::
          pub.publish(message_instance)
          pub.publish(message_field_1, message_field_2...)
          pub.publish(message_field_1='foo', message_field_2='bar')

        @param args : L{Message} instance, message arguments, or no args if keyword arguments are used
        @param kwds : Message keyword arguments. If kwds are used, args must be unset
        @raise ROSException: If rospy node has not been initialized
        @raise ROSSerializationException: If unable to serialize
        message. This is usually a type error with one of the fields.
        """
        if self.impl is None:
            raise ROSException("publish() to an unregistered() handle")
        if not is_initialized():
            raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
        data = args_kwds_to_message(self.data_class, args, kwds)
        # acquire the lock *before* entering the try/finally: if acquire()
        # itself fails, the finally-clause must not run release(), which
        # would raise a second error on an unheld lock and mask the original
        self.impl.acquire()
        try:
            self.impl.publish(data)
        except genpy.SerializationError as e:
            # can't go to rospy.logerr(), b/c this could potentially recurse
            _logger.error(traceback.format_exc())
            raise ROSSerializationException(str(e))
        finally:
            self.impl.release()
859
860 -class _PublisherImpl(_TopicImpl):
861 """ 862 Underyling L{_TopicImpl} implementation for publishers. 863 """ 864
    def __init__(self, name, data_class):
        """
        @param name: name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: Message data class
        @type  data_class: L{Message} class
        """
        super(_PublisherImpl, self).__init__(name, data_class)
        # serialization buffer: cStringIO on Python 2, BytesIO on Python 3
        if python3 == 0:
            self.buff = StringIO()
        else:
            self.buff = BytesIO()
        self.publock = threading.RLock() #for acquire()/release
        # SubscribeListener instances notified of subscription events
        self.subscriber_listeners = []

        # additional client connection headers
        self.headers = {}

        # publish latch, starts disabled
        self.is_latch = False
        self.latch = None

        # maximum queue size for publishing messages
        self.queue_size = None

        #STATS
        # total bytes of message data sent on this topic
        self.message_data_sent = 0
892
893 - def close(self):
894 """close I/O and release resources""" 895 _TopicImpl.close(self) 896 # release resources 897 if self.subscriber_listeners: 898 del self.subscriber_listeners[:] 899 if self.headers: 900 self.headers.clear() 901 if self.buff is not None: 902 self.buff.close() 903 self.publock = self.headers = self.buff = self.subscriber_listeners = None
904
905 - def add_headers(self, headers):
906 """ 907 Add connection headers to this Topic for future connections. 908 @param headers: key/values will be added to current connection 909 header set, overriding any existing keys if they conflict. 910 @type headers: dict 911 """ 912 self.headers.update(headers)
913
914 - def enable_latch(self):
915 """ 916 Enable publish() latch. The latch contains the last published 917 message and is sent to any new subscribers. 918 """ 919 self.is_latch = True
920
921 - def set_queue_size(self, queue_size):
922 self.queue_size = queue_size
923
924 - def get_stats(self): # STATS
925 """ 926 Get the stats for this topic publisher 927 @return: stats for topic in getBusStats() publisher format:: 928 [topicName, messageDataBytes, connStats], 929 where connStats is:: 930 [id, bytes, numMessages, connected]* 931 @rtype: list 932 """ 933 # save reference to avoid lock 934 conn = self.connections 935 return (self.resolved_name, self.message_data_sent, 936 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )
937
938 - def add_subscriber_listener(self, l):
939 """ 940 Add a L{SubscribeListener} for subscribe events. 941 @param l: listener instance 942 @type l: L{SubscribeListener} 943 """ 944 self.subscriber_listeners.append(l)
945
946 - def acquire(self):
947 """lock for thread-safe publishing to this transport""" 948 if self.publock is not None: 949 self.publock.acquire()
950
951 - def release(self):
952 """lock for thread-safe publishing to this transport""" 953 if self.publock is not None: 954 self.publock.release()
955
956 - def add_connection(self, c):
957 """ 958 Add a connection to this topic. This must be a PubTransport. If 959 the latch is enabled, c will be sent a the value of the 960 latch. 961 @param c: connection instance 962 @type c: L{Transport} 963 @return: True if connection was added 964 @rtype: bool 965 """ 966 if self.queue_size is not None: 967 c = QueuedConnection(c, self.queue_size) 968 super(_PublisherImpl, self).add_connection(c) 969 def publish_single(data): 970 self.publish(data, connection_override=c)
971 for l in self.subscriber_listeners: 972 l.peer_subscribe(self.resolved_name, self.publish, publish_single) 973 if self.is_latch and self.latch is not None: 974 with self.publock: 975 self.publish(self.latch, connection_override=c) 976 return True 977
978 - def remove_connection(self, c):
979 """ 980 Remove existing connection from this topic. 981 @param c: connection instance to remove 982 @type c: L{Transport} 983 """ 984 super(_PublisherImpl, self).remove_connection(c) 985 num = len(self.connections) 986 for l in self.subscriber_listeners: 987 l.peer_unsubscribe(self.resolved_name, num)
988
989 - def publish(self, message, connection_override=None):
990 """ 991 Publish the data to the topic. If the topic has no subscribers, 992 the method will return without any affect. Access to publish() 993 should be locked using acquire() and release() in order to 994 ensure proper message publish ordering. 995 996 @param message: message data instance to publish 997 @type message: L{Message} 998 @param connection_override: publish to this connection instead of all 999 @type connection_override: L{Transport} 1000 @return: True if the data was published, False otherwise. 1001 @rtype: bool 1002 @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself 1003 @raise rospy.ROSException: if topic has been closed or was closed during publish() 1004 """ 1005 #TODO: should really just use IOError instead of rospy.ROSException 1006 1007 if self.closed: 1008 # during shutdown, the topic can get closed, which creates 1009 # a race condition with user code testing is_shutdown 1010 if not is_shutdown(): 1011 raise ROSException("publish() to a closed topic") 1012 else: 1013 return 1014 1015 if self.is_latch: 1016 self.latch = message 1017 1018 if not self.has_connections(): 1019 #publish() falls through 1020 return False 1021 1022 if connection_override is None: 1023 #copy connections so we can iterate safely 1024 conns = self.connections 1025 else: 1026 conns = [connection_override] 1027 1028 # #2128 test our buffer. I don't now how this got closed in 1029 # that case, but we can at least diagnose the problem. 
1030 b = self.buff 1031 try: 1032 b.tell() 1033 1034 # serialize the message 1035 self.seq += 1 #count messages published to the topic 1036 serialize_message(b, self.seq, message) 1037 1038 # send the buffer to all connections 1039 err_con = [] 1040 data = b.getvalue() 1041 1042 for c in conns: 1043 try: 1044 if not is_shutdown(): 1045 c.write_data(data) 1046 except TransportTerminated as e: 1047 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 1048 err_con.append(c) 1049 except Exception as e: 1050 # greater severity level 1051 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 1052 err_con.append(c) 1053 1054 # reset the buffer and update stats 1055 self.message_data_sent += b.tell() #STATS 1056 b.seek(0) 1057 b.truncate(0) 1058 1059 except ValueError: 1060 # operations on self.buff can fail if topic is closed 1061 # during publish, which often happens during Ctrl-C. 1062 # diagnose the error and report accordingly. 1063 if self.closed: 1064 if is_shutdown(): 1065 # we offer no guarantees on publishes that occur 1066 # during shutdown, so this is not exceptional. 1067 return 1068 else: 1069 # this indicates that user-level code most likely 1070 # closed the topic, which is exceptional. 1071 raise ROSException("topic was closed during publish()") 1072 else: 1073 # unexpected, so re-raise original error 1074 raise 1075 1076 # remove any bad connections 1077 for c in err_con: 1078 try: 1079 # connection will callback into remove_connection when 1080 # we close it 1081 c.close() 1082 except: 1083 pass
1084 1085 ################################################################################# 1086 # TOPIC MANAGER/LISTENER 1087
class _TopicManager(object):
    """
    Tracks Topic objects
    See L{get_topic_manager()} for singleton access
    """

    def __init__(self):
        """ctor."""
        super(_TopicManager, self).__init__()
        self.pubs = {} #: { topic: _PublisherImpl }
        self.subs = {} #: { topic: _SubscriberImpl }
        self.topics = set() # set of resolved topic names (str)
        self.lock = threading.Condition()
        self.closed = False
        _logger.info("topicmanager initialized")

    def get_pub_sub_info(self):
        """
        get topic publisher and subscriber connection info for getBusInfo() api
        @return: [bus info stats]
          See getBusInfo() API for more data structure details.
        @rtype: list
        """
        with self.lock:
            info = []
            for s in chain(iter(self.pubs.values()), iter(self.subs.values())):
                info.extend(s.get_stats_info())
            return info

    def get_pub_sub_stats(self):
        """
        get topic publisher and subscriber stats for getBusStats() api
        @return: [publisherStats, subscriberStats].
          See getBusStats() API for more data structure details.
        @rtype: list
        """
        with self.lock:
            return [s.get_stats() for s in self.pubs.values()],\
                   [s.get_stats() for s in self.subs.values()]

    def close_all(self):
        """
        Close all registered publication and subscriptions. Manager is
        no longer usable after close.
        """
        self.closed = True
        with self.lock:
            for t in chain(iter(self.pubs.values()), iter(self.subs.values())):
                t.close()
            self.pubs.clear()
            self.subs.clear()

    def _add(self, ps, rmap, reg_type):
        """
        Add L{_TopicImpl} instance to rmap
        @param ps: a pub/sub impl instance
        @type  ps: L{_TopicImpl}
        @param rmap: { topic: _TopicImpl} rmap to record instance in
        @type  rmap: dict
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        """
        resolved_name = ps.resolved_name
        _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
        with self.lock:
            rmap[resolved_name] = ps
            self.topics.add(resolved_name)

        # NOTE: this call can take a lengthy amount of time (at
        # least until its reimplemented to use queues)
        get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)

    def _recalculate_topics(self):
        """recalculate self.topics. expensive"""
        self.topics = set([x.resolved_name for x in self.pubs.values()] +
                          [x.resolved_name for x in self.subs.values()])

    def _remove(self, ps, rmap, reg_type):
        """
        Remove L{_TopicImpl} instance from rmap
        @param ps: a pub/sub impl instance
        @type  ps: L{_TopicImpl}
        @param rmap: topic->_TopicImpl rmap to remove instance in
        @type  rmap: dict
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        """
        resolved_name = ps.resolved_name
        _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
        with self.lock:
            del rmap[resolved_name]
            self._recalculate_topics()

        # NOTE: this call can take a lengthy amount of time (at
        # least until its reimplemented to use queues)
        get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)

    def get_impl(self, reg_type, resolved_name):
        """
        Get the L{_TopicImpl} for the specified topic. This is mainly for
        testing purposes. Unlike acquire_impl, it does not alter the
        ref count.
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        @raise TypeError: if reg_type is neither PUB nor SUB
        """
        if reg_type == Registration.PUB:
            rmap = self.pubs
        elif reg_type == Registration.SUB:
            rmap = self.subs
        else:
            # BUG FIX: original interpolated the undefined name 's'
            # ("%s"%s), which raised NameError instead of TypeError
            raise TypeError("invalid reg_type: %s" % reg_type)
        return rmap.get(resolved_name, None)

    def acquire_impl(self, reg_type, resolved_name, data_class):
        """
        Acquire a L{_TopicImpl} for the specified topic (create one if it
        doesn't exist). Every L{Topic} instance has a _TopicImpl that
        actually controls the topic resources so that multiple Topic
        instances use the same underlying connections. 'Acquiring' a
        topic implementation marks that another Topic instance is
        using the TopicImpl.

        @param resolved_name: resolved topic name
        @type  resolved_name: str

        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str

        @param data_class: message class for topic
        @type  data_class: L{Message} class
        @raise TypeError: if reg_type is neither PUB nor SUB
        """
        if reg_type == Registration.PUB:
            rmap = self.pubs
            impl_class = _PublisherImpl
        elif reg_type == Registration.SUB:
            rmap = self.subs
            impl_class = _SubscriberImpl
        else:
            # BUG FIX: original interpolated the undefined name 's'
            # ("%s"%s), which raised NameError instead of TypeError
            raise TypeError("invalid reg_type: %s" % reg_type)
        with self.lock:
            impl = rmap.get(resolved_name, None)
            if not impl:
                impl = impl_class(resolved_name, data_class)
                self._add(impl, rmap, reg_type)
            impl.ref_count += 1
            return impl

    def release_impl(self, reg_type, resolved_name):
        """
        Release a L{_TopicImpl} for the specified topic.

        Every L{Topic} instance has a _TopicImpl that actually
        controls the topic resources so that multiple Topic instances
        use the same underlying connections. 'Acquiring' a topic
        implementation marks that another Topic instance is using the
        TopicImpl.

        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        """
        if reg_type == Registration.PUB:
            rmap = self.pubs
        else:
            rmap = self.subs
        with self.lock:
            # check for race condition where multiple things are cleaning up at once
            if self.closed:
                return
            impl = rmap.get(resolved_name, None)
            assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
            impl.ref_count -= 1
            assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
            if impl.ref_count == 0:
                rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name)
                impl.close()
                self._remove(impl, rmap, reg_type)
                del impl
                _logger.debug("... done deleting topic %s", resolved_name)

    def get_publisher_impl(self, resolved_name):
        """
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @return: publisher for the specified topic, or None
        @rtype: L{_PublisherImpl}
        """
        return self.pubs.get(resolved_name, None)

    def get_subscriber_impl(self, resolved_name):
        """
        @param resolved_name: topic name
        @type  resolved_name: str
        @return: subscriber for the specified topic, or None
        @rtype: L{_SubscriberImpl}
        """
        return self.subs.get(resolved_name, None)

    def has_subscription(self, resolved_name):
        """
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @return: True if manager has subscription for specified topic
        @rtype: bool
        """
        return resolved_name in self.subs

    def has_publication(self, resolved_name):
        """
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @return: True if manager has publication for specified topic
        @rtype: bool
        """
        return resolved_name in self.pubs

    def get_topics(self):
        """
        @return: list of topic names this node subscribes to/publishes
        @rtype: [str]
        """
        return self.topics

    def _get_list(self, rmap):
        """helper: [[name, type], ...] pairs for every impl in rmap"""
        return [[k, v.type] for k, v in rmap.items()]

    ## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
    def get_subscriptions(self):
        return self._get_list(self.subs)

    ## @return [[str,str],]: list of topics published by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
    def get_publications(self):
        return self._get_list(self.pubs)
# Create and register the process-wide singleton _TopicManager so that
# rospy's Publisher/Subscriber APIs share one topic registry.
set_topic_manager(_TopicManager())