Package rospy :: Module topics

Source Code for Module rospy.topics

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2008, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32  # 
  33  # Revision $Id: topics.py 16621 2012-04-06 00:36:58Z kwc $ 
  34   
  35  """ 
  36  rospy implementation of topics. 
  37   
  38  Client API 
  39  ========== 
  40   
  41  L{Publisher} and L{Subscriber} are the client API for topics. 
  42   
  43  Internal Implementation 
  44  ======================= 
  45   
  46  Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl) 
  47  are used to manage actual transport connections.  The L{_TopicManager} 
  48  is responsible for tracking the system-wide state of publications and 
  49  subscriptions as well as the L{_TopicImpl} instances. More info is below. 
  50    
  51  L{_TopicManager} 
  52  ================ 
  53   
  54  The L{_TopicManager} does the backend topic bookkeeping for the local 
  55  node.  Use L{get_topic_manager()} to access singleton. Actual topic 
  56  implementations are done through the 
  57  L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client 
  58  code generates instances of type L{Publisher}/L{Subscriber}, which 
  59  enable to client to create multiple publishers/subscribers of that 
  60  topic that get controlled access to the underlying share connections. 
  61   
  62  Common parent classes for all rospy topics. The rospy topic autogenerators 
  63  create classes that are children of these implementations. 
  64  """ 
  65   
  66   
  67  import struct 
  68  try: 
  69      from cStringIO import StringIO #Python 2.x 
  70      import thread as _thread # Python 2 
  71      python3 = 0 
