
Source Code for Module rospy.topics

# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$

"""
rospy implementation of topics.

Client API
==========

L{Publisher} and L{Subscriber} are the client API for topics.

Internal Implementation
=======================

Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl)
are used to manage the actual transport connections.  The L{_TopicManager}
is responsible for tracking the system-wide state of publications and
subscriptions as well as the L{_TopicImpl} instances. More info is below.

L{_TopicManager}
================

The L{_TopicManager} does the backend topic bookkeeping for the local
node.  Use L{get_topic_manager()} to access the singleton. Actual topic
implementations are done through the
L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
code creates instances of type L{Publisher}/L{Subscriber}, which enables
a client to have multiple publishers/subscribers for the same topic,
each with controlled access to the underlying shared connections.

This module also provides the common parent classes for all rospy topics.
The rospy topic autogenerators create classes that are children of these
implementations.
"""
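
# Example (illustrative sketch, not part of the original module): minimal
# client-side use of the Publisher and Subscriber API described above.  The
# topic name 'chatter' and std_msgs.msg.String are assumptions for the
# example only.
#
#   import rospy
#   from std_msgs.msg import String
#
#   def on_msg(msg):
#       rospy.loginfo("heard: %s" % msg.data)
#
#   rospy.init_node('example_node')
#   pub = rospy.Publisher('chatter', String)
#   sub = rospy.Subscriber('chatter', String, on_msg)
#
#   rate = rospy.Rate(10)
#   while not rospy.is_shutdown():
#       pub.publish(String(data='hello'))
#       rate.sleep()
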
import struct
import select
try:
    from cStringIO import StringIO #Python 2.x
    import thread as _thread # Python 2
    python3 = 0
    def isstring(s):
        return isinstance(s, basestring) #Python 2.x
except ImportError:
    python3 = 1
    from io import StringIO, BytesIO #Python 3.x
    import _thread
    def isstring(s):
        return isinstance(s, str) #Python 3.x

import threading
import logging
import time

from itertools import chain
import traceback

import rosgraph.names

from rospy.core import *
from rospy.exceptions import ROSSerializationException, TransportTerminated
from rospy.msg import serialize_message, args_kwds_to_message

from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE

_logger = logging.getLogger('rospy.topics')

# wrap genpy implementation and map it to rospy namespace
import genpy
Message = genpy.Message

#######################################################################
# Base classes for all client-API instantiated pub/sub
#
# There are two trees: Topic and _TopicImpl. Topic is the client API
# for interfacing with topics, while _TopicImpl implements the
# underlying connection details.

class Topic(object):
    """Base class of L{Publisher} and L{Subscriber}"""

    def __init__(self, name, data_class, reg_type):
        """
        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: message class for serialization
        @type  data_class: L{Message}
        @param reg_type: Registration.PUB or Registration.SUB
        @type  reg_type: str
        @raise ValueError: if parameters are invalid
        """

        if not name or not isstring(name):
            raise ValueError("topic name is not a non-empty string")
        try:
            if python3 == 1:
                name.encode("utf-8")
            else:
                name = name.encode("utf-8")
        except UnicodeError:
            raise ValueError("topic name must be ascii/utf-8 compatible")
        if data_class is None:
            raise ValueError("topic parameter 'data_class' is not initialized")
        if not type(data_class) == type:
            raise ValueError("data_class [%s] is not a class"%data_class)
        if not issubclass(data_class, genpy.Message):
            raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__)
        # #2202
        if not rosgraph.names.is_legal_name(name):
            import warnings
            warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2)

        # this is a bit ugly, but necessary due to the fact that we allow
        # topics and services to be initialized before the node
        if not rospy.core.is_initialized():
            self.resolved_name = rospy.names.resolve_name_without_node_name(name)
        else:
            # init_node() has been called, so we can do normal resolution
            self.resolved_name = resolve_name(name)

        self.name = self.resolved_name # #1810 for backwards compatibility

        self.data_class = data_class
        self.type = data_class._type
        self.md5sum = data_class._md5sum
        self.reg_type = reg_type
        self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)

    def get_num_connections(self):
        """
        Get the number of connections to other ROS nodes for this topic. For a Publisher,
        this corresponds to the number of nodes subscribing. For a Subscriber, the number
        of publishers.
        @return: number of connections
        @rtype: int
        """
        return self.impl.get_num_connections()

    def unregister(self):
        """
        Unpublish/unsubscribe from topic. The Topic instance is no longer
        valid after this call. Additional calls to unregister() have no effect.
        """
        # as we don't guard unregister, have to protect value of
        # resolved_name for release_impl call
        resolved_name = self.resolved_name
        if resolved_name and self.impl:
            get_topic_manager().release_impl(self.reg_type, resolved_name)
            self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
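
# Example (illustrative sketch, not part of the original module): the methods
# on Topic are shared by Publisher and Subscriber handles.  A common pattern
# is to wait for at least one peer before publishing and to release the
# handle when done.  Topic and message names below are assumptions.
#
#   pub = rospy.Publisher('chatter', String)
#   while pub.get_num_connections() == 0 and not rospy.is_shutdown():
#       rospy.sleep(0.1)        # wait for a subscriber to connect
#   pub.publish(String(data='hello'))
#   pub.unregister()            # further use of this handle is invalid
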
# #3808
class Poller(object):
    """
    select.poll/kqueue abstraction to handle socket failure detection
    on multiple platforms.  NOT thread-safe.
    """
    def __init__(self):
        try:
            self.poller = select.poll()
            self.add_fd = self.add_poll
            self.remove_fd = self.remove_poll
            self.error_iter = self.error_poll_iter
        except:
            try:
                # poll() not available, try kqueue
                self.poller = select.kqueue()
                self.add_fd = self.add_kqueue
                self.remove_fd = self.remove_kqueue
                self.error_iter = self.error_kqueue_iter
                self.kevents = []
            except:
                #TODO: non-Noop impl for Windows
                self.poller = self.noop
                self.add_fd = self.noop
                self.remove_fd = self.noop
                self.error_iter = self.noop

    def noop(self, *args):
        pass

    def add_poll(self, fd):
        self.poller.register(fd)

    def remove_poll(self, fd):
        self.poller.unregister(fd)

    def error_poll_iter(self):
        events = self.poller.poll(0)
        for fd, event in events:
            if event & (select.POLLHUP | select.POLLERR):
                yield fd

    def add_kqueue(self, fd):
        self.kevents.append(select.kevent(fd))

    def error_kqueue_iter(self):
        events = self.poller.control(self.kevents, 0)
        for event in events:
            if event & (select.KQ_EV_ERROR | select.KQ_EV_EOF):
                yield event.ident

    def remove_kqueue(self, fd):
        e = [x for x in self.kevents if x.ident == fd]
        for x in e:
            self.kevents.remove(x)
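
# Example (illustrative sketch, not part of the original module): how the
# Poller abstraction above is typically used.  add_fd() registers a socket's
# file descriptor; error_iter() yields descriptors that reported
# POLLHUP/POLLERR (or the kqueue equivalents), so stale connections can be
# pruned.  'sock' is an assumed, already-connected socket object.
#
#   poller = Poller()
#   poller.add_fd(sock.fileno())
#   ...
#   for fd in poller.error_iter():
#       if fd == sock.fileno():
#           sock.close()        # peer hung up or errored; drop it
#   poller.remove_fd(sock.fileno())
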
class _TopicImpl(object):
    """
    Base class of internal topic implementations. Each topic has a
    singleton _TopicImpl implementation for managing the underlying
    connections.
    """

    def __init__(self, name, data_class):
        """
        Base constructor
        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: message data class
        @type  data_class: L{Message}
        """

        # #1810 made resolved/unresolved more explicit so we don't accidentally double-resolve
        self.resolved_name = resolve_name(name) #NOTE: remapping occurs here!
        self.name = self.resolved_name # for backwards compatibility

        self.data_class = data_class
        self.type = data_class._type
        self.handler = None
        self.seq = 0
        # c_lock is used to serialize the call order of methods that
        # modify self.connections. Because adding/removing connections is
        # a rare event, we go through the extra hassle of making a
        # copy of the connections/callbacks lists when modifying,
        # then setting the reference to the new copy.
        # With this pattern, other code can use these lists without
        # having to acquire c_lock
        self.c_lock = threading.RLock()
        self.connections = []
        self.closed = False
        # number of Topic instances using this
        self.ref_count = 0

        self.connection_poll = Poller()

    def __del__(self):
        # very similar to close(), but we have to be more careful about what we call in a __del__
        if self.closed:
            return
        if self.connections is not None:
            for c in self.connections:
                try:
                    c.close()
                except:
                    pass
            del self.connections[:]
        self.c_lock = self.connections = self.handler = self.data_class = self.type = None

    def close(self):
        """close I/O"""
        if self.closed:
            return
        self.closed = True
        if self.c_lock is not None:
            with self.c_lock:
                for c in self.connections:
                    try:
                        if c is not None:
                            c.close()
                    except:
                        # seems more logger.error internal than external logerr
                        _logger.error(traceback.format_exc())
                del self.connections[:]
        self.handler = None

    def get_num_connections(self):
        with self.c_lock:
            return len(self.connections)

    def has_connection(self, endpoint_id):
        """
        Query whether or not a connection with the associated
        endpoint has been added to this object.
        @param endpoint_id: endpoint ID associated with connection.
        @type  endpoint_id: str
        """
        # save reference to avoid lock
        conn = self.connections
        for c in conn:
            if c.endpoint_id == endpoint_id:
                return True
        return False

    def has_connections(self):
        """
        Check to see if this topic is connected to other publishers/subscribers
        @return: True if topic is connected
        @rtype: bool
        """
        if self.connections:
            return True
        return False

    def _remove_connection(self, connections, c):
        # Remove from poll instance as well as connections
        try:
            self.connection_poll.remove_fd(c.fileno())
        except:
            pass
        try:
            # not necessarily correct from an abstraction point of
            # view, but will prevent accidental connection leaks
            c.close()
        except:
            pass
        if c in connections:
            connections.remove(c)

    def add_connection(self, c):
        """
        Add a connection to this topic.  If any previous connections
        to the same endpoint exist, drop them.

        @param c: connection instance
        @type  c: Transport
        @return: True if connection was added
        @rtype: bool
        """
        rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections)))
        with self.c_lock:
            # c_lock is to make add_connection thread-safe, but we
            # still make a copy of self.connections so that the rest of the
            # code can use self.connections in an unlocked manner
            new_connections = self.connections[:]

            # if we have a connection to the same endpoint_id, drop
            # the old one.
            for oldc in self.connections:
                if oldc.endpoint_id == c.endpoint_id:
                    self._remove_connection(new_connections, oldc)

            # #3808: "garbage collect" bad sockets whenever we add new
            # connections. This allows at most one stale connection
            # per topic.  Otherwise, we only detect bad connections on
            # write.  An alternative (given the current
            # implementation) would be to start a thread that
            # regularly polls all fds, but that would create a lot of
            # synchronization events and would also require a separate
            # thread to manage.  It would be desirable to move to this,
            # but this change is less impactful and keeps the codebase
            # more stable as we move towards an entirely different
            # event loop for rospy -- the heart of the problem is that
            # rospy's i/o is blocking-based, which has the obvious
            # issues.

            for fd in self.connection_poll.error_iter():
                to_remove = [x for x in new_connections if x.fileno() == fd]
                for x in to_remove:
                    rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id))
                    self._remove_connection(new_connections, x)

            # Add new connection to poller, register for all events,
            # though we only care about POLLHUP/ERR
            new_fd = c.fileno()
            if new_fd is not None:
                self.connection_poll.add_fd(new_fd)

            # add in new connection
            new_connections.append(c)

            self.connections = new_connections

            # connections make a callback when closed
            c.set_cleanup_callback(self.remove_connection)

            return True

    def remove_connection(self, c):
        """
        Remove connection from topic.
        @param c: connection instance to remove
        @type  c: Transport
        """
        rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id))
        with self.c_lock:
            # c_lock is to make remove_connection thread-safe, but we
            # still make a copy of self.connections so that the rest of the
            # code can use self.connections in an unlocked manner
            new_connections = self.connections[:]
            self._remove_connection(new_connections, c)
            self.connections = new_connections

    def get_stats_info(self): # STATS
        """
        Get the stats for this topic
        @return: stats for topic in getBusInfo() format::
          ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
        @rtype: list
        """
        # save reference to avoid locking
        connections = self.connections
        return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections]

    def get_stats(self): # STATS
        """Get the stats for this topic (API stub)"""
        raise Exception("subclasses must override")

# Implementation note: Subscriber attaches to a _SubscriberImpl
# singleton for that topic.  The underlying impl manages the
# connections for that publication and enables thread-safe access
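
# Example (illustrative sketch, not part of the original module): the
# copy-on-write idiom _TopicImpl uses for self.connections.  Writers take
# c_lock, copy the list, mutate the copy, and swap the reference; readers can
# then iterate a stable snapshot without taking the lock.
#
#   with self.c_lock:
#       new_connections = self.connections[:]   # copy under the lock
#       new_connections.append(conn)            # mutate only the copy
#       self.connections = new_connections      # single reference swap
#
#   # elsewhere, reader code (no lock needed):
#   for c in self.connections:
#       ...
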
class Subscriber(Topic):
    """
    Class for registering as a subscriber to a specified topic, where
    the messages are of a given type.
    """
    def __init__(self, name, data_class, callback=None, callback_args=None,
                 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
        """
        Constructor.

        NOTE: for the queue_size and buff_size
        parameters, rospy does not attempt to do intelligent merging
        between multiple Subscriber instances for the same topic. As
        they share the same underlying transport, multiple Subscribers
        to the same topic can conflict with one another if they set
        these parameters differently.

        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: data type class to use for messages,
        e.g. std_msgs.msg.String
        @type  data_class: L{Message} class
        @param callback: function to call ( fn(data) ) when data is
        received. If callback_args is set, the function must accept
        the callback_args as a second argument, i.e. fn(data,
        callback_args).  NOTE: Additional callbacks can be added using
        add_callback().
        @type  callback: fn(msg)
        @param callback_args: additional arguments to pass to the
        callback. This is useful when you wish to reuse the same
        callback for multiple subscriptions.
        @type  callback_args: any
        @param queue_size: maximum number of messages to receive at
        a time. This will generally be 1 or None (infinite,
        default). buff_size should be increased if this parameter
        is set as incoming data still needs to sit in the incoming
        buffer before being discarded. Setting queue_size and
        buff_size to non-default values affects all subscribers to
        this topic in this process.
        @type  queue_size: int
        @param buff_size: incoming message buffer size in bytes. If
        queue_size is set, this should be set to a number greater
        than the queue_size times the average message size. Setting
        buff_size to a non-default value affects all subscribers to
        this topic in this process.
        @type  buff_size: int
        @param tcp_nodelay: if True, request TCP_NODELAY from
        publisher. Use of this option is not generally recommended
        in most cases as it is better to rely on timestamps in
        message data. Setting tcp_nodelay to True enables TCP_NODELAY
        for all subscribers in the same python process.
        @type  tcp_nodelay: bool
        @raise ROSException: if parameters are invalid
        """
        super(Subscriber, self).__init__(name, data_class, Registration.SUB)
        #add in args that factory cannot pass in

        # last person to set these to non-defaults wins, not much way
        # around this
        if queue_size is not None:
            self.impl.set_queue_size(queue_size)
        if buff_size != DEFAULT_BUFF_SIZE:
            self.impl.set_buff_size(buff_size)

        if callback is not None:
            # #1852
            # it's important that we call add_callback so that the
            # callback can be invoked with any latched messages
            self.impl.add_callback(callback, callback_args)
            # save arguments for unregister
            self.callback = callback
            self.callback_args = callback_args
        else:
            # initialize fields
            self.callback = self.callback_args = None
        if tcp_nodelay:
            self.impl.set_tcp_nodelay(tcp_nodelay)

    def unregister(self):
        """
        Unpublish/unsubscribe from topic. The Topic instance is no longer
        valid after this call. Additional calls to unregister() have no effect.
        """
        if self.impl:
            # It's possible to have a Subscriber instance with no
            # associated callback
            if self.callback is not None:
                self.impl.remove_callback(self.callback, self.callback_args)
            self.callback = self.callback_args = None
            super(Subscriber, self).unregister()
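
# Example (illustrative sketch, not part of the original module): a Subscriber
# that reuses one callback for two topics via callback_args, and bounds the
# incoming queue.  Topic names and std_msgs.msg.String are assumptions.
#
#   def on_msg(msg, topic):
#       rospy.loginfo("[%s] %s" % (topic, msg.data))
#
#   sub_a = rospy.Subscriber('chatter_a', String, on_msg,
#                            callback_args='chatter_a', queue_size=1)
#   sub_b = rospy.Subscriber('chatter_b', String, on_msg,
#                            callback_args='chatter_b', queue_size=1)
#   ...
#   sub_a.unregister()   # removes this handle's callback and releases the impl
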
class _SubscriberImpl(_TopicImpl):
    """
    Underlying L{_TopicImpl} implementation for subscriptions.
    """
    def __init__(self, name, data_class):
        """
        ctor.
        @param name: graph resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: Message data class
        @type  data_class: L{Message} class
        """
        super(_SubscriberImpl, self).__init__(name, data_class)
        # client-methods to invoke on new messages. should only modify
        # under lock. This is a list of 2-tuples (fn, args), where
        # args are additional arguments for the callback, or None
        self.callbacks = []
        self.queue_size = None
        self.buff_size = DEFAULT_BUFF_SIZE
        self.tcp_nodelay = False

    def close(self):
        """close I/O and release resources"""
        _TopicImpl.close(self)
        if self.callbacks:
            del self.callbacks[:]
            self.callbacks = None

    def set_tcp_nodelay(self, tcp_nodelay):
        """
        Set the value of TCP_NODELAY, which causes the Nagle algorithm
        to be disabled for future topic connections, if the publisher
        supports it.
        """
        self.tcp_nodelay = tcp_nodelay

    def set_queue_size(self, queue_size):
        """
        Set the receive queue size. If more than queue_size messages
        are waiting to be deserialized, they are discarded.

        @param queue_size: incoming queue size. Must be a positive integer or None.
        @type  queue_size: int
        """
        if queue_size == -1:
            self.queue_size = None
        elif queue_size == 0:
            raise ROSException("queue size may not be set to zero")
        elif queue_size is not None and type(queue_size) != int:
            raise ROSException("queue size must be an integer")
        else:
            self.queue_size = queue_size

    def set_buff_size(self, buff_size):
        """
        Set the receive buffer size. The exact meaning of this is
        transport dependent.
        @param buff_size: receive buffer size
        @type  buff_size: int
        """
        if type(buff_size) != int:
            raise ROSException("buffer size must be an integer")
        elif buff_size <= 0:
            raise ROSException("buffer size must be a positive integer")
        self.buff_size = buff_size

    def get_stats(self): # STATS
        """
        Get the stats for this topic subscriber
        @return: stats for topic in getBusStats() subscriber format::
          (topicName, connStats)
        where connStats is::
          [connectionId, bytesReceived, numSent, dropEstimate, connected]*
        @rtype: list
        """
        # save reference to avoid locking
        conn = self.connections
        #for now drop estimate is -1
        stats = (self.resolved_name,
                 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done)
                  for c in conn] )
        return stats

    def add_callback(self, cb, cb_args):
        """
        Register a callback to be invoked whenever a new message is received
        @param cb: callback function to invoke with message data
        instance, i.e. fn(data). If callback args is set, they will
        be passed in as the second argument.
        @type  cb: fn(msg)
        @param cb_args: additional arguments to pass to callback
        @type  cb_args: Any
        """
        if self.closed:
            raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
        with self.c_lock:
            # we lock in order to serialize calls to add_callback, and
            # we copy self.callbacks so that readers can keep using the
            # old list without holding the lock
            new_callbacks = self.callbacks[:]
            new_callbacks.append((cb, cb_args))
            self.callbacks = new_callbacks

        # #1852: invoke callback with any latched messages
        for c in self.connections:
            if c.latch is not None:
                self._invoke_callback(c.latch, cb, cb_args)

    def remove_callback(self, cb, cb_args):
        """
        Unregister a message callback.
        @param cb: callback function
        @type  cb: fn(msg)
        @param cb_args: additional arguments associated with callback
        @type  cb_args: Any
        @raise KeyError: if no matching callback
        """
        if self.closed:
            return
        with self.c_lock:
            # we lock in order to serialize calls to remove_callback, and
            # we copy self.callbacks so that readers can keep using the
            # old list without holding the lock
            matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
            if matches:
                new_callbacks = self.callbacks[:]
                # remove the first match
                new_callbacks.remove(matches[0])
                self.callbacks = new_callbacks
        if not matches:
            raise KeyError("no matching cb")

    def _invoke_callback(self, msg, cb, cb_args):
        """
        Invoke callback on msg. Traps and logs any exceptions raised by the callback.
        @param msg: message data
        @type  msg: L{Message}
        @param cb: callback
        @type  cb: fn(msg, cb_args)
        @param cb_args: callback args or None
        @type  cb_args: Any
        """
        try:
            if cb_args is not None:
                cb(msg, cb_args)
            else:
                cb(msg)
        except Exception as e:
            if not is_shutdown():
                logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
            else:
                _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))

    def receive_callback(self, msgs):
        """
        Called by underlying connection transport for each new message received
        @param msgs: message data
        @type  msgs: [L{Message}]
        """
        # save reference to avoid lock
        callbacks = self.callbacks
        for msg in msgs:
            for cb, cb_args in callbacks:
                self._invoke_callback(msg, cb, cb_args)

class SubscribeListener(object):
    """
    Callback API to receive notifications when new subscribers
    connect and disconnect.
    """

    def peer_subscribe(self, topic_name, topic_publish, peer_publish):
        """
        callback when a peer has subscribed to a topic
        @param topic_name: topic name. NOTE: topic name will be resolved/remapped
        @type  topic_name: str
        @param topic_publish: method to publish message data to all subscribers
        @type  topic_publish: fn(data)
        @param peer_publish: method to publish message data to
        new subscriber.  NOTE: behavior for the latter is
        transport-dependent as some transports may be broadcast only.
        @type  peer_publish: fn(data)
        """
        pass

    def peer_unsubscribe(self, topic_name, num_peers):
        """
        callback when a peer has unsubscribed from a topic
        @param topic_name: topic name. NOTE: topic name will be resolved/remapped
        @type  topic_name: str
        @param num_peers: number of remaining peers subscribed to topic
        @type  num_peers: int
        """
        pass
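
# Example (illustrative sketch, not part of the original module): a
# SubscribeListener subclass that greets each new subscriber.  peer_publish
# sends only to the newly connected peer, while topic_publish reaches all
# current subscribers.  The message type and topic name are assumptions.
#
#   class GreetingListener(SubscribeListener):
#       def peer_subscribe(self, topic_name, topic_publish, peer_publish):
#           peer_publish(String(data='welcome to %s' % topic_name))
#
#       def peer_unsubscribe(self, topic_name, num_peers):
#           rospy.loginfo("%s now has %d subscribers", topic_name, num_peers)
#
#   pub = rospy.Publisher('chatter', String,
#                         subscriber_listener=GreetingListener())
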
# Implementation note: Publisher attaches to a
# _PublisherImpl singleton for that topic.  The underlying impl
# manages the connections for that publication and enables
# thread-safe access

class Publisher(Topic):
    """
    Class for registering as a publisher of a ROS topic.
    """

    def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None):
        """
        Constructor
        @param name: resource name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: message class for serialization
        @type  data_class: L{Message} class
        @param subscriber_listener: listener for
        subscription events. May be None.
        @type  subscriber_listener: L{SubscribeListener}
        @param tcp_nodelay: If True, sets TCP_NODELAY on
        publisher's socket (disables Nagle algorithm). This results
        in lower latency publishing at the cost of efficiency.
        @type  tcp_nodelay: bool
        @param latch: If True, the last message published is
        'latched', meaning that any future subscribers will be sent
        that message immediately upon connection.
        @type  latch: bool
        @raise ROSException: if parameters are invalid
        """
        super(Publisher, self).__init__(name, data_class, Registration.PUB)

        if subscriber_listener:
            self.impl.add_subscriber_listener(subscriber_listener)
        if tcp_nodelay:
            get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay)
        if latch:
            self.impl.enable_latch()
        if headers:
            self.impl.add_headers(headers)

    def publish(self, *args, **kwds):
        """
        Publish message data object to this topic.
        Publish can either be called with the message instance to
        publish or with the constructor args for a new Message
        instance, i.e.::
          pub.publish(message_instance)
          pub.publish(message_field_1, message_field_2...)
          pub.publish(message_field_1='foo', message_field_2='bar')

        @param args: L{Message} instance, message arguments, or no args if keyword arguments are used
        @param kwds: Message keyword arguments. If kwds are used, args must be unset
        @raise ROSException: If rospy node has not been initialized
        @raise ROSSerializationException: If unable to serialize
        message. This is usually a type error with one of the fields.
        """
        if self.impl is None:
            raise ROSException("publish() to an unregistered() handle")
        if not is_initialized():
            raise ROSException("ROS node has not been initialized yet. Please call init_node() first")
        data = args_kwds_to_message(self.data_class, args, kwds)
        try:
            self.impl.acquire()
            self.impl.publish(data)
        except genpy.SerializationError as e:
            # can't go to rospy.logerr(), b/c this could potentially recurse
            _logger.error(traceback.format_exc())
            raise ROSSerializationException(str(e))
        finally:
            self.impl.release()
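
# Example (illustrative sketch, not part of the original module): the three
# equivalent publish() call forms for a message with fields, plus a latched
# publisher.  geometry_msgs.msg.Vector3 and the topic names are assumptions.
#
#   from geometry_msgs.msg import Vector3
#
#   pub = rospy.Publisher('velocity', Vector3)
#   pub.publish(Vector3(1.0, 0.0, 0.0))          # message instance
#   pub.publish(1.0, 0.0, 0.0)                   # positional field args
#   pub.publish(x=1.0, y=0.0, z=0.0)             # keyword field args
#
#   # latch=True: the last message is re-sent to every new subscriber
#   map_pub = rospy.Publisher('map_meta', String, latch=True)
#   map_pub.publish(String(data='resolution: 0.05'))
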
class _PublisherImpl(_TopicImpl):
    """
    Underlying L{_TopicImpl} implementation for publishers.
    """

    def __init__(self, name, data_class):
        """
        @param name: name of topic, e.g. 'laser'.
        @type  name: str
        @param data_class: Message data class
        @type  data_class: L{Message} class
        """
        super(_PublisherImpl, self).__init__(name, data_class)
        if python3 == 0:
            self.buff = StringIO()
        else:
            self.buff = BytesIO()
        self.publock = threading.RLock() #for acquire()/release()
        self.subscriber_listeners = []

        # additional client connection headers
        self.headers = {}

        # publish latch, starts disabled
        self.is_latch = False
        self.latch = None

        #STATS
        self.message_data_sent = 0

    def close(self):
        """close I/O and release resources"""
        _TopicImpl.close(self)
        # release resources
        if self.subscriber_listeners:
            del self.subscriber_listeners[:]
        if self.headers:
            self.headers.clear()
        if self.buff is not None:
            self.buff.close()
        self.publock = self.headers = self.buff = self.subscriber_listeners = None

    def add_headers(self, headers):
        """
        Add connection headers to this Topic for future connections.
        @param headers: key/values will be added to current connection
        header set, overriding any existing keys if they conflict.
        @type  headers: dict
        """
        self.headers.update(headers)

    def enable_latch(self):
        """
        Enable publish() latch. The latch contains the last published
        message and is sent to any new subscribers.
        """
        self.is_latch = True

    def get_stats(self): # STATS
        """
        Get the stats for this topic publisher
        @return: stats for topic in getBusStats() publisher format::
          [topicName, messageDataBytes, connStats],
        where connStats is::
          [id, bytes, numMessages, connected]*
        @rtype: list
        """
        # save reference to avoid lock
        conn = self.connections
        return (self.resolved_name, self.message_data_sent,
                [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )

    def add_subscriber_listener(self, l):
        """
        Add a L{SubscribeListener} for subscribe events.
        @param l: listener instance
        @type  l: L{SubscribeListener}
        """
        self.subscriber_listeners.append(l)

    def acquire(self):
        """acquire lock for thread-safe publishing to this transport"""
        if self.publock is not None:
            self.publock.acquire()

    def release(self):
        """release lock for thread-safe publishing to this transport"""
        if self.publock is not None:
            self.publock.release()

    def add_connection(self, c):
        """
        Add a connection to this topic. This must be a PubTransport. If
        the latch is enabled, c will be sent the value of the latch.
        @param c: connection instance
        @type  c: L{Transport}
        @return: True if connection was added
        @rtype: bool
        """
        super(_PublisherImpl, self).add_connection(c)
        def publish_single(data):
            self.publish(data, connection_override=c)
        for l in self.subscriber_listeners:
            l.peer_subscribe(self.resolved_name, self.publish, publish_single)
        if self.is_latch and self.latch is not None:
            with self.publock:
                self.publish(self.latch, connection_override=c)
        return True

    def remove_connection(self, c):
        """
        Remove existing connection from this topic.
        @param c: connection instance to remove
        @type  c: L{Transport}
        """
        super(_PublisherImpl, self).remove_connection(c)
        num = len(self.connections)
        for l in self.subscriber_listeners:
            l.peer_unsubscribe(self.resolved_name, num)

    def publish(self, message, connection_override=None):
        """
        Publish the data to the topic. If the topic has no subscribers,
        the method will return without any effect. Access to publish()
        should be locked using acquire() and release() in order to
        ensure proper message publish ordering.

        @param message: message data instance to publish
        @type  message: L{Message}
        @param connection_override: publish to this connection instead of all
        @type  connection_override: L{Transport}
        @return: True if the data was published, False otherwise.
        @rtype: bool
        @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself
        @raise rospy.ROSException: if topic has been closed or was closed during publish()
        """
        #TODO: should really just use IOError instead of rospy.ROSException

        if self.closed:
            # during shutdown, the topic can get closed, which creates
            # a race condition with user code testing is_shutdown
            if not is_shutdown():
                raise ROSException("publish() to a closed topic")
            else:
                return

        if self.is_latch:
            self.latch = message

        if not self.has_connections():
            #publish() falls through
            return False

        if connection_override is None:
            #copy connections so we can iterate safely
            conns = self.connections
        else:
            conns = [connection_override]

        # #2128 test our buffer. I don't know how this got closed in
        # that case, but we can at least diagnose the problem.
        b = self.buff
        try:
            b.tell()

            # serialize the message
            self.seq += 1 #count messages published to the topic
            serialize_message(b, self.seq, message)

            # send the buffer to all connections
            err_con = []
            data = b.getvalue()

            for c in conns:
                try:
                    if not is_shutdown():
                        c.write_data(data)
                except TransportTerminated as e:
                    logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
                    err_con.append(c)
                except Exception as e:
                    # greater severity level
                    logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc()))
                    err_con.append(c)

            # reset the buffer and update stats
            self.message_data_sent += b.tell() #STATS
            b.seek(0)
            b.truncate(0)

        except ValueError:
            # operations on self.buff can fail if topic is closed
            # during publish, which often happens during Ctrl-C.
            # diagnose the error and report accordingly.
            if self.closed:
                if is_shutdown():
                    # we offer no guarantees on publishes that occur
                    # during shutdown, so this is not exceptional.
                    return
                else:
                    # this indicates that user-level code most likely
                    # closed the topic, which is exceptional.
                    raise ROSException("topic was closed during publish()")
            else:
                # unexpected, so re-raise original error
                raise

        # remove any bad connections
        for c in err_con:
            try:
                # connection will callback into remove_connection when
                # we close it
                c.close()
            except:
                pass
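
# Example (illustrative sketch, not part of the original module): the buffer
# reuse pattern used by _PublisherImpl.publish() above.  One BytesIO is
# serialized into, its contents are sent to every connection, and it is then
# rewound and truncated so the next publish() reuses the same allocation.
# 'seq', 'msg', and 'connections' are assumed names for the example.
#
#   from io import BytesIO
#   buff = BytesIO()
#   serialize_message(buff, seq, msg)     # serialization helper from rospy.msg
#   data = buff.getvalue()
#   for conn in connections:
#       conn.write_data(data)
#   buff.seek(0)
#   buff.truncate(0)                      # keep the buffer for the next cycle
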
#################################################################################
# TOPIC MANAGER/LISTENER

class _TopicManager(object):
    """
    Tracks Topic objects
    See L{get_topic_manager()} for singleton access
    """

    def __init__(self):
        """ctor."""
        super(_TopicManager, self).__init__()
        self.pubs = {} #: { topic: _PublisherImpl }
        self.subs = {} #: { topic: _SubscriberImpl }
        self.topics = set() # set of topic names [str]
        self.lock = threading.Condition()
        self.closed = False
        _logger.info("topicmanager initialized")

    def get_pub_sub_info(self):
        """
        Get topic publisher and subscriber connection info for getBusInfo() api
        @return: [bus info stats]
          See getBusInfo() API for more data structure details.
        @rtype: list
        """
        with self.lock:
            info = []
            for s in chain(iter(self.pubs.values()), iter(self.subs.values())):
                info.extend(s.get_stats_info())
            return info

    def get_pub_sub_stats(self):
        """
        Get topic publisher and subscriber stats for getBusStats() api
        @return: [publisherStats, subscriberStats].
          See getBusStats() API for more data structure details.
        @rtype: list
        """
        with self.lock:
            return [s.get_stats() for s in self.pubs.values()],\
                   [s.get_stats() for s in self.subs.values()]

    def close_all(self):
        """
        Close all registered publications and subscriptions. Manager is
        no longer usable after close.
        """
        self.closed = True
        with self.lock:
            for t in chain(iter(self.pubs.values()), iter(self.subs.values())):
                t.close()
            self.pubs.clear()
            self.subs.clear()

    def _add(self, ps, rmap, reg_type):
        """
        Add L{_TopicImpl} instance to rmap
        @param ps: a pub/sub impl instance
        @type  ps: L{_TopicImpl}
        @param rmap: { topic: _TopicImpl } rmap to record instance in
        @type  rmap: dict
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        """
        resolved_name = ps.resolved_name
        _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
        with self.lock:
            rmap[resolved_name] = ps
            self.topics.add(resolved_name)

            # NOTE: this call can take a lengthy amount of time (at
            # least until it is reimplemented to use queues)
            get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)

    def _recalculate_topics(self):
        """recalculate self.topics. expensive"""
        self.topics = set([x.resolved_name for x in self.pubs.values()] +
                          [x.resolved_name for x in self.subs.values()])

    def _remove(self, ps, rmap, reg_type):
        """
        Remove L{_TopicImpl} instance from rmap
        @param ps: a pub/sub impl instance
        @type  ps: L{_TopicImpl}
        @param rmap: topic->_TopicImpl rmap to remove instance from
        @type  rmap: dict
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        """
        resolved_name = ps.resolved_name
        _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
        with self.lock:
            del rmap[resolved_name]
            self._recalculate_topics()

            # NOTE: this call can take a lengthy amount of time (at
            # least until it is reimplemented to use queues)
            get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)

    def get_impl(self, reg_type, resolved_name):
        """
        Get the L{_TopicImpl} for the specified topic. This is mainly for
        testing purposes. Unlike acquire_impl, it does not alter the
        ref count.
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        """
        if reg_type == Registration.PUB:
            rmap = self.pubs
        elif reg_type == Registration.SUB:
            rmap = self.subs
        else:
            raise TypeError("invalid reg_type: %s"%reg_type)
        return rmap.get(resolved_name, None)

    def acquire_impl(self, reg_type, resolved_name, data_class):
        """
        Acquire a L{_TopicImpl} for the specified topic (create one if it
        doesn't exist).  Every L{Topic} instance has a _TopicImpl that
        actually controls the topic resources so that multiple Topic
        instances use the same underlying connections. 'Acquiring' a
        topic implementation marks that another Topic instance is
        using the TopicImpl.

        @param resolved_name: resolved topic name
        @type  resolved_name: str

        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str

        @param data_class: message class for topic
        @type  data_class: L{Message} class
        """
        if reg_type == Registration.PUB:
            rmap = self.pubs
            impl_class = _PublisherImpl
        elif reg_type == Registration.SUB:
            rmap = self.subs
            impl_class = _SubscriberImpl
        else:
            raise TypeError("invalid reg_type: %s"%reg_type)
        with self.lock:
            impl = rmap.get(resolved_name, None)
            if not impl:
                impl = impl_class(resolved_name, data_class)
                self._add(impl, rmap, reg_type)
            impl.ref_count += 1
            return impl

    def release_impl(self, reg_type, resolved_name):
        """
        Release a L{_TopicImpl} for the specified topic.

        Every L{Topic} instance has a _TopicImpl that actually
        controls the topic resources so that multiple Topic instances
        use the same underlying connections. 'Acquiring' a topic
        implementation marks that another Topic instance is using the
        TopicImpl.

        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
        @type  reg_type: str
        """
        if reg_type == Registration.PUB:
            rmap = self.pubs
        else:
            rmap = self.subs
        with self.lock:
            # check for race condition where multiple things are cleaning up at once
            if self.closed:
                return
            impl = rmap.get(resolved_name, None)
            assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
            impl.ref_count -= 1
            assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
            if impl.ref_count == 0:
                rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name)
                impl.close()
                self._remove(impl, rmap, reg_type)
                del impl
                _logger.debug("... done deleting topic %s", resolved_name)

    def get_publisher_impl(self, resolved_name):
        """
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @return: the L{_PublisherImpl} for the specified topic, or None if there is none
        @rtype: L{_PublisherImpl}
        """
        return self.pubs.get(resolved_name, None)

    def get_subscriber_impl(self, resolved_name):
        """
        @param resolved_name: topic name
        @type  resolved_name: str
        @return: subscriber for the specified topic.
        @rtype: L{_SubscriberImpl}
        """
        return self.subs.get(resolved_name, None)

    def has_subscription(self, resolved_name):
        """
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @return: True if manager has subscription for specified topic
        @rtype: bool
        """
        return resolved_name in self.subs

    def has_publication(self, resolved_name):
        """
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @return: True if manager has publication for specified topic
        @rtype: bool
        """
        return resolved_name in self.pubs

    def get_topics(self):
        """
        @return: list of topic names this node subscribes to/publishes
        @rtype: [str]
        """
        return self.topics

    def _get_list(self, rmap):
        return [[k, v.type] for k, v in rmap.items()]

    ## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
    def get_subscriptions(self):
        return self._get_list(self.subs)

    ## @return [[str,str],]: list of topics published by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
    def get_publications(self):
        return self._get_list(self.pubs)

set_topic_manager(_TopicManager())
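
# Example (illustrative sketch, not part of the original module): inspecting
# the node-local bookkeeping kept by _TopicManager.  This is an internal API;
# the singleton is reached through get_topic_manager().  The values shown in
# the comments are assumptions.
#
#   tm = get_topic_manager()
#   tm.get_publications()            # e.g. [['/chatter', 'std_msgs/String']]
#   tm.get_subscriptions()           # e.g. [['/chatter', 'std_msgs/String']]
#   tm.has_publication('/chatter')   # True while a Publisher handle exists
#   tm.get_topics()                  # set of all topic names this node uses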