Source code for rocon_interactions.remocon_monitor

#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_concert/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: remocon_monitor
   :platform: Unix
   :synopsis: Remocon monitoring tools


This module defines a class used to monitor the status of connected remocons and
trigger a callback whenever a status update arrives.
----

"""
##############################################################################
# Imports
##############################################################################

import rospy
import rocon_interaction_msgs.msg as interaction_msgs

##############################################################################
# Remocon Monitor
##############################################################################


class RemoconMonitor(object):
    '''
      Attaches a subscriber to a remocon publisher and monitors the
      status of the remocon.

      Every incoming status message is compared with the previously stored
      one, and the registered callback is invoked with the interactions that
      appeared and the interactions that disappeared.

      .. include:: weblinks.rst
    '''
    __slots__ = [
        'name',
        'unique_name',
        'status',  # concert_msgs.RemoconStatus
        '_subscriber',
        '_status_callback',  # triggers state updates on the manager and publishes the list of interactive clients
    ]

    ##########################################################################
    # Initialisation
    ##########################################################################

    def __init__(self, topic_name, remocon_status_update_callback):
        """
        :param str topic_name: status topic of the remocon; it is only
            monitored if it starts with
            ``interaction_msgs.Strings.REMOCONS_NAMESPACE + '/'``
        :param remocon_status_update_callback: callable invoked as
            ``callback(unique_name, new_interactions, finished_interactions)``
            on every status update

        If the topic name lies outside the remocons namespace, no subscriber
        is created and the names stay at ``'unknown'``; no exception is raised.
        """
        # Every slot is assigned before anything else happens, so that an
        # instance built from an invalid topic name is still safe to use
        # (in particular, unregister() must not raise AttributeError).
        #: Name of the connected remocon (unique name minus the trailing '_<suffix>').
        self.name = 'unknown'
        #: Topic name with the remocons namespace prefix stripped.
        self.unique_name = 'unknown'
        #: Holds the latest status (rocon_interaction_msgs.RemoconStatus_) update from the remocon.
        self.status = None
        #: Subscriber connected to a remocon's status topic (None when not subscribed).
        self._subscriber = None
        self._status_callback = remocon_status_update_callback

        namespace_prefix = interaction_msgs.Strings.REMOCONS_NAMESPACE + '/'
        if not topic_name.startswith(namespace_prefix):
            # Historically this was a silent no-op; keep it non-fatal for
            # existing callers, but leave a trace in the log.
            rospy.logwarn("Interactions : ignoring remocon status topic outside the remocons namespace [%s]" % topic_name)
            return

        self.unique_name = topic_name[len(namespace_prefix):]
        # everything before the last underscore is the human readable name
        self.name = self.unique_name.rpartition('_')[0]

        # Subscribe last: the callback may fire from another thread as soon as
        # the subscriber exists, and it reads status and _status_callback.
        self._subscriber = rospy.Subscriber(topic_name, interaction_msgs.RemoconStatus, self._callback)

    def _callback(self, msg):
        """
        Store the new status and report which interactions started or finished.

        :param msg interaction_msgs.RemoconStatus: incoming status update for the remocon
        """
        previous = self.status.running_interactions if self.status is not None else []
        current = msg.running_interactions
        new_interactions = [interaction for interaction in current if interaction not in previous]
        finished_interactions = [interaction for interaction in previous if interaction not in current]
        self.status = msg
        # make sure we publish whenever there is a state change (as assumed when we get a status update)
        self._status_callback(self.unique_name, new_interactions, finished_interactions)

    def unregister(self):
        """
        Unregister the subscriber attached to this remocon's status topic.

        Safe to call when no subscriber was ever created (invalid topic name)
        and safe to call more than once.
        """
        if self._subscriber is not None:
            self._subscriber.unregister()
            self._subscriber = None