4 Test: grep input bags to CSV output.
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
13 ------------------------------------------------------------------------------
20 sys.path.insert(0, os.path.join(os.path.dirname(__file__),
".."))
21 from test
import testbase
23 logger = logging.getLogger()
27 """Tests grepping from input bags and writing matches to CSV files."""
30 NAME = os.path.splitext(os.path.basename(__file__))[0]
36 OUTPUT_SUFFIX =
".csv"
39 """Collects bags in data directory, assembles command."""
44 """Terminates subprocess and deletes temporary output files, if any."""
46 for filename
in glob.glob(
"%s*%s" % os.path.splitext(self.
_outname)):
47 try: os.unlink(filename)
48 except Exception:
pass
51 """Runs grepros on bags in data directory, verifies CSV output."""
55 logger.info(
"Verifying topics and messages.")
56 filebase, fileext = os.path.splitext(self.
_outname)
57 for bag
in self.
_bags:
58 bagname = os.path.splitext(os.path.basename(bag))[0]
61 topic = topicbase +
"/" + bagname
64 filename =
"%s.%s%s" % (filebase, topic.lstrip(
"/").replace(
"/",
"__"), fileext)
65 self.assertTrue(os.path.isfile(filename),
66 "Expected output file not written: %s" % filename)
67 logger.info(
"Reading data from written %s %s.",
69 with open(filename)
as f:
71 self.assertIn(value, ftext,
"Expected message value not in output.")
72 fulltext +=
"\n\n" + ftext
74 topic = topicbase +
"/" + bagname
75 filename =
"%s.%s%s" % (filebase, topic.lstrip(
"/").replace(
"/",
"__"), fileext)
76 self.assertFalse(os.path.isfile(filename),
77 "Unexpected output file written: %s" % filename)
80 if "__main__" == __name__:
81 TestBagInputCsvOutput.run_rostest()