00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 from __future__ import with_statement
00037
# ROS package that owns this test, and the name the results are reported under.
PKG = 'test_rostopic'
NAME = 'test_rostopic_unit'

# Load the package manifest before any of the ROS imports further down.
import roslib
roslib.load_manifest(PKG)
00041
00042 import os
00043 import sys
00044 import unittest
00045 import cStringIO
00046 import time
00047 import yaml
00048
00049 import rostest
00050
00051 from contextlib import contextmanager
00052
class TestRostopicUnit(unittest.TestCase):
    """Unit tests for rostopic's plot formatting and field-filter helpers.

    Each test exercises the four ``create_field_filter`` configurations
    (every combination of ``echo_nostr`` and ``echo_noarr``) against messages
    from ``std_msgs`` and ``test_rostopic``.  The ROS imports are performed
    inside the test methods, exactly as in the original layout of this file.
    """

    def test_sub_str_plot_fields(self):
        """Check the comma-separated field names from ``_str_plot_fields``."""
        from rostopic import _str_plot_fields, create_field_filter
        from std_msgs.msg import String
        from test_rostopic.msg import Simple, TVals, Arrays, Embed
        from roslib.rostime import Time, Duration

        import rospy.rostime
        rospy.rostime.set_rostime_initialized(True)
        rospy.rostime._set_rostime(Time(0, 1234))

        # Fixture messages and the header each one produces, with and without
        # its string fields.
        simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
        simple_d = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64,field.str'
        simple_nostr = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64'

        arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
        arrays_d = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.string_arr0,field.string_arr1,field.string_arr2,field.time_arr0,field.time_arr1'
        arrays_nostr = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.time_arr0,field.time_arr1'

        # The embedded message prefixes every field with its sub-message name;
        # [5:] drops the second leading 'time,' so it appears only once.
        embed_v = Embed(simple_v, arrays_v)
        embed_d = simple_d.replace('field.', 'field.simple.')+','+arrays_d.replace('field.', 'field.arrays.')[5:]
        embed_nostr = simple_nostr.replace('field.', 'field.simple.')+','+arrays_nostr.replace('field.', 'field.arrays.')[5:]
        embed_noarr = simple_d.replace('field.', 'field.simple.')
        embed_nostr_noarr = simple_nostr.replace('field.', 'field.simple.')

        tvals_d = 'time,field.t,field.d'

        def check(echo_nostr, echo_noarr, string_x, simple_x, arrays_x, embed_x):
            # Run every fixture through one filter configuration.  The helper
            # is called once per configuration so that a failure traceback
            # still identifies which configuration broke.
            f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
            for m in (String(), String('foo')):
                self.assertEqual(string_x, _str_plot_fields(m, 'field', f))
            m = TVals(Time(123, 456), Duration(78, 90))
            self.assertEqual(tvals_d, _str_plot_fields(m, 'field', f))
            self.assertEqual(simple_x, _str_plot_fields(simple_v, 'field', f))
            self.assertEqual(arrays_x, _str_plot_fields(arrays_v, 'field', f))
            self.assertEqual(embed_x, _str_plot_fields(embed_v, 'field', f))

        check(False, False, 'time,field.data', simple_d, arrays_d, embed_d)
        check(True, False, 'time,', simple_nostr, arrays_nostr, embed_nostr)
        check(False, True, 'time,field.data', simple_d, 'time,', embed_noarr)
        check(True, True, 'time,', simple_nostr, 'time,', embed_nostr_noarr)

    def test_str_plot(self):
        """Check the comma-separated value rows produced by ``_str_plot``."""
        from rostopic import _str_plot, create_field_filter
        from std_msgs.msg import String
        from test_rostopic.msg import Simple, TVals, Arrays, Embed
        from roslib.rostime import Time, Duration

        import rospy.rostime
        rospy.rostime.set_rostime_initialized(True)
        # Rows formatted without an explicit current_time are expected to
        # start with this value (1234); r_time (5678) is passed explicitly.
        rospy.rostime._set_rostime(Time(0, 1234))
        r_time = Time(0, 5678)

        simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
        simple_d = '1234,1,-2,3,-4,a,7,8,9,bar'
        simple_nostr = '1234,1,-2,3,-4,a,7,8,9'

        arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
        arrays_d = '1234,-1,2,3,3,4,5,6,7,8,a1,b2,b3,123000000456,78000000090'
        arrays_nostr = '1234,-1,2,3,3,4,5,6,7,8,123000000456,78000000090'

        # [5:] drops the arrays row's own leading '1234,' timestamp column.
        embed_v = Embed(simple_v, arrays_v)
        embed_d = simple_d+','+arrays_d[5:]

        tvals_d = '1234,123000000456,78000000090'

        # An explicit current_time is used for the leading timestamp column.
        m = String('foo')
        self.assertEqual('5678,foo', _str_plot(m, current_time=r_time, field_filter=None))

        def check(echo_nostr, echo_noarr, foo_x, simple_x, arrays_x, embed_x):
            # Run every fixture through one filter configuration.
            f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
            # An empty string message yields only the timestamp column,
            # whichever filter is active.
            self.assertEqual("1234,", _str_plot(String(), field_filter=f))
            self.assertEqual(foo_x, _str_plot(String('foo'), field_filter=f))
            m = TVals(Time(123, 456), Duration(78, 90))
            self.assertEqual(tvals_d, _str_plot(m, field_filter=f))
            self.assertEqual(simple_x, _str_plot(simple_v, field_filter=f))
            self.assertEqual(arrays_x, _str_plot(arrays_v, field_filter=f))
            self.assertEqual(embed_x, _str_plot(embed_v, field_filter=f))

        check(False, False, '1234,foo', simple_d, arrays_d, embed_d)
        # [4:] keeps the ',' that joins the two halves and drops only '1234'.
        check(True, False, '1234,', simple_nostr, arrays_nostr, simple_nostr+arrays_nostr[4:])
        check(False, True, '1234,foo', simple_d, '1234,', simple_d)
        check(True, True, '1234,', simple_nostr, '1234,', simple_nostr)

    def test_strify_message(self):
        """Check ``roslib.message.strify_message`` under each field filter.

        Apart from the two plain ``String`` cases, which are compared as raw
        text, the output is parsed as YAML and compared with plain
        dictionaries.
        """
        from std_msgs.msg import String
        from test_rostopic.msg import Simple, TVals, Arrays, Embed
        from roslib.rostime import Time, Duration
        from roslib.message import strify_message
        from rostopic import create_field_filter

        simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar')
        simple_d = {'b': 1, 'int16': -2, 'int32': 3, 'int64': -4, 'c': 'a', 'uint16': 7, 'uint32': 8, 'uint64': 9, 'str': 'bar'}
        simple_nostr = simple_d.copy()
        del simple_nostr['str']

        arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)])
        arrays_d = {'int8_arr': [-1], 'uint8_arr': [2, 3], 'int32_arr': [3, 4, 5], 'uint32_arr': [6, 7, 8], 'string_arr': ['a1', 'b2', 'b3'], 'time_arr': [{'secs': 123, 'nsecs': 456}, {'secs': 78, 'nsecs': 90}]}
        arrays_nostr = arrays_d.copy()
        del arrays_nostr['string_arr']

        embed_v = Embed(simple_v, arrays_v)
        embed_d = {'simple': simple_d, 'arrays': arrays_d}

        tvals_d = {'t': {'secs': 123, 'nsecs': 456}, 'd': {'secs': 78, 'nsecs': 90}}

        def parsed(msg, field_filter):
            # yaml.safe_load replaces the bare yaml.load call, which is
            # deprecated without an explicit Loader in current PyYAML; the
            # text holds only plain scalars, lists and mappings.
            return yaml.safe_load(strify_message(msg, field_filter=field_filter))

        def check(echo_nostr, echo_noarr, empty_x, foo_x, simple_x, arrays_x, embed_x):
            # Run every fixture through one filter configuration.
            f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
            self.assertEqual(empty_x, strify_message(String(), field_filter=f))
            self.assertEqual(foo_x, strify_message(String('foo'), field_filter=f))
            m = TVals(Time(123, 456), Duration(78, 90))
            self.assertEqual(tvals_d, parsed(m, f))
            self.assertEqual(simple_x, parsed(simple_v, f))
            self.assertEqual(arrays_x, parsed(arrays_v, f))
            self.assertEqual(embed_x, parsed(embed_v, f))

        check(False, False, "data: ''", 'data: foo', simple_d, arrays_d, embed_d)
        check(True, False, '', '', simple_nostr, arrays_nostr,
              {'simple': simple_nostr, 'arrays': arrays_nostr})
        check(False, True, "data: ''", 'data: foo', simple_d, None,
              {'simple': simple_d, 'arrays': None})
        # NOTE(review): the original skipped the Arrays case for this last
        # configuration.  It is checked here with the same expectation as the
        # echo_noarr-only case, because the filter yields no Arrays fields in
        # either configuration (see test_create_field_filter) -- confirm.
        check(True, True, '', '', simple_nostr, None,
              {'simple': simple_nostr, 'arrays': None})

    def test_create_field_filter(self):
        """Check which field names ``create_field_filter`` lets through."""
        from std_msgs.msg import String, Int32, Header
        from test_rostopic.msg import Simple, TVals, Floats, Arrays, Embed

        from rostopic import create_field_filter

        arrays_all = ['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'string_arr', 'time_arr']
        arrays_nostr = ['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'time_arr']
        simple_all = ['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64', 'str']
        simple_nostr = ['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64']
        header_all = ['seq', 'stamp', 'frame_id']
        header_nostr = ['seq', 'stamp']

        def check(echo_nostr, echo_noarr, string_x, arrays_x, simple_x, header_x):
            # Apply one filter configuration to a default-constructed
            # instance of every message type, in the original order.
            f = create_field_filter(echo_nostr=echo_nostr, echo_noarr=echo_noarr)
            expectations = [
                (String, string_x),
                (Int32, ['data']),
                (Arrays, arrays_x),
                (Embed, ['simple', 'arrays']),
                (Simple, simple_x),
                (Floats, ['float32', 'float64']),
                (TVals, ['t', 'd']),
                (Header, header_x),
            ]
            for msg_class, expected in expectations:
                self.assertEqual(expected, list(f(msg_class())))

        check(False, False, ['data'], arrays_all, simple_all, header_all)
        check(True, False, [], arrays_nostr, simple_nostr, header_nostr)
        check(False, True, ['data'], [], simple_all, header_all)
        check(True, True, [], [], simple_nostr, header_nostr)
00406
00407
if __name__ == '__main__':
    # Run the test case through rostest's unit runner, requesting coverage
    # for the rostopic package.
    coverage = ['rostopic']
    rostest.unitrun(PKG, NAME, TestRostopicUnit, sys.argv,
                    coverage_packages=coverage)