Go to the documentation of this file.00001 import roslib; roslib.load_manifest('hrl_pr2_arms')
00002 import rospy
00003 import rosparam
00004 import roslaunch.substitution_args
00005
00006 from pr2_mechanism_msgs.srv import LoadController, UnloadController, SwitchController, ListControllers
00007
00008 LOADED_ARM_CONTROLLERS = ['%s_arm_controller', '%s_cart']
00009 LOADED_CTRLS_PARAMS = {
00010 'r' : "/controller_manager/loaded_ctrls/right_arm",
00011 'l' : "/controller_manager/loaded_ctrls/left_arm",
00012 }
00013
00014
00015
00016 class ControllerSwitcher:
00017 def __init__(self):
00018 self.load_controller = rospy.ServiceProxy('pr2_controller_manager/load_controller',
00019 LoadController)
00020 self.unload_controller = rospy.ServiceProxy('pr2_controller_manager/unload_controller',
00021 UnloadController)
00022 self.switch_controller_srv = rospy.ServiceProxy('pr2_controller_manager/switch_controller',
00023 SwitchController)
00024 self.list_controllers_srv = rospy.ServiceProxy('pr2_controller_manager/list_controllers',
00025 ListControllers)
00026 self.load_controller.wait_for_service()
00027 self.unload_controller.wait_for_service()
00028 self.switch_controller_srv.wait_for_service()
00029 self.list_controllers_srv.wait_for_service()
00030 for arm in ['r', 'l']:
00031 if LOADED_CTRLS_PARAMS[arm] not in rosparam.list_params(""):
00032 possible_ctrls = []
00033 for ctrl in LOADED_ARM_CONTROLLERS:
00034 possible_ctrls.append(ctrl % arm)
00035 rosparam.set_param_raw(LOADED_CTRLS_PARAMS[arm], possible_ctrls)
00036 rospy.loginfo("[pr2_controller_switcher] ControllerSwitcher ready.")
00037
00038
00039
00040
00041
00042
00043
00044
00045 def switch(self, old_controller, new_controller, param_file=None):
00046 if param_file is None:
00047 self.load_controller(new_controller)
00048 resp = self.switch_controller_srv([new_controller], [old_controller], 1)
00049 self.unload_controller(old_controller)
00050 return resp.ok
00051 else:
00052 params = rosparam.load_file(roslaunch.substitution_args.resolve_args(param_file))
00053 rosparam.upload_params("", params[0][0])
00054 self.switch_controller_srv([], [old_controller], 1)
00055 self.unload_controller(old_controller)
00056 if old_controller != new_controller:
00057 self.unload_controller(new_controller)
00058 self.load_controller(old_controller)
00059 if old_controller != new_controller:
00060 self.load_controller(new_controller)
00061 resp = self.switch_controller_srv([new_controller], [], 1)
00062 return resp.ok
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072 def carefree_switch(self, arm, new_controller, param_file=None, reset=True):
00073 if '%s' in new_controller:
00074 new_ctrl = new_controller % arm
00075 else:
00076 new_ctrl = new_controller
00077 if param_file is not None:
00078 params = rosparam.load_file(roslaunch.substitution_args.resolve_args(param_file))
00079 if new_ctrl not in params[0][0]:
00080 rospy.logwarn("[pr2_controller_switcher] Controller not in parameter file.")
00081 return
00082 else:
00083 rosparam.upload_params("", {new_ctrl : params[0][0][new_ctrl]})
00084 possible_controllers = rosparam.get_param(LOADED_CTRLS_PARAMS[arm])
00085 if new_ctrl not in possible_controllers:
00086 possible_controllers.append(new_ctrl)
00087 rosparam.set_param_raw(LOADED_CTRLS_PARAMS[arm], possible_controllers)
00088 check_arm_controllers = []
00089 for controller in possible_controllers:
00090 if '%s' in controller:
00091 controller = controller % arm
00092 if controller[0] == arm:
00093 check_arm_controllers.append(controller)
00094 resp = self.list_controllers_srv()
00095 start_controllers, stop_controllers = [new_ctrl], []
00096 for i, controller in enumerate(resp.controllers):
00097 if controller in check_arm_controllers and resp.state[i] == 'running':
00098 stop_controllers.append(controller)
00099 if controller == new_ctrl:
00100 if resp.state[i] == 'running':
00101 if not reset:
00102 rospy.loginfo("[pr2_controller_switcher] Specified controller is already running.")
00103 return True
00104 self.switch_controller_srv([], [new_ctrl], 1)
00105 self.unload_controller(new_ctrl)
00106
00107 self.load_controller(new_ctrl)
00108 rospy.loginfo("[pr2_controller_switcher] Starting controller %s" % (start_controllers[0]) +
00109 " and stopping controllers: [" + ",".join(stop_controllers) + "]")
00110 resp = self.switch_controller_srv(start_controllers, stop_controllers, 1)
00111 return resp.ok
00112
00113
00114
00115
00116 def list_running(self):
00117 resp = self.list_controllers_srv()
00118 running_ctrls = []
00119 for controller, state in zip(resp.controllers, resp.state):
00120 if state == 'running':
00121 running_ctrls.append(controller)
00122 return running_ctrls
00123
00124
00125
00126
00127 def list_stopped(self):
00128 resp = self.list_controllers_srv()
00129 running_ctrls = []
00130 for controller, state in zip(resp.controllers, resp.state):
00131 if state == 'stopped':
00132 running_ctrls.append(controller)
00133 return running_ctrls
00134
00135
00136
00137
00138 def list_loaded(self):
00139 resp = self.list_controllers_srv()
00140 return resp.controllers