00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import unittest
00035 import yaml
00036
00037
00038 class TestRostopicUnit(unittest.TestCase):
00039
def test_sub_str_plot_fields(self):
    """Check the header line ``_str_plot_fields`` builds for each filter setting.

    Every combination of ``echo_nostr`` / ``echo_noarr`` passed to
    ``create_field_filter`` is run against String, TVals, Simple, Arrays and
    Embed messages, and the returned comma-separated header is compared with
    the expected column names.
    """
    from rostopic import _str_plot_fields
    from std_msgs.msg import String
    from test_rostopic.msg import Arrays, Embed, Simple, TVals

    from genpy import Time, Duration
    from rostopic import create_field_filter

    # Set rospy's time to a fixed value (assumed to keep the run
    # deterministic -- confirm against rospy.rostime). The headers asserted
    # below contain only column names, never the time value itself.
    import rospy.rostime
    rospy.rostime.set_rostime_initialized(True)
    rospy.rostime._set_rostime(Time(0, 1234))

    simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
    simple_d = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64,field.str'
    simple_nostr = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64'

    arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
    arrays_d = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.string_arr0,field.string_arr1,field.string_arr2,field.time_arr0,field.time_arr1'
    arrays_nostr = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.time_arr0,field.time_arr1'

    embed_v = Embed(simple_v, arrays_v)
    # The embedded header is the two sub-headers joined; [5:] drops the
    # leading 'time,' of the arrays part so the time column appears once.
    embed_d = simple_d.replace('field.', 'field.simple.')+','+arrays_d.replace('field.', 'field.arrays.')[5:]
    embed_nostr = simple_nostr.replace('field.', 'field.simple.')+','+arrays_nostr.replace('field.', 'field.arrays.')[5:]
    embed_noarr = simple_d.replace('field.', 'field.simple.')
    embed_nostr_noarr = simple_nostr.replace('field.', 'field.simple.')

    def check(echo_nostr, echo_noarr, string_hdr, simple_hdr, arrays_hdr, embed_hdr):
        # Run the fixed message set through one filter configuration.
        # The TVals header is expected to be the same under every filter.
        f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
        cases = [
            (String(), string_hdr),
            (String('foo'), string_hdr),
            (TVals(Time(123, 456), Duration(78, 90)), 'time,field.t,field.d'),
            (simple_v, simple_hdr),
            (arrays_v, arrays_hdr),
            (embed_v, embed_hdr),
        ]
        for m, expected in cases:
            self.assertEqual(expected, _str_plot_fields(m, 'field', f))

    # No filtering: every field gets a column.
    check(echo_nostr=False, echo_noarr=False,
          string_hdr='time,field.data', simple_hdr=simple_d,
          arrays_hdr=arrays_d, embed_hdr=embed_d)

    # String fields dropped.
    check(echo_nostr=True, echo_noarr=False,
          string_hdr='time,', simple_hdr=simple_nostr,
          arrays_hdr=arrays_nostr, embed_hdr=embed_nostr)

    # Array fields dropped: Arrays collapses to the bare time column.
    check(echo_nostr=False, echo_noarr=True,
          string_hdr='time,field.data', simple_hdr=simple_d,
          arrays_hdr='time,', embed_hdr=embed_noarr)

    # Both strings and arrays dropped.
    check(echo_nostr=True, echo_noarr=True,
          string_hdr='time,', simple_hdr=simple_nostr,
          arrays_hdr='time,', embed_hdr=embed_nostr_noarr)
