common.py
Go to the documentation of this file.
00001 # Copyright (c) 2013, Felix Kolbe
00002 # All rights reserved. BSD License
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 #
00008 # * Redistributions of source code must retain the above copyright
00009 #   notice, this list of conditions and the following disclaimer.
00010 #
00011 # * Redistributions in binary form must reproduce the above copyright
00012 #   notice, this list of conditions and the following disclaimer in the
00013 #   documentation and/or other materials provided with the distribution.
00014 #
00015 # * Neither the name of the {organization} nor the names of its
00016 #   contributors may be used to endorse or promote products derived
00017 #   from this software without specific prior written permission.
00018 #
00019 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00020 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00021 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00022 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
00023 # HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00024 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00025 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00026 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00027 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00028 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00029 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 
00031 
00032 import logging
00033 _logger = logging.getLogger('rgoap')
00034 
00035 
00036 
def no_multilines(string):
    """Collapse a multi-line string into a single line.

    Anti-multiline workaround for ROS message types. The patterns are
    replaced by a single space each, strictly in the listed order:
    a newline followed by two spaces, a space followed by a newline,
    and finally any remaining newline.
    """
    flattened = string
    # Order matters: the combined patterns must be consumed before the
    # bare newline, otherwise they would leave extra spaces behind.
    for pattern in ('\n  ', ' \n', '\n'):
        flattened = flattened.replace(pattern, ' ')
    return flattened
00040 
def stringify_dict(dict_, delim=', '):
    """Return the entries of dict_ as one 'key: value' string.

    dict_: mapping whose keys and values are rendered with '%s'
    delim: separator placed between the rendered entries

    An empty mapping yields an empty string.
    """
    # items() instead of the Python-2-only iteritems(), so that this helper
    # works unchanged on both Python 2 and Python 3.
    return delim.join(['%s: %s' % (c, v) for (c, v) in dict_.items()])
00043 
def stringify(iterable, delim=', '):
    """Join the str() representation of every element with delim."""
    as_text = map(str, iterable)
    return delim.join(as_text)
00046 
00047 
00048 
class WorldState(object):
    """Storage for values of conditions.

    self._condition_values: Map<Condition, Object>
    """

    def __init__(self, worldstate=None):
        """Create an empty worldstate or copy the mapping of a given one.

        worldstate: optional worldstate whose condition/value pairs are
            copied (shallow) into the new instance
        """
        self._condition_values = {}
        if worldstate is not None:
            self._condition_values.update(worldstate._condition_values)

    def __str__(self):
        return '%s {%s}' % (self.__class__.__name__,
                            no_multilines(stringify_dict(self._condition_values)))

    def __repr__(self):
        return '<WorldState %X values=%s>' % (id(self), self._condition_values)

    def get_condition_value(self, condition):
        """Return the stored value of condition.

        Raises KeyError if the condition has no value in this worldstate.
        """
        return self._condition_values[condition]

    def set_condition_value(self, condition, value):
        """Store value for condition, replacing any previous value."""
        self._condition_values[condition] = value

    def matches(self, start_worldstate):
        """Return whether self is an 'equal subset' of start_worldstate.

        Only conditions present in both worldstates are compared; a
        condition of self that is missing in start_worldstate is ignored
        and does not prevent a match.
        """
        start_ws_dict = start_worldstate._condition_values
        # items() replaces the Python-2-only viewitems() to stay portable.
        matches = all(start_ws_dict[cond] == value
                      for (cond, value) in self._condition_values.items()
                      if cond in start_ws_dict)
        _logger.debug('comparing worldstates: %s', matches)
        return matches

    def get_state_name_dict(self):
        """Returns a dictionary with not the conditions themselves but
        their state_names as keys."""
        # items() replaces the Python-2-only viewitems() to stay portable.
        return {cond._state_name: val
                for cond, val in self._condition_values.items()}

    def get_unsatisfied_conditions(self, worldstate):
        """Return a set of conditions that are in both the given and this
        worldstate but have unequal values. By now this is symmetric."""
        # Explicit set intersection instead of the Python-2-only
        # viewkeys() & viewkeys(); the result is a plain set either way.
        common_conditions_set = set(self._condition_values).intersection(
            worldstate._condition_values)
        unsatisfied_conditions = {condition
                                  for condition in common_conditions_set
                                  if (self.get_condition_value(condition) !=
                                      worldstate.get_condition_value(condition))}

        # Guarded so the joined string is only built when it will be logged.
        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug("unsatisfied conditions between world states: %d:\n%s",
                          len(unsatisfied_conditions),
                          '\n'.join([str(c) for c in unsatisfied_conditions]))

        return unsatisfied_conditions
