effects.py
Go to the documentation of this file.
00001 import conditions
00002 import tasks
00003 import f_expression
00004 import pddl_types
00005 
def cartesian_product(*sequences):
    """Yield every tuple that picks one item from each of ``sequences``.

    The first sequence varies fastest and the last one slowest, so the
    enumeration order is not the one used by ``itertools.product``.  Called
    without arguments, exactly one empty tuple is produced.  Sequences are
    iterated repeatedly, so they must be re-iterable containers (lists,
    tuples, ...) rather than one-shot iterators.
    """
    # TODO: Also exists in tools.py outside the pddl package (defined slightly
    #       differently). Not good. Need proper import paths.
    if len(sequences) == 0:
        yield ()
        return
    head = sequences[0]
    rest = sequences[1:]
    for suffix in cartesian_product(*rest):
        for element in head:
            yield (element,) + suffix
00015 
def parse_effects(alist, result):
    """Parse the effect expression ``alist`` and append its effects to ``result``.

    The expression is parsed with ``parse_effect`` and normalized.  If the
    normalized object is a plain ``TmpEffect`` (exact class, not a subclass)
    each of its members is handed to ``add_effect``; otherwise the normalized
    object itself is.
    """
    normalized = parse_effect(alist).normalize()
    # Deliberately an exact-class test: the subclasses of TmpEffect are
    # single effects and must be passed on as a whole.
    if normalized.__class__.__name__ == "TmpEffect":
        pending = normalized.effects
    else:
        pending = [normalized]
    for effect in pending:
        add_effect(effect, result)
00024 
def parse_durative_effects(alist, result):
    """Parse the effects of a durative action and sort them into ``result``.

    Works like ``parse_effects`` but parses in durative mode.  ``add_effect``
    is called with ``durative=True``, so ``result`` must be indexable:
    ``result[0]`` receives the "start" effects and ``result[1]`` the "end"
    effects.
    """
    normalized = parse_effect(alist, True).normalize()
    # Exact-class test on purpose; subclasses of TmpEffect are single effects.
    if normalized.__class__.__name__ == "TmpEffect":
        pending = normalized.effects
    else:
        pending = [normalized]
    for effect in pending:
        add_effect(effect, result, True)
00033 
# tmp_effect must be a normalized tmp_effect with only one effect
# or a primitive effect
def add_effect(tmp_effect, results, durative=False):
    """Unwrap a normalized effect and store it as an ``Effect`` in ``results``.

    The wrappers are peeled off in a fixed order: an optional
    ``UniversalEffect`` supplies the parameters, an optional
    ``ConditionalEffect`` supplies the condition, and a ``ConjunctiveEffect``
    (which must hold exactly one member) supplies the primitive effect.

    For durative effects the time is read from the outermost wrapper before
    unwrapping; the effect goes to ``results[0]`` for "start" and to
    ``results[1]`` for "end".  Non-durative effects are appended to
    ``results`` directly.
    """
    time = tmp_effect.time if durative else None

    parameters = []
    if isinstance(tmp_effect, UniversalEffect):
        parameters = tmp_effect.parameters
        tmp_effect = tmp_effect.effects[0]

    # Default (trivially true) condition: three entries for durative
    # effects, a single condition otherwise.
    if durative:
        condition = [conditions.Truth(), conditions.Truth(), conditions.Truth()]
    else:
        condition = conditions.Truth()
    if isinstance(tmp_effect, ConditionalEffect):
        condition = tmp_effect.condition
        tmp_effect = tmp_effect.effects[0]

    if isinstance(tmp_effect, ConjunctiveEffect):
        assert len(tmp_effect.effects) == 1
        tmp_effect = tmp_effect.effects[0]
    # Whatever is left must be a primitive effect, not another wrapper.
    assert not isinstance(tmp_effect, TmpEffect)

    new_effect = Effect(parameters, condition, tmp_effect)
    if time == "start":
        results[0].append(new_effect)
    elif time == "end":
        results[1].append(new_effect)
    else:
        # A durative effect always has to carry a "start" or "end" time.
        assert not durative
        results.append(new_effect)
