controller_manager_scripts.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import unittest
3 import rospy
4 import subprocess
5 
6 # output of controller_manager list, will by combined dynamically
7 myc1_initialized = b'\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( initialized )\n'
8 myc1_running = b'\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( running )\n'
9 myc1_stopped = b'\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( stopped )\n'
10 myc2_initialized = b'\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( initialized )\n'
11 myc2_running = b'\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( running )\n'
12 myc2_stopped = b'\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( stopped )\n'
13 
14 
15 # output of other commands
16 loaded_fmt = 'Loaded \'{}\'\n'
17 unloaded_fmt = 'Unloaded \'{}\' successfully\n'
18 stopped_fmt = 'Stopped [\'{}\'] successfully\n'
19 started_fmt = 'Started [\'{}\'] successfully\n'
20 
21 no_controllers = b'No controllers are loaded in mechanism control\n'
22 reload_response = b'Restore: False\nSuccessfully reloaded libraries\n'
23 
24 # run controller_manager in process
25 
26 
27 def run_cm(*args):
28  return subprocess.check_output('rosrun controller_manager controller_manager ' + ' '.join(args), shell=True)
29 
30 # run controller_group in process
31 
32 
33 def run_cg(*args):
34  return subprocess.check_output('rosrun controller_manager controller_group ' + ' '.join(args), shell=True)
35 
36 # helper function that return the actual and expected response
37 
38 
39 def load_c(name):
40  return run_cm('load', name), loaded_fmt.format(name).encode("utf-8")
41 
42 
43 def unload_c(name):
44  return run_cm('unload', name), unloaded_fmt.format(name).encode("utf-8")
45 
46 
47 def stop_c(name):
48  return run_cm('stop', name), stopped_fmt.format(name).encode("utf-8")
49 
50 
51 def start_c(name):
52  return run_cm('start', name), started_fmt.format(name).encode("utf-8")
53 
54 
55 def spawn_c(name):
56  return run_cm('spawn', name), (loaded_fmt.format(name) + started_fmt.format(name)).encode("utf-8")
57 
58 
59 def kill_c(name):
60  return run_cm('kill', name), (stopped_fmt.format(name) + unloaded_fmt.format(name)).encode("utf-8")
61 
62 
63 class TestUtils(unittest.TestCase):
64  def test_scripts(self):
65  rospy.wait_for_service("/controller_manager/list_controllers", 2.0)
66 
67  # ensure that test controller is available
68  listed_types = run_cm('list-types').split()
69  self.assertIn(b'controller_manager_tests/EffortTestController', listed_types)
70 
71  # ensure that no controllers are loaded
72  self.assertEqual(run_cm('list'), no_controllers)
73 
74  # spawn (load+start) my_controller1 externally
75  s1 = subprocess.Popen('rosrun controller_manager spawner my_controller1 __name:=myspawner', shell=True)
76  rospy.sleep(1.0)
77  self.assertEqual(run_cm('list'), myc1_running)
78 
79  # load my_controller2
80  self.assertEqual(*load_c('my_controller2'))
81  self.assertEqual(run_cm('list'), myc1_running + myc2_initialized)
82 
83  # stop my_controller1
84  self.assertEqual(*stop_c('my_controller1'))
85  self.assertEqual(run_cm('list'), myc1_stopped + myc2_initialized)
86 
87  # start my_controller2
88  self.assertEqual(*start_c('my_controller2'))
89  self.assertEqual(run_cm('list'), myc1_stopped + myc2_running)
90 
91  # stop my_controller2
92  self.assertEqual(*stop_c('my_controller2'))
93  self.assertEqual(run_cm('list'), myc1_stopped + myc2_stopped)
94 
95  # start my_controller1
96  self.assertEqual(*start_c('my_controller1'))
97  self.assertEqual(run_cm('list'), myc1_running + myc2_stopped)
98 
99  # stop my_controller1 externally
100  u1 = subprocess.Popen('rosrun controller_manager unspawner my_controller1 __name:=myunspawner', shell=True)
101  rospy.sleep(1.0)
102  self.assertEqual(run_cm('list'), myc1_stopped + myc2_stopped)
103 
104  # kill unspawner -> restart my_controller1
105  subprocess.check_call('rosnode kill /myunspawner', shell=True)
106  self.assertEqual(u1.wait(), 0)
107  self.assertEqual(run_cm('list'), myc1_running + myc2_stopped)
108 
109  # kill spawner -> stop+unload my_controller1
110  subprocess.check_call('rosnode kill /myspawner', shell=True)
111  self.assertEqual(0, s1.wait())
112  self.assertEqual(run_cm('list'), myc2_stopped)
113 
114  # spawn (load+start) my_controller1
115  self.assertEqual(*spawn_c('my_controller1'))
116  self.assertEqual(run_cm('list'), myc2_stopped + myc1_running)
117 
118  # kill (stop+unload) my_controller1
119  self.assertEqual(*kill_c('my_controller1'))
120  self.assertEqual(run_cm('list'), myc2_stopped)
121 
122  # unload my_controller2
123  self.assertEqual(*unload_c('my_controller2'))
124  self.assertEqual(run_cm('list'), no_controllers)
125 
126  self.assertEqual(run_cm('reload-libraries'), reload_response)
127 
128  # controller_group
129  # list
130  self.assertEqual(
131  run_cg('list'),
132  ('Number of groups: 3\n' +
133  ' group1\n my_controller1\n my_controller3\n' +
134  ' group2\n my_controller2\n my_controller3\n' +
135  ' group3\n my_controller1\n my_controller2\n').encode("utf-8"))
136 
137  # spawn
138  result_spawn_group1 = run_cg('spawn group1')
139  first_sgp1_solution = ('Loaded \'my_controller1\'\n' +
140  'Loaded \'my_controller3\'\n' +
141  'Started [\'my_controller1\'] successfully\n' +
142  'Started [\'my_controller3\'] successfully\n').encode("utf-8")
143  second_sgp1_solution = ('Loaded \'my_controller3\'\n' +
144  'Loaded \'my_controller1\'\n' +
145  'Started [\'my_controller3\'] successfully\n' +
146  'Started [\'my_controller1\'] successfully\n').encode("utf-8")
147  self.assertTrue( (result_spawn_group1==first_sgp1_solution) or
148  ( result_spawn_group1 == second_sgp1_solution))
149 
150  # switch
151  self.assertEqual(
152  run_cg('switch group2'),
153  ('Loaded \'my_controller2\'\n' +
154  'Started [\'my_controller2\'] successfully\n' +
155  'Stopped [\'my_controller1\'] successfully\n').encode("utf-8"))
156 
157  # switch to faulty group because my_controller1 conflicts with my_controller2
158  self.assertEqual(
159  run_cg('switch group3'),
160  b'Error when starting [\'my_controller1\'] and stopping [\'my_controller3\']\n')
161 
162 
163 if __name__ == '__main__':
164  import rostest
165  rostest.rosrun('controller_manager_msgs',
166  'controller_manager_scripts_rostest',
167  TestUtils)
controller_manager_scripts.start_c
def start_c(name)
Definition: controller_manager_scripts.py:51
controller_manager_scripts.run_cg
def run_cg(*args)
Definition: controller_manager_scripts.py:33
controller_manager_scripts.unload_c
def unload_c(name)
Definition: controller_manager_scripts.py:43
controller_manager_scripts.spawn_c
def spawn_c(name)
Definition: controller_manager_scripts.py:55
controller_manager_scripts.TestUtils.test_scripts
def test_scripts(self)
Definition: controller_manager_scripts.py:64
controller_manager_scripts.kill_c
def kill_c(name)
Definition: controller_manager_scripts.py:59
controller_manager_scripts.TestUtils
Definition: controller_manager_scripts.py:63
controller_manager_scripts.run_cm
def run_cm(*args)
Definition: controller_manager_scripts.py:27
controller_manager_scripts.load_c
def load_c(name)
Definition: controller_manager_scripts.py:39
controller_manager_scripts.stop_c
def stop_c(name)
Definition: controller_manager_scripts.py:47


controller_manager_tests
Author(s): Vijay Pradeep, Adolfo Rodriguez Tsouroukdissian
autogenerated on Tue Oct 15 2024 02:08:24