test_api.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 """
4 Test: functions of grepros.api as library.
5 
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
9 
10 @author Erki Suurjaak
11 @created 22.12.2022
12 @modified 29.12.2023
13 ------------------------------------------------------------------------------
14 """
15 import datetime
16 import decimal
17 import logging
18 import os
19 import re
20 import sys
21 
22 import std_msgs.msg
23 
24 from grepros import api
25 
# Put the parent of this file's directory first on the import path,
# so that the `test` package imported below resolves from there.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from test import testbase

# Root logger (no name given), used for progress messages in the tests.
logger = logging.getLogger()
30 
31 
class TestAPI(testbase.TestBase):
    """Tests general API."""

    ## Test name used in flow logging
    NAME = os.path.splitext(os.path.basename(__file__))[0]
37 
38 
39  @classmethod
40  def setUpClass(cls):
41  """Initializes ROS bindings in grepros."""
42  api.validate()
43 
44 
    def test_messages(self):
        """
        Tests API functions for dealing with messages.

        Each grepros.api function is exercised in its own subtest, using
        std_msgs Bool, Header and UInt8MultiArray as fixtures. Expectations
        that differ between ROS versions branch on api.ROS1.
        """
        # NAME gives the subtest label, ERR the assertion failure message, for an API function
        NAME = lambda f: "%s.%s()" % (f.__module__, f.__name__)
        ERR = lambda f: "Unexpected result from %s." % f.__name__
        logger.info("Testing message API functions.")

        func = api.get_message_class
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            self.assertEqual(func("std_msgs/Bool"), std_msgs.msg.Bool, ERR(func))

        func = api.get_message_definition
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            # Called with type name, message class, and message instance
            self.assertTrue(func("std_msgs/Bool"), ERR(func))
            self.assertTrue(func(std_msgs.msg.Bool), ERR(func))
            self.assertTrue(func(std_msgs.msg.Bool()), ERR(func))

        func = api.get_message_fields
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            self.assertEqual(func(std_msgs.msg.Bool), {"data": "bool"}, ERR(func))
            self.assertEqual(func(std_msgs.msg.Bool()), {"data": "bool"}, ERR(func))
            # Header is expected to have a `seq` field and a `time` stamp under ROS1 only
            dct = {"seq": "uint32", "stamp": "time", "frame_id": "string"} if api.ROS1 else \
                  {"stamp": "builtin_interfaces/Time", "frame_id": "string"}
            self.assertEqual(func(std_msgs.msg.Header), dct, ERR(func))
            # A non-message argument is expected to yield an empty result
            self.assertEqual(func(None), {}, ERR(func))

        func = api.get_message_type
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            self.assertEqual(func(std_msgs.msg.Bool), "std_msgs/Bool", ERR(func))
            self.assertEqual(func(std_msgs.msg.Bool()), "std_msgs/Bool", ERR(func))
            # Time and duration values, as class and as instance, must resolve to a ROS time type
            self.assertIn(func(type(api.time_message(api.make_time()))),
                          api.ROS_TIME_TYPES, ERR(func))
            self.assertIn(func(api.time_message(api.make_time())),
                          api.ROS_TIME_TYPES, ERR(func))
            self.assertIn(func(type(api.time_message(api.make_duration()))),
                          api.ROS_TIME_TYPES, ERR(func))
            self.assertIn(func(api.time_message(api.make_duration())),
                          api.ROS_TIME_TYPES, ERR(func))

        func = api.get_message_type_hash
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            # Expected hash of std_msgs/Bool, same whether given class, instance or type name
            HASH = "8b94c1b53db61fb6aed406028ad6332a"
            self.assertEqual(func(std_msgs.msg.Bool), HASH, ERR(func))
            self.assertEqual(func(std_msgs.msg.Bool()), HASH, ERR(func))
            self.assertEqual(func("std_msgs/Bool"), HASH, ERR(func))
            self.assertTrue(func("std_msgs/Header"), ERR(func))

        func = api.get_message_value
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            msg1 = std_msgs.msg.Header(frame_id=self.NAME)
            msg2 = std_msgs.msg.UInt8MultiArray(data=b"123")
            # First ROS time type name containing "time", i.e. not a duration type
            timename = next(x for x in api.ROS_TIME_TYPES if "time" in x.lower())
            self.assertEqual(func(msg1, "frame_id", "string"), msg1.frame_id, ERR(func))
            self.assertEqual(func(msg1, "stamp", timename), msg1.stamp, ERR(func))
            # uint8[] value is expected as a list of integers
            self.assertEqual(func(msg2, "data", "uint8[]"), list(msg2.data), ERR(func))
            # Unknown field name must raise
            with self.assertRaises(Exception, msg=ERR(func)):
                func(msg1, "nosuchfield", "whatever")

        func = api.set_message_value
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            msg = std_msgs.msg.Header(frame_id=self.NAME)
            api.set_message_value(msg, "frame_id", str(self))
            self.assertEqual(msg.frame_id, str(self), ERR(func))

        func = api.format_message_value
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            msg = std_msgs.msg.Header(frame_id=self.NAME)
            self.assertEqual(func(msg, "frame_id", msg.frame_id), msg.frame_id, ERR(func))
            # Stamp fields must come back as the value preceded by at least one whitespace character
            for name in api.get_message_fields(msg.stamp):
                received = func(msg.stamp, name, getattr(msg.stamp, name))
                self.assertTrue(re.match(r"\s+%s" % getattr(msg.stamp, name), received), ERR(func))

        func = api.is_ros_message
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            msg = std_msgs.msg.Header(frame_id=self.NAME)
            self.assertTrue(func(msg), ERR(func))
            self.assertTrue(func(msg.stamp), ERR(func))
            self.assertFalse(func(func), ERR(func))
            # With ignore_time=True, a time value must not count as a message
            self.assertFalse(func(msg.stamp, ignore_time=True), ERR(func))

        func = api.iter_message_fields
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            msg1 = std_msgs.msg.Header(frame_id=self.NAME)
            msg2 = std_msgs.msg.UInt8MultiArray(data=b"123")

            # With time as scalar, stamp must be yielded as one field, not as its subfields
            received = list(func(msg1, scalars=["time"]))
            paths = [".".join(p) for p, _, _ in received]
            self.assertTrue("stamp" in paths and not any(p.startswith("stamp.") for p in paths),
                            ERR(func))

            # Default iteration: (path tuple, value, type name) for each leaf field
            received = list(func(msg2))
            expected = [(('layout', 'dim'), [], 'std_msgs/MultiArrayDimension[]'),
                        (('layout', 'data_offset'), 0, 'uint32'),
                        (('data',), [49, 50, 51], 'uint8[]')]
            self.assertEqual(received, expected, ERR(func))

            # messages_only=True: only message-typed fields, nested field before its parent
            received = list(func(msg2, messages_only=True))
            expected = [(('layout', 'dim'), [], 'std_msgs/MultiArrayDimension[]'),
                        (('layout',), std_msgs.msg.MultiArrayLayout(), 'std_msgs/MultiArrayLayout')]
            self.assertEqual(received, expected, ERR(func))

        func = api.parse_definition_fields
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            msg = std_msgs.msg.Header(frame_id=self.NAME)
            received = func(api.get_message_type(msg), api.get_message_definition(type(msg)))
            self.assertIsInstance(received, dict, ERR(func))
            # Fields parsed from definition text must match fields reported for the message
            self.assertEqual(received, api.get_message_fields(msg), ERR(func))

        func = api.parse_definition_subtypes
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            # Each subtype definition must match the definition looked up by type name
            received = func(api.get_message_definition(std_msgs.msg.UInt8MultiArray))
            for typename, typedef in received.items():
                self.assertEqual(typedef.strip(), api.get_message_definition(typename).strip(), ERR(func))
            msg = std_msgs.msg.UInt8MultiArray(data=b"123")
            # With nesting=True the result unpacks into definitions and a nesting mapping
            defs, nesteds = func(api.get_message_definition(type(msg)), nesting=True)
            for typename, typedef in defs.items():
                self.assertEqual(typedef.strip(), api.get_message_definition(typename).strip(), ERR(func))
            # Every type named in the nesting mapping, as key or in value list, must have a definition
            for typename in sum([[k] + v for k, v in nesteds.items()], []):
                self.assertIn(typename, defs, ERR(func))

        func = api.dict_to_message
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            # Header dictionary differs by ROS version: seq and secs/nsecs in ROS1, sec/nanosec otherwise
            dct = {"seq": 3, "stamp": {"secs": 1, "nsecs": 2}, "frame_id": self.NAME} if api.ROS1 \
                  else {"stamp": {"sec": 1, "nanosec": 2}, "frame_id": self.NAME}
            stamp = api.make_time(1, 2)
            msg = std_msgs.msg.Header(frame_id=self.NAME,
                                      stamp=stamp if api.ROS1 else stamp.to_msg())
            if api.ROS1: msg.seq = dct["seq"]
            self.assertEqual(func(dct, std_msgs.msg.Header()), msg, ERR(func))

        func = api.message_to_dict
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            # Same fixture as for dict_to_message, checked in the opposite direction
            dct = {"seq": 3, "stamp": {"secs": 1, "nsecs": 2}, "frame_id": self.NAME} if api.ROS1 \
                  else {"stamp": {"sec": 1, "nanosec": 2}, "frame_id": self.NAME}
            stamp = api.make_time(1, 2)
            msg = std_msgs.msg.Header(frame_id=self.NAME,
                                      stamp=stamp if api.ROS1 else stamp.to_msg())
            if api.ROS1: msg.seq = dct["seq"]
            self.assertEqual(func(msg), dct, ERR(func))

        func = api.serialize_message
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            # Non-ROS1 payload has four bytes ahead of the value byte
            # (presumably the CDR encapsulation header - TODO confirm)
            expected = b"\x01" if api.ROS1 else b"\x00\x01\x00\x00\x01"
            self.assertEqual(func(std_msgs.msg.Bool(data=True)), expected, ERR(func))

        func = api.deserialize_message
        with self.subTest(NAME(func)):
            logger.info("Testing %s.", NAME(func))
            binary = b"\x01" if api.ROS1 else b"\x00\x01\x00\x00\x01"
            expected = std_msgs.msg.Bool(data=True)
            # Message type given as type name and as class
            self.assertEqual(func(binary, "std_msgs/Bool"), expected, ERR(func))
            self.assertEqual(func(binary, std_msgs.msg.Bool), expected, ERR(func))
