5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
15 from __future__
import absolute_import
25 try:
import mcap, mcap.reader
26 except ImportError: mcap =
None
27 if "1" == os.getenv(
"ROS_VERSION"):
28 try:
import mcap_ros1
as mcap_ros, mcap_ros1.decoder, mcap_ros1.writer
29 except ImportError: mcap_ros =
None
30 elif "2" == os.getenv(
"ROS_VERSION"):
31 try:
import mcap_ros2
as mcap_ros, mcap_ros2.decoder, mcap_ros2.writer
32 except ImportError: mcap_ros =
None
37 from .. common
import ConsolePrinter
38 from .. outputs
import RolloverSinkMixin, Sink
43 MCAP bag interface, providing most of rosbag.Bag interface.
45 Bag cannot be appended to, and cannot be read and written at the same time
46 (MCAP API limitation).
53 MCAP_MAGIC = b
"\x89MCAP\x30\r\n"
57 Opens file and populates metadata.
59 @param f bag file path, or a stream object
60 @param mode return reader if "r" or writer if "w"
62 if mode
not in self.
MODES:
raise ValueError(
"invalid mode %r" % mode)
85 if common.is_stream(f):
86 if not common.verify_io(f, mode):
87 raise io.UnsupportedOperation(
"read" if "r" == mode
else "write")
91 if not isinstance(f, common.PATH_TYPES):
92 raise ValueError(
"invalid filename %r" % type(f))
93 if "w" == mode: common.makedirs(os.path.dirname(f))
97 (t, c)
for c, t
in api.ROS_TIME_CLASSES.items()
if api.get_message_type(c) == t
105 Returns the number of messages in the bag.
107 @param topic_filters list of topics or a single topic to filter by, if any
110 topics = topic_filters
111 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
112 return sum(c
for (t, _, _), c
in self.
_topics.items()
if t
in topics)
113 return sum(self.
_topics.values())
117 """Returns the start time of the bag, as UNIX timestamp."""
122 """Returns the end time of the bag, as UNIX timestamp."""
128 Returns ROS message class for typename, or None if unknown type.
130 @param typehash message type definition hash, if any
132 typehash = typehash
or next((h
for t, h
in self.
_typedefs if t == typename),
None)
133 typekey = (typename, typehash)
136 name = typename.split(
"/")[-1]
137 fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
138 cls = type(name, (types.SimpleNamespace, ), {
139 "__name__": name,
"__slots__": list(fields),
140 "__repr__": message_repr,
"__str__": message_repr
144 typeclses = api.realapi.generate_message_classes(typename, self.
_typedefs[typekey])
145 self.
_types[typekey] = typeclses[typename]
147 return self.
_types.get(typekey)
151 """Returns ROS message type definition full text from bag, including subtype definitions."""
152 if api.is_ros_message(msg_or_type):
153 return self.
_typedefs.get((api.get_message_type(msg_or_type),
154 api.get_message_type_hash(msg_or_type)))
155 typename = msg_or_type
156 return next((d
for (n, h), d
in self.
_typedefs.items()
if n == typename),
None)
160 """Returns ROS message type MD5 hash."""
161 typename = msg_or_type
162 if api.is_ros_message(msg_or_type): typename = api.get_message_type(msg_or_type)
163 return next((h
for n, h
in self.
_typedefs if n == typename),
None)
167 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
168 return self.
_qoses.get((topic, typename))
172 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
178 Returns thorough metainfo on topic and message types.
180 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
182 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
183 msg_types as dict of {typename: typehash},
184 topics as a dict of {topic: TopicTuple() namedtuple}.
186 topics = topic_filters
187 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
188 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_topics)):
190 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
192 topics = topic_filters
193 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
194 msgtypes = {n: h
for t, n, h
in self.
_topics}
198 """Returns median value from given sorted numbers."""
200 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
201 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
203 for (t, n, _), c
in sorted(self.
_topics.items()):
204 if topics
and t
not in topics:
continue
207 stamps = sorted(m.publish_time / 1E9
for _, _, m
in self.
_reader.iter_messages([t]))
208 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
209 freq = 1.0 / mymedian
if mymedian
else None
210 topicdict[t] = self.
TopicTuple(n, c, len(self.
_qoses.get((t, n))
or []), freq)
212 if not topics
or set(topics) == set(t
for t, _, _
in self.
_topics): self.
_ttinfo = result
216 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
218 Yields messages from the bag, optionally filtered by topic and timestamp.
220 @param topics list of topics or a single topic to filter by, if at all
221 @param start_time earliest timestamp of message to return, as ROS time or convertible
222 (int/float/duration/datetime/decimal)
223 @param end_time latest timestamp of message to return, as ROS time or convertible
224 (int/float/duration/datetime/decimal)
225 @param raw if true, then returned messages are tuples of
226 (typename, bytes, typehash, typeclass)
227 @return BagMessage namedtuples of
228 (topic, message, timestamp as rospy/rclpy.Time)
230 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
231 if "w" == self.
_mode:
raise io.UnsupportedOperation(
"read")
233 topics = topics
if isinstance(topics, list)
else [topics]
if topics
else None
234 start_ns, end_ns = (api.to_nsec(api.to_time(x))
for x
in (start_time, end_time))
235 for schema, channel, message
in self.
_reader.iter_messages(topics, start_ns, end_ns):
237 typekey = (typename, typehash) = self.
_schematypes[schema.id]
238 if typekey
not in self.
_types:
240 msg = (typename, message.data, typehash, self.
_types[typekey])
242 api.TypeMeta.make(msg, channel.topic, self)
243 yield self.
BagMessage(channel.topic, msg, api.make_time(nsecs=message.publish_time))
247 def write(self, topic, msg, t=None, raw=False, **__):
249 Writes out message to MCAP file.
251 @param topic name of topic
252 @param msg ROS1 message
253 @param t message timestamp if not using current ROS time,
254 as ROS time type or convertible (int/float/duration/datetime/decimal)
255 @param raw if true, `msg` is in raw format, (typename, bytes, typehash, typeclass)
257 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
258 if "r" == self.
_mode:
raise io.UnsupportedOperation(
"write")
261 typename, binary, typehash = msg[:3]
263 typedef = self.
_typedefs.get((typename, typehash))
or api.get_message_definition(cls)
264 msg = api.deserialize_message(binary, cls)
266 with api.TypeMeta.make(msg, topic)
as meta:
267 typename, typehash, typedef = meta.typename, meta.typehash, meta.definition
268 topickey, typekey = (topic, typename, typehash), (typename, typehash)
270 nanosec = (time.time_ns()
if hasattr(time,
"time_ns")
else int(time.time() * 10**9)) \
271 if t
is None else api.to_nsec(api.to_time(t))
274 fullname = api.make_full_typename(typename)
275 schema = self.
_writer.register_msgdef(fullname, typedef)
277 schema, data = self.
_schemas[typekey], api.message_to_dict(msg)
278 self.
_writer.write_message(topic, schema, data, publish_time=nanosec)
280 self.
_writer.write_message(topic, msg, publish_time=nanosec)
286 self.
_types.setdefault(typekey, type(msg))
287 self.
_typedefs.setdefault(typekey, typedef)
292 """Opens the bag file if not already open."""
295 raise io.UnsupportedOperation(
"Cannot reopen bag for writing.")
298 self.
_reader = mcap.reader.make_reader(self.
_file)
if "r" == self.
_mode else None
299 self.
_decoder = mcap_ros.decoder.Decoder()
if "r" == self.
_mode else None
306 """Closes the bag file."""
307 if self.
_file is not None:
315 """Returns whether file is closed."""
316 return self.
_file is None
321 """Returns the list of topics in bag, in alphabetic order."""
322 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
327 """Returns bag file path."""
333 """Returns current file size."""
335 pos, _ = self.
_file.tell(), self.
_file.seek(0, os.SEEK_END)
336 size, _ = self.
_file.tell(), self.
_file.seek(pos)
343 """Returns file open mode."""
348 """Returns whether bag contains given topic."""
349 return any(key == t
for t, _, _
in self.
_topics)
353 """Retrieves next message from bag as (topic, message, timestamp)."""
354 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
361 Returns ROS message deserialized from binary data.
363 @param message mcap.records.Message instance
364 @param channel mcap.records.Channel instance for message
365 @param schema mcap.records.Schema instance for message type
368 if api.ROS2
and not issubclass(cls, types.SimpleNamespace):
369 msg = api.deserialize_message(message.data, cls)
371 msg = self.
_decoder.decode(schema=schema, message=message)
375 api.TypeMeta.make(msg, channel.topic, self, data=message.data)
377 if typekey
not in self.
_types: self.
_types[typekey] = type(msg)
383 Returns message type class, generating if not already available.
385 @param schema mcap.records.Schema instance for message type
386 @param message mcap.records.Message instance
387 @param generate generate message class dynamically if not available
389 typekey = (typename, typehash) = self.
_schematypes[schema.id]
390 if api.ROS2
and typekey
not in self.
_types:
392 cls = api.get_message_class(typename)
393 clshash = api.get_message_type_hash(cls)
394 if typehash == clshash: self.
_types[typekey] = cls
395 except Exception:
pass
396 if typekey
not in self.
_types and generate:
398 msg = self.
_decoder.decode(schema=schema, message=message)
401 typeclses = api.realapi.generate_message_classes(typename, schema.data.decode())
402 self.
_types[typekey] = typeclses[typename]
403 return self.
_types.get(typekey)
408 Patches MCAP ROS2 message class with expected attributes and methods, recursively.
410 @param cls ROS message class as returned from mcap_ros2.decoder
411 @param typename ROS message type name, like "std_msgs/Bool"
412 @param typehash ROS message type hash
413 @return patched class
415 typekey = (typename, typehash)
417 fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
420 for s
in [api.scalar(t)]
421 if s
not in api.ROS_BUILTIN_TYPES
438 Patches MCAP ROS2 message with expected attributes and methods, recursively.
440 @param message ROS message instance as returned from mcap_ros2.decoder
441 @param typename ROS message type name, like "std_msgs/Bool"
442 @param typehash ROS message type hash
443 @return original message patched, or new instance if ROS2 temporal type
447 stack = [(message, (typename, typehash),
None, ())]
449 msg, typekey, parent, path = stack.pop(0)
450 mycls, typename = type(msg), typekey[0]
454 msg2 = self.
_temporal_ctors[typename](sec=msg.sec, nanosec=msg.nanosec)
455 if msg
is message: result = msg2
456 elif len(path) == 1: setattr(parent, path[0], msg2)
457 else: getattr(parent, path[0])[path[1]] = msg2
463 v = getattr(msg, name)
464 if isinstance(v, list):
465 stack.extend((x, subtypekey, msg, (name, i))
for i, x
in enumerate(v))
467 stack.append((v, subtypekey, msg, (name, )))
473 """Populates bag metainfo."""
474 summary = self.
_reader.get_summary()
475 self.
_start_time = summary.statistics.message_start_time / 1E9
476 self.
_end_time = summary.statistics.message_end_time / 1E9
479 for cid, channel
in summary.channels.items():
480 schema = summary.schemas[channel.schema_id]
481 topic, typename = channel.topic, api.canonical(schema.name)
483 typedef = schema.data.decode(
"utf-8")
484 subtypedefs, nesting = api.parse_definition_subtypes(typedef, nesting=
True)
485 typehash = channel.metadata.get(
"md5sum")
or \
486 api.calculate_definition_hash(typename, typedef,
487 tuple(subtypedefs.items()))
488 topickey, typekey = (topic, typename, typehash), (typename, typehash)
491 if channel.metadata.get(
"offered_qos_profiles"):
492 try: qoses = yaml.safe_load(channel.metadata[
"offered_qos_profiles"])
493 except Exception
as e:
494 ConsolePrinter.warn(
"Error parsing topic QoS profiles from %r: %s.",
495 channel.metadata[
"offered_qos_profiles"], e)
497 self.
_topics.setdefault(topickey, 0)
498 self.
_topics[topickey] += summary.statistics.channel_message_counts[cid]
500 defhashes[typedef] = typehash
501 for t, d
in subtypedefs.items():
502 if d
in defhashes: h = defhashes[d]
503 else: h = api.calculate_definition_hash(t, d, tuple(subtypedefs.items()))
507 for t, subtypes
in nesting.items():
513 if qoses: self.
_qoses[topickey] = qoses
520 """Returns whether file is readable as MCAP format."""
521 if common.is_stream(f):
522 pos, _ = f.tell(), f.seek(0)
524 elif os.path.isfile(f)
and os.path.getsize(f):
525 with open(f,
"rb")
as f:
528 ext = os.path.splitext(f
or "")[-1].lower()
529 result = ext
in McapSink.FILE_EXTENSIONS
535 """Returns a shallow copy, with slots populated manually."""
536 result = self.__class__.__new__(self.__class__)
537 for n
in self.__slots__:
538 setattr(result, n, copy.copy(getattr(self, n)))
543 """Returns a deep copy, with slots populated manually."""
544 result = self.__class__.__new__(self.__class__)
545 for n
in self.__slots__:
546 setattr(result, n, copy.deepcopy(getattr(self, n), memo))
551 """Returns a map of message field names and types (patch method for MCAP message classes)."""
552 return self._fields_and_field_types.copy()
556 """Returns a string representation of ROS message."""
557 fields =
", ".join(
"%s=%r" % (n, getattr(self, n))
for n
in self.__slots__)
558 return "%s(%s)" % (self.__name__, fields)
563 """Writes messages to MCAP file."""
566 FILE_EXTENSIONS = (
".mcap", )
569 DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
574 @param args arguments as namespace or dictionary, case-insensitive;
575 or a single path as the file to write
576 @param args.write base name of MCAP files to write
577 @param args.write_options {"overwrite": whether to overwrite existing file
579 "rollover-size": bytes limit for individual output files,
580 "rollover-count": message limit for individual output files,
581 "rollover-duration": time span limit for individual output files,
582 as ROS duration or convertible seconds,
583 "rollover-template": output filename template, supporting
584 strftime format codes like "%H-%M-%S"
585 and "%(index)s" as output file index}
586 @param args.meta whether to print metainfo
587 @param args.verbose whether to print debug information
588 @param kwargs any and all arguments as keyword overrides, case-insensitive
590 args = {
"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
591 args = common.ensure_namespace(args, McapSink.DEFAULT_ARGS, **kwargs)
592 super(McapSink, self).
__init__(args)
593 RolloverSinkMixin.__init__(self, args)
598 self.
_overwrite = (args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
601 atexit.register(self.
close)
606 Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2)
607 and overwrite is valid and file is writable.
610 ok, mcap_ok, mcap_ros_ok = RolloverSinkMixin.validate(self), bool(mcap), bool(mcap_ros)
611 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
612 ConsolePrinter.error(
"Invalid overwrite option for MCAP: %r. "
613 "Choose one of {true, false}.",
614 self.
args.WRITE_OPTIONS[
"overwrite"])
617 ConsolePrinter.error(
"mcap not available: cannot work with MCAP files.")
619 ConsolePrinter.error(
"mcap_ros%s not available: cannot work with MCAP files.",
620 api.ROS_VERSION
or "")
621 if not common.verify_io(self.
args.WRITE,
"w"):
623 self.
valid = ok
and mcap_ok
and mcap_ros_ok
627 def emit(self, topic, msg, stamp=None, match=None, index=None):
628 """Writes out message to MCAP file."""
629 if not self.
validate():
raise Exception(
"invalid")
631 RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
633 kwargs = dict(publish_time=api.to_nsec(stamp), sequence=index)
635 with api.TypeMeta.make(msg, topic)
as m:
638 fullname = api.make_full_typename(m.typename)
639 self.
_schemas[typekey] = self.
_writer.register_msgdef(fullname, m.definition)
640 schema, data = self.
_schemas[typekey], api.message_to_dict(msg)
641 self.
_writer.write_message(topic, schema, data, **kwargs)
643 self.
_writer.write_message(topic, msg, **kwargs)
644 super(McapSink, self).
emit(topic, msg, stamp, match, index)
648 """Closes output file if open, emits metainfo."""
654 super(McapSink, self).
close()
658 """Closes output file, if any."""
666 """Opens output file if not already open."""
667 if self.
_file:
return
670 common.makedirs(os.path.dirname(self.
filename))
671 if self.
args.VERBOSE:
673 action =
"Overwriting" if sz
and self.
_overwrite else "Creating"
674 ConsolePrinter.debug(
"%s MCAP output %s.", action, self.
filename)
681 """Adds MCAP support to reading and writing. Raises ImportWarning if libraries not available."""
682 if not mcap
or not mcap_ros:
683 ConsolePrinter.error(
"mcap libraries not available: cannot work with MCAP files.")
684 raise ImportWarning()
685 from ..
import plugins
686 plugins.add_write_format(
"mcap", McapSink,
"MCAP", [
687 (
"overwrite=true|false",
"overwrite existing file in MCAP output\n"
688 "instead of appending unique counter (default false)"),
689 ] + RolloverSinkMixin.get_write_options(
"MCAP"))
690 api.BAG_EXTENSIONS += McapSink.FILE_EXTENSIONS
691 api.Bag.READER_CLASSES.add(McapBag)
692 api.Bag.WRITER_CLASSES.add(McapBag)
695 __all__ = [
"McapBag",
"McapSink",
"init"]