controller_spawner.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright 2020 Shadow Robot Company Ltd.
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation version 2 of the License.
8 #
9 # This program is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 # more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program. If not, see <http://www.gnu.org/licenses/>.
16 
17 import os
18 import re
19 import rospkg
20 import rospy
21 import yaml
22 from controller_manager_msgs.srv import ListControllers
23 from controller_manager_msgs.srv import SwitchController, LoadController
24 from sr_utilities.hand_finder import HandFinder
25 
26 
27 class ControllerSpawner(object):
28  JOINT_NAMES = ["rh_FFJ1", "rh_FFJ2", "rh_FFJ3", "rh_FFJ4", "rh_LFJ1", "rh_LFJ2", "rh_LFJ3", "rh_LFJ4", "rh_LFJ5",
29  "rh_MFJ1", "rh_MFJ2", "rh_MFJ3", "rh_MFJ4", "rh_RFJ1", "rh_RFJ2", "rh_RFJ3", "rh_RFJ4", "rh_THJ1",
30  "rh_THJ2", "rh_THJ3", "rh_THJ4", "rh_THJ5", "rh_WRJ1", "rh_WRJ2",
31  "lh_FFJ1", "lh_FFJ2", "lh_FFJ3", "lh_FFJ4", "lh_LFJ1", "lh_LFJ2", "lh_LFJ3", "lh_LFJ4", "lh_LFJ5",
32  "lh_MFJ1", "lh_MFJ2", "lh_MFJ3", "lh_MFJ4", "lh_RFJ1", "lh_RFJ2", "lh_RFJ3", "lh_RFJ4", "lh_THJ1",
33  "lh_THJ2", "lh_THJ3", "lh_THJ4", "lh_THJ5", "lh_WRJ1", "lh_WRJ2"]
34 
35  def __init__(self, config_file_path, service_timeout):
36  self._config_file_path = config_file_path
37  self._service_timeout = service_timeout
38  hand_finder = HandFinder()
39  self._hand_mapping = hand_finder.get_hand_parameters().mapping
40  self._joints = hand_finder.get_hand_joints()
41  self._nonpresent_joints = list(ControllerSpawner.JOINT_NAMES)
42  for side in self._joints:
43  for joint in self._joints[side]:
44  if joint in self._nonpresent_joints:
45  self._nonpresent_joints.remove(joint)
46  self._excluded_joints = []
47 
48  self._excluded_joints = list(set(self._excluded_joints) | set(self._nonpresent_joints))
49 
50  def load_config(self):
51  try:
52  with open(self._config_file_path, 'r') as config_yaml:
53  self._config = yaml.load(config_yaml)
54  except EnvironmentError as error:
55  rospy.logerr("Failed to load controller spawner configuration from '{}'".format(self._config_file_path))
56  rospy.logerr(error)
57  return False
58  if not self.load_controller_configs():
59  return False
60  if not self.parse_controllers():
61  return False
62  return True
63 
65  success = True
66  if "controller_configs" in self._config.keys():
67  for side in self._config["controller_configs"]:
68  if side not in self._joints:
69  continue
70  side_config = self._config["controller_configs"][side]
71  rospy.logwarn(side_config)
72  for controller in side_config:
73  try:
74  resolved_config_path = self.resolve_path(side_config[controller],
75  local_path=os.path.dirname(self._config_file_path))
76  with open(resolved_config_path) as controller_config_yaml:
77  controller_config = yaml.load(controller_config_yaml)
78  ControllerSpawner.remove_joints(controller_config, self._excluded_joints)
79  for key in controller_config:
80  rospy.set_param(key, controller_config[key])
81  except EnvironmentError as error:
82  rospy.logerr("Failed to load {} controller configuration from '{}'".format(controller,
83  self._config_file_path))
84  rospy.logerr(error)
85  rospy.logerr("This path is defined in {}".format(self._config_file_path))
86  success = False
87  return success
88 
89  def resolve_path(self, path, joint_name=None, local_path=None):
90  path = self.resolve_string(path, joint_name=joint_name)
91  matches = re.findall(r'%rospack_find_(.+)%', path)
92  if matches:
93  package_name = matches[0]
94  ros_pack = rospkg.RosPack()
95  try:
96  package_path = ros_pack.get_path(package_name)
97  path = re.sub(r'%rospack_find_(.+)%', package_path, path)
98  except rospkg.common.ResourceNotFound as e:
99  rospy.logerr("Package '{}' in controller spawner config doesn't exist.".format(package_name))
100  if path.startswith('/'):
101  return path
102  else:
103  if local_path is None:
104  return path
105  else:
106  return "{}/{}".format(local_path, path)
107 
108  def resolve_string(self, string, joint_name=None):
109  if joint_name is not None:
110  string = re.sub(r'%joint_name%', joint_name, string)
111  return string
112 
113  def parse_controllers(self):
114  if "controller_groups" not in self._config.keys():
115  rospy.logwarn("No controller groups specified in controller spawner config ({})".format(
116  self._config_file_path))
117  return False
120  for controller_group_name in self._config["controller_groups"]:
121  controller_group = []
122  for side in self._config["controller_groups"][controller_group_name]:
123  side_controllers = self._config["controller_groups"][controller_group_name][side]
124  if side not in self._joints:
125  continue
126  necessary_if_joint_present = []
127  if "necessary_if_joint_present" in side_controllers:
128  necessary_if_joint_present = side_controllers["necessary_if_joint_present"]
129  for joint_name in self._joints[side]:
130  if "common" in side_controllers:
131  for controller_raw in side_controllers["common"]:
132  controller = self.resolve_string(controller_raw, joint_name=joint_name.lower())
133  if (joint_name not in self._excluded_joints) or (controller in necessary_if_joint_present):
134  if controller not in controller_group:
135  controller_group.append(controller)
136  if controller not in self._all_controllers:
137  self._all_controllers.append(controller)
138  if joint_name in side_controllers:
139  for controller_raw in side_controllers[joint_name]:
140  controller = self.resolve_string(controller_raw, joint_name=joint_name.lower())
141  if (joint_name not in self._excluded_joints) or (controller in necessary_if_joint_present):
142  if controller not in controller_group:
143  controller_group.append(controller)
144  if controller not in self._all_controllers:
145  self._all_controllers.append(controller)
146  elif "default" in side_controllers:
147  for controller_raw in side_controllers["default"]:
148  controller = self.resolve_string(controller_raw, joint_name=joint_name.lower())
149  if (joint_name not in self._excluded_joints) or (controller in necessary_if_joint_present):
150  if controller not in controller_group:
151  controller_group.append(controller)
152  if controller not in self._all_controllers:
153  self._all_controllers.append(controller)
154  if controller_group:
155  self._controller_groups[controller_group_name] = controller_group
156  ControllerSpawner.remove_joints(self._controller_groups, self._nonpresent_joints)
157  rospy.set_param("controller_groups", self._controller_groups)
158  return True
159 
160  def switch_controllers(self, controller_group_name):
161  if controller_group_name not in self._controller_groups:
162  rospy.logerr("There is no controller group named '{}'".format(controller_group_name))
163  return False
164  desired_controllers = self._controller_groups[controller_group_name]
165  try:
166  rospy.wait_for_service('controller_manager/list_controllers', self._service_timeout)
167  list_controllers = rospy.ServiceProxy(
168  'controller_manager/list_controllers', ListControllers)
169  current_controllers = list_controllers().controller
170  except rospy.ServiceException:
171  success = False
172  rospy.logerr("Failed to get currently running controllers.")
173  return False
174  loaded_controllers = [controller.name for controller in current_controllers]
175  running_controllers = [controller.name for controller in current_controllers if controller.state == "running"]
176  controllers_to_stop = [controller for controller in running_controllers if
177  controller in self._all_controllers and controller not in desired_controllers]
178  controllers_to_load = [controller for controller in desired_controllers if controller not in loaded_controllers]
179  controllers_to_start = [controller for controller in desired_controllers if
180  controller not in running_controllers]
181  success = True
182 
183  # Load any desired controllers that were not already loaded
184  for controller in controllers_to_load:
185  try:
186  rospy.wait_for_service('controller_manager/load_controller', self._service_timeout)
187  load_controllers = rospy.ServiceProxy('controller_manager/load_controller', LoadController)
188  load_success = load_controllers(controller).ok
189  if not load_success:
190  rospy.logerr("Failed to load controller '{}'".format(controller))
191  success = False
192  except rospy.ServiceException as error:
193  rospy.logerr("Failed to load controller '{}'".format(controller))
194  rospy.logerr(error)
195  success = False
196 
197  # Start any desired controllers that were not already running
198  try:
199  rospy.wait_for_service('controller_manager/switch_controller', self._service_timeout)
200  switch_controllers = rospy.ServiceProxy('controller_manager/switch_controller', SwitchController)
201  start_success = switch_controllers(controllers_to_start, controllers_to_stop,
202  SwitchController._request_class.BEST_EFFORT, False, 0).ok
203  if not start_success:
204  rospy.logerr("Failed to stop controllers {} and/or start controllers {}".format(controllers_to_stop,
205  controllers_to_start))
206  success = False
207  except rospy.ServiceException as error:
208  rospy.logerr("Failed to stop controllers {} and/or start controllers {}".format(controllers_to_stop,
209  controllers_to_start))
210  rospy.logerr(error)
211  success = False
212  if (success):
213  rospy.loginfo("Controllers spawned successfully.")
214  else:
215  rospy.logerr("There was an error spawning controllers")
216  return success
217 
218  @staticmethod
219  def remove_joints(config, joints=[]):
220  joints_lower = [joint.lower() for joint in joints]
221  for key in config.keys():
222  if key.lower() in joints_lower:
223  del config[key]
224  continue
225  if isinstance(config[key], dict):
226  ControllerSpawner.remove_joints(config[key], joints)
227  if isinstance(config[key], list):
228  for joint in joints_lower:
229  match_indices = [i for i, item in enumerate(config[key]) if joint in item.lower()]
230  for index in sorted(match_indices, reverse=True):
231  del config[key][index]
232 
233 
234 if __name__ == "__main__":
235  rospy.init_node("sr_controller_spawner")
236  ros_pack = rospkg.RosPack()
237  sr_robot_launch_path = ros_pack.get_path('sr_robot_launch')
238  wait_for_topic = rospy.get_param("~wait_for", "")
239  controller_group = rospy.get_param("~controller_group", "trajectory")
240  config_file_path = rospy.get_param("~config_file_path", "{}/config/controller_spawner.yaml".format(
241  sr_robot_launch_path))
242  service_timeout = rospy.get_param("~service_timeout", 60.0)
243  controller_spawner = ControllerSpawner(config_file_path, service_timeout)
244  if not controller_spawner.load_config():
245  rospy.logerr("Failed to load controller spawner config.")
246  exit(1)
247  if (rospy.has_param("~wait_for")):
248  rospy.loginfo("Shadow controller spawner is waiting for topic '{}'...".format(rospy.get_param("~wait_for")))
249  rospy.wait_for_message(rospy.get_param("~wait_for"), rospy.AnyMsg)
250  controller_spawner.switch_controllers(controller_group)
def resolve_path(self, path, joint_name=None, local_path=None)


sr_utilities
Author(s): Ugo Cupcic
autogenerated on Mon Feb 28 2022 23:52:19