00129
def test_str_plot(self):
    """Check the CSV data row ``_str_plot`` produces for each filter setting.

    One call is made without a field filter and with an explicit
    ``current_time``; the rest run String, TVals, Simple, Arrays and Embed
    messages through every ``echo_nostr`` / ``echo_noarr`` combination of
    ``create_field_filter`` and compare the returned row text.
    """
    from rostopic import _str_plot
    from std_msgs.msg import String
    from test_rostopic.msg import Arrays, Embed, Simple, TVals

    from genpy import Time, Duration
    from rostopic import create_field_filter

    # Rows produced without an explicit current_time are expected to start
    # with 1234, the nsecs value handed to _set_rostime here.
    import rospy.rostime
    rospy.rostime.set_rostime_initialized(True)
    rospy.rostime._set_rostime(Time(0, 1234))
    r_time = Time(0, 5678)

    simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
    simple_d = '1234,1,-2,3,-4,a,7,8,9,bar'
    simple_nostr = '1234,1,-2,3,-4,a,7,8,9'

    arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
    arrays_d = '1234,-1,2,3,3,4,5,6,7,8,a1,b2,b3,123000000456,78000000090'
    arrays_nostr = '1234,-1,2,3,3,4,5,6,7,8,123000000456,78000000090'

    embed_v = Embed(simple_v, arrays_v)
    # [5:] removes the '1234,' time prefix of the arrays row before joining.
    embed_d = simple_d+','+arrays_d[5:]
    # [4:] removes '1234' but keeps the comma, so no separator is added.
    embed_nostr = simple_nostr+arrays_nostr[4:]

    # Without a filter, the explicitly supplied current_time leads the row.
    m = String('foo')
    self.assertEqual('5678,foo', _str_plot(m, current_time=r_time, field_filter=None))

    def check(echo_nostr, echo_noarr, foo_row, simple_row, arrays_row, embed_row):
        # Run the fixed message set through one filter configuration.
        # An empty String and the TVals row are expected to be identical
        # under every filter.
        f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
        cases = [
            (String(), '1234,'),
            (String('foo'), foo_row),
            (TVals(Time(123, 456), Duration(78, 90)), '1234,123000000456,78000000090'),
            (simple_v, simple_row),
            (arrays_v, arrays_row),
            (embed_v, embed_row),
        ]
        for m, expected in cases:
            self.assertEqual(expected, _str_plot(m, field_filter=f))

    # No filtering.
    check(echo_nostr=False, echo_noarr=False,
          foo_row='1234,foo', simple_row=simple_d,
          arrays_row=arrays_d, embed_row=embed_d)

    # String fields dropped.
    check(echo_nostr=True, echo_noarr=False,
          foo_row='1234,', simple_row=simple_nostr,
          arrays_row=arrays_nostr, embed_row=embed_nostr)

    # Array fields dropped: Embed reduces to its Simple part.
    check(echo_nostr=False, echo_noarr=True,
          foo_row='1234,foo', simple_row=simple_d,
          arrays_row='1234,', embed_row=simple_d)

    # Both strings and arrays dropped.
    check(echo_nostr=True, echo_noarr=True,
          foo_row='1234,', simple_row=simple_nostr,
          arrays_row='1234,', embed_row=simple_nostr)
