00001
00002
00003 import os
00004 import time
00005 from threading import Lock
00006 from collections import deque
00007
00008 import roslib
00009 roslib.load_manifest("kelsey_sandbox")
00010
00011 import rospy
00012 from std_srvs.srv import Empty, EmptyResponse
00013 from std_msgs.msg import String
00014
00015 from srv import BehaviorRegistration, BehaviorRequest, BehaviorRequestRequest, BehaviorRelinquish
00016 from srv import BehaviorRegistrationResponse, BehaviorRequestResponse, BehaviorRelinquishResponse
00017 from srv import BehaviorResume, BehaviorResumeResponse
00018
00019 ALLOWED_RESOURCES = ['l_arm', 'r_arm', 'l_gripper', 'r_gripper', 'base', 'head', 'torso']
00020 HEARTBEAT_TIMEOUT = 5.
00021 HEARTBEAT_RATE = 10.
00022
00023 class BMPriorities:
00024 TOP = 255
00025 HIGH = 100
00026 MEDIUM = 50
00027 LOW = 10
00028 BOTTOM = 0
00029
00030 def log(s):
00031 rospy.loginfo("[behavior_manager] %s" % s)
00032
00033 def warn(s):
00034 rospy.logwarn("[behavior_manager] %s" % s)
00035
00036 def err(s):
00037 rospy.logerr("[behavior_manager] %s" % s)
00038
00039 class BehaviorManager(object):
00040 def __init__(self):
00041 self._resource_lock = Lock()
00042 self._heartbeat_lock = Lock()
00043 with self._resource_lock:
00044 self._clear_manager()
00045 self._register_srv = rospy.Service('/behavior_manager/register_behavior',
00046 BehaviorRegistration, self._register_cb)
00047 self._request_srv = rospy.Service('/behavior_manager/request_resource',
00048 BehaviorRequest, self._request_cb)
00049 self._relinquish_srv = rospy.Service('/behavior_manager/relinquish_resource',
00050 BehaviorRelinquish, self._relinquish_cb)
00051 self._clear_srv = rospy.Service('/behavior_manager/clear_manager',
00052 Empty, self._clear_cb)
00053
00054 def _clear_manager(self):
00055 self._active_resources = {}
00056 self._registered_behaviors = {}
00057 self._behavior_preempts = {}
00058 self._behavior_resumes = {}
00059 self._preempted_stacks = {}
00060 self._heartbeat_timers = {}
00061 self._hb_last_times = {}
00062 for resource in ALLOWED_RESOURCES:
00063 self._active_resources[resource] = None
00064 self._preempted_stacks[resource] = deque()
00065
00066 def _register_cb(self, req):
00067 with self._resource_lock:
00068 bname = req.behavior_name
00069 if req.ctrl_resource not in ALLOWED_RESOURCES:
00070 err("Resource %s not reservable." % req.ctrl_resource)
00071 return BehaviorRegistrationResponse(False)
00072 if bname in self._registered_behaviors:
00073 err("Behavior %s already registered." % bname)
00074 return BehaviorRegistrationResponse(False)
00075 self._registered_behaviors[bname] = req.ctrl_resource
00076 rospy.wait_for_service('/behavior_manager/%s/preempt' % bname)
00077 rospy.wait_for_service('/behavior_manager/%s/resume' % bname)
00078 self._behavior_preempts[bname] = rospy.ServiceProxy('/behavior_manager/%s/preempt' % bname,
00079 Empty)
00080 self._behavior_resumes[bname] = rospy.ServiceProxy('/behavior_manager/%s/resume' % bname,
00081 BehaviorResume)
00082 log("Behavior %s successfully registered in the behavior manager." % bname)
00083 return BehaviorRegistrationResponse(True)
00084
00085 def _heartbeat_monitor(self, te):
00086 cur_time = rospy.get_time()
00087 for resource in self._active_resources:
00088 running_resource = self._active_resources[resource]
00089 if running_resource is None:
00090 continue
00091 bname = running_resource[1]
00092 if cur_time - self._hb_last_times[bname] > HEARTBEAT_TIMEOUT:
00093 resource = self._registered_behaviors[bname]
00094 warn("Behavior %s has timed out and will release %s" %
00095 (bname, resource))
00096 self._relinquish_from_behavior(bname)
00097
00098 def _heartbeat_sub_cb(self, msg):
00099 with self._heartbeat_lock:
00100 self._hb_last_times[msg.data] = rospy.get_time()
00101
00102 def _request_cb(self, req):
00103 with self._resource_lock:
00104 bname = req.behavior_name
00105 priority = req.priority
00106 if bname not in self._registered_behaviors:
00107 err("Behavior %s not registered." % bname)
00108 return BehaviorRequestResponse(False)
00109 resource = self._registered_behaviors[bname]
00110
00111 if self._active_resources[resource] is None:
00112
00113 self._active_resources[resource] = [priority, bname]
00114 success = True
00115 elif priority >= self._active_resources[resource][0]:
00116
00117 preempted_behavior = self._active_resources[resource]
00118 self._preempted_stacks[resource].append(preempted_behavior)
00119 self._behavior_preempts[bname]()
00120 self._active_resources[resource] = [priority, bname]
00121 success = True
00122 else:
00123
00124 success = False
00125 if success:
00126 self._hb_last_times[bname] = rospy.get_time()
00127 self._heartbeat_sub = rospy.Subscriber('/behavior_manager/%s/heartbeat' % bname,
00128 String, self._heartbeat_sub_cb)
00129 self._heartbeat_timers[bname] = rospy.Timer(rospy.Duration(1./HEARTBEAT_RATE),
00130 self._heartbeat_monitor)
00131 return BehaviorRequestResponse(success)
00132
00133 def _relinquish_cb(self, req):
00134 bname = req.behavior_name
00135 self._relinquish_from_behavior(bname)
00136 return BehaviorRelinquishResponse()
00137
00138 def _relinquish_from_behavior(self, bname):
00139 with self._resource_lock:
00140 if bname not in self._registered_behaviors:
00141 err("Behavior %s not registered.")
00142 return
00143 resource = self._registered_behaviors[bname]
00144 while len(self._preempted_stacks[resource]) > 0:
00145
00146 resumed_behavior = self._preempted_stacks[resource].pop()
00147 will_resume = self._behavior_resumes[resumed_behavior[1]]()
00148 if will_resume:
00149 self._active_resources[resource] = resumed_behavior
00150 return
00151 self._active_resources[resource] = None
00152
00153 def _clear_cb(self, req):
00154 self._clear_manager()
00155 return EmptyResponse()
00156
00157 class BehaviorManagerClient(object):
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172 def __init__(self, resource, preempt_cb, resume_cb=None,
00173 priority=BMPriorities.MEDIUM, name="behavior"):
00174 if resource not in ALLOWED_RESOURCES:
00175 err("Resource %s is not an allowed resource to reserve." % resource)
00176 self._resource = resource
00177 self._default_priority = priority
00178
00179 self._bname = "%s_%s_%s" % (name, os.getpid(), int(time.time()*1000))
00180 self._preempt_cb = preempt_cb
00181 if resume_cb is None:
00182 def resume_cb_tmp():
00183 return False
00184 self._resume_cb = resume_cb_tmp
00185 else:
00186 self._resume_cb = resume_cb
00187
00188 self._preempt_srv = rospy.Service('/behavior_manager/%s/preempt' % self._bname,
00189 Empty, self._call_preempt)
00190 self._resume_srv = rospy.Service('/behavior_manager/%s/resume' % self._bname,
00191 BehaviorResume, self._call_resume)
00192 self._heartbeat_pub = rospy.Publisher('/behavior_manager/%s/heartbeat' % self._bname, String)
00193
00194 rospy.wait_for_service('/behavior_manager/register_behavior')
00195 register_behavior = rospy.ServiceProxy('/behavior_manager/register_behavior', BehaviorRegistration)
00196 register_behavior(behavior_name=self._bname, ctrl_resource=resource)
00197
00198 rospy.wait_for_service('/behavior_manager/request_resource')
00199 rospy.wait_for_service('/behavior_manager/relinquish_resource')
00200 self._request_resource = rospy.ServiceProxy('/behavior_manager/request_resource',
00201 BehaviorRequest)
00202 self._relinquish_resource = rospy.ServiceProxy('/behavior_manager/relinquish_resource',
00203 BehaviorRelinquish)
00204
00205 def _call_preempt(self, req):
00206 self._preempt_cb()
00207 return EmptyResponse()
00208
00209 def _call_resume(self, req):
00210 return BehaviorResumeResponse(self._resume_cb())
00211
00212 def set_preempt_cb(self, preempt_cb):
00213 self._preempt_cb = preempt_cb
00214
00215 def set_resume_cb(self, resume_cb):
00216 self._resume_cb = resume_cb
00217
00218 def _heartbeat_cb(self, te):
00219 self._heartbeat_pub.publish(String(self._bname))
00220
00221
00222
00223
00224
00225 def request(self, priority=None):
00226 if priority is None:
00227 priority = self._default_priority
00228 try:
00229 success = self._request_resource(behavior_name=self._bname, priority=priority)
00230 if success:
00231 self._heartbeat_timer = rospy.Timer(rospy.Duration(1./HEARTBEAT_RATE), self._heartbeat_cb)
00232 return success
00233 except rospy.ServiceException, e:
00234 err("Request service connection issue: %s" % str(e))
00235 return False
00236
00237
00238
00239 def relinquish(self):
00240 try:
00241 self._relinquish_resource(behavior_name=self._bname)
00242 except rospy.ServiceException, e:
00243 err("Relinquish service connection issue: %s" % str(e))
00244
00245
00246
00247
00248 def clear_behavior_manager():
00249 clear_manager = rospy.ServiceProxy('/behavior_manager/clear_manager', Empty)
00250 rospy.wait_for_service('/behavior_manager/clear_manager')
00251 try:
00252 clear_manager()
00253 except rospy.ServiceException, e:
00254 err("Clear manager service connection issue: %s" % str(e))
00255
00256 class Behavior(object):
00257 def __init__(self, resource, priority, name="behavior"):
00258 self.name = name
00259 self.priority = priority
00260 self.bmc = BehaviorManagerClient(resource, self._preempt_cb, self._resume_cb,
00261 priority=priority, name=name)
00262 self._execute = self.execute
00263 def wrap_execute(*args, **kwargs):
00264 if self.bmc.request():
00265 self._execute(*args, **kwargs)
00266 self.bmc.relinquish()
00267 return True
00268 else:
00269 return False
00270 self.execute = wrap_execute
00271
00272 def _preempt_cb(self):
00273 self.preempt()
00274
00275 def _resume_cb(self):
00276 return self.resume()
00277
00278 def preempt(self):
00279 rospy.logerror("[behavior] preempt must be overrided!")
00280
00281 def resume(self):
00282 return False
00283
00284 def execute(self):
00285 rospy.logerror("[behavior] execute must be overrided!")
00286
00287 def main():
00288 rospy.init_node("behavior_manager")
00289 bm = BehaviorManager()
00290 rospy.spin()
00291
00292 if __name__ == "__main__":
00293 main()