basic.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Test driver for basic test
4 
5 from __future__ import print_function
6 
7 import sys
8 import unittest
9 import time
10 import os
11 import math
12 
13 import rospy
14 import rospy.client
15 
16 import rospkg
17 
18 from std_msgs.msg import String
19 
20 from rosmon_msgs.msg import State, NodeState
21 
22 rospack = rospkg.RosPack()
23 
24 class _WFM(object):
25  def __init__(self):
26  self.msg = None
27  def cb(self, msg):
28  if self.msg is None:
29  self.msg = msg
30 
31 ## A sample python unit test
32 class BasicTest(unittest.TestCase):
33 
34  def get_param(self, name):
35  try:
36  return rospy.get_param(name)
37  except KeyError as e:
38  params = ', '.join([ '"' + name + '"' for name in sorted(rospy.get_param_names()) ])
39  raise KeyError(
40  'Caught KeyError("%s") while getting parameter. Known parameters: %s' % (e, params)
41  )
42 
44  try:
45  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
46  except rospy.ROSException:
47  self.fail('Did not get state msg on /rosmon_uut/state' + repr(rospy.client.get_published_topics()))
48 
49  self.assertEqual(len(state.nodes), 3)
50 
51  test1 = [ n for n in state.nodes if n.name == 'test1' ]
52  self.assertEqual(len(test1), 1)
53  self.assertEqual(test1[0].state, NodeState.RUNNING)
54 
55  def test_remapping(self):
56  pub = rospy.Publisher('/test_input', String, queue_size=5, latch=True)
57 
58  wfm = _WFM()
59  sub = rospy.Subscriber('/test_output', String, wfm.cb)
60 
61  time.sleep(1);
62 
63  self.assertGreater(pub.get_num_connections(), 0)
64  self.assertGreater(sub.get_num_connections(), 0)
65  pub.publish('Hello world!')
66 
67  timeout_t = time.time() + 5
68  while wfm.msg is None:
69  rospy.rostime.wallsleep(0.01)
70  if time.time() >= timeout_t:
71  self.fail('No reply to test message')
72 
73  self.assertEqual(wfm.msg.data, 'Hello world!')
74 
75  sub.unregister()
76  pub.unregister()
77 
78  def test_params(self):
79  self.assertEqual(self.get_param("path_to_rosmon"), rospack.get_path('rosmon_core'))
80  self.assertEqual(
81  self.get_param("path_to_launch_file"),
82  os.path.join(rospack.get_path('rosmon_core'), 'test/basic.launch')
83  )
84  executable = self.get_param("path_to_rosmon_executable")
85  self.assert_(os.access(executable, os.X_OK), 'Invalid rosmon path: ' + executable)
86 
87  self.assertEqual(
88  self.get_param("dirname"),
89  os.path.join(rospack.get_path('rosmon_core'), 'test')
90  )
91 
92  self.assertEqual(self.get_param("eval_simple"), True)
93  self.assertEqual(self.get_param("eval_argexpr"), True)
94  self.assertAlmostEqual(self.get_param("eval_radius_pi"), 0.5 * math.pi)
95 
96  self.assertEqual(self.get_param("/test1/private_param1"), "hello_world")
97  self.assertEqual(self.get_param("/test1/private_param2"), "hello_world")
98 
99  def test_multiLine(self):
100  self.assertEqual(self.get_param("multiple_lines1"), "first_line second_line")
101  self.assertEqual(self.get_param("multiple_lines2"), "first_line second_line")
102 
103  def test_yaml(self):
104  self.assertAlmostEqual(self.get_param("yaml/radius"), 0.5)
105 
106  def test_arg_passing(self):
107  self.assertEqual(self.get_param("test_argument"), 123)
108 
110  pub = rospy.Publisher('/remapped_test_input', String, queue_size=5, latch=True)
111 
112  wfm = _WFM()
113  sub = rospy.Subscriber('/remapped_test_output', String, wfm.cb)
114 
115  time.sleep(1);
116 
117  self.assertGreater(pub.get_num_connections(), 0)
118  self.assertGreater(sub.get_num_connections(), 0)
119  pub.publish('Hello world!')
120 
121  timeout_t = time.time() + 5
122  while wfm.msg is None:
123  rospy.rostime.wallsleep(0.01)
124  if time.time() >= timeout_t:
125  self.fail('No reply to test message')
126 
127  self.assertEqual(wfm.msg.data, 'Hello world!')
128 
129  sub.unregister()
130  pub.unregister()
131 
132  def test_nested(self):
133  self.assertEqual(rospy.get_param("/nested/nested_param"), "hello")
134 
135 if __name__ == '__main__':
136  rospy.init_node('basic_test')
137 
138  # Wait for rosmon to start up
139  state = rospy.client.wait_for_message('/rosmon_uut/state', State, timeout=5.0)
140 
141  import rostest
142  rostest.rosrun('rosmon_core', 'basic', BasicTest)
143 
144  time.sleep(1)
def test_global_remapping(self)
Definition: basic.py:109
def test_yaml(self)
Definition: basic.py:103
def test_nested(self)
Definition: basic.py:132
def test_rosmon_running(self)
Definition: basic.py:43
def get_param(self, name)
Definition: basic.py:34
def test_multiLine(self)
Definition: basic.py:99
def test_remapping(self)
Definition: basic.py:55
def test_arg_passing(self)
Definition: basic.py:106
A sample python unit test.
Definition: basic.py:32
def __init__(self)
Definition: basic.py:25
def test_params(self)
Definition: basic.py:78
def cb(self, msg)
Definition: basic.py:27


rosmon_core
Author(s): Max Schwarz
autogenerated on Wed Jul 10 2019 03:10:12