Package rospy :: Module topics

Source Code for Module rospy.topics

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2008, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32  # 
  33  # Revision $Id: topics.py 13839 2011-06-01 01:22:34Z kwc $ 
  34   
  35  """ 
  36  rospy implementation of topics. 
  37   
  38  Client API 
  39  ========== 
  40   
  41  L{Publisher} and L{Subscriber} are the client API for topics. 
  42   
  43  Internal Implementation 
  44  ======================= 
  45   
  46  Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl) 
  47  are used to manage actual transport connections.  The L{_TopicManager} 
  48  is responsible for tracking the system-wide state of publications and 
  49  subscriptions as well as the L{_TopicImpl} instances. More info is below. 
  50    
  51  L{_TopicManager} 
  52  ================ 
  53   
  54  The L{_TopicManager} does the backend topic bookkeeping for the local 
  55  node.  Use L{get_topic_manager()} to access singleton. Actual topic 
  56  implementations are done through the 
  57  L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client 
  58  code generates instances of type L{Publisher}/L{Subscriber}, which 
  59  enable to client to create multiple publishers/subscribers of that 
  60  topic that get controlled access to the underlying share connections. 
  61   
  62  Common parent classes for all rospy topics. The rospy topic autogenerators 
  63  create classes that are children of these implementations. 
  64  """ 
  65   
  66  from __future__ import with_statement 
  67  import struct, cStringIO, thread, threading, logging, time 
  68  from itertools import chain 
  69  import traceback 
  70   
  71  import roslib.message 
  72   
  73  from rospy.core import * 
  74  from rospy.exceptions import ROSSerializationException, TransportTerminated 
  75  from rospy.msg import serialize_message, args_kwds_to_message 
  76   
  77  from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners 
  78  from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE 
  79  from rospy.impl.transport import DeadTransport 
  80   
  81  _logger = logging.getLogger('rospy.topics') 
  82   
  83  # wrap roslib implementation and map it to rospy namespace 
  84  Message = roslib.message.Message 
  85   
  86  ####################################################################### 
  87  # Base classes for all client-API instantiated pub/sub 
  88  # 
  89  # There are two trees: Topic and _TopicImpl. Topic is the client API 
  90  # for interfacing with topics, while _TopicImpl implements the 
  91  # underlying connection details.  
  92   
