7 myc1_initialized = b
'\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( initialized )\n'
8 myc1_running = b
'\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( running )\n'
9 myc1_stopped = b
'\'my_controller1\' - \'hardware_interface::EffortJointInterface\' ( stopped )\n'
10 myc2_initialized = b
'\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( initialized )\n'
11 myc2_running = b
'\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( running )\n'
12 myc2_stopped = b
'\'my_controller2\' - \'hardware_interface::EffortJointInterface\' ( stopped )\n'
16 loaded_fmt =
'Loaded \'{}\'\n'
17 unloaded_fmt =
'Unloaded \'{}\' successfully\n'
18 stopped_fmt =
'Stopped [\'{}\'] successfully\n'
19 started_fmt =
'Started [\'{}\'] successfully\n'
21 no_controllers = b
'No controllers are loaded in mechanism control\n'
22 reload_response = b
'Restore: False\nSuccessfully reloaded libraries\n'
28 return subprocess.check_output(
'rosrun controller_manager controller_manager ' +
' '.join(args), shell=
True)
34 return subprocess.check_output(
'rosrun controller_manager controller_group ' +
' '.join(args), shell=
True)
40 return run_cm(
'load', name), loaded_fmt.format(name).encode(
"utf-8")
44 return run_cm(
'unload', name), unloaded_fmt.format(name).encode(
"utf-8")
48 return run_cm(
'stop', name), stopped_fmt.format(name).encode(
"utf-8")
52 return run_cm(
'start', name), started_fmt.format(name).encode(
"utf-8")
56 return run_cm(
'spawn', name), (loaded_fmt.format(name) + started_fmt.format(name)).encode(
"utf-8")
60 return run_cm(
'kill', name), (stopped_fmt.format(name) + unloaded_fmt.format(name)).encode(
"utf-8")
65 rospy.wait_for_service(
"/controller_manager/list_controllers", 2.0)
68 listed_types =
run_cm(
'list-types').split()
69 self.assertIn(b
'controller_manager_tests/EffortTestController', listed_types)
72 self.assertEqual(
run_cm(
'list'), no_controllers)
75 s1 = subprocess.Popen(
'rosrun controller_manager spawner my_controller1 __name:=myspawner', shell=
True)
77 self.assertEqual(
run_cm(
'list'), myc1_running)
80 self.assertEqual(*
load_c(
'my_controller2'))
81 self.assertEqual(
run_cm(
'list'), myc1_running + myc2_initialized)
84 self.assertEqual(*
stop_c(
'my_controller1'))
85 self.assertEqual(
run_cm(
'list'), myc1_stopped + myc2_initialized)
88 self.assertEqual(*
start_c(
'my_controller2'))
89 self.assertEqual(
run_cm(
'list'), myc1_stopped + myc2_running)
92 self.assertEqual(*
stop_c(
'my_controller2'))
93 self.assertEqual(
run_cm(
'list'), myc1_stopped + myc2_stopped)
96 self.assertEqual(*
start_c(
'my_controller1'))
97 self.assertEqual(
run_cm(
'list'), myc1_running + myc2_stopped)
100 u1 = subprocess.Popen(
'rosrun controller_manager unspawner my_controller1 __name:=myunspawner', shell=
True)
102 self.assertEqual(
run_cm(
'list'), myc1_stopped + myc2_stopped)
105 subprocess.check_call(
'rosnode kill /myunspawner', shell=
True)
106 self.assertEqual(u1.wait(), 0)
107 self.assertEqual(
run_cm(
'list'), myc1_running + myc2_stopped)
110 subprocess.check_call(
'rosnode kill /myspawner', shell=
True)
111 self.assertEqual(0, s1.wait())
112 self.assertEqual(
run_cm(
'list'), myc2_stopped)
115 self.assertEqual(*
spawn_c(
'my_controller1'))
116 self.assertEqual(
run_cm(
'list'), myc2_stopped + myc1_running)
119 self.assertEqual(*
kill_c(
'my_controller1'))
120 self.assertEqual(
run_cm(
'list'), myc2_stopped)
123 self.assertEqual(*
unload_c(
'my_controller2'))
124 self.assertEqual(
run_cm(
'list'), no_controllers)
126 self.assertEqual(
run_cm(
'reload-libraries'), reload_response)
132 (
'Number of groups: 3\n' +
133 ' group1\n my_controller1\n my_controller3\n' +
134 ' group2\n my_controller2\n my_controller3\n' +
135 ' group3\n my_controller1\n my_controller2\n').encode(
"utf-8"))
138 result_spawn_group1 =
run_cg(
'spawn group1')
139 first_sgp1_solution = (
'Loaded \'my_controller1\'\n' +
140 'Loaded \'my_controller3\'\n' +
141 'Started [\'my_controller1\'] successfully\n' +
142 'Started [\'my_controller3\'] successfully\n').encode(
"utf-8")
143 second_sgp1_solution = (
'Loaded \'my_controller3\'\n' +
144 'Loaded \'my_controller1\'\n' +
145 'Started [\'my_controller3\'] successfully\n' +
146 'Started [\'my_controller1\'] successfully\n').encode(
"utf-8")
147 self.assertTrue( (result_spawn_group1==first_sgp1_solution)
or
148 ( result_spawn_group1 == second_sgp1_solution))
153 (
'Loaded \'my_controller2\'\n' +
154 'Started [\'my_controller2\'] successfully\n' +
155 'Stopped [\'my_controller1\'] successfully\n').encode(
"utf-8"))
160 b
'Error when starting [\'my_controller1\'] and stopping [\'my_controller3\']\n')
163 if __name__ ==
'__main__':
165 rostest.rosrun(
'controller_manager_msgs',
166 'controller_manager_scripts_rostest',