00110 
00111 
00112 ## known as state
class Condition(object):
    """The object that makes any kind of robot or system state available.

    This class, at least its static part, is a multiton:
    * Each state_name may be mapped to only one instance in
      _conditions_dict.
    * Calling get(state_name) for an unmapped name triggers an assertion,
      as creating a new instance makes no sense here.

    self._state_name: id name of condition, must not be changed
    """

    # Class-wide registry: state_name -> Condition instance.
    _conditions_dict = {}

    def __init__(self, state_name):
        # Creating an instance does not register it; that is done by add().
        assert state_name not in Condition._conditions_dict, \
            "Condition '" + state_name + "' had already been created previously!"
        self._state_name = state_name

    def __str__(self):
        class_name = self.__class__.__name__
        return '%s:%s' % (class_name, self._state_name)

    def __repr__(self):
        class_name = self.__class__.__name__
        return '<%s state_name=%s>' % (class_name, self._state_name)

    def get_value(self):
        """Returns the current value, hopefully not blocking.

        Not implemented here; subclasses have to override it.
        """
        raise NotImplementedError

    def _update_value(self, worldstate):
        """Write this condition's current value into the given worldstate."""
        current_value = self.get_value()
        worldstate.set_condition_value(self, current_value)

    @classmethod
    def add(cls, condition):
        """Register condition under its state_name; asserts it is new."""
        registry = cls._conditions_dict
        assert condition._state_name not in registry, \
            "Condition '" + condition._state_name + "' had already been added previously!"
        registry[condition._state_name] = condition

    @classmethod
    def get(cls, state_name):
        """Return the condition registered for state_name; asserts it exists."""
        registry = cls._conditions_dict
        assert state_name in registry, "Condition '" + state_name + "' has not yet been added!"
        return registry[state_name]

    @classmethod
    def print_dict(cls):
        """Return (not print) a string representation of the registry."""
        return '<Conditions %s>' % (cls._conditions_dict,)

    @classmethod
    def initialize_worldstate(cls, worldstate):
        """Initialize the given worldstate with all known conditions and their current values."""
        for registered in cls._conditions_dict.values():
            registered._update_value(worldstate)
00168 
00169 
00170 
class Precondition(object):
    """A required value for a condition, optionally with a tolerance.

    self._condition: the condition this precondition refers to
    self._value: the value the condition is required to have
    self._deviation: absolute tolerance around _value, or None to
        require equality
    """

    def __init__(self, condition, value, deviation=None):
        # TODO: make deviation relative/percental, not absolute
        self._deviation = deviation
        self._value = value
        self._condition = condition

    def __str__(self):
        fields = (self.__class__.__name__, self._condition._state_name,
                  self._value, self._deviation)
        return '%s:%s=%s~%s' % fields

    def __repr__(self):
        fields = (self.__class__.__name__, self._condition,
                  self._value, self._deviation)
        return '<%s cond=%s value=%r dev=%s>' % fields

    def is_valid(self, worldstate):
        """Return whether the worldstate's value satisfies this precondition.

        Without a deviation the values have to be equal; otherwise the
        absolute difference must not exceed the deviation.
        """
        current = worldstate.get_condition_value(self._condition)
        if self._deviation is not None:
            return abs(current - self._value) <= self._deviation
        return current == self._value

    def apply(self, worldstate):
        """Set this precondition's value for its condition in worldstate."""
        # TODO: deviation gets lost in backwards planner
        worldstate.set_condition_value(self._condition, self._value)
