4 Test: functions of grepros.api as library.
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
13 ------------------------------------------------------------------------------
24 from grepros
import api
26 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__),
"..")))
27 from test
import testbase
29 logger = logging.getLogger()
33 """Tests general API."""
36 NAME = os.path.splitext(os.path.basename(__file__))[0]
41 """Initializes ROS bindings in grepros."""
46 """Tests API functions for dealing with messages."""
47 NAME =
lambda f:
"%s.%s()" % (f.__module__, f.__name__)
48 ERR =
lambda f:
"Unexpected result from %s." % f.__name__
49 logger.info(
"Testing message API functions.")
51 func = api.get_message_class
53 logger.info(
"Testing %s.",
NAME(func))
54 self.assertEqual(func(
"std_msgs/Bool"), std_msgs.msg.Bool, ERR(func))
56 func = api.get_message_definition
58 logger.info(
"Testing %s.",
NAME(func))
59 self.assertTrue(func(
"std_msgs/Bool"), ERR(func))
60 self.assertTrue(func(std_msgs.msg.Bool), ERR(func))
61 self.assertTrue(func(std_msgs.msg.Bool()), ERR(func))
63 func = api.get_message_fields
65 logger.info(
"Testing %s.",
NAME(func))
66 self.assertEqual(func(std_msgs.msg.Bool), {
"data":
"bool"}, ERR(func))
67 self.assertEqual(func(std_msgs.msg.Bool()), {
"data":
"bool"}, ERR(func))
68 dct = {
"seq":
"uint32",
"stamp":
"time",
"frame_id":
"string"}
if api.ROS1
else \
69 {
"stamp":
"builtin_interfaces/Time",
"frame_id":
"string"}
70 self.assertEqual(func(std_msgs.msg.Header), dct, ERR(func))
71 self.assertEqual(func(
None), {}, ERR(func))
73 func = api.get_message_type
75 logger.info(
"Testing %s.",
NAME(func))
76 self.assertEqual(func(std_msgs.msg.Bool),
"std_msgs/Bool", ERR(func))
77 self.assertEqual(func(std_msgs.msg.Bool()),
"std_msgs/Bool", ERR(func))
78 self.assertIn(func(type(api.time_message(api.make_time()))),
79 api.ROS_TIME_TYPES, ERR(func))
80 self.assertIn(func(api.time_message(api.make_time())),
81 api.ROS_TIME_TYPES, ERR(func))
82 self.assertIn(func(type(api.time_message(api.make_duration()))),
83 api.ROS_TIME_TYPES, ERR(func))
84 self.assertIn(func(api.time_message(api.make_duration())),
85 api.ROS_TIME_TYPES, ERR(func))
87 func = api.get_message_type_hash
89 logger.info(
"Testing %s.",
NAME(func))
90 HASH =
"8b94c1b53db61fb6aed406028ad6332a"
91 self.assertEqual(func(std_msgs.msg.Bool), HASH, ERR(func))
92 self.assertEqual(func(std_msgs.msg.Bool()), HASH, ERR(func))
93 self.assertEqual(func(
"std_msgs/Bool"), HASH, ERR(func))
94 self.assertTrue(func(
"std_msgs/Header"), ERR(func))
96 func = api.get_message_value
98 logger.info(
"Testing %s.",
NAME(func))
99 msg1 = std_msgs.msg.Header(frame_id=self.
NAME)
100 msg2 = std_msgs.msg.UInt8MultiArray(data=b
"123")
101 timename = next(x
for x
in api.ROS_TIME_TYPES
if "time" in x.lower())
102 self.assertEqual(func(msg1,
"frame_id",
"string"), msg1.frame_id, ERR(func))
103 self.assertEqual(func(msg1,
"stamp", timename), msg1.stamp, ERR(func))
104 self.assertEqual(func(msg2,
"data",
"uint8[]"), list(msg2.data), ERR(func))
105 with self.assertRaises(Exception, msg=ERR(func)):
106 func(msg1,
"nosuchfield",
"whatever")
108 func = api.set_message_value
110 logger.info(
"Testing %s.",
NAME(func))
111 msg = std_msgs.msg.Header(frame_id=self.
NAME)
112 api.set_message_value(msg,
"frame_id", str(self))
113 self.assertEqual(msg.frame_id, str(self), ERR(func))
115 func = api.format_message_value
117 logger.info(
"Testing %s.",
NAME(func))
118 msg = std_msgs.msg.Header(frame_id=self.
NAME)
119 self.assertEqual(func(msg,
"frame_id", msg.frame_id), msg.frame_id, ERR(func))
120 for name
in api.get_message_fields(msg.stamp):
121 received = func(msg.stamp, name, getattr(msg.stamp, name))
122 self.assertTrue(re.match(
r"\s+%s" % getattr(msg.stamp, name), received), ERR(func))
124 func = api.is_ros_message
126 logger.info(
"Testing %s.",
NAME(func))
127 msg = std_msgs.msg.Header(frame_id=self.
NAME)
128 self.assertTrue(func(msg), ERR(func))
129 self.assertTrue(func(msg.stamp), ERR(func))
130 self.assertFalse(func(func), ERR(func))
131 self.assertFalse(func(msg.stamp, ignore_time=
True), ERR(func))
133 func = api.iter_message_fields
135 logger.info(
"Testing %s.",
NAME(func))
136 msg1 = std_msgs.msg.Header(frame_id=self.
NAME)
137 msg2 = std_msgs.msg.UInt8MultiArray(data=b
"123")
139 received = list(func(msg1, scalars=[
"time"]))
140 paths = [
".".join(p)
for p, _, _
in received]
141 self.assertTrue(
"stamp" in paths
and not any(p.startswith(
"stamp.")
for p
in paths),
144 received = list(func(msg2))
145 expected = [((
'layout',
'dim'), [],
'std_msgs/MultiArrayDimension[]'),
146 ((
'layout',
'data_offset'), 0,
'uint32'),
147 ((
'data',), [49, 50, 51],
'uint8[]')]
148 self.assertEqual(received, expected, ERR(func))
150 received = list(func(msg2, messages_only=
True))
151 expected = [((
'layout',
'dim'), [],
'std_msgs/MultiArrayDimension[]'),
152 ((
'layout',), std_msgs.msg.MultiArrayLayout(),
'std_msgs/MultiArrayLayout')]
153 self.assertEqual(received, expected, ERR(func))
155 func = api.parse_definition_fields
157 logger.info(
"Testing %s.",
NAME(func))
158 msg = std_msgs.msg.Header(frame_id=self.
NAME)
159 received = func(api.get_message_type(msg), api.get_message_definition(type(msg)))
160 self.assertIsInstance(received, dict, ERR(func))
161 self.assertEqual(received, api.get_message_fields(msg), ERR(func))
163 func = api.parse_definition_subtypes
165 logger.info(
"Testing %s.",
NAME(func))
166 received = func(api.get_message_definition(std_msgs.msg.UInt8MultiArray))
167 for typename, typedef
in received.items():
168 self.assertEqual(typedef.strip(), api.get_message_definition(typename).strip(), ERR(func))
169 msg = std_msgs.msg.UInt8MultiArray(data=b
"123")
170 defs, nesteds = func(api.get_message_definition(type(msg)), nesting=
True)
171 for typename, typedef
in defs.items():
172 self.assertEqual(typedef.strip(), api.get_message_definition(typename).strip(), ERR(func))
173 for typename
in sum([[k] + v
for k, v
in nesteds.items()], []):
174 self.assertIn(typename, defs, ERR(func))
176 func = api.dict_to_message
178 logger.info(
"Testing %s.",
NAME(func))
179 dct = {
"seq": 3,
"stamp": {
"secs": 1,
"nsecs": 2},
"frame_id": self.
NAME}
if api.ROS1 \
180 else {
"stamp": {
"sec": 1,
"nanosec": 2},
"frame_id": self.
NAME}
181 stamp = api.make_time(1, 2)
182 msg = std_msgs.msg.Header(frame_id=self.
NAME,
183 stamp=stamp
if api.ROS1
else stamp.to_msg())
184 if api.ROS1: msg.seq = dct[
"seq"]
185 self.assertEqual(func(dct, std_msgs.msg.Header()), msg, ERR(func))
187 func = api.message_to_dict
189 logger.info(
"Testing %s.",
NAME(func))
190 dct = {
"seq": 3,
"stamp": {
"secs": 1,
"nsecs": 2},
"frame_id": self.
NAME}
if api.ROS1 \
191 else {
"stamp": {
"sec": 1,
"nanosec": 2},
"frame_id": self.
NAME}
192 stamp = api.make_time(1, 2)
193 msg = std_msgs.msg.Header(frame_id=self.
NAME,
194 stamp=stamp
if api.ROS1
else stamp.to_msg())
195 if api.ROS1: msg.seq = dct[
"seq"]
196 self.assertEqual(func(msg), dct, ERR(func))
198 func = api.serialize_message
200 logger.info(
"Testing %s.",
NAME(func))
201 expected = b
"\x01" if api.ROS1
else b
"\x00\x01\x00\x00\x01"
202 self.assertEqual(func(std_msgs.msg.Bool(data=
True)), expected, ERR(func))
204 func = api.deserialize_message
206 logger.info(
"Testing %s.",
NAME(func))
207 binary = b
"\x01" if api.ROS1
else b
"\x00\x01\x00\x00\x01"
208 expected = std_msgs.msg.Bool(data=
True)
209 self.assertEqual(func(binary,
"std_msgs/Bool"), expected, ERR(func))
210 self.assertEqual(func(binary, std_msgs.msg.Bool), expected, ERR(func))
214 """Tests API functions for dealing with ROS types."""
215 NAME =
lambda f:
"%s.%s()" % (f.__module__, f.__name__)
216 ERR =
lambda f:
"Unexpected result from %s." % f.__name__
217 logger.info(
"Testing ROS type API functions.")
221 logger.info(
"Testing %s.",
NAME(func))
222 self.assertEqual(func(
"std_msgs/msg/Bool"),
"std_msgs/Bool", ERR(func))
223 self.assertEqual(func(
"std_srvs/srv/SetBool"),
"std_srvs/SetBool", ERR(func))
224 self.assertEqual(func(
"pkg/Cls"),
"pkg/Cls", ERR(func))
225 self.assertEqual(func(
"Cls"),
"Cls", ERR(func))
226 self.assertEqual(func(
"int8[4]", unbounded=
True),
"int8[]", ERR(func))
228 self.assertEqual(func(
"octet"),
"byte", ERR(func))
229 self.assertEqual(func(
"sequence<uint8, 100>"),
"uint8[100]", ERR(func))
232 func = api.get_type_alias
234 logger.info(
"Testing %s.",
NAME(func))
235 signed, unsigned = (
"byte",
"char")
if api.ROS1
else (
"char",
"byte")
236 self.assertEqual(func(
"int8"), signed, ERR(func))
237 self.assertEqual(func(
"uint8"), unsigned, ERR(func))
238 self.assertEqual(func(
"int16"),
None, ERR(func))
240 func = api.get_alias_type
242 logger.info(
"Testing %s.",
NAME(func))
243 signed, unsigned = (
"byte",
"char")
if api.ROS1
else (
"char",
"byte")
244 self.assertEqual(func(signed),
"int8", ERR(func))
245 self.assertEqual(func(unsigned),
"uint8", ERR(func))
246 self.assertEqual(func(
"other"),
None, ERR(func))
248 func = api.make_full_typename
250 logger.info(
"Testing %s.",
NAME(func))
251 self.assertEqual(func(
"std_msgs/Bool"),
"std_msgs/msg/Bool", ERR(func))
252 self.assertEqual(func(
"std_msgs/msg/Bool"),
"std_msgs/msg/Bool", ERR(func))
253 self.assertEqual(func(
"std_srvs/SetBool",
"srv"),
"std_srvs/srv/SetBool", ERR(func))
254 self.assertEqual(func(
"std_srvs/srv/SetBool",
"srv"),
"std_srvs/srv/SetBool", ERR(func))
255 self.assertEqual(func(
"%s/Time" % api.ROS_FAMILY),
256 "%s/Time" % api.ROS_FAMILY, ERR(func))
257 self.assertEqual(func(
"%s/Duration" % api.ROS_FAMILY),
258 "%s/Duration" % api.ROS_FAMILY, ERR(func))
262 logger.info(
"Testing %s.",
NAME(func))
263 self.assertEqual(func(
"int8"),
"int8", ERR(func))
264 self.assertEqual(func(
"std_msgs/Bool[]"),
"std_msgs/Bool", ERR(func))
268 """Tests API functions for dealing with ROS time/duration values."""
269 NAME =
lambda f:
"%s.%s()" % (f.__module__, f.__name__)
270 ERR =
lambda f:
"Unexpected result from %s." % f.__name__
271 logger.info(
"Testing temporal API functions.")
273 func = api.get_ros_time_category
275 logger.info(
"Testing %s.",
NAME(func))
276 for typename
in api.ROS_TIME_TYPES:
277 expected =
"duration" if "duration" in typename.lower()
else "time"
278 self.assertEqual(func(typename), expected, ERR(func))
279 for cls
in api.ROS_TIME_CLASSES:
280 expected =
"duration" if "duration" in cls.__name__.lower()
else "time"
281 self.assertEqual(func(cls), expected, ERR(func))
282 self.assertEqual(func(cls()), expected, ERR(func))
283 self.assertEqual(func(api.get_message_type(cls)), expected, ERR(func))
284 self.assertEqual(func(self), self, ERR(func))
286 func = api.time_message
288 logger.info(
"Testing %s.",
NAME(func))
289 for cls
in api.ROS_TIME_CLASSES:
291 category, is_msg = api.get_ros_time_category(cls), api.is_ros_message(v1)
292 for to_message
in (
True,
False):
293 v2 = func(v1, to_message=to_message)
294 if is_msg == to_message:
297 expected = type(api.make_time()
if "time" == category
else
300 expected = next(api.get_message_class(x)
for x
in api.ROS_TIME_TYPES
301 if category
in x.lower())
302 self.assertTrue(issubclass(expected, type(v2)), ERR(func))
304 func = api.is_ros_time
306 logger.info(
"Testing %s.",
NAME(func))
307 msg = std_msgs.msg.Header(frame_id=self.
NAME)
308 self.assertTrue(func(type(msg.stamp)), ERR(func))
309 self.assertTrue(func(msg.stamp), ERR(func))
310 self.assertFalse(func(msg), ERR(func))
312 func = api.make_duration
314 logger.info(
"Testing %s.",
NAME(func))
316 self.assertIsInstance(dur, tuple(api.ROS_TIME_CLASSES), ERR(func))
317 self.assertTrue(api.is_ros_time(type(dur)), ERR(func))
318 self.assertTrue(api.is_ros_time(dur), ERR(func))
322 logger.info(
"Testing %s.",
NAME(func))
323 self.assertIsInstance(func(1, 2), tuple(api.ROS_TIME_CLASSES), ERR(func))
325 func = api.to_datetime
327 logger.info(
"Testing %s.",
NAME(func))
328 tval, dval = api.make_time(1234), api.make_duration(1234)
329 expected = datetime.datetime.fromtimestamp(1234)
330 self.assertEqual(func(tval), expected, ERR(func))
331 self.assertEqual(func(dval), expected, ERR(func))
332 self.assertEqual(func(666), 666, ERR(func))
334 func = api.to_decimal
336 logger.info(
"Testing %s.",
NAME(func))
337 tval = api.make_time (123456789, 987654321)
338 dval = api.make_duration(123456789, 987654321)
339 expected = decimal.Decimal(
"123456789.987654321")
340 self.assertEqual(func(tval), expected, ERR(func))
341 self.assertEqual(func(dval), expected, ERR(func))
342 self.assertEqual(func(666), 666, ERR(func))
344 func = api.to_duration
346 logger.info(
"Testing %s.",
NAME(func))
350 logger.info(
"Testing %s.",
NAME(func))
351 tval = api.make_time (123456789, 987654321)
352 dval = api.make_duration(123456789, 987654321)
353 expected = 123456789987654321
354 self.assertEqual(func(tval), expected, ERR(func))
355 self.assertEqual(func(dval), expected, ERR(func))
356 self.assertEqual(func(666), 666, ERR(func))
360 logger.info(
"Testing %s.",
NAME(func))
361 tval = api.make_time (1, 123456789)
362 dval = api.make_duration(1, 123456789)
363 expected = 1.123456789
364 self.assertEqual(func(tval), expected, ERR(func))
365 self.assertEqual(func(dval), expected, ERR(func))
366 self.assertEqual(func(666), 666, ERR(func))
368 func = api.to_sec_nsec
370 logger.info(
"Testing %s.",
NAME(func))
371 tval = api.make_time (123456789, 987654321)
372 dval = api.make_duration(123456789, 987654321)
373 expected = (123456789, 987654321)
374 self.assertEqual(func(tval), expected, ERR(func))
375 self.assertEqual(func(dval), expected, ERR(func))
376 self.assertEqual(func(666), 666, ERR(func))
380 logger.info(
"Testing %s.",
NAME(func))
382 val = datetime.datetime.now()
384 self.assertTrue(api.is_ros_time(tval), ERR(func))
385 self.assertEqual(api.to_datetime(tval), val, ERR(func))
387 val = decimal.Decimal(
"123456789.987654321")
389 self.assertTrue(api.is_ros_time(tval), ERR(func))
390 self.assertEqual(api.to_decimal(tval), val, ERR(func))
392 val = api.make_duration(123456789, 987654321)
394 self.assertTrue(api.is_ros_time(tval), ERR(func))
395 self.assertEqual(api.make_duration(*api.to_sec_nsec(tval)), val, ERR(func))
399 self.assertTrue(api.is_ros_time(tval), ERR(func))
400 self.assertEqual(api.to_sec(tval), val, ERR(func))
402 self.assertEqual(func(
None),
None, ERR(func))
403 self.assertEqual(func(self), self, ERR(func))
# Script entry point: when this file is executed directly (rather than
# imported), hand control to TestAPI.run_rostest().
if __name__ == "__main__":
    TestAPI.run_rostest()