grasp_controller_spawner.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2015 Shadow Robot Company Ltd.
4 #
5 # This program is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 2 of the License, or (at your option)
8 # any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18 import rospy
19 import yaml
20 import rospkg
21 from controller_manager_msgs.srv import ListControllers
22 from controller_manager_msgs.srv import SwitchController, LoadController
23 from std_msgs.msg import Bool
24 
25 
26 class GraspControllerSpawner(object):
27  def __init__(self, service_timeout):
28  self.service_timeout = service_timeout
29 
30  def set_controller(self):
31  controller_to_start = 'sh_rh_grasp_controller'
32  success = True
33 
34  try:
35  rospy.wait_for_service('controller_manager/load_controller', self.service_timeout)
36  load_controllers = rospy.ServiceProxy('controller_manager/load_controller', LoadController)
37  loaded_controllers = load_controllers(controller_to_start)
38  except rospy.ServiceException:
39  success = False
40  if not loaded_controllers.ok:
41  success = False
42 
43  try:
44  rospy.wait_for_service('controller_manager/switch_controller', self.service_timeout)
45  switch_controllers = rospy.ServiceProxy('controller_manager/switch_controller', SwitchController)
46  switched_controllers = switch_controllers([controller_to_start], None,
47  SwitchController._request_class.BEST_EFFORT)
48  except rospy.ServiceException:
49  success = False
50 
51  if not switched_controllers.ok:
52  success = False
53 
54  if not success:
55  rospy.logerr("Failed to launch grasp controller!")
56 
57  @staticmethod
58  def wait_for_topic(topic_name, timeout):
59  if not topic_name:
60  return True
61 
62  # This has to be a list since Python has a peculiar mechanism to determine
63  # whether a variable is local to a function or not:
64  # if the variable is assigned in the body of the function, then it is
65  # assumed to be local. Modifying a mutable object (like a list)
66  # works around this debatable "design choice".
67  wait_for_topic_result = [None]
68 
69  def wait_for_topic_cb(msg):
70  wait_for_topic_result[0] = msg
71  rospy.logdebug("Heard from wait-for topic: %s" % str(msg.data))
72  rospy.Subscriber(topic_name, Bool, wait_for_topic_cb)
73  started_waiting = rospy.Time.now().to_sec()
74 
75  # We might not have received any time messages yet
76  warned_about_not_hearing_anything = False
77  while not wait_for_topic_result[0]:
78  rospy.sleep(0.01)
79  if rospy.is_shutdown():
80  return False
81  if not warned_about_not_hearing_anything:
82  if rospy.Time.now().to_sec() - started_waiting > timeout:
83  warned_about_not_hearing_anything = True
84  rospy.logwarn("Controller Spawner hasn't heard anything from its \"wait for\" topic (%s)" %
85  topic_name)
86  while not wait_for_topic_result[0].data:
87  rospy.sleep(0.01)
88  if rospy.is_shutdown():
89  return False
90  return True
91 
92 
93 if __name__ == "__main__":
94  rospy.init_node("grasp_controller_spawner")
95  wait_for_topic = rospy.get_param("~wait_for", "")
96  timeout = rospy.get_param("~timeout", 30.0)
97  service_timeout = rospy.get_param("~service_timeout", 60.0)
98 
99  grasp_spawner = GraspControllerSpawner(service_timeout=service_timeout)
100  if grasp_spawner.wait_for_topic(wait_for_topic, timeout):
101  grasp_spawner.set_controller()


sr_utilities
Author(s): Ugo Cupcic
autogenerated on Tue Oct 13 2020 03:55:49