test_rostopic_unit.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import unittest
35 import yaml
36 
37 
class TestRostopicUnit(unittest.TestCase):
    """Unit tests for rostopic's plot formatting, field filters and slicing.

    The ROS packages under test are imported inside each test method.
    Every formatting test runs the same set of messages through the four
    combinations of ``create_field_filter(echo_nostr=..., echo_noarr=...)``.
    """

    def test_str_plot_fields(self):
        """Check the comma-separated field-name line from ``_str_plot_fields``."""
        from rostopic import _str_plot_fields
        from std_msgs.msg import String
        from test_rostopic.msg import Arrays, Embed, Simple, TVals

        from genpy import Time, Duration
        from rostopic import create_field_filter

        # str plotting requires rospy time, we fix time to a set time
        import rospy.rostime
        rospy.rostime.set_rostime_initialized(True)
        rospy.rostime._set_rostime(Time(0, 1234))

        def fields(msg, field_filter):
            # every case uses the same 'field' prefix
            return _str_plot_fields(msg, 'field', field_filter)

        def tvals():
            return TVals(Time(123, 456), Duration(78, 90))

        # prepare test values
        simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
        simple_d = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64,field.str'
        simple_nostr = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64'

        arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
        arrays_d = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.string_arr0,field.string_arr1,field.string_arr2,field.time_arr0,field.time_arr1'
        arrays_nostr = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.time_arr0,field.time_arr1'

        embed_v = Embed(simple_v, arrays_v)
        # [5:] drops the leading 'time,' of the arrays part before joining
        embed_d = simple_d.replace('field.', 'field.simple.')+','+arrays_d.replace('field.', 'field.arrays.')[5:]
        embed_nostr = simple_nostr.replace('field.', 'field.simple.')+','+arrays_nostr.replace('field.', 'field.arrays.')[5:]
        embed_noarr = simple_d.replace('field.', 'field.simple.')
        embed_nostr_noarr = simple_nostr.replace('field.', 'field.simple.')

        # test over all combinations of field filters
        f = create_field_filter(echo_nostr=False, echo_noarr=False)
        self.assertEqual("time,field.data", fields(String(), f))
        self.assertEqual('time,field.data', fields(String('foo'), f))
        self.assertEqual('time,field.t,field.d', fields(tvals(), f))
        self.assertEqual(simple_d, fields(simple_v, f))
        self.assertEqual(arrays_d, fields(arrays_v, f))
        self.assertEqual(embed_d, fields(embed_v, f))

        f = create_field_filter(echo_nostr=True, echo_noarr=False)
        self.assertEqual("time,", fields(String(), f))
        self.assertEqual('time,', fields(String('foo'), f))
        self.assertEqual('time,field.t,field.d', fields(tvals(), f))
        self.assertEqual(simple_nostr, fields(simple_v, f))
        self.assertEqual(arrays_nostr, fields(arrays_v, f))
        self.assertEqual(embed_nostr, fields(embed_v, f))

        f = create_field_filter(echo_nostr=False, echo_noarr=True)
        self.assertEqual("time,field.data", fields(String(), f))
        self.assertEqual("time,field.data", fields(String('foo'), f))
        self.assertEqual('time,field.t,field.d', fields(tvals(), f))
        self.assertEqual(simple_d, fields(simple_v, f))
        self.assertEqual('time,', fields(arrays_v, f))
        self.assertEqual(embed_noarr, fields(embed_v, f))

        f = create_field_filter(echo_nostr=True, echo_noarr=True)
        self.assertEqual("time,", fields(String(), f))
        self.assertEqual('time,', fields(String('foo'), f))
        self.assertEqual('time,field.t,field.d', fields(tvals(), f))
        self.assertEqual(simple_nostr, fields(simple_v, f))
        self.assertEqual('time,', fields(arrays_v, f))
        self.assertEqual(embed_nostr_noarr, fields(embed_v, f))

    def test_str_plot(self):
        """Check the comma-separated value line produced by ``_str_plot``."""
        from rostopic import _str_plot
        from std_msgs.msg import String
        from test_rostopic.msg import Arrays, Embed, Simple, TVals

        from genpy import Time, Duration
        from rostopic import create_field_filter

        # str plotting requires rospy time, we fix time to a set time
        import rospy.rostime
        rospy.rostime.set_rostime_initialized(True)
        rospy.rostime._set_rostime(Time(0, 1234))
        r_time = Time(0, 5678)

        def plot(msg, field_filter):
            return _str_plot(msg, field_filter=field_filter)

        def tvals():
            return TVals(Time(123, 456), Duration(78, 90))

        tvals_d = '1234,123000000456,78000000090'

        # prepare test values
        simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
        simple_d = '1234,1,-2,3,-4,a,7,8,9,bar'
        simple_nostr = '1234,1,-2,3,-4,a,7,8,9'

        arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
        arrays_d = '1234,-1,2,3,3,4,5,6,7,8,a1,b2,b3,123000000456,78000000090'
        arrays_nostr = '1234,-1,2,3,3,4,5,6,7,8,123000000456,78000000090'

        embed_v = Embed(simple_v, arrays_v)
        # [5:] drops the leading '1234,' of the arrays part before joining
        embed_d = simple_d+','+arrays_d[5:]

        # test current_time override
        m = String('foo')
        self.assertEqual('5678,foo', _str_plot(m, current_time=r_time, field_filter=None))

        # test over all combinations of field filters
        f = create_field_filter(echo_nostr=False, echo_noarr=False)
        self.assertEqual("1234,", plot(String(), f))
        self.assertEqual('1234,foo', plot(String('foo'), f))
        self.assertEqual(tvals_d, plot(tvals(), f))
        self.assertEqual(simple_d, plot(simple_v, f))
        self.assertEqual(arrays_d, plot(arrays_v, f))
        self.assertEqual(embed_d, plot(embed_v, f))

        f = create_field_filter(echo_nostr=True, echo_noarr=False)
        self.assertEqual("1234,", plot(String(), f))
        self.assertEqual('1234,', plot(String('foo'), f))
        self.assertEqual(tvals_d, plot(tvals(), f))
        self.assertEqual(simple_nostr, plot(simple_v, f))
        self.assertEqual(arrays_nostr, plot(arrays_v, f))
        # [4:] drops '1234' but keeps the comma that joins the two parts
        self.assertEqual(simple_nostr+arrays_nostr[4:], plot(embed_v, f))

        f = create_field_filter(echo_nostr=False, echo_noarr=True)
        self.assertEqual("1234,", plot(String(), f))
        self.assertEqual('1234,foo', plot(String('foo'), f))
        self.assertEqual(tvals_d, plot(tvals(), f))
        self.assertEqual(simple_d, plot(simple_v, f))
        self.assertEqual('1234,', plot(arrays_v, f))
        self.assertEqual(simple_d, plot(embed_v, f))

        f = create_field_filter(echo_nostr=True, echo_noarr=True)
        self.assertEqual("1234,", plot(String(), f))
        self.assertEqual('1234,', plot(String('foo'), f))
        self.assertEqual(tvals_d, plot(tvals(), f))
        self.assertEqual(simple_nostr, plot(simple_v, f))
        self.assertEqual('1234,', plot(arrays_v, f))
        self.assertEqual(simple_nostr, plot(embed_v, f))

    def test_strify_message(self):
        """Check ``strify_message`` output when combined with rostopic's field filters."""
        # strify message is part of roslib, but we want to test with
        # rostopic's field filters.  It's also the case that
        # roslib.messages cannot be unit tested within the ROS stack
        # -- part of the reason it needs to be moved elsewhere.
        from std_msgs.msg import String
        from test_rostopic.msg import Arrays, Embed, Simple, TVals

        from genpy import Time, Duration
        from roslib.message import strify_message
        from rostopic import create_field_filter

        def parsed(msg, field_filter):
            # safe_load is sufficient for the plain mappings/lists compared
            # here, and yaml.load() without a Loader is rejected by newer PyYAML.
            return yaml.safe_load(strify_message(msg, field_filter=field_filter))

        def tvals():
            return TVals(Time(123, 456), Duration(78, 90))

        tvals_d = {'t': {'secs': 123, 'nsecs': 456}, 'd': {'secs': 78, 'nsecs': 90}}

        simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
        simple_d = {'b': 1, 'int16': -2, 'int32': 3, 'int64': -4, 'c': 'a', 'uint16': 7, 'uint32': 8, 'uint64': 9, 'str': 'bar'}
        simple_nostr = simple_d.copy()
        del simple_nostr['str']

        arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
        arrays_d = {'int8_arr': [-1], 'uint8_arr': [2, 3], 'int32_arr': [3, 4, 5], 'uint32_arr': [6, 7, 8], 'string_arr': ['a1', 'b2', 'b3'], 'time_arr': [{'secs': 123, 'nsecs': 456}, {'secs': 78, 'nsecs': 90}]}
        arrays_nostr = arrays_d.copy()
        del arrays_nostr['string_arr']

        embed_v = Embed(simple_v, arrays_v)
        embed_d = {'simple': simple_d, 'arrays': arrays_d}

        f = create_field_filter(echo_nostr=False, echo_noarr=False)
        self.assertEqual("data: ''", strify_message(String(), field_filter=f))
        self.assertEqual('data: "foo"', strify_message(String('foo'), field_filter=f))
        self.assertEqual(tvals_d, parsed(tvals(), f))
        self.assertEqual(simple_d, parsed(simple_v, f))
        self.assertEqual(arrays_d, parsed(arrays_v, f))
        self.assertEqual(embed_d, parsed(embed_v, f))

        f = create_field_filter(echo_nostr=True, echo_noarr=False)
        self.assertEqual('', strify_message(String(), field_filter=f))
        self.assertEqual('', strify_message(String('foo'), field_filter=f))
        self.assertEqual(tvals_d, parsed(tvals(), f))
        self.assertEqual(simple_nostr, parsed(simple_v, f))
        self.assertEqual(arrays_nostr, parsed(arrays_v, f))
        self.assertEqual({'simple': simple_nostr, 'arrays': arrays_nostr}, parsed(embed_v, f))

        f = create_field_filter(echo_nostr=False, echo_noarr=True)
        self.assertEqual("data: ''", strify_message(String(), field_filter=f))
        self.assertEqual('data: "foo"', strify_message(String('foo'), field_filter=f))
        self.assertEqual(tvals_d, parsed(tvals(), f))
        self.assertEqual(simple_d, parsed(simple_v, f))
        # every Arrays field is filtered out, so the parsed document is None
        self.assertEqual(None, parsed(arrays_v, f))
        self.assertEqual({'simple': simple_d, 'arrays': None}, parsed(embed_v, f))

        f = create_field_filter(echo_nostr=True, echo_noarr=True)
        self.assertEqual('', strify_message(String(), field_filter=f))
        self.assertEqual('', strify_message(String('foo'), field_filter=f))
        self.assertEqual(tvals_d, parsed(tvals(), f))
        self.assertEqual(simple_nostr, parsed(simple_v, f))
        # same expectation as the echo_noarr-only case above
        self.assertEqual(None, parsed(arrays_v, f))
        self.assertEqual({'simple': simple_nostr, 'arrays': None}, parsed(embed_v, f))

    def test_create_field_filter(self):
        """Check which field names each filter combination yields per message type."""
        from std_msgs.msg import Header, Int32, String
        from test_rostopic.msg import Arrays, Embed, Floats, Simple, TVals

        from rostopic import create_field_filter

        def names(field_filter, msg_class):
            # run the filter over a default-constructed message
            return list(field_filter(msg_class()))

        simple_all = ['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64', 'str']
        simple_nostr = ['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64']

        f = create_field_filter(echo_nostr=False, echo_noarr=False)
        self.assertEqual(['data'], names(f, String))
        self.assertEqual(['data'], names(f, Int32))
        self.assertEqual(['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'string_arr', 'time_arr'], names(f, Arrays))
        self.assertEqual(['simple', 'arrays'], names(f, Embed))
        self.assertEqual(simple_all, names(f, Simple))
        self.assertEqual(['float32', 'float64'], names(f, Floats))
        self.assertEqual(['t', 'd'], names(f, TVals))
        self.assertEqual(['seq', 'stamp', 'frame_id'], names(f, Header))

        f = create_field_filter(echo_nostr=True, echo_noarr=False)
        self.assertEqual([], names(f, String))
        self.assertEqual(['data'], names(f, Int32))
        self.assertEqual(['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'time_arr'], names(f, Arrays))
        self.assertEqual(['simple', 'arrays'], names(f, Embed))
        self.assertEqual(simple_nostr, names(f, Simple))
        self.assertEqual(['float32', 'float64'], names(f, Floats))
        self.assertEqual(['t', 'd'], names(f, TVals))
        self.assertEqual(['seq', 'stamp'], names(f, Header))

        f = create_field_filter(echo_nostr=False, echo_noarr=True)
        self.assertEqual(['data'], names(f, String))
        self.assertEqual(['data'], names(f, Int32))
        self.assertEqual([], names(f, Arrays))
        self.assertEqual(['simple', 'arrays'], names(f, Embed))
        self.assertEqual(simple_all, names(f, Simple))
        self.assertEqual(['float32', 'float64'], names(f, Floats))
        self.assertEqual(['t', 'd'], names(f, TVals))
        self.assertEqual(['seq', 'stamp', 'frame_id'], names(f, Header))

        f = create_field_filter(echo_nostr=True, echo_noarr=True)
        self.assertEqual([], names(f, String))
        self.assertEqual(['data'], names(f, Int32))
        self.assertEqual([], names(f, Arrays))
        self.assertEqual(['simple', 'arrays'], names(f, Embed))
        self.assertEqual(simple_nostr, names(f, Simple))
        self.assertEqual(['float32', 'float64'], names(f, Floats))
        self.assertEqual(['t', 'd'], names(f, TVals))
        self.assertEqual(['seq', 'stamp'], names(f, Header))

    def test_slicing(self):
        """Check ``msgevalgen`` with field paths, indexing, slicing and bad input."""
        from test_rostopic.msg import ArrayVal, Val
        from rostopic import msgevalgen as f

        # prepare a sliceable msg
        msg = ArrayVal()
        for v in ['ABCDEFG', 'abcdefg', '1234567', 'short']:
            msg.vals.append(Val(val=v))

        self.assertEqual(f(''), None)
        self.assertEqual(f('/'), None)
        self.assertListEqual(f('/vals')(msg), msg.vals)
        self.assertListEqual(f('/vals/')(msg), msg.vals)
        # first-level slicing
        self.assertListEqual(f('/vals[:]')(msg), msg.vals)
        self.assertListEqual(f('/vals[0:2]')(msg), msg.vals[0:2])
        # element access
        self.assertEqual(f('/vals[0]')(msg), msg.vals[0])
        self.assertEqual(f('/vals[1]')(msg), msg.vals[1])
        self.assertEqual(f('/vals['), None)
        self.assertEqual(f('/vals[]'), None)
        self.assertEqual(f('/vals[0'), None)
        # element access continued
        self.assertEqual(f('/vals[0]/val')(msg), msg.vals[0].val)
        self.assertEqual(f('/vals[1]/val')(msg), msg.vals[1].val)
        self.assertEqual(f('/vals[/val'), None)
        self.assertEqual(f('/vals[]/val'), None)
        self.assertEqual(f('/vals[0/val'), None)
        # second-level slicing
        self.assertEqual(f('/vals[0]/val[:]')(msg), msg.vals[0].val)
        self.assertEqual(f('/vals[0]/val[0:2]')(msg), msg.vals[0].val[0:2])
        self.assertEqual(f('/vals[0]/val[:-3]')(msg), msg.vals[0].val[:-3])
        self.assertEqual(f('/vals[0]/val[2]')(msg), msg.vals[0].val[2])
        # first-level slicing + second-level access
        self.assertListEqual(f('/vals[:3]/val[0]')(msg), ['A', 'a', '1'])
        self.assertListEqual(f('/vals[1:3]/val[0]')(msg), ['a', '1'])
        self.assertListEqual(f('/vals[:]/val[-1]')(msg), ['G', 'g', '7', 't'])
        # multiple slicing
        self.assertListEqual(f('/vals[:3]/val[1:3]')(msg), ['BC', 'bc', '23'])
        # out-of-range errors
        self.assertEqual(f('/vals[5]/val')(msg), None)
        self.assertListEqual(f('/vals[:]/val[6]')(msg), ['G', 'g', '7', None])
        # invalid descriptions
        self.assertEqual(f('/vals[:]/val[]'), None)
        self.assertEqual(f('/unknown[:]/val[0]')(msg), None)
        self.assertListEqual(f('/vals[:]/unknown[0]')(msg), [None, None, None, None])
        self.assertEqual(f('/vals/unknown[0]')(msg), None)


test_rostopic
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:53:10