Package rospy :: Module topics

Source Code for Module rospy.topics

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2008, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32  # 
  33  # Revision $Id$ 
  34   
  35  """ 
  36  rospy implementation of topics. 
  37   
  38  Client API 
  39  ========== 
  40   
  41  L{Publisher} and L{Subscriber} are the client API for topics. 
  42   
  43  Internal Implementation 
  44  ======================= 
  45   
  46  Internally, L{_TopicImpl} instances (_PublisherImpl/_SubscriberImpl) 
  47  are used to manage actual transport connections.  The L{_TopicManager} 
  48  is responsible for tracking the system-wide state of publications and 
  49  subscriptions as well as the L{_TopicImpl} instances. More info is below. 
  50    
  51  L{_TopicManager} 
  52  ================ 
  53   
  54  The L{_TopicManager} does the backend topic bookkeeping for the local 
  55  node.  Use L{get_topic_manager()} to access singleton. Actual topic 
  56  implementations are done through the 
  57  L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client 
  58  code generates instances of type L{Publisher}/L{Subscriber}, which 
  59  enable to client to create multiple publishers/subscribers of that 
  60  topic that get controlled access to the underlying share connections. 
  61   
  62  Common parent classes for all rospy topics. The rospy topic autogenerators 
  63  create classes that are children of these implementations. 
  64  """ 
  65   
  66   
  67  import struct 
  68  import select 
  69  try: 
  70      from cStringIO import StringIO #Python 2.x 
  71      python3 = 0 
