trajectory_controller_spawner.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2015 Shadow Robot Company Ltd.
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 2 of the License, or (at your option)
8 # any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18 import rospy
19 import yaml
20 import rospkg
21 from controller_manager_msgs.srv import ListControllers
22 from controller_manager_msgs.srv import SwitchController, LoadController
23 from sr_utilities.hand_finder import HandFinder
24 from std_msgs.msg import Bool
25 
26 
28  def __init__(self, trajectory, variable_controller, variable_controller_only_joint_0, service_timeout):
29  self.trajectory = trajectory
30  self.variable_controller = variable_controller
31  self.variable_controller_only_joint_0 = variable_controller_only_joint_0
32  self.service_timeout = service_timeout
33  self.hand_finder = HandFinder()
34  self.joints = self.hand_finder.get_hand_joints()
35  ros_pack = rospkg.RosPack()
36  sr_robot_launch_path = ros_pack.get_path('sr_robot_launch')
37  self.hand_mapping = self.hand_finder.get_hand_parameters().mapping
38  self.yaml_file_path = {}
39  for hand in self.hand_mapping:
40  self.yaml_file_path[self.hand_mapping[hand]] = (
41  sr_robot_launch_path + "/config/" + self.hand_mapping[hand] + "_trajectory_controller.yaml")
42 
44  for hand in self.yaml_file_path:
45  with open(self.yaml_file_path[hand], 'r') as yaml_file:
46  yaml_content = yaml.load(yaml_file)
47  if hand + "_trajectory_controller" not in yaml_content.keys():
48  rospy.logerr("there are errors opening trajectory controller yaml file")
49  else:
50  hand_trajectory = yaml_content[hand + "_trajectory_controller"]
51  if rospy.has_param('~exclude_wrist') and rospy.get_param('~exclude_wrist'):
52  hand_trajectory['joints'] = [s for s in self.joints[hand] if "WR" not in s]
53  else:
54  hand_trajectory['joints'] = self.joints[hand]
55  for joint_name in hand_trajectory['constraints'].keys():
56  if (joint_name not in hand_trajectory['joints'] and
57  joint_name != 'goal_time' and joint_name != 'stopped_velocity_tolerance'):
58  del hand_trajectory['constraints'][joint_name]
59 
60  param_prefix = hand + "_trajectory_controller/"
61  rospy.set_param(param_prefix + 'allow_partial_joints_goal',
62  hand_trajectory['allow_partial_joints_goal'])
63  rospy.set_param(param_prefix + 'joints', hand_trajectory['joints'])
64  rospy.set_param(param_prefix + 'stop_trajectory_duration', hand_trajectory['stop_trajectory_duration'])
65  rospy.set_param(param_prefix + 'type', hand_trajectory['type'])
66  constrain_prefix = param_prefix + 'constraints/'
67  for constraint in hand_trajectory['constraints']:
68  if constraint == 'goal_time' or constraint == 'stopped_velocity_tolerance':
69  rospy.set_param(constrain_prefix + constraint, hand_trajectory['constraints'][constraint])
70  else:
71  rospy.set_param(constrain_prefix + constraint + '/goal',
72  hand_trajectory['constraints'][constraint]['goal'])
73  rospy.set_param(constrain_prefix + constraint + '/trajectory',
74  hand_trajectory['constraints'][constraint]['trajectory'])
75 
76  @staticmethod
77  def check_joint(joint,
78  controllers_to_start,
79  controller_names,
80  variable_controller=False,
81  variable_controller_only_joint_0=False):
82  if joint[3:5].lower() == 'th' or joint[3:5].lower() == 'wr' or (joint[6] != '1' and joint[6] != '2'):
83  if variable_controller and not variable_controller_only_joint_0:
84  joint_controller = 'sh_' + joint.lower() + "_variable_position_controller"
85  else:
86  joint_controller = 'sh_' + joint.lower() + "_position_controller"
87  else:
88  joint = joint[:6] + '0'
89  if variable_controller:
90  joint_controller = 'sh_' + joint.lower() + "_variable_position_controller"
91  else:
92  joint_controller = 'sh_' + joint.lower() + "_position_controller"
93 
94  if joint_controller not in controller_names and joint_controller not in controllers_to_start:
95  controllers_to_start.append(joint_controller)
96 
97  def set_controller(self):
98  controllers_to_start = []
99  for hand_serial in self.hand_mapping:
100  hand_prefix = self.hand_mapping[hand_serial]
101  success = True
102  try:
103  rospy.wait_for_service('controller_manager/list_controllers', self.service_timeout)
104  list_controllers = rospy.ServiceProxy(
105  'controller_manager/list_controllers', ListControllers)
106 
107  running_controllers = list_controllers()
108  except rospy.ServiceException:
109  success = False
110  rospy.logerr("Failed to load trajectory controller")
111  if success:
112  already_running = False
113  controller_names = []
114  for controller_state in running_controllers.controller:
115  controller_names.append(controller_state.name)
116  if controller_state.name == hand_prefix + '_trajectory_controller':
117  already_running = True
118  if self.trajectory and not already_running:
119  controllers_to_start.append(hand_prefix + '_trajectory_controller')
120  for joint in self.joints[hand_prefix]:
121  TrajectoryControllerSpawner.\
122  check_joint(joint,
123  controllers_to_start,
124  controller_names,
125  variable_controller=self.variable_controller,
126  variable_controller_only_joint_0=self.variable_controller_only_joint_0)
127 
128  for load_control in controllers_to_start:
129  try:
130  rospy.wait_for_service('controller_manager/load_controller', self.service_timeout)
131  load_controllers = rospy.ServiceProxy('controller_manager/load_controller', LoadController)
132  loaded_controllers = load_controllers(load_control)
133  except rospy.ServiceException:
134  success = False
135  if not loaded_controllers.ok:
136  success = False
137 
138  try:
139  rospy.wait_for_service('controller_manager/switch_controller', self.service_timeout)
140  switch_controllers = rospy.ServiceProxy('controller_manager/switch_controller', SwitchController)
141  switched_controllers = switch_controllers(controllers_to_start, None,
142  SwitchController._request_class.BEST_EFFORT)
143  except rospy.ServiceException:
144  success = False
145 
146  if not switched_controllers.ok:
147  success = False
148 
149  if not success:
150  rospy.logerr("Failed to launch trajectory controller!")
151 
152  @staticmethod
153  def wait_for_topic(topic_name, timeout):
154  if not topic_name:
155  return True
156 
157  # This has to be a list since Python has a peculiar mechanism to determine
158  # whether a variable is local to a function or not:
159  # if the variable is assigned in the body of the function, then it is
160  # assumed to be local. Modifying a mutable object (like a list)
161  # works around this debatable "design choice".
162  wait_for_topic_result = [None]
163 
164  def wait_for_topic_cb(msg):
165  wait_for_topic_result[0] = msg
166  rospy.logdebug("Heard from wait-for topic: %s" % str(msg.data))
167  rospy.Subscriber(topic_name, Bool, wait_for_topic_cb)
168  started_waiting = rospy.Time.now().to_sec()
169 
170  # We might not have received any time messages yet
171  warned_about_not_hearing_anything = False
172  while not wait_for_topic_result[0]:
173  rospy.sleep(0.01)
174  if rospy.is_shutdown():
175  return False
176  if not warned_about_not_hearing_anything:
177  if rospy.Time.now().to_sec() - started_waiting > timeout:
178  warned_about_not_hearing_anything = True
179  rospy.logwarn("Controller Spawner hasn't heard anything from its \"wait for\" topic (%s)" %
180  topic_name)
181  while not wait_for_topic_result[0].data:
182  rospy.sleep(0.01)
183  if rospy.is_shutdown():
184  return False
185  return True
186 
187 
188 if __name__ == "__main__":
189  rospy.init_node("generate_trajectory_controller_parameters")
190  wait_for_topic = rospy.get_param("~wait_for", "")
191  hand_trajectory = rospy.get_param("~hand_trajectory", False)
192  variable_controller = rospy.get_param("~variable_controller", False)
193  variable_controller_only_joint_0 = rospy.get_param("~variable_controller_only_joint_0", False)
194  timeout = rospy.get_param("~timeout", 30.0)
195  service_timeout = rospy.get_param("~service_timeout", 60.0)
196 
197  trajectory_spawner = TrajectoryControllerSpawner(trajectory=hand_trajectory,
198  variable_controller=variable_controller,
199  variable_controller_only_joint_0=variable_controller_only_joint_0,
200  service_timeout=service_timeout)
201  if trajectory_spawner.wait_for_topic(wait_for_topic, timeout):
202  trajectory_spawner.generate_parameters()
203  trajectory_spawner.set_controller()
def check_joint(joint, controllers_to_start, controller_names, variable_controller=False, variable_controller_only_joint_0=False)
def __init__(self, trajectory, variable_controller, variable_controller_only_joint_0, service_timeout)


sr_utilities
Author(s): Ugo Cupcic
autogenerated on Tue Oct 13 2020 03:55:49