Go to the documentation of this file.00001
00002
00003 import time
00004 import rospy
00005 import shutil
00006
00007 from bwi_planning.srv import CostLearnerInterface, CostLearnerInterfaceRequest
00008 from std_srvs.srv import Empty
00009
00010 from .clingo import ClingoWrapper
00011
00012 def get_adjacent_door(atoms):
00013 for atom in atoms:
00014 if atom.name == "beside" and not atom.negated:
00015 return str(atom.value)
00016 return None
00017
00018 def get_location(atoms):
00019 for atom in atoms:
00020 if atom.name == "at":
00021 return str(atom.value)
00022 return None
00023
00024 class Planner(object):
00025
00026 def __init__(self, atom_class, executor_class):
00027
00028 self.atom_class = atom_class
00029 self.executor_class = executor_class
00030
00031 self.initialized = False
00032
00033 self.dry_run = rospy.get_param("~dry_run", False)
00034 self.clingo_interface = ClingoWrapper(self.atom_class)
00035 initial_file = rospy.get_param("~initial_file", None)
00036 self.query_file = rospy.get_param("~query_file")
00037 self.domain_costs_file = rospy.get_param("~domain_costs_file", None)
00038 self.costs_file = rospy.get_param("~costs_file", None)
00039 self.enable_learning = rospy.get_param("~enable_learning", False)
00040
00041 if self.dry_run and not initial_file:
00042 rospy.logfatal("Dry run requested, but no initial state provided." +
00043 " Please set the ~initial_file param.")
00044 return
00045 self.initial_file = "/tmp/initial"
00046 if self.dry_run:
00047 rospy.loginfo("Supplied initial state written to: " +
00048 self.initial_file)
00049 shutil.copyfile(initial_file, self.initial_file)
00050
00051 self.executor = self.executor_class(self.dry_run, self.initial_file)
00052
00053 if self.enable_learning:
00054 rospy.wait_for_service('cost_learner/increment_episode')
00055 rospy.wait_for_service('cost_learner/add_sample')
00056 self.finalize_episode = \
00057 rospy.ServiceProxy('cost_learner/increment_episode', Empty)
00058 self.add_sample = \
00059 rospy.ServiceProxy('cost_learner/add_sample',
00060 CostLearnerInterface)
00061 self.planning_times_file = rospy.get_param("~planning_times_file",
00062 None)
00063
00064 self.initialized = True
00065
00066 def _construct_initial_state(self, previous_state, observations):
00067 """
00068 This is a poor man's belief merge function between the previous_state
00069 and observations. This needs to be replaced with proper belief merge
00070 algorithms as it impossible to cover every possible situation here
00071 using if/else statements
00072 """
00073
00074 new_state = []
00075 removed_atoms = []
00076 for atom in previous_state:
00077 atom.time = 0
00078 add_atom = True
00079 for observation in observations:
00080 if observation.conflicts_with(atom):
00081 add_atom = False
00082 if add_atom:
00083 new_state.append(atom)
00084 else:
00085 removed_atoms.append(atom)
00086
00087 if len(removed_atoms) != 0:
00088 display_message = " Incompatible members in previous state: "
00089 for atom in removed_atoms:
00090 display_message += str(atom) + " "
00091 rospy.logwarn(display_message)
00092
00093 for observation in observations:
00094 observation.time = 0
00095 new_state.append(observation)
00096
00097 initial_file = open(self.initial_file,"w")
00098 display_message = " Constructed state: "
00099 for atom in new_state:
00100 initial_file.write(str(atom) + ".\n")
00101 display_message += str(atom) + " "
00102 initial_file.close()
00103 rospy.loginfo(display_message)
00104 rospy.loginfo("Constructed initial state written to: " +
00105 self.initial_file)
00106
00107 def _send_finish(self):
00108 if self.enable_learning:
00109 self.finalize_episode()
00110
00111 def start(self):
00112 if not self.initialized:
00113 return
00114 self.executor.sense_initial_state()
00115 while True:
00116 if self.domain_costs_file != None:
00117 rospy.loginfo("Planning with cost optimization!!")
00118 start_time = time.time()
00119 plan_available, optimization, plan, states = \
00120 self.clingo_interface.get_plan_costs([self.initial_file,
00121 self.query_file,
00122 self.costs_file,
00123 self.domain_costs_file])
00124 duration = time.time() - start_time
00125 if self.enable_learning and self.planning_times_file:
00126 ptf = open(self.planning_times_file, "a")
00127 ptf.write(str(duration) + "\n")
00128 ptf.close()
00129 else:
00130 plan_available, optimization, plan, states = \
00131 self.clingo_interface.get_plan([self.initial_file,
00132 self.query_file])
00133 if not plan_available:
00134 rospy.logfatal("No plan found to complete task!")
00135 break
00136 rospy.loginfo("Found plan (optimization = %i): %s"%(optimization,
00137 plan))
00138
00139 need_replan = False
00140 for action in plan:
00141 current_state = [state for state in states
00142 if state.time == action.time]
00143 next_state = [state for state in states
00144 if state.time == action.time+1]
00145 start_time = rospy.get_time()
00146 success, observations = \
00147 self.executor.execute_action(action, next_state,
00148 action.time+1)
00149 duration = rospy.get_time() - start_time
00150
00151 if self.enable_learning and action.name == "approach":
00152 if success:
00153
00154
00155 req = CostLearnerInterfaceRequest()
00156 req.location = get_location(current_state)
00157 req.door_from = get_adjacent_door(current_state)
00158 if req.door_from:
00159 req.door_to = str(action.value)
00160 req.cost = duration
00161 self.add_sample(req)
00162 else:
00163 rospy.loginfo("Executed unsuccessful approach action.")
00164
00165 for observation in observations:
00166 if observation not in next_state:
00167 rospy.logwarn(" Unexpected observation: " +
00168 str(observation))
00169 rospy.loginfo(" Expected Next State: " + str(next_state))
00170 need_replan = True
00171 break
00172 if need_replan:
00173 break
00174
00175 if need_replan:
00176 self._construct_initial_state(current_state, observations)
00177 continue
00178
00179 rospy.loginfo("Plan successful!")
00180 self._send_finish()
00181 break
00182
00183