4 Test: grep live topics to console output.
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
13 ------------------------------------------------------------------------------
19 sys.path.insert(0, os.path.join(os.path.dirname(__file__),
".."))
20 from test
import testbase
22 logger = logging.getLogger()
26 """Tests grepping from live topics and printing matches to console."""
29 NAME = os.path.splitext(os.path.basename(__file__))[0]
32 INPUT_LABEL =
"live topics"
35 OUTPUT_LABEL =
"console"
42 """Collects bags in data directory, assembles command."""
47 """Terminates subprocess and shuts down ROS2 node, if any."""
52 """Runs grepros on live topics, verifies console output."""
56 logger.info(
"Opening publishers.")
58 for bagfile
in self.
_bags:
59 bag = testbase.BagReader(bagfile)
60 for topic, msg, _
in bag.read_messages():
62 logger.info(
"Opening publisher to %r.", topic)
68 logger.info(
"Publishing messages to live.")
69 for bagfile
in self.
_bags:
70 bag = testbase.BagReader(bagfile)
71 for topic, msg, _
in bag.read_messages():
72 pubs[topic].publish(msg)
77 self.
_proc.terminate()
78 fulltext = self.
_proc.communicate()[0]
79 self.assertTrue(fulltext,
"Command did not print to console.")
83 if "__main__" == __name__:
84 TestLiveInputConsoleOutput.run_rostest()