5 from __future__
import print_function
18 from std_msgs.msg
import String
20 from rosmon_msgs.msg
import State, NodeState
22 rospack = rospkg.RosPack()
36 return rospy.get_param(name)
38 params =
', '.join([
'"' + name +
'"' for name
in sorted(rospy.get_param_names()) ])
40 'Caught KeyError("%s") while getting parameter. Known parameters: %s' % (e, params)
45 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
46 except rospy.ROSException:
47 self.fail(
'Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
49 self.assertEqual(len(state.nodes), 4)
51 test1 = [ n
for n
in state.nodes
if n.name ==
'test1' ]
52 self.assertEqual(len(test1), 1)
53 self.assertEqual(test1[0].state, NodeState.RUNNING)
56 pub = rospy.Publisher(
'/test_input', String, queue_size=5, latch=
True)
59 sub = rospy.Subscriber(
'/test_output', String, wfm.cb)
63 self.assertGreater(pub.get_num_connections(), 0)
64 self.assertGreater(sub.get_num_connections(), 0)
65 pub.publish(
'Hello world!')
67 timeout_t = time.time() + 5
68 while wfm.msg
is None:
69 rospy.rostime.wallsleep(0.01)
70 if time.time() >= timeout_t:
71 self.fail(
'No reply to test message')
73 self.assertEqual(wfm.msg.data,
'Hello world!')
79 self.assertEqual(self.
get_param(
"path_to_rosmon"), rospack.get_path(
'rosmon_core'))
82 os.path.join(rospack.get_path(
'rosmon_core'),
'test/basic.launch')
84 executable = self.
get_param(
"path_to_rosmon_executable")
85 self.assert_(os.access(executable, os.X_OK),
'Invalid rosmon path: ' + executable)
89 os.path.join(rospack.get_path(
'rosmon_core'),
'test')
92 self.assertEqual(self.
get_param(
"eval_simple"),
True)
93 self.assertEqual(self.
get_param(
"eval_argexpr"),
True)
94 self.assertAlmostEqual(self.
get_param(
"eval_radius_pi"), 0.5 * math.pi)
96 self.assertEqual(self.
get_param(
"/test1/private_param1"),
"hello_world")
97 self.assertEqual(self.
get_param(
"/test1/private_param2"),
"hello_world")
100 self.assertEqual(self.
get_param(
"multiple_lines1"),
"first_line second_line")
101 self.assertEqual(self.
get_param(
"multiple_lines2"),
"first_line second_line")
104 self.assertAlmostEqual(self.
get_param(
"yaml/radius"), 0.5)
107 self.assertEqual(self.
get_param(
"test_argument"), 123)
110 self.assertEqual(self.
get_param(
"/foo/bar"),
True)
113 pub = rospy.Publisher(
'/remapped_test_input', String, queue_size=5, latch=
True)
116 sub = rospy.Subscriber(
'/remapped_test_output', String, wfm.cb)
120 self.assertGreater(pub.get_num_connections(), 0)
121 self.assertGreater(sub.get_num_connections(), 0)
122 pub.publish(
'Hello world!')
124 timeout_t = time.time() + 5
125 while wfm.msg
is None:
126 rospy.rostime.wallsleep(0.01)
127 if time.time() >= timeout_t:
128 self.fail(
'No reply to test message')
130 self.assertEqual(wfm.msg.data,
'Hello world!')
136 self.assertEqual(rospy.get_param(
"/nested/nested_param"),
"hello")
138 if __name__ ==
'__main__':
139 rospy.init_node(
'basic_test')
142 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
145 rostest.rosrun(
'rosmon_core',
'basic', BasicTest)
def test_global_remapping(self)
def test_rosmon_running(self)
def get_param(self, name)
def test_global_nested_ns(self)
def test_arg_passing(self)
A sample python unit test.