93 -class Topic(object):
94 """Base class of L{Publisher} and L{Subscriber}""" 95
96 - def __init__(self, name, data_class, reg_type):
97 """ 98 @param name: graph resource name of topic, e.g. 'laser'. 99 @type name: str 100 @param data_class: message class for serialization 101 @type data_class: L{Message} 102 @param reg_type Registration.PUB or Registration.SUB 103 @type reg_type: str 104 @raise ValueError: if parameters are invalid 105 """ 106 107 if not name or not isinstance(name, basestring): 108 raise ValueError("topic name is not a non-empty string") 109 if isinstance(name, unicode): 110 raise ValueError("topic name cannot be unicode") 111 if data_class is None: 112 raise ValueError("topic parameter 'data_class' is not initialized") 113 if not type(data_class) == type: 114 raise ValueError("data_class [%s] is not a class"%data_class) 115 if not issubclass(data_class, roslib.message.Message): 116 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__) 117 # #2202 118 if not roslib.names.is_legal_name(name): 119 import warnings 120 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 121 122 # this is a bit ugly, but necessary due to the fact that we allow 123 # topics and services to be initialized before the node 124 if not rospy.core.is_initialized(): 125 self.resolved_name = rospy.names.resolve_name_without_node_name(name) 126 else: 127 # init_node() has been called, so we can do normal resolution 128 self.resolved_name = resolve_name(name) 129 130 self.name = self.resolved_name # #1810 for backwards compatibility 131 132 self.data_class = data_class 133 self.type = data_class._type 134 self.md5sum = data_class._md5sum 135 self.reg_type = reg_type 136 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
137
138 - def get_num_connections(self):
139 """ 140 get the number of connections to other ROS nodes for this topic. For a Publisher, 141 this corresponds to the number of nodes subscribing. For a Subscriber, the number 142 of publishers. 143 @return: number of connections 144 @rtype: int 145 """ 146 return self.impl.get_num_connections()
147
148 - def unregister(self):
149 """ 150 unpublish/unsubscribe from topic. Topic instance is no longer 151 valid after this call. Additional calls to unregister() have no effect. 152 """ 153 # as we don't guard unregister, have to protect value of 154 # resolved_name for release_impl call 155 resolved_name = self.resolved_name 156 if resolved_name and self.impl: 157 get_topic_manager().release_impl(self.reg_type, resolved_name) 158 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
159
160 -class _TopicImpl(object):
161 """ 162 Base class of internal topic implementations. Each topic has a 163 singleton _TopicImpl implementation for managing the underlying 164 connections. 165 """ 166
167 - def __init__(self, name, data_class):
168 """ 169 Base constructor 170 @param name: graph resource name of topic, e.g. 'laser'. 171 @type name: str 172 @param data_class: message data class 173 @type data_class: L{Message} 174 """ 175 176 # #1810 made resolved/unresolved more explicit so we don't accidentally double-resolve 177 self.resolved_name = resolve_name(name) #NOTE: remapping occurs here! 178 self.name = self.resolved_name # for backwards compatibility 179 180 self.data_class = data_class 181 self.type = data_class._type 182 self.handler = None 183 self.seq = 0 184 # lock is used for to serialize call order that methods that 185 # modify self.connections. Because add/removing connections is 186 # a rare event, we go through the extra hassle of making a 187 # copy of the connections/dead_connections/callbacks lists 188 # when modifying, then setting the reference to the new copy. 189 # With this pattern, other code can use these lists without 190 # having to acquire c_lock 191 self.c_lock = threading.RLock() 192 self.connections = [] 193 self.closed = False 194 # number of Topic instances using this 195 self.ref_count = 0 196 #STATS 197 self.dead_connections = [] #for retaining stats on old conns
198
199 - def __del__(self):
200 # very similar to close(), but have to be more careful in a __del__ what we call 201 if self.closed: 202 return 203 if self.connections is not None: 204 for c in self.connections: 205 try: 206 c.close() 207 except: 208 pass 209 del self.connections[:] 210 del self.dead_connections[:] 211 self.c_lock = self.dead_connections = self.connections = self.handler = self.data_class = self.type = None
212
213 - def close(self):
214 """close I/O""" 215 if self.closed: 216 return 217 self.closed = True 218 if self.c_lock is not None: 219 with self.c_lock: 220 for c in self.connections: 221 try: 222 if c is not None: 223 c.close() 224 except: 225 # seems more logger.error internal than external logerr 226 _logger.error(traceback.format_exc()) 227 del self.connections[:] 228 self.handler = None
229 230 # note: currently not deleting self.dead_connections. not 231 # sure what the right policy here as dead_connections is 232 # for statistics only. they should probably be moved to 233 # the topic manager instead. 234
235 - def get_num_connections(self):
236 with self.c_lock: 237 return len(self.connections)
238
239 - def has_connection(self, endpoint_id):
240 """ 241 Query whether or not a connection with the associated \a 242 endpoint has been added to this object. 243 @param endpoint_id: endpoint ID associated with connection. 244 @type endpoint_id: str 245 """ 246 # save reference to avoid lock 247 conn = self.connections 248 for c in conn: 249 if c.endpoint_id == endpoint_id: 250 return True 251 return False
252
253 - def has_connections(self):
254 """ 255 Check to see if this topic is connected to other publishers/subscribers 256 @return: True if topic is connected 257 @rtype: bool 258 """ 259 if self.connections: 260 return True 261 return False
262
263 - def add_connection(self, c):
264 """ 265 Add a connection to this topic. 266 @param c: connection instance 267 @type c: Transport 268 @return: True if connection was added 269 @rtype: bool 270 """ 271 with self.c_lock: 272 # c_lock is to make add_connection thread-safe, but we 273 # still make a copy of self.connections so that the rest of the 274 # code can use self.connections in an unlocked manner 275 new_connections = self.connections[:] 276 new_connections.append(c) 277 self.connections = new_connections 278 279 # connections make a callback when closed 280 c.set_cleanup_callback(self.remove_connection) 281 282 return True
283
284 - def remove_connection(self, c):
285 """ 286 Remove connection from topic. 287 @param c: connection instance to remove 288 @type c: Transport 289 """ 290 with self.c_lock: 291 # c_lock is to make remove_connection thread-safe, but we 292 # still make a copy of self.connections so that the rest of the 293 # code can use self.connections in an unlocked manner 294 new_connections = self.connections[:] 295 new_dead_connections = self.dead_connections[:] 296 if c in new_connections: 297 new_connections.remove(c) 298 new_dead_connections.append(DeadTransport(c)) 299 self.connections = new_connections 300 self.dead_connections = new_dead_connections
301
302 - def get_stats_info(self): # STATS
303 """ 304 Get the stats for this topic 305 @return: stats for topic in getBusInfo() format:: 306 ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*) 307 @rtype: list 308 """ 309 # save referenceto avoid locking 310 connections = self.connections 311 dead_connections = self.dead_connections 312 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections] + \ 313 [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, False) for c in dead_connections]
314
315 - def get_stats(self): # STATS
316 """Get the stats for this topic (API stub)""" 317 raise Exception("subclasses must override") 318 319 # Implementation note: Subscriber attaches to a _SubscriberImpl 320 # singleton for that topic. The underlying impl manages the 321 # connections for that publication and enables thread-safe access 322
323 -class Subscriber(Topic):
324 """ 325 Class for registering as a subscriber to a specified topic, where 326 the messages are of a given type. 327 """
328 - def __init__(self, name, data_class, callback=None, callback_args=None, 329 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
330 """ 331 Constructor. 332 333 NOTE: for the queue_size and buff_size 334 parameters, rospy does not attempt to do intelligent merging 335 between multiple Subscriber instances for the same topic. As 336 they share the same underlying transport, multiple Subscribers 337 to the same topic can conflict with one another if they set 338 these parameters differently. 339 340 @param name: graph resource name of topic, e.g. 'laser'. 341 @type name: str 342 @param data_class: data type class to use for messages, 343 e.g. std_msgs.msg.String 344 @type data_class: L{Message} class 345 @param callback: function to call ( fn(data)) when data is 346 received. If callback_args is set, the function must accept 347 the callback_args as a second argument, i.e. fn(data, 348 callback_args). NOTE: Additional callbacks can be added using 349 add_callback(). 350 @type callback: str 351 @param callback_args: additional arguments to pass to the 352 callback. This is useful when you wish to reuse the same 353 callback for multiple subscriptions. 354 @type callback_args: any 355 @param queue_size: maximum number of messages to receive at 356 a time. This will generally be 1 or None (infinite, 357 default). buff_size should be increased if this parameter 358 is set as incoming data still needs to sit in the incoming 359 buffer before being discarded. Setting queue_size 360 buff_size to a non-default value affects all subscribers to 361 this topic in this process. 362 @type queue_size: int 363 @param buff_size: incoming message buffer size in bytes. If 364 queue_size is set, this should be set to a number greater 365 than the queue_size times the average message size. Setting 366 buff_size to a non-default value affects all subscribers to 367 this topic in this process. 368 @type buff_size: int 369 @param tcp_nodelay: if True, request TCP_NODELAY from 370 publisher. Use of this option is not generally recommended 371 in most cases as it is better to rely on timestamps in 372 message data. Setting tcp_nodelay to True enables TCP_NODELAY 373 for all subscribers in the same python process. 374 @type tcp_nodelay: bool 375 @raise ROSException: if parameters are invalid 376 """ 377 super(Subscriber, self).__init__(name, data_class, Registration.SUB) 378 #add in args that factory cannot pass in 379 380 # last person to set these to non-defaults wins, not much way 381 # around this 382 if queue_size is not None: 383 self.impl.set_queue_size(queue_size) 384 if buff_size != DEFAULT_BUFF_SIZE: 385 self.impl.set_buff_size(buff_size) 386 387 if callback is not None: 388 # #1852 389 # it's important that we call add_callback so that the 390 # callback can be invoked with any latched messages 391 self.impl.add_callback(callback, callback_args) 392 # save arguments for unregister 393 self.callback = callback 394 self.callback_args = callback_args 395 else: 396 # initialize fields 397 self.callback = self.callback_args = None 398 if tcp_nodelay: 399 self.impl.set_tcp_nodelay(tcp_nodelay)
400
401 - def unregister(self):
402 """ 403 unpublish/unsubscribe from topic. Topic instance is no longer 404 valid after this call. Additional calls to unregister() have no effect. 405 """ 406 if self.impl: 407 # It's possible to have a Subscriber instance with no 408 # associated callback 409 if self.callback is not None: 410 self.impl.remove_callback(self.callback, self.callback_args) 411 self.callback = self.callback_args = None 412 super(Subscriber, self).unregister()
413
414 -class _SubscriberImpl(_TopicImpl):
415 """ 416 Underyling L{_TopicImpl} implementation for subscriptions. 417 """ 418
419 - def __init__(self, name, data_class):
420 """ 421 ctor. 422 @param name: graph resource name of topic, e.g. 'laser'. 423 @type name: str 424 @param data_class: Message data class 425 @type data_class: L{Message} class 426 """ 427 super(_SubscriberImpl, self).__init__(name, data_class) 428 # client-methods to invoke on new messages. should only modify 429 # under lock. This is a list of 2-tuples (fn, args), where 430 # args are additional arguments for the callback, or None 431 self.callbacks = [] 432 self.queue_size = None 433 self.buff_size = DEFAULT_BUFF_SIZE 434 self.tcp_nodelay = False
435
436 - def close(self):
437 """close I/O and release resources""" 438 _TopicImpl.close(self) 439 if self.callbacks: 440 del self.callbacks[:] 441 self.callbacks = None
442
443 - def set_tcp_nodelay(self, tcp_nodelay):
444 """ 445 Set the value of TCP_NODELAY, which causes the Nagle algorithm 446 to be disabled for future topic connections, if the publisher 447 supports it. 448 """ 449 self.tcp_nodelay = tcp_nodelay
450
451 - def set_queue_size(self, queue_size):
452 """ 453 Set the receive queue size. If more than queue_size messages 454 are waiting to be deserialized, they are discarded. 455 456 @param queue_size int: incoming queue size. Must be positive integer or None. 457 @type queue_size: int 458 """ 459 if queue_size == -1: 460 self.queue_size = None 461 elif queue_size == 0: 462 raise ROSException("queue size may not be set to zero") 463 elif queue_size is not None and type(queue_size) != int: 464 raise ROSException("queue size must be an integer") 465 else: 466 self.queue_size = queue_size
467
468 - def set_buff_size(self, buff_size):
469 """ 470 Set the receive buffer size. The exact meaning of this is 471 transport dependent. 472 @param buff_size: receive buffer size 473 @type buff_size: int 474 """ 475 if type(buff_size) != int: 476 raise ROSException("buffer size must be an integer") 477 elif buff_size <= 0: 478 raise ROSException("buffer size must be a positive integer") 479 self.buff_size = buff_size
480
481 - def get_stats(self): # STATS
482 """ 483 Get the stats for this topic subscriber 484 @return: stats for topic in getBusStats() publisher format:: 485 (topicName, connStats) 486 where connStats is:: 487 [connectionId, bytesReceived, numSent, dropEstimate, connected]* 488 @rtype: list 489 """ 490 # save reference to avoid locking 491 conn = self.connections 492 dead_conn = self.dead_connections 493 #for now drop estimate is -1 494 stats = (self.resolved_name, 495 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done) 496 for c in chain(conn, dead_conn)] ) 497 return stats
498
499 - def add_callback(self, cb, cb_args):
500 """ 501 Register a callback to be invoked whenever a new message is received 502 @param cb: callback function to invoke with message data 503 instance, i.e. fn(data). If callback args is set, they will 504 be passed in as the second argument. 505 @type cb: fn(msg) 506 @param cb_cargs: additional arguments to pass to callback 507 @type cb_cargs: Any 508 """ 509 if self.closed: 510 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name)) 511 with self.c_lock: 512 # we lock in order to serialize calls to add_callback, but 513 # we copy self.callbacks so we can it 514 new_callbacks = self.callbacks[:] 515 new_callbacks.append((cb, cb_args)) 516 self.callbacks = new_callbacks 517 518 # #1852: invoke callback with any latched messages 519 for c in self.connections: 520 if c.latch is not None: 521 self._invoke_callback(c.latch, cb, cb_args)
522
523 - def remove_callback(self, cb, cb_args):
524 """ 525 Unregister a message callback. 526 @param cb: callback function 527 @type cb: fn(msg) 528 @param cb_cargs: additional arguments associated with callback 529 @type cb_cargs: Any 530 @raise KeyError: if no matching callback 531 """ 532 if self.closed: 533 return 534 with self.c_lock: 535 # we lock in order to serialize calls to add_callback, but 536 # we copy self.callbacks so we can it 537 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args] 538 if matches: 539 new_callbacks = self.callbacks[:] 540 # remove the first match 541 new_callbacks.remove(matches[0]) 542 self.callbacks = new_callbacks 543 if not matches: 544 raise KeyError("no matching cb")
545
546 - def _invoke_callback(self, msg, cb, cb_args):
547 """ 548 Invoke callback on msg. Traps and logs any exceptions raise by callback 549 @param msg: message data 550 @type msg: L{Message} 551 @param cb: callback 552 @type cb: fn(msg, cb_args) 553 @param cb_args: callback args or None 554 @type cb_args: Any 555 """ 556 try: 557 if cb_args is not None: 558 cb(msg, cb_args) 559 else: 560 cb(msg) 561 except Exception, e: 562 if not is_shutdown(): 563 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc())) 564 else: 565 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
566
567 - def receive_callback(self, msgs):
568 """ 569 Called by underlying connection transport for each new message received 570 @param msgs: message data 571 @type msgs: [L{Message}] 572 """ 573 # save reference to avoid lock 574 callbacks = self.callbacks 575 for msg in msgs: 576 for cb, cb_args in callbacks: 577 self._invoke_callback(msg, cb, cb_args)
578
579 -class SubscribeListener(object):
580 """ 581 Callback API to receive notifications when new subscribers 582 connect and disconnect. 583 """ 584
585 - def peer_subscribe(self, topic_name, topic_publish, peer_publish):
586 """ 587 callback when a peer has subscribed from a topic 588 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 589 @type topic_name: str 590 @param topic_publish: method to publish message data to all subscribers 591 @type topic_publish: fn(data) 592 @param peer_publish: method to publish message data to 593 new subscriber. NOTE: behavior for the latter is 594 transport-dependent as some transports may be broadcast only. 595 @type peer_publish: fn(data) 596 """ 597 pass
598
599 - def peer_unsubscribe(self, topic_name, num_peers):
600 """ 601 callback when a peer has unsubscribed from a topic 602 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 603 @type topic_name: str 604 @param num_peers: number of remaining peers subscribed to topic 605 @type num_peers: int 606 """ 607 pass
608 609 610 # Implementation note: Publisher attaches to a 611 # _PublisherImpl singleton for that topic. The underlying impl 612 # manages the connections for that publication and enables 613 # thread-safe access 614
615 -class Publisher(Topic):
616 """ 617 Class for registering as a publisher of a ROS topic. 618 """ 619
620 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None):
621 """ 622 Constructor 623 @param name: resource name of topic, e.g. 'laser'. 624 @type name: str 625 @param data_class: message class for serialization 626 @type data_class: L{Message} class 627 @param subscriber_listener: listener for 628 subscription events. May be None. 629 @type subscriber_listener: L{SubscribeListener} 630 @param tcp_nodelay: If True, sets TCP_NODELAY on 631 publisher's socket (disables Nagle algorithm). This results 632 in lower latency publishing at the cost of efficiency. 633 @type tcp_nodelay: bool 634 @param latch: If True, the last message published is 635 'latched', meaning that any future subscribers will be sent 636 that message immediately upon connection. 637 @type latch: bool 638 @raise ROSException: if parameters are invalid 639 """ 640 super(Publisher, self).__init__(name, data_class, Registration.PUB) 641 642 if subscriber_listener: 643 self.impl.add_subscriber_listener(subscriber_listener) 644 if tcp_nodelay: 645 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay) 646 if latch: 647 self.impl.enable_latch() 648 if headers: 649 self.impl.add_headers(headers)
650
651 - def publish(self, *args, **kwds):
652 """ 653 Publish message data object to this topic. 654 Publish can either be called with the message instance to 655 publish or with the constructor args for a new Message 656 instance, i.e.:: 657 pub.publish(message_instance) 658 pub.publish(message_field_1, message_field_2...) 659 pub.publish(message_field_1='foo', message_field_2='bar') 660 661 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used 662 @param kwds : Message keyword arguments. If kwds are used, args must be unset 663 @raise ROSException: If rospy node has not been initialized 664 @raise ROSSerializationException: If unable to serialize 665 message. This is usually a type error with one of the fields. 666 """ 667 if self.impl is None: 668 raise ROSException("publish() to an unregistered() handle") 669 if not is_initialized(): 670 raise ROSException("ROS node has not been initialized yet. Please call init_node() first") 671 data = args_kwds_to_message(self.data_class, args, kwds) 672 try: 673 self.impl.acquire() 674 self.impl.publish(data) 675 except roslib.message.SerializationError, e: 676 # can't go to rospy.logerr(), b/c this could potentially recurse 677 _logger.error(traceback.format_exc(e)) 678 raise ROSSerializationException(str(e)) 679 finally: 680 self.impl.release()
681
682 -class _PublisherImpl(_TopicImpl):
683 """ 684 Underyling L{_TopicImpl} implementation for publishers. 685 """ 686
687 - def __init__(self, name, data_class):
688 """ 689 @param name: name of topic, e.g. 'laser'. 690 @type name: str 691 @param data_class: Message data class 692 @type data_class: L{Message} class 693 """ 694 super(_PublisherImpl, self).__init__(name, data_class) 695 self.buff = cStringIO.StringIO() 696 self.publock = threading.RLock() #for acquire()/release 697 self.subscriber_listeners = [] 698 699 # additional client connection headers 700 self.headers = {} 701 702 # publish latch, starts disabled 703 self.is_latch = False 704 self.latch = None 705 706 #STATS 707 self.message_data_sent = 0
708
709 - def close(self):
710 """close I/O and release resources""" 711 _TopicImpl.close(self) 712 # release resources 713 if self.subscriber_listeners: 714 del self.subscriber_listeners[:] 715 if self.headers: 716 self.headers.clear() 717 if self.buff is not None: 718 self.buff.close() 719 self.publock = self.headers = self.buff = self.subscriber_listeners = None
720
721 - def add_headers(self, headers):
722 """ 723 Add connection headers to this Topic for future connections. 724 @param headers: key/values will be added to current connection 725 header set, overriding any existing keys if they conflict. 726 @type headers: dict 727 """ 728 self.headers.update(headers)
729
730 - def enable_latch(self):
731 """ 732 Enable publish() latch. The latch contains the last published 733 message and is sent to any new subscribers. 734 """ 735 self.is_latch = True
736
737 - def get_stats(self): # STATS
738 """ 739 Get the stats for this topic publisher 740 @return: stats for topic in getBusStats() publisher format:: 741 [topicName, messageDataBytes, connStats], 742 where connStats is:: 743 [id, bytes, numMessages, connected]* 744 @rtype: list 745 """ 746 # save reference to avoid lock 747 conn = self.connections 748 dead_conn = self.dead_connections 749 return (self.resolved_name, self.message_data_sent, 750 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in chain(conn, dead_conn)] )
751
752 - def add_subscriber_listener(self, l):
753 """ 754 Add a L{SubscribeListener} for subscribe events. 755 @param l: listener instance 756 @type l: L{SubscribeListener} 757 """ 758 self.subscriber_listeners.append(l)
759
760 - def acquire(self):
761 """lock for thread-safe publishing to this transport""" 762 if self.publock is not None: 763 self.publock.acquire()
764
765 - def release(self):
766 """lock for thread-safe publishing to this transport""" 767 if self.publock is not None: 768 self.publock.release()
769
770 - def add_connection(self, c):
771 """ 772 Add a connection to this topic. This must be a PubTransport. If 773 the latch is enabled, c will be sent a the value of the 774 latch. 775 @param c: connection instance 776 @type c: L{Transport} 777 @return: True if connection was added 778 @rtype: bool 779 """ 780 super(_PublisherImpl, self).add_connection(c) 781 def publish_single(data): 782 self.publish(data, connection_override=c)
783 for l in self.subscriber_listeners: 784 l.peer_subscribe(self.resolved_name, self.publish, publish_single) 785 if self.is_latch and self.latch is not None: 786 with self.publock: 787 self.publish(self.latch, connection_override=c) 788 return True 789
790 - def remove_connection(self, c):
791 """ 792 Remove existing connection from this topic. 793 @param c: connection instance to remove 794 @type c: L{Transport} 795 """ 796 super(_PublisherImpl, self).remove_connection(c) 797 num = len(self.connections) 798 for l in self.subscriber_listeners: 799 l.peer_unsubscribe(self.resolved_name, num)
800
801 - def publish(self, message, connection_override=None):
802 """ 803 Publish the data to the topic. If the topic has no subscribers, 804 the method will return without any affect. Access to publish() 805 should be locked using acquire() and release() in order to 806 ensure proper message publish ordering. 807 808 @param message: message data instance to publish 809 @type message: L{Message} 810 @param connection_override: publish to this connection instead of all 811 @type connection_override: L{Transport} 812 @return: True if the data was published, False otherwise. 813 @rtype: bool 814 @raise roslib.message.SerializationError: if L{Message} instance is unable to serialize itself 815 @raise rospy.ROSException: if topic has been closed or was closed during publish() 816 """ 817 #TODO: should really just use IOError instead of rospy.ROSException 818 819 if self.closed: 820 # during shutdown, the topic can get closed, which creates 821 # a race condition with user code testing is_shutdown 822 if not is_shutdown(): 823 raise ROSException("publish() to a closed topic") 824 else: 825 return 826 827 if self.is_latch: 828 self.latch = message 829 830 if not self.has_connections(): 831 #publish() falls through 832 return False 833 834 if connection_override is None: 835 #copy connections so we can iterate safely 836 conns = self.connections 837 else: 838 conns = [connection_override] 839 840 # #2128 test our buffer. I don't now how this got closed in 841 # that case, but we can at least diagnose the problem. 842 b = self.buff 843 try: 844 b.tell() 845 846 # serialize the message 847 self.seq += 1 #count messages published to the topic 848 serialize_message(b, self.seq, message) 849 850 # send the buffer to all connections 851 err_con = [] 852 data = b.getvalue() 853 854 for c in conns: 855 try: 856 if not is_shutdown(): 857 c.write_data(data) 858 except TransportTerminated, e: 859 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 860 err_con.append(c) 861 except Exception, e: 862 # greater severity level 863 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 864 err_con.append(c) 865 866 # reset the buffer and update stats 867 self.message_data_sent += b.tell() #STATS 868 b.seek(0) 869 b.truncate(0) 870 871 except ValueError: 872 # operations on self.buff can fail if topic is closed 873 # during publish, which often happens during Ctrl-C. 874 # diagnose the error and report accordingly. 875 if self.closed: 876 if is_shutdown(): 877 # we offer no guarantees on publishes that occur 878 # during shutdown, so this is not exceptional. 879 return 880 else: 881 # this indicates that user-level code most likely 882 # closed the topic, which is exceptional. 883 raise ROSException("topic was closed during publish()") 884 else: 885 # unexpected, so re-raise original error 886 raise 887 888 # remove any bad connections 889 for c in err_con: 890 try: 891 # connection will callback into remove_connection when 892 # we close it 893 c.close() 894 except: 895 pass
896 897 ################################################################################# 898 # TOPIC MANAGER/LISTENER 899
900 -class _TopicManager(object):
901 """ 902 Tracks Topic objects 903 See L{get_topic_manager()} for singleton access 904 """ 905
906 - def __init__(self):
907 """ctor.""" 908 super(_TopicManager, self).__init__() 909 self.pubs = {} #: { topic: _PublisherImpl } 910 self.subs = {} #: { topic: _SubscriberImpl } 911 self.topics = set() # [str] list of topic names 912 self.lock = threading.Condition() 913 self.closed = False 914 _logger.info("topicmanager initialized")
915
916 - def get_pub_sub_info(self):
917 """ 918 get topic publisher and subscriber connection info for getBusInfo() api 919 @return: [bus info stats] 920 See getBusInfo() API for more data structure details. 921 @rtype: list 922 """ 923 with self.lock: 924 info = [] 925 for s in chain(self.pubs.itervalues(), self.subs.itervalues()): 926 info.extend(s.get_stats_info()) 927 return info
928
929 - def get_pub_sub_stats(self):
930 """ 931 get topic publisher and subscriber stats for getBusStats() api 932 @return: [publisherStats, subscriberStats]. 933 See getBusStats() API for more data structure details. 934 @rtype: list 935 """ 936 with self.lock: 937 return [s.get_stats() for s in self.pubs.itervalues()],\ 938 [s.get_stats() for s in self.subs.itervalues()]
939
940 - def close_all(self):
941 """ 942 Close all registered publication and subscriptions. Manager is 943 no longer usable after close. 944 """ 945 self.closed = True 946 with self.lock: 947 for t in chain(self.pubs.itervalues(), self.subs.itervalues()): 948 t.close() 949 self.pubs.clear() 950 self.subs.clear()
951
952 - def _add(self, ps, rmap, reg_type):
953 """ 954 Add L{_TopicImpl} instance to rmap 955 @param ps: a pub/sub impl instance 956 @type ps: L{_TopicImpl} 957 @param rmap: { topic: _TopicImpl} rmap to record instance in 958 @type rmap: dict 959 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 960 @type reg_type: str 961 """ 962 resolved_name = ps.resolved_name 963 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type) 964 with self.lock: 965 rmap[resolved_name] = ps 966 self.topics.add(resolved_name) 967 968 # NOTE: this call can take a lengthy amount of time (at 969 # least until its reimplemented to use queues) 970 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
971
972 - def _recalculate_topics(self):
973 """recalculate self.topics. expensive""" 974 self.topics = set([x.resolved_name for x in self.pubs.itervalues()] + 975 [x.resolved_name for x in self.subs.itervalues()])
976
977 - def _remove(self, ps, rmap, reg_type):
978 """ 979 Remove L{_TopicImpl} instance from rmap 980 @param ps: a pub/sub impl instance 981 @type ps: L{_TopicImpl} 982 @param rmap: topic->_TopicImpl rmap to remove instance in 983 @type rmap: dict 984 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 985 @type reg_type: str 986 """ 987 resolved_name = ps.resolved_name 988 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type) 989 with self.lock: 990 del rmap[resolved_name] 991 self. _recalculate_topics() 992 993 # NOTE: this call can take a lengthy amount of time (at 994 # least until its reimplemented to use queues) 995 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
996
997 - def get_impl(self, reg_type, resolved_name):
998 """ 999 Get the L{_TopicImpl} for the specified topic. This is mainly for 1000 testing purposes. Unlike acquire_impl, it does not alter the 1001 ref count. 1002 @param resolved_name: resolved topic name 1003 @type resolved_name: str 1004 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1005 @type reg_type: str 1006 """ 1007 if reg_type == Registration.PUB: 1008 rmap = self.pubs 1009 elif reg_type == Registration.SUB: 1010 rmap = self.subs 1011 else: 1012 raise TypeError("invalid reg_type: %s"%s) 1013 return rmap.get(resolved_name, None)
1014
1015 - def acquire_impl(self, reg_type, resolved_name, data_class):
1016 """ 1017 Acquire a L{_TopicImpl} for the specified topic (create one if it 1018 doesn't exist). Every L{Topic} instance has a _TopicImpl that 1019 actually controls the topic resources so that multiple Topic 1020 instances use the same underlying connections. 'Acquiring' a 1021 topic implementation marks that another Topic instance is 1022 using the TopicImpl. 1023 1024 @param resolved_name: resolved topic name 1025 @type resolved_name: str 1026 1027 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1028 @type reg_type: str 1029 1030 @param data_class: message class for topic 1031 @type data_class: L{Message} class 1032 """ 1033 if reg_type == Registration.PUB: 1034 rmap = self.pubs 1035 impl_class = _PublisherImpl 1036 elif reg_type == Registration.SUB: 1037 rmap = self.subs 1038 impl_class = _SubscriberImpl 1039 else: 1040 raise TypeError("invalid reg_type: %s"%s) 1041 with self.lock: 1042 impl = rmap.get(resolved_name, None) 1043 if not impl: 1044 impl = impl_class(resolved_name, data_class) 1045 self._add(impl, rmap, reg_type) 1046 impl.ref_count += 1 1047 return impl
1048
1049 - def release_impl(self, reg_type, resolved_name):
1050 """ 1051 Release a L_{TopicImpl} for the specified topic. 1052 1053 Every L{Topic} instance has a _TopicImpl that actually 1054 controls the topic resources so that multiple Topic instances 1055 use the same underlying connections. 'Acquiring' a topic 1056 implementation marks that another Topic instance is using the 1057 TopicImpl. 1058 1059 @param resolved_name: resolved topic name 1060 @type resolved_name: str 1061 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1062 @type reg_type: str 1063 """ 1064 if reg_type == Registration.PUB: 1065 rmap = self.pubs 1066 else: 1067 rmap = self.subs 1068 with self.lock: 1069 # check for race condition where multiple things are cleaning up at once 1070 if self.closed: 1071 return 1072 impl = rmap.get(resolved_name, None) 1073 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name 1074 impl.ref_count -= 1 1075 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero" 1076 if impl.ref_count == 0: 1077 _logger.debug("topic impl's ref count is zero, deleting topic %s...", resolved_name) 1078 impl.close() 1079 self._remove(impl, rmap, reg_type) 1080 del impl 1081 _logger.debug("... done deleting topic %s", resolved_name)
1082
1083 - def get_publisher_impl(self, resolved_name):
1084 """ 1085 @param resolved_name: resolved topic name 1086 @type resolved_name: str 1087 @return: list of L{_PublisherImpl}s 1088 @rtype: [L{_PublisherImpl}] 1089 """ 1090 return self.pubs.get(resolved_name, None)
1091
1092 - def get_subscriber_impl(self, resolved_name):
1093 """ 1094 @param resolved_name: topic name 1095 @type resolved_name: str 1096 @return: subscriber for the specified topic. 1097 @rtype: L{_SubscriberImpl} 1098 """ 1099 return self.subs.get(resolved_name, None)
1100
1101 - def has_subscription(self, resolved_name):
1102 """ 1103 @param resolved_name: resolved topic name 1104 @type resolved_name: str 1105 @return: True if manager has subscription for specified topic 1106 @rtype: bool 1107 """ 1108 return resolved_name in self.subs
1109
1110 - def has_publication(self, resolved_name):
1111 """ 1112 @param resolved_name: resolved topic name 1113 @type resolved_name: str 1114 @return: True if manager has publication for specified topic 1115 @rtype: bool 1116 """ 1117 return resolved_name in self.pubs
1118
1119 - def get_topics(self):
1120 """ 1121 @return: list of topic names this node subscribes to/publishes 1122 @rtype: [str] 1123 """ 1124 return self.topics
1125
1126 - def _get_list(self, rmap):
1127 return [[k, v.type] for k, v in rmap.iteritems()]
1128 1129 ## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1130 - def get_subscriptions(self):
1131 return self._get_list(self.subs)
1132 1133 ## @return [[str,str],]: list of topics published by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1134 - def get_publications(self):
1135 return self._get_list(self.pubs)
1136 1137 set_topic_manager(_TopicManager()) 1138