00062 
def parse_effect(alist, durative=False):
    """Turn a nested-list effect expression into a tree of effect objects.

    Dispatches on the leading tag of ``alist``:

    * ``and``    -> ``TmpEffect`` of the recursively parsed members
    * ``forall`` -> ``UniversalEffect`` over the typed parameter list
    * ``when``   -> ``ConditionalEffect``; in durative mode the condition is
      parsed as a durative condition and the body must be a timed effect
    * ``at``     -> timed effect (only recognised in durative mode)
    * ``change`` -> rewritten into an assignment of "undefined" at start and
      of the new value at end (durative mode only)
    * anything else is delegated to ``parse_cond_effect``.
    """
    tag = alist[0]

    if tag == "and":
        members = [parse_effect(part, durative) for part in alist[1:]]
        return TmpEffect(members)

    if tag == "forall":
        assert len(alist) == 3
        quantified = pddl_types.parse_typed_list(alist[1])
        body = parse_effect(alist[2], durative)
        return UniversalEffect(quantified, body)

    if tag == "when":
        assert len(alist) == 3
        if durative:
            condition = conditions.parse_durative_condition(alist[1])
            effect = parse_timed_effect(alist[2])
        else:
            condition = conditions.parse_condition(alist[1])
            effect = parse_cond_effect(alist[2])
        return ConditionalEffect(condition, effect)

    if durative and tag == "at":
        return parse_timed_effect(alist)

    if tag == "change":
        assert durative
        at_start = ["at", "start", ["assign", alist[1], "undefined"]]
        at_end = ["at", "end", ["assign", alist[1], alist[2]]]
        return parse_effect(["and", at_start, at_end], durative)

    return parse_cond_effect(alist)
00089 
def parse_timed_effect(alist):
    """Parse ``["at", "start"|"end", <effect>]`` into a timed effect.

    The inner effect is parsed in durative mode by ``parse_cond_effect`` and
    the returned object gets its ``time`` attribute set to the given time
    specifier.
    """
    assert len(alist) == 3
    assert alist[0] == "at"
    time = alist[1]
    assert time in ("start", "end")
    timed_effect = parse_cond_effect(alist[2], True)
    timed_effect.time = time
    return timed_effect
00098 
def parse_cond_effect(alist, durative=False):
    """Parse an effect without quantifiers or conditions.

    Always returns a ``ConjunctiveEffect``.  ``and`` is parsed recursively;
    the numeric update operators and ``assign`` on a symbol registered as
    "number" in ``tasks.Task.FUNCTION_SYMBOLS`` become function assignments;
    any other ``assign`` becomes an ``ObjectFunctionAssignment``; everything
    else is parsed as a condition (literal).
    """
    tag = alist[0]

    if tag == "and":
        members = [parse_cond_effect(part, durative) for part in alist[1:]]
        return ConjunctiveEffect(members)

    if tag in ("scale-up", "scale-down", "increase", "decrease"):
        return ConjunctiveEffect([f_expression.parse_assignment(alist, durative)])

    if tag != "assign":
        return ConjunctiveEffect([conditions.parse_condition(alist)])

    # "assign": the assigned function may be given as a bare symbol or as a
    # list whose first element is the symbol.
    symbol = alist[1]
    if isinstance(symbol, list):
        symbol = symbol[0]
    # Symbols that are not registered are treated as object-valued.
    is_numeric = tasks.Task.FUNCTION_SYMBOLS.get(symbol, "object") == "number"
    if is_numeric:
        return ConjunctiveEffect([f_expression.parse_assignment(alist, durative)])
    head = conditions.parse_term(alist[1])
    value = conditions.parse_term(alist[2])
    return ConjunctiveEffect([ObjectFunctionAssignment(head, value)])
