test_bag_to_bag.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 """
4 Test: grep input bags to bag output.
5 
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
9 
10 @author Erki Suurjaak
11 @created 22.12.2021
12 @modified 25.12.2021
13 ------------------------------------------------------------------------------
14 """
15 import logging
16 import os
17 import sys
18 
19 sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
20 from test import testbase
21 
22 logger = logging.getLogger()
23 
24 
26  """Tests grepping from input bags and writing matches to bag."""
27 
28 
29  NAME = os.path.splitext(os.path.basename(__file__))[0]
30 
31 
32  OUTPUT_LABEL = "ROS bag"
33 
34 
35  OUTPUT_SUFFIX = testbase.TestBase.BAG_SUFFIX
36 
37  def setUp(self):
38  """Collects bags in data directory, assembles command."""
39  super().setUp()
40  self._cmd = self.CMD_BASE + ["--no-console-output", "--write", self._outname]
41 
42  def test_grepros(self):
43  """Runs grepros on bags in data directory, verifies bag output."""
44  self.verify_bags()
45  self.run_command()
46  self.assertTrue(os.path.isfile(self._outname), "Expected output file not written.")
47 
48  logger.info("Reading data from written %s.", self.OUTPUT_LABEL)
49  messages = {} # {topic: [msg, ]}
50  outfile = self._outfile = testbase.BagReader(self._outname)
51  for topic, msg, _ in outfile.read_messages():
52  messages.setdefault(topic, []).append(msg)
53  outfile.close()
54 
55  fulltext = "\n".join(str(m) for mm in messages.values() for m in mm)
56  super().verify_topics(messages, fulltext)
57 
58 
59 if "__main__" == __name__:
60  TestBagInputBagOutput.run_rostest()
test.testbase.TestBase
Definition: testbase.py:46
test.test_bag_to_bag.TestBagInputBagOutput.setUp
def setUp(self)
Definition: test_bag_to_bag.py:37
test.testbase.TestBase.CMD_BASE
list CMD_BASE
Base command for running grepros.
Definition: testbase.py:71
test.testbase.TestBase.verify_topics
def verify_topics(self, topics, messages=None)
Definition: testbase.py:176
test.test_bag_to_bag.TestBagInputBagOutput._cmd
_cmd
Definition: test_bag_to_bag.py:40
test.test_bag_to_bag.TestBagInputBagOutput
Definition: test_bag_to_bag.py:25
test.testbase.TestBase._outname
_outname
Definition: testbase.py:97
test.test_bag_to_bag.TestBagInputBagOutput._outfile
_outfile
Definition: test_bag_to_bag.py:50
test.test_bag_to_bag.TestBagInputBagOutput.test_grepros
def test_grepros(self)
Definition: test_bag_to_bag.py:42
test.test_bag_to_bag.TestBagInputBagOutput.OUTPUT_LABEL
string OUTPUT_LABEL
Name used in logging.
Definition: test_bag_to_bag.py:32
test.testbase.TestBase.verify_bags
def verify_bags(self)
Definition: testbase.py:169
test.testbase.TestBase.run_command
def run_command(self, communicate=True)
Definition: testbase.py:154


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29