Go to the documentation of this file.00001
00002 import roslib; roslib.load_manifest('pddl_planner')
00003 import rospy
00004 import os
00005 import re
00006 import commands
00007 import tempfile
00008 import actionlib
00009
00010 import pddl_msgs
00011 from pddl_msgs.msg import *
00012
00013 class PDDLPlannerActionServer(object):
00014 _result = PDDLPlannerResult()
00015 _feedback = PDDLPlannerFeedback()
00016 def __init__(self, name):
00017 self._action_name = name
00018 self._as = actionlib.SimpleActionServer(self._action_name,
00019 PDDLPlannerAction,
00020 self.execute_cb)
00021
00022 self._planner_path = rospy.get_param('~pddl_planner_path')
00023 def execute_cb(self, goal):
00024 problem = goal.problem
00025 domain = goal.domain
00026 rospy.loginfo("take a message")
00027 (problem_path, domain_path) = self.gen_tmp_pddl_file(problem, domain)
00028 rospy.loginfo("problem_path => %s" % problem_path)
00029 rospy.loginfo("domain_path => %s" % domain_path)
00030 result = self.call_pddl_planner(problem_path, domain_path)
00031 if result:
00032 self._result.sequence = [PDDLStep(action = x.split(' ')[0],
00033 args = x.split(' ')[1:])
00034 for x in result]
00035 self._as.set_succeeded(self._result)
00036 else:
00037 self._as.set_aborted()
00038
00039 def parse_pddl_result(self, output):
00040 rospy.loginfo(output)
00041
00042 step_before_after = output.split("step")
00043 if len(step_before_after) == 2:
00044 results = [re.sub("\s*$", "", re.sub(r'^\s*\d+:\s*', "" , x))
00045 for x in step_before_after[1].split("time spent")[0].split("\n")]
00046 rospy.loginfo("result => %s" % results)
00047 return filter(lambda x: x != "", results)
00048 else:
00049 return False
00050 def parse_pddl_result_ffha(self, output):
00051 rospy.loginfo(output)
00052
00053 step_before_after = output.split("step")
00054 if len(step_before_after) == 2:
00055 results = [re.sub("\s*$", "", re.sub(r'^\s*\d+:\s*', "" , x))
00056 for x in step_before_after[1].split("Total cost")[0].split("\n")[1:]]
00057 rospy.loginfo("result => %s" % results)
00058 return filter(lambda x: x != "", results)
00059 else:
00060 return False
00061
00062 def call_pddl_planner(self, problem, domain):
00063 """currently only supports ff"""
00064
00065 output = commands.getoutput("%s -f %s -o %s" % (self._planner_path,
00066 problem,
00067 domain))
00068
00069 if re.search("/ffha", self._planner_path):
00070 if re.search("-i 120", self._planner_path):
00071 tmp = output.split("metric:")
00072 if len(tmp) > 1:
00073 output = tmp[1];
00074 self._result.data = tmp;
00075 return self.parse_pddl_result_ffha(output)
00076
00077 return self.parse_pddl_result(output)
00078
00079 def gen_tmp_pddl_file(self, problem, domain):
00080 problem_file = self.gen_tmp_problem_pddl_file(problem)
00081 domain_file = self.gen_tmp_domain_pddl_file(domain)
00082 return (problem_file, domain_file)
00083 def gen_problem_objects_strings(self, objects):
00084
00085
00086
00087 grouped_objects = {}
00088
00089
00090
00091 for o in objects:
00092 object_name = o.name
00093
00094 if grouped_objects.has_key(o.type):
00095 grouped_objects[o.type].append(o.name)
00096 else:
00097 grouped_objects[o.type] = [o.name]
00098 return [" ".join(grouped_objects[t]) + " - " + t
00099 for t in grouped_objects.keys()]
00100
00101 def gen_tmp_problem_pddl_file(self, problem):
00102 (fd, path_name) = tempfile.mkstemp(text=True, prefix='problem_')
00103 path = os.fdopen(fd, 'w')
00104 path.write("""(define (problem %s)
00105 (:domain %s)
00106 (:objects %s)
00107 (:init %s)
00108 (:goal %s))
00109 """ % (problem.name, problem.domain,
00110 "\n".join(self.gen_problem_objects_strings(problem.objects)),
00111 ' '.join(problem.initial), problem.goal))
00112 return path_name
00113 def gen_tmp_domain_pddl_file(self, domain):
00114 (fd, path_name) = tempfile.mkstemp(text=True, prefix='domain_')
00115 path = os.fdopen(fd, 'w')
00116 path.write("(define (domain %s)\n" % domain.name)
00117 path.write("(:requirements %s)\n" % domain.requirements)
00118 path.write("(:types \n")
00119 for i in domain.types:
00120 path.write(i + " ")
00121 path.write(")\n")
00122 if len(domain.constants) > 0:
00123 path.write("(:constants \n")
00124 for i in domain.constants:
00125 path.write(i + " ")
00126 path.write(")\n")
00127 path.write("(:predicates\n")
00128 for i in domain.predicates:
00129 path.write(i + " ")
00130 path.write(")\n")
00131 if domain.functions:
00132 path.write("(:functions\n")
00133 for i in domain.functions:
00134 path.write(i + " ")
00135 path.write(")\n")
00136 for action in domain.actions:
00137 path.write("(:action %s\n" % action.name)
00138 path.write(":parameters %s\n" % action.parameters)
00139 path.write(":precondition %s\n" % action.precondition)
00140 path.write(":effect %s\n" % action.effect)
00141 path.write(")\n")
00142 path.write(")\n")
00143 return path_name
00144
00145
00146 if __name__ == '__main__':
00147 rospy.init_node('pddl_planner')
00148 PDDLPlannerActionServer(rospy.get_name())
00149 rospy.spin()