mcap.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 MCAP input and output.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 14.10.2022
11 @modified 28.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 from __future__ import absolute_import
16 import atexit
17 import copy
18 import io
19 import os
20 import time
21 import types
22 
23 from .. import api
24 
25 try: import mcap, mcap.reader
26 except ImportError: mcap = None
27 if "1" == os.getenv("ROS_VERSION"):
28  try: import mcap_ros1 as mcap_ros, mcap_ros1.decoder, mcap_ros1.writer
29  except ImportError: mcap_ros = None
30 elif "2" == os.getenv("ROS_VERSION"):
31  try: import mcap_ros2 as mcap_ros, mcap_ros2.decoder, mcap_ros2.writer
32  except ImportError: mcap_ros = None
33 else: mcap_ros = None
34 import yaml
35 
36 from .. import common
37 from .. common import ConsolePrinter
38 from .. outputs import RolloverSinkMixin, Sink
39 
40 
42  """
43  MCAP bag interface, providing most of rosbag.Bag interface.
44 
45  Bag cannot be appended to, and cannot be read and written at the same time
46  (MCAP API limitation).
47  """
48 
49 
50  MODES = ("r", "w")
51 
52 
53  MCAP_MAGIC = b"\x89MCAP\x30\r\n"
54 
55  def __init__(self, f, mode="r", **__):
56  """
57  Opens file and populates metadata.
58 
59  @param f bag file path, or a stream object
60  @param mode return reader if "r" or writer if "w"
61  """
62  if mode not in self.MODES: raise ValueError("invalid mode %r" % mode)
63  self._mode = mode
64  self._topics = {} # {(topic, typename, typehash): message count}
65  self._types = {} # {(typename, typehash): message type class}
66  self._typedefs = {} # {(typename, typehash): type definition text}
67  self._schemas = {} # {(typename, typehash): mcap.records.Schema}
68  self._schematypes = {} # {mcap.records.Schema.id: (typename, typehash)}
69  self._qoses = {} # {(topic, typename): [{qos profile dict}]}
70  self._typefields = {} # {(typename, typehash): {field name: type name}}
71  self._type_subtypes = {} # {(typename, typehash): {typename: typehash}}
72  self._field_subtypes = {} # {(typename, typehash): {field name: (typename, typehash)}}
73  self._temporal_ctors = {} # {typename: time/duration constructor}
74  self._start_time = None # Bag start time, as UNIX timestamp
75  self._end_time = None # Bag end time, as UNIX timestamp
76  self._file = None # File handle
77  self._reader = None # mcap.McapReader
78  self._decoder = None # mcap_ros.Decoder
79  self._writer = None # mcap_ros.Writer
80  self._iterer = None # Generator from read_messages() for next()
81  self._ttinfo = None # Cached result for get_type_and_topic_info()
82  self._opened = False # Whether file has been opened at least once
83  self._filename = None # File path, or None if stream
84 
85  if common.is_stream(f):
86  if not common.verify_io(f, mode):
87  raise io.UnsupportedOperation("read" if "r" == mode else "write")
88  self._file, self._filename = f, None
89  f.seek(0)
90  else:
91  if not isinstance(f, common.PATH_TYPES):
92  raise ValueError("invalid filename %r" % type(f))
93  if "w" == mode: common.makedirs(os.path.dirname(f))
94  self._filename = str(f)
95 
96  if api.ROS2 and "r" == mode: self._temporal_ctors.update(
97  (t, c) for c, t in api.ROS_TIME_CLASSES.items() if api.get_message_type(c) == t
98  )
99 
100  self.open()
101 
102 
103  def get_message_count(self, topic_filters=None):
104  """
105  Returns the number of messages in the bag.
106 
107  @param topic_filters list of topics or a single topic to filter by, if any
108  """
109  if topic_filters:
110  topics = topic_filters
111  topics = topics if isinstance(topics, (dict, list, set, tuple)) else [topics]
112  return sum(c for (t, _, _), c in self._topics.items() if t in topics)
113  return sum(self._topics.values())
114 
115 
116  def get_start_time(self):
117  """Returns the start time of the bag, as UNIX timestamp."""
118  return self._start_time
119 
120 
121  def get_end_time(self):
122  """Returns the end time of the bag, as UNIX timestamp."""
123  return self._end_time
124 
125 
126  def get_message_class(self, typename, typehash=None):
127  """
128  Returns ROS message class for typename, or None if unknown type.
129 
130  @param typehash message type definition hash, if any
131  """
132  typehash = typehash or next((h for t, h in self._typedefs if t == typename), None)
133  typekey = (typename, typehash)
134  if typekey not in self._types and typekey in self._typedefs:
135  if api.ROS2:
136  name = typename.split("/")[-1]
137  fields = api.parse_definition_fields(typename, self._typedefs[typekey])
138  cls = type(name, (types.SimpleNamespace, ), {
139  "__name__": name, "__slots__": list(fields),
140  "__repr__": message_repr, "__str__": message_repr
141  })
142  self._types[typekey] = self._patch_message_class(cls, typename, typehash)
143  else:
144  typeclses = api.realapi.generate_message_classes(typename, self._typedefs[typekey])
145  self._types[typekey] = typeclses[typename]
146 
147  return self._types.get(typekey)
148 
149 
150  def get_message_definition(self, msg_or_type):
151  """Returns ROS message type definition full text from bag, including subtype definitions."""
152  if api.is_ros_message(msg_or_type):
153  return self._typedefs.get((api.get_message_type(msg_or_type),
154  api.get_message_type_hash(msg_or_type)))
155  typename = msg_or_type
156  return next((d for (n, h), d in self._typedefs.items() if n == typename), None)
157 
158 
159  def get_message_type_hash(self, msg_or_type):
160  """Returns ROS message type MD5 hash."""
161  typename = msg_or_type
162  if api.is_ros_message(msg_or_type): typename = api.get_message_type(msg_or_type)
163  return next((h for n, h in self._typedefs if n == typename), None)
164 
165 
166  def get_qoses(self, topic, typename):
167  """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
168  return self._qoses.get((topic, typename))
169 
170 
171  def get_topic_info(self, *_, **__):
172  """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
173  return dict(self._topics)
174 
175 
176  def get_type_and_topic_info(self, topic_filters=None):
177  """
178  Returns thorough metainfo on topic and message types.
179 
180  @param topic_filters list of topics or a single topic to filter returned topics-dict by,
181  if any
182  @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
183  msg_types as dict of {typename: typehash},
184  topics as a dict of {topic: TopicTuple() namedtuple}.
185  """
186  topics = topic_filters
187  topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
188  if self._ttinfo and (not topics or set(topics) == set(t for t, _, _ in self._topics)):
189  return self._ttinfo
190  if self.closed: raise ValueError("I/O operation on closed file.")
191 
192  topics = topic_filters
193  topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
194  msgtypes = {n: h for t, n, h in self._topics}
195  topicdict = {}
196 
197  def median(vals):
198  """Returns median value from given sorted numbers."""
199  vlen = len(vals)
200  return None if not vlen else vals[vlen // 2] if vlen % 2 else \
201  float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
202 
203  for (t, n, _), c in sorted(self._topics.items()):
204  if topics and t not in topics: continue # for
205  mymedian = None
206  if c > 1 and self._reader:
207  stamps = sorted(m.publish_time / 1E9 for _, _, m in self._reader.iter_messages([t]))
208  mymedian = median(sorted(s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])))
209  freq = 1.0 / mymedian if mymedian else None
210  topicdict[t] = self.TopicTuple(n, c, len(self._qoses.get((t, n)) or []), freq)
211  result = self.TypesAndTopicsTuple(msgtypes, topicdict)
212  if not topics or set(topics) == set(t for t, _, _ in self._topics): self._ttinfo = result
213  return result
214 
215 
216  def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
217  """
218  Yields messages from the bag, optionally filtered by topic and timestamp.
219 
220  @param topics list of topics or a single topic to filter by, if at all
221  @param start_time earliest timestamp of message to return, as ROS time or convertible
222  (int/float/duration/datetime/decimal)
223  @param end_time latest timestamp of message to return, as ROS time or convertible
224  (int/float/duration/datetime/decimal)
225  @param raw if true, then returned messages are tuples of
226  (typename, bytes, typehash, typeclass)
227  @return BagMessage namedtuples of
228  (topic, message, timestamp as rospy/rclpy.Time)
229  """
230  if self.closed: raise ValueError("I/O operation on closed file.")
231  if "w" == self._mode: raise io.UnsupportedOperation("read")
232 
233  topics = topics if isinstance(topics, list) else [topics] if topics else None
234  start_ns, end_ns = (api.to_nsec(api.to_time(x)) for x in (start_time, end_time))
235  for schema, channel, message in self._reader.iter_messages(topics, start_ns, end_ns):
236  if raw:
237  typekey = (typename, typehash) = self._schematypes[schema.id]
238  if typekey not in self._types:
239  self._types[typekey] = self._make_message_class(schema, message)
240  msg = (typename, message.data, typehash, self._types[typekey])
241  else: msg = self._decode_message(message, channel, schema)
242  api.TypeMeta.make(msg, channel.topic, self)
243  yield self.BagMessage(channel.topic, msg, api.make_time(nsecs=message.publish_time))
244  if self.closed: break # for schema
245 
246 
247  def write(self, topic, msg, t=None, raw=False, **__):
248  """
249  Writes out message to MCAP file.
250 
251  @param topic name of topic
252  @param msg ROS1 message
253  @param t message timestamp if not using current ROS time,
254  as ROS time type or convertible (int/float/duration/datetime/decimal)
255  @param raw if true, `msg` is in raw format, (typename, bytes, typehash, typeclass)
256  """
257  if self.closed: raise ValueError("I/O operation on closed file.")
258  if "r" == self._mode: raise io.UnsupportedOperation("write")
259 
260  if raw:
261  typename, binary, typehash = msg[:3]
262  cls = msg[-1]
263  typedef = self._typedefs.get((typename, typehash)) or api.get_message_definition(cls)
264  msg = api.deserialize_message(binary, cls)
265  else:
266  with api.TypeMeta.make(msg, topic) as meta:
267  typename, typehash, typedef = meta.typename, meta.typehash, meta.definition
268  topickey, typekey = (topic, typename, typehash), (typename, typehash)
269 
270  nanosec = (time.time_ns() if hasattr(time, "time_ns") else int(time.time() * 10**9)) \
271  if t is None else api.to_nsec(api.to_time(t))
272  if api.ROS2:
273  if typekey not in self._schemas:
274  fullname = api.make_full_typename(typename)
275  schema = self._writer.register_msgdef(fullname, typedef)
276  self._schemas[typekey] = schema
277  schema, data = self._schemas[typekey], api.message_to_dict(msg)
278  self._writer.write_message(topic, schema, data, publish_time=nanosec)
279  else:
280  self._writer.write_message(topic, msg, publish_time=nanosec)
281 
282  sec = nanosec / 1E9
283  self._start_time = sec if self._start_time is None else min(sec, self._start_time)
284  self._end_time = sec if self._end_time is None else min(sec, self._end_time)
285  self._topics[topickey] = self._topics.get(topickey, 0) + 1
286  self._types.setdefault(typekey, type(msg))
287  self._typedefs.setdefault(typekey, typedef)
288  self._ttinfo = None
289 
290 
291  def open(self):
292  """Opens the bag file if not already open."""
293  if self._reader is not None or self._writer is not None: return
294  if self._opened and "w" == self._mode:
295  raise io.UnsupportedOperation("Cannot reopen bag for writing.")
296 
297  if self._file is None: self._file = open(self._filename, "%sb" % self._mode)
298  self._reader = mcap.reader.make_reader(self._file) if "r" == self._mode else None
299  self._decoder = mcap_ros.decoder.Decoder() if "r" == self._mode else None
300  self._writer = mcap_ros.writer.Writer(self._file) if "w" == self._mode else None
301  if "r" == self._mode: self._populate_meta()
302  self._opened = True
303 
304 
305  def close(self):
306  """Closes the bag file."""
307  if self._file is not None:
308  if self._writer: self._writer.finish()
309  self._file.close()
310  self._file, self._reader, self._writer, self._iterer = None, None, None, None
311 
312 
313  @property
314  def closed(self):
315  """Returns whether file is closed."""
316  return self._file is None
317 
318 
319  @property
320  def topics(self):
321  """Returns the list of topics in bag, in alphabetic order."""
322  return sorted((t for t, _, _ in self._topics), key=str.lower)
323 
324 
325  @property
326  def filename(self):
327  """Returns bag file path."""
328  return self._filename
329 
330 
331  @property
332  def size(self):
333  """Returns current file size."""
334  if self._filename is None and self._file is not None:
335  pos, _ = self._file.tell(), self._file.seek(0, os.SEEK_END)
336  size, _ = self._file.tell(), self._file.seek(pos)
337  return size
338  return os.path.getsize(self._filename) if os.path.isfile(self._filename) else None
339 
340 
341  @property
342  def mode(self):
343  """Returns file open mode."""
344  return self._mode
345 
346 
347  def __contains__(self, key):
348  """Returns whether bag contains given topic."""
349  return any(key == t for t, _, _ in self._topics)
350 
351 
352  def __next__(self):
353  """Retrieves next message from bag as (topic, message, timestamp)."""
354  if self.closed: raise ValueError("I/O operation on closed file.")
355  if self._iterer is None: self._iterer = self.read_messages()
356  return next(self._iterer)
357 
358 
359  def _decode_message(self, message, channel, schema):
360  """
361  Returns ROS message deserialized from binary data.
362 
363  @param message mcap.records.Message instance
364  @param channel mcap.records.Channel instance for message
365  @param schema mcap.records.Schema instance for message type
366  """
367  cls = self._make_message_class(schema, message, generate=False)
368  if api.ROS2 and not issubclass(cls, types.SimpleNamespace):
369  msg = api.deserialize_message(message.data, cls)
370  else:
371  msg = self._decoder.decode(schema=schema, message=message)
372  if api.ROS2: # MCAP ROS2 message classes need monkey-patching with expected API
373  msg = self._patch_message(msg, *self._schematypes[schema.id])
374  # Register serialized binary, as MCAP does not support serializing its own creations
375  api.TypeMeta.make(msg, channel.topic, self, data=message.data)
376  typekey = self._schematypes[schema.id]
377  if typekey not in self._types: self._types[typekey] = type(msg)
378  return msg
379 
380 
381  def _make_message_class(self, schema, message, generate=True):
382  """
383  Returns message type class, generating if not already available.
384 
385  @param schema mcap.records.Schema instance for message type
386  @param message mcap.records.Message instance
387  @param generate generate message class dynamically if not available
388  """
389  typekey = (typename, typehash) = self._schematypes[schema.id]
390  if api.ROS2 and typekey not in self._types:
391  try: # Try loading class from disk for full compatibility
392  cls = api.get_message_class(typename)
393  clshash = api.get_message_type_hash(cls)
394  if typehash == clshash: self._types[typekey] = cls
395  except Exception: pass # ModuleNotFoundError, AttributeError etc
396  if typekey not in self._types and generate:
397  if api.ROS2: # MCAP ROS2 message classes need monkey-patching with expected API
398  msg = self._decoder.decode(schema=schema, message=message)
399  self._types[typekey] = self._patch_message_class(type(msg), typename, typehash)
400  else:
401  typeclses = api.realapi.generate_message_classes(typename, schema.data.decode())
402  self._types[typekey] = typeclses[typename]
403  return self._types.get(typekey)
404 
405 
406  def _patch_message_class(self, cls, typename, typehash):
407  """
408  Patches MCAP ROS2 message class with expected attributes and methods, recursively.
409 
410  @param cls ROS message class as returned from mcap_ros2.decoder
411  @param typename ROS message type name, like "std_msgs/Bool"
412  @param typehash ROS message type hash
413  @return patched class
414  """
415  typekey = (typename, typehash)
416  if typekey not in self._typefields:
417  fields = api.parse_definition_fields(typename, self._typedefs[typekey])
418  self._typefields[typekey] = fields
419  self._field_subtypes[typekey] = {n: (s, h) for n, t in fields.items()
420  for s in [api.scalar(t)]
421  if s not in api.ROS_BUILTIN_TYPES
422  for h in [self._type_subtypes[typekey][s]]}
423 
424  # mcap_ros2 creates a dynamic class for each message, having __slots__
425  # but no other ROS2 API hooks; even the class module is "mcap_ros2.dynamic".
426  cls.__module__ = typename.split("/", maxsplit=1)[0]
427  cls.SLOT_TYPES = () # rosidl_runtime_py.utilities.is_message() checks for presence
428  cls._fields_and_field_types = dict(self._typefields[typekey])
429  cls.get_fields_and_field_types = message_get_fields_and_field_types
430  cls.__copy__ = copy_with_slots # MCAP message classes lack support for copy
431  cls.__deepcopy__ = deepcopy_with_slots
432 
433  return cls
434 
435 
436  def _patch_message(self, message, typename, typehash):
437  """
438  Patches MCAP ROS2 message with expected attributes and methods, recursively.
439 
440  @param message ROS message instance as returned from mcap_ros2.decoder
441  @param typename ROS message type name, like "std_msgs/Bool"
442  @param typehash ROS message type hash
443  @return original message patched, or new instance if ROS2 temporal type
444  """
445  result = message
446  # [(field value, (field type name, field type hash), parent, (field name, ?array index))]
447  stack = [(message, (typename, typehash), None, ())]
448  while stack:
449  msg, typekey, parent, path = stack.pop(0)
450  mycls, typename = type(msg), typekey[0]
451 
452  if typename in self._temporal_ctors:
453  # Convert temporal types to ROS2 temporal types for grepros to recognize
454  msg2 = self._temporal_ctors[typename](sec=msg.sec, nanosec=msg.nanosec)
455  if msg is message: result = msg2 # Replace input message
456  elif len(path) == 1: setattr(parent, path[0], msg2) # Set scalar field
457  else: getattr(parent, path[0])[path[1]] = msg2 # Set array field element
458  continue # while stack
459 
460  self._patch_message_class(mycls, *typekey)
461 
462  for name, subtypekey in self._field_subtypes[typekey].items():
463  v = getattr(msg, name)
464  if isinstance(v, list): # Queue each array element for patching
465  stack.extend((x, subtypekey, msg, (name, i)) for i, x in enumerate(v))
466  else: # Queue scalar field for patching
467  stack.append((v, subtypekey, msg, (name, )))
468 
469  return result
470 
471 
472  def _populate_meta(self):
473  """Populates bag metainfo."""
474  summary = self._reader.get_summary()
475  self._start_time = summary.statistics.message_start_time / 1E9
476  self._end_time = summary.statistics.message_end_time / 1E9
477 
478  defhashes = {} # Cached {type definition full text: type hash}
479  for cid, channel in summary.channels.items():
480  schema = summary.schemas[channel.schema_id]
481  topic, typename = channel.topic, api.canonical(schema.name)
482 
483  typedef = schema.data.decode("utf-8") # Full definition including subtype definitions
484  subtypedefs, nesting = api.parse_definition_subtypes(typedef, nesting=True)
485  typehash = channel.metadata.get("md5sum") or \
486  api.calculate_definition_hash(typename, typedef,
487  tuple(subtypedefs.items()))
488  topickey, typekey = (topic, typename, typehash), (typename, typehash)
489 
490  qoses = None
491  if channel.metadata.get("offered_qos_profiles"):
492  try: qoses = yaml.safe_load(channel.metadata["offered_qos_profiles"])
493  except Exception as e:
494  ConsolePrinter.warn("Error parsing topic QoS profiles from %r: %s.",
495  channel.metadata["offered_qos_profiles"], e)
496 
497  self._topics.setdefault(topickey, 0)
498  self._topics[topickey] += summary.statistics.channel_message_counts[cid]
499  self._typedefs[typekey] = typedef
500  defhashes[typedef] = typehash
501  for t, d in subtypedefs.items(): # Populate subtype definitions and hashes
502  if d in defhashes: h = defhashes[d]
503  else: h = api.calculate_definition_hash(t, d, tuple(subtypedefs.items()))
504  self._typedefs.setdefault((t, h), d)
505  self._type_subtypes.setdefault(typekey, {})[t] = h
506  defhashes[d] = h
507  for t, subtypes in nesting.items(): # Populate all nested type references
508  h = self._type_subtypes[typekey][t]
509  for t2 in subtypes:
510  h2 = self._type_subtypes[typekey][t2]
511  self._type_subtypes.setdefault((t, h), {})[t2] = h2
512 
513  if qoses: self._qoses[topickey] = qoses
514  self._schemas[typekey] = schema
515  self._schematypes[schema.id] = typekey
516 
517 
518  @classmethod
519  def autodetect(cls, f):
520  """Returns whether file is readable as MCAP format."""
521  if common.is_stream(f):
522  pos, _ = f.tell(), f.seek(0)
523  result, _ = (f.read(len(cls.MCAP_MAGIC)) == cls.MCAP_MAGIC), f.seek(pos)
524  elif os.path.isfile(f) and os.path.getsize(f):
525  with open(f, "rb") as f:
526  result = (f.read(len(cls.MCAP_MAGIC)) == cls.MCAP_MAGIC)
527  else:
528  ext = os.path.splitext(f or "")[-1].lower()
529  result = ext in McapSink.FILE_EXTENSIONS
530  return result
531 
532 
533 
534 def copy_with_slots(self):
535  """Returns a shallow copy, with slots populated manually."""
536  result = self.__class__.__new__(self.__class__)
537  for n in self.__slots__:
538  setattr(result, n, copy.copy(getattr(self, n)))
539  return result
540 
541 
542 def deepcopy_with_slots(self, memo):
543  """Returns a deep copy, with slots populated manually."""
544  result = self.__class__.__new__(self.__class__)
545  for n in self.__slots__:
546  setattr(result, n, copy.deepcopy(getattr(self, n), memo))
547  return result
548 
549 
551  """Returns a map of message field names and types (patch method for MCAP message classes)."""
552  return self._fields_and_field_types.copy()
553 
554 
555 def message_repr(self):
556  """Returns a string representation of ROS message."""
557  fields = ", ".join("%s=%r" % (n, getattr(self, n)) for n in self.__slots__)
558  return "%s(%s)" % (self.__name__, fields)
559 
560 
561 
563  """Writes messages to MCAP file."""
564 
565 
566  FILE_EXTENSIONS = (".mcap", )
567 
568 
569  DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)
570 
571 
572  def __init__(self, args=None, **kwargs):
573  """
574  @param args arguments as namespace or dictionary, case-insensitive;
575  or a single path as the file to write
576  @param args.write base name of MCAP files to write
577  @param args.write_options {"overwrite": whether to overwrite existing file
578  (default false),
579  "rollover-size": bytes limit for individual output files,
580  "rollover-count": message limit for individual output files,
581  "rollover-duration": time span limit for individual output files,
582  as ROS duration or convertible seconds,
583  "rollover-template": output filename template, supporting
584  strftime format codes like "%H-%M-%S"
585  and "%(index)s" as output file index}
586  @param args.meta whether to print metainfo
587  @param args.verbose whether to print debug information
588  @param kwargs any and all arguments as keyword overrides, case-insensitive
589  """
590  args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
591  args = common.ensure_namespace(args, McapSink.DEFAULT_ARGS, **kwargs)
592  super(McapSink, self).__init__(args)
593  RolloverSinkMixin.__init__(self, args)
594 
595  self._file = None # Open file() object
596  self._writer = None # mcap_ros.writer.Writer object
597  self._schemas = {} # {(typename, typehash): mcap.records.Schema}
598  self._overwrite = (args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
599  self._close_printed = False
600 
601  atexit.register(self.close)
602 
603 
604  def validate(self):
605  """
606  Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2)
607  and overwrite is valid and file is writable.
608  """
609  if self.valid is not None: return self.valid
610  ok, mcap_ok, mcap_ros_ok = RolloverSinkMixin.validate(self), bool(mcap), bool(mcap_ros)
611  if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
612  ConsolePrinter.error("Invalid overwrite option for MCAP: %r. "
613  "Choose one of {true, false}.",
614  self.args.WRITE_OPTIONS["overwrite"])
615  ok = False
616  if not mcap_ok:
617  ConsolePrinter.error("mcap not available: cannot work with MCAP files.")
618  if not mcap_ros_ok:
619  ConsolePrinter.error("mcap_ros%s not available: cannot work with MCAP files.",
620  api.ROS_VERSION or "")
621  if not common.verify_io(self.args.WRITE, "w"):
622  ok = False
623  self.valid = ok and mcap_ok and mcap_ros_ok
624  return self.valid
625 
626 
627  def emit(self, topic, msg, stamp=None, match=None, index=None):
628  """Writes out message to MCAP file."""
629  if not self.validate(): raise Exception("invalid")
630  stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
631  RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
632  self._ensure_open()
633  kwargs = dict(publish_time=api.to_nsec(stamp), sequence=index)
634  if api.ROS2:
635  with api.TypeMeta.make(msg, topic) as m:
636  typekey = m.typekey
637  if typekey not in self._schemas:
638  fullname = api.make_full_typename(m.typename)
639  self._schemas[typekey] = self._writer.register_msgdef(fullname, m.definition)
640  schema, data = self._schemas[typekey], api.message_to_dict(msg)
641  self._writer.write_message(topic, schema, data, **kwargs)
642  else:
643  self._writer.write_message(topic, msg, **kwargs)
644  super(McapSink, self).emit(topic, msg, stamp, match, index)
645 
646 
647  def close(self):
648  """Closes output file if open, emits metainfo."""
649  try: self.close_output()
650  finally:
651  if not self._close_printed and self._counts:
652  self._close_printed = True
653  ConsolePrinter.debug("Wrote MCAP for %s", self.format_output_meta())
654  super(McapSink, self).close()
655 
656 
657  def close_output(self):
658  """Closes output file, if any."""
659  if self._writer:
660  self._writer.finish()
661  self._file.close()
662  self._file, self._writer = None, None
663 
664 
665  def _ensure_open(self):
666  """Opens output file if not already open."""
667  if self._file: return
668 
669  self.filename = self.filename or RolloverSinkMixin.make_filename(self)
670  common.makedirs(os.path.dirname(self.filename))
671  if self.args.VERBOSE:
672  sz = os.path.exists(self.filename) and os.path.getsize(self.filename)
673  action = "Overwriting" if sz and self._overwrite else "Creating"
674  ConsolePrinter.debug("%s MCAP output %s.", action, self.filename)
675  self._file = open(self.filename, "wb")
676  self._writer = mcap_ros.writer.Writer(self._file)
677  self._close_printed = False
678 
679 
680 def init(*_, **__):
681  """Adds MCAP support to reading and writing. Raises ImportWarning if libraries not available."""
682  if not mcap or not mcap_ros:
683  ConsolePrinter.error("mcap libraries not available: cannot work with MCAP files.")
684  raise ImportWarning()
685  from .. import plugins # Late import to avoid circular
686  plugins.add_write_format("mcap", McapSink, "MCAP", [
687  ("overwrite=true|false", "overwrite existing file in MCAP output\n"
688  "instead of appending unique counter (default false)"),
689  ] + RolloverSinkMixin.get_write_options("MCAP"))
690  api.BAG_EXTENSIONS += McapSink.FILE_EXTENSIONS
691  api.Bag.READER_CLASSES.add(McapBag)
692  api.Bag.WRITER_CLASSES.add(McapBag)
693 
694 
695 __all__ = ["McapBag", "McapSink", "init"]
grepros.plugins.mcap.McapBag.get_qoses
def get_qoses(self, topic, typename)
Definition: mcap.py:166
grepros.plugins.mcap.McapBag._decode_message
def _decode_message(self, message, channel, schema)
Definition: mcap.py:359
grepros.plugins.mcap.McapBag._ttinfo
_ttinfo
Definition: mcap.py:81
grepros.plugins.mcap.McapBag.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: mcap.py:126
grepros.plugins.mcap.McapBag.write
def write(self, topic, msg, t=None, raw=False, **__)
Definition: mcap.py:247
grepros.plugins.mcap.McapBag._schemas
_schemas
Definition: mcap.py:67
grepros.outputs.Sink.args
args
Definition: outputs.py:50
grepros.plugins.mcap.McapBag._opened
_opened
Definition: mcap.py:82
grepros.plugins.mcap.McapBag.close
def close(self)
Definition: mcap.py:305
grepros.plugins.mcap.McapBag.get_type_and_topic_info
def get_type_and_topic_info(self, topic_filters=None)
Definition: mcap.py:176
grepros.plugins.mcap.McapBag._schematypes
_schematypes
Definition: mcap.py:68
grepros.plugins.mcap.McapSink._overwrite
_overwrite
Definition: mcap.py:598
grepros.plugins.mcap.McapSink.emit
def emit(self, topic, msg, stamp=None, match=None, index=None)
Definition: mcap.py:627
grepros.plugins.mcap.deepcopy_with_slots
def deepcopy_with_slots(self, memo)
Definition: mcap.py:542
grepros.plugins.mcap.McapBag.__module__
__module__
Definition: mcap.py:426
grepros.plugins.mcap.McapBag.topics
def topics(self)
Definition: mcap.py:320
grepros.plugins.mcap.McapBag._decoder
_decoder
Definition: mcap.py:78
grepros.plugins.mcap.McapBag.size
def size(self)
Definition: mcap.py:332
grepros.plugins.mcap.McapBag._filename
_filename
Definition: mcap.py:83
grepros.plugins.mcap.McapBag.open
def open(self)
Definition: mcap.py:291
grepros.api.BaseBag
Definition: api.py:85
grepros.outputs.Sink
Definition: outputs.py:32
grepros.plugins.mcap.McapSink._writer
_writer
Definition: mcap.py:596
grepros.plugins.mcap.McapSink.close_output
def close_output(self)
Definition: mcap.py:657
grepros.plugins.mcap.McapBag.get_end_time
def get_end_time(self)
Definition: mcap.py:121
grepros.api.BaseBag.MODES
tuple MODES
Supported opening modes, overridden in subclasses.
Definition: api.py:110
grepros.plugins.mcap.McapBag._mode
_mode
Definition: mcap.py:63
grepros.plugins.mcap.McapSink.validate
def validate(self)
Definition: mcap.py:604
grepros.plugins.mcap.McapBag.__contains__
def __contains__(self, key)
Definition: mcap.py:347
grepros.outputs.Sink.valid
valid
Result of validate()
Definition: outputs.py:52
grepros.plugins.mcap.McapBag.filename
def filename(self)
Definition: mcap.py:326
grepros.plugins.mcap.McapBag._typefields
_typefields
Definition: mcap.py:70
grepros.api.BaseBag.BagMessage
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition: api.py:99
grepros.outputs.Sink._counts
_counts
Definition: outputs.py:48
grepros.api.BaseBag.closed
def closed(self)
Definition: api.py:334
grepros.api.BaseBag.next
next
Definition: api.py:135
grepros.plugins.mcap.McapBag.__init__
def __init__(self, f, mode="r", **__)
Definition: mcap.py:55
grepros.outputs.Sink.validate
def validate(self)
Definition: outputs.py:88
grepros.plugins.mcap.McapBag.__deepcopy__
__deepcopy__
Definition: mcap.py:431
grepros.plugins.mcap.message_get_fields_and_field_types
def message_get_fields_and_field_types(self)
Definition: mcap.py:550
grepros.plugins.mcap.McapBag._fields_and_field_types
_fields_and_field_types
Definition: mcap.py:428
grepros.plugins.mcap.McapSink.close
def close(self)
Definition: mcap.py:647
grepros.plugins.mcap.McapBag._reader
_reader
Definition: mcap.py:77
grepros.plugins.mcap.McapSink._close_printed
_close_printed
Definition: mcap.py:599
grepros.plugins.mcap.McapBag._topics
_topics
Definition: mcap.py:64
grepros.plugins.mcap.McapBag.mode
def mode(self)
Definition: mcap.py:342
grepros.plugins.mcap.McapBag.MCAP_MAGIC
string MCAP_MAGIC
MCAP file header magic start bytes.
Definition: mcap.py:53
grepros.api.BaseBag.TopicTuple
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
Definition: api.py:103
grepros.plugins.mcap.McapBag.__copy__
__copy__
Definition: mcap.py:430
grepros.plugins.mcap.McapBag._writer
_writer
Definition: mcap.py:79
grepros.plugins.mcap.copy_with_slots
def copy_with_slots(self)
Definition: mcap.py:534
grepros.plugins.mcap.McapBag._temporal_ctors
_temporal_ctors
Definition: mcap.py:73
grepros.outputs.RolloverSinkMixin.filename
filename
Current output file path.
Definition: outputs.py:371
grepros.plugins.mcap.McapBag.closed
def closed(self)
Definition: mcap.py:314
grepros.plugins.mcap.McapBag._field_subtypes
_field_subtypes
Definition: mcap.py:72
grepros.outputs.RolloverSinkMixin.format_output_meta
def format_output_meta(self)
Definition: outputs.py:458
grepros.outputs.Sink.close
def close(self)
Definition: outputs.py:93
grepros.plugins.mcap.McapSink._ensure_open
def _ensure_open(self)
Definition: mcap.py:665
grepros.plugins.mcap.McapBag.get_start_time
def get_start_time(self)
Definition: mcap.py:116
grepros.plugins.mcap.message_repr
def message_repr(self)
Definition: mcap.py:555
grepros.plugins.mcap.McapBag._populate_meta
def _populate_meta(self)
Definition: mcap.py:472
grepros.plugins.mcap.McapBag._type_subtypes
_type_subtypes
Definition: mcap.py:71
grepros.plugins.mcap.McapBag._end_time
_end_time
Definition: mcap.py:75
grepros.plugins.mcap.McapBag.SLOT_TYPES
SLOT_TYPES
Definition: mcap.py:427
grepros.outputs.RolloverSinkMixin.close_output
def close_output(self)
Definition: outputs.py:441
grepros.plugins.mcap.McapBag.get_message_count
def get_message_count(self, topic_filters=None)
Definition: mcap.py:103
grepros.plugins.mcap.McapBag._start_time
_start_time
Definition: mcap.py:74
grepros.outputs.RolloverSinkMixin
Definition: outputs.py:323
grepros.plugins.mcap.McapSink._file
_file
Definition: mcap.py:595
grepros.plugins.mcap.McapBag._typedefs
_typedefs
Definition: mcap.py:66
grepros.api.BaseBag.TypesAndTopicsTuple
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
Definition: api.py:107
grepros.plugins.mcap.McapBag._patch_message
def _patch_message(self, message, typename, typehash)
Definition: mcap.py:436
grepros.api.BaseBag.open
def open(self)
Definition: api.py:302
grepros.plugins.mcap.McapSink._schemas
_schemas
Definition: mcap.py:597
grepros.plugins.mcap.McapBag.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
Definition: mcap.py:216
grepros.plugins.mcap.McapSink.__init__
def __init__(self, args=None, **kwargs)
Definition: mcap.py:572
grepros.plugins.mcap.McapBag._patch_message_class
def _patch_message_class(self, cls, typename, typehash)
Definition: mcap.py:406
grepros.plugins.mcap.McapBag.autodetect
def autodetect(cls, f)
Definition: mcap.py:519
grepros.plugins.mcap.init
def init(*_, **__)
Definition: mcap.py:680
grepros.plugins.mcap.McapBag.get_message_definition
def get_message_definition(self, msg_or_type)
Definition: mcap.py:150
grepros.plugins.mcap.McapSink
Definition: mcap.py:562
grepros.api.BaseBag.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Definition: api.py:271
grepros.plugins.mcap.McapBag.get_topic_info
def get_topic_info(self, *_, **__)
Definition: mcap.py:171
grepros.plugins.mcap.McapBag._make_message_class
def _make_message_class(self, schema, message, generate=True)
Definition: mcap.py:381
grepros.outputs.Sink._ensure_stamp_index
def _ensure_stamp_index(self, topic, msg, stamp=None, index=None)
Definition: outputs.py:115
grepros.plugins.mcap.McapBag.__next__
def __next__(self)
Definition: mcap.py:352
grepros.plugins.mcap.McapBag._types
_types
Definition: mcap.py:65
grepros.plugins.mcap.McapBag._qoses
_qoses
Definition: mcap.py:69
grepros.plugins.mcap.McapBag.get_fields_and_field_types
get_fields_and_field_types
Definition: mcap.py:429
grepros.plugins.mcap.McapBag.get_message_type_hash
def get_message_type_hash(self, msg_or_type)
Definition: mcap.py:159
grepros.plugins.mcap.McapBag
Definition: mcap.py:41
grepros.plugins.mcap.McapBag._iterer
_iterer
Definition: mcap.py:80
grepros.plugins.mcap.McapBag._file
_file
Definition: mcap.py:76


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29