controller_manager_scripts.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import unittest
3 import rospy
4 import subprocess
5 
6 # output of controller_manager list, will by combined dynamically
7 myc1_initialized='\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( initialized )\n'
8 myc1_running='\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( running )\n'
9 myc1_stopped='\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( stopped )\n'
10 myc2_initialized='\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( initialized )\n'
11 myc2_running='\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( running )\n'
12 myc2_stopped='\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( stopped )\n'
13 
14 
15 # output of other commands
16 loaded_fmt = 'Loaded \'%s\'\n'
17 unloaded_fmt = 'Unloaded \'%s\' successfully\n'
18 stopped_fmt = "Stopped [\'%s\'] successfully\n"
19 started_fmt = "Started [\'%s\'] successfully\n"
20 
21 no_controllers = 'No controllers are loaded in mechanism control\n'
22 reload_response = 'Restore: False\nSuccessfully reloaded libraries\n'
23 
24 # run controller_manager in process
25 def run_cm(*args):
26  return subprocess.check_output('rosrun controller_manager controller_manager ' + ' '.join(args), shell=True)
27 
28 # run controller_group in process
29 def run_cg(*args):
30  return subprocess.check_output('rosrun controller_manager controller_group ' + ' '.join(args), shell=True)
31 
32 # helper function that return the actual and expected response
33 def load_c(name):
34  return run_cm('load', name), loaded_fmt % name
35 
36 def unload_c(name):
37  return run_cm('unload', name), unloaded_fmt % name
38 
39 def stop_c(name):
40  return run_cm('stop', name), stopped_fmt % name
41 
42 def start_c(name):
43  return run_cm('start', name), started_fmt % name
44 
45 def spawn_c(name):
46  return run_cm('spawn', name), loaded_fmt % name + started_fmt % name
47 
48 def kill_c(name):
49  return run_cm('kill', name), stopped_fmt % name + unloaded_fmt % name
50 
51 
52 class TestUtils(unittest.TestCase):
53  def test_scripts(self):
54  rospy.wait_for_service("/controller_manager/list_controllers", 2.0)
55 
56  # ensure that test controller is available
57  listed_types = run_cm('list-types').split()
58  self.assertIn('controller_manager_tests/EffortTestController', listed_types)
59 
60  # ensure that no controllers are loaded
61  self.assertEqual(run_cm('list'), no_controllers)
62 
63  # spawn (load+start) my_controller1 externally
64  s1 = subprocess.Popen('rosrun controller_manager spawner my_controller1 __name:=myspawner', shell=True)
65  rospy.sleep(1.0)
66  self.assertEqual(run_cm('list'), myc1_running)
67 
68  # load my_controller2
69  self.assertEqual(*load_c('my_controller2'))
70  self.assertEqual(run_cm('list'), myc1_running + myc2_initialized)
71 
72  # stop my_controller1
73  self.assertEqual(*stop_c('my_controller1'))
74  self.assertEqual(run_cm('list'), myc1_stopped + myc2_initialized)
75 
76  # start my_controller2
77  self.assertEqual(*start_c('my_controller2'))
78  self.assertEqual(run_cm('list'), myc1_stopped + myc2_running)
79 
80  # stop my_controller2
81  self.assertEqual(*stop_c('my_controller2'))
82  self.assertEqual(run_cm('list'), myc1_stopped + myc2_stopped)
83 
84  # start my_controller1
85  self.assertEqual(*start_c('my_controller1'))
86  self.assertEqual(run_cm('list'), myc1_running + myc2_stopped)
87 
88  # stop my_controller1 externally
89  u1 = subprocess.Popen('rosrun controller_manager unspawner my_controller1 __name:=myunspawner', shell=True)
90  rospy.sleep(1.0)
91  self.assertEqual(run_cm('list'), myc1_stopped + myc2_stopped)
92 
93  # kill unspawner -> restart my_controller1
94  subprocess.check_call('rosnode kill /myunspawner', shell=True)
95  self.assertEqual(u1.wait(), 0)
96  self.assertEqual(run_cm('list'), myc1_running + myc2_stopped)
97 
98  # kill spawner -> stop+unload my_controller1
99  subprocess.check_call('rosnode kill /myspawner', shell=True)
100  self.assertEqual(0, s1.wait())
101  self.assertEqual(run_cm('list'), myc2_stopped)
102 
103  # spawn (load+start) my_controller1
104  self.assertEqual(*spawn_c('my_controller1'))
105  self.assertEqual(run_cm('list'), myc2_stopped + myc1_running)
106 
107  # kill (stop+unload) my_controller1
108  self.assertEqual(*kill_c('my_controller1'))
109  self.assertEqual(run_cm('list'), myc2_stopped)
110 
111  # unload my_controller2
112  self.assertEqual(*unload_c('my_controller2'))
113  self.assertEqual(run_cm('list'), no_controllers)
114 
115  self.assertEqual(run_cm('reload-libraries'), reload_response)
116 
117  # controller_group
118  # list
119  self.assertEqual(
120  run_cg('list'),
121  'Number of groups: 3\n'
122  ' group1\n my_controller1\n my_controller3\n'
123  ' group2\n my_controller2\n my_controller3\n'
124  ' group3\n my_controller1\n my_controller2\n')
125 
126  # spawn
127  self.assertEqual(
128  run_cg('spawn group1'),
129  'Loaded \'my_controller1\'\n'
130  'Loaded \'my_controller3\'\n'
131  'Started [\'my_controller1\'] successfully\n'
132  'Started [\'my_controller3\'] successfully\n')
133 
134  # switch
135  self.assertEqual(
136  run_cg('switch group2'),
137  'Loaded \'my_controller2\'\n'
138  'Started [\'my_controller2\'] successfully\n'
139  'Stopped [\'my_controller1\'] successfully\n')
140 
141  # switch to faulty group because my_controller1 conflicts with my_controller2
142  self.assertEqual(
143  run_cg('switch group3'),
144  'Error when starting [\'my_controller1\'] and stopping [\'my_controller3\']\n')
145 
146 
147 if __name__ == '__main__':
148  import rostest
149  rostest.rosrun('controller_manager_msgs',
150  'controller_manager_scripts_rostest',
151  TestUtils)


controller_manager_tests
Author(s): Vijay Pradeep, Adolfo Rodriguez Tsouroukdissian
autogenerated on Mon Feb 28 2022 23:30:19