00115     
class Effect(object):
    """A single, fully unwrapped effect of an action.

    Attributes:
      parameters: universally quantified parameters (may be empty).
      condition:  either one condition object, or a list of three condition
                  objects for effects of durative actions.  Elsewhere in this
                  file index 0 is filled for "start" effects and index 2 for
                  "end" effects (index 1 is presumably the "over all"
                  condition -- TODO confirm against the conditions module).
      peffect:    the primitive effect (literal or function assignment).
    """

    def __init__(self, parameters, condition, peffect):
        self.parameters = parameters
        self.condition = condition
        self.peffect = peffect  # literal or function assignment

    def __eq__(self, other):
        return (self.__class__ is other.__class__ and
                self.parameters == other.parameters and
                self.condition == other.condition and
                self.peffect == other.peffect)

    def __ne__(self, other):
        # Python 2 does not derive "!=" from __eq__; without this method two
        # equal effects would still compare as unequal with "!=".
        return not self.__eq__(other)

    def dump(self):
        """Print a human-readable, indented description of the effect."""
        indent = "  "
        if self.parameters:
            print("%sforall %s" % (indent, ", ".join(map(str, self.parameters))))
            indent += "  "
        temporal = isinstance(self.condition, list)
        if temporal:
            trivial = [conditions.Truth(), conditions.Truth(), conditions.Truth()]
        else:
            trivial = conditions.Truth()
        # Only print the condition if it is not trivially true.
        if self.condition != trivial:
            print("%sif" % indent)
            if temporal:
                conditions.dump_temporal_condition(self.condition, indent + "  ")
            else:
                self.condition.dump(indent + "  ")
            print("%sthen" % indent)
            indent += "  "
        self.peffect.dump(indent)

    def copy(self):
        """Return a shallow copy sharing parameters, condition and peffect."""
        return Effect(self.parameters, self.condition, self.peffect)

    def uniquify_variables(self, type_map):
        """Rename parameters to unique names and apply the renaming in place.

        The renaming built while uniquifying the parameters is applied to the
        condition (each entry of it, for temporal conditions) and to the
        primitive effect.
        """
        renamings = {}
        self.parameters = [par.uniquify_name(type_map, renamings)
                           for par in self.parameters]
        if isinstance(self.condition, list):
            self.condition = [cond.uniquify_variables(type_map, renamings)
                              for cond in self.condition]
        else:
            self.condition = self.condition.uniquify_variables(type_map, renamings)
        self.peffect = self.peffect.rename_variables(renamings)

    def instantiate(self, var_mapping, init_facts, fluent_facts, init_function_vals,
                    fluent_functions, task, new_axiom, new_modules, objects_by_type, result):
        """Ground the effect and append ``(condition, effect)`` pairs to ``result``.

        If the effect has parameters, it is instantiated once for every
        combination of objects of the parameters' types taken from
        ``objects_by_type`` (types without an entry contribute no objects).
        """
        if not self.parameters:
            self._instantiate(var_mapping, init_facts, fluent_facts, init_function_vals,
                              fluent_functions, task, new_axiom, new_modules, result)
            return
        var_mapping = var_mapping.copy()  # Will modify this.
        object_lists = [objects_by_type.get(par.type, [])
                        for par in self.parameters]
        for object_tuple in cartesian_product(*object_lists):
            for (par, obj) in zip(self.parameters, object_tuple):
                var_mapping[conditions.Variable(par.name)] = conditions.ObjectTerm(obj)
            self._instantiate(var_mapping, init_facts, fluent_facts, init_function_vals,
                              fluent_functions, task, new_axiom, new_modules, result)

    def _instantiate(self, var_mapping, init_facts, fluent_facts, init_function_vals,
                     fluent_functions, task, new_axiom, new_modules, result):
        """Ground the effect for one fixed variable mapping.

        Nothing is appended if instantiating the condition raises
        ``conditions.Impossible`` or if the primitive effect instantiates to
        nothing.
        """
        if isinstance(self.condition, list):
            # Temporal condition: one result list per entry.
            condition = [[], [], []]
            for time, cond in enumerate(self.condition):
                try:
                    cond.instantiate(var_mapping, init_facts, fluent_facts,
                                     init_function_vals, fluent_functions, task,
                                     new_axiom, new_modules, condition[time])
                except conditions.Impossible:
                    return
        else:
            condition = []
            try:
                self.condition.instantiate(var_mapping, init_facts, fluent_facts,
                                           init_function_vals, fluent_functions, task,
                                           new_axiom, new_modules, condition)
            except conditions.Impossible:
                return
        effects = []
        self.peffect.instantiate(var_mapping, init_facts, fluent_facts,
                                 init_function_vals, fluent_functions, task,
                                 new_axiom, new_modules, effects)
        assert len(effects) <= 1
        if effects:
            result.append((condition, effects[0]))

    # NOTE: the following methods are disabled in the original source.
    #  def relaxed(self):
    #    if self.peffect.negated:
    #      return None
    #    else:
    #      return Effect(self.parameters, self.condition.relaxed(), self.peffect)
    #  def simplified(self):
    #    return Effect(self.parameters, self.condition.simplified(), self.peffect)