00195 
00196 
00197 
class Effect(object):
    """Sets a condition to one fixed new value.

    self._condition: the condition changed by this effect
    self._new_value: the value the condition has afterwards
    """
    # TODO: think about optional deviation (coordinate with multiple action results)

    def __init__(self, condition, new_value):
        self._new_value = new_value
        self._condition = condition

    def __str__(self):
        fields = (self.__class__.__name__, self._condition._state_name,
                  self._new_value)
        return '%s:%s=%s' % fields

    def __repr__(self):
        fields = (self.__class__.__name__, self._condition._state_name,
                  self._new_value)
        return '<%s cond=%s new_val=%s>' % fields

    def apply_to(self, worldstate):
        """Write this effect's new value into the given worldstate."""
        # TODO: remove me as I'm only for forward planning?
        worldstate.set_condition_value(self._condition, self._new_value)

    def matches_condition(self, worldstate, start_worldstate):
        """Return whether this effect can reach worldstate from start_worldstate

        For a fixed effect this only compares the value stored in
        worldstate with the effect's new value; start_worldstate is not
        consulted.
        """
        wanted = worldstate.get_condition_value(self._condition)
        return wanted == self._new_value
00218 
00219 
00220 
class VariableEffect(object):
    """This variable effect can by default reach every value and therefore
    matches every worldstate.

    To restrict the values of its condition that an effect can reach,
    subclass and override _is_reachable.
    """

    def __init__(self, condition):
        # Stand-alone class: Effect.__init__ is intentionally not called,
        # a variable effect has no fixed new value.
        self._condition = condition

    def __str__(self):
        fields = (self.__class__.__name__, self._condition._state_name)
        return '%s:%s' % fields

    def __repr__(self):
        fields = (self.__class__.__name__, self._condition._state_name)
        return '<%s cond=%s>' % fields

    # NOTE: there is no apply_to() here (it was only for forward planning).

    def matches_condition(self, worldstate, start_worldstate):
        """Return whether this effect can reach worldstate from start_worldstate"""
        target_value = worldstate.get_condition_value(self._condition)
        start_value = start_worldstate.get_condition_value(self._condition)
        return self._is_reachable(target_value, start_value)

    def _is_reachable(self, value, start_value):
        """Return a Boolean whether this variable effect can reach the given
        value from the given start_value. If this effect can reach certain
        values from any value, the start_value just might be ignored.

        Defaults to True, subclass to limit variability.
        """
        # TODO: change reachability from boolean to float
        return True
00256 
00257 
class Goal(object):
    """A set of preconditions to be reached, weighted by a usability.

    usability range: from 0 to 1, defaults to 1
    """

    def __init__(self, preconditions, usability=1):
        self.usability = usability
        self._preconditions = preconditions

    def __str__(self):
        class_name = self.__class__.__name__
        return '%s (usability=%s)' % (class_name, self.usability)

    def __repr__(self):
        fields = (self.__class__.__name__, self.usability, self._preconditions)
        return '<%s usability=%f preconditions=%s>' % fields

    def is_valid(self, worldstate):
        """Return whether every precondition is valid in worldstate."""
        for precondition in self._preconditions:
            if not precondition.is_valid(worldstate):
                return False
        return True

    def apply_preconditions(self, worldstate):
        """Apply each precondition, in order, to the given worldstate."""
        for requirement in self._preconditions:
            requirement.apply(worldstate)
