Source code for rocon_interactions.rapp_handler

#!/usr/bin/env python
#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_concert/license/LICENSE
#
##############################################################################
# Module
##############################################################################

"""
.. module:: rapp_handler
   :platform: Unix
   :synopsis: Works with the rocon app manager to support pairing interactions.

This module provides a class that can be used by other nodes to introspect
and start/stop rapps on a rapp manager running on the same ros master.

"""

##############################################################################
# Imports
##############################################################################

import rospy
import rocon_app_manager_msgs.msg as rocon_app_manager_msgs
import rocon_app_manager_msgs.srv as rocon_app_manager_srvs
import rocon_python_comms
from .exceptions import(FailedToStartRappError,
                        FailedToStopRappError,
                        RappNotRunningError
                        )

##############################################################################
# Utilities
##############################################################################


[docs]def rapp_msg_to_dict(rapp): """ Convert rapp message to a dictionary. This is not really necessary and we'd be better off flinging around the actual msg or a wrapper around the msg. """ dict_rapp = {} dict_rapp["status"] = rapp.status dict_rapp["name"] = rapp.name dict_rapp["display_name"] = rapp.display_name dict_rapp["description"] = rapp.description dict_rapp["compatibility"] = rapp.compatibility dict_rapp["preferred"] = rapp.preferred dict_rapp["icon"] = rapp.icon dict_rapp["implementations"] = rapp.implementations dict_rapp["public_interface"] = rapp.public_interface dict_rapp["public_parameters"] = rapp.public_parameters return dict_rapp
def rapp_list_msg_to_dict(list_rapp): dict_rapp = {} for rapp in list_rapp: name = rapp.name dict_rapp[name] = rapp_msg_to_dict(rapp) return dict_rapp ############################################################################## # Classes ##############################################################################
[docs]class RappHandler(object): """ Initialises from a conductor message detailing information about a concert client. Once established, this instance can be used as a convenience to start and stop rapps on the concert client. :ivar is_running: flag indicating if there is a monitored rapp running on the rapp manager. :vartype is_running: bool :ivar _running_rapp: None, or dict representation of the running rapp (result of :py:class:`rapp_msg_to_dict()<rocon_interactions.rapp_handler.rapp_msg_to_dict>`) :vartype _running_rapp: dict """
[docs] def __init__(self, rapp_running_state_changed_callback): """ Initialise the class with the relevant data required to start and stop rapps on this concert client. """ self._running_rapp = None self._available_rapps = {} self.rapp_running_state_changed_callback = rapp_running_state_changed_callback self.subscribers = rocon_python_comms.utils.Subscribers( [ ('~status', rocon_app_manager_msgs.Status, self._status_subscriber_callback), ] ) self.service_proxies = rocon_python_comms.utils.ServiceProxies( [ ('~start_rapp', rocon_app_manager_srvs.StartRapp), ('~stop_rapp', rocon_app_manager_srvs.StopRapp), ] ) # self._initialising_thread = threading.Thread(target=self.initialise) # self._initialising_thread.start() # this is blocking until it gets data self.initialise()
@property def available_rapps(self): return self._available_rapps @property def initialised(self): return len(self._available_rapps) != 0 @property def is_running(self): return self._running_rapp is not None @property def running_rapp(self): return self._running_rapp
[docs] def matches_running_rapp(self, rapp_name, remappings, parameters): ''' Compare the running rapp with an interaction's specification for a paired rapp. .. todo:: currently it checks against the rapp name only, expand this to consider remappings and parameters :param str rapp_name: ros package resource name for this interaction's rapp rapp :param rocon_std_msgs/Remapping[] remappings: rapp remapping rules for this interaction :param rocon_std_msgs/KeyValue[] parameters: rapp parameter rules for this interaction :returns: whether the incoming interaction specification matches the running rapp (used for filtering) :rtype; bool :raises: :exc:`rocon_interactions.exceptions.RappNotRunningError` if the rapp wasn't running ''' try: if self._running_rapp['name'] == rapp_name: return True except TypeError: # NoneType raise RappNotRunningError return False
def is_available_rapp(self, rapp_name): return True if rapp_name in self._available_rapps.keys() else False def get_rapp(self, rapp_name): try: return self._available_rapps[rapp_name] except KeyError: return None
[docs] def start(self, rapp, remappings, parameters=[]): """ Start the rapp with the specified remappings. :param str rapp: ros package resource name of the rapp to start (e.g. rocon_apps/teleop) :param remappings: remappings to apply to the rapp when starting. :type remappings: [rocon_std_msgs.Remapping] :param parameters: paramters to apply to the rapp when starting. :type remappings: [rocon_std_msgs.KeyValue] .. include:: weblinks.rst :raises: :exc:`.FailedToStartRappError` """ if not self.initialised: raise FailedToStartRappError("rapp manager's location unknown") try: unused_response = self.service_proxies.start_rapp(rocon_app_manager_srvs.StartRappRequest(name=rapp, remappings=remappings, parameters=parameters)) # todo check this response and process it except (rospy.service.ServiceException, rospy.exceptions.ROSInterruptException) as e: # Service not found or ros is shutting down raise FailedToStartRappError("%s" % str(e))
[docs] def stop(self): """ Stop a rapp on this concert client (if one should be running). This doesn't need a rapp specification since only one rapp can ever be running - it will just stop the currently running rapp. :raises: :exc:`.FailedToStopRappError` """ if not self.initialised: raise FailedToStopRappError("rapp manager's location not known") try: unused_response = self.service_proxies.stop_rapp(rocon_app_manager_srvs.StopRappRequest()) except (rospy.service.ServiceException, rospy.exceptions.ROSInterruptException) as e: # Service not found or ros is shutting down raise FailedToStopRappError("%s" % str(e))
[docs] def initialise(self): """ Loops around (indefinitely) until it makes a connection with the rapp manager and retrieves the rapp list. """ # get the rapp list - just loop around until catch it once - it is not dynamically changing rospy.loginfo("Interactions : calling the rapp manager to get the rapp list.") get_rapp_list = rocon_python_comms.SubscriberProxy('~rapp_list', rocon_app_manager_msgs.RappList) while not rospy.is_shutdown(): # msg is rocon_app_manager_msgs/RappList # this returns almost immediately with None if rospy gets shutdown, so a long duration is ok msg = get_rapp_list(rospy.Duration(30.0)) if msg is None: rospy.logwarn("Interactions : unable to connect with the rapp manager : {0} not found, will keep trying.".format(rospy.resolve_name('~rapp_list'))) else: self._available_rapps = rapp_list_msg_to_dict(msg.available_rapps) rospy.loginfo("Interactions : discovered rapp support for pairing modes:") for rapp in msg.available_rapps: rospy.loginfo("Interactions : '%s'" % rapp.name) break get_rapp_list.unregister()
def _status_subscriber_callback(self, msg): """ Update the current status of the rapp manager @param rocon_app_manager_msgs/Status msg: detailed report of the rapp manager's current state """ state_changed = False stopped = False if msg.rapp_status == rocon_app_manager_msgs.Status.RAPP_RUNNING: # FIXME : This should probably be done internally in the app_manager # => A we only need the published interface from a running app, without caring about the original specification # => Is this statement always true ? running_rapp = rapp_msg_to_dict(msg.rapp) if self._running_rapp is None or self._running_rapp['name'] != running_rapp['name']: state_changed = True # for pubif_idx, pubif in enumerate(running_rapp['public_interface']): # newvals = ast.literal_eval(pubif.value) # for msgif in msg.published_interfaces: # if ( # (pubif.key == 'subscribers' and msgif.interface.connection_type == 'subscriber') or # (pubif.key == 'publishers' and msgif.interface.connection_type == 'publisher') # ): # # rospy.loginfo('newvals %r', newvals) # for newval_idx, newval in enumerate(newvals): # # rospy.loginfo('newvals[%r] %r', newval_idx, newval) # if newval['name'] == msgif.interface.name and newval['type'] == msgif.interface.data_type: # # Careful we re changing the list in place here # newvals[newval_idx]['name'] = msgif.name # # rospy.loginfo('newvals[%r] -> %r', newval_idx, newval) # # Careful we re changing the list in place here # elif msgif.interface.connection_type not in rocon_python_comms.connections.connection_types: # rospy.logerr('Interactions : unsupported connection type : %r', msgif.interface.connection_type) # # Careful we re changing the list in place here # running_rapp['public_interface'][pubif_idx].value = str(newvals) # TODO : same for published parameters ? self._running_rapp = running_rapp # rospy.loginfo('Interactions : new public if : %r', self._running_rapp['public_interface']) elif msg.rapp_status == rocon_app_manager_msgs.Status.RAPP_STOPPED: was_running = self._running_rapp is not None self._running_rapp = None if was_running: state_changed = True stopped = True # signal the higher level disable pairing mode via this if state_changed: self.rapp_running_state_changed_callback(self._running_rapp, stopped)