Source code for concert_software_farmer.pool

# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_concert/license/LICENSE
#
##############################################################################
# Imports
##############################################################################

import yaml
import os

import rospkg
import genpy
import rocon_python_utils
import rocon_std_msgs.msg as rocon_std_msgs
import concert_msgs.msg as concert_msgs

from .exceptions import InvalidSoftwareprofileException

##############################################################################
# Classes
##############################################################################

[docs]class SoftwareProfile(object): ''' parses software profile from file path. ''' def __init__(self, resource_name, filepath): self.resource_name = resource_name self._filepath = filepath try: self._load_profile() except (rospkg.ResourceNotFound, IOError) as e: raise InvalidSoftwareProfileException("couldn't parse software profile[%s]"%(str(e))) def _load_profile(self): ''' load profile from filepath ''' msg = concert_msgs.SoftwareProfile() with open(self._filepath) as f: loaded_profile = yaml.load(f) if not 'launch' in loaded_profile: raise InvalidSoftwareprofileException("'launch' field does not exist!!!") msg.resource_name = loaded_profile['resource_name'] = self.resource_name msg.name = loaded_profile['name'] = loaded_profile['name'].lower().replace(" ", "_") msg.description = loaded_profile['description'] if 'description' in loaded_profile else "" msg.author = loaded_profile['author'] if 'author' in loaded_profile else "" msg.max_count = loaded_profile['max_count'] if 'max_count' in loaded_profile else -1 msg.launch = loaded_profile['launch'] msg.parameters = [] if 'parameters' in loaded_profile: for kv in loaded_profile['parameters']: msg.parameters.append(rocon_std_msgs.KeyValue(str(kv['name']), str(kv['value']))) # generate message self.msg = msg self.name = msg.name
[docs] def to_msg(self): return self.msg
def __str__(self): s = '' s += "\tresource_name : %s\n" % self.resource_name s += "\tname : %s\n" % self.msg.name s += "\tdescription : %s\n" % self.msg.description s += "\tauthor : %s\n" % self.msg.author s += "\tlaunch : %s\n" % self.msg.launch s += "\tparameters: \n" for p in self.msg.parameters: s += "\t\t%s : %s\n"%(p.key,p.value) return s
[docs] def validate_parameters(self, given): ''' validates the given parameter is usable for this software. :param given rocon_std_msgs/KeyValue[]: parameter to validate :returns: whether it is successful or not, the updated parameters, msg :rtypes: bool, [rocon_std_msgs.KeyValue], str ''' default = self.msg.parameters default_dict = { i.key:i.value for i in default} given_dict = {i.key:i.value for i in given} # Check if given parameters are invalid d_keys = default_dict.keys() is_invalid = False invalid_params = [] for k in given_dict.keys(): if not k in d_keys: invalid_params.append(k) is_invalid = True if is_invalid: msg = "Invalid parameter is given. %s"%str(invalid_params) return False, [], msg # Assign params = {} for key, value in default_dict.items(): params[key] = given_dict[key] if key in given_dict else value params_keyvalue = [rocon_std_msgs.KeyValue(key, value) for key, value in params.items()] return True, params_keyvalue, "Success"
[docs]class SoftwarePool(object): ''' maintains scanned software in ros package path. ''' __slots__ = ['_registered_software', '_software_profiles', '_invalid_software_profiles'] def __init__(self): self._registered_software = self._scan_registered_software() self._software_profiles, self._invalid_software_profiles = self._load_software_profiles(self._registered_software)
[docs] def status(self): ''' Returns the current software pool status :returns: Avaialble Software profiles, Invalid software profiles :rtype: dict, dict ''' return self._software_profiles, self._invalid_software_profiles
[docs] def get_profile(self, resource_name): ''' Returns the profile of given resource name :returns: Software profile object :rtype: :exc:`SoftwareProfile` :raises: :exc:`SoftwareNotExistException` if the software profile is not available in pool ''' if not resource_name in self._software_profiles: raise SoftwareNotExistException("%s does not exist"%str(resource_name)) return self._software_profiles[resource_name]
def _scan_registered_software(self): ''' parses package exports to scan all available software in the system. and returns name and location ''' cached_software_profile_information, unused_invalid_software = rocon_python_utils.ros.resource_index_from_package_exports(rocon_std_msgs.Strings.TAG_SOFTWARE) cached_software_profile_locations = {} for cached_resource_name, (cached_filename, unused_catkin_package) in cached_software_profile_information.iteritems(): cached_software_profile_locations[cached_resource_name] = cached_filename return cached_software_profile_locations def _load_software_profiles(self, software_locations): ''' Load software profile from file path :param software_locations: file path to the software profile :type software_locations: {resourcen_name: filepath} :returns: loaded software profile :rtypes: { resource_name: SoftwareProfile} ''' profiles = {} invalid_profiles = {} for name, location in software_locations.items(): try: profiles[name] = SoftwareProfile(name, location) except InvalidSoftwareprofileException as e: invalid_profiles[name] = str(e) return profiles, invalid_profiles