pddl.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import rospy
00004 import os
00005 import re
00006 import commands
00007 import tempfile
00008 
00009 try:
00010     from pddl_msgs.msg import *
00011 except:
00012     import roslib; roslib.load_manifest('pddl_planner')
00013     from pddl_msgs.msg import *
00014 import actionlib
00015 
00016 
00017 class PDDLPlannerActionServer(object):
00018     _result = PDDLPlannerResult()
00019     _feedback = PDDLPlannerFeedback()
00020     def __init__(self, name):
00021         self._action_name = name
00022         self._as = actionlib.SimpleActionServer(self._action_name,
00023                                                 PDDLPlannerAction,
00024                                                 self.execute_cb)
00025         # resolve rosparam
00026         self._planner_name = rospy.get_param('~pddl_planner', 'downward')
00027         self._search_option = rospy.get_param('~pddl_search_option', '')
00028 
00029     def execute_cb(self, goal):
00030         problem = goal.problem
00031         domain = goal.domain
00032         rospy.loginfo("take a message")
00033         (problem_path, domain_path) = self.gen_tmp_pddl_file(problem, domain)
00034         rospy.loginfo("problem_path => %s" % problem_path)
00035         rospy.loginfo("domain_path => %s" % domain_path)
00036         result = self.call_pddl_planner(problem_path, domain_path)
00037         if result:
00038             self._result.sequence = [PDDLStep(action = x.split(' ')[0],
00039                                               args = x.split(' ')[1:])
00040                                      for x in result]
00041             self._as.set_succeeded(self._result)
00042         else:
00043             self._as.set_aborted()
00044 
00045     def parse_pddl_result(self, output):
00046         rospy.loginfo(output)
00047         # dirty implementation
00048         step_before_after = output.split("step")
00049         if len(step_before_after) == 2:
00050             results = [re.sub("\s*$", "", re.sub(r'^\s*\d+:\s*', "" , x))
00051                        for x in step_before_after[1].split("time spent")[0].split("\n")]
00052             rospy.loginfo("result => %s" % results)
00053             return filter(lambda x: x != "", results)
00054         else:
00055             return False
00056     def parse_pddl_result_ffha(self, output):
00057         rospy.loginfo(output)
00058         # dirty implementation
00059         step_before_after = output.split("step")
00060         if len(step_before_after) == 2:
00061             results = [y.group(0)
00062                        for y in [re.search("\([^\)]+\)", x)
00063                                  for x in step_before_after[1].split("Total cost")[0].split("\n")[1:]]
00064                        if y != None]
00065             rospy.loginfo("result => %s" % results)
00066             return results
00067         else:
00068             return False
00069 
00070     def parse_pddl_result_downward(self, path_name):
00071         plan_path = path_name
00072         i = 1
00073         while os.path.exists(path_name + "." + str(i)):
00074             plan_path = path_name + "." + str(i)
00075             i += 1
00076         rospy.loginfo("plan_path => %s" % plan_path)
00077 
00078         with open(plan_path) as f:
00079             plan = f.read().split("\n")
00080 
00081         plan.remove("")
00082         results = [re.sub(" \)$", ")", x)
00083                    for x in plan]
00084         rospy.loginfo(results)
00085 
00086         return results
00087 
00088     def call_pddl_planner(self, problem, domain):
00089 
00090         if  self._planner_name == "ff":
00091             # -f problem -o domain
00092             output = commands.getoutput("rosrun ff ff -f %s -o %s" % (problem, domain))
00093             return self.parse_pddl_result(output)
00094 
00095         # ffha
00096         elif self._planner_name == "ffha":
00097             output = commands.getoutput("rosrun ffha ffha %s -f %s -o %s" % (self._search_option, problem, domain))
00098             if re.search("final domain representation is:", output):
00099                 tmp = output.split("metric:")
00100                 if len(tmp) > 1:
00101                     output = tmp[1];
00102                 self._result.data = tmp;
00103             return self.parse_pddl_result_ffha(output)
00104 
00105         elif self._planner_name == "downward":
00106             (fd, path_name) = tempfile.mkstemp(text=True, prefix='plan_')
00107             (status, output) = commands.getstatusoutput("rosrun downward plan %s %s %s --plan-file %s" % (domain,
00108                                                                                                           problem,
00109                                                                                                           self._search_option,
00110                                                                                                           path_name))
00111             rospy.loginfo(output)
00112             if status == 0:
00113                 self._result.data = output
00114                 return self.parse_pddl_result_downward(path_name)
00115             else:
00116                 return False
00117         else:
00118             rospy.logfatal("set invalid planner: %s !" % self._planner_name)
00119             return
00120 
00121     def gen_tmp_pddl_file(self, problem, domain):
00122         problem_file = self.gen_tmp_problem_pddl_file(problem)
00123         domain_file = self.gen_tmp_domain_pddl_file(domain)
00124         return (problem_file, domain_file)
00125     def gen_problem_objects_strings(self, objects):
00126         # objects = list of PDDLObject
00127         # PDDLObject has name and type
00128         # collect PDDLObjects which has the same type
00129         grouped_objects = {}
00130         # grouped_objects := (type_and_objects ...)
00131         # type_and_objects := (type_name object_a object_b ...)
00132         # destructively change grouped_objects
00133         for o in objects:
00134             object_name = o.name
00135             # find same object_type in grouped_objects
00136             if grouped_objects.has_key(o.type):
00137                 grouped_objects[o.type].append(o.name)
00138             else:
00139                 grouped_objects[o.type] = [o.name]
00140         return [" ".join(grouped_objects[t]) + " - " + t
00141                 for t in grouped_objects.keys()]
00142     
00143     def gen_tmp_problem_pddl_file(self, problem):
00144         (fd, path_name) = tempfile.mkstemp(text=True, prefix='problem_')
00145         path = os.fdopen(fd, 'w')
00146         path.write("""(define (problem %s)
00147 (:domain %s)
00148 (:objects %s)
00149 (:init %s)
00150 (:goal %s)
00151 """ % (problem.name, problem.domain,
00152        "\n".join(self.gen_problem_objects_strings(problem.objects)),
00153        ' '.join(problem.initial), problem.goal))
00154         if problem.metric:
00155             path.write("""(:metric %s)""" % problem.metric)
00156         path.write(""")""")
00157         return path_name
00158     def gen_tmp_domain_pddl_file(self, domain):
00159         (fd, path_name) = tempfile.mkstemp(text=True, prefix='domain_')
00160         path = os.fdopen(fd, 'w')
00161         path.write("(define (domain %s)\n" % domain.name)
00162         path.write("(:requirements %s)\n" % domain.requirements)
00163         path.write("(:types \n")
00164         for i in domain.types:
00165             path.write(i + " ")
00166         path.write(")\n")
00167         if len(domain.constants) > 0:
00168             path.write("(:constants \n")
00169             for i in domain.constants:
00170                 path.write(i + " ")
00171             path.write(")\n")
00172         path.write("(:predicates\n")
00173         for i in domain.predicates:
00174             path.write(i + " ")
00175         path.write(")\n")
00176         if domain.functions:
00177             path.write("(:functions\n")
00178             for i in domain.functions:
00179                 path.write(i + " ")
00180             path.write(")\n")
00181         for action in domain.actions:
00182             path.write("(:action %s\n" % action.name)
00183             path.write(":parameters %s\n" % action.parameters)
00184             path.write(":precondition %s\n" % action.precondition)
00185             path.write(":effect %s\n" % action.effect)
00186             path.write(")\n")               # (:action
00187         path.write(")\n")               # (define
00188         return path_name
00189     
00190 
00191 if __name__ == '__main__':
00192     rospy.init_node('pddl_planner')
00193     PDDLPlannerActionServer(rospy.get_name())
00194     rospy.spin()


pddl_planner
Author(s): Ryohei Ueda (ueda@jsk.t.u-tokyo.ac.jp)
autogenerated on Mon Sep 14 2015 14:00:42