testbase.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 """
4 Base class for ROS tests.
5 
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
9 
10 @author Erki Suurjaak
11 @created 23.12.2021
12 @modified 22.12.2023
13 ------------------------------------------------------------------------------
14 """
15 import contextlib
16 import logging
17 import glob
18 import os
19 import sqlite3
20 import subprocess
21 import sys
22 import tempfile
23 import time
24 import unittest
25 
26 if os.getenv("ROS_VERSION") == "1":
27  import rosbag
28  import rospy
29  import rostest
30 else:
31  import rclpy.serialization
32  import rclpy.time
33  import rosidl_runtime_py.utilities
34 
35 logger = logging.getLogger()
36 
37 
38 def init_logging(name):
39  """Initializes logging."""
40  fmt = "[%%(levelname)s]\t[%%(created).06f] [%s] %%(message)s" % name
41  logging.basicConfig(level=logging.DEBUG, format=fmt, stream=sys.stdout)
42  logger.setLevel(logging.DEBUG)
43 
44 
45 
46 class TestBase(unittest.TestCase):
47  """Tests grepping from a source to a sink, with prepared test bags."""
48 
49 
50  NAME = ""
51 
52 
53  INPUT_LABEL = "ROS bags"
54 
55 
56  OUTPUT_LABEL = ""
57 
58 
59  OUTPUT_SUFFIX = None
60 
61 
62  DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "data"))
63 
64 
65  BAG_SUFFIX = ".bag" if os.getenv("ROS_VERSION") == "1" else ".db3"
66 
67 
68  SEARCH_WORDS = ["this"]
69 
70 
71  CMD_BASE = ["grepros"] + SEARCH_WORDS + \
72  ["--topic", "/match/this*", "--type", "std_msgs/*",
73  "--no-topic", "/not/this*", "--no-type", "std_msgs/Bool",
74  "--match-wrapper", "--color", "never", "--path", DATA_DIR]
75 
76 
77  EXPECTED_WORDS = ["match_this"]
78 
79 
80  SKIPPED_WORDS = ["not_this"]
81 
82 
83  EXPECTED_TOPIC_BASES = ["/match/this/intarray", "/match/this/header"]
84 
85 
86  SKIPPED_TOPIC_BASES = ["/not/this/intarray", "/match/this/not"]
87 
88 
89  def __init__(self, *args, **kwargs):
90  super(TestBase, self).__init__(*args, **kwargs)
91  self.maxDiff = None # Full diff on assert failure
92  try: unittest.util._MAX_LENGTH = 100000
93  except Exception: pass
94  self._bags = [] # Bags in DATA_DIR
95  self._proc = None # subprocess.Popen for grepros
96  self._cmd = None # Full grepros command
97  self._outname = None # Name of temporary file for write output
98  self._outfile = None # Opened temporary file
99  self._node = None # rclpy.Node for ROS2
100 
101  init_logging(self.NAME)
102 
103 
104  def setUp(self):
105  """Collects bags in data directory, assembles command."""
106  logger.debug("Setting up test.")
107  self._bags = sorted(glob.glob(os.path.join(self.DATA_DIR, "*" + self.BAG_SUFFIX)))
108  if self.OUTPUT_SUFFIX:
109  self._outname = tempfile.NamedTemporaryFile(suffix=self.OUTPUT_SUFFIX).name
110 
111 
112  def tearDown(self):
113  """Terminates subprocess and deletes temporary output files, if any."""
114  logger.debug("Tearing down test.")
115  try: self._proc and self._proc.terminate()
116  except Exception: pass
117  try: self._outfile and self._outfile.close()
118  except Exception: pass
119  try: self._outname and os.unlink(self._outname)
120  except Exception: pass
121 
122 
123  def subTest(self, msg=None, **params):
124  """Shim for TestCase.subTest in Py2."""
125  if callable(getattr(super(TestBase, self), "subTest", None)): # Py 3.7+
126  return super(TestBase, self).subTest(msg, **params)
127  if callable(getattr(contextlib, "nested", None)): # Py 2
128  return contextlib.nested()
129 
130 
131  def create_publisher(self, topic, cls):
132  if os.getenv("ROS_VERSION") == "1":
133  return rospy.Publisher(topic, cls, queue_size=10)
134  return self._node.create_publisher(cls, topic, 10)
135 
136 
137  def init_node(self):
138  """Creates ROS1 or ROS2 node."""
139  if os.getenv("ROS_VERSION") == "1":
140  rospy.init_node(self.NAME)
141  logger.addHandler(Ros1LogHandler(self.NAME))
142  logger.setLevel(logging.DEBUG)
143  else:
144  try: rclpy.init()
145  except Exception: pass # Must not be called twice
146  self._node = rclpy.create_node(self.NAME)
147 
148 
149  def shutdown_node(self):
150  """Shuts down ROS2 node, if any."""
151  self._node and rclpy.shutdown()
152 
153 
154  def run_command(self, communicate=True):
155  """Executes test command, returns command output text if communicate."""
156  logger.debug("Executing %r.", " ".join(self._cmd))
157  self._proc = subprocess.Popen(self._cmd, universal_newlines=True,
158  stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
159  if not communicate: return None
160  return self._proc.communicate()[0]
161 
162 
163  def spin_once(self, timeout):
164  """"""
165  if self._node: rclpy.spin_once(self._node, timeout_sec=timeout)
166  else: time.sleep(timeout)
167 
168 
169  def verify_bags(self):
170  """Asserts that ROS bags were found in data directory."""
171  logger.info("Verifying reading from %s and writing to %s.",
172  self.INPUT_LABEL, self.OUTPUT_LABEL)
173  self.assertTrue(self._bags, "No bags found in data directory.")
174 
175 
176  def verify_topics(self, topics, messages=None):
177  """Asserts that topics and message texts were emitted as expected."""
178  logger.info("Verifying topics and messages." if messages else "Verifying topics.")
179  for bag in self._bags:
180  bagname = os.path.splitext(os.path.basename(bag))[0]
181  for topicbase in self.EXPECTED_TOPIC_BASES:
182  topic = topicbase + "/" + bagname
183  self.assertIn(topic, topics, "Expected topic not in output.")
184  if messages is None: continue # for topicbase
185 
186  value = " ".join(self.EXPECTED_WORDS + [bagname])
187  self.assertIn(value, messages, "Expected message value not in output.")
188 
189  for topicbase in self.SKIPPED_TOPIC_BASES:
190  topic = topicbase + "/" + bagname
191  self.assertNotIn(topic, topics, "Unexpected topic in output.")
192  if messages is None: continue # for topicbase
193 
194  value = " ".join(self.SKIPPED_WORDS + [bagname])
195  self.assertNotIn(value, messages, "Unexpected message value in output.")
196 
197 
198  @classmethod
199  def run_rostest(cls):
200  """Runs rostest if ROS1."""
201  rostest.rosrun("grepros", cls.NAME, cls)
202 
203 
204 
205 class Ros1LogHandler(logging.Handler):
206  """Logging handler that forwards logging messages to rospy.logwarn."""
207 
208  def __init__(self, name, *args, **kwargs):
209  super().__init__(*args, **kwargs)
210  self.__name = name
211 
212  def emit(self, record):
213  """Invokes rospy.logwarn or logerr (only warn or higher gets rostest output)."""
214  if "rospy" in record.name or record.levelno >= logging.WARN:
215  return # Skip rospy internal logging, or higher levels logged by rospy
216 
217  try: text = record.msg % record.args if record.args else record.msg
218  except Exception: text = record.msg
219  text = "[%s] %s" % (self.__name, text)
220  (rospy.logerr if record.levelno >= logging.ERROR else rospy.logwarn)(text)
221 
222 
223 
225  """Simple ROS2 bag reader."""
226 
227  def __init__(self, filename):
228  self._db = sqlite3.connect(filename)
229  self._db.row_factory = lambda cursor, row: dict(sqlite3.Row(cursor, row))
230 
231  def read_messages(self):
232  """Yields messages from the bag."""
233  topicmap = {x["id"]: x for x in self._db.execute("SELECT * FROM topics").fetchall()}
234  msgtypes = {} # {full typename: cls}
235 
236  for row in self._db.execute("SELECT * FROM messages ORDER BY timestamp"):
237  tdata = topicmap[row["topic_id"]]
238  topic, typename = tdata["name"], tdata["type"]
239  if typename not in msgtypes:
240  msgtypes[typename] = rosidl_runtime_py.utilities.get_message(typename)
241  msg = rclpy.serialization.deserialize_message(row["data"], msgtypes[typename])
242  stamp = rclpy.time.Time(nanoseconds=row["timestamp"])
243  yield topic, msg, stamp
244 
245  def close(self):
246  """Closes the bag file."""
247  self._db.close()
248 
249 
250 BagReader = rosbag.Bag if os.getenv("ROS_VERSION") == "1" else Ros2BagReader
test.testbase.TestBase.run_rostest
def run_rostest(cls)
Definition: testbase.py:199
test.testbase.TestBase.OUTPUT_LABEL
string OUTPUT_LABEL
Name used in flow logging.
Definition: testbase.py:56
rosbag::Bag
test.testbase.TestBase.shutdown_node
def shutdown_node(self)
Definition: testbase.py:149
test.testbase.TestBase
Definition: testbase.py:46
test.testbase.TestBase.INPUT_LABEL
string INPUT_LABEL
Name used in flow logging.
Definition: testbase.py:53
test.testbase.Ros2BagReader
Definition: testbase.py:224
test.testbase.TestBase.__init__
def __init__(self, *args, **kwargs)
Definition: testbase.py:89
test.testbase.TestBase.maxDiff
maxDiff
Definition: testbase.py:91
test.testbase.Ros1LogHandler.__name
__name
Definition: testbase.py:210
test.testbase.Ros2BagReader.__init__
def __init__(self, filename)
Definition: testbase.py:227
test.testbase.TestBase.create_publisher
def create_publisher(self, topic, cls)
Definition: testbase.py:131
test.testbase.TestBase.setUp
def setUp(self)
Definition: testbase.py:104
test.testbase.TestBase.SKIPPED_WORDS
list SKIPPED_WORDS
Words not expected in matched messages, bag name appended.
Definition: testbase.py:80
test.testbase.TestBase.init_node
def init_node(self)
Definition: testbase.py:137
test.testbase.TestBase.verify_topics
def verify_topics(self, topics, messages=None)
Definition: testbase.py:176
test.testbase.TestBase.DATA_DIR
DATA_DIR
Test bags directory.
Definition: testbase.py:62
test.testbase.Ros1LogHandler
Definition: testbase.py:205
test.testbase.TestBase.EXPECTED_WORDS
list EXPECTED_WORDS
Words expected in matched messages, bag name appended.
Definition: testbase.py:77
test.testbase.Ros2BagReader._db
_db
Definition: testbase.py:228
test.testbase.TestBase.NAME
string NAME
Test name used in flow logging.
Definition: testbase.py:50
test.testbase.TestBase.subTest
def subTest(self, msg=None, **params)
Definition: testbase.py:123
test.testbase.Ros2BagReader.close
def close(self)
Definition: testbase.py:245
test.testbase.TestBase._node
_node
Definition: testbase.py:99
test.testbase.TestBase.SKIPPED_TOPIC_BASES
list SKIPPED_TOPIC_BASES
Topics not expected in output, bag name appended.
Definition: testbase.py:86
test.testbase.TestBase._cmd
_cmd
Definition: testbase.py:96
test.testbase.TestBase.EXPECTED_TOPIC_BASES
list EXPECTED_TOPIC_BASES
Topics expected in output, bag name appended.
Definition: testbase.py:83
test.testbase.init_logging
def init_logging(name)
Definition: testbase.py:38
test.testbase.TestBase._outname
_outname
Definition: testbase.py:97
test.testbase.TestBase.spin_once
def spin_once(self, timeout)
Definition: testbase.py:163
test.testbase.TestBase.OUTPUT_SUFFIX
OUTPUT_SUFFIX
Suffix for write output file, if any.
Definition: testbase.py:59
test.testbase.TestBase._proc
_proc
Definition: testbase.py:95
test.testbase.TestBase.BAG_SUFFIX
string BAG_SUFFIX
ROS bagfile extension.
Definition: testbase.py:65
test.testbase.TestBase._bags
_bags
Definition: testbase.py:94
test.testbase.Ros1LogHandler.emit
def emit(self, record)
Definition: testbase.py:212
test.testbase.Ros1LogHandler.__init__
def __init__(self, name, *args, **kwargs)
Definition: testbase.py:208
test.testbase.TestBase.verify_bags
def verify_bags(self)
Definition: testbase.py:169
test.testbase.TestBase._outfile
_outfile
Definition: testbase.py:98
test.testbase.TestBase.run_command
def run_command(self, communicate=True)
Definition: testbase.py:154
test.testbase.Ros2BagReader.read_messages
def read_messages(self)
Definition: testbase.py:231
test.testbase.TestBase.tearDown
def tearDown(self)
Definition: testbase.py:112


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29