00220
def test_strify_message(self):
    """Check ``strify_message`` output for each field-filter setting.

    ``std_msgs/String`` output is compared as raw text; output for the other
    messages is parsed as YAML and compared with the expected dictionaries
    for every ``echo_nostr`` / ``echo_noarr`` combination of
    ``create_field_filter``.
    """
    from std_msgs.msg import String
    from test_rostopic.msg import Arrays, Embed, Simple, TVals

    from genpy import Time, Duration
    from roslib.message import strify_message
    from rostopic import create_field_filter

    simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
    simple_d = {'b': 1, 'int16': -2, 'int32': 3, 'int64': -4, 'c': 'a', 'uint16': 7, 'uint32': 8, 'uint64': 9, 'str': 'bar'}
    simple_nostr = simple_d.copy()
    del simple_nostr['str']

    arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
    arrays_d = {'int8_arr': [-1], 'uint8_arr': [2, 3], 'int32_arr': [3, 4, 5], 'uint32_arr': [6, 7, 8], 'string_arr': ['a1', 'b2', 'b3'], 'time_arr': [{'secs': 123, 'nsecs': 456}, {'secs': 78, 'nsecs': 90}]}
    arrays_nostr = arrays_d.copy()
    del arrays_nostr['string_arr']

    embed_v = Embed(simple_v, arrays_v)
    embed_d = {'simple': simple_d, 'arrays': arrays_d}

    # TVals is expected to render the same way under every filter.
    tvals_d = {'t': {'secs': 123, 'nsecs': 456}, 'd': {'secs': 78, 'nsecs': 90}}

    def check(echo_nostr, echo_noarr, empty_text, foo_text, parsed_cases):
        # Run one filter configuration: String output is matched verbatim,
        # everything else is parsed and compared structurally.
        f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
        self.assertEqual(empty_text, strify_message(String(), field_filter=f))
        self.assertEqual(foo_text, strify_message(String('foo'), field_filter=f))
        cases = [(TVals(Time(123, 456), Duration(78, 90)), tvals_d)] + parsed_cases
        for m, expected in cases:
            # safe_load is sufficient for plain mappings/lists/scalars and,
            # unlike bare yaml.load, works on current PyYAML without a
            # Loader argument.
            v = yaml.safe_load(strify_message(m, field_filter=f))
            self.assertEqual(expected, v)

    # No filtering.
    check(echo_nostr=False, echo_noarr=False,
          empty_text="data: ''", foo_text='data: foo',
          parsed_cases=[
              (simple_v, simple_d),
              (arrays_v, arrays_d),
              (embed_v, embed_d),
          ])

    # String fields dropped: a String message renders as empty text.
    check(echo_nostr=True, echo_noarr=False,
          empty_text='', foo_text='',
          parsed_cases=[
              (simple_v, simple_nostr),
              (arrays_v, arrays_nostr),
              (embed_v, {'simple': simple_nostr, 'arrays': arrays_nostr}),
          ])

    # Array fields dropped: Arrays parses to None (nothing left to print).
    check(echo_nostr=False, echo_noarr=True,
          empty_text="data: ''", foo_text='data: foo',
          parsed_cases=[
              (simple_v, simple_d),
              (arrays_v, None),
              (embed_v, {'simple': simple_d, 'arrays': None}),
          ])

    # Both dropped. The standalone Arrays message is not checked here.
    check(echo_nostr=True, echo_noarr=True,
          empty_text='', foo_text='',
          parsed_cases=[
              (simple_v, simple_nostr),
              (embed_v, {'simple': simple_nostr, 'arrays': None}),
          ])
