4 Test: main functions and classes of grepros as library.
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
13 ------------------------------------------------------------------------------
27 from grepros
import api
30 sys.path.insert(0, os.path.join(os.path.dirname(__file__),
".."))
31 from test
import testbase
33 logger = logging.getLogger()
37 """Tests using grepros main functions and classes as a library."""
40 NAME = os.path.splitext(os.path.basename(__file__))[0]
43 OUTPUT_LABEL =
"various sinks"
46 OUTPUT_SUFFIX = testbase.TestBase.BAG_SUFFIX
55 """Collects and verifies bags in data directory."""
62 """Deletes temporary output files, if any."""
65 except Exception:
pass
70 """Tests reading and writing bags and matching messages."""
71 logger.info(
"Verifying reading and writing bags, and grepping messages.")
73 topic=
"/match/this*", no_topic=
"/not/this*",
74 type=
"std_msgs/*", no_type=
"std_msgs/Bool")
75 logger.debug(
"Writing to bag %r.", self.
_outname)
77 for bagname
in self.
_bags:
78 logger.debug(
"Reading from bag %r.", bagname)
80 for topic, msg, stamp
in inbag:
81 if grep.match(topic, msg, stamp):
82 outbag.write(topic, msg, stamp)
84 logger.debug(
"Validating messages written to %r.", self.
_outname)
87 for topic, msg, _
in outfile.read_messages():
88 messages.setdefault(topic, []).append(msg)
90 fulltext =
"\n".join(str(m)
for mm
in messages.values()
for m
in mm)
95 """Tests Bag class general interface."""
96 logger.info(
"Verifying Bag-class attributes.")
98 if ".mcap" in api.BAG_EXTENSIONS:
99 outname = self.
mkfile(
".mcap")
104 """Tests general interface for given bag class."""
105 logger.info(
"Verifying Bag %s attributes.", bagcls)
107 bag = bagcls(outfilename, mode=
"w")
108 self.assertEqual(bag.mode,
"w",
"Unexpected result for Bag.mode.")
109 self.assertGreaterEqual(bag.size, 0,
"Unexpected result for Bag.size.")
111 self.assertFalse(bag.closed,
"Unexpected result for Bag.closed.")
112 self.assertEqual(bag.get_message_count(), 0,
113 "Unexpected result for Bag.get_message_count().")
114 self.assertEqual(bag.get_start_time(),
None,
115 "Unexpected result for Bag.get_start_time().")
116 self.assertEqual(bag.get_end_time(),
None,
117 "Unexpected result for Bag.get_end_time().")
118 self.assertIsInstance(bag.get_topic_info(), dict,
119 "Unexpected result for Bag.get_topic_info().")
120 self.assertIsInstance(bag.get_type_and_topic_info(), tuple,
121 "Unexpected result for Bag.get_type_and_topic_info().")
122 bag.write(
"/my/topic", std_msgs.msg.Bool())
123 with self.assertRaises(Exception):
127 self.assertTrue(bag.closed,
"Unexpected result for Bag.closed.")
129 bag = bagcls(infilename)
130 self.assertIsInstance(bag.size, int,
"Unexpected result for Bag.size.")
132 self.assertFalse(bag.closed,
"Unexpected result for Bag.closed.")
133 topic, msg, stamp = next(bag)
134 self.assertIsInstance(topic, str,
"Unexpected result for next(Bag).")
135 self.assertTrue(api.is_ros_message(msg),
"Unexpected result for next(Bag).")
136 self.assertTrue(api.is_ros_time(stamp),
"Unexpected result for next(Bag).")
137 with self.assertRaises(Exception):
138 bag.write(topic, msg, stamp)
140 self.assertIsInstance(len(bag), int,
"Unexpected result for len(Bag).")
141 self.assertTrue(bool(bag),
"Unexpected result for bool(Bag).")
142 self.assertIsInstance(bag.size, int,
"Unexpected result for Bag.size.")
143 self.assertEqual(bag.mode,
"r",
"Unexpected result for Bag.mode.")
145 self.assertTrue(callable(bag.get_message_class),
146 "Unexpected result for Bag.get_message_class.")
147 self.assertTrue(callable(bag.get_message_definition),
148 "Unexpected result for Bag.get_message_definition.")
149 self.assertTrue(callable(bag.get_message_type_hash),
150 "Unexpected result for Bag.get_message_type_hash.")
151 self.assertTrue(callable(bag.get_qoses),
152 "Unexpected result for Bag.get_qoses.")
154 self.assertIsInstance(bag.get_message_count(), int,
155 "Unexpected result for Bag.get_message_count().")
# A bag opened for reading that holds messages must report a numeric start
# time. This previously called get_end_time() a second time, so
# get_start_time() was never actually verified in read mode.
self.assertIsInstance(bag.get_start_time(), (float, int),
                      "Unexpected result for Bag.get_start_time().")