00199 
class TmpEffect(object):
    """Intermediate conjunction of effects used while parsing and normalizing.

    Members that are themselves plain ``TmpEffect`` objects (exact class, not
    subclasses) are flattened into this one on construction.

    Attributes:
      effects: list of member effects.
      time:    optional time specifier, ``None`` if unset.
    """

    def __init__(self, effects, time=None):
        flattened_effects = []
        for effect in effects:
            # Exact-class test on purpose: subclasses are kept as members.
            if effect.__class__.__name__ == "TmpEffect":
                flattened_effects.extend(effect.effects)
            else:
                flattened_effects.append(effect)
        self.effects = flattened_effects
        self.time = time

    def dump(self, indent="  "):
        """Print the effect tree, one node per line, starting at ``indent``."""
        # Single-argument print(...) produces the same output under
        # Python 2 and Python 3.
        if self.time:
            print("%sat %s:" % (indent, self.time))
            indent += "  "
        print("%sand" % (indent))
        for eff in self.effects:
            eff.dump(indent + "  ")

    def _dump(self):
        """Return the name of the concrete class."""
        return self.__class__.__name__

    def normalize(self):
        """Return a new ``TmpEffect`` holding the normalized members.

        The ``time`` attribute is not carried over to the result.
        """
        return TmpEffect([effect.normalize() for effect in self.effects])