00314
def test_create_field_filter(self):
    """Check which field names ``create_field_filter`` lets through.

    For each ``echo_nostr`` / ``echo_noarr`` combination the returned filter
    is applied to default-constructed messages and the names it yields are
    compared, in order, with the expected list.
    """
    from std_msgs.msg import Header, Int32, String
    from test_rostopic.msg import Arrays, Embed, Floats, Simple, TVals

    from rostopic import create_field_filter

    simple_all = ['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64', 'str']
    simple_nostr = ['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64']
    arrays_all = ['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'string_arr', 'time_arr']
    arrays_nostr = ['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'time_arr']
    header_all = ['seq', 'stamp', 'frame_id']
    header_nostr = ['seq', 'stamp']

    def check(echo_nostr, echo_noarr, string_fields, arrays_fields, simple_fields, header_fields):
        # Apply one filter configuration to every message type. Int32, Embed,
        # Floats and TVals are expected to be unaffected by either option.
        f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
        cases = [
            (String(), string_fields),
            (Int32(), ['data']),
            (Arrays(), arrays_fields),
            (Embed(), ['simple', 'arrays']),
            (Simple(), simple_fields),
            (Floats(), ['float32', 'float64']),
            (TVals(), ['t', 'd']),
            (Header(), header_fields),
        ]
        for m, expected in cases:
            self.assertEqual(expected, list(f(m)))

    # No filtering.
    check(echo_nostr=False, echo_noarr=False,
          string_fields=['data'], arrays_fields=arrays_all,
          simple_fields=simple_all, header_fields=header_all)

    # String (and string-array) fields dropped.
    check(echo_nostr=True, echo_noarr=False,
          string_fields=[], arrays_fields=arrays_nostr,
          simple_fields=simple_nostr, header_fields=header_nostr)

    # Array fields dropped.
    check(echo_nostr=False, echo_noarr=True,
          string_fields=['data'], arrays_fields=[],
          simple_fields=simple_all, header_fields=header_all)

    # Both strings and arrays dropped.
    check(echo_nostr=True, echo_noarr=True,
          string_fields=[], arrays_fields=[],
          simple_fields=simple_nostr, header_fields=header_nostr)
00391
def test_slicing(self):
    """Exercise ``msgevalgen`` on plain, indexed, sliced and malformed patterns.

    The evaluator returned for a pattern is applied to an ``ArrayVal`` holding
    four ``Val`` entries. Patterns with empty or unbalanced brackets are
    expected to yield ``None`` instead of an evaluator, and lookups that miss
    (out-of-range index, unknown field) are expected to evaluate to ``None``.
    """
    from test_rostopic.msg import ArrayVal, Val
    from rostopic import msgevalgen

    # Three 7-character strings plus a shorter one, so that index 6 exists
    # for all entries but the last.
    sample = ArrayVal()
    sample.vals.extend(
        Val(val=text) for text in ('ABCDEFG', 'abcdefg', '1234567', 'short'))

    def pick(pattern):
        # Build the evaluator for `pattern` and apply it to the sample message.
        return msgevalgen(pattern)(sample)

    # Empty patterns give no evaluator; a bare field name gives the whole list.
    for blank in ('', '/'):
        self.assertEqual(msgevalgen(blank), None)
    self.assertListEqual(pick('/vals'), sample.vals)
    self.assertListEqual(pick('/vals/'), sample.vals)

    # Slices of the top-level array.
    self.assertListEqual(pick('/vals[:]'), sample.vals)
    self.assertListEqual(pick('/vals[0:2]'), sample.vals[0:2])

    # Single element access, then malformed bracket syntax.
    self.assertEqual(pick('/vals[0]'), sample.vals[0])
    self.assertEqual(pick('/vals[1]'), sample.vals[1])
    for broken in ('/vals[', '/vals[]', '/vals[0'):
        self.assertEqual(msgevalgen(broken), None)

    # Field of an indexed element, then the same malformed brackets mid-path.
    self.assertEqual(pick('/vals[0]/val'), sample.vals[0].val)
    self.assertEqual(pick('/vals[1]/val'), sample.vals[1].val)
    for broken in ('/vals[/val', '/vals[]/val', '/vals[0/val'):
        self.assertEqual(msgevalgen(broken), None)

    # Slicing and indexing into the string value itself.
    self.assertEqual(pick('/vals[0]/val[:]'), sample.vals[0].val)
    self.assertEqual(pick('/vals[0]/val[0:2]'), sample.vals[0].val[0:2])
    self.assertEqual(pick('/vals[0]/val[:-3]'), sample.vals[0].val[:-3])
    self.assertEqual(pick('/vals[0]/val[2]'), sample.vals[0].val[2])

    # An array slice followed by a per-element index yields one item per entry.
    # NOTE(review): the first assertion is made twice in a row; the second
    # may have been intended to cover a different slice.
    self.assertListEqual(pick('/vals[:3]/val[0]'), ['A', 'a', '1'])
    self.assertListEqual(pick('/vals[:3]/val[0]'), ['A', 'a', '1'])
    self.assertListEqual(pick('/vals[1:3]/val[0]'), ['a', '1'])
    self.assertListEqual(pick('/vals[:]/val[-1]'), ['G', 'g', '7', 't'])

    # An array slice followed by a per-element slice.
    self.assertListEqual(pick('/vals[:3]/val[1:3]'), ['BC', 'bc', '23'])

    # Out-of-range indices give None (per element when applied to a slice).
    self.assertEqual(pick('/vals[5]/val'), None)
    self.assertListEqual(pick('/vals[:]/val[6]'), ['G', 'g', '7', None])

    # Empty trailing brackets and unknown field names.
    self.assertEqual(msgevalgen('/vals[:]/val[]'), None)
    self.assertEqual(pick('/unknown[:]/val[0]'), None)
    self.assertListEqual(pick('/vals[:]/unknown[0]'), [None, None, None, None])
    self.assertEqual(pick('/vals/unknown[0]'), None)