Source code for rocon_python_comms.subscriber_proxy

#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: subscriber_proxy
   :platform: Unix
   :synopsis: Request-response style communication with a latched publisher.


This module provides a means of interacting with a ros latched publisher
in the same style as you would a ros service (request-response).

----

"""
##############################################################################
# Imports
##############################################################################

import time
import rospy
import threading

##############################################################################
# Subscriber Proxy
##############################################################################


[docs]class SubscriberProxy(): ''' Works like a service proxy, but using a latched subscriber instead (regular subscribers will also work, but this is especially useful for latched subscribers since they typically always provide data). If no timeout is specified when calling, it blocks indefinitely on a 100ms loop until a message arrives. Alternatively it will return with None if a specified timeout is reached. **Usage:** .. code-block:: python from rocon_python_comms import SubscriberProxy try: gateway_info = SubscriberProxy('gateway_info', gateway_msgs.GatewayInfo)(rospy.Duration(0.5)) if gateway_info is not None: # do something except rospy.exceptions.ROSInterruptException: # make sure to handle a Ros shutdown # react something :todo: upgrade to make use of python events instead of manual loops '''
[docs] def __init__(self, topic, msg_type): ''' :param str topic: the topic name to subscriber to :param str msg_type: any ros message type (e.g. std_msgs/String) ''' self._data = None self._lock = threading.Lock() self._subscriber = rospy.Subscriber(topic, msg_type, self._callback)
[docs] def __call__(self, timeout=None): ''' Returns immediately with the latest data or blocks to a timeout/indefinitely until the next data arrives. :param rospy.Duration timeout: time to wait for data, polling at 10Hz (None = /infty) :returns: msg type data or None :rtype: same as the msg type specified in the arg or None :returns: latest data or None ''' if timeout is not None: # everything in floating point calculations timeout_time = time.time() + timeout.to_sec() with self._lock: data = self._data while not rospy.is_shutdown() and data is None: rospy.rostime.wallsleep(0.1) if timeout is not None: if time.time() > timeout_time: return None # check to see if there is new data with self._lock: data = self._data return data
[docs] def wait_for_next(self, timeout=None): ''' Makes sure any current data is cleared and waits for new data. :param rospy.Duration timeout: time to wait for data, polling at 10Hz. :returns: latest data or None ''' self._data = None return self.__call__(timeout)
[docs] def wait_for_publishers(self): ''' Blocks until publishers are seen. :raises: rospy.ROSInterruptException if we are in shutdown. ''' r = rospy.Rate(10) while not rospy.is_shutdown(): if self._subscriber.get_num_connections() != 0: return else: r.sleep() # we are shutting down raise rospy.exceptions.ROSInterruptException
def _callback(self, data): with self._lock: self._data = data
[docs] def unregister(self): ''' Unregister the subscriber so future instantiations of this class can pull a fresh subscriber (important if the data is latched). ''' self._subscriber.unregister()