test_latency.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import unittest
4 import rosunit
5 from geometry_msgs.msg import Twist, TwistStamped
6 import numpy as np
7 
8 PKG = 'rr_control_input_manager'
9 
10 
11 class TestLatency(unittest.TestCase):
12  def __init__(self, *args, **kwargs):
13  super(TestLatency, self).__init__(*args, **kwargs)
14  self.trials = 1000
15  self.message_received = False
16  self.message_stamp = None
17  self.msg_latency = np.zeros(self.trials)
18  self.msg = None
19  self.pub = None
20  self.sub = None
21 
22  def setUp(self):
23  test_method_name = self._testMethodName
24  subscribers = {'test_twist_unstamped_latency': ('/test/outputA', Twist),
25  'test_twist_stamped_latency': ('/test/outputF', TwistStamped)
26  }
27  publishers = {'test_twist_unstamped_latency': ('/test/inputA', TwistStamped),
28  'test_twist_stamped_latency': ('/test/inputF', TwistStamped),
29  }
30 
31  if test_method_name in subscribers.keys():
32  self.sub = rospy.Subscriber(subscribers[test_method_name][0],
33  subscribers[test_method_name][1],
34  self.listener,
35  queue_size=10)
36  if test_method_name in publishers.keys():
37  self.pub = rospy.Publisher(publishers[test_method_name][0],
38  publishers[test_method_name][1],
39  queue_size=1)
40 
41  # Allow time for the publisher and subscriber to properly connect to endpoints
42  rospy.sleep(1)
43 
44  def tearDown(self):
45  if self.sub:
46  self.sub.unregister()
47  if self.pub:
48  self.pub.unregister()
49 
50  def listener(self, data):
51  self.message_stamp = rospy.Time.now()
52  self.message_received = True
53  self.msg = data
54 
55  def reset_message_info(self):
56  self.msg = None
57  self.message_received = False
58  self.message_stamp = None
59 
60  def generate_twist(self, x=0, y=0, z=0, rx=0, ry=0, rz=0):
61  msg = Twist()
62  msg.linear.x = x
63  msg.linear.y = y
64  msg.linear.z = z
65  msg.angular.x = rx
66  msg.angular.y = ry
67  msg.angular.z = rz
68 
69  return msg
70 
71  def generate_twist_stamped(self, time, seq=0, frame_id='', x=0, y=0, z=0, ax=0, ay=0, az=0):
72  msg = TwistStamped()
73  msg.header.seq = seq
74  msg.header.stamp = time
75  msg.header.frame_id = frame_id
76  msg.twist.linear.x = x
77  msg.twist.linear.y = y
78  msg.twist.linear.z = z
79  msg.twist.angular.x = ax
80  msg.twist.angular.y = ay
81  msg.twist.angular.z = az
82 
83  return msg
84 
85  def compare_twist_stamped(self, msg1, msg2):
86  result = msg1.header.stamp == msg2.header.stamp \
87  and msg1.header.frame_id == msg2.header.frame_id \
88  and msg1.twist == msg2.twist
89 
90  return result
91 
93  rate = rospy.Rate(100)
94 
95  for i in range(self.trials):
96  msg = self.generate_twist_stamped(rospy.Time.now(), frame_id=str(i))
97  self.pub.publish(msg)
98 
99  for timeout in range(1000):
100  if self.message_received:
101  break
102  rate.sleep()
103 
104  self.assertTrue(self.compare_twist_stamped(msg, self.msg))
105  self.msg_latency[i] = (self.message_stamp - msg.header.stamp).to_sec()
106  self.reset_message_info()
107 
108  self.assertGreater(np.min(self.msg_latency), 0.0)
109  self.assertLess(np.mean(self.msg_latency), 0.01)
110  self.assertLess(np.max(self.msg_latency), 0.05)
111 
113  rate = rospy.Rate(100)
114 
115  for i in range(self.trials):
116  msg = self.generate_twist_stamped(rospy.Time.now(), frame_id=str(i))
117  self.pub.publish(msg)
118 
119  for timeout in range(1000):
120  if self.message_received:
121  break
122  rate.sleep()
123 
124  self.assertEqual(msg.twist, self.msg)
125  self.msg_latency[i] = (self.message_stamp - msg.header.stamp).to_sec()
126  self.reset_message_info()
127 
128  self.assertGreater(np.min(self.msg_latency), 0.0)
129  self.assertLess(np.mean(self.msg_latency), 0.01)
130  self.assertLess(np.max(self.msg_latency), 0.05)
131 
132 
133 rospy.init_node('test_control_input_manager_latency_test')
134 rosunit.unitrun(PKG, 'test_control_input_manager_latency', TestLatency)
def listener(self, data)
Definition: test_latency.py:50
def generate_twist(self, x=0, y=0, z=0, rx=0, ry=0, rz=0)
Definition: test_latency.py:60
def test_twist_unstamped_latency(self)
def compare_twist_stamped(self, msg1, msg2)
Definition: test_latency.py:85
def generate_twist_stamped(self, time, seq=0, frame_id='', x=0, y=0, z=0, ax=0, ay=0, az=0)
Definition: test_latency.py:71
def __init__(self, args, kwargs)
Definition: test_latency.py:12
def test_twist_stamped_latency(self)
Definition: test_latency.py:92


rr_control_input_manager
Author(s):
autogenerated on Thu Sep 10 2020 03:38:35