00221 
class ConditionalEffect(TmpEffect):
    """An effect guarded by a condition ("when").

    Attributes:
      condition: one condition object, or a list of condition objects for
                 temporal conditions.
      effects:   list with exactly one guarded effect.
      time:      optional time specifier, ``None`` if unset.

    Wrapping a ``ConditionalEffect`` in another one merges both into a single
    object whose condition is the conjunction of the two conditions.
    """

    def __init__(self, condition, effect, time=None):
        if isinstance(effect, ConditionalEffect):
            inner_condition = effect.condition
            # Temporal conditions are lists; this is the same test dump()
            # uses.  Calling len() on a non-list condition object would fail
            # if that class defines no __len__, so it is only used on lists.
            temporal = isinstance(condition, list)
            if temporal:
                assert len(condition) == len(inner_condition)
            if temporal and len(condition) != 1:
                # Conjoin the conditions entry by entry.
                self.condition = [
                    conditions.Conjunction([cond, inner_condition[index]])
                    for index, cond in enumerate(condition)]
            else:
                self.condition = conditions.Conjunction([condition, inner_condition])
            self.effects = effect.effects
        else:
            self.condition = condition
            self.effects = [effect]
        self.time = time
        assert len(self.effects) == 1

    def dump(self, indent="  "):
        """Print the condition and the guarded effect, starting at ``indent``."""
        # Single-argument print(...) produces the same output under
        # Python 2 and Python 3.
        if self.time:
            print("%sat %s:" % (indent, self.time))
            indent += "  "
        print("%sif" % (indent))
        if isinstance(self.condition, list):
            conditions.dump_temporal_condition(self.condition, indent + "  ")
        else:
            self.condition.dump(indent + "  ")
        print("%sthen" % (indent))
        self.effects[0].dump(indent + "  ")

    def normalize(self):
        """Push the condition inwards over the normalized guarded effect.

        Every effect produced by normalizing the guarded effect is wrapped in
        this condition; for a ``UniversalEffect`` the condition is placed
        below the quantifier.  Returns the single resulting effect, or a
        ``TmpEffect`` if there is not exactly one.
        """
        normalized = self.effects[0].normalize()
        if normalized.__class__.__name__ == "TmpEffect":
            effects = normalized.effects
        else:
            effects = [normalized]
        new_effects = []
        for effect in effects:
            assert effect.__class__.__name__ != "TmpEffect"
            if isinstance(effect, ConjunctiveEffect) or isinstance(effect, ConditionalEffect):
                new_effects.append(ConditionalEffect(self.condition, effect, effect.time))
            elif isinstance(effect, UniversalEffect):
                eff = ConditionalEffect(self.condition, effect.effects[0])
                new_effects.append(UniversalEffect(effect.parameters, eff, effect.time))
            # NOTE(review): effects of any other kind are dropped silently
            # here -- confirm that none can occur.
        if len(new_effects) == 1:
            return new_effects[0]
        else:
            return TmpEffect(new_effects)
00268 
class UniversalEffect(TmpEffect):
    """A universally quantified effect ("forall").

    Attributes:
      parameters: the quantified parameters.
      effects:    list with exactly one quantified effect.
      time:       optional time specifier, ``None`` if unset.

    Wrapping a ``UniversalEffect`` in another one merges both into a single
    object that carries the parameters of both.
    """

    def __init__(self, parameters, effect, time=None):
        if isinstance(effect, UniversalEffect):
            # Either side may be a set rather than a list
            # (ObjectFunctionAssignment.normalize passes sets), and "+" is
            # not defined between a list and a set.  Coercing both sides to
            # lists gives the same result as before for two lists.
            self.parameters = list(parameters) + list(effect.parameters)
            self.effects = effect.effects
        else:
            self.parameters = parameters
            self.effects = [effect]
        self.time = time
        assert len(self.effects) == 1

    def dump(self, indent="  "):
        """Print the quantifier and the quantified effect, starting at ``indent``."""
        # Single-argument print(...) produces the same output under
        # Python 2 and Python 3.
        if self.time:
            print("%sat %s:" % (indent, self.time))
            indent += "  "
        print("%sforall %s" % (indent, ", ".join(map(str, self.parameters))))
        self.effects[0].dump(indent + "  ")

    def normalize(self):
        """Distribute the quantifier over the normalized quantified effect.

        If normalizing the inner effect yields a plain ``TmpEffect``, each of
        its members is quantified separately and a ``TmpEffect`` of those is
        returned; otherwise a single ``UniversalEffect`` is returned.  The
        time of each inner effect is copied to its wrapper.
        """
        effect = self.effects[0].normalize()
        if effect.__class__.__name__ == "TmpEffect":
            return TmpEffect([UniversalEffect(self.parameters, eff, eff.time)
                              for eff in effect.effects])
        return UniversalEffect(self.parameters, effect, effect.time)
