embag.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 ROS1 bag reader plugin using the `embag` library.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 19.11.2021
11 @modified 30.08.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 from __future__ import absolute_import
16 import os
17 
18 try: import embag
19 except ImportError: embag = None
20 
21 from .. import api
22 from .. common import PATH_TYPES, ConsolePrinter
23 
24 
25 
27  """embag reader interface, providing most of rosbag.Bag interface."""
28 
29 
30  MODES = ("r", )
31 
32 
33  STREAMABLE = False
34 
35 
36  ROSBAG_MAGIC = b"#ROSBAG"
37 
38  def __init__(self, filename, mode="r", **__):
39  if not isinstance(filename, PATH_TYPES):
40  raise ValueError("invalid filename %r" % type(filename))
41  if mode not in self.MODES: raise ValueError("invalid mode %r" % mode)
42 
43  self._topics = {} # {(topic, typename, typehash): message count}
44  self._types = {} # {(typename, typehash): message type class}
45  self._hashdefs = {} # {(topic, typehash): typename}
46  self._typedefs = {} # {(typename, typehash): type definition text}
47  self._iterer = None # Generator from read_messages() for next()
48  self._ttinfo = None # Cached result for get_type_and_topic_info()
49  self._view = embag.View(filename)
50  self._filename = str(filename)
51 
52  self._populate_meta()
53 
54 
55  def get_message_count(self, topic_filters=None):
56  """
57  Returns the number of messages in the bag.
58 
59  @param topic_filters list of topics or a single topic to filter by, if any
60  """
61  if topic_filters:
62  topics = topic_filters
63  topics = topics if isinstance(topics, (dict, list, set, tuple)) else [topics]
64  return sum(c for (t, _, _), c in self._topics.items() if t in topics)
65  return sum(self._topics.values())
66 
67 
68  def get_start_time(self):
69  """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
70  if self.closed or not self._topics: return None
71  return self._view.getStartTime().to_sec()
72 
73 
74  def get_end_time(self):
75  """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
76  if self.closed or not self._topics: return None
77  return self._view.getEndTime().to_sec()
78 
79 
80  def get_message_class(self, typename, typehash=None):
81  """
82  Returns rospy message class for typename, or None if unknown message type for bag.
83 
84  Generates class dynamically if not already generated.
85 
86  @param typehash message type definition hash, if any
87  """
88  typekey = (typename, typehash or next((h for n, h in self._types if n == typename), None))
89  if typekey not in self._types and typekey in self._typedefs:
90  for n, c in api.realapi.generate_message_classes(typename, self._typedefs[typekey]).items():
91  self._types[(n, c._md5sum)] = c
92  return self._types.get(typekey)
93 
94 
95  def get_message_definition(self, msg_or_type):
96  """
97  Returns ROS1 message type definition full text from bag, including subtype definitions.
98 
99  Returns None if unknown message type for bag.
100  """
101  if api.is_ros_message(msg_or_type):
102  return self._typedefs.get((msg_or_type._type, msg_or_type._md5sum))
103  typename = msg_or_type
104  return next((d for (n, h), d in self._typedefs.items() if n == typename), None)
105 
106 
107  def get_message_type_hash(self, msg_or_type):
108  """Returns ROS1 message type MD5 hash, or None if unknown message type for bag."""
109  if api.is_ros_message(msg_or_type): return msg_or_type._md5sum
110  typename = msg_or_type
111  return next((h for n, h in self._typedefs if n == typename), None)
112 
113 
114  def get_topic_info(self, *_, **__):
115  """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
116  return dict(self._topics)
117 
118 
119  def get_type_and_topic_info(self, topic_filters=None):
120  """
121  Returns thorough metainfo on topic and message types.
122 
123  @param topic_filters list of topics or a single topic to filter returned topics-dict by,
124  if any
125  @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
126  msg_types as dict of {typename: typehash},
127  topics as a dict of {topic: TopicTuple() namedtuple}.
128  """
129  topics = topic_filters
130  topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
131  if self._ttinfo and (not topics or set(topics) == set(t for t, _, _ in self._topics)):
132  return self._ttinfo
133  if self.closed: raise ValueError("I/O operation on closed file.")
134 
135  msgtypes = {n: h for t, n, h in self._topics}
136  topicdict = {}
137 
138  def median(vals):
139  """Returns median value from given sorted numbers."""
140  vlen = len(vals)
141  return None if not vlen else vals[vlen // 2] if vlen % 2 else \
142  float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
143 
144  conns = self._view.connectionsByTopic() # {topic: [embag.Connection, ]}
145  for (t, n, _), c in sorted(self._topics.items()):
146  if topics and t not in topics: continue # for
147  mymedian = None
148  if c > 1:
149  stamps = sorted(m.timestamp.secs + m.timestamp.nsecs / 1E9
150  for m in self._view.getMessages([t]))
151  mymedian = median(sorted(s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])))
152  freq = 1.0 / mymedian if mymedian else None
153  topicdict[t] = self.TopicTuple(n, c, len(conns.get(t, [])), freq)
154  if not topics or set(topics) == set(t for t, _, _ in self._topics):
155  self._ttinfo = self.TypesAndTopicsTuple(msgtypes, topicdict)
156  return self._ttinfo
157 
158 
159  def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
160  """
161  Yields messages from the bag, optionally filtered by topic and timestamp.
162 
163  @param topics list of topics or a single topic to filter by, if at all
164  @param start_time earliest timestamp of message to return, as ROS time or convertible
165  (int/float/duration/datetime/decimal)
166  @param end_time latest timestamp of message to return, as ROS time or convertible
167  (int/float/duration/datetime/decimal)
168  @param raw if true, then returned messages are tuples of
169  (typename, bytes, typehash, typeclass)
170  @return BagMessage namedtuples of (topic, message, timestamp as rospy.Time)
171  """
172  if self.closed: raise ValueError("I/O operation on closed file.")
173 
174  topics = topics if isinstance(topics, list) else [topics] if topics else []
175  start_time, end_time = (api.to_sec(api.to_time(x)) for x in (start_time, end_time))
176  for m in self._view.getMessages(topics) if topics else self._view.getMessages():
177  if start_time is not None and start_time > m.timestamp.to_sec():
178  continue # for m
179  if end_time is not None and end_time < m.timestamp.to_sec():
180  continue # for m
181 
182  typename = self._hashdefs[(m.topic, m.md5)]
183  stamp = api.make_time(m.timestamp.secs, m.timestamp.nsecs)
184  if raw: msg = (typename, m.data(), m.md5, self.get_message_class(typename, m.md5))
185  else: msg = self._populate_message(self.get_message_class(typename, m.md5)(), m.data())
186  api.TypeMeta.make(msg, m.topic, self)
187  yield self.BagMessage(m.topic, msg, stamp)
188  if self.closed: break # for m
189 
190 
191  def open(self):
192  """Opens the bag file if not already open."""
193  if not self._view: self._view = embag.View(self._filename)
194 
195 
196  def close(self):
197  """Closes the bag file."""
198  if self._view:
199  del self._view
200  self._view = None
201  self._iterer = None
202 
203 
204  @property
205  def closed(self):
206  """Returns whether file is closed."""
207  return not self._view
208 
209 
210  @property
211  def topics(self):
212  """Returns the list of topics in bag, in alphabetic order."""
213  return sorted((t for t, _, _ in self._topics), key=str.lower)
214 
215 
216  @property
217  def filename(self):
218  """Returns bag file path."""
219  return self._filename
220 
221 
222  @property
223  def size(self):
224  """Returns current file size."""
225  return os.path.getsize(self._filename) if os.path.isfile(self._filename) else None
226 
227 
228  @property
229  def mode(self):
230  """Returns file open mode."""
231  return "r"
232 
233 
234  def __contains__(self, key):
235  """Returns whether bag contains given topic."""
236  return any(key == t for t, _, _ in self._topics)
237 
238 
239  def __next__(self):
240  """Retrieves next message from bag as (topic, message, timestamp)."""
241  if self.closed: raise ValueError("I/O operation on closed file.")
242  if self._iterer is None: self._iterer = self.read_messages()
243  return next(self._iterer)
244 
245 
246  def _populate_meta(self):
247  """Populates bag metainfo."""
248  connections = self._view.connectionsByTopic()
249  for topic in self._view.topics():
250  for conn in connections.get(topic, ()):
251  topickey, typekey = (topic, conn.type, conn.md5sum), (conn.type, conn.md5sum)
252  self._topics.setdefault(topickey, 0)
253  self._topics[topickey] += conn.message_count
254  self._hashdefs[(topic, conn.md5sum)] = conn.type
255  self._typedefs[typekey] = conn.message_definition
256  subtypedefs = api.parse_definition_subtypes(conn.message_definition)
257  for n, d in subtypedefs.items():
258  h = api.calculate_definition_hash(n, d, tuple(subtypedefs.items()))
259  self._typedefs.setdefault((n, h), d)
260 
261 
262  def _populate_message(self, msg, embagval):
263  """Returns the ROS1 message populated from a corresponding embag.RosValue."""
264  for name, typename in api.get_message_fields(msg).items():
265  v, scalarname = embagval.get(name), api.scalar(typename)
266  if typename in api.ROS_BUILTIN_TYPES: # Single built-in type
267  msgv = getattr(embagval, name)
268  elif scalarname in api.ROS_BUILTIN_TYPES: # List of built-in types
269  msgv = list(v)
270  elif typename in api.ROS_TIME_TYPES: # Single temporal type
271  cls = next(k for k, v in api.ROS_TIME_CLASSES.items() if v == typename)
272  msgv = cls(v.secs, v.nsecs)
273  elif scalarname in api.ROS_TIME_TYPES: # List of temporal types
274  cls = next(k for k, v in api.ROS_TIME_CLASSES.items() if v == scalarname)
275  msgv = [cls(x.secs, x.nsecs) for x in v]
276  elif typename == scalarname: # Single subtype
277  msgv = self._populate_message(self.get_message_class(typename)(), v)
278  else: # List of subtypes
279  cls = self.get_message_class(scalarname)
280  msgv = [self._populate_message(cls(), x) for x in v]
281  setattr(msg, name, msgv)
282  return msg
283 
284 
285  @classmethod
286  def autodetect(cls, filename):
287  """Returns whether file is readable as ROS1 bag."""
288  result = os.path.isfile(filename)
289  if result:
290  with open(filename, "rb") as f:
291  result = (f.read(len(cls.ROSBAG_MAGIC)) == cls.ROSBAG_MAGIC)
292  return result
293 
294 
295 def init(*_, **__):
296  """Replaces ROS1 bag reader with EmbagReader. Raises ImportWarning if embag not available."""
297  if not embag:
298  ConsolePrinter.error("embag not available: cannot read bag files.")
299  raise ImportWarning()
300  api.Bag.READER_CLASSES.add(EmbagReader)
301 
302 
303 __all__ = ["EmbagReader", "init"]
grepros.plugins.embag.EmbagReader._ttinfo
_ttinfo
Definition: embag.py:48
grepros.plugins.embag.EmbagReader.get_end_time
def get_end_time(self)
Definition: embag.py:74
grepros.plugins.embag.EmbagReader._topics
_topics
Definition: embag.py:43
grepros.plugins.embag.EmbagReader.get_message_type_hash
def get_message_type_hash(self, msg_or_type)
Definition: embag.py:107
grepros.plugins.embag.EmbagReader.open
def open(self)
Definition: embag.py:191
grepros.plugins.embag.EmbagReader.filename
def filename(self)
Definition: embag.py:217
grepros.plugins.embag.EmbagReader.get_topic_info
def get_topic_info(self, *_, **__)
Definition: embag.py:114
grepros.plugins.embag.EmbagReader.get_type_and_topic_info
def get_type_and_topic_info(self, topic_filters=None)
Definition: embag.py:119
grepros.api.BaseBag
Definition: api.py:85
grepros.plugins.embag.EmbagReader._iterer
_iterer
Definition: embag.py:47
grepros.plugins.embag.EmbagReader.__contains__
def __contains__(self, key)
Definition: embag.py:234
grepros.plugins.embag.EmbagReader._hashdefs
_hashdefs
Definition: embag.py:45
grepros.api.BaseBag.MODES
tuple MODES
Supported opening modes, overridden in subclasses.
Definition: api.py:110
grepros.plugins.embag.EmbagReader.__init__
def __init__(self, filename, mode="r", **__)
Definition: embag.py:38
grepros.plugins.embag.EmbagReader._view
_view
Definition: embag.py:49
grepros.plugins.embag.EmbagReader.get_start_time
def get_start_time(self)
Definition: embag.py:68
grepros.plugins.embag.EmbagReader.autodetect
def autodetect(cls, filename)
Definition: embag.py:286
grepros.api.BaseBag.BagMessage
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition: api.py:99
grepros.api.BaseBag.closed
def closed(self)
Definition: api.py:334
grepros.api.BaseBag.next
next
Definition: api.py:135
grepros.api.to_sec
def to_sec(val)
Definition: api.py:1166
grepros.plugins.embag.EmbagReader.__next__
def __next__(self)
Definition: embag.py:239
grepros.plugins.embag.EmbagReader._populate_message
def _populate_message(self, msg, embagval)
Definition: embag.py:262
grepros.plugins.embag.EmbagReader.mode
def mode(self)
Definition: embag.py:229
grepros.plugins.embag.EmbagReader.get_message_definition
def get_message_definition(self, msg_or_type)
Definition: embag.py:95
grepros.plugins.embag.EmbagReader._types
_types
Definition: embag.py:44
grepros.api.BaseBag.TopicTuple
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
Definition: api.py:103
grepros.api.BaseBag.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: api.py:255
grepros.plugins.embag.EmbagReader.get_message_count
def get_message_count(self, topic_filters=None)
Definition: embag.py:55
grepros.plugins.embag.EmbagReader.size
def size(self)
Definition: embag.py:223
grepros.plugins.embag.init
def init(*_, **__)
Definition: embag.py:295
grepros.plugins.embag.EmbagReader._filename
_filename
Definition: embag.py:50
grepros.plugins.embag.EmbagReader._typedefs
_typedefs
Definition: embag.py:46
grepros.plugins.embag.EmbagReader._populate_meta
def _populate_meta(self)
Definition: embag.py:246
grepros.api.BaseBag.TypesAndTopicsTuple
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
Definition: api.py:107
grepros.plugins.embag.EmbagReader.closed
def closed(self)
Definition: embag.py:205
grepros.api.BaseBag.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Definition: api.py:271
grepros.plugins.embag.EmbagReader.ROSBAG_MAGIC
string ROSBAG_MAGIC
ROS1 bag file header magic start bytes.
Definition: embag.py:36
grepros.plugins.embag.EmbagReader.close
def close(self)
Definition: embag.py:196
grepros.plugins.embag.EmbagReader.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
Definition: embag.py:159
grepros.plugins.embag.EmbagReader.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: embag.py:80
grepros.plugins.embag.EmbagReader.topics
def topics(self)
Definition: embag.py:211
grepros.plugins.embag.EmbagReader
Definition: embag.py:26


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29