pddl.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib; roslib.load_manifest('pddl_planner')
00003 import rospy
00004 import os
00005 import re
00006 import commands
00007 import tempfile
00008 import actionlib
00009 
00010 import pddl_msgs
00011 from pddl_msgs.msg import *
00012 
00013 class PDDLPlannerActionServer(object):
00014     _result = PDDLPlannerResult()
00015     _feedback = PDDLPlannerFeedback()
00016     def __init__(self, name):
00017         self._action_name = name
00018         self._as = actionlib.SimpleActionServer(self._action_name,
00019                                                 PDDLPlannerAction,
00020                                                 self.execute_cb)
00021         # resolve rosparam
00022         self._planner_path = rospy.get_param('~pddl_planner_path')
00023         if rospy.has_param('~pddl_search_option'):
00024             self._search_option = rospy.get_param('~pddl_search_option')
00025 
00026     def execute_cb(self, goal):
00027         problem = goal.problem
00028         domain = goal.domain
00029         rospy.loginfo("take a message")
00030         (problem_path, domain_path) = self.gen_tmp_pddl_file(problem, domain)
00031         rospy.loginfo("problem_path => %s" % problem_path)
00032         rospy.loginfo("domain_path => %s" % domain_path)
00033         result = self.call_pddl_planner(problem_path, domain_path)
00034         if result:
00035             self._result.sequence = [PDDLStep(action = x.split(' ')[0],
00036                                               args = x.split(' ')[1:])
00037                                      for x in result]
00038             self._as.set_succeeded(self._result)
00039         else:
00040             self._as.set_aborted()
00041 
00042     def parse_pddl_result(self, output):
00043         rospy.loginfo(output)
00044         # dirty implementation
00045         step_before_after = output.split("step")
00046         if len(step_before_after) == 2:
00047             results = [re.sub("\s*$", "", re.sub(r'^\s*\d+:\s*', "" , x))
00048                        for x in step_before_after[1].split("time spent")[0].split("\n")]
00049             rospy.loginfo("result => %s" % results)
00050             return filter(lambda x: x != "", results)
00051         else:
00052             return False
00053     def parse_pddl_result_ffha(self, output):
00054         rospy.loginfo(output)
00055         # dirty implementation
00056         step_before_after = output.split("step")
00057         if len(step_before_after) == 2:
00058             results = [y.group(0)
00059                        for y in [re.search("\([^\)]+\)", x)
00060                                  for x in step_before_after[1].split("Total cost")[0].split("\n")[1:]]
00061                        if y != None]
00062             rospy.loginfo("result => %s" % results)
00063             return results
00064         else:
00065             return False
00066 
00067     def parse_pddl_result_downward(self, path_name):
00068         plan_path = path_name
00069         i = 1
00070         while os.path.exists(path_name + "." + str(i)):
00071             plan_path = path_name + "." + str(i)
00072             i += 1
00073         rospy.loginfo("plan_path => %s" % plan_path)
00074 
00075         with open(plan_path) as f:
00076             plan = f.read().split("\n")
00077 
00078         plan.remove("")
00079         results = [re.sub(" \)$", ")", x)
00080                    for x in plan]
00081         rospy.loginfo(results)
00082 
00083         return results
00084 
00085     def call_pddl_planner(self, problem, domain):
00086 
00087         if re.search("/bin/ff", self._planner_path):
00088             # -f problem -o domain
00089             output = commands.getoutput("%s -f %s -o %s" % (self._planner_path,
00090                                                             problem,
00091                                                             domain))
00092             # ffha
00093             if re.search("/ffha", self._planner_path):
00094                 if re.search("final domain representation is:", output):
00095                     tmp = output.split("metric:")
00096                     if len(tmp) > 1:
00097                         output = tmp[1];
00098                     self._result.data = tmp;
00099                 return self.parse_pddl_result_ffha(output)
00100             # ff
00101             return self.parse_pddl_result(output)
00102 
00103         if re.search("downward", self._planner_path):
00104             (fd, path_name) = tempfile.mkstemp(text=True, prefix='plan_')
00105             (status, output) = commands.getstatusoutput("%s %s %s %s --plan-file %s" % (self._planner_path,
00106                                                                                         domain,
00107                                                                                         problem,
00108                                                                                         self._search_option,
00109                                                                                         path_name))
00110             rospy.loginfo(output)
00111             if status == 0:
00112                 self._result.data = output
00113                 return self.parse_pddl_result_downward(path_name)
00114             else:
00115                 return False
00116 
00117     def gen_tmp_pddl_file(self, problem, domain):
00118         problem_file = self.gen_tmp_problem_pddl_file(problem)
00119         domain_file = self.gen_tmp_domain_pddl_file(domain)
00120         return (problem_file, domain_file)
00121     def gen_problem_objects_strings(self, objects):
00122         # objects = list of PDDLObject
00123         # PDDLObject has name and type
00124         # collect PDDLObjects which has the same type
00125         grouped_objects = {}
00126         # grouped_objects := (type_and_objects ...)
00127         # type_and_objects := (type_name object_a object_b ...)
00128         # destructively change grouped_objects
00129         for o in objects:
00130             object_name = o.name
00131             # find same object_type in grouped_objects
00132             if grouped_objects.has_key(o.type):
00133                 grouped_objects[o.type].append(o.name)
00134             else:
00135                 grouped_objects[o.type] = [o.name]
00136         return [" ".join(grouped_objects[t]) + " - " + t
00137                 for t in grouped_objects.keys()]
00138     
00139     def gen_tmp_problem_pddl_file(self, problem):
00140         (fd, path_name) = tempfile.mkstemp(text=True, prefix='problem_')
00141         path = os.fdopen(fd, 'w')
00142         path.write("""(define (problem %s)
00143 (:domain %s)
00144 (:objects %s)
00145 (:init %s)
00146 (:goal %s)
00147 """ % (problem.name, problem.domain,
00148        "\n".join(self.gen_problem_objects_strings(problem.objects)),
00149        ' '.join(problem.initial), problem.goal))
00150         if problem.metric:
00151             path.write("""(:metric %s)""" % problem.metric)
00152         path.write(""")""")
00153         return path_name
00154     def gen_tmp_domain_pddl_file(self, domain):
00155         (fd, path_name) = tempfile.mkstemp(text=True, prefix='domain_')
00156         path = os.fdopen(fd, 'w')
00157         path.write("(define (domain %s)\n" % domain.name)
00158         path.write("(:requirements %s)\n" % domain.requirements)
00159         path.write("(:types \n")
00160         for i in domain.types:
00161             path.write(i + " ")
00162         path.write(")\n")
00163         if len(domain.constants) > 0:
00164             path.write("(:constants \n")
00165             for i in domain.constants:
00166                 path.write(i + " ")
00167             path.write(")\n")
00168         path.write("(:predicates\n")
00169         for i in domain.predicates:
00170             path.write(i + " ")
00171         path.write(")\n")
00172         if domain.functions:
00173             path.write("(:functions\n")
00174             for i in domain.functions:
00175                 path.write(i + " ")
00176             path.write(")\n")
00177         for action in domain.actions:
00178             path.write("(:action %s\n" % action.name)
00179             path.write(":parameters %s\n" % action.parameters)
00180             path.write(":precondition %s\n" % action.precondition)
00181             path.write(":effect %s\n" % action.effect)
00182             path.write(")\n")               # (:action
00183         path.write(")\n")               # (define
00184         return path_name
00185     
00186 
00187 if __name__ == '__main__':
00188     rospy.init_node('pddl_planner')
00189     PDDLPlannerActionServer(rospy.get_name())
00190     rospy.spin()


pddl_planner
Author(s): Ryohei Ueda (ueda@jsk.t.u-tokyo.ac.jp)
autogenerated on Mon Oct 6 2014 01:09:53