00291 
class ConjunctiveEffect(TmpEffect):
    """Conjunction of primitive effects.

    Members are literals, function assignments, module calls or
    ``ObjectFunctionAssignment`` objects.  Members that are themselves
    ``ConjunctiveEffect`` objects (exact class) are flattened on
    construction.
    """

    def __init__(self, effects, time=None):
        merged = []
        for member in effects:
            if member.__class__.__name__ == "ConjunctiveEffect":
                merged.extend(member.effects)
            else:
                merged.append(member)
        self.effects = merged
        self.time = time

    def normalize(self):
        """Return a ``TmpEffect`` with one entry per resulting primitive effect.

        ``ObjectFunctionAssignment`` members contribute whatever their own
        ``normalize`` appends.  For every other member the arguments are
        passed through ``compile_objectfunctions_aux``; if that introduces
        typed variables, the member is rebuilt from the new arguments and
        wrapped in a conditional effect below a universal quantifier over
        those variables.  Otherwise the member is kept as a one-element
        ``ConjunctiveEffect``.
        """
        normalized = []
        for eff in self.effects:
            if isinstance(eff, ObjectFunctionAssignment):
                compiled = []
                eff.normalize(self.time, compiled)
                normalized.extend(compiled)
                continue

            assert (isinstance(eff, f_expression.FunctionAssignment)
                    or isinstance(eff, conditions.Literal)
                    or isinstance(eff, conditions.ModuleCall)
                    # HACK CHECK
                    )
            is_assignment = isinstance(eff, f_expression.FunctionAssignment)
            # NOTE(review): this is a list holding one collection, whereas
            # ObjectFunctionAssignment.normalize passes a flat list of
            # variables to the same helper -- confirm which one is intended.
            used_variables = [eff.free_variables()]
            if is_assignment:
                args = [eff.fluent, eff.expression]
            else:
                args = eff.args

            typed_vars = []
            conjunction_parts = []
            new_args = []
            for arg in args:
                typed, parts, new_arg = arg.compile_objectfunctions_aux(used_variables)
                typed_vars.extend(typed)
                conjunction_parts.extend(parts)
                new_args.append(new_arg)

            if not typed_vars:
                # Nothing had to be compiled away; keep the member as it is.
                normalized.append(ConjunctiveEffect([eff], self.time))
                continue

            if is_assignment:
                new_eff = eff.__class__(*new_args)
            else:
                new_eff = eff.__class__(eff.predicate, new_args)
            conjunction = conditions.Conjunction(conjunction_parts)
            # Timed effects need a three-part condition with the conjunction
            # in the slot that belongs to their time.
            if self.time == "start":
                condition = [conjunction, conditions.Truth(), conditions.Truth()]
            elif self.time == "end":
                condition = [conditions.Truth(), conditions.Truth(), conjunction]
            else:
                condition = conjunction
            guarded = ConditionalEffect(condition, new_eff)
            normalized.append(UniversalEffect(typed_vars, guarded, self.time))
        return TmpEffect(normalized)
