random_play.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 
35 import unittest
36 import rospy
37 import rostest
38 import sys
39 try:
40  from cStringIO import StringIO
41 except ImportError:
42  from io import StringIO
43 import time
44 from random_messages import RandomMsgGen
45 import subprocess
46 import os
47 
48 import genpy
49 
50 DELAY = 0.5
51 
52 class RandomPlay(unittest.TestCase):
53 
54  def msg_cb_topic(self, topic):
55  def msg_cb(msg):
56  nowtime = rospy.Time.now()
57 
58  if self.start is None:
59  self.start = nowtime
60 
61  nowtime -= self.start
62 
63  self.input.append((topic, msg, nowtime.to_sec()))
64 
65  return msg_cb
66 
67 
68  def test_random_play(self):
69  rospy.init_node('random_sub', anonymous=True)
70 
71  self.start = None
72  self.input = []
73 
74  self.assertTrue(len(sys.argv[1]) > 3)
75 
76  seed = int(sys.argv[1])
77  topics = int(sys.argv[2])
78  length = float(sys.argv[3])
79  scale = float(sys.argv[4])
80  self.use_clock = bool(int(sys.argv[5]))
81 
82  rmg = RandomMsgGen(int(seed), topics, length)
83 
84  subscribers = {}
85  for (topic, msg_class) in rmg.topics():
86  subscribers[topic] = rospy.Subscriber(topic, msg_class, self.msg_cb_topic(topic))
87 
88  bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d.bag'%seed)
89  cmd = ['rosbag', 'play', '-d', str(DELAY), '-r', str(scale)]
90 
91  rospy.loginfo(str(cmd))
92 
93  if (self.use_clock):
94  cmd += ['--clock', '--hz', '100']
95 
96  cmd += [bagpath]
97 
98  try:
99  f1 = subprocess.Popen(cmd)
100 
101  last_input_count = 0
102  while (len(self.input) < rmg.message_count()):
103 # print "\n%d/%d\n"%(len(self.input), rmg.message_count())
104  time.sleep(1.0)
105  # abort loop if no input is coming in anymore and process has finished
106  if len(self.input) == last_input_count:
107  rc = f1.poll()
108  if rc is not None:
109  self.assertEqual(rc, 0)
110  break
111  last_input_count = len(self.input)
112 
113  self.assertEqual(len(self.input), rmg.message_count())
114 
115  max_late = 0
116  max_early = 0
117  avg_off = 0
118  power = 0
119 
120  for (expect_topic, expect_msg, expect_time) in rmg.messages():
121 
122  if (not self.use_clock):
123  expect_time /= scale
124 
125  buff = StringIO()
126  expect_msg.serialize(buff)
127  expect_msg.deserialize(buff.getvalue())
128 
129  msg_match = False
130 
131  for ind in range(0,100):
132  (input_topic, input_msg, input_time) = self.input[ind]
133 
134  if (genpy.message.strify_message(expect_msg) == genpy.message.strify_message(input_msg)):
135  msg_match = True
136  del self.input[ind]
137 
138  # stats
139  diff = input_time - expect_time
140 
141  if (diff < max_early):
142  max_early = diff
143 
144  if (diff > max_late):
145  max_late = diff
146 
147  avg_off += diff / rmg.message_count()
148 
149  power += (diff**2) / rmg.message_count()
150 
151  # Messages can arrive late, but never very early Both of
152  # these bounds are much larger than they ought to be, but
153  # you never know with a heavily loaded system.
154  self.assertTrue(input_time - expect_time > -.5)
155  self.assertTrue(abs(input_time - expect_time) < 5.0)
156  break
157 
158  if not msg_match:
159  print("No match at time: %f" % expect_time)
160 
161  self.assertTrue(msg_match)
162 
163  print("%f %f %f %f"%(max_early, max_late, avg_off, power))
164 
165  finally:
166  f1.communicate()
167 
168  self.assertEqual(f1.returncode, 0)
169 
170 if __name__ == '__main__':
171  rostest.rosrun('test_rosbag', 'random_record_play', RandomPlay, sys.argv)
def test_random_play(self)
Definition: random_play.py:68
def msg_cb_topic(self, topic)
Definition: random_play.py:54


test_rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:53:09