Source code for rocon_interactions.interactions_table
#
# License: BSD
# https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################
"""
.. module:: interactions_table
:platform: Unix
:synopsis: A database of interactions.
This module provides a class that acts as a database (dictionary style) of
some set of interactions.
----
"""
##############################################################################
# Imports
##############################################################################
import rocon_console.console as console
import rocon_uri
from . import interactions
from .exceptions import InvalidInteraction
##############################################################################
# Classes
##############################################################################
[docs]class InteractionsTable(object):
'''
The runtime populated interactions table along with methods to
manipulate it.
.. include:: weblinks.rst
'''
__slots__ = [
'interactions', # rocon_interactions.interactions.Interaction[]
'filter_pairing_interactions' # do not load any paired interactions
]
[docs] def __init__(self, filter_pairing_interactions=False):
"""
Constructs an empty interactions table.
:param bool filter_pairing_interactions: do not load any paired interactions
"""
self.interactions = []
"""List of :class:`.Interaction` objects that will form the elements of the table."""
self.filter_pairing_interactions = filter_pairing_interactions
"""Flag for indicating whether pairing interactions should be filtered when loading."""
[docs] def roles(self):
'''
List all roles for the currently stored interactions.
:returns: a list of all roles
:rtype: str[]
'''
# uniquify the list
return list(set([i.role for i in self.interactions]))
def __len__(self):
return len(self.interactions)
[docs] def __str__(self):
"""
Convenient string representation of the table.
"""
s = ''
role_view = self.generate_role_view()
for role, interactions in role_view.iteritems():
s += console.bold + role + console.reset + '\n'
for interaction in interactions:
s += "\n".join(" " + i for i in str(interaction).splitlines()) + '\n'
return s
[docs] def generate_role_view(self):
'''
Creates a temporary copy of the interactions and sorts them into a dictionary
view classified by role.
:returns: A role based view of the interactions
:rtype: dict { role(str) : :class:`.interactions.Interaction`[] }
'''
# there's got to be a faster way of doing this.
interactions = list(self.interactions)
role_view = {}
for interaction in interactions:
if interaction.role not in role_view.keys():
role_view[interaction.role] = []
role_view[interaction.role].append(interaction)
return role_view
[docs] def filter(self, roles=None, compatibility_uri='rocon:/'):
'''
Filter the interactions in the table according to role and/or compatibility uri.
:param roles: a list of roles to filter against, use all roles if None
:type roles: str []
:param str compatibility_uri: compatibility rocon_uri_, eliminates interactions that don't match this uri.
:returns interactions: subset of all interactions that survived the filter
:rtype: :class:`.Interaction` []
:raises: rocon_uri.RoconURIValueError if provided compatibility_uri is invalid.
'''
if roles: # works for classifying non-empty list vs either of None or empty list
role_filtered_interactions = [i for i in self.interactions if i.role in roles]
else:
role_filtered_interactions = list(self.interactions)
filtered_interactions = [i for i in role_filtered_interactions
if rocon_uri.is_compatible(i.compatibility, compatibility_uri)]
return filtered_interactions
[docs] def load(self, msgs):
'''
Load some interactions into the interaction table. This involves some initialisation
and validation steps.
:param msgs: a list of interaction specifications to populate the table with.
:type msgs: rocon_interaction_msgs.Interaction_ []
:returns: list of all additions and any that were flagged as invalid
:rtype: (:class:`.Interaction` [], rocon_interaction_msgs.Interaction_ []) : (new, invalid)
'''
new = []
invalid = []
for msg in msgs:
# paired check
if self.filter_pairing_interactions and msg.pairing.rapp:
invalid.append(msg)
else:
try:
interaction = interactions.Interaction(msg)
self.interactions.append(interaction)
self.interactions = list(set(self.interactions)) # uniquify the list, just in case
new.append(interaction)
except InvalidInteraction:
invalid.append(msg)
return new, invalid
[docs] def unload(self, msgs):
'''
Removed the specified interactions interactions table. This list is typically
the same list as the user might initially send - no hashes yet generated.
:param msgs: a list of interactions
:type msgs: rocon_interaction_msgs.Interaction_ []
:returns: a list of removed interactions
:rtype: rocon_interaction_msgs.Interaction_ []
'''
removed = []
for msg in msgs:
msg_hash = interactions.generate_hash(msg.display_name, msg.role, msg.namespace)
found = self.find(msg_hash)
if found is not None:
removed.append(msg)
self.interactions.remove(found)
return removed
[docs] def find(self, interaction_hash):
'''
Find the specified interaction.
:param str interaction_hash: in crc32 format
:returns: interaction if found, None otherwise.
:rtype: :class:`.Interaction`
'''
interaction = next((interaction for interaction in self.interactions
if interaction.hash == interaction_hash), None)
return interaction