# Source code for rocon_python_comms.topics
#
# License: BSD
# https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################
"""
.. module:: topics
   :platform: Unix
   :synopsis: Useful methods relating to ros topics.

This module contains anything relating to introspection or manipulation
of ros topics.

----
"""
##############################################################################
# Imports
##############################################################################
import rospy
import time
import rostopic
from .exceptions import NotFoundException
##############################################################################
# Find topics/services
##############################################################################
def find_topic(topic_type, timeout=rospy.rostime.Duration(5.0), unique=False):
    '''
    Do a lookup to find topics of the type specified. It can apply the
    additional logic of whether this should return a single unique result,
    or a list. Under the hood this calls out to the ros master for a list
    of registered topics and it parses that to determine the result. If nothing
    is found, it loops around internally on a 100ms loop until the result is
    found or the specified timeout is reached.

    At least one lookup is always attempted (unless ros is already shut
    down), so a zero timeout performs a single check without sleeping. The
    final sleep is shortened so that the call does not wait beyond the
    requested timeout.

    This will raise exceptions if it times out or returns multiple values.

    Usage:

    .. code-block:: python

        import rocon_python_comms
        from rocon_python_comms import find_topic

        try:
            pairing_topic_name = find_topic('rocon_interaction_msgs/Pair', timeout=rospy.rostime.Duration(0.5), unique=True)
        except rocon_python_comms.NotFoundException as e:
            rospy.logwarn("support for interactions disabled")

    :param str topic_type: topic type specification, e.g. rocon_std_msgs/MasterInfo
    :param rospy.rostime.Duration timeout: raise an exception if nothing is found before this timeout occurs.
    :param bool unique: flag to select the lookup behaviour (single/multiple results)

    :returns: the fully resolved name of the topic (unique) or list of names (non-unique)
    :rtype: str (unique) or list of str (non-unique)

    :raises: :exc:`.NotFoundException` if no topic is found within the timeout,
             if ros shuts down (or the lookup itself fails) while waiting, or
             if ``unique`` is set and more than one matching topic is found
    '''
    poll_period = 0.1  # seconds to wait between lookups
    topic_names = []
    deadline = time.time() + timeout.to_sec()
    while not rospy.is_shutdown():
        try:
            topic_names = rostopic.find_by_type(topic_type)
        except rostopic.ROSTopicException:
            raise NotFoundException("ros shutdown")
        if topic_names:
            break
        # The deadline is only checked after a lookup so that a zero (or
        # already elapsed) timeout still performs one attempt.
        remaining = deadline - time.time()
        if remaining <= 0.0:
            break
        # Never sleep past the deadline.
        rospy.rostime.wallsleep(min(poll_period, remaining))
    if not topic_names:
        # Distinguish a shutdown from a genuine timeout in the error message.
        if rospy.is_shutdown():
            raise NotFoundException("ros shutdown")
        raise NotFoundException("timed out")
    if unique:
        if len(topic_names) > 1:
            raise NotFoundException("multiple topics found %s." % topic_names)
        return topic_names[0]
    return topic_names