00344                     effects.append(UniversalEffect(typed_vars,cond_eff,self.time))
00345         return TmpEffect(effects)
00346 
class ObjectFunctionAssignment(object):
    """Assignment of a term to an object-valued function term.

    Attributes:
      head:  the assigned function term.
      value: the term that is assigned.
    """

    def __init__(self, head, value):
        self.head = head    # term
        self.value = value  # term

    def dump(self, indent="  "):
        """Print the assignment, starting at ``indent``."""
        # Single-argument print(...) produces the same output under
        # Python 2 and Python 3.
        print("%sassign" % (indent))
        self.head.dump(indent + "  ")
        self.value.dump(indent + "  ")

    def rename_variables(self, renamings):
        """Return a copy with ``renamings`` applied to head and value."""
        return self.__class__(self.head.rename_variables(renamings),
                              self.value.rename_variables(renamings))

    def normalize(self, time, results):
        """Compile the assignment into add/delete effects appended to ``results``.

        Up to three effects are appended, in this order: one adding the atom
        with the new value, one deleting the atom with the previous value,
        and (unless the new value is named "undefined") one deleting the atom
        whose last argument is "undefined".  ``time`` is the time specifier
        of the enclosing effect ("start", "end" or ``None``).
        """
        used_variables = list(self.head.free_variables() | self.value.free_variables())
        typed1, conjunction_parts1, term1 = self.head.compile_objectfunctions_aux(used_variables)
        typed2, conjunction_parts2, term2 = self.value.compile_objectfunctions_aux(used_variables)
        assert isinstance(term1, conditions.Variable)

        add_params = set([typed for typed in typed1 if not typed.name == term1.name] + typed2)
        del_params = set(typed1 + typed2)

        add_conjunction_parts = conjunction_parts1[1:] + conjunction_parts2
        del_conjunction_parts = conjunction_parts1[1:] + conjunction_parts2
        # conjunction_parts1[0] is left out because we do not need the
        # condition that the atom in the del effect has been true before

        # These conjunction parts are sufficient under the add-after-delete semantics.
        # Under the consistent effect semantics we need a further condition
        # that prevents deleting the added predicate.
        del_param = conjunction_parts1[0].args[-1]
        del_conjunction_parts.append(conditions.NegatedAtom("=", [del_param, term2]))

        del_eff = ConjunctiveEffect([conjunction_parts1[0].negate(), ])
        atom_name = conjunction_parts1[0].predicate
        atom_parts = list(conjunction_parts1[0].args)
        atom_parts[-1] = term2
        # Local names deliberately differ from the module-level add_effect().
        add_eff = ConjunctiveEffect([conditions.Atom(atom_name, atom_parts), ], time)

        add_conjunction = conditions.Conjunction(add_conjunction_parts)
        del_conjunction = conditions.Conjunction(del_conjunction_parts)
        if time == "start":
            del_condition = [del_conjunction, conditions.Truth(), conditions.Truth()]
            add_condition = [add_conjunction, conditions.Truth(), conditions.Truth()]
        elif time == "end":
            add_condition = [conditions.Truth(), conditions.Truth(), add_conjunction]
            del_condition = [conditions.Truth(), conditions.Truth(), del_conjunction]
        else:
            add_condition = add_conjunction
            del_condition = del_conjunction
        if len(add_conjunction_parts) > 0:
            add_eff = ConditionalEffect(add_condition, add_eff, time)
        del_eff = ConditionalEffect(del_condition, del_eff)
        if len(add_params) > 0:
            add_eff = UniversalEffect(add_params, add_eff, time)
        del_eff = UniversalEffect(del_params, del_eff, time)
        results.append(add_eff)
        results.append(del_eff)

        # value "undefined" must be treated specially because it has not the type
        # required in del_params
        if term2.name != "undefined":
            del_undef_params = set([typed for typed in del_params
                                    if not typed.name == del_param.name])
            atom_parts = list(conjunction_parts1[0].args)
            atom_parts[-1] = conditions.ObjectTerm("undefined")
            del_undef_effect = ConjunctiveEffect([conditions.NegatedAtom(atom_name, atom_parts), ], time)
            # Drop the inequality appended above; it does not apply here.
            del_undef_conjunction_parts = del_conjunction_parts[:-1]
            del_undef_conjunction = conditions.Conjunction(del_undef_conjunction_parts)
            if time == "start":
                del_undef_condition = [del_undef_conjunction, conditions.Truth(), conditions.Truth()]
            elif time == "end":
                del_undef_condition = [conditions.Truth(), conditions.Truth(), del_undef_conjunction]
            else:
                del_undef_condition = del_undef_conjunction
            if len(del_undef_conjunction_parts) > 0:
                del_undef_effect = ConditionalEffect(del_undef_condition, del_undef_effect, time)
            if len(del_undef_params) > 0:
                del_undef_effect = UniversalEffect(del_undef_params, del_undef_effect, time)
            results.append(del_undef_effect)
00426 
00427 


tfd_modules
Author(s): Maintained by Christian Dornhege (see AUTHORS file).
autogenerated on Mon Oct 6 2014 07:52:06