Package rospy :: Module topics

Source Code for Module rospy.topics

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2008, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32  # 
  33  # Revision $Id$ 
  34   
  35  """ 
  36  rospy implementation of topics. 
  37   
  38  Client API 
  39  ========== 
  40   
  41  L{Publisher} and L{Subscriber} are the client API for topics. 
  42   
  43  Internal Implementation 
  44  ======================= 
  45   
  46  Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl) 
  47  are used to manage actual transport connections.  The L{_TopicManager} 
  48  is responsible for tracking the system-wide state of publications and 
  49  subscriptions as well as the L{_TopicImpl} instances. More info is below. 
  50    
  51  L{_TopicManager} 
  52  ================ 
  53   
  54  The L{_TopicManager} does the backend topic bookkeeping for the local 
  55  node.  Use L{get_topic_manager()} to access singleton. Actual topic 
  56  implementations are done through the 
  57  L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client 
  58  code generates instances of type L{Publisher}/L{Subscriber}, which 
  59  enable the client to create multiple publishers/subscribers of that 
  60  topic that get controlled access to the underlying shared connections. 
  61   
  62  Common parent classes for all rospy topics. The rospy topic autogenerators 
  63  create classes that are children of these implementations. 
  64  """ 
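
A minimal usage sketch of the Publisher/Subscriber client API described
above (the node name, topic name, and message type are illustrative and not
part of this module):

    # illustrative sketch only; 'chatter' and std_msgs/String are examples
    import rospy
    from std_msgs.msg import String

    def on_msg(msg):
        rospy.loginfo("heard: %s", msg.data)

    rospy.init_node('example_node')
    pub = rospy.Publisher('chatter', String, queue_size=10)
    sub = rospy.Subscriber('chatter', String, on_msg)

    rate = rospy.Rate(1)  # publish at 1 Hz until shutdown
    while not rospy.is_shutdown():
        pub.publish(String(data='hello'))
        rate.sleep()
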
  65   
  66   
  67  import struct 
  68  import select 
  69  try: 
  70      from cStringIO import StringIO #Python 2.x 
  71      python3 = 0 
72 - def isstring(s):
  73          return isinstance(s, basestring) #Python 2.x
  74  except ImportError:
  75      python3 = 1
  76      from io import StringIO, BytesIO #Python 3.x
77 - def isstring(s):
  78          return isinstance(s, str) #Python 3.x
  79  
  80  import threading
  81  import logging
  82  import time
  83  
  84  from itertools import chain
  85  import traceback
  86  
  87  import rosgraph.names
  88  
  89  from rospy.core import *
  90  from rospy.exceptions import ROSSerializationException, TransportTerminated
  91  from rospy.msg import serialize_message, args_kwds_to_message
  92  
  93  from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
  94  from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
  95  from rospy.impl.tcpros_pubsub import QueuedConnection
  96  
  97  _logger = logging.getLogger('rospy.topics')
  98  
  99  # wrap genpy implementation and map it to rospy namespace
 100  import genpy
 101  Message = genpy.Message
 102  
 103  #######################################################################
 104  # Base classes for all client-API instantiated pub/sub
 105  #
 106  # There are two trees: Topic and _TopicImpl. Topic is the client API
 107  # for interfacing with topics, while _TopicImpl implements the
 108  # underlying connection details.
 109  
110 -class Topic(object):
 111      """Base class of L{Publisher} and L{Subscriber}"""
 112  
113 - def __init__(self, name, data_class, reg_type):
114 """ 115 @param name: graph resource name of topic, e.g. 'laser'. 116 @type name: str 117 @param data_class: message class for serialization 118 @type data_class: L{Message} 119 @param reg_type Registration.PUB or Registration.SUB 120 @type reg_type: str 121 @raise ValueError: if parameters are invalid 122 """ 123 124 if not name or not isstring(name): 125 raise ValueError("topic name is not a non-empty string") 126 try: 127 if python3 == 1: 128 name.encode("utf-8") 129 else: 130 name = name.encode("utf-8") 131 except UnicodeError: 132 raise ValueError("topic name must be ascii/utf-8 compatible") 133 if data_class is None: 134 raise ValueError("topic parameter 'data_class' is not initialized") 135 if not type(data_class) == type: 136 raise ValueError("data_class [%s] is not a class"%data_class) 137 if not issubclass(data_class, genpy.Message): 138 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__) 139 # #2202 140 if not rosgraph.names.is_legal_name(name): 141 import warnings 142 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 143 144 # this is a bit ugly, but necessary due to the fact that we allow 145 # topics and services to be initialized before the node 146 if not rospy.core.is_initialized(): 147 self.resolved_name = rospy.names.resolve_name_without_node_name(name) 148 else: 149 # init_node() has been called, so we can do normal resolution 150 self.resolved_name = resolve_name(name) 151 152 self.name = self.resolved_name # #1810 for backwards compatibility 153 154 self.data_class = data_class 155 self.type = data_class._type 156 self.md5sum = data_class._md5sum 157 self.reg_type = reg_type 158 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
159
160 - def get_num_connections(self):
161 """ 162 get the number of connections to other ROS nodes for this topic. For a Publisher, 163 this corresponds to the number of nodes subscribing. For a Subscriber, the number 164 of publishers. 165 @return: number of connections 166 @rtype: int 167 """ 168 return self.impl.get_num_connections()
169
170 - def unregister(self):
171 """ 172 unpublish/unsubscribe from topic. Topic instance is no longer 173 valid after this call. Additional calls to unregister() have no effect. 174 """ 175 # as we don't guard unregister, have to protect value of 176 # resolved_name for release_impl call 177 resolved_name = self.resolved_name 178 if resolved_name and self.impl: 179 get_topic_manager().release_impl(self.reg_type, resolved_name) 180 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
181 182 # #3808
183 -class Poller(object):
184 """ 185 select.poll/kqueue abstraction to handle socket failure detection 186 on multiple platforms. NOT thread-safe. 187 """
188 - def __init__(self):
189 try: 190 self.poller = select.poll() 191 self.add_fd = self.add_poll 192 self.remove_fd = self.remove_poll 193 self.error_iter = self.error_poll_iter 194 except: 195 try: 196 # poll() not available, try kqueue 197 self.poller = select.kqueue() 198 self.add_fd = self.add_kqueue 199 self.remove_fd = self.remove_kqueue 200 self.error_iter = self.error_kqueue_iter 201 self.kevents = [] 202 except: 203 #TODO: non-Noop impl for Windows 204 self.poller = self.noop 205 self.add_fd = self.noop 206 self.remove_fd = self.noop 207 self.error_iter = self.noop_iter
208
209 - def noop(self, *args):
210 pass
211
212 - def noop_iter(self):
213 empty_generator = (x for x in list()) 214 for x in empty_generator: 215 yield x
216
217 - def add_poll(self, fd):
218 self.poller.register(fd)
219
220 - def remove_poll(self, fd):
221 self.poller.unregister(fd)
222
223 - def error_poll_iter(self):
224 events = self.poller.poll(0) 225 for fd, event in events: 226 if event & (select.POLLHUP | select.POLLERR): 227 yield fd
228
229 - def add_kqueue(self, fd):
230 self.kevents.append(select.kevent(fd))
231
232 - def error_kqueue_iter(self):
233 events = self.poller.control(self.kevents, 0) 234 for event in events: 235 if event & (select.KQ_EV_ERROR | select.KQ_EV_EOF): 236 yield event.ident
237
238 - def remove_kqueue(self, fd):
239 e = [x for x in self.kevents if x.ident == fd] 240 for x in e: 241 self.kevents.remove(x)
242
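
A hedged sketch of how the Poller above is meant to be used: register each
transport's file descriptor, then drain error_iter() to find sockets that
have hung up. The `transports` list here is hypothetical.

    # hypothetical callers; error_iter() only reports fds that poll/kqueue
    # flag as hung up or errored
    poller = Poller()
    for t in transports:
        poller.add_fd(t.fileno())

    # later, e.g. before adding a new connection
    for failed_fd in poller.error_iter():
        for t in [x for x in transports if x.fileno() == failed_fd]:
            t.close()
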
243 -class _TopicImpl(object):
244 """ 245 Base class of internal topic implementations. Each topic has a 246 singleton _TopicImpl implementation for managing the underlying 247 connections. 248 """ 249
250 - def __init__(self, name, data_class):
 251          """
 252          Base constructor
 253          @param name: graph resource name of topic, e.g. 'laser'.
 254          @type name: str
 255          @param data_class: message data class
 256          @type data_class: L{Message}
 257          """
 258  
 259          # #1810 made resolved/unresolved more explicit so we don't accidentally double-resolve
 260          self.resolved_name = resolve_name(name) #NOTE: remapping occurs here!
 261          self.name = self.resolved_name # for backwards compatibility
 262  
 263          self.data_class = data_class
 264          self.type = data_class._type
 265          self.handler = None
 266          self.seq = 0
 267          # c_lock serializes calls to methods that modify
 268          # self.connections. Because adding/removing connections is
 269          # a rare event, we go through the extra hassle of making a
 270          # copy of the connections/callbacks lists
 271          # when modifying, then setting the reference to the new copy.
 272          # With this pattern, other code can use these lists without
 273          # having to acquire c_lock
 274          self.c_lock = threading.RLock()
 275          self.connections = []
 276          self.closed = False
 277          # number of Topic instances using this
 278          self.ref_count = 0
 279  
 280          self.connection_poll = Poller()
281
282 - def __del__(self):
283 # very similar to close(), but have to be more careful in a __del__ what we call 284 if self.closed: 285 return 286 if self.connections is not None: 287 for c in self.connections: 288 try: 289 c.close() 290 except: 291 pass 292 del self.connections[:] 293 self.c_lock = self.connections = self.handler = self.data_class = self.type = None
294
295 - def close(self):
296 """close I/O""" 297 if self.closed: 298 return 299 self.closed = True 300 if self.c_lock is not None: 301 with self.c_lock: 302 for c in self.connections: 303 try: 304 if c is not None: 305 c.close() 306 except: 307 # seems more logger.error internal than external logerr 308 _logger.error(traceback.format_exc()) 309 del self.connections[:] 310 self.handler = None
311
312 - def get_num_connections(self):
313 with self.c_lock: 314 return len(self.connections)
315
316 - def has_connection(self, endpoint_id):
 317          """
 318          Query whether or not a connection with the associated
 319          endpoint has been added to this object.
 320          @param endpoint_id: endpoint ID associated with connection.
 321          @type endpoint_id: str
 322          """
 323          # save reference to avoid lock
 324          conn = self.connections
 325          for c in conn:
 326              if c.endpoint_id == endpoint_id:
 327                  return True
 328          return False
329
330 - def has_connections(self):
331 """ 332 Check to see if this topic is connected to other publishers/subscribers 333 @return: True if topic is connected 334 @rtype: bool 335 """ 336 if self.connections: 337 return True 338 return False
339
340 - def _remove_connection(self, connections, c):
 341          # Remove from poll instance as well as connections
 342          try:
 343              self.connection_poll.remove_fd(c.fileno())
 344          except:
 345              pass
 346          try:
 347              # not necessarily correct from an abstraction point of
 348              # view, but will prevent accidental connection leaks
 349              c.close()
 350          except:
 351              pass
 352          if c in connections:
 353              connections.remove(c)
354
355 - def add_connection(self, c):
356 """ 357 Add a connection to this topic. If any previous connections 358 to same endpoint exist, drop them. 359 360 @param c: connection instance 361 @type c: Transport 362 @return: True if connection was added, ``bool`` 363 """ 364 rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections))) 365 with self.c_lock: 366 # c_lock is to make add_connection thread-safe, but we 367 # still make a copy of self.connections so that the rest of the 368 # code can use self.connections in an unlocked manner 369 new_connections = self.connections[:] 370 371 # if we have a connection to the same endpoint_id, drop 372 # the old one. 373 for oldc in self.connections: 374 if oldc.endpoint_id == c.endpoint_id: 375 self._remove_connection(new_connections, oldc) 376 377 # #3808: "garbage collect" bad sockets whenever we add new 378 # connections. This allows at most one stale connection 379 # per topic. Otherwise, we only detect bad connections on 380 # write. An alternative (given the current 381 # implementation) would be to start a thread that 382 # regularly polls all fds, but that would create a lot of 383 # synchronization events and also have a separate thread 384 # to manage. It would be desireable to move to this, but 385 # this change is less impactful and keeps the codebase 386 # more stable as we move towards an entirely different 387 # event loop for rospy -- the heart of the problem is that 388 # rospy's i/o is blocking-based, which has the obvious 389 # issues. 390 391 for fd in self.connection_poll.error_iter(): 392 to_remove = [x for x in new_connections if x.fileno() == fd] 393 for x in to_remove: 394 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id)) 395 self._remove_connection(new_connections, x) 396 397 # Add new connection to poller, register for all events, 398 # though we only care about POLLHUP/ERR 399 new_fd = c.fileno() 400 if new_fd is not None: 401 self.connection_poll.add_fd(new_fd) 402 403 # add in new connection 404 new_connections.append(c) 405 406 self.connections = new_connections 407 408 # connections make a callback when closed 409 c.set_cleanup_callback(self.remove_connection) 410 411 return True
412
413 - def remove_connection(self, c):
414 """ 415 Remove connection from topic. 416 @param c: connection instance to remove 417 @type c: Transport 418 """ 419 rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id)) 420 with self.c_lock: 421 # c_lock is to make remove_connection thread-safe, but we 422 # still make a copy of self.connections so that the rest of the 423 # code can use self.connections in an unlocked manner 424 new_connections = self.connections[:] 425 self._remove_connection(new_connections, c) 426 self.connections = new_connections
427
428 - def get_stats_info(self): # STATS
 429          """
 430          Get the stats for this topic
 431          @return: stats for topic in getBusInfo() format::
 432            ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
 433          @rtype: list
 434          """
 435          # save reference to avoid locking
 436          connections = self.connections
 437          return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True) for c in connections]
438
439 - def get_stats(self): # STATS
440 """Get the stats for this topic (API stub)""" 441 raise Exception("subclasses must override") 442 443 # Implementation note: Subscriber attaches to a _SubscriberImpl 444 # singleton for that topic. The underlying impl manages the 445 # connections for that publication and enables thread-safe access 446
447 -class Subscriber(Topic):
448 """ 449 Class for registering as a subscriber to a specified topic, where 450 the messages are of a given type. 451 """
452 - def __init__(self, name, data_class, callback=None, callback_args=None,
 453                  queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
 454          """
 455          Constructor.
 456  
 457          NOTE: for the queue_size and buff_size
 458          parameters, rospy does not attempt to do intelligent merging
 459          between multiple Subscriber instances for the same topic. As
 460          they share the same underlying transport, multiple Subscribers
 461          to the same topic can conflict with one another if they set
 462          these parameters differently.
 463  
 464          @param name: graph resource name of topic, e.g. 'laser'.
 465          @type name: str
 466          @param data_class: data type class to use for messages,
 467            e.g. std_msgs.msg.String
 468          @type data_class: L{Message} class
 469          @param callback: function to call (fn(data)) when data is
 470            received. If callback_args is set, the function must accept
 471            the callback_args as a second argument, i.e. fn(data,
 472            callback_args). NOTE: Additional callbacks can be added using
 473            add_callback().
 474          @type callback: fn(msg)
 475          @param callback_args: additional arguments to pass to the
 476            callback. This is useful when you wish to reuse the same
 477            callback for multiple subscriptions.
 478          @type callback_args: any
 479          @param queue_size: maximum number of messages to receive at
 480            a time. This will generally be 1 or None (infinite,
 481            default). buff_size should be increased if this parameter
 482            is set as incoming data still needs to sit in the incoming
 483            buffer before being discarded. Setting queue_size or
 484            buff_size to a non-default value affects all subscribers to
 485            this topic in this process.
 486          @type queue_size: int
 487          @param buff_size: incoming message buffer size in bytes. If
 488            queue_size is set, this should be set to a number greater
 489            than the queue_size times the average message size. Setting
 490            buff_size to a non-default value affects all subscribers to
 491            this topic in this process.
 492          @type buff_size: int
 493          @param tcp_nodelay: if True, request TCP_NODELAY from
 494            publisher. Use of this option is not generally recommended
 495            in most cases as it is better to rely on timestamps in
 496            message data. Setting tcp_nodelay to True enables TCP_NODELAY
 497            for all subscribers in the same python process.
 498          @type tcp_nodelay: bool
 499          @raise ROSException: if parameters are invalid
 500          """
 501          super(Subscriber, self).__init__(name, data_class, Registration.SUB)
 502          #add in args that factory cannot pass in
 503  
 504          # last person to set these to non-defaults wins, not much way
 505          # around this
 506          if queue_size is not None:
 507              self.impl.set_queue_size(queue_size)
 508          if buff_size != DEFAULT_BUFF_SIZE:
 509              self.impl.set_buff_size(buff_size)
 510  
 511          if callback is not None:
 512              # #1852
 513              # it's important that we call add_callback so that the
 514              # callback can be invoked with any latched messages
 515              self.impl.add_callback(callback, callback_args)
 516              # save arguments for unregister
 517              self.callback = callback
 518              self.callback_args = callback_args
 519          else:
 520              # initialize fields
 521              self.callback = self.callback_args = None
 522          if tcp_nodelay:
 523              self.impl.set_tcp_nodelay(tcp_nodelay)
524
525 - def unregister(self):
526 """ 527 unpublish/unsubscribe from topic. Topic instance is no longer 528 valid after this call. Additional calls to unregister() have no effect. 529 """ 530 if self.impl: 531 # It's possible to have a Subscriber instance with no 532 # associated callback 533 if self.callback is not None: 534 self.impl.remove_callback(self.callback, self.callback_args) 535 self.callback = self.callback_args = None 536 super(Subscriber, self).unregister()
537
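
A hedged example of the callback_args pattern described in the constructor
docstring: one callback shared by several subscriptions. The topic names and
message type are illustrative.

    # each subscription passes a different extra argument to the same callback
    import rospy
    from std_msgs.msg import String

    def on_msg(msg, topic_label):
        rospy.loginfo("[%s] %s", topic_label, msg.data)

    rospy.init_node('multi_listener')
    rospy.Subscriber('left', String, on_msg, callback_args='left')
    rospy.Subscriber('right', String, on_msg, callback_args='right')
    rospy.spin()
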
538 -class _SubscriberImpl(_TopicImpl):
 539      """
 540      Underlying L{_TopicImpl} implementation for subscriptions.
 541      """
542 - def __init__(self, name, data_class):
543 """ 544 ctor. 545 @param name: graph resource name of topic, e.g. 'laser'. 546 @type name: str 547 @param data_class: Message data class 548 @type data_class: L{Message} class 549 """ 550 super(_SubscriberImpl, self).__init__(name, data_class) 551 # client-methods to invoke on new messages. should only modify 552 # under lock. This is a list of 2-tuples (fn, args), where 553 # args are additional arguments for the callback, or None 554 self.callbacks = [] 555 self.queue_size = None 556 self.buff_size = DEFAULT_BUFF_SIZE 557 self.tcp_nodelay = False
558
559 - def close(self):
560 """close I/O and release resources""" 561 _TopicImpl.close(self) 562 if self.callbacks: 563 del self.callbacks[:] 564 self.callbacks = None
565
566 - def set_tcp_nodelay(self, tcp_nodelay):
567 """ 568 Set the value of TCP_NODELAY, which causes the Nagle algorithm 569 to be disabled for future topic connections, if the publisher 570 supports it. 571 """ 572 self.tcp_nodelay = tcp_nodelay
573
574 - def set_queue_size(self, queue_size):
575 """ 576 Set the receive queue size. If more than queue_size messages 577 are waiting to be deserialized, they are discarded. 578 579 @param queue_size int: incoming queue size. Must be positive integer or None. 580 @type queue_size: int 581 """ 582 if queue_size == -1: 583 self.queue_size = None 584 elif queue_size == 0: 585 raise ROSException("queue size may not be set to zero") 586 elif queue_size is not None and type(queue_size) != int: 587 raise ROSException("queue size must be an integer") 588 else: 589 self.queue_size = queue_size
590
591 - def set_buff_size(self, buff_size):
592 """ 593 Set the receive buffer size. The exact meaning of this is 594 transport dependent. 595 @param buff_size: receive buffer size 596 @type buff_size: int 597 """ 598 if type(buff_size) != int: 599 raise ROSException("buffer size must be an integer") 600 elif buff_size <= 0: 601 raise ROSException("buffer size must be a positive integer") 602 self.buff_size = buff_size
603
604 - def get_stats(self): # STATS
605 """ 606 Get the stats for this topic subscriber 607 @return: stats for topic in getBusStats() publisher format:: 608 (topicName, connStats) 609 where connStats is:: 610 [connectionId, bytesReceived, numSent, dropEstimate, connected]* 611 @rtype: list 612 """ 613 # save reference to avoid locking 614 conn = self.connections 615 #for now drop estimate is -1 616 stats = (self.resolved_name, 617 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done) 618 for c in conn] ) 619 return stats
620
621 - def add_callback(self, cb, cb_args):
 622          """
 623          Register a callback to be invoked whenever a new message is received
 624          @param cb: callback function to invoke with message data
 625            instance, i.e. fn(data). If callback_args is set, they will
 626            be passed in as the second argument.
 627          @type cb: fn(msg)
 628          @param cb_args: additional arguments to pass to callback
 629          @type cb_args: Any
 630          """
 631          if self.closed:
 632              raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
 633          with self.c_lock:
 634              # we lock in order to serialize calls to add_callback, but
 635              # we copy self.callbacks so we can iterate over it without the lock
 636              new_callbacks = self.callbacks[:]
 637              new_callbacks.append((cb, cb_args))
 638              self.callbacks = new_callbacks
 639  
 640          # #1852: invoke callback with any latched messages
 641          for c in self.connections:
 642              if c.latch is not None:
 643                  self._invoke_callback(c.latch, cb, cb_args)
644
645 - def remove_callback(self, cb, cb_args):
 646          """
 647          Unregister a message callback.
 648          @param cb: callback function
 649          @type cb: fn(msg)
 650          @param cb_args: additional arguments associated with callback
 651          @type cb_args: Any
 652          @raise KeyError: if no matching callback
 653          """
 654          if self.closed:
 655              return
 656          with self.c_lock:
 657              # we lock in order to serialize calls to remove_callback, but
 658              # we copy self.callbacks so we can iterate over it without the lock
 659              matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args]
 660              if matches:
 661                  new_callbacks = self.callbacks[:]
 662                  # remove the first match
 663                  new_callbacks.remove(matches[0])
 664                  self.callbacks = new_callbacks
 665          if not matches:
 666              raise KeyError("no matching cb")
667
668 - def _invoke_callback(self, msg, cb, cb_args):
 669          """
 670          Invoke callback on msg. Traps and logs any exceptions raised by the callback.
 671          @param msg: message data
 672          @type msg: L{Message}
 673          @param cb: callback
 674          @type cb: fn(msg, cb_args)
 675          @param cb_args: callback args or None
 676          @type cb_args: Any
 677          """
 678          try:
 679              if cb_args is not None:
 680                  cb(msg, cb_args)
 681              else:
 682                  cb(msg)
 683          except Exception as e:
 684              if not is_shutdown():
 685                  logerr("bad callback: %s\n%s"%(cb, traceback.format_exc()))
 686              else:
 687                  _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
688
689 - def receive_callback(self, msgs):
690 """ 691 Called by underlying connection transport for each new message received 692 @param msgs: message data 693 @type msgs: [L{Message}] 694 """ 695 # save reference to avoid lock 696 callbacks = self.callbacks 697 for msg in msgs: 698 for cb, cb_args in callbacks: 699 self._invoke_callback(msg, cb, cb_args)
700
701 -class SubscribeListener(object):
702 """ 703 Callback API to receive notifications when new subscribers 704 connect and disconnect. 705 """ 706
707 - def peer_subscribe(self, topic_name, topic_publish, peer_publish):
708 """ 709 callback when a peer has subscribed from a topic 710 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 711 @type topic_name: str 712 @param topic_publish: method to publish message data to all subscribers 713 @type topic_publish: fn(data) 714 @param peer_publish: method to publish message data to 715 new subscriber. NOTE: behavior for the latter is 716 transport-dependent as some transports may be broadcast only. 717 @type peer_publish: fn(data) 718 """ 719 pass
720
721 - def peer_unsubscribe(self, topic_name, num_peers):
722 """ 723 callback when a peer has unsubscribed from a topic 724 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 725 @type topic_name: str 726 @param num_peers: number of remaining peers subscribed to topic 727 @type num_peers: int 728 """ 729 pass
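
A hedged example of implementing SubscribeListener: greet each new subscriber
over its own connection via peer_publish, and log when peers leave. The class,
node, and topic names are illustrative.

    import rospy
    from std_msgs.msg import String

    class Greeter(rospy.SubscribeListener):
        def peer_subscribe(self, topic_name, topic_publish, peer_publish):
            # send one message only to the newly connected subscriber
            peer_publish(String(data='welcome to %s' % topic_name))

        def peer_unsubscribe(self, topic_name, num_peers):
            rospy.loginfo("%s now has %d subscribers", topic_name, num_peers)

    rospy.init_node('greeter_node')
    pub = rospy.Publisher('chatter', String, subscriber_listener=Greeter(),
                          queue_size=10)
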
730 731 732 # Implementation note: Publisher attaches to a 733 # _PublisherImpl singleton for that topic. The underlying impl 734 # manages the connections for that publication and enables 735 # thread-safe access 736
737 -class Publisher(Topic):
738 """ 739 Class for registering as a publisher of a ROS topic. 740 """ 741
742 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
743 """ 744 Constructor 745 @param name: resource name of topic, e.g. 'laser'. 746 @type name: str 747 @param data_class: message class for serialization 748 @type data_class: L{Message} class 749 @param subscriber_listener: listener for 750 subscription events. May be None. 751 @type subscriber_listener: L{SubscribeListener} 752 @param tcp_nodelay: If True, sets TCP_NODELAY on 753 publisher's socket (disables Nagle algorithm). This results 754 in lower latency publishing at the cost of efficiency. 755 @type tcp_nodelay: bool 756 @param latch: If True, the last message published is 757 'latched', meaning that any future subscribers will be sent 758 that message immediately upon connection. 759 @type latch: bool 760 @raise ROSException: if parameters are invalid 761 """ 762 super(Publisher, self).__init__(name, data_class, Registration.PUB) 763 764 if subscriber_listener: 765 self.impl.add_subscriber_listener(subscriber_listener) 766 if tcp_nodelay: 767 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay) 768 if latch: 769 self.impl.enable_latch() 770 if headers: 771 self.impl.add_headers(headers) 772 if queue_size is not None: 773 self.impl.set_queue_size(queue_size)
774
775 - def publish(self, *args, **kwds):
776 """ 777 Publish message data object to this topic. 778 Publish can either be called with the message instance to 779 publish or with the constructor args for a new Message 780 instance, i.e.:: 781 pub.publish(message_instance) 782 pub.publish(message_field_1, message_field_2...) 783 pub.publish(message_field_1='foo', message_field_2='bar') 784 785 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used 786 @param kwds : Message keyword arguments. If kwds are used, args must be unset 787 @raise ROSException: If rospy node has not been initialized 788 @raise ROSSerializationException: If unable to serialize 789 message. This is usually a type error with one of the fields. 790 """ 791 if self.impl is None: 792 raise ROSException("publish() to an unregistered() handle") 793 if not is_initialized(): 794 raise ROSException("ROS node has not been initialized yet. Please call init_node() first") 795 data = args_kwds_to_message(self.data_class, args, kwds) 796 try: 797 self.impl.acquire() 798 self.impl.publish(data) 799 except genpy.SerializationError as e: 800 # can't go to rospy.logerr(), b/c this could potentially recurse 801 _logger.error(traceback.format_exc(e)) 802 raise ROSSerializationException(str(e)) 803 finally: 804 self.impl.release()
805
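
A hedged illustration of the three publish() calling forms documented above,
using an arbitrary std_msgs/ColorRGBA publisher:

    import rospy
    from std_msgs.msg import ColorRGBA

    rospy.init_node('color_pub')
    pub = rospy.Publisher('color', ColorRGBA, queue_size=1, latch=True)

    pub.publish(ColorRGBA(1.0, 0.0, 0.0, 1.0))   # message instance
    pub.publish(0.0, 1.0, 0.0, 1.0)              # positional constructor args
    pub.publish(r=0.0, g=0.0, b=1.0, a=1.0)      # keyword constructor args
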
806 -class _PublisherImpl(_TopicImpl):
 807      """
 808      Underlying L{_TopicImpl} implementation for publishers.
 809      """
 810  
811 - def __init__(self, name, data_class):
812 """ 813 @param name: name of topic, e.g. 'laser'. 814 @type name: str 815 @param data_class: Message data class 816 @type data_class: L{Message} class 817 """ 818 super(_PublisherImpl, self).__init__(name, data_class) 819 if python3 == 0: 820 self.buff = StringIO() 821 else: 822 self.buff = BytesIO() 823 self.publock = threading.RLock() #for acquire()/release 824 self.subscriber_listeners = [] 825 826 # additional client connection headers 827 self.headers = {} 828 829 # publish latch, starts disabled 830 self.is_latch = False 831 self.latch = None 832 833 # maximum queue size for publishing messages 834 self.queue_size = None 835 836 #STATS 837 self.message_data_sent = 0
838
839 - def close(self):
840 """close I/O and release resources""" 841 _TopicImpl.close(self) 842 # release resources 843 if self.subscriber_listeners: 844 del self.subscriber_listeners[:] 845 if self.headers: 846 self.headers.clear() 847 if self.buff is not None: 848 self.buff.close() 849 self.publock = self.headers = self.buff = self.subscriber_listeners = None
850
851 - def add_headers(self, headers):
852 """ 853 Add connection headers to this Topic for future connections. 854 @param headers: key/values will be added to current connection 855 header set, overriding any existing keys if they conflict. 856 @type headers: dict 857 """ 858 self.headers.update(headers)
859
860 - def enable_latch(self):
861 """ 862 Enable publish() latch. The latch contains the last published 863 message and is sent to any new subscribers. 864 """ 865 self.is_latch = True
866
867 - def set_queue_size(self, queue_size):
868 self.queue_size = queue_size
869
870 - def get_stats(self): # STATS
871 """ 872 Get the stats for this topic publisher 873 @return: stats for topic in getBusStats() publisher format:: 874 [topicName, messageDataBytes, connStats], 875 where connStats is:: 876 [id, bytes, numMessages, connected]* 877 @rtype: list 878 """ 879 # save reference to avoid lock 880 conn = self.connections 881 return (self.resolved_name, self.message_data_sent, 882 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )
883
884 - def add_subscriber_listener(self, l):
885 """ 886 Add a L{SubscribeListener} for subscribe events. 887 @param l: listener instance 888 @type l: L{SubscribeListener} 889 """ 890 self.subscriber_listeners.append(l)
891
892 - def acquire(self):
893 """lock for thread-safe publishing to this transport""" 894 if self.publock is not None: 895 self.publock.acquire()
896
897 - def release(self):
898 """lock for thread-safe publishing to this transport""" 899 if self.publock is not None: 900 self.publock.release()
901
902 - def add_connection(self, c):
 903          """
 904          Add a connection to this topic. This must be a PubTransport. If
 905          the latch is enabled, c will be sent the value of the
 906          latch.
 907          @param c: connection instance
 908          @type c: L{Transport}
 909          @return: True if connection was added
 910          @rtype: bool
 911          """
 912          if self.queue_size is not None:
 913              c = QueuedConnection(c, self.queue_size)
 914          super(_PublisherImpl, self).add_connection(c)
 915          def publish_single(data):
 916              self.publish(data, connection_override=c)
 917          for l in self.subscriber_listeners:
 918              l.peer_subscribe(self.resolved_name, self.publish, publish_single)
 919          if self.is_latch and self.latch is not None:
 920              with self.publock:
 921                  self.publish(self.latch, connection_override=c)
 922          return True
 923  
924 - def remove_connection(self, c):
925 """ 926 Remove existing connection from this topic. 927 @param c: connection instance to remove 928 @type c: L{Transport} 929 """ 930 super(_PublisherImpl, self).remove_connection(c) 931 num = len(self.connections) 932 for l in self.subscriber_listeners: 933 l.peer_unsubscribe(self.resolved_name, num)
934
935 - def publish(self, message, connection_override=None):
936 """ 937 Publish the data to the topic. If the topic has no subscribers, 938 the method will return without any affect. Access to publish() 939 should be locked using acquire() and release() in order to 940 ensure proper message publish ordering. 941 942 @param message: message data instance to publish 943 @type message: L{Message} 944 @param connection_override: publish to this connection instead of all 945 @type connection_override: L{Transport} 946 @return: True if the data was published, False otherwise. 947 @rtype: bool 948 @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself 949 @raise rospy.ROSException: if topic has been closed or was closed during publish() 950 """ 951 #TODO: should really just use IOError instead of rospy.ROSException 952 953 if self.closed: 954 # during shutdown, the topic can get closed, which creates 955 # a race condition with user code testing is_shutdown 956 if not is_shutdown(): 957 raise ROSException("publish() to a closed topic") 958 else: 959 return 960 961 if self.is_latch: 962 self.latch = message 963 964 if not self.has_connections(): 965 #publish() falls through 966 return False 967 968 if connection_override is None: 969 #copy connections so we can iterate safely 970 conns = self.connections 971 else: 972 conns = [connection_override] 973 974 # #2128 test our buffer. I don't now how this got closed in 975 # that case, but we can at least diagnose the problem. 976 b = self.buff 977 try: 978 b.tell() 979 980 # serialize the message 981 self.seq += 1 #count messages published to the topic 982 serialize_message(b, self.seq, message) 983 984 # send the buffer to all connections 985 err_con = [] 986 data = b.getvalue() 987 988 for c in conns: 989 try: 990 if not is_shutdown(): 991 c.write_data(data) 992 except TransportTerminated as e: 993 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 994 err_con.append(c) 995 except Exception as e: 996 # greater severity level 997 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 998 err_con.append(c) 999 1000 # reset the buffer and update stats 1001 self.message_data_sent += b.tell() #STATS 1002 b.seek(0) 1003 b.truncate(0) 1004 1005 except ValueError: 1006 # operations on self.buff can fail if topic is closed 1007 # during publish, which often happens during Ctrl-C. 1008 # diagnose the error and report accordingly. 1009 if self.closed: 1010 if is_shutdown(): 1011 # we offer no guarantees on publishes that occur 1012 # during shutdown, so this is not exceptional. 1013 return 1014 else: 1015 # this indicates that user-level code most likely 1016 # closed the topic, which is exceptional. 1017 raise ROSException("topic was closed during publish()") 1018 else: 1019 # unexpected, so re-raise original error 1020 raise 1021 1022 # remove any bad connections 1023 for c in err_con: 1024 try: 1025 # connection will callback into remove_connection when 1026 # we close it 1027 c.close() 1028 except: 1029 pass
1030 1031 ################################################################################# 1032 # TOPIC MANAGER/LISTENER 1033
1034 -class _TopicManager(object):
1035 """ 1036 Tracks Topic objects 1037 See L{get_topic_manager()} for singleton access 1038 """ 1039
1040 - def __init__(self):
1041 """ctor.""" 1042 super(_TopicManager, self).__init__() 1043 self.pubs = {} #: { topic: _PublisherImpl } 1044 self.subs = {} #: { topic: _SubscriberImpl } 1045 self.topics = set() # [str] list of topic names 1046 self.lock = threading.Condition() 1047 self.closed = False 1048 _logger.info("topicmanager initialized")
1049
1050 - def get_pub_sub_info(self):
1051 """ 1052 get topic publisher and subscriber connection info for getBusInfo() api 1053 @return: [bus info stats] 1054 See getBusInfo() API for more data structure details. 1055 @rtype: list 1056 """ 1057 with self.lock: 1058 info = [] 1059 for s in chain(iter(self.pubs.values()), iter(self.subs.values())): 1060 info.extend(s.get_stats_info()) 1061 return info
1062
1063 - def get_pub_sub_stats(self):
1064 """ 1065 get topic publisher and subscriber stats for getBusStats() api 1066 @return: [publisherStats, subscriberStats]. 1067 See getBusStats() API for more data structure details. 1068 @rtype: list 1069 """ 1070 with self.lock: 1071 return [s.get_stats() for s in self.pubs.values()],\ 1072 [s.get_stats() for s in self.subs.values()]
1073
1074 - def close_all(self):
1075 """ 1076 Close all registered publication and subscriptions. Manager is 1077 no longer usable after close. 1078 """ 1079 self.closed = True 1080 with self.lock: 1081 for t in chain(iter(self.pubs.values()), iter(self.subs.values())): 1082 t.close() 1083 self.pubs.clear() 1084 self.subs.clear()
1085
1086 - def _add(self, ps, rmap, reg_type):
1087 """ 1088 Add L{_TopicImpl} instance to rmap 1089 @param ps: a pub/sub impl instance 1090 @type ps: L{_TopicImpl} 1091 @param rmap: { topic: _TopicImpl} rmap to record instance in 1092 @type rmap: dict 1093 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1094 @type reg_type: str 1095 """ 1096 resolved_name = ps.resolved_name 1097 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type) 1098 with self.lock: 1099 rmap[resolved_name] = ps 1100 self.topics.add(resolved_name) 1101 1102 # NOTE: this call can take a lengthy amount of time (at 1103 # least until its reimplemented to use queues) 1104 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
1105
1106 - def _recalculate_topics(self):
1107 """recalculate self.topics. expensive""" 1108 self.topics = set([x.resolved_name for x in self.pubs.values()] + 1109 [x.resolved_name for x in self.subs.values()])
1110
1111 - def _remove(self, ps, rmap, reg_type):
1112 """ 1113 Remove L{_TopicImpl} instance from rmap 1114 @param ps: a pub/sub impl instance 1115 @type ps: L{_TopicImpl} 1116 @param rmap: topic->_TopicImpl rmap to remove instance in 1117 @type rmap: dict 1118 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1119 @type reg_type: str 1120 """ 1121 resolved_name = ps.resolved_name 1122 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type) 1123 with self.lock: 1124 del rmap[resolved_name] 1125 self. _recalculate_topics() 1126 1127 # NOTE: this call can take a lengthy amount of time (at 1128 # least until its reimplemented to use queues) 1129 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1130
1131 - def get_impl(self, reg_type, resolved_name):
1132          """
1133          Get the L{_TopicImpl} for the specified topic. This is mainly for
1134          testing purposes. Unlike acquire_impl, it does not alter the
1135          ref count.
1136          @param resolved_name: resolved topic name
1137          @type resolved_name: str
1138          @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1139          @type reg_type: str
1140          """
1141          if reg_type == Registration.PUB:
1142              rmap = self.pubs
1143          elif reg_type == Registration.SUB:
1144              rmap = self.subs
1145          else:
1146              raise TypeError("invalid reg_type: %s"%reg_type)
1147          return rmap.get(resolved_name, None)
1148
1149 - def acquire_impl(self, reg_type, resolved_name, data_class):
1150          """
1151          Acquire a L{_TopicImpl} for the specified topic (create one if it
1152          doesn't exist). Every L{Topic} instance has a _TopicImpl that
1153          actually controls the topic resources so that multiple Topic
1154          instances use the same underlying connections. 'Acquiring' a
1155          topic implementation marks that another Topic instance is
1156          using the TopicImpl.
1157  
1158          @param resolved_name: resolved topic name
1159          @type resolved_name: str
1160  
1161          @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1162          @type reg_type: str
1163  
1164          @param data_class: message class for topic
1165          @type data_class: L{Message} class
1166          """
1167          if reg_type == Registration.PUB:
1168              rmap = self.pubs
1169              impl_class = _PublisherImpl
1170          elif reg_type == Registration.SUB:
1171              rmap = self.subs
1172              impl_class = _SubscriberImpl
1173          else:
1174              raise TypeError("invalid reg_type: %s"%reg_type)
1175          with self.lock:
1176              impl = rmap.get(resolved_name, None)
1177              if not impl:
1178                  impl = impl_class(resolved_name, data_class)
1179                  self._add(impl, rmap, reg_type)
1180              impl.ref_count += 1
1181              return impl
1182
1183 - def release_impl(self, reg_type, resolved_name):
1184          """
1185          Release a L{_TopicImpl} for the specified topic.
1186  
1187          Every L{Topic} instance has a _TopicImpl that actually
1188          controls the topic resources so that multiple Topic instances
1189          use the same underlying connections. Releasing a topic
1190          implementation marks that a Topic instance is no longer
1191          using the TopicImpl.
1192  
1193          @param resolved_name: resolved topic name
1194          @type resolved_name: str
1195          @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
1196          @type reg_type: str
1197          """
1198          if reg_type == Registration.PUB:
1199              rmap = self.pubs
1200          else:
1201              rmap = self.subs
1202          with self.lock:
1203              # check for race condition where multiple things are cleaning up at once
1204              if self.closed:
1205                  return
1206              impl = rmap.get(resolved_name, None)
1207              assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
1208              impl.ref_count -= 1
1209              assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
1210              if impl.ref_count == 0:
1211                  rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name)
1212                  impl.close()
1213                  self._remove(impl, rmap, reg_type)
1214                  del impl
1215                  _logger.debug("... done deleting topic %s", resolved_name)
1216
1217 - def get_publisher_impl(self, resolved_name):
1218          """
1219          @param resolved_name: resolved topic name
1220          @type resolved_name: str
1221          @return: publisher impl for the specified topic, or None if not registered
1222          @rtype: L{_PublisherImpl}
1223          """
1224          return self.pubs.get(resolved_name, None)
1225
1226 - def get_subscriber_impl(self, resolved_name):
1227 """ 1228 @param resolved_name: topic name 1229 @type resolved_name: str 1230 @return: subscriber for the specified topic. 1231 @rtype: L{_SubscriberImpl} 1232 """ 1233 return self.subs.get(resolved_name, None)
1234
1235 - def has_subscription(self, resolved_name):
1236 """ 1237 @param resolved_name: resolved topic name 1238 @type resolved_name: str 1239 @return: True if manager has subscription for specified topic 1240 @rtype: bool 1241 """ 1242 return resolved_name in self.subs
1243
1244 - def has_publication(self, resolved_name):
1245 """ 1246 @param resolved_name: resolved topic name 1247 @type resolved_name: str 1248 @return: True if manager has publication for specified topic 1249 @rtype: bool 1250 """ 1251 return resolved_name in self.pubs
1252
1253 - def get_topics(self):
1254 """ 1255 @return: list of topic names this node subscribes to/publishes 1256 @rtype: [str] 1257 """ 1258 return self.topics
1259
1260 - def _get_list(self, rmap):
1261 return [[k, v.type] for k, v in rmap.items()]
1262 1263 ## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1264 - def get_subscriptions(self):
1265 return self._get_list(self.subs)
1266 1267 ## @return [[str,str],]: list of topics published by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1268 - def get_publications(self):
1269 return self._get_list(self.pubs)
1270  
1271  set_topic_manager(_TopicManager())
1272  
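
A hedged sketch of inspecting the _TopicManager singleton described above;
this is internal API, shown only to illustrate the bookkeeping methods defined
in this module.

    # query the per-node topic bookkeeping (internal API)
    from rospy.impl.registration import get_topic_manager

    tm = get_topic_manager()
    print(tm.get_topics())          # set of topic names this node touches
    print(tm.get_publications())    # [[topic, type], ...]
    print(tm.get_subscriptions())   # [[topic, type], ...]
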