72 - def isstring(s):
73 return isinstance(s, basestring) #Python 2.x
74 except ImportError: 75 python3 = 1 76 from io import StringIO, BytesIO #Python 3.x 77 import _thread
78 - def isstring(s):
79 return isinstance(s, str) #Python 3.x
80 81 import threading 82 import logging 83 import time 84 85 from itertools import chain 86 import traceback 87 88 import roslib.message 89 90 from rospy.core import * 91 from rospy.exceptions import ROSSerializationException, TransportTerminated 92 from rospy.msg import serialize_message, args_kwds_to_message 93 94 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners 95 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE 96 from rospy.impl.transport import DeadTransport 97 98 _logger = logging.getLogger('rospy.topics') 99 100 # wrap roslib implementation and map it to rospy namespace 101 Message = roslib.message.Message 102 103 ####################################################################### 104 # Base classes for all client-API instantiated pub/sub 105 # 106 # There are two trees: Topic and _TopicImpl. Topic is the client API 107 # for interfacing with topics, while _TopicImpl implements the 108 # underlying connection details. 109
110 -class Topic(object):
111 """Base class of L{Publisher} and L{Subscriber}""" 112
113 - def __init__(self, name, data_class, reg_type):
114 """ 115 @param name: graph resource name of topic, e.g. 'laser'. 116 @type name: str 117 @param data_class: message class for serialization 118 @type data_class: L{Message} 119 @param reg_type Registration.PUB or Registration.SUB 120 @type reg_type: str 121 @raise ValueError: if parameters are invalid 122 """ 123 124 if not name or not isstring(name): 125 raise ValueError("topic name is not a non-empty string") 126 try: 127 if python3 == 1: 128 name.encode("utf-8") 129 else: 130 name = name.encode("utf-8") 131 except UnicodeError: 132 raise ValueError("topic name must be ascii/utf-8 compatible") 133 if data_class is None: 134 raise ValueError("topic parameter 'data_class' is not initialized") 135 if not type(data_class) == type: 136 raise ValueError("data_class [%s] is not a class"%data_class) 137 if not issubclass(data_class, roslib.message.Message): 138 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__) 139 # #2202 140 if not roslib.names.is_legal_name(name): 141 import warnings 142 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 143 144 # this is a bit ugly, but necessary due to the fact that we allow 145 # topics and services to be initialized before the node 146 if not rospy.core.is_initialized(): 147 self.resolved_name = rospy.names.resolve_name_without_node_name(name) 148 else: 149 # init_node() has been called, so we can do normal resolution 150 self.resolved_name = resolve_name(name) 151 152 self.name = self.resolved_name # #1810 for backwards compatibility 153 154 self.data_class = data_class 155 self.type = data_class._type 156 self.md5sum = data_class._md5sum 157 self.reg_type = reg_type 158 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
159
160 - def get_num_connections(self):
161 """ 162 get the number of connections to other ROS nodes for this topic. For a Publisher, 163 this corresponds to the number of nodes subscribing. For a Subscriber, the number 164 of publishers. 165 @return: number of connections 166 @rtype: int 167 """ 168 return self.impl.get_num_connections()
169
170 - def unregister(self):
171 """ 172 unpublish/unsubscribe from topic. Topic instance is no longer 173 valid after this call. Additional calls to unregister() have no effect. 174 """ 175 # as we don't guard unregister, have to protect value of 176 # resolved_name for release_impl call 177 resolved_name = self.resolved_name 178 if resolved_name and self.impl: 179 get_topic_manager().release_impl(self.reg_type, resolved_name) 180 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
181
182 -class _TopicImpl(object):
183 """ 184 Base class of internal topic implementations. Each topic has a 185 singleton _TopicImpl implementation for managing the underlying 186 connections. 187 """ 188
189 - def __init__(self, name, data_class):
190 """ 191 Base constructor 192 @param name: graph resource name of topic, e.g. 'laser'. 193 @type name: str 194 @param data_class: message data class 195 @type data_class: L{Message} 196 """ 197 198 # #1810 made resolved/unresolved more explicit so we don't accidentally double-resolve 199 self.resolved_name = resolve_name(name) #NOTE: remapping occurs here! 200 self.name = self.resolved_name # for backwards compatibility 201 202 self.data_class = data_class 203 self.type = data_class._type 204 self.handler = None 205 self.seq = 0 206 # lock is used for to serialize call order that methods that 207 # modify self.connections. Because add/removing connections is 208 # a rare event, we go through the extra hassle of making a 209 # copy of the connections/dead_connections/callbacks lists 210 # when modifying, then setting the reference to the new copy. 211 # With this pattern, other code can use these lists without 212 # having to acquire c_lock 213 self.c_lock = threading.RLock() 214 self.connections = [] 215 self.closed = False 216 # number of Topic instances using this 217 self.ref_count = 0 218 #STATS 219 self.dead_connections = [] #for retaining stats on old conns
220
221 - def __del__(self):
222 # very similar to close(), but have to be more careful in a __del__ what we call 223 if self.closed: 224 return 225 if self.connections is not None: 226 for c in self.connections: 227 try: 228 c.close() 229 except: 230 pass 231 del self.connections[:] 232 del self.dead_connections[:] 233 self.c_lock = self.dead_connections = self.connections = self.handler = self.data_class = self.type = None
234
235 - def close(self):
236 """close I/O""" 237 if self.closed: 238 return 239 self.closed = True 240 if self.c_lock is not None: 241 with self.c_lock: 242 for c in self.connections: 243 try: 244 if c is not None: 245 c.close() 246 except: 247 # seems more logger.error internal than external logerr 248 _logger.error(traceback.format_exc()) 249 del self.connections[:] 250 self.handler = None
251 252 # note: currently not deleting self.dead_connections. not 253 # sure what the right policy here as dead_connections is 254 # for statistics only. they should probably be moved to 255 # the topic manager instead. 256
257 - def get_num_connections(self):
258 with self.c_lock: 259 return len(self.connections)
260
261 - def has_connection(self, endpoint_id):
262 """ 263 Query whether or not a connection with the associated \a 264 endpoint has been added to this object. 265 @param endpoint_id: endpoint ID associated with connection. 266 @type endpoint_id: str 267 """ 268 # save reference to avoid lock 269 conn = self.connections 270 for c in conn: 271 if c.endpoint_id == endpoint_id: 272 return True 273 return False
274
275 - def has_connections(self):
276 """ 277 Check to see if this topic is connected to other publishers/subscribers 278 @return: True if topic is connected 279 @rtype: bool 280 """ 281 if self.connections: 282 return True 283 return False
284
285 - def add_connection(self, c):
286 """ 287 Add a connection to this topic. 288 @param c: connection instance 289 @type c: Transport 290 @return: True if connection was added 291 @rtype: bool 292 """ 293 with self.c_lock: 294 # c_lock is to make add_connection thread-safe, but we 295 # still make a copy of self.connections so that the rest of the 296 # code can use self.connections in an unlocked manner 297 new_connections = self.connections[:] 298 new_connections.append(c) 299 self.connections = new_connections 300 301 # connections make a callback when closed 302 c.set_cleanup_callback(self.remove_connection) 303 304 return True
305
306 - def remove_connection(self, c):
307 """ 308 Remove connection from topic. 309 @param c: connection instance to remove 310 @type c: Transport 311 """ 312 with self.c_lock: 313 # c_lock is to make remove_connection thread-safe, but we 314 # still make a copy of self.connections so that the rest of the 315 # code can use self.connections in an unlocked manner 316 new_connections = self.connections[:] 317 new_dead_connections = self.dead_connections[:] 318 if c in new_connections: 319 new_connections.remove(c) 320 new_dead_connections.append(DeadTransport(c)) 321 self.connections = new_connections 322 self.dead_connections = new_dead_connections
323
324 - def get_stats_info(self): # STATS
325 """ 326 Get the stats for this topic 327 @return: stats for topic in getBusInfo() format:: 328 ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*) 329 @rtype: list 330 """ 331 # save referenceto avoid locking 332 connections = self.connections 333 dead_connections = self.dead_connections 334 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections] + \ 335 [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, False) for c in dead_connections]
336
337 - def get_stats(self): # STATS
338 """Get the stats for this topic (API stub)""" 339 raise Exception("subclasses must override") 340 341 # Implementation note: Subscriber attaches to a _SubscriberImpl 342 # singleton for that topic. The underlying impl manages the 343 # connections for that publication and enables thread-safe access 344
345 -class Subscriber(Topic):
346 """ 347 Class for registering as a subscriber to a specified topic, where 348 the messages are of a given type. 349 """
350 - def __init__(self, name, data_class, callback=None, callback_args=None, 351 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
352 """ 353 Constructor. 354 355 NOTE: for the queue_size and buff_size 356 parameters, rospy does not attempt to do intelligent merging 357 between multiple Subscriber instances for the same topic. As 358 they share the same underlying transport, multiple Subscribers 359 to the same topic can conflict with one another if they set 360 these parameters differently. 361 362 @param name: graph resource name of topic, e.g. 'laser'. 363 @type name: str 364 @param data_class: data type class to use for messages, 365 e.g. std_msgs.msg.String 366 @type data_class: L{Message} class 367 @param callback: function to call ( fn(data)) when data is 368 received. If callback_args is set, the function must accept 369 the callback_args as a second argument, i.e. fn(data, 370 callback_args). NOTE: Additional callbacks can be added using 371 add_callback(). 372 @type callback: str 373 @param callback_args: additional arguments to pass to the 374 callback. This is useful when you wish to reuse the same 375 callback for multiple subscriptions. 376 @type callback_args: any 377 @param queue_size: maximum number of messages to receive at 378 a time. This will generally be 1 or None (infinite, 379 default). buff_size should be increased if this parameter 380 is set as incoming data still needs to sit in the incoming 381 buffer before being discarded. Setting queue_size 382 buff_size to a non-default value affects all subscribers to 383 this topic in this process. 384 @type queue_size: int 385 @param buff_size: incoming message buffer size in bytes. If 386 queue_size is set, this should be set to a number greater 387 than the queue_size times the average message size. Setting 388 buff_size to a non-default value affects all subscribers to 389 this topic in this process. 390 @type buff_size: int 391 @param tcp_nodelay: if True, request TCP_NODELAY from 392 publisher. Use of this option is not generally recommended 393 in most cases as it is better to rely on timestamps in 394 message data. Setting tcp_nodelay to True enables TCP_NODELAY 395 for all subscribers in the same python process. 396 @type tcp_nodelay: bool 397 @raise ROSException: if parameters are invalid 398 """ 399 super(Subscriber, self).__init__(name, data_class, Registration.SUB) 400 #add in args that factory cannot pass in 401 402 # last person to set these to non-defaults wins, not much way 403 # around this 404 if queue_size is not None: 405 self.impl.set_queue_size(queue_size) 406 if buff_size != DEFAULT_BUFF_SIZE: 407 self.impl.set_buff_size(buff_size) 408 409 if callback is not None: 410 # #1852 411 # it's important that we call add_callback so that the 412 # callback can be invoked with any latched messages 413 self.impl.add_callback(callback, callback_args) 414 # save arguments for unregister 415 self.callback = callback 416 self.callback_args = callback_args 417 else: 418 # initialize fields 419 self.callback = self.callback_args = None 420 if tcp_nodelay: 421 self.impl.set_tcp_nodelay(tcp_nodelay)
422
423 - def unregister(self):
424 """ 425 unpublish/unsubscribe from topic. Topic instance is no longer 426 valid after this call. Additional calls to unregister() have no effect. 427 """ 428 if self.impl: 429 # It's possible to have a Subscriber instance with no 430 # associated callback 431 if self.callback is not None: 432 self.impl.remove_callback(self.callback, self.callback_args) 433 self.callback = self.callback_args = None 434 super(Subscriber, self).unregister()
435
436 -class _SubscriberImpl(_TopicImpl):
437 """ 438 Underyling L{_TopicImpl} implementation for subscriptions. 439 """
440 - def __init__(self, name, data_class):
441 """ 442 ctor. 443 @param name: graph resource name of topic, e.g. 'laser'. 444 @type name: str 445 @param data_class: Message data class 446 @type data_class: L{Message} class 447 """ 448 super(_SubscriberImpl, self).__init__(name, data_class) 449 # client-methods to invoke on new messages. should only modify 450 # under lock. This is a list of 2-tuples (fn, args), where 451 # args are additional arguments for the callback, or None 452 self.callbacks = [] 453 self.queue_size = None 454 self.buff_size = DEFAULT_BUFF_SIZE 455 self.tcp_nodelay = False
456
457 - def close(self):
458 """close I/O and release resources""" 459 _TopicImpl.close(self) 460 if self.callbacks: 461 del self.callbacks[:] 462 self.callbacks = None
463
464 - def set_tcp_nodelay(self, tcp_nodelay):
465 """ 466 Set the value of TCP_NODELAY, which causes the Nagle algorithm 467 to be disabled for future topic connections, if the publisher 468 supports it. 469 """ 470 self.tcp_nodelay = tcp_nodelay
471
472 - def set_queue_size(self, queue_size):
473 """ 474 Set the receive queue size. If more than queue_size messages 475 are waiting to be deserialized, they are discarded. 476 477 @param queue_size int: incoming queue size. Must be positive integer or None. 478 @type queue_size: int 479 """ 480 if queue_size == -1: 481 self.queue_size = None 482 elif queue_size == 0: 483 raise ROSException("queue size may not be set to zero") 484 elif queue_size is not None and type(queue_size) != int: 485 raise ROSException("queue size must be an integer") 486 else: 487 self.queue_size = queue_size
488
489 - def set_buff_size(self, buff_size):
490 """ 491 Set the receive buffer size. The exact meaning of this is 492 transport dependent. 493 @param buff_size: receive buffer size 494 @type buff_size: int 495 """ 496 if type(buff_size) != int: 497 raise ROSException("buffer size must be an integer") 498 elif buff_size <= 0: 499 raise ROSException("buffer size must be a positive integer") 500 self.buff_size = buff_size
501
502 - def get_stats(self): # STATS
503 """ 504 Get the stats for this topic subscriber 505 @return: stats for topic in getBusStats() publisher format:: 506 (topicName, connStats) 507 where connStats is:: 508 [connectionId, bytesReceived, numSent, dropEstimate, connected]* 509 @rtype: list 510 """ 511 # save reference to avoid locking 512 conn = self.connections 513 dead_conn = self.dead_connections 514 #for now drop estimate is -1 515 stats = (self.resolved_name, 516 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done) 517 for c in chain(conn, dead_conn)] ) 518 return stats
519
520 - def add_callback(self, cb, cb_args):
521 """ 522 Register a callback to be invoked whenever a new message is received 523 @param cb: callback function to invoke with message data 524 instance, i.e. fn(data). If callback args is set, they will 525 be passed in as the second argument. 526 @type cb: fn(msg) 527 @param cb_cargs: additional arguments to pass to callback 528 @type cb_cargs: Any 529 """ 530 if self.closed: 531 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name)) 532 with self.c_lock: 533 # we lock in order to serialize calls to add_callback, but 534 # we copy self.callbacks so we can it 535 new_callbacks = self.callbacks[:] 536 new_callbacks.append((cb, cb_args)) 537 self.callbacks = new_callbacks 538 539 # #1852: invoke callback with any latched messages 540 for c in self.connections: 541 if c.latch is not None: 542 self._invoke_callback(c.latch, cb, cb_args)
543
544 - def remove_callback(self, cb, cb_args):
545 """ 546 Unregister a message callback. 547 @param cb: callback function 548 @type cb: fn(msg) 549 @param cb_cargs: additional arguments associated with callback 550 @type cb_cargs: Any 551 @raise KeyError: if no matching callback 552 """ 553 if self.closed: 554 return 555 with self.c_lock: 556 # we lock in order to serialize calls to add_callback, but 557 # we copy self.callbacks so we can it 558 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args] 559 if matches: 560 new_callbacks = self.callbacks[:] 561 # remove the first match 562 new_callbacks.remove(matches[0]) 563 self.callbacks = new_callbacks 564 if not matches: 565 raise KeyError("no matching cb")
566
567 - def _invoke_callback(self, msg, cb, cb_args):
568 """ 569 Invoke callback on msg. Traps and logs any exceptions raise by callback 570 @param msg: message data 571 @type msg: L{Message} 572 @param cb: callback 573 @type cb: fn(msg, cb_args) 574 @param cb_args: callback args or None 575 @type cb_args: Any 576 """ 577 try: 578 if cb_args is not None: 579 cb(msg, cb_args) 580 else: 581 cb(msg) 582 except Exception as e: 583 if not is_shutdown(): 584 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc())) 585 else: 586 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
587
588 - def receive_callback(self, msgs):
589 """ 590 Called by underlying connection transport for each new message received 591 @param msgs: message data 592 @type msgs: [L{Message}] 593 """ 594 # save reference to avoid lock 595 callbacks = self.callbacks 596 for msg in msgs: 597 for cb, cb_args in callbacks: 598 self._invoke_callback(msg, cb, cb_args)
599
600 -class SubscribeListener(object):
601 """ 602 Callback API to receive notifications when new subscribers 603 connect and disconnect. 604 """ 605
606 - def peer_subscribe(self, topic_name, topic_publish, peer_publish):
607 """ 608 callback when a peer has subscribed from a topic 609 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 610 @type topic_name: str 611 @param topic_publish: method to publish message data to all subscribers 612 @type topic_publish: fn(data) 613 @param peer_publish: method to publish message data to 614 new subscriber. NOTE: behavior for the latter is 615 transport-dependent as some transports may be broadcast only. 616 @type peer_publish: fn(data) 617 """ 618 pass
619
620 - def peer_unsubscribe(self, topic_name, num_peers):
621 """ 622 callback when a peer has unsubscribed from a topic 623 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 624 @type topic_name: str 625 @param num_peers: number of remaining peers subscribed to topic 626 @type num_peers: int 627 """ 628 pass
629 630 631 # Implementation note: Publisher attaches to a 632 # _PublisherImpl singleton for that topic. The underlying impl 633 # manages the connections for that publication and enables 634 # thread-safe access 635
636 -class Publisher(Topic):
637 """ 638 Class for registering as a publisher of a ROS topic. 639 """ 640
641 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None):
642 """ 643 Constructor 644 @param name: resource name of topic, e.g. 'laser'. 645 @type name: str 646 @param data_class: message class for serialization 647 @type data_class: L{Message} class 648 @param subscriber_listener: listener for 649 subscription events. May be None. 650 @type subscriber_listener: L{SubscribeListener} 651 @param tcp_nodelay: If True, sets TCP_NODELAY on 652 publisher's socket (disables Nagle algorithm). This results 653 in lower latency publishing at the cost of efficiency. 654 @type tcp_nodelay: bool 655 @param latch: If True, the last message published is 656 'latched', meaning that any future subscribers will be sent 657 that message immediately upon connection. 658 @type latch: bool 659 @raise ROSException: if parameters are invalid 660 """ 661 super(Publisher, self).__init__(name, data_class, Registration.PUB) 662 663 if subscriber_listener: 664 self.impl.add_subscriber_listener(subscriber_listener) 665 if tcp_nodelay: 666 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay) 667 if latch: 668 self.impl.enable_latch() 669 if headers: 670 self.impl.add_headers(headers)
671
672 - def publish(self, *args, **kwds):
673 """ 674 Publish message data object to this topic. 675 Publish can either be called with the message instance to 676 publish or with the constructor args for a new Message 677 instance, i.e.:: 678 pub.publish(message_instance) 679 pub.publish(message_field_1, message_field_2...) 680 pub.publish(message_field_1='foo', message_field_2='bar') 681 682 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used 683 @param kwds : Message keyword arguments. If kwds are used, args must be unset 684 @raise ROSException: If rospy node has not been initialized 685 @raise ROSSerializationException: If unable to serialize 686 message. This is usually a type error with one of the fields. 687 """ 688 if self.impl is None: 689 raise ROSException("publish() to an unregistered() handle") 690 if not is_initialized(): 691 raise ROSException("ROS node has not been initialized yet. Please call init_node() first") 692 data = args_kwds_to_message(self.data_class, args, kwds) 693 try: 694 self.impl.acquire() 695 self.impl.publish(data) 696 except roslib.message.SerializationError as e: 697 # can't go to rospy.logerr(), b/c this could potentially recurse 698 _logger.error(traceback.format_exc(e)) 699 raise ROSSerializationException(str(e)) 700 finally: 701 self.impl.release()
702
703 -class _PublisherImpl(_TopicImpl):
704 """ 705 Underyling L{_TopicImpl} implementation for publishers. 706 """ 707
708 - def __init__(self, name, data_class):
709 """ 710 @param name: name of topic, e.g. 'laser'. 711 @type name: str 712 @param data_class: Message data class 713 @type data_class: L{Message} class 714 """ 715 super(_PublisherImpl, self).__init__(name, data_class) 716 if python3 == 0: 717 self.buff = StringIO() 718 else: 719 self.buff = BytesIO() 720 self.publock = threading.RLock() #for acquire()/release 721 self.subscriber_listeners = [] 722 723 # additional client connection headers 724 self.headers = {} 725 726 # publish latch, starts disabled 727 self.is_latch = False 728 self.latch = None 729 730 #STATS 731 self.message_data_sent = 0
732
733 - def close(self):
734 """close I/O and release resources""" 735 _TopicImpl.close(self) 736 # release resources 737 if self.subscriber_listeners: 738 del self.subscriber_listeners[:] 739 if self.headers: 740 self.headers.clear() 741 if self.buff is not None: 742 self.buff.close() 743 self.publock = self.headers = self.buff = self.subscriber_listeners = None
744
745 - def add_headers(self, headers):
746 """ 747 Add connection headers to this Topic for future connections. 748 @param headers: key/values will be added to current connection 749 header set, overriding any existing keys if they conflict. 750 @type headers: dict 751 """ 752 self.headers.update(headers)
753
754 - def enable_latch(self):
755 """ 756 Enable publish() latch. The latch contains the last published 757 message and is sent to any new subscribers. 758 """ 759 self.is_latch = True
760
761 - def get_stats(self): # STATS
762 """ 763 Get the stats for this topic publisher 764 @return: stats for topic in getBusStats() publisher format:: 765 [topicName, messageDataBytes, connStats], 766 where connStats is:: 767 [id, bytes, numMessages, connected]* 768 @rtype: list 769 """ 770 # save reference to avoid lock 771 conn = self.connections 772 dead_conn = self.dead_connections 773 return (self.resolved_name, self.message_data_sent, 774 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in chain(conn, dead_conn)] )
775
776 - def add_subscriber_listener(self, l):
777 """ 778 Add a L{SubscribeListener} for subscribe events. 779 @param l: listener instance 780 @type l: L{SubscribeListener} 781 """ 782 self.subscriber_listeners.append(l)
783
784 - def acquire(self):
785 """lock for thread-safe publishing to this transport""" 786 if self.publock is not None: 787 self.publock.acquire()
788
789 - def release(self):
790 """lock for thread-safe publishing to this transport""" 791 if self.publock is not None: 792 self.publock.release()
793
794 - def add_connection(self, c):
795 """ 796 Add a connection to this topic. This must be a PubTransport. If 797 the latch is enabled, c will be sent a the value of the 798 latch. 799 @param c: connection instance 800 @type c: L{Transport} 801 @return: True if connection was added 802 @rtype: bool 803 """ 804 super(_PublisherImpl, self).add_connection(c) 805 def publish_single(data): 806 self.publish(data, connection_override=c)
807 for l in self.subscriber_listeners: 808 l.peer_subscribe(self.resolved_name, self.publish, publish_single) 809 if self.is_latch and self.latch is not None: 810 with self.publock: 811 self.publish(self.latch, connection_override=c) 812 return True 813
814 - def remove_connection(self, c):
815 """ 816 Remove existing connection from this topic. 817 @param c: connection instance to remove 818 @type c: L{Transport} 819 """ 820 super(_PublisherImpl, self).remove_connection(c) 821 num = len(self.connections) 822 for l in self.subscriber_listeners: 823 l.peer_unsubscribe(self.resolved_name, num)
824
825 - def publish(self, message, connection_override=None):
826 """ 827 Publish the data to the topic. If the topic has no subscribers, 828 the method will return without any affect. Access to publish() 829 should be locked using acquire() and release() in order to 830 ensure proper message publish ordering. 831 832 @param message: message data instance to publish 833 @type message: L{Message} 834 @param connection_override: publish to this connection instead of all 835 @type connection_override: L{Transport} 836 @return: True if the data was published, False otherwise. 837 @rtype: bool 838 @raise roslib.message.SerializationError: if L{Message} instance is unable to serialize itself 839 @raise rospy.ROSException: if topic has been closed or was closed during publish() 840 """ 841 #TODO: should really just use IOError instead of rospy.ROSException 842 843 if self.closed: 844 # during shutdown, the topic can get closed, which creates 845 # a race condition with user code testing is_shutdown 846 if not is_shutdown(): 847 raise ROSException("publish() to a closed topic") 848 else: 849 return 850 851 if self.is_latch: 852 self.latch = message 853 854 if not self.has_connections(): 855 #publish() falls through 856 return False 857 858 if connection_override is None: 859 #copy connections so we can iterate safely 860 conns = self.connections 861 else: 862 conns = [connection_override] 863 864 # #2128 test our buffer. I don't now how this got closed in 865 # that case, but we can at least diagnose the problem. 866 b = self.buff 867 try: 868 b.tell() 869 870 # serialize the message 871 self.seq += 1 #count messages published to the topic 872 serialize_message(b, self.seq, message) 873 874 # send the buffer to all connections 875 err_con = [] 876 data = b.getvalue() 877 878 for c in conns: 879 try: 880 if not is_shutdown(): 881 c.write_data(data) 882 except TransportTerminated as e: 883 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 884 err_con.append(c) 885 except Exception as e: 886 # greater severity level 887 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 888 err_con.append(c) 889 890 # reset the buffer and update stats 891 self.message_data_sent += b.tell() #STATS 892 b.seek(0) 893 b.truncate(0) 894 895 except ValueError: 896 # operations on self.buff can fail if topic is closed 897 # during publish, which often happens during Ctrl-C. 898 # diagnose the error and report accordingly. 899 if self.closed: 900 if is_shutdown(): 901 # we offer no guarantees on publishes that occur 902 # during shutdown, so this is not exceptional. 903 return 904 else: 905 # this indicates that user-level code most likely 906 # closed the topic, which is exceptional. 907 raise ROSException("topic was closed during publish()") 908 else: 909 # unexpected, so re-raise original error 910 raise 911 912 # remove any bad connections 913 for c in err_con: 914 try: 915 # connection will callback into remove_connection when 916 # we close it 917 c.close() 918 except: 919 pass
920 921 ################################################################################# 922 # TOPIC MANAGER/LISTENER 923
924 -class _TopicManager(object):
925 """ 926 Tracks Topic objects 927 See L{get_topic_manager()} for singleton access 928 """ 929
930 - def __init__(self):
931 """ctor.""" 932 super(_TopicManager, self).__init__() 933 self.pubs = {} #: { topic: _PublisherImpl } 934 self.subs = {} #: { topic: _SubscriberImpl } 935 self.topics = set() # [str] list of topic names 936 self.lock = threading.Condition() 937 self.closed = False 938 _logger.info("topicmanager initialized")
939
940 - def get_pub_sub_info(self):
941 """ 942 get topic publisher and subscriber connection info for getBusInfo() api 943 @return: [bus info stats] 944 See getBusInfo() API for more data structure details. 945 @rtype: list 946 """ 947 with self.lock: 948 info = [] 949 for s in chain(iter(self.pubs.values()), iter(self.subs.values())): 950 info.extend(s.get_stats_info()) 951 return info
952
953 - def get_pub_sub_stats(self):
954 """ 955 get topic publisher and subscriber stats for getBusStats() api 956 @return: [publisherStats, subscriberStats]. 957 See getBusStats() API for more data structure details. 958 @rtype: list 959 """ 960 with self.lock: 961 return [s.get_stats() for s in self.pubs.values()],\ 962 [s.get_stats() for s in self.subs.values()]
963
964 - def close_all(self):
965 """ 966 Close all registered publication and subscriptions. Manager is 967 no longer usable after close. 968 """ 969 self.closed = True 970 with self.lock: 971 for t in chain(iter(self.pubs.values()), iter(self.subs.values())): 972 t.close() 973 self.pubs.clear() 974 self.subs.clear()
975
976 - def _add(self, ps, rmap, reg_type):
977 """ 978 Add L{_TopicImpl} instance to rmap 979 @param ps: a pub/sub impl instance 980 @type ps: L{_TopicImpl} 981 @param rmap: { topic: _TopicImpl} rmap to record instance in 982 @type rmap: dict 983 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 984 @type reg_type: str 985 """ 986 resolved_name = ps.resolved_name 987 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type) 988 with self.lock: 989 rmap[resolved_name] = ps 990 self.topics.add(resolved_name) 991 992 # NOTE: this call can take a lengthy amount of time (at 993 # least until its reimplemented to use queues) 994 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
995
996 - def _recalculate_topics(self):
997 """recalculate self.topics. expensive""" 998 self.topics = set([x.resolved_name for x in self.pubs.values()] + 999 [x.resolved_name for x in self.subs.values()])
1000
1001 - def _remove(self, ps, rmap, reg_type):
1002 """ 1003 Remove L{_TopicImpl} instance from rmap 1004 @param ps: a pub/sub impl instance 1005 @type ps: L{_TopicImpl} 1006 @param rmap: topic->_TopicImpl rmap to remove instance in 1007 @type rmap: dict 1008 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1009 @type reg_type: str 1010 """ 1011 resolved_name = ps.resolved_name 1012 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type) 1013 with self.lock: 1014 del rmap[resolved_name] 1015 self. _recalculate_topics() 1016 1017 # NOTE: this call can take a lengthy amount of time (at 1018 # least until its reimplemented to use queues) 1019 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1020
1021 - def get_impl(self, reg_type, resolved_name):
1022 """ 1023 Get the L{_TopicImpl} for the specified topic. This is mainly for 1024 testing purposes. Unlike acquire_impl, it does not alter the 1025 ref count. 1026 @param resolved_name: resolved topic name 1027 @type resolved_name: str 1028 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1029 @type reg_type: str 1030 """ 1031 if reg_type == Registration.PUB: 1032 rmap = self.pubs 1033 elif reg_type == Registration.SUB: 1034 rmap = self.subs 1035 else: 1036 raise TypeError("invalid reg_type: %s"%s) 1037 return rmap.get(resolved_name, None)
1038
1039 - def acquire_impl(self, reg_type, resolved_name, data_class):
1040 """ 1041 Acquire a L{_TopicImpl} for the specified topic (create one if it 1042 doesn't exist). Every L{Topic} instance has a _TopicImpl that 1043 actually controls the topic resources so that multiple Topic 1044 instances use the same underlying connections. 'Acquiring' a 1045 topic implementation marks that another Topic instance is 1046 using the TopicImpl. 1047 1048 @param resolved_name: resolved topic name 1049 @type resolved_name: str 1050 1051 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1052 @type reg_type: str 1053 1054 @param data_class: message class for topic 1055 @type data_class: L{Message} class 1056 """ 1057 if reg_type == Registration.PUB: 1058 rmap = self.pubs 1059 impl_class = _PublisherImpl 1060 elif reg_type == Registration.SUB: 1061 rmap = self.subs 1062 impl_class = _SubscriberImpl 1063 else: 1064 raise TypeError("invalid reg_type: %s"%s) 1065 with self.lock: 1066 impl = rmap.get(resolved_name, None) 1067 if not impl: 1068 impl = impl_class(resolved_name, data_class) 1069 self._add(impl, rmap, reg_type) 1070 impl.ref_count += 1 1071 return impl
1072
1073 - def release_impl(self, reg_type, resolved_name):
1074 """ 1075 Release a L_{TopicImpl} for the specified topic. 1076 1077 Every L{Topic} instance has a _TopicImpl that actually 1078 controls the topic resources so that multiple Topic instances 1079 use the same underlying connections. 'Acquiring' a topic 1080 implementation marks that another Topic instance is using the 1081 TopicImpl. 1082 1083 @param resolved_name: resolved topic name 1084 @type resolved_name: str 1085 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1086 @type reg_type: str 1087 """ 1088 if reg_type == Registration.PUB: 1089 rmap = self.pubs 1090 else: 1091 rmap = self.subs 1092 with self.lock: 1093 # check for race condition where multiple things are cleaning up at once 1094 if self.closed: 1095 return 1096 impl = rmap.get(resolved_name, None) 1097 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name 1098 impl.ref_count -= 1 1099 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero" 1100 if impl.ref_count == 0: 1101 _logger.debug("topic impl's ref count is zero, deleting topic %s...", resolved_name) 1102 impl.close() 1103 self._remove(impl, rmap, reg_type) 1104 del impl 1105 _logger.debug("... done deleting topic %s", resolved_name)
1106
1107 - def get_publisher_impl(self, resolved_name):
1108 """ 1109 @param resolved_name: resolved topic name 1110 @type resolved_name: str 1111 @return: list of L{_PublisherImpl}s 1112 @rtype: [L{_PublisherImpl}] 1113 """ 1114 return self.pubs.get(resolved_name, None)
1115
1116 - def get_subscriber_impl(self, resolved_name):
1117 """ 1118 @param resolved_name: topic name 1119 @type resolved_name: str 1120 @return: subscriber for the specified topic. 1121 @rtype: L{_SubscriberImpl} 1122 """ 1123 return self.subs.get(resolved_name, None)
1124
1125 - def has_subscription(self, resolved_name):
1126 """ 1127 @param resolved_name: resolved topic name 1128 @type resolved_name: str 1129 @return: True if manager has subscription for specified topic 1130 @rtype: bool 1131 """ 1132 return resolved_name in self.subs
1133
1134 - def has_publication(self, resolved_name):
1135 """ 1136 @param resolved_name: resolved topic name 1137 @type resolved_name: str 1138 @return: True if manager has publication for specified topic 1139 @rtype: bool 1140 """ 1141 return resolved_name in self.pubs
1142
1143 - def get_topics(self):
1144 """ 1145 @return: list of topic names this node subscribes to/publishes 1146 @rtype: [str] 1147 """ 1148 return self.topics
1149
1150 - def _get_list(self, rmap):
1151 return [[k, v.type] for k, v in rmap.items()]
1152 1153 ## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1154 - def get_subscriptions(self):
1155 return self._get_list(self.subs)
1156 1157 ## @return [[str,str],]: list of topics published by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1158 - def get_publications(self):
1159 return self._get_list(self.pubs)
1160 1161 set_topic_manager(_TopicManager()) 1162