3 ROS1 bag reader plugin using the `embag` library.
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
15 from __future__
import absolute_import
19 except ImportError: embag =
None
22 from .. common
import PATH_TYPES, ConsolePrinter
27 """embag reader interface, providing most of rosbag.Bag interface."""
36 ROSBAG_MAGIC = b
"#ROSBAG"
39 if not isinstance(filename, PATH_TYPES):
40 raise ValueError(
"invalid filename %r" % type(filename))
41 if mode
not in self.
MODES:
raise ValueError(
"invalid mode %r" % mode)
49 self.
_view = embag.View(filename)
57 Returns the number of messages in the bag.
59 @param topic_filters list of topics or a single topic to filter by, if any
62 topics = topic_filters
63 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
64 return sum(c
for (t, _, _), c
in self.
_topics.items()
if t
in topics)
65 return sum(self.
_topics.values())
69 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
75 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
82 Returns rospy message class for typename, or None if unknown message type for bag.
84 Generates class dynamically if not already generated.
86 @param typehash message type definition hash, if any
88 typekey = (typename, typehash
or next((h
for n, h
in self.
_types if n == typename),
None))
90 for n, c
in api.realapi.generate_message_classes(typename, self.
_typedefs[typekey]).items():
91 self.
_types[(n, c._md5sum)] = c
92 return self.
_types.get(typekey)
97 Returns ROS1 message type definition full text from bag, including subtype definitions.
99 Returns None if unknown message type for bag.
101 if api.is_ros_message(msg_or_type):
102 return self.
_typedefs.get((msg_or_type._type, msg_or_type._md5sum))
103 typename = msg_or_type
104 return next((d
for (n, h), d
in self.
_typedefs.items()
if n == typename),
None)
108 """Returns ROS1 message type MD5 hash, or None if unknown message type for bag."""
109 if api.is_ros_message(msg_or_type):
return msg_or_type._md5sum
110 typename = msg_or_type
111 return next((h
for n, h
in self.
_typedefs if n == typename),
None)
115 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
121 Returns thorough metainfo on topic and message types.
123 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
125 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
126 msg_types as dict of {typename: typehash},
127 topics as a dict of {topic: TopicTuple() namedtuple}.
129 topics = topic_filters
130 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
131 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_topics)):
133 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
135 msgtypes = {n: h
for t, n, h
in self.
_topics}
139 """Returns median value from given sorted numbers."""
141 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
142 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
144 conns = self.
_view.connectionsByTopic()
145 for (t, n, _), c
in sorted(self.
_topics.items()):
146 if topics
and t
not in topics:
continue
149 stamps = sorted(m.timestamp.secs + m.timestamp.nsecs / 1E9
150 for m
in self.
_view.getMessages([t]))
151 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
152 freq = 1.0 / mymedian
if mymedian
else None
153 topicdict[t] = self.
TopicTuple(n, c, len(conns.get(t, [])), freq)
154 if not topics
or set(topics) == set(t
for t, _, _
in self.
_topics):
159 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
161 Yields messages from the bag, optionally filtered by topic and timestamp.
163 @param topics list of topics or a single topic to filter by, if at all
164 @param start_time earliest timestamp of message to return, as ROS time or convertible
165 (int/float/duration/datetime/decimal)
166 @param end_time latest timestamp of message to return, as ROS time or convertible
167 (int/float/duration/datetime/decimal)
168 @param raw if true, then returned messages are tuples of
169 (typename, bytes, typehash, typeclass)
170 @return BagMessage namedtuples of (topic, message, timestamp as rospy.Time)
172 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
174 topics = topics
if isinstance(topics, list)
else [topics]
if topics
else []
175 start_time, end_time = (api.to_sec(api.to_time(x))
for x
in (start_time, end_time))
176 for m
in self.
_view.getMessages(topics)
if topics
else self.
_view.getMessages():
177 if start_time
is not None and start_time > m.timestamp.to_sec():
179 if end_time
is not None and end_time < m.timestamp.to_sec():
182 typename = self.
_hashdefs[(m.topic, m.md5)]
183 stamp = api.make_time(m.timestamp.secs, m.timestamp.nsecs)
184 if raw: msg = (typename, m.data(), m.md5, self.
get_message_class(typename, m.md5))
186 api.TypeMeta.make(msg, m.topic, self)
192 """Opens the bag file if not already open."""
197 """Closes the bag file."""
206 """Returns whether file is closed."""
207 return not self.
_view
212 """Returns the list of topics in bag, in alphabetic order."""
213 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
218 """Returns bag file path."""
224 """Returns current file size."""
230 """Returns file open mode."""
235 """Returns whether bag contains given topic."""
236 return any(key == t
for t, _, _
in self.
_topics)
240 """Retrieves next message from bag as (topic, message, timestamp)."""
241 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
247 """Populates bag metainfo."""
248 connections = self.
_view.connectionsByTopic()
250 for conn
in connections.get(topic, ()):
251 topickey, typekey = (topic, conn.type, conn.md5sum), (conn.type, conn.md5sum)
252 self.
_topics.setdefault(topickey, 0)
253 self.
_topics[topickey] += conn.message_count
254 self.
_hashdefs[(topic, conn.md5sum)] = conn.type
255 self.
_typedefs[typekey] = conn.message_definition
256 subtypedefs = api.parse_definition_subtypes(conn.message_definition)
257 for n, d
in subtypedefs.items():
258 h = api.calculate_definition_hash(n, d, tuple(subtypedefs.items()))
263 """Returns the ROS1 message populated from a corresponding embag.RosValue."""
264 for name, typename
in api.get_message_fields(msg).items():
265 v, scalarname = embagval.get(name), api.scalar(typename)
266 if typename
in api.ROS_BUILTIN_TYPES:
267 msgv = getattr(embagval, name)
268 elif scalarname
in api.ROS_BUILTIN_TYPES:
270 elif typename
in api.ROS_TIME_TYPES:
271 cls =
next(k
for k, v
in api.ROS_TIME_CLASSES.items()
if v == typename)
272 msgv = cls(v.secs, v.nsecs)
273 elif scalarname
in api.ROS_TIME_TYPES:
274 cls =
next(k
for k, v
in api.ROS_TIME_CLASSES.items()
if v == scalarname)
275 msgv = [cls(x.secs, x.nsecs)
for x
in v]
276 elif typename == scalarname:
281 setattr(msg, name, msgv)
287 """Returns whether file is readable as ROS1 bag."""
288 result = os.path.isfile(filename)
290 with open(filename,
"rb")
as f:
296 """Replaces ROS1 bag reader with EmbagReader. Raises ImportWarning if embag not available."""
298 ConsolePrinter.error(
"embag not available: cannot read bag files.")
299 raise ImportWarning()
300 api.Bag.READER_CLASSES.add(EmbagReader)
303 __all__ = [
"EmbagReader",
"init"]