5 from __future__
import print_function
18 from std_msgs.msg
import String
20 from rosmon_msgs.msg
import State, NodeState
21 from rosmon_msgs.srv
import StartStop, StartStopRequest
23 rospack = rospkg.RosPack()
37 return rospy.get_param(name)
39 params =
', '.join([
'"' + name +
'"' for name
in sorted(rospy.get_param_names()) ])
41 'Caught KeyError("%s") while getting parameter. Known parameters: %s' % (e, params)
46 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
47 except rospy.ROSException:
48 self.fail(
'Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
50 self.assertEqual(len(state.nodes), 4)
52 test1 = [ n
for n
in state.nodes
if n.name ==
'test1' ]
53 self.assertEqual(len(test1), 1)
54 self.assertEqual(test1[0].state, NodeState.RUNNING)
57 pub = rospy.Publisher(
'/test_input', String, queue_size=5, latch=
True)
60 sub = rospy.Subscriber(
'/test_output', String, wfm.cb)
64 self.assertGreater(pub.get_num_connections(), 0)
65 self.assertGreater(sub.get_num_connections(), 0)
66 pub.publish(
'Hello world!')
68 timeout_t = time.time() + 5
69 while wfm.msg
is None:
70 rospy.rostime.wallsleep(0.01)
71 if time.time() >= timeout_t:
72 self.fail(
'No reply to test message')
74 self.assertEqual(wfm.msg.data,
'Hello world!')
80 self.assertEqual(self.
get_param(
"path_to_rosmon"), rospack.get_path(
'rosmon_core'))
83 os.path.join(rospack.get_path(
'rosmon_core'),
'test/basic.launch')
85 executable = self.
get_param(
"path_to_rosmon_executable")
86 self.assert_(os.access(executable, os.X_OK),
'Invalid rosmon path: ' + executable)
90 os.path.join(rospack.get_path(
'rosmon_core'),
'test')
93 self.assertEqual(self.
get_param(
"eval_simple"),
True)
94 self.assertEqual(self.
get_param(
"eval_argexpr"),
True)
95 self.assertAlmostEqual(self.
get_param(
"eval_radius_pi"), 0.5 * math.pi)
97 self.assertEqual(self.
get_param(
"/test1/private_param1"),
"hello_world")
98 self.assertEqual(self.
get_param(
"/test1/private_param2"),
"hello_world")
101 self.assertEqual(self.
get_param(
"multiple_lines1"),
"first_line second_line")
102 self.assertEqual(self.
get_param(
"multiple_lines2"),
"first_line second_line")
105 self.assertAlmostEqual(self.
get_param(
"yaml/radius"), 0.5)
108 self.assertEqual(self.
get_param(
"test_argument"), 123)
111 self.assertEqual(self.
get_param(
"/foo/bar"),
True)
114 pub = rospy.Publisher(
'/remapped_test_input', String, queue_size=5, latch=
True)
117 sub = rospy.Subscriber(
'/remapped_test_output', String, wfm.cb)
121 self.assertGreater(pub.get_num_connections(), 0)
122 self.assertGreater(sub.get_num_connections(), 0)
123 pub.publish(
'Hello world!')
125 timeout_t = time.time() + 5
126 while wfm.msg
is None:
127 rospy.rostime.wallsleep(0.01)
128 if time.time() >= timeout_t:
129 self.fail(
'No reply to test message')
131 self.assertEqual(wfm.msg.data,
'Hello world!')
137 self.assertEqual(rospy.get_param(
"/nested/nested_param"),
"hello")
140 lst = list(filter(
lambda n: n.name == name, state.nodes))
141 self.assertEqual(len(lst), 1)
145 srv = rospy.ServiceProxy(
'/rosmon_uut/start_stop', StartStop)
147 srv(node=
r'test1', action=StartStopRequest.STOP)
152 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
153 except rospy.ROSException:
154 self.fail(
'Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
156 self.assertEqual(self.
getNode(state,
'test1').state, NodeState.IDLE)
157 self.assertEqual(self.
getNode(state,
'test2').state, NodeState.RUNNING)
159 srv(node=
r'test1', action=StartStopRequest.START)
164 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
165 except rospy.ROSException:
166 self.fail(
'Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
168 self.assertEqual(self.
getNode(state,
'test1').state, NodeState.RUNNING)
169 self.assertEqual(self.
getNode(state,
'test2').state, NodeState.RUNNING)
172 srv = rospy.ServiceProxy(
'/rosmon_uut/start_stop', StartStop)
174 srv(node=
r'test\d+', action=StartStopRequest.STOP)
179 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
180 except rospy.ROSException:
181 self.fail(
'Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
183 self.assertEqual(self.
getNode(state,
'test1').state, NodeState.IDLE)
184 self.assertEqual(self.
getNode(state,
'test2').state, NodeState.IDLE)
186 srv(node=
r'test\d+', action=StartStopRequest.START)
191 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
192 except rospy.ROSException:
193 self.fail(
'Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
195 self.assertEqual(self.
getNode(state,
'test1').state, NodeState.RUNNING)
196 self.assertEqual(self.
getNode(state,
'test2').state, NodeState.RUNNING)
198 if __name__ ==
'__main__':
199 rospy.init_node(
'basic_test')
202 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
205 rostest.rosrun(
'rosmon_core',
'basic', BasicTest)