basic.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Test driver for basic test
4 
5 from __future__ import print_function
6 
7 import sys
8 import unittest
9 import time
10 import os
11 import math
12 
13 import rospy
14 import rospy.client
15 
16 import rospkg
17 
18 from std_msgs.msg import String
19 
20 from rosmon_msgs.msg import State, NodeState
21 from rosmon_msgs.srv import StartStop, StartStopRequest
22 
23 rospack = rospkg.RosPack()
24 
25 class _WFM(object):
26  def __init__(self):
27  self.msg = None
28  def cb(self, msg):
29  if self.msg is None:
30  self.msg = msg
31 
32 
33 class BasicTest(unittest.TestCase):
34 
35  def get_param(self, name):
36  try:
37  return rospy.get_param(name)
38  except KeyError as e:
39  params = ', '.join([ '"' + name + '"' for name in sorted(rospy.get_param_names()) ])
40  raise KeyError(
41  'Caught KeyError("%s") while getting parameter. Known parameters: %s' % (e, params)
42  )
43 
45  try:
46  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
47  except rospy.ROSException:
48  self.fail('Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
49 
50  self.assertEqual(len(state.nodes), 4)
51 
52  test1 = [ n for n in state.nodes if n.name == 'test1' ]
53  self.assertEqual(len(test1), 1)
54  self.assertEqual(test1[0].state, NodeState.RUNNING)
55 
56  def test_remapping(self):
57  pub = rospy.Publisher('/test_input', String, queue_size=5, latch=True)
58 
59  wfm = _WFM()
60  sub = rospy.Subscriber('/test_output', String, wfm.cb)
61 
62  time.sleep(1);
63 
64  self.assertGreater(pub.get_num_connections(), 0)
65  self.assertGreater(sub.get_num_connections(), 0)
66  pub.publish('Hello world!')
67 
68  timeout_t = time.time() + 5
69  while wfm.msg is None:
70  rospy.rostime.wallsleep(0.01)
71  if time.time() >= timeout_t:
72  self.fail('No reply to test message')
73 
74  self.assertEqual(wfm.msg.data, 'Hello world!')
75 
76  sub.unregister()
77  pub.unregister()
78 
79  def test_params(self):
80  self.assertEqual(self.get_param("path_to_rosmon"), rospack.get_path('rosmon_core'))
81  self.assertEqual(
82  self.get_param("path_to_launch_file"),
83  os.path.join(rospack.get_path('rosmon_core'), 'test/basic.launch')
84  )
85  executable = self.get_param("path_to_rosmon_executable")
86  self.assert_(os.access(executable, os.X_OK), 'Invalid rosmon path: ' + executable)
87 
88  self.assertEqual(
89  self.get_param("dirname"),
90  os.path.join(rospack.get_path('rosmon_core'), 'test')
91  )
92 
93  self.assertEqual(self.get_param("eval_simple"), True)
94  self.assertEqual(self.get_param("eval_argexpr"), True)
95  self.assertAlmostEqual(self.get_param("eval_radius_pi"), 0.5 * math.pi)
96 
97  self.assertEqual(self.get_param("/test1/private_param1"), "hello_world")
98  self.assertEqual(self.get_param("/test1/private_param2"), "hello_world")
99 
100  def test_multiLine(self):
101  self.assertEqual(self.get_param("multiple_lines1"), "first_line second_line")
102  self.assertEqual(self.get_param("multiple_lines2"), "first_line second_line")
103 
104  def test_yaml(self):
105  self.assertAlmostEqual(self.get_param("yaml/radius"), 0.5)
106 
107  def test_arg_passing(self):
108  self.assertEqual(self.get_param("test_argument"), 123)
109 
111  self.assertEqual(self.get_param("/foo/bar"), True)
112 
114  pub = rospy.Publisher('/remapped_test_input', String, queue_size=5, latch=True)
115 
116  wfm = _WFM()
117  sub = rospy.Subscriber('/remapped_test_output', String, wfm.cb)
118 
119  time.sleep(1);
120 
121  self.assertGreater(pub.get_num_connections(), 0)
122  self.assertGreater(sub.get_num_connections(), 0)
123  pub.publish('Hello world!')
124 
125  timeout_t = time.time() + 5
126  while wfm.msg is None:
127  rospy.rostime.wallsleep(0.01)
128  if time.time() >= timeout_t:
129  self.fail('No reply to test message')
130 
131  self.assertEqual(wfm.msg.data, 'Hello world!')
132 
133  sub.unregister()
134  pub.unregister()
135 
136  def test_nested(self):
137  self.assertEqual(rospy.get_param("/nested/nested_param"), "hello")
138 
139  def getNode(self, state, name):
140  lst = list(filter(lambda n: n.name == name, state.nodes))
141  self.assertEqual(len(lst), 1)
142  return lst[0]
143 
145  srv = rospy.ServiceProxy('/rosmon_uut/start_stop', StartStop)
146 
147  srv(node=r'test1', action=StartStopRequest.STOP)
148 
149  time.sleep(2)
150 
151  try:
152  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
153  except rospy.ROSException:
154  self.fail('Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
155 
156  self.assertEqual(self.getNode(state, 'test1').state, NodeState.IDLE)
157  self.assertEqual(self.getNode(state, 'test2').state, NodeState.RUNNING)
158 
159  srv(node=r'test1', action=StartStopRequest.START)
160 
161  time.sleep(2)
162 
163  try:
164  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
165  except rospy.ROSException:
166  self.fail('Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
167 
168  self.assertEqual(self.getNode(state, 'test1').state, NodeState.RUNNING)
169  self.assertEqual(self.getNode(state, 'test2').state, NodeState.RUNNING)
170 
172  srv = rospy.ServiceProxy('/rosmon_uut/start_stop', StartStop)
173 
174  srv(node=r'test\d+', action=StartStopRequest.STOP)
175 
176  time.sleep(2)
177 
178  try:
179  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
180  except rospy.ROSException:
181  self.fail('Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
182 
183  self.assertEqual(self.getNode(state, 'test1').state, NodeState.IDLE)
184  self.assertEqual(self.getNode(state, 'test2').state, NodeState.IDLE)
185 
186  srv(node=r'test\d+', action=StartStopRequest.START)
187 
188  time.sleep(2)
189 
190  try:
191  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
192  except rospy.ROSException:
193  self.fail('Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
194 
195  self.assertEqual(self.getNode(state, 'test1').state, NodeState.RUNNING)
196  self.assertEqual(self.getNode(state, 'test2').state, NodeState.RUNNING)
197 
198 if __name__ == '__main__':
199  rospy.init_node('basic_test')
200 
201  # Wait for rosmon to start up
202  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
203 
204  import rostest
205  rostest.rosrun('rosmon_core', 'basic', BasicTest)
206 
207  time.sleep(1)
basic.BasicTest.test_startstop_regex
def test_startstop_regex(self)
Definition: basic.py:171
basic.BasicTest.test_startstop_one
def test_startstop_one(self)
Definition: basic.py:144
basic._WFM.__init__
def __init__(self)
Definition: basic.py:26
basic.BasicTest.test_rosmon_running
def test_rosmon_running(self)
Definition: basic.py:44
basic.BasicTest.test_params
def test_params(self)
Definition: basic.py:79
basic.BasicTest.getNode
def getNode(self, state, name)
Definition: basic.py:139
basic.BasicTest.test_yaml
def test_yaml(self)
Definition: basic.py:104
basic._WFM.msg
msg
Definition: basic.py:27
basic.BasicTest.test_nested
def test_nested(self)
Definition: basic.py:136
basic.BasicTest.test_multiLine
def test_multiLine(self)
Definition: basic.py:100
basic.BasicTest.test_arg_passing
def test_arg_passing(self)
Definition: basic.py:107
basic.BasicTest.get_param
def get_param(self, name)
Definition: basic.py:35
basic.BasicTest.test_remapping
def test_remapping(self)
Definition: basic.py:56
basic.BasicTest
A sample python unit test.
Definition: basic.py:33
basic.BasicTest.test_global_remapping
def test_global_remapping(self)
Definition: basic.py:113
basic._WFM
Definition: basic.py:25
basic.BasicTest.test_global_nested_ns
def test_global_nested_ns(self)
Definition: basic.py:110
basic._WFM.cb
def cb(self, msg)
Definition: basic.py:28


rosmon_core
Author(s): Max Schwarz
autogenerated on Wed Feb 21 2024 04:01:14