211 
212 
213  def test_typenames(self):
214  """Tests API functions for dealing with ROS types."""
215  NAME = lambda f: "%s.%s()" % (f.__module__, f.__name__)
216  ERR = lambda f: "Unexpected result from %s." % f.__name__
217  logger.info("Testing ROS type API functions.")
218 
219  func = api.canonical
220  with self.subTest(NAME(func)):
221  logger.info("Testing %s.", NAME(func))
222  self.assertEqual(func("std_msgs/msg/Bool"), "std_msgs/Bool", ERR(func))
223  self.assertEqual(func("std_srvs/srv/SetBool"), "std_srvs/SetBool", ERR(func))
224  self.assertEqual(func("pkg/Cls"), "pkg/Cls", ERR(func))
225  self.assertEqual(func("Cls"), "Cls", ERR(func))
226  self.assertEqual(func("int8[4]", unbounded=True), "int8[]", ERR(func))
227  if api.ROS2:
228  self.assertEqual(func("octet"), "byte", ERR(func))
229  self.assertEqual(func("sequence<uint8, 100>"), "uint8[100]", ERR(func))
230 
231 
232  func = api.get_type_alias
233  with self.subTest(NAME(func)):
234  logger.info("Testing %s.", NAME(func))
235  signed, unsigned = ("byte", "char") if api.ROS1 else ("char", "byte")
236  self.assertEqual(func("int8"), signed, ERR(func))
237  self.assertEqual(func("uint8"), unsigned, ERR(func))
238  self.assertEqual(func("int16"), None, ERR(func))
239 
240  func = api.get_alias_type
241  with self.subTest(NAME(func)):
242  logger.info("Testing %s.", NAME(func))
243  signed, unsigned = ("byte", "char") if api.ROS1 else ("char", "byte")
244  self.assertEqual(func(signed), "int8", ERR(func))
245  self.assertEqual(func(unsigned), "uint8", ERR(func))
246  self.assertEqual(func("other"), None, ERR(func))
247 
248  func = api.make_full_typename
249  with self.subTest(NAME(func)):
250  logger.info("Testing %s.", NAME(func))
251  self.assertEqual(func("std_msgs/Bool"), "std_msgs/msg/Bool", ERR(func))
252  self.assertEqual(func("std_msgs/msg/Bool"), "std_msgs/msg/Bool", ERR(func))
253  self.assertEqual(func("std_srvs/SetBool", "srv"), "std_srvs/srv/SetBool", ERR(func))
254  self.assertEqual(func("std_srvs/srv/SetBool", "srv"), "std_srvs/srv/SetBool", ERR(func))
255  self.assertEqual(func("%s/Time" % api.ROS_FAMILY),
256  "%s/Time" % api.ROS_FAMILY, ERR(func))
257  self.assertEqual(func("%s/Duration" % api.ROS_FAMILY),
258  "%s/Duration" % api.ROS_FAMILY, ERR(func))
259 
260  func = api.scalar
261  with self.subTest(NAME(func)):
262  logger.info("Testing %s.", NAME(func))
263  self.assertEqual(func("int8"), "int8", ERR(func))
264  self.assertEqual(func("std_msgs/Bool[]"), "std_msgs/Bool", ERR(func))
265 
266 
267  def test_temporals(self):
268  """Tests API functions for dealing with ROS time/duration values."""
269  NAME = lambda f: "%s.%s()" % (f.__module__, f.__name__)
270  ERR = lambda f: "Unexpected result from %s." % f.__name__
271  logger.info("Testing temporal API functions.")
272 
273  func = api.get_ros_time_category
274  with self.subTest(NAME(func)):
275  logger.info("Testing %s.", NAME(func))
276  for typename in api.ROS_TIME_TYPES:
277  expected = "duration" if "duration" in typename.lower() else "time"
278  self.assertEqual(func(typename), expected, ERR(func))
279  for cls in api.ROS_TIME_CLASSES:
280  expected = "duration" if "duration" in cls.__name__.lower() else "time"
281  self.assertEqual(func(cls), expected, ERR(func))
282  self.assertEqual(func(cls()), expected, ERR(func))
283  self.assertEqual(func(api.get_message_type(cls)), expected, ERR(func))
284  self.assertEqual(func(self), self, ERR(func))
285 
286  func = api.time_message
287  with self.subTest(NAME(func)):
288  logger.info("Testing %s.", NAME(func))
289  for cls in api.ROS_TIME_CLASSES:
290  v1 = cls()
291  category, is_msg = api.get_ros_time_category(cls), api.is_ros_message(v1)
292  for to_message in (True, False):
293  v2 = func(v1, to_message=to_message)
294  if is_msg == to_message:
295  expected = type(v1)
296  elif is_msg: # and not to_message
297  expected = type(api.make_time() if "time" == category else
298  api.make_duration())
299  else: # not is_msg and to_message
300  expected = next(api.get_message_class(x) for x in api.ROS_TIME_TYPES
301  if category in x.lower())
302  self.assertTrue(issubclass(expected, type(v2)), ERR(func))
303 
304  func = api.is_ros_time
305  with self.subTest(NAME(func)):
306  logger.info("Testing %s.", NAME(func))
307  msg = std_msgs.msg.Header(frame_id=self.NAME)
308  self.assertTrue(func(type(msg.stamp)), ERR(func))
309  self.assertTrue(func(msg.stamp), ERR(func))
310  self.assertFalse(func(msg), ERR(func))
311 
312  func = api.make_duration
313  with self.subTest(NAME(func)):
314  logger.info("Testing %s.", NAME(func))
315  dur = func(1, 2)
316  self.assertIsInstance(dur, tuple(api.ROS_TIME_CLASSES), ERR(func))
317  self.assertTrue(api.is_ros_time(type(dur)), ERR(func))
318  self.assertTrue(api.is_ros_time(dur), ERR(func))
319 
320  func = api.make_time
321  with self.subTest(NAME(func)):
322  logger.info("Testing %s.", NAME(func))
323  self.assertIsInstance(func(1, 2), tuple(api.ROS_TIME_CLASSES), ERR(func))
324 
325  func = api.to_datetime
326  with self.subTest(NAME(func)):
327  logger.info("Testing %s.", NAME(func))
328  tval, dval = api.make_time(1234), api.make_duration(1234)
329  expected = datetime.datetime.fromtimestamp(1234)
330  self.assertEqual(func(tval), expected, ERR(func))
331  self.assertEqual(func(dval), expected, ERR(func))
332  self.assertEqual(func(666), 666, ERR(func))
333 
334  func = api.to_decimal
335  with self.subTest(NAME(func)):
336  logger.info("Testing %s.", NAME(func))
337  tval = api.make_time (123456789, 987654321)
338  dval = api.make_duration(123456789, 987654321)
339  expected = decimal.Decimal("123456789.987654321")
340  self.assertEqual(func(tval), expected, ERR(func))
341  self.assertEqual(func(dval), expected, ERR(func))
342  self.assertEqual(func(666), 666, ERR(func))
343 
344  func = api.to_duration
345  with self.subTest(NAME(func)):
346  logger.info("Testing %s.", NAME(func))
347 
348  func = api.to_nsec
349  with self.subTest(NAME(func)):
350  logger.info("Testing %s.", NAME(func))
351  tval = api.make_time (123456789, 987654321)
352  dval = api.make_duration(123456789, 987654321)
353  expected = 123456789987654321
354  self.assertEqual(func(tval), expected, ERR(func))
355  self.assertEqual(func(dval), expected, ERR(func))
356  self.assertEqual(func(666), 666, ERR(func))
357 
358  func = api.to_sec
359  with self.subTest(NAME(func)):
360  logger.info("Testing %s.", NAME(func))
361  tval = api.make_time (1, 123456789)
362  dval = api.make_duration(1, 123456789)
363  expected = 1.123456789
364  self.assertEqual(func(tval), expected, ERR(func))
365  self.assertEqual(func(dval), expected, ERR(func))
366  self.assertEqual(func(666), 666, ERR(func))
367 
368  func = api.to_sec_nsec
369  with self.subTest(NAME(func)):
370  logger.info("Testing %s.", NAME(func))
371  tval = api.make_time (123456789, 987654321)
372  dval = api.make_duration(123456789, 987654321)
373  expected = (123456789, 987654321)
374  self.assertEqual(func(tval), expected, ERR(func))
375  self.assertEqual(func(dval), expected, ERR(func))
376  self.assertEqual(func(666), 666, ERR(func))
377 
378  func = api.to_time
379  with self.subTest(NAME(func)):
380  logger.info("Testing %s.", NAME(func))
381 
382  val = datetime.datetime.now()
383  tval = func(val)
384  self.assertTrue(api.is_ros_time(tval), ERR(func))
385  self.assertEqual(api.to_datetime(tval), val, ERR(func))
386 
387  val = decimal.Decimal("123456789.987654321")
388  tval = func(val)
389  self.assertTrue(api.is_ros_time(tval), ERR(func))
390  self.assertEqual(api.to_decimal(tval), val, ERR(func))
391 
392  val = api.make_duration(123456789, 987654321)
393  tval = func(val)
394  self.assertTrue(api.is_ros_time(tval), ERR(func))
395  self.assertEqual(api.make_duration(*api.to_sec_nsec(tval)), val, ERR(func))
396 
397  val = 123321123.456
398  tval = func(val)
399  self.assertTrue(api.is_ros_time(tval), ERR(func))
400  self.assertEqual(api.to_sec(tval), val, ERR(func))
401 
402  self.assertEqual(func(None), None, ERR(func))
403  self.assertEqual(func(self), self, ERR(func))
404 
405 
406 
# Entry point when the file is executed directly rather than imported.
if __name__ == "__main__":
    TestAPI.run_rostest()
test.testbase.TestBase
Definition: testbase.py:46
test.test_api.TestAPI.setUpClass
def setUpClass(cls)
Definition: test_api.py:40
test.test_api.TestAPI.NAME
NAME
Test name used in flow logging.
Definition: test_api.py:36
test.testbase.TestBase.subTest
def subTest(self, msg=None, **params)
Definition: testbase.py:123
test.test_api.TestAPI
Definition: test_api.py:32
test.test_api.TestAPI.test_typenames
def test_typenames(self)
Definition: test_api.py:213
test.test_api.TestAPI.test_temporals
def test_temporals(self)
Definition: test_api.py:267
test.test_api.TestAPI.test_messages
def test_messages(self)
Definition: test_api.py:45


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29