4 Base class for ROS tests.
6 ------------------------------------------------------------------------------
7 This file is part of grepros - grep for ROS bag files and live topics.
8 Released under the BSD License.
13 ------------------------------------------------------------------------------
26 if os.getenv(
"ROS_VERSION") ==
"1":
31 import rclpy.serialization
33 import rosidl_runtime_py.utilities
35 logger = logging.getLogger()
39 """Initializes logging."""
40 fmt =
"[%%(levelname)s]\t[%%(created).06f] [%s] %%(message)s" % name
41 logging.basicConfig(level=logging.DEBUG, format=fmt, stream=sys.stdout)
42 logger.setLevel(logging.DEBUG)
47 """Tests grepping from a source to a sink, with prepared test bags."""
53 INPUT_LABEL =
"ROS bags"
62 DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
"data"))
65 BAG_SUFFIX =
".bag" if os.getenv(
"ROS_VERSION") ==
"1" else ".db3"
68 SEARCH_WORDS = [
"this"]
71 CMD_BASE = [
"grepros"] + SEARCH_WORDS + \
72 [
"--topic",
"/match/this*",
"--type",
"std_msgs/*",
73 "--no-topic",
"/not/this*",
"--no-type",
"std_msgs/Bool",
74 "--match-wrapper",
"--color",
"never",
"--path", DATA_DIR]
77 EXPECTED_WORDS = [
"match_this"]
80 SKIPPED_WORDS = [
"not_this"]
83 EXPECTED_TOPIC_BASES = [
"/match/this/intarray",
"/match/this/header"]
86 SKIPPED_TOPIC_BASES = [
"/not/this/intarray",
"/match/this/not"]
90 super(TestBase, self).
__init__(*args, **kwargs)
92 try: unittest.util._MAX_LENGTH = 100000
93 except Exception:
pass
105 """Collects bags in data directory, assembles command."""
106 logger.debug(
"Setting up test.")
113 """Terminates subprocess and deletes temporary output files, if any."""
114 logger.debug(
"Tearing down test.")
116 except Exception:
pass
118 except Exception:
pass
120 except Exception:
pass
124 """Shim for TestCase.subTest in Py2."""
125 if callable(getattr(super(TestBase, self),
"subTest",
None)):
126 return super(TestBase, self).
subTest(msg, **params)
127 if callable(getattr(contextlib,
"nested",
None)):
128 return contextlib.nested()
132 if os.getenv(
"ROS_VERSION") ==
"1":
133 return rospy.Publisher(topic, cls, queue_size=10)
138 """Creates ROS1 or ROS2 node."""
139 if os.getenv(
"ROS_VERSION") ==
"1":
140 rospy.init_node(self.
NAME)
142 logger.setLevel(logging.DEBUG)
145 except Exception:
pass
146 self.
_node = rclpy.create_node(self.
NAME)
150 """Shuts down ROS2 node, if any."""
151 self.
_node and rclpy.shutdown()
155 """Executes test command, returns command output text if communicate."""
156 logger.debug(
"Executing %r.",
" ".join(self.
_cmd))
157 self.
_proc = subprocess.Popen(self.
_cmd, universal_newlines=
True,
158 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
159 if not communicate:
return None
160 return self.
_proc.communicate()[0]
165 if self.
_node: rclpy.spin_once(self.
_node, timeout_sec=timeout)
166 else: time.sleep(timeout)
170 """Asserts that ROS bags were found in data directory."""
171 logger.info(
"Verifying reading from %s and writing to %s.",
173 self.assertTrue(self.
_bags,
"No bags found in data directory.")
177 """Asserts that topics and message texts were emitted as expected."""
178 logger.info(
"Verifying topics and messages." if messages
else "Verifying topics.")
179 for bag
in self.
_bags:
180 bagname = os.path.splitext(os.path.basename(bag))[0]
182 topic = topicbase +
"/" + bagname
183 self.assertIn(topic, topics,
"Expected topic not in output.")
184 if messages
is None:
continue
187 self.assertIn(value, messages,
"Expected message value not in output.")
190 topic = topicbase +
"/" + bagname
191 self.assertNotIn(topic, topics,
"Unexpected topic in output.")
192 if messages
is None:
continue
195 self.assertNotIn(value, messages,
"Unexpected message value in output.")
200 """Runs rostest if ROS1."""
201 rostest.rosrun(
"grepros", cls.
NAME, cls)
206 """Logging handler that forwards logging messages to rospy.logwarn."""
213 """Invokes rospy.logwarn or logerr (only warn or higher gets rostest output)."""
214 if "rospy" in record.name
or record.levelno >= logging.WARN:
217 try: text = record.msg % record.args
if record.args
else record.msg
218 except Exception: text = record.msg
219 text =
"[%s] %s" % (self.
__name, text)
220 (rospy.logerr
if record.levelno >= logging.ERROR
else rospy.logwarn)(text)
225 """Simple ROS2 bag reader."""
228 self.
_db = sqlite3.connect(filename)
229 self.
_db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
232 """Yields messages from the bag."""
233 topicmap = {x[
"id"]: x
for x
in self.
_db.execute(
"SELECT * FROM topics").fetchall()}
236 for row
in self.
_db.execute(
"SELECT * FROM messages ORDER BY timestamp"):
237 tdata = topicmap[row[
"topic_id"]]
238 topic, typename = tdata[
"name"], tdata[
"type"]
239 if typename
not in msgtypes:
240 msgtypes[typename] = rosidl_runtime_py.utilities.get_message(typename)
241 msg = rclpy.serialization.deserialize_message(row[
"data"], msgtypes[typename])
242 stamp = rclpy.time.Time(nanoseconds=row[
"timestamp"])
243 yield topic, msg, stamp
246 """Closes the bag file."""
250 BagReader =
rosbag.Bag if os.getenv(
"ROS_VERSION") ==
"1" else Ros2BagReader