$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rostopic_command_line_offline.py 6214 2009-09-18 23:38:22Z kwc $ 00035 00036 from __future__ import with_statement 00037 00038 PKG = 'test_rostopic' 00039 NAME = 'test_rostopic_unit' 00040 import roslib; roslib.load_manifest(PKG) 00041 00042 import os 00043 import sys 00044 import unittest 00045 import cStringIO 00046 import time 00047 import yaml 00048 00049 import rostest 00050 00051 from contextlib import contextmanager 00052 00053 class TestRostopicUnit(unittest.TestCase): 00054 00055 def test_sub_str_plot_fields(self): 00056 from rostopic import _str_plot_fields 00057 from std_msgs.msg import String, Int32, Header 00058 from test_rostopic.msg import Simple, TVals, Floats, Arrays, Embed 00059 00060 from roslib.rostime import Time, Duration 00061 from rostopic import create_field_filter 00062 00063 # str plotting requires rospy time, we fix time to a set time 00064 import rospy.rostime 00065 rospy.rostime.set_rostime_initialized(True) 00066 rospy.rostime._set_rostime(Time(0, 1234)) 00067 r_time = Time(0, 5678) 00068 00069 # prepare test values 00070 simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar') 00071 simple_d = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64,field.str' 00072 simple_nostr = 'time,field.b,field.int16,field.int32,field.int64,field.c,field.uint16,field.uint32,field.uint64' 00073 00074 arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)]) 00075 arrays_d = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.string_arr0,field.string_arr1,field.string_arr2,field.time_arr0,field.time_arr1' 00076 arrays_nostr = 'time,field.int8_arr0,field.uint8_arr0,field.uint8_arr1,field.int32_arr0,field.int32_arr1,field.int32_arr2,field.uint32_arr0,field.uint32_arr1,field.uint32_arr2,field.time_arr0,field.time_arr1' 00077 00078 embed_v = Embed(simple_v, arrays_v) 00079 embed_d = simple_d.replace('field.', 'field.simple.')+','+arrays_d.replace('field.', 'field.arrays.')[5:] 00080 embed_nostr = simple_nostr.replace('field.', 'field.simple.')+','+arrays_nostr.replace('field.', 'field.arrays.')[5:] 00081 embed_noarr = simple_d.replace('field.', 'field.simple.') 00082 embed_nostr_noarr = simple_nostr.replace('field.', 'field.simple.') 00083 00084 # test over all combinations of field filters 00085 f = create_field_filter(echo_nostr=False, echo_noarr=False) 00086 m = String() 00087 self.assertEquals("time,field.data", _str_plot_fields(m, 'field',f)) 00088 m = String('foo') 00089 self.assertEquals('time,field.data', _str_plot_fields(m, 'field',f)) 00090 m = TVals(Time(123, 456), Duration(78, 90)) 00091 v = _str_plot_fields(m, 'field', f) 00092 self.assertEquals('time,field.t,field.d', v) 00093 m = simple_v 00094 self.assertEquals(simple_d, _str_plot_fields(m, 'field', f)) 00095 m = arrays_v 00096 self.assertEquals(arrays_d, _str_plot_fields(m, 'field', f)) 00097 m = embed_v 00098 self.assertEquals(embed_d, _str_plot_fields(m, 'field', f)) 00099 00100 f = create_field_filter(echo_nostr=True, echo_noarr=False) 00101 m = String() 00102 self.assertEquals("time,", _str_plot_fields(m, 'field',f)) 00103 m = String('foo') 00104 self.assertEquals('time,', _str_plot_fields(m, 'field',f)) 00105 m = TVals(Time(123, 456), Duration(78, 90)) 00106 v = _str_plot_fields(m, 'field', f) 00107 self.assertEquals('time,field.t,field.d', v) 00108 m = simple_v 00109 self.assertEquals(simple_nostr, _str_plot_fields(m, 'field', f)) 00110 m = arrays_v 00111 self.assertEquals(arrays_nostr, _str_plot_fields(m, 'field', f)) 00112 m = embed_v 00113 self.assertEquals(embed_nostr, _str_plot_fields(m, 'field', f)) 00114 00115 f = create_field_filter(echo_nostr=False, echo_noarr=True) 00116 m = String() 00117 self.assertEquals("time,field.data", _str_plot_fields(m, 'field',f)) 00118 m = String('foo') 00119 self.assertEquals("time,field.data", _str_plot_fields(m, 'field',f)) 00120 m = TVals(Time(123, 456), Duration(78, 90)) 00121 v = _str_plot_fields(m, 'field', f) 00122 self.assertEquals('time,field.t,field.d', v) 00123 m = simple_v 00124 self.assertEquals(simple_d, _str_plot_fields(m, 'field', f)) 00125 m = arrays_v 00126 self.assertEquals('time,', _str_plot_fields(m, 'field', f)) 00127 m = embed_v 00128 self.assertEquals(embed_noarr, _str_plot_fields(m, 'field', f)) 00129 00130 f = create_field_filter(echo_nostr=True, echo_noarr=True) 00131 m = String() 00132 self.assertEquals("time,", _str_plot_fields(m, 'field', f)) 00133 m = String('foo') 00134 self.assertEquals('time,', _str_plot_fields(m, 'field', f)) 00135 m = TVals(Time(123, 456), Duration(78, 90)) 00136 v = _str_plot_fields(m, 'field', f) 00137 self.assertEquals('time,field.t,field.d', v) 00138 m = simple_v 00139 self.assertEquals(simple_nostr, _str_plot_fields(m, 'field', f)) 00140 m = arrays_v 00141 self.assertEquals('time,', _str_plot_fields(m, 'field', f)) 00142 m = embed_v 00143 self.assertEquals(embed_nostr_noarr, _str_plot_fields(m, 'field', f)) 00144 00145 def test_str_plot(self): 00146 from rostopic import _str_plot 00147 from std_msgs.msg import String, Int32, Header 00148 from test_rostopic.msg import Simple, TVals, Floats, Arrays, Embed 00149 00150 from roslib.rostime import Time, Duration 00151 from rostopic import create_field_filter 00152 00153 # str plotting requires rospy time, we fix time to a set time 00154 import rospy.rostime 00155 rospy.rostime.set_rostime_initialized(True) 00156 rospy.rostime._set_rostime(Time(0, 1234)) 00157 r_time = Time(0, 5678) 00158 00159 # prepare test values 00160 simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar') 00161 simple_d = '1234,1,-2,3,-4,a,7,8,9,bar' 00162 simple_nostr = '1234,1,-2,3,-4,a,7,8,9' 00163 00164 arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)]) 00165 arrays_d = '1234,-1,2,3,3,4,5,6,7,8,a1,b2,b3,123000000456,78000000090' 00166 arrays_nostr = '1234,-1,2,3,3,4,5,6,7,8,123000000456,78000000090' 00167 00168 embed_v = Embed(simple_v, arrays_v) 00169 embed_d = simple_d+','+arrays_d[5:] 00170 00171 # test current_time override 00172 m = String('foo') 00173 self.assertEquals('5678,foo', _str_plot(m, current_time=r_time, field_filter=None)) 00174 00175 # test over all combinations of field filters 00176 f = create_field_filter(echo_nostr=False, echo_noarr=False) 00177 m = String() 00178 self.assertEquals("1234,", _str_plot(m, field_filter=f)) 00179 m = String('foo') 00180 self.assertEquals('1234,foo', _str_plot(m, field_filter=f)) 00181 m = TVals(Time(123, 456), Duration(78, 90)) 00182 v = _str_plot(m, field_filter=f) 00183 self.assertEquals('1234,123000000456,78000000090', v) 00184 m = simple_v 00185 self.assertEquals(simple_d, _str_plot(m, field_filter=f)) 00186 m = arrays_v 00187 self.assertEquals(arrays_d, _str_plot(m, field_filter=f)) 00188 m = embed_v 00189 self.assertEquals(embed_d, _str_plot(m, field_filter=f)) 00190 00191 f = create_field_filter(echo_nostr=True, echo_noarr=False) 00192 m = String() 00193 self.assertEquals("1234,", _str_plot(m, field_filter=f)) 00194 m = String('foo') 00195 self.assertEquals('1234,', _str_plot(m, field_filter=f)) 00196 m = TVals(Time(123, 456), Duration(78, 90)) 00197 v = _str_plot(m, field_filter=f) 00198 self.assertEquals('1234,123000000456,78000000090', v) 00199 m = simple_v 00200 self.assertEquals(simple_nostr, _str_plot(m, field_filter=f)) 00201 m = arrays_v 00202 self.assertEquals(arrays_nostr, _str_plot(m, field_filter=f)) 00203 m = embed_v 00204 self.assertEquals(simple_nostr+arrays_nostr[4:], _str_plot(m, field_filter=f)) 00205 00206 f = create_field_filter(echo_nostr=False, echo_noarr=True) 00207 m = String() 00208 self.assertEquals("1234,", _str_plot(m, field_filter=f)) 00209 m = String('foo') 00210 self.assertEquals('1234,foo', _str_plot(m, field_filter=f)) 00211 m = TVals(Time(123, 456), Duration(78, 90)) 00212 v = _str_plot(m, field_filter=f) 00213 self.assertEquals('1234,123000000456,78000000090', v) 00214 m = simple_v 00215 self.assertEquals(simple_d, _str_plot(m, field_filter=f)) 00216 m = arrays_v 00217 self.assertEquals('1234,', _str_plot(m, field_filter=f)) 00218 m = embed_v 00219 self.assertEquals(simple_d, _str_plot(m, field_filter=f)) 00220 00221 f = create_field_filter(echo_nostr=True, echo_noarr=True) 00222 m = String() 00223 self.assertEquals("1234,", _str_plot(m, field_filter=f)) 00224 m = String('foo') 00225 self.assertEquals('1234,', _str_plot(m, field_filter=f)) 00226 m = TVals(Time(123, 456), Duration(78, 90)) 00227 v = _str_plot(m, field_filter=f) 00228 self.assertEquals('1234,123000000456,78000000090', v) 00229 m = simple_v 00230 self.assertEquals(simple_nostr, _str_plot(m, field_filter=f)) 00231 m = arrays_v 00232 self.assertEquals('1234,', _str_plot(m, field_filter=f)) 00233 m = embed_v 00234 self.assertEquals(simple_nostr, _str_plot(m, field_filter=f)) 00235 00236 def test_strify_message(self): 00237 # strify message is part of roslib, but we want to test with 00238 # rostopic's field filters. It's also the case that 00239 # roslib.messages cannot be unit tested within the ROS stack 00240 # -- part of the reason it needs to be moved elsewhere. 00241 from std_msgs.msg import String, Int32, Header 00242 from test_rostopic.msg import Simple, TVals, Floats, Arrays, Embed 00243 00244 from roslib.rostime import Time, Duration 00245 from roslib.message import strify_message 00246 from rostopic import create_field_filter 00247 00248 simple_v = Simple(1, -2, 3, -4, 'a', 7, 8, 9, 'bar') 00249 simple_d = {'b': 1, 'int16': -2, 'int32': 3, 'int64': -4, 'c': 'a', 'uint16': 7, 'uint32': 8, 'uint64': 9, 'str': 'bar'} 00250 simple_nostr = simple_d.copy() 00251 del simple_nostr['str'] 00252 00253 arrays_v = Arrays([-1], chr(2)+chr(3), [3, 4, 5], [6, 7, 8], ['a1', 'b2', 'b3'], [Time(123, 456), Time(78, 90)]) 00254 arrays_d = {'int8_arr': [-1], 'uint8_arr': [2, 3], 'int32_arr': [3, 4, 5], 'uint32_arr': [6, 7, 8], 'string_arr': ['a1', 'b2', 'b3'], 'time_arr': [{'secs': 123, 'nsecs': 456}, {'secs': 78, 'nsecs': 90}]} 00255 arrays_nostr = arrays_d.copy() 00256 del arrays_nostr['string_arr'] 00257 00258 embed_v = Embed(simple_v, arrays_v) 00259 embed_d = {'simple': simple_d, 'arrays': arrays_d} 00260 00261 f = create_field_filter(echo_nostr=False, echo_noarr=False) 00262 m = String() 00263 self.assertEquals("data: ''", strify_message(m, field_filter=f)) 00264 m = String('foo') 00265 self.assertEquals('data: foo', strify_message(m, field_filter=f)) 00266 m = TVals(Time(123, 456), Duration(78, 90)) 00267 v = yaml.load(strify_message(m, field_filter=f)) 00268 self.assertEquals({'t': {'secs': 123, 'nsecs': 456}, 'd': {'secs': 78, 'nsecs': 90}}, v) 00269 m = simple_v 00270 v = yaml.load(strify_message(m, field_filter=f)) 00271 self.assertEquals(simple_d, v) 00272 m = arrays_v 00273 v = yaml.load(strify_message(m, field_filter=f)) 00274 self.assertEquals(arrays_d, v) 00275 m = embed_v 00276 v = yaml.load(strify_message(m, field_filter=f)) 00277 self.assertEquals(embed_d, v) 00278 00279 f = create_field_filter(echo_nostr=True, echo_noarr=False) 00280 m = String() 00281 self.assertEquals('', strify_message(m, field_filter=f)) 00282 m = String('foo') 00283 self.assertEquals('', strify_message(m, field_filter=f)) 00284 m = TVals(Time(123, 456), Duration(78, 90)) 00285 v = yaml.load(strify_message(m, field_filter=f)) 00286 self.assertEquals({'t': {'secs': 123, 'nsecs': 456}, 'd': {'secs': 78, 'nsecs': 90}}, v) 00287 m = simple_v 00288 v = yaml.load(strify_message(m, field_filter=f)) 00289 self.assertEquals(simple_nostr, v) 00290 m = arrays_v 00291 v = yaml.load(strify_message(m, field_filter=f)) 00292 self.assertEquals(arrays_nostr, v) 00293 m = embed_v 00294 v = yaml.load(strify_message(m, field_filter=f)) 00295 self.assertEquals({'simple': simple_nostr, 'arrays': arrays_nostr}, v) 00296 00297 f = create_field_filter(echo_nostr=False, echo_noarr=True) 00298 m = String() 00299 self.assertEquals("data: ''", strify_message(m, field_filter=f)) 00300 m = String('foo') 00301 self.assertEquals('data: foo', strify_message(m, field_filter=f)) 00302 m = TVals(Time(123, 456), Duration(78, 90)) 00303 v = yaml.load(strify_message(m, field_filter=f)) 00304 self.assertEquals({'t': {'secs': 123, 'nsecs': 456}, 'd': {'secs': 78, 'nsecs': 90}}, v) 00305 m = simple_v 00306 v = yaml.load(strify_message(m, field_filter=f)) 00307 self.assertEquals(simple_d, v) 00308 m = arrays_v 00309 v = yaml.load(strify_message(m, field_filter=f)) 00310 self.assertEquals(None, v) 00311 m = embed_v 00312 v = yaml.load(strify_message(m, field_filter=f)) 00313 self.assertEquals({'simple': simple_d, 'arrays': None}, v) 00314 00315 f = create_field_filter(echo_nostr=True, echo_noarr=True) 00316 m = String() 00317 self.assertEquals('', strify_message(m, field_filter=f)) 00318 m = String('foo') 00319 self.assertEquals('', strify_message(m, field_filter=f)) 00320 m = TVals(Time(123, 456), Duration(78, 90)) 00321 v = yaml.load(strify_message(m, field_filter=f)) 00322 self.assertEquals({'t': {'secs': 123, 'nsecs': 456}, 'd': {'secs': 78, 'nsecs': 90}}, v) 00323 m = simple_v 00324 v = yaml.load(strify_message(m, field_filter=f)) 00325 self.assertEquals(simple_nostr, v) 00326 m = embed_v 00327 v = yaml.load(strify_message(m, field_filter=f)) 00328 self.assertEquals({'simple': simple_nostr, 'arrays': None}, v) 00329 00330 def test_create_field_filter(self): 00331 from std_msgs.msg import String, Int32, Header 00332 from test_rostopic.msg import Simple, TVals, Floats, Arrays, Embed 00333 00334 from rostopic import create_field_filter 00335 f = create_field_filter(echo_nostr=False, echo_noarr=False) 00336 m = String() 00337 self.assertEquals(['data'], list(f(m))) 00338 m = Int32() 00339 self.assertEquals(['data'], list(f(m))) 00340 m = Arrays() 00341 self.assertEquals(['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'string_arr', 'time_arr'], list(f(m))) 00342 m = Embed() 00343 self.assertEquals(['simple', 'arrays'], list(f(m))) 00344 m = Simple() 00345 self.assertEquals(['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64', 'str'], list(f(m))) 00346 m = Floats() 00347 self.assertEquals(['float32', 'float64'], list(f(m))) 00348 m = TVals() 00349 self.assertEquals(['t', 'd'], list(f(m))) 00350 m = Header() 00351 self.assertEquals(['seq', 'stamp', 'frame_id'], list(f(m))) 00352 00353 f = create_field_filter(echo_nostr=True, echo_noarr=False) 00354 m = String() 00355 self.assertEquals([], list(f(m))) 00356 m = Int32() 00357 self.assertEquals(['data'], list(f(m))) 00358 m = Arrays() 00359 self.assertEquals(['int8_arr', 'uint8_arr', 'int32_arr', 'uint32_arr', 'time_arr'], list(f(m))) 00360 m = Embed() 00361 self.assertEquals(['simple', 'arrays'], list(f(m))) 00362 m = Simple() 00363 self.assertEquals(['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64'], list(f(m))) 00364 m = Floats() 00365 self.assertEquals(['float32', 'float64'], list(f(m))) 00366 m = TVals() 00367 self.assertEquals(['t', 'd'], list(f(m))) 00368 m = Header() 00369 self.assertEquals(['seq', 'stamp'], list(f(m))) 00370 00371 f = create_field_filter(echo_nostr=False, echo_noarr=True) 00372 m = String() 00373 self.assertEquals(['data'], list(f(m))) 00374 m = Int32() 00375 self.assertEquals(['data'], list(f(m))) 00376 m = Arrays() 00377 self.assertEquals([], list(f(m))) 00378 m = Embed() 00379 self.assertEquals(['simple', 'arrays'], list(f(m))) 00380 m = Simple() 00381 self.assertEquals(['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64', 'str'], list(f(m))) 00382 m = Floats() 00383 self.assertEquals(['float32', 'float64'], list(f(m))) 00384 m = TVals() 00385 self.assertEquals(['t', 'd'], list(f(m))) 00386 m = Header() 00387 self.assertEquals(['seq', 'stamp', 'frame_id'], list(f(m))) 00388 00389 f = create_field_filter(echo_nostr=True, echo_noarr=True) 00390 m = String() 00391 self.assertEquals([], list(f(m))) 00392 m = Int32() 00393 self.assertEquals(['data'], list(f(m))) 00394 m = Arrays() 00395 self.assertEquals([], list(f(m))) 00396 m = Embed() 00397 self.assertEquals(['simple', 'arrays'], list(f(m))) 00398 m = Simple() 00399 self.assertEquals(['b', 'int16', 'int32', 'int64', 'c', 'uint16', 'uint32', 'uint64'], list(f(m))) 00400 m = Floats() 00401 self.assertEquals(['float32', 'float64'], list(f(m))) 00402 m = TVals() 00403 self.assertEquals(['t', 'd'], list(f(m))) 00404 m = Header() 00405 self.assertEquals(['seq', 'stamp'], list(f(m))) 00406 00407 00408 if __name__ == '__main__': 00409 rostest.unitrun(PKG, NAME, TestRostopicUnit, sys.argv, coverage_packages=['rostopic'])