pddl.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import roslib; roslib.load_manifest('pddl_planner')
00003 import rospy
00004 import os
00005 import re
00006 import commands
00007 import tempfile
00008 import actionlib
00009 
00010 import pddl_msgs
00011 from pddl_msgs.msg import *
00012 
00013 class PDDLPlannerActionServer(object):
00014     _result = PDDLPlannerResult()
00015     _feedback = PDDLPlannerFeedback()
00016     def __init__(self, name):
00017         self._action_name = name
00018         self._as = actionlib.SimpleActionServer(self._action_name,
00019                                                 PDDLPlannerAction,
00020                                                 self.execute_cb)
00021         # resolve rosparam
00022         self._planner_path = rospy.get_param('~pddl_planner_path')
00023     def execute_cb(self, goal):
00024         problem = goal.problem
00025         domain = goal.domain
00026         rospy.loginfo("take a message")
00027         (problem_path, domain_path) = self.gen_tmp_pddl_file(problem, domain)
00028         rospy.loginfo("problem_path => %s" % problem_path)
00029         rospy.loginfo("domain_path => %s" % domain_path)
00030         result = self.call_pddl_planner(problem_path, domain_path)
00031         if result:
00032             self._result.sequence = [PDDLStep(action = x.split(' ')[0],
00033                                               args = x.split(' ')[1:])
00034                                      for x in result]
00035             self._as.set_succeeded(self._result)
00036         else:
00037             self._as.set_aborted()
00038             
00039     def parse_pddl_result(self, output):
00040         rospy.loginfo(output)
00041         # dirty implementation
00042         step_before_after = output.split("step")
00043         if len(step_before_after) == 2:
00044             results = [re.sub("\s*$", "", re.sub(r'^\s*\d+:\s*', "" , x))
00045                        for x in step_before_after[1].split("time spent")[0].split("\n")]
00046             rospy.loginfo("result => %s" % results)
00047             return filter(lambda x: x != "", results)
00048         else:
00049             return False
00050     def parse_pddl_result_ffha(self, output):
00051         rospy.loginfo(output)
00052         # dirty implementation
00053         step_before_after = output.split("step")
00054         if len(step_before_after) == 2:
00055             results = [re.sub("\s*$", "", re.sub(r'^\s*\d+:\s*', "" , x))
00056                        for x in step_before_after[1].split("Total cost")[0].split("\n")[1:]]
00057             rospy.loginfo("result => %s" % results)
00058             return filter(lambda x: x != "", results)
00059         else:
00060             return False
00061          
00062     def call_pddl_planner(self, problem, domain):
00063         """currently only supports ff"""
00064         # -f problem -o domain
00065         output = commands.getoutput("%s -f %s -o %s" % (self._planner_path,
00066                                                         problem,
00067                                                         domain))
00068         # ffha
00069         if re.search("/ffha", self._planner_path):
00070             if re.search("-i 120", self._planner_path):
00071                 tmp = output.split("metric:")
00072                 if len(tmp) > 1:
00073                     output = tmp[1];
00074                 self._result.data = tmp;
00075             return self.parse_pddl_result_ffha(output)
00076         # ff
00077         return self.parse_pddl_result(output)
00078 
00079     def gen_tmp_pddl_file(self, problem, domain):
00080         problem_file = self.gen_tmp_problem_pddl_file(problem)
00081         domain_file = self.gen_tmp_domain_pddl_file(domain)
00082         return (problem_file, domain_file)
00083     def gen_problem_objects_strings(self, objects):
00084         # objects = list of PDDLObject
00085         # PDDLObject has name and type
00086         # collect PDDLObjects which has the same type
00087         grouped_objects = {}
00088         # grouped_objects := (type_and_objects ...)
00089         # type_and_objects := (type_name object_a object_b ...)
00090         # destructively change grouped_objects
00091         for o in objects:
00092             object_name = o.name
00093             # find same object_type in grouped_objects
00094             if grouped_objects.has_key(o.type):
00095                 grouped_objects[o.type].append(o.name)
00096             else:
00097                 grouped_objects[o.type] = [o.name]
00098         return [" ".join(grouped_objects[t]) + " - " + t
00099                 for t in grouped_objects.keys()]
00100     
00101     def gen_tmp_problem_pddl_file(self, problem):
00102         (fd, path_name) = tempfile.mkstemp(text=True, prefix='problem_')
00103         path = os.fdopen(fd, 'w')
00104         path.write("""(define (problem %s)
00105 (:domain %s)
00106 (:objects %s)
00107 (:init %s)
00108 (:goal %s))
00109 """ % (problem.name, problem.domain,
00110        "\n".join(self.gen_problem_objects_strings(problem.objects)),
00111        ' '.join(problem.initial), problem.goal))
00112         return path_name
00113     def gen_tmp_domain_pddl_file(self, domain):
00114         (fd, path_name) = tempfile.mkstemp(text=True, prefix='domain_')
00115         path = os.fdopen(fd, 'w')
00116         path.write("(define (domain %s)\n" % domain.name)
00117         path.write("(:requirements %s)\n" % domain.requirements)
00118         path.write("(:types \n")
00119         for i in domain.types:
00120             path.write(i + " ")
00121         path.write(")\n")
00122         if len(domain.constants) > 0:
00123             path.write("(:constants \n")
00124             for i in domain.constants:
00125                 path.write(i + " ")
00126             path.write(")\n")
00127         path.write("(:predicates\n")
00128         for i in domain.predicates:
00129             path.write(i + " ")
00130         path.write(")\n")
00131         if domain.functions:
00132             path.write("(:functions\n")
00133             for i in domain.functions:
00134                 path.write(i + " ")
00135             path.write(")\n")
00136         for action in domain.actions:
00137             path.write("(:action %s\n" % action.name)
00138             path.write(":parameters %s\n" % action.parameters)
00139             path.write(":precondition %s\n" % action.precondition)
00140             path.write(":effect %s\n" % action.effect)
00141             path.write(")\n")               # (:action
00142         path.write(")\n")               # (define
00143         return path_name
00144     
00145 
00146 if __name__ == '__main__':
00147     rospy.init_node('pddl_planner')
00148     PDDLPlannerActionServer(rospy.get_name())
00149     rospy.spin()
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends


pddl_planner
Author(s): Ryohei Ueda (ueda@jsk.t.u-tokyo.ac.jp)
autogenerated on Sun Mar 24 2013 00:11:02