158 self.assertIsInstance(bag.get_end_time(), (float, int),
159 "Unexpected result for Bag.get_end_time().")
160 self.assertIsInstance(bag.get_topic_info(), dict,
161 "Unexpected result for Bag.get_topic_info().")
162 self.assertIsInstance(bag.get_type_and_topic_info(), tuple,
163 "Unexpected result for Bag.get_type_and_topic_info().")
165 self.assertTrue(bag.closed,
"Unexpected result for Bag.closed.")
169 """Tests parameters to Bag functions."""
170 logger.info(
"Verifying invoking Bag methods with parameters.")
173 if ".mcap" in api.BAG_EXTENSIONS:
174 outname = self.
mkfile(
".mcap")
180 """Tests parameters to read functions of given Bag class."""
181 NAME =
lambda f, *a:
"%s.%s(%s)" % (f.__module__, (f.__name__),
", ".join(map(repr, a)))
182 ERR =
lambda f, *a:
"Unexpected result from %s(%s)." % (f.__name__,
", ".join(map(repr, a)))
184 logger.info(
"Verifying invoking Bag %r read methods with parameters.", bagcls)
187 with bagcls(filename)
as bag:
188 for t, m, s
in bag: messages.setdefault(t, []).append((m, s))
189 self.assertTrue(messages,
"Unexpected result for reading bag contents.")
191 bag = bagcls(filename)
194 func = bag.get_message_class
196 logger.info(
"Testing %s.",
NAME(func))
197 typename =
"std_msgs/Bool"
198 self.assertEqual(func(typename +
"unknown"),
None, ERR(func))
199 self.assertEqual(func(typename,
"wronghash"),
None, ERR(func))
201 self.assertTrue(api.is_ros_message(cls), ERR(func))
202 self.assertEqual(api.get_message_type(cls), typename, ERR(func))
204 func = bag.read_messages
207 logger.info(
"Testing %s.",
NAME(func))
209 logger.debug(
"Verifying %s.",
NAME(func,
"topics"))
210 topics = random.sample(list(messages), 2)
212 for topic, msg, stamp
in func(topics):
213 mymsgs.setdefault(topic, []).append((msg, stamp))
216 self.assertEqual([(api.message_to_dict(m), t)
for m, t
in mymsgs[topic]],
217 [(api.message_to_dict(m), t)
for m, t
in messages[topic]],
220 topic, shift = next(iter(topics)), api.make_duration(1)
221 logger.debug(
"Verifying %s.",
NAME(func,
"topic",
"start_time=.."))
222 start_time = messages[topic][-1][-1] + shift
223 nomsgs = list(func(topic, start_time=start_time))
224 self.assertFalse(nomsgs, ERR(func,
"topic",
"start_time=.."))
225 logger.debug(
"Verifying %s.",
NAME(func,
"topic",
"end_time=.."))
226 end_time = messages[topic][0][-1] - shift
227 nomsgs = list(func(topic, end_time=end_time))
228 self.assertFalse(nomsgs, ERR(func,
"topic",
"end_time=.."))
230 logger.debug(
"Verifying %s.",
NAME(func,
"topic",
"..",
"raw=True"))
231 start_time, end_time = messages[topic][0][-1], messages[topic][-1][-1]
232 for _, msg, _
in func(topic, start_time=start_time, end_time=end_time, raw=
True):
233 bbytes, typeclass = msg[1], msg[-1]
234 self.assertIsInstance(bbytes, bytes, ERR(func,
"topic",
"..",
"raw=True"))
235 self.assertTrue(inspect.isclass(typeclass), ERR(func,
"topic",
"..",
"raw=True"))
237 func = bag.get_message_definition
240 logger.info(
"Testing %s.",
NAME(func))
241 msgcls = func(
"std_msgs/Bool")
242 self.assertTrue(msgcls, ERR(func))
243 self.assertIsInstance(msgcls, str, ERR(func))
244 msgcls = func(std_msgs.msg.Bool)
245 self.assertTrue(msgcls, ERR(func))
246 self.assertIsInstance(msgcls, str, ERR(func))
248 func = bag.get_message_type_hash
251 logger.info(
"Testing %s.",
NAME(func))
252 typehash = func(
"std_msgs/Bool")
253 self.assertTrue(typehash, ERR(func))
254 self.assertIsInstance(typehash, str, ERR(func))
255 typehash = func(std_msgs.msg.Bool)
256 self.assertTrue(typehash, ERR(func))
257 self.assertIsInstance(typehash, str, ERR(func))
262 logger.info(
"Testing %s.",
NAME(func))
263 topic = next(iter(messages))
264 typename = api.get_message_type(messages[topic][0][0])
265 received = func(topic, typename)
266 expected = type(
None)
if api.ROS1
else (list, type(
None))
267 self.assertIsInstance(received, expected, ERR(func))
269 func = bag.get_type_and_topic_info
272 logger.info(
"Testing %s.",
NAME(func))
273 topic = next(iter(messages))
274 typename = api.get_message_type(messages[topic][0][0])
275 msg_types, topics = func(topic_filters=topic)
276 self.assertGreaterEqual(len(msg_types), 1, ERR(func,
"topic_filters=sometopic"))
277 self.assertEqual(len(topics), 1, ERR(func,
"topic_filters=sometopic"))
278 self.assertIn(typename, msg_types, ERR(func,
"topic_filters=sometopic"))
279 self.assertIn(topic, topics, ERR(func,
"topic_filters=sometopic"))
281 func = bag.get_message_count
284 logger.info(
"Testing %s.",
NAME(func))
286 topics = random.sample(list(messages), count)
287 received = func(topic_filters=topics)
288 expected = sum(len(messages[t])
for t
in topics)
289 self.assertEqual(received, expected, ERR(func,
"topic_filters=sometopics"))
294 bag = bagcls(filename)
297 func = bag.get_topic_info
300 logger.info(
"Testing %s.",
NAME(func))
301 counts = list(func(counts=
True).values())
302 self.assertNotIn(
None, counts, ERR(func,
"counts=True"))
306 """Tests parameters to write functions of given Bag class."""
307 NAME =
lambda f, *a:
"%s.%s(%s)" % (f.__module__, (f.__name__),
", ".join(map(repr, a)))
308 ERR =
lambda f, *a:
"Unexpected result from %s(%s)." % (f.__name__,
", ".join(map(repr, a)))
310 logger.info(
"Verifying invoking Bag %r write methods with parameters.", bagcls)
313 bag = bagcls(filename,
"w")
319 logger.info(
"Testing %s.",
NAME(func))
320 topicbase, typename =
"/my/topic",
"std_msgs/Bool"
321 typehash, typeclass = api.get_message_type_hash(typename), std_msgs.msg.Bool
323 topic, msg =
"%s/%s" % (topicbase, i % 2), std_msgs.msg.Bool(data=bool(i % 2))
324 bbytes = api.serialize_message(msg)
325 bag.write(topic, (typename, bbytes, typehash, typeclass), raw=
True)
326 messages.setdefault(topic, []).append(msg)
329 bag = bagcls(filename)
331 for topic, msg, stamp
in bag:
332 self.assertIn(topic, messages, ERR(func,
"..",
"raw=True"))
333 self.assertTrue(any(api.message_to_dict(msg) == api.message_to_dict(m)
334 for m
in messages[topic]), ERR(func,
"..",
"raw=True"))
340 """Tests grepros global functions: init(), grep(), source(), sink()."""
347 """Tests rollover settings for sinks."""
348 NAME =
lambda f:
"%s.%s" % (f.__module__, f.__name__)
349 logger.debug(
"Verifying sink rollover.")
352 TEMPLATE =
"test_%Y_%m_%d__%(index)s__%(index)s"
354 (dict(rollover_size=2000), (2,
None)),
355 (dict(rollover_count=40), (2, 3)),
356 (dict(rollover_duration=40), (2, 3)),
360 START = api.to_time(12345)
362 EXT = api.BAG_EXTENSIONS[0]
if cls
is grepros.BagSink else cls.FILE_EXTENSIONS[0]
363 WRITE, OUTDIR = next((x, os.path.dirname(x))
for x
in [self.
mkfile(EXT)])
365 logger.info(
"Testing %s rollover.",
NAME(cls))
366 for ropts, output_range
in OPTS:
367 if cls
in OPT_OVERRIDES
and any(k
in ropts
for k
in OPT_OVERRIDES[cls]):
368 ropts.update(OPT_OVERRIDES[cls])
369 if not any(ropts.values()):
continue
370 logger.info(
"Testing %s rollover with %s.",
NAME(cls), ropts)
371 SUFF =
"".join(
"%s=%s" % x
for x
in ropts.items())
372 template = os.path.join(OUTDIR, TEMPLATE + SUFF + EXT)
374 with cls(WRITE, write_options=dict(ropts, rollover_template=template))
as sink:
376 msg = std_msgs.msg.Bool(data=
not i % 2)
377 sink.emit(
"my/topic%s" % (i % 2), msg, START + api.make_duration(i))
379 outputs = sorted(glob.glob(os.path.join(OUTDIR,
"test_*" + SUFF + EXT)))
381 self.assertGreaterEqual(len(outputs), output_range[0],
382 "Unexpected output files from %s." %
NAME(cls))
383 if output_range[1]
is not None:
384 self.assertLessEqual(len(outputs), output_range[1],
385 "Unexpected output files from %s." %
NAME(cls))
386 self.assertFalse(os.path.exists(WRITE),
"Unexpected output from %s." %
NAME(cls))
387 for name
in map(os.path.basename, outputs):
388 self.assertTrue(re.search(
r"^test_\d+_\d+_\d+__\d+__\d+", name),
389 "Unexpected output file from %s." %
NAME(cls))
393 """Tests grepros.grep()."""
394 NAME =
lambda f, **w:
"%s.%s(%s)" % (f.__module__, f.__name__,
"**%s" % w
if w
else "")
395 ERR =
lambda f, **w:
"Unexpected result from %s." %
NAME(f, **w)
396 logger.info(
"Verifying reading bags and grepping messages, via grepros.grep().")
398 args = dict(pattern=self.
SEARCH_WORDS, topic=
"/match/this*", no_topic=
"/not/this*",
399 file=self.
_bags, type=
"std_msgs/*", no_type=
"std_msgs/Bool")
400 for topic, msg, stamp, match, index
in grepros.grep(**args):
401 self.assertIn(
"/match/this", topic, ERR(grepros.grep, **args))
402 self.assertNotIn(
"/not/this", topic, ERR(grepros.grep, **args))
403 self.assertIn(
"std_msgs/", api.get_message_type(msg), ERR(grepros.grep, **args))
404 self.assertNotIn(
"/Bool", api.get_message_type(msg), ERR(grepros.grep, **args))
405 self.assertTrue(match, ERR(grepros.grep, **args))
406 self.assertIsInstance(index, int, ERR(grepros.grep, **args))
407 messages.setdefault(topic, []).append(msg)
408 fulltext =
"\n".join(str(m)
for mm
in messages.values()
for m
in mm)
413 """Tests general Source and Sink API."""
414 NAME =
lambda f, **w:
"%s.%s(%s)" % (f.__module__, f.__name__,
"**%s" % w
if w
else "")
415 ERR =
lambda f, **w:
"Unexpected result from %s." %
NAME(f, **w)
438 if ".mcap" in api.BAG_EXTENSIONS: FUNC_TESTS[grepros.sink].append(
441 if "parquet" in grepros.MultiSink.FORMAT_CLASSES: FUNC_TESTS[grepros.sink].append(
445 for func, args
in FUNC_TESTS.items():
446 logger.info(
"Verifying %s.",
NAME(func))
447 for kwargs, cls
in args:
448 logger.debug(
"Verifying %s.",
NAME(func, **kwargs))
449 with func(**kwargs)
as source:
450 self.assertIsInstance(source, cls, ERR(func, **kwargs))
453 logger.debug(
"Verifying grepros.PostgresSink failing for invalid configuration.",)
454 with self.assertRaises(Exception, msg=
"Unexpected success from PostgresSink()."):
457 with self.assertRaises(Exception, msg=
"Unexpected success from PostgresSink()."):
458 with grepros.sink(
"postgresql://nosuchuser/nosuchdb")
as sink:
461 logger.debug(
"Verifying grepros.AppSource and AppSink.")
462 expected, received = [], []
463 iterable = [(
"/my/topic", std_msgs.msg.String(data=
"my")),
None]
464 emitter =
lambda *a: received.append(a)
467 for topic, msg, stamp
in source:
468 sink.emit(topic, msg, stamp)
469 expected.append((topic, msg, stamp))
470 source.push(
"/other/topic", std_msgs.msg.String(data=
"other"))
471 source.push(
"/third/topic", std_msgs.msg.String(data=
"third"))
473 for topic, msg, stamp
in source:
474 sink.emit(topic, msg, stamp)
475 expected.append((topic, msg, stamp))
476 self.assertEqual(len(received), len(expected), ERR(type(sink)))
477 for a, b
in zip(received, expected):
478 self.assertEqual(a[:2], b[:2], ERR(type(sink)))
482 """Returns temporary filename with given suffix, deleted in teardown."""
483 name = tempfile.NamedTemporaryFile(suffix=suffix).name
# Script entry point: run this test case through the rostest runner
# when the module is executed directly rather than imported.
if __name__ == "__main__":
    TestLibrary.run_rostest()