test_functional.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import unittest
4 import rosunit
5 from geometry_msgs.msg import Twist, TwistStamped
6 
7 PKG = 'rr_control_input_manager'
8 
9 
10 class TestTwistCommand(unittest.TestCase):
11  def __init__(self, *args, **kwargs):
12  super(TestTwistCommand, self).__init__(*args, **kwargs)
13  self.message_received = False
14  self.message_stamp = None
15  self.msg = None
16  self.pub = None
17  self.sub = None
18 
19  def setUp(self):
20  test_method_name = self._testMethodName
21  subscribers = {'test_manager_unstamped': ('/test/outputA', Twist),
22  'test_manager_stamped': ('/test/outputF', TwistStamped),
23  'test_latency_management_zero': ('/test/outputG', Twist),
24  'test_latency_management_one': ('/test/outputH', Twist),
25  'test_latency_management_less_than_zero': ('/test/outputC', Twist)
26  }
27  publishers = {'test_manager_unstamped': ('/test/inputA', TwistStamped),
28  'test_manager_stamped': ('/test/inputF', TwistStamped),
29  'test_latency_management_zero': ('/test/inputG', TwistStamped),
30  'test_latency_management_one': ('/test/inputH', TwistStamped),
31  'test_latency_management_less_than_zero': ('/test/inputC', TwistStamped)
32  }
33 
34  if test_method_name in subscribers.keys():
35  self.sub = rospy.Subscriber(subscribers[test_method_name][0],
36  subscribers[test_method_name][1],
37  self.listener,
38  queue_size=10)
39  if test_method_name in publishers.keys():
40  self.pub = rospy.Publisher(publishers[test_method_name][0],
41  publishers[test_method_name][1],
42  queue_size=1)
43 
44  # Allow time for the publisher and subscriber to properly connect to endpoints
45  rospy.sleep(1)
46 
47  def tearDown(self):
48  if self.sub:
49  self.sub.unregister()
50  if self.pub:
51  self.pub.unregister()
52 
53  # rospy.signal_shutdown('Test {test_name} completed'.format(test_name=self._testMethodName))
54 
55  def listener(self, data):
56  self.message_stamp = rospy.Time.now()
57  self.message_received = True
58  self.msg = data
59 
60  def generate_twist(self, x=0, y=0, z=0, rx=0, ry=0, rz=0):
61  msg = Twist()
62  msg.linear.x = x
63  msg.linear.y = y
64  msg.linear.z = z
65  msg.angular.x = rx
66  msg.angular.y = ry
67  msg.angular.z = rz
68 
69  return msg
70 
71  def generate_twist_stamped(self, time, seq=0, frame_id='', x=0, y=0, z=0, ax=0, ay=0, az=0):
72  msg = TwistStamped()
73  msg.header.seq = seq
74  msg.header.stamp = time
75  msg.header.frame_id = frame_id
76  msg.twist.linear.x = x
77  msg.twist.linear.y = y
78  msg.twist.linear.z = z
79  msg.twist.angular.x = ax
80  msg.twist.angular.y = ay
81  msg.twist.angular.z = az
82 
83  return msg
84 
85  def twist_stamped_eq(self, msg1, msg2):
86  """Tests TwistStamped equality, disregarding header.seq. `seq` can be overwritten
87  when published and is a deprecated feature, so it left out of the equality check.."""
88  result = msg1.header.stamp == msg2.header.stamp \
89  and msg1.header.frame_id == msg2.header.frame_id \
90  and msg1.twist == msg2.twist
91 
92  return result
93 
94  def test_creates_proper_topics(self): # only functions with 'test_'-prefix will be run!
95  topics = [item[0] for item in rospy.get_published_topics()]
96  self.assertIn('/test/outputA', topics)
97  self.assertIn('/test/outputB', topics)
98  self.assertIn('/test/outputC', topics)
99  self.assertIn('/test/outputF', topics)
100  self.assertIn('/test/outputG', topics)
101 
102  def test_bad_timeout(self): # only functions with 'test_'-prefix will be run!
103  topics = [item[0] for item in rospy.get_published_topics()]
104  self.assertNotIn('/test/inputD', topics)
105  self.assertNotIn('/test/outputD', topics)
106 
107  def test_bad_stamped(self):
108  topics = [item[0] for item in rospy.get_published_topics()]
109  self.assertNotIn('/test/inputE', topics)
110  self.assertNotIn('/test/outputE', topics)
111  print(topics)
112 
114  rate = rospy.Rate(100)
115 
116  msg = self.generate_twist_stamped(rospy.Time.now())
117  self.pub.publish(msg)
118 
119  for timeout in range(1000):
120  if self.message_received:
121  break
122  rate.sleep()
123 
124  self.assertEqual(msg.twist, self.msg)
125 
127  TEST = 'test_manager_stamped'
128  rate = rospy.Rate(100)
129 
130  msg = self.generate_twist_stamped(rospy.Time.now(), frame_id=TEST)
131  self.pub.publish(msg)
132 
133  for timeout in range(1000):
134  if self.message_received:
135  break
136  rate.sleep()
137 
138  self.assertTrue(self.twist_stamped_eq(msg, self.msg))
139 
141  rate = rospy.Rate(100)
142  msg = self.generate_twist_stamped(rospy.Time.now())
143 
144  # This is used to simulate latency
145  rospy.sleep(10)
146 
147  self.pub.publish(msg)
148 
149  for timeout in range(1000):
150  if self.message_received:
151  break
152  rate.sleep()
153 
154  self.assertEqual(msg.twist, self.msg)
155 
157  rate = rospy.Rate(100)
158  msg = self.generate_twist_stamped(rospy.Time.now())
159 
160  # This is used to simulate latency
161  rospy.sleep(10)
162 
163  self.pub.publish(msg)
164 
165  for timeout in range(1000):
166  if self.message_received:
167  break
168  rate.sleep()
169 
170  self.assertEqual(None, self.msg)
171 
173  rate = rospy.Rate(100)
174  msg = self.generate_twist_stamped(rospy.Time.now())
175 
176  # This is used to simulate latency
177  rospy.sleep(10)
178 
179  self.pub.publish(msg)
180 
181  for timeout in range(1000):
182  if self.message_received:
183  break
184  rate.sleep()
185 
186  self.assertEqual(msg.twist, self.msg)
187 
188 
189 rospy.init_node('test_control_input_manager_functional_test')
190 rosunit.unitrun(PKG, 'test_control_input_manager_functional', TestTwistCommand)
def twist_stamped_eq(self, msg1, msg2)
def generate_twist(self, x=0, y=0, z=0, rx=0, ry=0, rz=0)
def __init__(self, args, kwargs)
def generate_twist_stamped(self, time, seq=0, frame_id='', x=0, y=0, z=0, ax=0, ay=0, az=0)


rr_control_input_manager
Author(s):
autogenerated on Thu Sep 10 2020 03:38:35