72 - def isstring(s):
73 return isinstance(s, basestring) #Python 2.x
74 except ImportError: 75 python3 = 1 76 from io import StringIO, BytesIO #Python 3.x
77 - def isstring(s):
78 return isinstance(s, str) #Python 3.x
79 80 import threading 81 import logging 82 import time 83 84 from itertools import chain 85 import traceback 86 87 import rosgraph.names 88 89 from rospy.core import * 90 from rospy.exceptions import ROSSerializationException, TransportTerminated 91 from rospy.msg import serialize_message, args_kwds_to_message 92 93 from rospy.impl.statistics import SubscriberStatisticsLogger 94 95 from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners 96 from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE 97 from rospy.impl.tcpros_pubsub import QueuedConnection 98 99 _logger = logging.getLogger('rospy.topics') 100 101 # wrap genpy implementation and map it to rospy namespace 102 import genpy 103 Message = genpy.Message 104 105 ####################################################################### 106 # Base classes for all client-API instantiated pub/sub 107 # 108 # There are two trees: Topic and _TopicImpl. Topic is the client API 109 # for interfacing with topics, while _TopicImpl implements the 110 # underlying connection details. 111 112 if not hasattr(select, 'EPOLLRDHUP'): 113 select.EPOLLRDHUP = 0x2000 114 115
116 -class Topic(object):
117 """Base class of L{Publisher} and L{Subscriber}""" 118
119 - def __init__(self, name, data_class, reg_type):
120 """ 121 @param name: graph resource name of topic, e.g. 'laser'. 122 @type name: str 123 @param data_class: message class for serialization 124 @type data_class: L{Message} 125 @param reg_type Registration.PUB or Registration.SUB 126 @type reg_type: str 127 @raise ValueError: if parameters are invalid 128 """ 129 130 if not name or not isstring(name): 131 raise ValueError("topic name is not a non-empty string") 132 try: 133 if python3 == 1: 134 name.encode("utf-8") 135 else: 136 name = name.encode("utf-8") 137 except UnicodeError: 138 raise ValueError("topic name must be ascii/utf-8 compatible") 139 if data_class is None: 140 raise ValueError("topic parameter 'data_class' is not initialized") 141 if not type(data_class) == type: 142 raise ValueError("data_class [%s] is not a class"%data_class) 143 if not issubclass(data_class, genpy.Message): 144 raise ValueError("data_class [%s] is not a message data class"%data_class.__class__.__name__) 145 # #2202 146 if not rosgraph.names.is_legal_name(name): 147 import warnings 148 warnings.warn("'%s' is not a legal ROS graph resource name. This may cause problems with other ROS tools"%name, stacklevel=2) 149 150 # this is a bit ugly, but necessary due to the fact that we allow 151 # topics and services to be initialized before the node 152 if not rospy.core.is_initialized(): 153 self.resolved_name = rospy.names.resolve_name_without_node_name(name) 154 else: 155 # init_node() has been called, so we can do normal resolution 156 self.resolved_name = resolve_name(name) 157 158 self.name = self.resolved_name # #1810 for backwards compatibility 159 160 self.data_class = data_class 161 self.type = data_class._type 162 self.md5sum = data_class._md5sum 163 self.reg_type = reg_type 164 self.impl = get_topic_manager().acquire_impl(reg_type, self.resolved_name, data_class)
165
166 - def get_num_connections(self):
167 """ 168 get the number of connections to other ROS nodes for this topic. For a Publisher, 169 this corresponds to the number of nodes subscribing. For a Subscriber, the number 170 of publishers. 171 @return: number of connections 172 @rtype: int 173 """ 174 return self.impl.get_num_connections()
175
176 - def unregister(self):
177 """ 178 unpublish/unsubscribe from topic. Topic instance is no longer 179 valid after this call. Additional calls to unregister() have no effect. 180 """ 181 # as we don't guard unregister, have to protect value of 182 # resolved_name for release_impl call 183 resolved_name = self.resolved_name 184 if resolved_name and self.impl: 185 get_topic_manager().release_impl(self.reg_type, resolved_name) 186 self.impl = self.resolved_name = self.type = self.md5sum = self.data_class = None
187 188 # #3808
189 -class Poller(object):
190 """ 191 select.poll/kqueue abstraction to handle socket failure detection 192 on multiple platforms. NOT thread-safe. 193 """
194 - def __init__(self):
195 if hasattr(select, 'epoll'): 196 self.poller = select.epoll() 197 self.add_fd = self.add_epoll 198 self.remove_fd = self.remove_epoll 199 self.error_iter = self.error_epoll_iter 200 elif hasattr(select, 'poll'): 201 self.poller = select.poll() 202 self.add_fd = self.add_poll 203 self.remove_fd = self.remove_poll 204 self.error_iter = self.error_poll_iter 205 elif hasattr(select, 'kqueue'): 206 self.poller = select.kqueue() 207 self.add_fd = self.add_kqueue 208 self.remove_fd = self.remove_kqueue 209 self.error_iter = self.error_kqueue_iter 210 self.kevents = [] 211 else: 212 #TODO: non-Noop impl for Windows 213 self.poller = self.noop 214 self.add_fd = self.noop 215 self.remove_fd = self.noop 216 self.error_iter = self.noop_iter
217
218 - def noop(self, *args):
219 pass
220
221 - def noop_iter(self):
222 empty_generator = (x for x in list()) 223 for x in empty_generator: 224 yield x
225
226 - def add_poll(self, fd):
227 self.poller.register(fd)
228
229 - def remove_poll(self, fd):
230 self.poller.unregister(fd)
231
232 - def error_poll_iter(self):
233 events = self.poller.poll(0) 234 for fd, event in events: 235 if event & (select.POLLHUP | select.POLLERR): 236 yield fd
237
238 - def add_epoll(self, fd):
239 self.poller.register(fd, select.EPOLLHUP|select.EPOLLERR|select.EPOLLRDHUP)
240
241 - def remove_epoll(self, fd):
242 self.poller.unregister(fd)
243
244 - def error_epoll_iter(self):
245 events = self.poller.poll(0) 246 for fd, event in events: 247 if event & (select.EPOLLHUP | select.EPOLLERR | select.EPOLLRDHUP): 248 yield fd
249
250 - def add_kqueue(self, fd):
251 self.kevents.append(select.kevent(fd))
252
253 - def error_kqueue_iter(self):
254 events = self.poller.control(self.kevents, 0) 255 for event in events: 256 if event & (select.KQ_EV_ERROR | select.KQ_EV_EOF): 257 yield event.ident
258
259 - def remove_kqueue(self, fd):
260 e = [x for x in self.kevents if x.ident == fd] 261 for x in e: 262 self.kevents.remove(x)
263
264 -class _TopicImpl(object):
265 """ 266 Base class of internal topic implementations. Each topic has a 267 singleton _TopicImpl implementation for managing the underlying 268 connections. 269 """ 270
271 - def __init__(self, name, data_class):
272 """ 273 Base constructor 274 @param name: graph resource name of topic, e.g. 'laser'. 275 @type name: str 276 @param data_class: message data class 277 @type data_class: L{Message} 278 """ 279 280 # #1810 made resolved/unresolved more explicit so we don't accidentally double-resolve 281 self.resolved_name = resolve_name(name) #NOTE: remapping occurs here! 282 self.name = self.resolved_name # for backwards compatibility 283 284 self.data_class = data_class 285 self.type = data_class._type 286 self.handler = None 287 self.seq = 0 288 # lock is used for to serialize call order that methods that 289 # modify self.connections. Because add/removing connections is 290 # a rare event, we go through the extra hassle of making a 291 # copy of the connections/callbacks lists 292 # when modifying, then setting the reference to the new copy. 293 # With this pattern, other code can use these lists without 294 # having to acquire c_lock 295 self.c_lock = threading.RLock() 296 self.connections = [] 297 self.closed = False 298 # number of Topic instances using this 299 self.ref_count = 0 300 301 self.connection_poll = Poller()
302
303 - def __del__(self):
304 # very similar to close(), but have to be more careful in a __del__ what we call 305 if self.closed: 306 return 307 if self.connections is not None: 308 for c in self.connections: 309 try: 310 c.close() 311 except: 312 pass 313 del self.connections[:] 314 self.c_lock = self.connections = self.handler = self.data_class = self.type = None
315
316 - def close(self):
317 """close I/O""" 318 if self.closed: 319 return 320 self.closed = True 321 if self.c_lock is not None: 322 with self.c_lock: 323 for c in self.connections: 324 try: 325 if c is not None: 326 c.close() 327 except: 328 # seems more logger.error internal than external logerr 329 _logger.error(traceback.format_exc()) 330 del self.connections[:] 331 self.handler = None
332
333 - def get_num_connections(self):
334 with self.c_lock: 335 return len(self.connections)
336
337 - def has_connection(self, endpoint_id):
338 """ 339 Query whether or not a connection with the associated \a 340 endpoint has been added to this object. 341 @param endpoint_id: endpoint ID associated with connection. 342 @type endpoint_id: str 343 """ 344 # save reference to avoid lock 345 conn = self.connections 346 for c in conn: 347 if c.endpoint_id == endpoint_id: 348 return True 349 return False
350
351 - def has_connections(self):
352 """ 353 Check to see if this topic is connected to other publishers/subscribers 354 @return: True if topic is connected 355 @rtype: bool 356 """ 357 if self.connections: 358 return True 359 return False
360
361 - def _remove_connection(self, connections, c):
362 # Remove from poll instance as well as connections 363 try: 364 self.connection_poll.remove_fd(c.fileno()) 365 except: 366 pass 367 try: 368 # not necessarily correct from an abstraction point of 369 # view, but will prevent accident connection leaks 370 c.close() 371 except: 372 pass 373 # while c might be a rospy.impl.tcpros_base.TCPROSTransport instance 374 # connections might only contain the rospy.impl.tcpros_pubsub.QueuedConnection proxy 375 # finding the "right" connection is more difficult then 376 if c in connections: 377 connections.remove(c) 378 # therefore additionally check for fileno equality if available 379 elif c.fileno(): 380 matching_connections = [ 381 conn for conn in connections if conn.fileno() == c.fileno()] 382 if len(matching_connections) == 1: 383 connections.remove(matching_connections[0])
384
385 - def add_connection(self, c):
386 """ 387 Add a connection to this topic. If any previous connections 388 to same endpoint exist, drop them. 389 390 @param c: connection instance 391 @type c: Transport 392 @return: True if connection was added, ``bool`` 393 """ 394 rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections))) 395 with self.c_lock: 396 # protect against race condition adding connection to closed sub 397 if self.closed: 398 rospyerr( 399 "ERROR: Race condition failure adding connection to closed subscriber\n" 400 "If you run into this please comment on " 401 "https://github.com/ros/ros_comm/issues/544" 402 ) 403 return False 404 405 # c_lock is to make add_connection thread-safe, but we 406 # still make a copy of self.connections so that the rest of the 407 # code can use self.connections in an unlocked manner 408 new_connections = self.connections[:] 409 410 # if we have a connection to the same endpoint_id, drop 411 # the old one. 412 for oldc in self.connections: 413 if oldc.endpoint_id == c.endpoint_id: 414 self._remove_connection(new_connections, oldc) 415 416 # #3808: "garbage collect" bad sockets whenever we add new 417 # connections. This allows at most one stale connection 418 # per topic. Otherwise, we only detect bad connections on 419 # write. An alternative (given the current 420 # implementation) would be to start a thread that 421 # regularly polls all fds, but that would create a lot of 422 # synchronization events and also have a separate thread 423 # to manage. It would be desireable to move to this, but 424 # this change is less impactful and keeps the codebase 425 # more stable as we move towards an entirely different 426 # event loop for rospy -- the heart of the problem is that 427 # rospy's i/o is blocking-based, which has the obvious 428 # issues. 429 430 for fd in self.connection_poll.error_iter(): 431 to_remove = [x for x in new_connections if x.fileno() == fd] 432 for x in to_remove: 433 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id)) 434 self._remove_connection(new_connections, x) 435 436 # Add new connection to poller, register for all events, 437 # though we only care about POLLHUP/ERR 438 new_fd = c.fileno() 439 if new_fd is not None: 440 self.connection_poll.add_fd(new_fd) 441 442 # add in new connection 443 new_connections.append(c) 444 445 self.connections = new_connections 446 447 # connections make a callback when closed 448 # don't clobber an existing callback 449 if not c.cleanup_cb: 450 c.set_cleanup_callback(self.remove_connection) 451 else: 452 previous_callback = c.cleanup_cb 453 new_callback = self.remove_connection 454 def cleanup_cb_wrapper(s): 455 new_callback(s) 456 previous_callback(s)
457 c.set_cleanup_callback(cleanup_cb_wrapper) 458 459 return True
460
461 - def check(self):
462 fds_to_remove = list(self.connection_poll.error_iter()) 463 if fds_to_remove: 464 with self.c_lock: 465 new_connections = self.connections[:] 466 to_remove = [x for x in new_connections if x.fileno() in fds_to_remove] 467 for x in to_remove: 468 rospydebug("removing connection to %s, connection error detected"%(x.endpoint_id)) 469 self._remove_connection(new_connections, x) 470 self.connections = new_connections
471
472 - def remove_connection(self, c):
473 """ 474 Remove connection from topic. 475 @param c: connection instance to remove 476 @type c: Transport 477 """ 478 rospyinfo("topic[%s] removing connection to %s"%(self.resolved_name, c.endpoint_id)) 479 with self.c_lock: 480 # c_lock is to make remove_connection thread-safe, but we 481 # still make a copy of self.connections so that the rest of the 482 # code can use self.connections in an unlocked manner 483 new_connections = self.connections[:] 484 self._remove_connection(new_connections, c) 485 self.connections = new_connections
486
487 - def get_stats_info(self): # STATS
488 """ 489 Get the stats for this topic 490 @return: stats for topic in getBusInfo() format:: 491 Publisher: 492 ((connection_id, destination_caller_id, direction, transport, topic_name, connected, connection_info_string)*) 493 Subscriber: 494 ((connection_id, publisher_xmlrpc_uri, direction, transport, topic_name, connected, connection_info_string)*) 495 @rtype: list 496 """ 497 # save referenceto avoid locking 498 connections = self.connections 499 return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.resolved_name, True, c.get_transport_info()) for c in connections] 500
501 - def get_stats(self): # STATS
502 """Get the stats for this topic (API stub)""" 503 raise Exception("subclasses must override") 504 505 # Implementation note: Subscriber attaches to a _SubscriberImpl 506 # singleton for that topic. The underlying impl manages the 507 # connections for that publication and enables thread-safe access 508
509 -class Subscriber(Topic):
510 """ 511 Class for registering as a subscriber to a specified topic, where 512 the messages are of a given type. 513 """
514 - def __init__(self, name, data_class, callback=None, callback_args=None, 515 queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
516 """ 517 Constructor. 518 519 NOTE: for the queue_size and buff_size 520 parameters, rospy does not attempt to do intelligent merging 521 between multiple Subscriber instances for the same topic. As 522 they share the same underlying transport, multiple Subscribers 523 to the same topic can conflict with one another if they set 524 these parameters differently. 525 526 @param name: graph resource name of topic, e.g. 'laser'. 527 @type name: str 528 @param data_class: data type class to use for messages, 529 e.g. std_msgs.msg.String 530 @type data_class: L{Message} class 531 @param callback: function to call ( fn(data)) when data is 532 received. If callback_args is set, the function must accept 533 the callback_args as a second argument, i.e. fn(data, 534 callback_args). NOTE: Additional callbacks can be added using 535 add_callback(). 536 @type callback: fn(msg, cb_args) 537 @param callback_args: additional arguments to pass to the 538 callback. This is useful when you wish to reuse the same 539 callback for multiple subscriptions. 540 @type callback_args: any 541 @param queue_size: maximum number of messages to receive at 542 a time. This will generally be 1 or None (infinite, 543 default). buff_size should be increased if this parameter 544 is set as incoming data still needs to sit in the incoming 545 buffer before being discarded. Setting queue_size 546 buff_size to a non-default value affects all subscribers to 547 this topic in this process. 548 @type queue_size: int 549 @param buff_size: incoming message buffer size in bytes. If 550 queue_size is set, this should be set to a number greater 551 than the queue_size times the average message size. Setting 552 buff_size to a non-default value affects all subscribers to 553 this topic in this process. 554 @type buff_size: int 555 @param tcp_nodelay: if True, request TCP_NODELAY from 556 publisher. Use of this option is not generally recommended 557 in most cases as it is better to rely on timestamps in 558 message data. Setting tcp_nodelay to True enables TCP_NODELAY 559 for all subscribers in the same python process. 560 @type tcp_nodelay: bool 561 @raise ROSException: if parameters are invalid 562 """ 563 super(Subscriber, self).__init__(name, data_class, Registration.SUB) 564 #add in args that factory cannot pass in 565 566 # last person to set these to non-defaults wins, not much way 567 # around this 568 if queue_size is not None: 569 self.impl.set_queue_size(queue_size) 570 if buff_size != DEFAULT_BUFF_SIZE: 571 self.impl.set_buff_size(buff_size) 572 573 if callback is not None: 574 # #1852 575 # it's important that we call add_callback so that the 576 # callback can be invoked with any latched messages 577 self.impl.add_callback(callback, callback_args) 578 # save arguments for unregister 579 self.callback = callback 580 self.callback_args = callback_args 581 else: 582 # initialize fields 583 self.callback = self.callback_args = None 584 if tcp_nodelay: 585 self.impl.set_tcp_nodelay(tcp_nodelay)
586
587 - def unregister(self):
588 """ 589 unpublish/unsubscribe from topic. Topic instance is no longer 590 valid after this call. Additional calls to unregister() have no effect. 591 """ 592 if self.impl: 593 # It's possible to have a Subscriber instance with no 594 # associated callback 595 if self.callback is not None: 596 self.impl.remove_callback(self.callback, self.callback_args) 597 self.callback = self.callback_args = None 598 super(Subscriber, self).unregister()
599
600 -class _SubscriberImpl(_TopicImpl):
601 """ 602 Underyling L{_TopicImpl} implementation for subscriptions. 603 """
604 - def __init__(self, name, data_class):
605 """ 606 ctor. 607 @param name: graph resource name of topic, e.g. 'laser'. 608 @type name: str 609 @param data_class: Message data class 610 @type data_class: L{Message} class 611 """ 612 super(_SubscriberImpl, self).__init__(name, data_class) 613 # client-methods to invoke on new messages. should only modify 614 # under lock. This is a list of 2-tuples (fn, args), where 615 # args are additional arguments for the callback, or None 616 self.callbacks = [] 617 self.queue_size = None 618 self.buff_size = DEFAULT_BUFF_SIZE 619 self.tcp_nodelay = False 620 self.statistics_logger = SubscriberStatisticsLogger(self) \ 621 if SubscriberStatisticsLogger.is_enabled() \ 622 else None
623
624 - def close(self):
625 """close I/O and release resources""" 626 _TopicImpl.close(self) 627 if self.callbacks: 628 del self.callbacks[:] 629 self.callbacks = None 630 if self.statistics_logger: 631 self.statistics_logger.shutdown() 632 self.statistics_logger = None
633
634 - def set_tcp_nodelay(self, tcp_nodelay):
635 """ 636 Set the value of TCP_NODELAY, which causes the Nagle algorithm 637 to be disabled for future topic connections, if the publisher 638 supports it. 639 """ 640 self.tcp_nodelay = tcp_nodelay
641
642 - def set_queue_size(self, queue_size):
643 """ 644 Set the receive queue size. If more than queue_size messages 645 are waiting to be deserialized, they are discarded. 646 647 @param queue_size int: incoming queue size. Must be positive integer or None. 648 @type queue_size: int 649 """ 650 if queue_size == -1: 651 self.queue_size = None 652 elif queue_size == 0: 653 raise ROSException("queue size may not be set to zero") 654 elif queue_size is not None and type(queue_size) != int: 655 raise ROSException("queue size must be an integer") 656 else: 657 self.queue_size = queue_size
658
659 - def set_buff_size(self, buff_size):
660 """ 661 Set the receive buffer size. The exact meaning of this is 662 transport dependent. 663 @param buff_size: receive buffer size 664 @type buff_size: int 665 """ 666 if type(buff_size) != int: 667 raise ROSException("buffer size must be an integer") 668 elif buff_size <= 0: 669 raise ROSException("buffer size must be a positive integer") 670 self.buff_size = buff_size
671
672 - def get_stats(self): # STATS
673 """ 674 Get the stats for this topic subscriber 675 @return: stats for topic in getBusStats() publisher format:: 676 (topicName, connStats) 677 where connStats is:: 678 [connectionId, bytesReceived, numSent, dropEstimate, connected]* 679 @rtype: list 680 """ 681 # save reference to avoid locking 682 conn = self.connections 683 #for now drop estimate is -1 684 stats = (self.resolved_name, 685 [(c.id, c.stat_bytes, c.stat_num_msg, -1, not c.done) 686 for c in conn] ) 687 return stats
688
689 - def add_callback(self, cb, cb_args):
690 """ 691 Register a callback to be invoked whenever a new message is received 692 @param cb: callback function to invoke with message data 693 instance, i.e. fn(data). If callback args is set, they will 694 be passed in as the second argument. 695 @type cb: fn(msg, cb_args) 696 @param cb_cargs: additional arguments to pass to callback 697 @type cb_cargs: Any 698 """ 699 if self.closed: 700 raise ROSException("subscriber [%s] has been closed"%(self.resolved_name)) 701 with self.c_lock: 702 # we lock in order to serialize calls to add_callback, but 703 # we copy self.callbacks so we can it 704 new_callbacks = self.callbacks[:] 705 new_callbacks.append((cb, cb_args)) 706 self.callbacks = new_callbacks 707 708 # #1852: invoke callback with any latched messages 709 for c in self.connections: 710 if c.latch is not None: 711 self._invoke_callback(c.latch, cb, cb_args)
712
713 - def remove_callback(self, cb, cb_args):
714 """ 715 Unregister a message callback. 716 @param cb: callback function 717 @type cb: fn(msg, cb_args) 718 @param cb_cargs: additional arguments associated with callback 719 @type cb_cargs: Any 720 @raise KeyError: if no matching callback 721 """ 722 if self.closed: 723 return 724 with self.c_lock: 725 # we lock in order to serialize calls to add_callback, but 726 # we copy self.callbacks so we can it 727 matches = [x for x in self.callbacks if x[0] == cb and x[1] == cb_args] 728 if matches: 729 new_callbacks = self.callbacks[:] 730 # remove the first match 731 new_callbacks.remove(matches[0]) 732 self.callbacks = new_callbacks 733 if not matches: 734 raise KeyError("no matching cb")
735
736 - def _invoke_callback(self, msg, cb, cb_args):
737 """ 738 Invoke callback on msg. Traps and logs any exceptions raise by callback 739 @param msg: message data 740 @type msg: L{Message} 741 @param cb: callback 742 @type cb: fn(msg, cb_args) 743 @param cb_args: callback args or None 744 @type cb_args: Any 745 """ 746 try: 747 if cb_args is not None: 748 cb(msg, cb_args) 749 else: 750 cb(msg) 751 except Exception as e: 752 if not is_shutdown(): 753 logerr("bad callback: %s\n%s"%(cb, traceback.format_exc())) 754 else: 755 _logger.warn("during shutdown, bad callback: %s\n%s"%(cb, traceback.format_exc()))
756
757 - def receive_callback(self, msgs, connection):
758 """ 759 Called by underlying connection transport for each new message received 760 @param msgs: message data 761 @type msgs: [L{Message}] 762 """ 763 # save reference to avoid lock 764 callbacks = self.callbacks 765 for msg in msgs: 766 if self.statistics_logger: 767 self.statistics_logger.callback(msg, connection.callerid_pub, connection.stat_bytes) 768 for cb, cb_args in callbacks: 769 self._invoke_callback(msg, cb, cb_args)
770
771 -class SubscribeListener(object):
772 """ 773 Callback API to receive notifications when new subscribers 774 connect and disconnect. 775 """ 776
777 - def peer_subscribe(self, topic_name, topic_publish, peer_publish):
778 """ 779 callback when a peer has subscribed from a topic 780 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 781 @type topic_name: str 782 @param topic_publish: method to publish message data to all subscribers 783 @type topic_publish: fn(data) 784 @param peer_publish: method to publish message data to 785 new subscriber. NOTE: behavior for the latter is 786 transport-dependent as some transports may be broadcast only. 787 @type peer_publish: fn(data) 788 """ 789 pass
790
791 - def peer_unsubscribe(self, topic_name, num_peers):
792 """ 793 callback when a peer has unsubscribed from a topic 794 @param topic_name: topic name. NOTE: topic name will be resolved/remapped 795 @type topic_name: str 796 @param num_peers: number of remaining peers subscribed to topic 797 @type num_peers: int 798 """ 799 pass
800 801 802 # Implementation note: Publisher attaches to a 803 # _PublisherImpl singleton for that topic. The underlying impl 804 # manages the connections for that publication and enables 805 # thread-safe access 806
807 -class Publisher(Topic):
808 """ 809 Class for registering as a publisher of a ROS topic. 810 """ 811
812 - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
813 """ 814 Constructor 815 @param name: resource name of topic, e.g. 'laser'. 816 @type name: str 817 @param data_class: message class for serialization 818 @type data_class: L{Message} class 819 @param subscriber_listener: listener for 820 subscription events. May be None. 821 @type subscriber_listener: L{SubscribeListener} 822 @param tcp_nodelay: If True, sets TCP_NODELAY on 823 publisher's socket (disables Nagle algorithm). This results 824 in lower latency publishing at the cost of efficiency. 825 @type tcp_nodelay: bool 826 @param latch: If True, the last message published is 827 'latched', meaning that any future subscribers will be sent 828 that message immediately upon connection. 829 @type latch: bool 830 @param headers: If not None, a dictionary with additional header 831 key-values being used for future connections. 832 @type headers: dict 833 @param queue_size: The queue size used for asynchronously 834 publishing messages from different threads. A size of zero 835 means an infinite queue, which can be dangerous. When the 836 keyword is not being used or when None is passed all 837 publishing will happen synchronously and a warning message 838 will be printed. 839 @type queue_size: int 840 @raise ROSException: if parameters are invalid 841 """ 842 super(Publisher, self).__init__(name, data_class, Registration.PUB) 843 844 if subscriber_listener: 845 self.impl.add_subscriber_listener(subscriber_listener) 846 if tcp_nodelay: 847 get_tcpros_handler().set_tcp_nodelay(self.resolved_name, tcp_nodelay) 848 if latch: 849 self.impl.enable_latch() 850 if headers: 851 self.impl.add_headers(headers) 852 if queue_size is not None: 853 self.impl.set_queue_size(queue_size) 854 else: 855 import warnings 856 warnings.warn("The publisher should be created with an explicit keyword argument 'queue_size'. " 857 "Please see http://wiki.ros.org/rospy/Overview/Publishers%20and%20Subscribers for more information.", SyntaxWarning, stacklevel=2)
858
859 - def publish(self, *args, **kwds):
860 """ 861 Publish message data object to this topic. 862 Publish can either be called with the message instance to 863 publish or with the constructor args for a new Message 864 instance, i.e.:: 865 pub.publish(message_instance) 866 pub.publish(message_field_1, message_field_2...) 867 pub.publish(message_field_1='foo', message_field_2='bar') 868 869 @param args : L{Message} instance, message arguments, or no args if keyword arguments are used 870 @param kwds : Message keyword arguments. If kwds are used, args must be unset 871 @raise ROSException: If rospy node has not been initialized 872 @raise ROSSerializationException: If unable to serialize 873 message. This is usually a type error with one of the fields. 874 """ 875 if self.impl is None: 876 raise ROSException("publish() to an unregistered() handle") 877 if not is_initialized(): 878 raise ROSException("ROS node has not been initialized yet. Please call init_node() first") 879 data = args_kwds_to_message(self.data_class, args, kwds) 880 try: 881 self.impl.acquire() 882 self.impl.publish(data) 883 except genpy.SerializationError as e: 884 # can't go to rospy.logerr(), b/c this could potentially recurse 885 _logger.error(traceback.format_exc()) 886 raise ROSSerializationException(str(e)) 887 finally: 888 self.impl.release()
889
890 -class _PublisherImpl(_TopicImpl):
891 """ 892 Underyling L{_TopicImpl} implementation for publishers. 893 """ 894
895 - def __init__(self, name, data_class):
896 """ 897 @param name: name of topic, e.g. 'laser'. 898 @type name: str 899 @param data_class: Message data class 900 @type data_class: L{Message} class 901 """ 902 super(_PublisherImpl, self).__init__(name, data_class) 903 if python3 == 0: 904 self.buff = StringIO() 905 else: 906 self.buff = BytesIO() 907 self.publock = threading.RLock() #for acquire()/release 908 self.subscriber_listeners = [] 909 910 # additional client connection headers 911 self.headers = {} 912 913 # publish latch, starts disabled 914 self.is_latch = False 915 self.latch = None 916 917 # maximum queue size for publishing messages 918 self.queue_size = None 919 920 #STATS 921 self.message_data_sent = 0
922
923 - def close(self):
924 """close I/O and release resources""" 925 _TopicImpl.close(self) 926 # release resources 927 if self.subscriber_listeners: 928 del self.subscriber_listeners[:] 929 if self.headers: 930 self.headers.clear() 931 if self.buff is not None: 932 self.buff.close() 933 self.publock = self.headers = self.buff = self.subscriber_listeners = None
934
935 - def add_headers(self, headers):
936 """ 937 Add connection headers to this Topic for future connections. 938 @param headers: key/values will be added to current connection 939 header set, overriding any existing keys if they conflict. 940 @type headers: dict 941 """ 942 self.headers.update(headers)
943
944 - def enable_latch(self):
945 """ 946 Enable publish() latch. The latch contains the last published 947 message and is sent to any new subscribers. 948 """ 949 self.is_latch = True
950
951 - def set_queue_size(self, queue_size):
952 self.queue_size = queue_size
953
954 - def get_stats(self): # STATS
955 """ 956 Get the stats for this topic publisher 957 @return: stats for topic in getBusStats() publisher format:: 958 [topicName, messageDataBytes, connStats], 959 where connStats is:: 960 [id, bytes, numMessages, connected]* 961 @rtype: list 962 """ 963 # save reference to avoid lock 964 conn = self.connections 965 return (self.resolved_name, self.message_data_sent, 966 [(c.id, c.stat_bytes, c.stat_num_msg, not c.done) for c in conn] )
967
968 - def add_subscriber_listener(self, l):
969 """ 970 Add a L{SubscribeListener} for subscribe events. 971 @param l: listener instance 972 @type l: L{SubscribeListener} 973 """ 974 self.subscriber_listeners.append(l)
975
976 - def acquire(self):
977 """lock for thread-safe publishing to this transport""" 978 if self.publock is not None: 979 self.publock.acquire()
980
981 - def release(self):
982 """lock for thread-safe publishing to this transport""" 983 if self.publock is not None: 984 self.publock.release()
985
986 - def add_connection(self, c):
987 """ 988 Add a connection to this topic. This must be a PubTransport. If 989 the latch is enabled, c will be sent a the value of the 990 latch. 991 @param c: connection instance 992 @type c: L{Transport} 993 @return: True if connection was added 994 @rtype: bool 995 """ 996 if self.queue_size is not None: 997 c = QueuedConnection(c, self.queue_size) 998 super(_PublisherImpl, self).add_connection(c) 999 def publish_single(data): 1000 self.publish(data, connection_override=c)
1001 for l in self.subscriber_listeners: 1002 l.peer_subscribe(self.resolved_name, self.publish, publish_single) 1003 if self.is_latch and self.latch is not None: 1004 with self.publock: 1005 self.publish(self.latch, connection_override=c) 1006 return True 1007
1008 - def remove_connection(self, c):
1009 """ 1010 Remove existing connection from this topic. 1011 @param c: connection instance to remove 1012 @type c: L{Transport} 1013 """ 1014 super(_PublisherImpl, self).remove_connection(c) 1015 num = len(self.connections) 1016 for l in self.subscriber_listeners: 1017 l.peer_unsubscribe(self.resolved_name, num)
1018
1019 - def publish(self, message, connection_override=None):
1020 """ 1021 Publish the data to the topic. If the topic has no subscribers, 1022 the method will return without any affect. Access to publish() 1023 should be locked using acquire() and release() in order to 1024 ensure proper message publish ordering. 1025 1026 @param message: message data instance to publish 1027 @type message: L{Message} 1028 @param connection_override: publish to this connection instead of all 1029 @type connection_override: L{Transport} 1030 @return: True if the data was published, False otherwise. 1031 @rtype: bool 1032 @raise genpy.SerializationError: if L{Message} instance is unable to serialize itself 1033 @raise rospy.ROSException: if topic has been closed or was closed during publish() 1034 """ 1035 #TODO: should really just use IOError instead of rospy.ROSException 1036 1037 if self.closed: 1038 # during shutdown, the topic can get closed, which creates 1039 # a race condition with user code testing is_shutdown 1040 if not is_shutdown(): 1041 raise ROSException("publish() to a closed topic") 1042 else: 1043 return 1044 1045 if self.is_latch: 1046 self.latch = message 1047 1048 if not self.has_connections(): 1049 #publish() falls through 1050 return False 1051 1052 if connection_override is None: 1053 #copy connections so we can iterate safely 1054 conns = self.connections 1055 else: 1056 conns = [connection_override] 1057 1058 # #2128 test our buffer. I don't now how this got closed in 1059 # that case, but we can at least diagnose the problem. 1060 b = self.buff 1061 try: 1062 b.tell() 1063 1064 # serialize the message 1065 self.seq += 1 #count messages published to the topic 1066 serialize_message(b, self.seq, message) 1067 1068 # send the buffer to all connections 1069 err_con = [] 1070 data = b.getvalue() 1071 1072 for c in conns: 1073 try: 1074 if not is_shutdown(): 1075 c.write_data(data) 1076 except TransportTerminated as e: 1077 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 1078 err_con.append(c) 1079 except Exception as e: 1080 # greater severity level 1081 logdebug("publisher connection to [%s] terminated, see errorlog for details:\n%s"%(c.endpoint_id, traceback.format_exc())) 1082 err_con.append(c) 1083 1084 # reset the buffer and update stats 1085 self.message_data_sent += b.tell() #STATS 1086 b.seek(0) 1087 b.truncate(0) 1088 1089 except ValueError: 1090 # operations on self.buff can fail if topic is closed 1091 # during publish, which often happens during Ctrl-C. 1092 # diagnose the error and report accordingly. 1093 if self.closed: 1094 if is_shutdown(): 1095 # we offer no guarantees on publishes that occur 1096 # during shutdown, so this is not exceptional. 1097 return 1098 else: 1099 # this indicates that user-level code most likely 1100 # closed the topic, which is exceptional. 1101 raise ROSException("topic was closed during publish()") 1102 else: 1103 # unexpected, so re-raise original error 1104 raise 1105 1106 # remove any bad connections 1107 for c in err_con: 1108 try: 1109 # connection will callback into remove_connection when 1110 # we close it 1111 c.close() 1112 except: 1113 pass
1114 1115 ################################################################################# 1116 # TOPIC MANAGER/LISTENER 1117
1118 -class _TopicManager(object):
1119 """ 1120 Tracks Topic objects 1121 See L{get_topic_manager()} for singleton access 1122 """ 1123
1124 - def __init__(self):
1125 """ctor.""" 1126 super(_TopicManager, self).__init__() 1127 self.pubs = {} #: { topic: _PublisherImpl } 1128 self.subs = {} #: { topic: _SubscriberImpl } 1129 self.topics = set() # [str] list of topic names 1130 self.lock = threading.Condition() 1131 self.closed = False 1132 _logger.info("topicmanager initialized")
1133
1134 - def get_pub_sub_info(self):
1135 """ 1136 get topic publisher and subscriber connection info for getBusInfo() api 1137 @return: [bus info stats] 1138 See getBusInfo() API for more data structure details. 1139 @rtype: list 1140 """ 1141 with self.lock: 1142 info = [] 1143 for s in chain(iter(self.pubs.values()), iter(self.subs.values())): 1144 info.extend(s.get_stats_info()) 1145 return info
1146
1147 - def get_pub_sub_stats(self):
1148 """ 1149 get topic publisher and subscriber stats for getBusStats() api 1150 @return: [publisherStats, subscriberStats]. 1151 See getBusStats() API for more data structure details. 1152 @rtype: list 1153 """ 1154 with self.lock: 1155 return [s.get_stats() for s in self.pubs.values()],\ 1156 [s.get_stats() for s in self.subs.values()]
1157
1158 - def close_all(self):
1159 """ 1160 Close all registered publication and subscriptions. Manager is 1161 no longer usable after close. 1162 """ 1163 self.closed = True 1164 with self.lock: 1165 for t in chain(iter(self.pubs.values()), iter(self.subs.values())): 1166 t.close() 1167 self.pubs.clear() 1168 self.subs.clear()
1169 1170
1171 - def check_all(self):
1172 """ 1173 Check all registered publication and subscriptions. 1174 """ 1175 with self.lock: 1176 for t in chain(iter(self.pubs.values()), iter(self.subs.values())): 1177 t.check()
1178
1179 - def _add(self, ps, rmap, reg_type):
1180 """ 1181 Add L{_TopicImpl} instance to rmap 1182 @param ps: a pub/sub impl instance 1183 @type ps: L{_TopicImpl} 1184 @param rmap: { topic: _TopicImpl} rmap to record instance in 1185 @type rmap: dict 1186 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1187 @type reg_type: str 1188 """ 1189 resolved_name = ps.resolved_name 1190 _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type) 1191 with self.lock: 1192 rmap[resolved_name] = ps 1193 self.topics.add(resolved_name) 1194 1195 # NOTE: this call can take a lengthy amount of time (at 1196 # least until its reimplemented to use queues) 1197 get_registration_listeners().notify_added(resolved_name, ps.type, reg_type)
1198
1199 - def _recalculate_topics(self):
1200 """recalculate self.topics. expensive""" 1201 self.topics = set([x.resolved_name for x in self.pubs.values()] + 1202 [x.resolved_name for x in self.subs.values()])
1203
1204 - def _remove(self, ps, rmap, reg_type):
1205 """ 1206 Remove L{_TopicImpl} instance from rmap 1207 @param ps: a pub/sub impl instance 1208 @type ps: L{_TopicImpl} 1209 @param rmap: topic->_TopicImpl rmap to remove instance in 1210 @type rmap: dict 1211 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1212 @type reg_type: str 1213 """ 1214 resolved_name = ps.resolved_name 1215 _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type) 1216 with self.lock: 1217 del rmap[resolved_name] 1218 self. _recalculate_topics() 1219 1220 # NOTE: this call can take a lengthy amount of time (at 1221 # least until its reimplemented to use queues) 1222 get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
1223
1224 - def get_impl(self, reg_type, resolved_name):
1225 """ 1226 Get the L{_TopicImpl} for the specified topic. This is mainly for 1227 testing purposes. Unlike acquire_impl, it does not alter the 1228 ref count. 1229 @param resolved_name: resolved topic name 1230 @type resolved_name: str 1231 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1232 @type reg_type: str 1233 """ 1234 if reg_type == Registration.PUB: 1235 rmap = self.pubs 1236 elif reg_type == Registration.SUB: 1237 rmap = self.subs 1238 else: 1239 raise TypeError("invalid reg_type: %s"%s) 1240 return rmap.get(resolved_name, None)
1241
1242 - def acquire_impl(self, reg_type, resolved_name, data_class):
1243 """ 1244 Acquire a L{_TopicImpl} for the specified topic (create one if it 1245 doesn't exist). Every L{Topic} instance has a _TopicImpl that 1246 actually controls the topic resources so that multiple Topic 1247 instances use the same underlying connections. 'Acquiring' a 1248 topic implementation marks that another Topic instance is 1249 using the TopicImpl. 1250 1251 @param resolved_name: resolved topic name 1252 @type resolved_name: str 1253 1254 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1255 @type reg_type: str 1256 1257 @param data_class: message class for topic 1258 @type data_class: L{Message} class 1259 """ 1260 if reg_type == Registration.PUB: 1261 rmap = self.pubs 1262 impl_class = _PublisherImpl 1263 elif reg_type == Registration.SUB: 1264 rmap = self.subs 1265 impl_class = _SubscriberImpl 1266 else: 1267 raise TypeError("invalid reg_type: %s"%s) 1268 with self.lock: 1269 impl = rmap.get(resolved_name, None) 1270 if not impl: 1271 impl = impl_class(resolved_name, data_class) 1272 self._add(impl, rmap, reg_type) 1273 impl.ref_count += 1 1274 return impl
1275
1276 - def release_impl(self, reg_type, resolved_name):
1277 """ 1278 Release a L_{TopicImpl} for the specified topic. 1279 1280 Every L{Topic} instance has a _TopicImpl that actually 1281 controls the topic resources so that multiple Topic instances 1282 use the same underlying connections. 'Acquiring' a topic 1283 implementation marks that another Topic instance is using the 1284 TopicImpl. 1285 1286 @param resolved_name: resolved topic name 1287 @type resolved_name: str 1288 @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} 1289 @type reg_type: str 1290 """ 1291 if reg_type == Registration.PUB: 1292 rmap = self.pubs 1293 else: 1294 rmap = self.subs 1295 with self.lock: 1296 # check for race condition where multiple things are cleaning up at once 1297 if self.closed: 1298 return 1299 impl = rmap.get(resolved_name, None) 1300 assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name 1301 impl.ref_count -= 1 1302 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero" 1303 if impl.ref_count == 0: 1304 rospyinfo("topic impl's ref count is zero, deleting topic %s...", resolved_name) 1305 impl.close() 1306 self._remove(impl, rmap, reg_type) 1307 del impl 1308 _logger.debug("... done deleting topic %s", resolved_name)
1309
1310 - def get_publisher_impl(self, resolved_name):
1311 """ 1312 @param resolved_name: resolved topic name 1313 @type resolved_name: str 1314 @return: list of L{_PublisherImpl}s 1315 @rtype: [L{_PublisherImpl}] 1316 """ 1317 return self.pubs.get(resolved_name, None)
1318
1319 - def get_subscriber_impl(self, resolved_name):
1320 """ 1321 @param resolved_name: topic name 1322 @type resolved_name: str 1323 @return: subscriber for the specified topic. 1324 @rtype: L{_SubscriberImpl} 1325 """ 1326 return self.subs.get(resolved_name, None)
1327
1328 - def has_subscription(self, resolved_name):
1329 """ 1330 @param resolved_name: resolved topic name 1331 @type resolved_name: str 1332 @return: True if manager has subscription for specified topic 1333 @rtype: bool 1334 """ 1335 return resolved_name in self.subs
1336
1337 - def has_publication(self, resolved_name):
1338 """ 1339 @param resolved_name: resolved topic name 1340 @type resolved_name: str 1341 @return: True if manager has publication for specified topic 1342 @rtype: bool 1343 """ 1344 return resolved_name in self.pubs
1345
1346 - def get_topics(self):
1347 """ 1348 @return: list of topic names this node subscribes to/publishes 1349 @rtype: [str] 1350 """ 1351 return self.topics.copy()
1352
1353 - def _get_list(self, rmap):
1354 return [[k, v.type] for k, v in rmap.items()]
1355 1356 ## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1357 - def get_subscriptions(self):
1358 return self._get_list(self.subs)
1359 1360 ## @return [[str,str],]: list of topics published by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
1361 - def get_publications(self):
1362 return self._get_list(self.pubs)
1363 1364 set_topic_manager(_TopicManager()) 1365