00001 import conditions
00002 import tasks
00003 import f_expression
00004 import pddl_types
00005
00006 def cartesian_product(*sequences):
00007
00008
00009 if not sequences:
00010 yield ()
00011 else:
00012 for tup in cartesian_product(*sequences[1:]):
00013 for item in sequences[0]:
00014 yield (item,) + tup
00015
00016 def parse_effects(alist, result):
00017 tmp_effect = parse_effect(alist)
00018 normalized = tmp_effect.normalize()
00019 if normalized.__class__.__name__ == "TmpEffect":
00020 for effect in normalized.effects:
00021 add_effect(effect,result)
00022 else:
00023 add_effect(normalized,result)
00024
00025 def parse_durative_effects(alist, result):
00026 tmp_effect = parse_effect(alist,True)
00027 normalized = tmp_effect.normalize()
00028 if normalized.__class__.__name__ == "TmpEffect":
00029 for effect in normalized.effects:
00030 add_effect(effect,result,True)
00031 else:
00032 add_effect(normalized,result,True)
00033
00034
00035
00036 def add_effect(tmp_effect, results, durative = False):
00037 time = None
00038 if durative:
00039 time = tmp_effect.time
00040 parameters = []
00041 if isinstance(tmp_effect, UniversalEffect):
00042 parameters = tmp_effect.parameters
00043 tmp_effect = tmp_effect.effects[0]
00044 if durative:
00045 condition = [conditions.Truth(),conditions.Truth(),conditions.Truth()]
00046 else:
00047 condition = conditions.Truth()
00048 if isinstance(tmp_effect, ConditionalEffect):
00049 condition = tmp_effect.condition
00050 tmp_effect = tmp_effect.effects[0]
00051 if isinstance(tmp_effect, ConjunctiveEffect):
00052 assert len(tmp_effect.effects) == 1
00053 tmp_effect = tmp_effect.effects[0]
00054 assert not isinstance(tmp_effect,TmpEffect)
00055 if time=="start":
00056 results[0].append(Effect(parameters,condition,tmp_effect))
00057 elif time=="end":
00058 results[1].append(Effect(parameters,condition,tmp_effect))
00059 else:
00060 assert not durative
00061 results.append(Effect(parameters,condition,tmp_effect))
00062
00063 def parse_effect(alist,durative=False):
00064 tag = alist[0]
00065 if tag == "and":
00066 return TmpEffect([parse_effect(eff,durative) for eff in alist[1:]])
00067 elif tag == "forall":
00068 assert len(alist)==3
00069 return UniversalEffect(pddl_types.parse_typed_list(alist[1]),
00070 parse_effect(alist[2],durative))
00071 elif tag == "when":
00072 assert len(alist)==3
00073 if durative:
00074 condition = conditions.parse_durative_condition(alist[1])
00075 effect = parse_timed_effect(alist[2])
00076 else:
00077 condition = conditions.parse_condition(alist[1])
00078 effect = parse_cond_effect(alist[2])
00079 return ConditionalEffect(condition,effect)
00080 elif tag == "at" and durative:
00081 return parse_timed_effect(alist)
00082 elif tag == "change":
00083 assert durative
00084 new_alist = ["and", ["at", "start", ["assign", alist[1], "undefined"]],
00085 ["at", "end", ["assign", alist[1], alist[2]]]]
00086 return parse_effect(new_alist,durative)
00087 else:
00088 return parse_cond_effect(alist)
00089
00090 def parse_timed_effect(alist):
00091 assert len(alist)==3
00092 assert alist[0] == "at"
00093 time = alist[1]
00094 assert time in ("start","end")
00095 conjunctiveEffect = parse_cond_effect(alist[2],True)
00096 conjunctiveEffect.time = time
00097 return conjunctiveEffect
00098
00099 def parse_cond_effect(alist, durative=False):
00100 tag = alist[0]
00101 if tag == "and":
00102 return ConjunctiveEffect([parse_cond_effect(eff,durative) for eff in alist[1:]])
00103 elif tag in ("scale-up", "scale-down", "increase", "decrease"):
00104 return ConjunctiveEffect([f_expression.parse_assignment(alist,durative)])
00105 elif tag == "assign":
00106 symbol = alist[1]
00107 if isinstance(symbol,list):
00108 symbol = symbol[0]
00109 if tasks.Task.FUNCTION_SYMBOLS.get(symbol,"object")=="number":
00110 return ConjunctiveEffect([f_expression.parse_assignment(alist,durative)])
00111 else:
00112 return ConjunctiveEffect([ObjectFunctionAssignment(conditions.parse_term(alist[1]),conditions.parse_term(alist[2]))])
00113 else:
00114 return ConjunctiveEffect([conditions.parse_condition(alist)])
00115
00116 class Effect(object):
00117 def __init__(self, parameters, condition, peffect):
00118 self.parameters = parameters
00119 self.condition = condition
00120 self.peffect = peffect
00121 def __eq__(self, other):
00122 return (self.__class__ is other.__class__ and
00123 self.parameters == other.parameters and
00124 self.condition == other.condition and
00125 self.peffect == other.peffect)
00126 def dump(self):
00127 indent = " "
00128 if self.parameters:
00129 print "%sforall %s" % (indent, ", ".join(map(str, self.parameters)))
00130 indent += " "
00131 if ((isinstance(self.condition,list) and
00132 self.condition != [conditions.Truth(),conditions.Truth(),conditions.Truth()])
00133 or (not isinstance(self.condition,list) and self.condition != conditions.Truth())):
00134 print "%sif" % indent
00135 if isinstance(self.condition,list):
00136 conditions.dump_temporal_condition(self.condition,indent + " ")
00137 else:
00138 self.condition.dump(indent + " ")
00139 print "%sthen" % indent
00140 indent += " "
00141 self.peffect.dump(indent)
00142 def copy(self):
00143 return Effect(self.parameters, self.condition, self.peffect)
00144 def uniquify_variables(self, type_map):
00145 renamings = {}
00146 self.parameters = [par.uniquify_name(type_map, renamings)
00147 for par in self.parameters]
00148 if isinstance(self.condition,list):
00149 self.condition = [cond.uniquify_variables(type_map, renamings) for
00150 cond in self.condition]
00151 else:
00152 self.condition = self.condition.uniquify_variables(type_map, renamings)
00153 self.peffect = self.peffect.rename_variables(renamings)
00154 def instantiate(self, var_mapping, init_facts, fluent_facts, init_function_vals,
00155 fluent_functions, task, new_axiom, new_modules, objects_by_type, result):
00156 if self.parameters:
00157 var_mapping = var_mapping.copy()
00158 object_lists = [objects_by_type.get(par.type, [])
00159 for par in self.parameters]
00160 for object_tuple in cartesian_product(*object_lists):
00161 for (par, obj) in zip(self.parameters, object_tuple):
00162 var_mapping[conditions.Variable(par.name)] = conditions.ObjectTerm(obj)
00163 self._instantiate(var_mapping, init_facts, fluent_facts, init_function_vals,
00164 fluent_functions, task, new_axiom, new_modules, result)
00165 else:
00166 self._instantiate(var_mapping, init_facts, fluent_facts, init_function_vals,
00167 fluent_functions, task, new_axiom, new_modules, result)
00168 def _instantiate(self, var_mapping, init_facts, fluent_facts, init_function_vals,
00169 fluent_functions, task, new_axiom, new_modules, result):
00170 condition = []
00171 if isinstance(self.condition,list):
00172 condition = [[],[],[]]
00173 for time,cond in enumerate(self.condition):
00174 try:
00175 cond.instantiate(var_mapping, init_facts, fluent_facts, init_function_vals,
00176 fluent_functions, task, new_axiom, new_modules, condition[time])
00177 except conditions.Impossible:
00178 return
00179 else:
00180 try:
00181 self.condition.instantiate(var_mapping, init_facts, fluent_facts, init_function_vals,
00182 fluent_functions, task, new_axiom, new_modules, condition)
00183 except conditions.Impossible:
00184 return
00185 effects = []
00186 self.peffect.instantiate(var_mapping, init_facts, fluent_facts,
00187 init_function_vals, fluent_functions, task,
00188 new_axiom, new_modules, effects)
00189 assert len(effects) <= 1
00190 if effects:
00191 result.append((condition, effects[0]))
00192
00193
00194
00195
00196
00197
00198
00199
00200 class TmpEffect(object):
00201 def __init__(self,effects,time=None):
00202 flattened_effects = []
00203 for effect in effects:
00204 if effect.__class__.__name__ == "TmpEffect":
00205 flattened_effects += effect.effects
00206 else:
00207 flattened_effects.append(effect)
00208 self.effects = flattened_effects
00209 self.time = time
00210 def dump(self, indent=" "):
00211 if self.time:
00212 print "%sat %s:" %(indent,self.time)
00213 indent += " "
00214 print "%sand" % (indent)
00215 for eff in self.effects:
00216 eff.dump(indent + " ")
00217 def _dump(self):
00218 return self.__class__.__name__
00219 def normalize(self):
00220 return TmpEffect([effect.normalize() for effect in self.effects])
00221
00222 class ConditionalEffect(TmpEffect):
00223 def __init__(self,condition,effect,time = None):
00224 if isinstance(effect,ConditionalEffect):
00225 assert len(condition) == len(effect.condition)
00226 if len(condition) == 1:
00227 self.condition = conditions.Conjunction([condition,effect.condition])
00228 else:
00229 new_condition = []
00230 for index,cond in enumerate(condition):
00231 new_condition.append(conditions.Conjunction([cond,effect.condition[index]]))
00232 self.condition = new_condition
00233 self.effects = effect.effects
00234 else:
00235 self.condition = condition
00236 self.effects = [effect]
00237 self.time = time
00238 assert len(self.effects) == 1
00239 def dump(self, indent=" "):
00240 if self.time:
00241 print "%sat %s:" %(indent,self.time)
00242 indent += " "
00243 print "%sif" % (indent)
00244 if isinstance(self.condition,list):
00245 conditions.dump_temporal_condition(self.condition,indent + " ")
00246 else:
00247 self.condition.dump(indent + " ")
00248 print "%sthen" % (indent)
00249 self.effects[0].dump(indent + " ")
00250 def normalize(self):
00251 normalized = self.effects[0].normalize()
00252 if normalized.__class__.__name__ == "TmpEffect":
00253 effects = normalized.effects
00254 else:
00255 effects = [normalized]
00256 new_effects = []
00257 for effect in effects:
00258 assert effect.__class__.__name__ != "TmpEffect"
00259 if isinstance(effect,ConjunctiveEffect) or isinstance(effect,ConditionalEffect):
00260 new_effects.append(ConditionalEffect(self.condition,effect,effect.time))
00261 elif isinstance(effect,UniversalEffect):
00262 eff = ConditionalEffect(self.condition,effect.effects[0])
00263 new_effects.append(UniversalEffect(effect.parameters,eff,effect.time))
00264 if len(new_effects) == 1:
00265 return new_effects[0]
00266 else:
00267 return TmpEffect(new_effects)
00268
00269 class UniversalEffect(TmpEffect):
00270 def __init__(self,parameters,effect,time = None):
00271 if isinstance(effect,UniversalEffect):
00272 self.parameters = parameters + effect.parameters
00273 self.effects = effect.effects
00274 else:
00275 self.parameters = parameters
00276 self.effects = [effect]
00277 self.time = time
00278 assert len(self.effects) == 1
00279 def dump(self, indent=" "):
00280 if self.time:
00281 print "%sat %s:" %(indent,self.time)
00282 indent += " "
00283 print "%sforall %s" % (indent, ", ".join(map(str, self.parameters)))
00284 self.effects[0].dump(indent + " ")
00285 def normalize(self):
00286 effect = self.effects[0].normalize()
00287 if effect.__class__.__name__ == "TmpEffect":
00288 return TmpEffect([UniversalEffect(self.parameters,eff,eff.time)
00289 for eff in effect.effects])
00290 return UniversalEffect(self.parameters,effect,effect.time)
00291
00292 class ConjunctiveEffect(TmpEffect):
00293
00294 def __init__(self,effects,time=None):
00295 flattened_effects = []
00296 for effect in effects:
00297 if effect.__class__.__name__ == "ConjunctiveEffect":
00298 flattened_effects += effect.effects
00299 else:
00300 flattened_effects.append(effect)
00301 self.effects = flattened_effects
00302 self.time = time
00303 def normalize(self):
00304 effects = []
00305 for eff in self.effects:
00306 if isinstance(eff,ObjectFunctionAssignment):
00307 results = []
00308 eff.normalize(self.time,results)
00309 effects += results
00310 else:
00311 assert (isinstance(eff,f_expression.FunctionAssignment)
00312 or isinstance(eff,conditions.Literal)
00313 or isinstance(eff, conditions.ModuleCall)
00314
00315 )
00316 used_variables = [eff.free_variables()]
00317 typed_vars = []
00318 conjunction_parts = []
00319 new_args = []
00320 if isinstance(eff,f_expression.FunctionAssignment):
00321 args = [eff.fluent, eff.expression]
00322 else:
00323 args = eff.args
00324 for arg in args:
00325 typed,parts,new_arg = arg.compile_objectfunctions_aux(used_variables)
00326 typed_vars += typed
00327 conjunction_parts += parts
00328 new_args.append(new_arg)
00329 if len(typed_vars) == 0:
00330 effects.append(ConjunctiveEffect([eff],self.time))
00331 else:
00332 if isinstance(eff,f_expression.FunctionAssignment):
00333 new_eff = eff.__class__(*new_args)
00334 else:
00335 new_eff = eff.__class__(eff.predicate,new_args)
00336 conjunction = conditions.Conjunction(conjunction_parts)
00337 if self.time == "start":
00338 condition = [conjunction,conditions.Truth(),conditions.Truth()]
00339 elif self.time == "end":
00340 condition = [conditions.Truth(),conditions.Truth(),conjunction]
00341 else:
00342 condition = conjunction
00343 cond_eff = ConditionalEffect(condition,new_eff)
00344 effects.append(UniversalEffect(typed_vars,cond_eff,self.time))
00345 return TmpEffect(effects)
00346
00347 class ObjectFunctionAssignment(object):
00348 def __init__(self,head,value):
00349 self.head = head
00350 self.value = value
00351 def dump(self, indent=" "):
00352 print "%sassign" % (indent)
00353 self.head.dump(indent + " ")
00354 self.value.dump(indent + " ")
00355 def rename_variables(self, renamings):
00356 return self.__class__(self.head.rename_variables(renamings),
00357 self.value.rename_variables(renamings))
00358 def normalize(self,time,results):
00359 used_variables = list(self.head.free_variables() | self.value.free_variables())
00360 typed1, conjunction_parts1, term1 = self.head.compile_objectfunctions_aux(used_variables)
00361 typed2, conjunction_parts2, term2 = self.value.compile_objectfunctions_aux(used_variables)
00362 assert isinstance(term1,conditions.Variable)
00363
00364 add_params = set([typed for typed in typed1 if not typed.name==term1.name] + typed2)
00365 del_params = set(typed1 + typed2)
00366
00367 add_conjunction_parts = conjunction_parts1[1:] + conjunction_parts2
00368 del_conjunction_parts = conjunction_parts1 + conjunction_parts2
00369 del_conjunction_parts = conjunction_parts1[1:] + conjunction_parts2
00370
00371
00372
00373
00374
00375
00376 del_param = conjunction_parts1[0].args[-1]
00377 del_conjunction_parts.append(conditions.NegatedAtom("=",[del_param,term2]))
00378
00379 del_effect = ConjunctiveEffect([conjunction_parts1[0].negate(),])
00380 atom_name = conjunction_parts1[0].predicate
00381 atom_parts = list(conjunction_parts1[0].args)
00382 atom_parts[-1] = term2
00383 add_effect = ConjunctiveEffect([conditions.Atom(atom_name,atom_parts),],time)
00384
00385 add_conjunction = conditions.Conjunction(add_conjunction_parts)
00386 del_conjunction = conditions.Conjunction(del_conjunction_parts)
00387 if time == "start":
00388 del_condition = [del_conjunction,conditions.Truth(),conditions.Truth()]
00389 add_condition = [add_conjunction,conditions.Truth(),conditions.Truth()]
00390 elif time == "end":
00391 add_condition = [conditions.Truth(),conditions.Truth(),add_conjunction]
00392 del_condition = [conditions.Truth(),conditions.Truth(),del_conjunction]
00393 else:
00394 add_condition = add_conjunction
00395 del_condition = del_conjunction
00396 if len(add_conjunction_parts)>0:
00397 add_effect = ConditionalEffect(add_condition,add_effect,time)
00398 del_effect = ConditionalEffect(del_condition,del_effect)
00399 if len(add_params)>0:
00400 add_effect = UniversalEffect(add_params,add_effect,time)
00401 del_effect = UniversalEffect(del_params,del_effect,time)
00402 results.append(add_effect)
00403 results.append(del_effect)
00404
00405
00406
00407 if term2.name != "undefined":
00408 del_undef_params = set([typed for typed in del_params
00409 if not typed.name==del_param.name])
00410 atom_parts = list(conjunction_parts1[0].args)
00411 atom_parts[-1] = conditions.ObjectTerm("undefined")
00412 del_undef_effect = ConjunctiveEffect([conditions.NegatedAtom(atom_name,atom_parts),],time)
00413 del_undef_conjunction_parts = del_conjunction_parts[:-1]
00414 del_undef_conjunction = conditions.Conjunction(del_undef_conjunction_parts)
00415 if time == "start":
00416 del_undef_condition = [del_undef_conjunction,conditions.Truth(),conditions.Truth()]
00417 elif time == "end":
00418 del_undef_condition = [conditions.Truth(),conditions.Truth(),del_undef_conjunction]
00419 else:
00420 del_undef_condition = del_undef_conjunction
00421 if len(del_undef_conjunction_parts)>0:
00422 del_undef_effect = ConditionalEffect(del_undef_condition,del_undef_effect,time)
00423 if len(del_undef_params)>0:
00424 del_undef_effect = UniversalEffect(del_undef_params,del_undef_effect,time)
00425 results.append(del_undef_effect)
00426
00427