5 from __future__
import print_function
18 from std_msgs.msg
import String
20 from rosmon_msgs.msg
import State, NodeState
22 rospack = rospkg.RosPack()
36 return rospy.get_param(name)
38 params =
', '.join([
'"' + name +
'"' for name
in sorted(rospy.get_param_names()) ])
40 'Caught KeyError("%s") while getting parameter. Known parameters: %s' % (e, params)
45 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
46 except rospy.ROSException:
47 self.fail(
'Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
49 self.assertEqual(len(state.nodes), 3)
51 test1 = [ n
for n
in state.nodes
if n.name ==
'test1' ]
52 self.assertEqual(len(test1), 1)
53 self.assertEqual(test1[0].state, NodeState.RUNNING)
56 pub = rospy.Publisher(
'/test_input', String, queue_size=5, latch=
True)
59 sub = rospy.Subscriber(
'/test_output', String, wfm.cb)
63 self.assertGreater(pub.get_num_connections(), 0)
64 self.assertGreater(sub.get_num_connections(), 0)
65 pub.publish(
'Hello world!')
67 timeout_t = time.time() + 5
68 while wfm.msg
is None:
69 rospy.rostime.wallsleep(0.01)
70 if time.time() >= timeout_t:
71 self.fail(
'No reply to test message')
73 self.assertEqual(wfm.msg.data,
'Hello world!')
79 self.assertEqual(self.
get_param(
"path_to_rosmon"), rospack.get_path(
'rosmon_core'))
82 os.path.join(rospack.get_path(
'rosmon_core'),
'test/basic.launch')
84 executable = self.
get_param(
"path_to_rosmon_executable")
85 self.assert_(os.access(executable, os.X_OK),
'Invalid rosmon path: ' + executable)
89 os.path.join(rospack.get_path(
'rosmon_core'),
'test')
92 self.assertEqual(self.
get_param(
"eval_simple"),
True)
93 self.assertEqual(self.
get_param(
"eval_argexpr"),
True)
94 self.assertAlmostEqual(self.
get_param(
"eval_radius_pi"), 0.5 * math.pi)
96 self.assertEqual(self.
get_param(
"/test1/private_param1"),
"hello_world")
97 self.assertEqual(self.
get_param(
"/test1/private_param2"),
"hello_world")
100 self.assertEqual(self.
get_param(
"multiple_lines1"),
"first_line second_line")
101 self.assertEqual(self.
get_param(
"multiple_lines2"),
"first_line second_line")
104 self.assertAlmostEqual(self.
get_param(
"yaml/radius"), 0.5)
107 self.assertEqual(self.
get_param(
"test_argument"), 123)
110 pub = rospy.Publisher(
'/remapped_test_input', String, queue_size=5, latch=
True)
113 sub = rospy.Subscriber(
'/remapped_test_output', String, wfm.cb)
117 self.assertGreater(pub.get_num_connections(), 0)
118 self.assertGreater(sub.get_num_connections(), 0)
119 pub.publish(
'Hello world!')
121 timeout_t = time.time() + 5
122 while wfm.msg
is None:
123 rospy.rostime.wallsleep(0.01)
124 if time.time() >= timeout_t:
125 self.fail(
'No reply to test message')
127 self.assertEqual(wfm.msg.data,
'Hello world!')
133 self.assertEqual(rospy.get_param(
"/nested/nested_param"),
"hello")
135 if __name__ ==
'__main__':
136 rospy.init_node(
'basic_test')
139 state = rospy.client.wait_for_message(
'/rosmon_uut/state', State, timeout=5.0)
142 rostest.rosrun(
'rosmon_core',
'basic', BasicTest)
def test_global_remapping(self)
def test_rosmon_running(self)
def get_param(self, name)
def test_arg_passing(self)
A sample python unit test.