Source code for rocon_interactions.interactions_table

#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: interactions_table
   :platform: Unix
   :synopsis: A database of interactions.


This module provides a class that acts as a database (dictionary style) of
some set of interactions.

----

"""
##############################################################################
# Imports
##############################################################################

import re
import rocon_console.console as console
import rocon_uri
import rospy

from . import interactions
from . import utils
from .exceptions import InvalidInteraction, MalformedInteractionsYaml

##############################################################################
# Classes
##############################################################################


[docs]class InteractionsTable(object): ''' The runtime populated interactions table along with methods to manipulate it. .. include:: weblinks.rst :ivar interactions: list of objects that form the elements of the table :vartype interactions: rocon_interactions.interactions.Interaction[] :ivar filter_pairing_interactions: flag for indicating whether pairing interactions should be filtered when loading. :vartype filter_pairing_interactions: bool '''
[docs] def __init__(self, filter_pairing_interactions=False ): """ Constructs an empty interactions table. :param bool filter_pairing_interactions: do not load any paired interactions """ self.interactions = [] self.filter_pairing_interactions = filter_pairing_interactions
[docs] def groups(self): ''' List all groups for the currently stored interactions. :returns: a list of all groups :rtype: str[] ''' # uniquify the list return sorted(list(set([i.group for i in self.interactions])))
def __len__(self): return len(self.interactions)
[docs] def __str__(self): """ Convenient string representation of the table. """ s = '' group_view = self.generate_group_view() for group, interactions in group_view.iteritems(): s += console.bold + "Interactions - " + group + console.reset + '\n' for interaction in interactions: s += "\n".join(" " + i for i in str(interaction).splitlines()) + '\n' return s
[docs] def sorted(self): """ Return the interactions list sorted by name. """ return sorted(self.interactions, key=lambda interaction: interaction.name)
[docs] def generate_group_view(self): ''' Creates a temporary copy of the interactions and sorts them into a dictionary view classified by group. :returns: A group based view of the interactions :rtype: dict { group(str) : :class:`.interactions.Interaction`[] } ''' # there's got to be a faster way of doing this. interactions = list(self.interactions) group_view = {} for interaction in interactions: if interaction.group not in group_view.keys(): group_view[interaction.group] = [] group_view[interaction.group].append(interaction) return group_view
[docs] def filter(self, groups=None, compatibility_uri='rocon:/'): ''' Filter the interactions in the table according to group and/or compatibility uri. :param groups: a list of groups to filter against, use all groups if None :type groups: str [] :param str compatibility_uri: compatibility rocon_uri_, eliminates interactions that don't match this uri. :returns interactions: subset of all interactions that survived the filter :rtype: :class:`.Interaction` [] :raises: rocon_uri.RoconURIValueError if provided compatibility_uri is invalid. ''' if groups: # works for classifying non-empty list vs either of None or empty list group_filtered_interactions = [i for i in self.interactions if i.group in groups] else: group_filtered_interactions = list(self.interactions) filtered_interactions = [i for i in group_filtered_interactions if rocon_uri.is_compatible(i.compatibility, compatibility_uri)] return filtered_interactions
[docs] def load(self, msgs): ''' Load some interactions into the table. This involves some initialisation and validation steps. :param msgs: a list of interaction specifications to populate the table with. :type msgs: rocon_interaction_msgs.Interaction_ [] :returns: list of all additions and any that were flagged as invalid :rtype: (:class:`.Interaction` [], rocon_interaction_msgs.Interaction_ []) : (new, invalid) ''' msgs = self._bind_dynamic_symbols(msgs) new = [] invalid = [] for msg in msgs: try: interaction = interactions.Interaction(msg) self.interactions.append(interaction) self.interactions = list(set(self.interactions)) # uniquify the list, just in case new.append(interaction) except InvalidInteraction: invalid.append(msg) return new, invalid
[docs] def unload(self, msgs): ''' Removed the specified interactions interactions table. This list is typically the same list as the user might initially send - no hashes yet generated. :param msgs: a list of interactions :type msgs: rocon_interaction_msgs.Interaction_ [] :returns: a list of removed interactions :rtype: rocon_interaction_msgs.Interaction_ [] ''' removed = [] for msg in msgs: msg_hash = utils.generate_hash(msg.name, msg.group, msg.namespace) found = self.find(msg_hash) if found is not None: removed.append(msg) self.interactions.remove(found) return removed
[docs] def find(self, interaction_hash): ''' Find the specified interaction. :param str interaction_hash: in crc32 format :returns: interaction if found, None otherwise. :rtype: :class:`.Interaction` ''' interaction = next((interaction for interaction in self.interactions if interaction.hash == interaction_hash), None) return interaction
def _bind_dynamic_symbols(self, interaction_msgs): ''' Provide some intelligence to the interactions specification by binding designated symbols at runtime. Commonly used bindings and their usage points include: - interaction.name - __WEBSERVER_ADDRESS__ - interaction.parameters - __ROSBRIDGE_ADDRESS__ - interaction.parameters - __ROSBRIDGE_PORT__ :param interaction_msgs: parse this interaction scanning and replacing symbols. :type interaction_msgs: rocon_interactions_msgs.Interaction[] :returns: the updated interaction list :rtype: rocon_interactions_msgs.Interaction[] ''' # search for patterns of the form '<space>__PARAMNAME__,' # and if found, look to see if there is a rosparam matching that pattern that we can substitute pattern = '\ __(.*?)__[,|\}]' for interaction in interaction_msgs: for p in re.findall(pattern, interaction.parameters): rparam = None if p.startswith('/'): try: rparam = rospy.get_param(p) except KeyError: pass # we show a warning below elif p.startswith('~'): msg = '%s is invalid format for rosparam binding. See https://github.com/robotics-in-concert/rocon_tools/issues/81' % p raise MalformedInteractionsYaml(str(msg)) else: # See https://github.com/robotics-in-concert/rocon_tools/issues/81 for the rule if interaction.namespace: try: rparam = rospy.get_param(interaction.namespace) except KeyError: pass # fallback and try again in the private namespace if rparam is None: try: rparam = rospy.get_param('~' + p) except KeyError: pass # we show a warning below if rparam is None: rospy.logwarn("Interactions : no dynamic binding found for '%s'" % p) match = '__' + p + '__' interaction.parameters = interaction.parameters.replace(match, str(rparam)) return interaction_msgs