00280 
00281 
00282 # TODO: implement denial of trivial actions (not changing conditions), if they're actually concerned?
00283 
class Action(object):
    """Base class of an action with preconditions and effects.

    self._preconditions: preconditions that must hold to run the action
    self._effects: effects (fixed or variable) the action has when run
    """

    def __init__(self, preconditions, effects):
        self._effects = effects
        self._preconditions = preconditions

    def __str__(self):
        return self.__class__.__name__

    def __repr__(self):
        fields = (self.__class__.__name__, self._preconditions, self._effects)
        return '<%s preconditions=%s effects=%s>' % fields

    def cost(self):
        """Return this action's cost value

        Override to apply own cost calculation. The returned cost must not be
        less than the number of declared effects!

        To check this when overriding use validate_cost:
            def cost(self):
                return self.validate_cost(..custom cost calculation..)
        """
        return len(self._effects)

    def validate_cost(self, cost):
        """Can be used to validate custom cost calculations, see cost()

        Returns cost unchanged unless it is below the number of effects;
        in that case an error is logged and the minimum is returned.
        """
        minimum_cost = len(self._effects)
        if not cost < minimum_cost:
            return cost
        _logger.error("Warning: action %s proposed too small cost (%s), "
                      "overriding with minimum cost (%s)",
                      self.__class__.__name__, cost, minimum_cost)
        return minimum_cost

    def run(self, next_worldstate):
        """Run the action; has to be implemented by subclasses.

        next_worldstate: the worldstate that this action should lead to when run
        """
        raise NotImplementedError

    ## following for executor

    def is_valid(self, worldstate):
        """Return whether this action is applicable from the given worldstate
        on, i.e. all preconditions are valid."""
        for precondition in self._preconditions:
            if not precondition.is_valid(worldstate):
                return False
        return True

    ## following was for forward planner

    def apply_effects(self, worldstate):
        """Apply every effect, in order, to the given worldstate."""
        # TODO: remove me as I'm only for forward planning?
        for outcome in self._effects:
            outcome.apply_to(worldstate)

    ## following for backward planner

    def check_freeform_context(self):
        """Override to add context checks required to run this action that cannot be satisfied by the planner."""
        return True

    def has_satisfying_effects(self, worldstate, start_worldstate, unsatisfied_conditions):
        """Return True if at least one of own effects matches unsatisfied_conditions."""
        # The membership test stays here rather than in matches_condition,
        # which would otherwise need a three-valued result.
        return any(effect._condition in unsatisfied_conditions
                   and effect.matches_condition(worldstate, start_worldstate)
                   for effect in self._effects)

    def apply_preconditions(self, worldstate, start_worldstate):
        """Apply the fixed and the generated variable preconditions.

        worldstate: the worldstate to apply this action's preconditions to
        start_worldstate: needed to let actions optimize their variable precondition parameters
        """
        # TODO: make required derivation of variable actions more obvious and fail-safe
        for fixed in self._preconditions:
            fixed.apply(worldstate)

        # let the action generate preconditions for its variable effects
        var_effects = [candidate for candidate in self._effects
                       if isinstance(candidate, VariableEffect)]
        if not var_effects:
            return
        generated = self._generate_variable_preconditions(
            var_effects, worldstate, start_worldstate)
        for precondition in generated:
            precondition.apply(worldstate)

    def _generate_variable_preconditions(self, var_effects, worldstate, start_worldstate):
        """
        Let the action itself generate variable preconditions for its variable effects.

        Must be implemented if the action contains variable effects: this
        default raises NotImplementedError as soon as one of the action's
        effects is a VariableEffect and returns None otherwise.
        """
        # TODO: maybe implement a default behaviour, at least for variable effects that can reach any value
        if any(isinstance(effect, VariableEffect) for effect in self._effects):
            raise NotImplementedError
00383 
00384 


rgoap
Author(s): Felix Kolbe
autogenerated on Sun Oct 5 2014 23:53:02