Source code for rocon_interactions.interactions

#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: interactions
   :platform: Unix
   :synopsis: Representative class and methods for an *interaction*.


This module defines a class and methods that represent the core of what
an interaction is.

----

"""
##############################################################################
# Imports
##############################################################################

import yaml
import zlib  # crc32
import os

import genpy
import rospkg
import rocon_console.console as console
import rocon_python_utils
import rocon_interaction_msgs.msg as interaction_msgs

from .exceptions import InvalidInteraction, MalformedInteractionsYaml, YamlResourceNotFoundException
from . import web_interactions

##############################################################################
# Utility Methods
##############################################################################


[docs]def generate_hash(display_name, role, namespace): ''' Compute a unique hash for this interaction corresponding to the display_name-role-namespace triple. We use zlib's crc32 here instead of unique_id because of it's brevity which is important when trying to id an interaction by its hash from an nfc tag. Might be worth checking http://docs.python.org/2.7/library/zlib.html#zlib.crc32 if this doesn't produce the same hash on all platforms. :param str display_name: the display name of the interaction :param str role: the role the interaction is embedded in :param str namespace: the namespace in which to embed this interaction :returns: the hash :rtype: int32 ''' return zlib.crc32(display_name + "-" + role + "-" + namespace)
[docs]def load_msgs_from_yaml_file(file_path): """ Load interactions from a yaml resource. :param str file_path: file path of a yaml formatted interactions file (ext=.interactions). :returns: a list of ros msg interaction specifications :rtype: rocon_interaction_msgs.Interaction_ [] :raises: :exc:`.YamlResourceNotFoundException` if yaml is not found. :raises: :exc:`.MalformedInteractionsYaml` if yaml is malformed. .. include:: weblinks.rst """ interactions = [] try: yaml_filename = file_path if not os.path.isfile(yaml_filename): raise YamlResourceNotFoundException(str(e)) except rospkg.ResourceNotFound as e: # resource not found. raise YamlResourceNotFoundException(str(e)) with open(yaml_filename) as f: # load the interactions from yaml into a python object interaction_yaml_objects = yaml.load(f) # now drop it into message format for interaction_yaml_object in interaction_yaml_objects: # convert the parameters from a freeform yaml variable to a yaml string suitable for # shipping off in ros msgs (where parameters is a string variable) if 'parameters' in interaction_yaml_object: # it's an optional key # chomp trailing newlines interaction_yaml_object['parameters'] = yaml.dump(interaction_yaml_object['parameters']).rstrip() interaction = interaction_msgs.Interaction() try: genpy.message.fill_message_args(interaction, interaction_yaml_object) except genpy.MessageException as e: raise MalformedInteractionsYaml( "malformed yaml preventing converting of yaml to interaction msg type [%s]" % str(e)) interactions.append(interaction) return interactions
[docs]def load_msgs_from_yaml_resource(resource_name): """ Load interactions from a yaml resource. :param str resource_name: pkg/filename of a yaml formatted interactions file (ext=.interactions). :returns: a list of ros msg interaction specifications :rtype: rocon_interaction_msgs.Interaction_ [] :raises: :exc:`.YamlResourceNotFoundException` if yaml is not found. :raises: :exc:`.MalformedInteractionsYaml` if yaml is malformed. .. include:: weblinks.rst """ interactions = [] try: yaml_filename = rocon_python_utils.ros.find_resource_from_string(resource_name, extension='interactions') interactions = load_msgs_from_yaml_file(yaml_filename) return interactions except rospkg.ResourceNotFound as e: # resource not found. raise YamlResourceNotFoundException(str(e)) ############################################################################## # Classes ##############################################################################
[docs]class Interaction(object): ''' This class defines an interaction. It does so by wrapping the base rocon_interaction_msgs.Interaction_ msg structure with a few convenient variables and methods. .. include:: weblinks.rst ''' __slots__ = [ 'msg', # rocon_interaction_msgs.Interaction ]
[docs] def __init__(self, msg): """ Validate the incoming fields supplied by the interaction msg and populate remaining fields with proper defaults (e.g. calculate the unique hash for this interaction). The hash is calculated based on the incoming display_name-role-namespace triple. :param msg: underlying data structure with fields minimally filled via :func:`.load_msgs_from_yaml_resource`. :type msg: rocon_interaction_msgs.Interaction_ :raises: :exc:`.InvalidInteraction` if the interaction variables were improperly defined (e.g. max = -1) .. include:: weblinks.rst """ self.msg = msg """Underlying data structure (rocon_interaction_msgs.Interaction_)""" if self.msg.max < -1: raise InvalidInteraction("maximum instance configuration cannot be negative [%s]" % self.msg.display_name) if self.msg.max == 0: self.msg.max = 1 if self.msg.role == '': raise InvalidInteraction("role not configured [%s]" % self.msg.display_name) if self.msg.icon.resource_name == "": self.msg.icon.resource_name = 'rocon_bubble_icons/rocon.png' if not self.msg.icon.data: try: self.msg.icon = rocon_python_utils.ros.icon_resource_to_msg(self.msg.icon.resource_name) except rospkg.common.ResourceNotFound as e: # replace with default icon if icon resource is not found. self.msg.icon.resource_name = 'rocon_bubble_icons/rocon.png' self.msg.icon = rocon_python_utils.ros.icon_resource_to_msg(self.msg.icon.resource_name) if self.msg.namespace == '': self.msg.namespace = '/' self.msg.hash = generate_hash(self.msg.display_name, self.msg.role, self.msg.namespace) # some convenient aliases - these should be properties!
[docs] def is_paired_type(self): """ Classify whether this interaction is to be paired with a rapp or not. :returns: whether it is a pairing interaction or not :rtype: bool """ return True if self.msg.pairing.rapp else False ############################################################################## # Conveniences ##############################################################################
@property
[docs] def name(self): """Executable name for this interaction, can be a roslaunch, rosrunnable, global executable, web url or web app [int].""" return self.msg.name
@property
[docs] def role(self): """The group under which this interaction should be embedded [int].""" return self.msg.role
@property
[docs] def compatibility(self): """A rocon_uri_ string that indicates what platforms it may run on [int].""" return self.msg.compatibility
@property
[docs] def display_name(self): """A human friendly name that also uniquely helps uniquely identify this interaction (you can have more than one configured ``name`` instance) [int].""" return self.msg.display_name
@property def description(self): return self.msg.description @property
[docs] def namespace(self): """Default namespace under which ros services and topics should be embedded for this interaction [int].""" return self.msg.namespace
@property
[docs] def max(self): """ Maximum number of instantiations that is permitted (e.g. teleop should only allow 1) [int]. """ return self.msg.max
@property def remappings(self): return self.msg.remappings @property def parameters(self): return self.msg.parameters @property
[docs] def hash(self): """A crc32 unique identifier key for this interaction, see also :func:`.generate_hash` [int32].""" return self.msg.hash
@property def pairing(self): return self.msg.pairing def _eq__(self, other): if type(other) is type(self): return self.msg.hash == other.msg.hash else: return False def __ne__(self, other): return not self.__eq__(other)
[docs] def __str__(self): ''' Format the interaction into a human-readable string. ''' web_interaction = web_interactions.parse(self.msg.name) name = self.msg.name if web_interaction is None else web_interaction.url s = '' s += console.green + "%s" % self.msg.display_name + console.reset + '\n' s += console.cyan + " Name" + console.reset + " : " + console.yellow + "%s" % name + console.reset + '\n' # noqa s += console.cyan + " Description" + console.reset + " : " + console.yellow + "%s" % self.msg.description + console.reset + '\n' # noqa s += console.cyan + " Icon" + console.reset + " : " + console.yellow + "%s" % str(self.msg.icon.resource_name) + console.reset + '\n' # noqa s += console.cyan + " Rocon URI" + console.reset + " : " + console.yellow + "%s" % self.msg.compatibility + console.reset + '\n' # noqa s += console.cyan + " Namespace" + console.reset + " : " + console.yellow + "%s" % self.msg.namespace + console.reset + '\n' # noqa if self.msg.max == -1: s += console.cyan + " Max" + console.reset + " : " + console.yellow + "infinity" + console.reset + '\n' # noqa else: s += console.cyan + " Max" + console.reset + " : " + console.yellow + "%s" % self.msg.max + console.reset + '\n' # noqa already_prefixed = False for remapping in self.msg.remappings: if not already_prefixed: s += console.cyan + " Remappings" + console.reset + " : " + console.yellow + "%s->%s" % (remapping.remap_from, remapping.remap_to) + console.reset + '\n' # noqa already_prefixed = True else: s += " : " + console.yellow + "%s->%s" % (remapping.remap_from, remapping.remap_to) + console.reset + '\n' # noqa if self.msg.parameters != '': s += console.cyan + " Parameters" + console.reset + " : " + console.yellow + "%s" % self.msg.parameters + console.reset + '\n' # noqa s += console.cyan + " Hash" + console.reset + " : " + console.yellow + "%s" % str(self.msg.hash) + console.reset + '\n' # noqa if self.msg.pairing.rapp: s += console.cyan + " Pairing" + console.reset + " : " + console.yellow + "%s" % str(self.msg.pairing.rapp) + console.reset + '\n' # noqa already_prefixed = False for remapping in self.msg.pairing.remappings: if not already_prefixed: s += console.cyan + " Remappings" + console.reset + " : " + console.yellow + "%s->%s" % (remapping.remap_from, remapping.remap_to) + console.reset + '\n' # noqa already_prefixed = True else: s += " : " + console.yellow + "%s->%s" % (remapping.remap_from, remapping.remap_to) + console.reset + '\n' # noqa already_prefixed = False for pair in self.msg.pairing.parameters: if not already_prefixed: s += console.cyan + " Parameters" + console.reset + " : " + console.yellow + "%s-%s" % (pair.key, pair.value) + console.reset + '\n' # noqa already_prefixed = True else: s += " : " + console.yellow + "%s-%s" % (pair.key, pair.value) + console.reset + '\n' # noqa return s