ros1.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 ROS1 interface.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 01.11.2021
11 @modified 27.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 import collections
16 import datetime
17 import decimal
18 import inspect
19 import logging
20 import io
21 import os
22 import shutil
23 import threading
24 import time
25 
26 import genpy
27 import genpy.dynamic
28 import rosbag
29 import roslib
30 import rospy
31 import six
32 
33 from . import api
34 from . import common
35 from . api import TypeMeta, calculate_definition_hash, parse_definition_subtypes
36 from . common import ConsolePrinter, memoize
37 
38 
39 
40 BAG_EXTENSIONS = (".bag", ".bag.active")
41 
42 
43 SKIP_EXTENSIONS = (".bag.orig.active", )
44 
45 
46 ROS_TIME_TYPES = ["time", "duration"]
47 
48 
49 ROS_TIME_CLASSES = {rospy.Time: "time", rospy.Duration: "duration"}
50 
51 
52 ROS_ALIAS_TYPES = {"byte": "int8", "char": "uint8"}
53 
54 
55 TYPECLASSES = {}
56 
57 
58 SLEEP_INTERVAL = 0.5
59 
60 
61 master = None
62 
63 
64 genpy_mtx = threading.RLock()
65 
66 
68  """
69  ROS1 bag reader and writer.
70 
71  Extends `rosbag.Bag` with more conveniences, smooths over the rosbag bug of ignoring
72  topic and time filters in format v1.2, and smooths over the rosbag bug
73  of yielding messages of wrong type, if message types in different topics
74  have different packages but identical fields and hashes.
75 
76  Does **not** smooth over the rosbag bug of writing different types to one topic.
77 
78  rosbag does allow writing messages of different types to one topic,
79  just like live ROS topics can have multiple message types published
80  to one topic. And their serialized bytes will actually be in the bag,
81  but rosbag will only register the first type for this topic (unless it is
82  explicitly given another connection header with metadata on the other type).
83 
84  All messages yielded will be deserialized by rosbag as that first type,
85  and whether reading will raise an exception or not depends on whether
86  the other type has enough bytes to be deserialized as that first type.
87  """
88 
89  # {(typename, typehash): message type class}
90  __TYPES = {}
91 
92 
93  __TYPEDEFS = {}
94 
95  # {(typename, typehash): whether subtype definitions parsed}
96  __PARSEDS = {}
97 
98 
99  def __init__(self, *args, **kwargs):
100  """
101  @param f bag file path, or a stream object
102  @param mode mode to open bag in, defaults to "r" (read mode)
103  @param reindex if true and bag is unindexed, make a copy
104  of the file (unless unindexed format) and reindex original
105  @param progress show progress bar with reindexing status
106  @param kwargs additional keyword arguments for `rosbag.Bag`, like `compression`
107  """
108  self.__topics = {} # {(topic, typename, typehash): message count}
109  self.__iterer = None # Generator from read_messages() for next()
110  f, args = (args[0] if args else kwargs.pop("f")), args[1:]
111  mode, args = (args[0] if args else kwargs.pop("mode", "r")), args[1:]
112  if mode not in self.MODES: raise ValueError("invalid mode %r" % mode)
113 
114  kwargs.setdefault("skip_index", True)
115  reindex, progress = (kwargs.pop(k, False) for k in ("reindex", "progress"))
116  getargspec = getattr(inspect, "getfullargspec", getattr(inspect, "getargspec", None))
117  for n in set(kwargs) - set(getargspec(rosbag.Bag.__init__).args): kwargs.pop(n)
118 
119  if common.is_stream(f):
120  if not common.verify_io(f, mode):
121  raise io.UnsupportedOperation({"r": "read", "w": "write", "a": "append"}[mode])
122  super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
123  self._populate_meta()
124  return
125 
126  f = str(f)
127  if "a" == mode and (not os.path.exists(f) or not os.path.getsize(f)):
128  mode = "w" # rosbag raises error on append if no file or empty file
129  os.path.exists(f) and os.remove(f)
130 
131  try:
132  super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
133  except rosbag.ROSBagUnindexedException:
134  if not reindex: raise
135  Bag.reindex_file(f, progress, *args, **kwargs)
136  super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
137  self._populate_meta()
138 
139 
140  def get_message_definition(self, msg_or_type):
141  """Returns ROS1 message type definition full text from bag, including subtype definitions."""
142  if is_ros_message(msg_or_type):
143  return self.__TYPEDEFS.get((msg_or_type._type, msg_or_type._md5sum))
144  typename = msg_or_type
145  return next((d for (n, h), d in self.__TYPEDEFS.items() if n == typename), None)
146 
147 
148  def get_message_class(self, typename, typehash=None):
149  """
150  Returns rospy message class for typename, or None if unknown type.
151 
152  Generates class dynamically if not already generated.
153 
154  @param typehash message type definition hash, if any
155  """
156  if (typename, typehash) not in self.__TYPES: self._ensure_typedef(typename, typehash)
157  typehash = typehash or next((h for n, h in self.__TYPEDEFS if n == typename), None)
158  typekey = (typename, typehash)
159  if typekey not in self.__TYPES and typekey in self.__TYPEDEFS:
160  for n, c in generate_message_classes(typename, self.__TYPEDEFS[typekey]).items():
161  self.__TYPES[(n, c._md5sum)] = c
162  return self.__TYPES.get(typekey)
163 
164 
165  def get_message_type_hash(self, msg_or_type):
166  """Returns ROS1 message type MD5 hash, or None if unknown type."""
167  if is_ros_message(msg_or_type): return msg_or_type._md5sum
168  typename = msg_or_type
169  typehash = next((h for n, h in self.__TYPEDEFS if n == typename), None)
170  if not typehash:
171  self._ensure_typedef(typename)
172  typehash = next((h for n, h in self.__TYPEDEFS if n == typename), None)
173  return typehash
174 
175 
176  def get_start_time(self):
177  """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
178  try: return super(ROS1Bag, self).get_start_time()
179  except Exception: return None
180 
181 
182  def get_end_time(self):
183  """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
184  try: return super(ROS1Bag, self).get_end_time()
185  except Exception: return None
186 
187 
188  def get_topic_info(self, *_, **__):
189  """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
190  return dict(self.__topics)
191 
192 
193  def read_messages(self, topics=None, start_time=None, end_time=None,
194  raw=False, connection_filter=None, **__):
195  """
196  Yields messages from the bag, optionally filtered by topic, timestamp and connection details.
197 
198  @param topics list of topics or a single topic.
199  If an empty list is given, all topics will be read.
200  @param start_time earliest timestamp of messages to return,
201  as ROS time or convertible (int/float/duration/datetime/decimal)
202  @param end_time latest timestamp of messages to return,
203  as ROS time or convertible (int/float/duration/datetime/decimal)
204  @param connection_filter function to filter connections to include
205  @param raw if true, then returned messages are tuples of
206  (typename, bytes, typehash, typeclass)
207  or (typename, bytes, typehash, position, typeclass),
208  depending on file format
209  @return BagMessage namedtuples of
210  (topic, message, timestamp as rospy.Time)
211  """
212  if self.closed: raise ValueError("I/O operation on closed file.")
213  if "w" == self._mode: raise io.UnsupportedOperation("read")
214 
215  hashtypes = {}
216  for n, h in self.__TYPEDEFS: hashtypes.setdefault(h, []).append(n)
217  read_topics = topics if isinstance(topics, (dict, list, set, tuple)) else \
218  [topics] if topics else None
219  dupes = {t: (n, h) for t, n, h in self.__topics
220  if (read_topics is None or t in read_topics) and len(hashtypes.get(h, [])) > 1}
221 
222  # Workaround for rosbag.Bag ignoring topic and time filters in format v1.2
223  if self.version != 102 or (not topics and start_time is None and end_time is None):
224  in_range = lambda *_: True
225  else: in_range = lambda t, s: ((not read_topics or t in read_topics) and
226  (start_time is None or s >= start_time) and
227  (end_time is None or s <= end_time))
228 
229  start_time, end_time = map(to_time, (start_time, end_time))
230  kwargs = dict(topics=topics, start_time=start_time, end_time=end_time,
231  connection_filter=connection_filter, raw=raw)
232  if not dupes:
233  for topic, msg, stamp in super(ROS1Bag, self).read_messages(**kwargs):
234  if in_range(topic, stamp):
235  TypeMeta.make(msg, topic, self)
236  yield self.BagMessage(topic, msg, stamp)
237  if self.closed: break # for topic
238  return
239 
240  for topic, msg, stamp in super(ROS1Bag, self).read_messages(**kwargs):
241  if not in_range(topic, stamp): continue # for topic
242 
243  # Workaround for rosbag bug of using wrong type for identical type hashes
244  if topic in dupes:
245  typename, typehash = (msg[0], msg[2]) if raw else (msg._type, msg._md5sum)
246  if dupes[topic] != (typename, typehash):
247  if raw:
248  msg = msg[:-1] + (self.get_message_class(typename, typehash), )
249  else:
250  msg = self._convert_message(msg, *dupes[topic])
251  TypeMeta.make(msg, topic, self)
252  yield self.BagMessage(topic, msg, stamp)
253  if self.closed: break # for topic
254 
255 
256  def write(self, topic, msg, t=None, raw=False, connection_header=None, **__):
257  """
258  Writes a message to the bag.
259 
260  Populates connection header if topic already in bag but with a different message type.
261 
262  @param topic name of topic
263  @param msg ROS1 message
264  @param t message timestamp if not using current wall time,
265  as ROS time or convertible (int/float/duration/datetime/decimal)
266  @param raw if true, `msg` is in raw format,
267  (typename, bytes, typehash, typeclass)
268  @param connection_header custom connection record for topic,
269  as {"topic", "type", "md5sum", "message_definition"}
270  """
271  if self.closed: raise ValueError("I/O operation on closed file.")
272  if "r" == self._mode: raise io.UnsupportedOperation("write")
273  self._register_write(topic, msg, raw, connection_header)
274  return super(ROS1Bag, self).write(topic, msg, to_time(t), raw, connection_header)
275 
276 
277  def open(self):
278  """Opens the bag file if not already open."""
279  if not self._file:
280  self._open(self.filename, self.mode, allow_unindexed=True)
281 
282 
283  def close(self):
284  """Closes the bag file."""
285  if self._file:
286  super(ROS1Bag, self).close()
287  self._iterer = None
288  self._clear_index()
289 
290 
291  def __contains__(self, key):
292  """Returns whether bag contains given topic."""
293  return any(key == t for t, _, _ in self.__topics)
294 
295 
296  def __next__(self):
297  """Retrieves next message from bag as (topic, message, timestamp)."""
298  if self.closed: raise ValueError("I/O operation on closed file.")
299  if self.__iterer is None: self.__iterer = self.read_messages()
300  return next(self.__iterer)
301 
302 
303  @property
304  def topics(self):
305  """Returns the list of topics in bag, in alphabetic order."""
306  return sorted((t for t, _, _ in self.__topics), key=str.lower)
307 
308 
309  @property
310  def closed(self):
311  """Returns whether file is closed."""
312  return not self._file
313 
314 
315  def _convert_message(self, msg, typename2, typehash2=None):
316  """Returns message converted to given type; fields must match."""
317  msg2 = self.get_message_class(typename2, typehash2)()
318  fields2 = get_message_fields(msg2)
319  for fname, ftypename in get_message_fields(msg).items():
320  v1 = v2 = getattr(msg, fname)
321  if ftypename != fields2.get(fname, ftypename):
322  v2 = self._convert_message(v1, fields2[fname])
323  setattr(msg2, fname, v2)
324  return msg2
325 
326 
327  def _populate_meta(self):
328  """Populates topics and message type definitions and hashes."""
329  result = collections.Counter() # {(topic, typename, typehash): count}
330  counts = collections.Counter() # {connection ID: count}
331  for c in self._chunks:
332  for c_id, count in c.connection_counts.items():
333  counts[c_id] += count
334  for c in self._connections.values():
335  result[(c.topic, c.datatype, c.md5sum)] += counts[c.id]
336  self.__TYPEDEFS[(c.datatype, c.md5sum)] = c.msg_def
337  self.__topics = dict(result)
338 
339 
340  def _register_write(self, topic, msg, raw=False, connection_header=None):
341  """Registers message in local metadata, writes connection header if new type for topic."""
342  if raw: typename, typehash, typeclass = msg[0], msg[2], msg[3]
343  else: typename, typehash, typeclass = msg._type, msg._md5sum, type(msg)
344  topickey, typekey = (topic, typename, typehash), (typename, typehash)
345 
346  if topickey not in self.__topics \
347  and any(topic == t and typekey != (n, h) for t, n, h in self.__topics):
348  # Write connection header if topic already present but with different type
349  if not connection_header:
350  connection_header = {"topic": topic, "type": typename, "md5sum": typehash,
351  "message_definition": typeclass._full_text}
352  conn_id = len(self._connections)
353  connection_info = rosbag.bag._ConnectionInfo(conn_id, topic, connection_header)
354  self._write_connection_record(connection_info, encrypt=False)
355  self._connections[conn_id] = self._topic_connections[topic] = connection_info
356 
357  self.__TYPES.setdefault(typekey, typeclass)
358  self.__TYPEDEFS.setdefault(typekey, typeclass._full_text)
359  self.__topics[topickey] = self.__topics.get(topickey, 0) + 1
360 
361 
362  def _ensure_typedef(self, typename, typehash=None):
363  """Parses subtype definition from any full definition where available, if not loaded."""
364  typehash = typehash or next((h for n, h in self.__TYPEDEFS if n == typename), None)
365  typekey = (typename, typehash)
366  if typekey not in self.__TYPEDEFS:
367  for (roottype, roothash), rootdef in list(self.__TYPEDEFS.items()):
368  rootkey = (roottype, roothash)
369  if self.__PARSEDS.get(rootkey): continue # for (roottype, roothash)
370 
371  subdefs = tuple(parse_definition_subtypes(rootdef).items()) # ((typename, typedef), )
372  subhashes = {n: calculate_definition_hash(n, d, subdefs) for n, d in subdefs}
373  self.__TYPEDEFS.update(((n, subhashes[n]), d) for n, d in subdefs)
374  self.__PARSEDS.update(((n, h), True) for n, h in subhashes.items())
375  self.__PARSEDS[rootkey] = True
376  if typekey in self.__TYPEDEFS:
377  break # for (roottype, roothash)
378  self.__TYPEDEFS.setdefault(typekey, "")
379 
380 
381  @staticmethod
382  def reindex_file(f, progress, *args, **kwargs):
383  """
384  Reindexes bag, making a backup copy in file directory.
385 
386  @param progress show progress bar for reindexing status
387  """
388  KWS = ["mode", "compression", "chunk_threshold",
389  "allow_unindexed", "options", "skip_index"]
390  kwargs.update(zip(KWS, args), allow_unindexed=True)
391  copied, bar, f2 = False, None, None
392  if progress:
393  fmt = lambda s: common.format_bytes(s, strip=False)
394  name, size = os.path.basename(f), os.path.getsize(f)
395  aftertemplate = " Reindexing %s (%s): {afterword}" % (name, fmt(size))
396  bar = common.ProgressBar(size, interval=0.1, pulse=True, aftertemplate=aftertemplate)
397 
398  ConsolePrinter.warn("Unindexed bag %s, reindexing.", f)
399  bar and bar.update(0).start() # Start progress pulse
400  try:
401  with rosbag.Bag(f, **kwargs) as inbag:
402  inplace = (inbag.version > 102)
403 
404  f2 = "%s.orig%s" % os.path.splitext(f)
405  shutil.copy(f, f2)
406  copied = True
407 
408  inbag, outbag = None, None
409  outkwargs = dict(kwargs, mode="a" if inplace else "w", allow_unindexed=True)
410  try:
411  inbag = rosbag.Bag(f2, **dict(kwargs, mode="r"))
412  outbag = rosbag.Bag(f, **outkwargs)
413  Bag._run_reindex(inbag, outbag, bar)
414  bar and bar.update(bar.max)
415  except BaseException: # Ensure steady state on KeyboardInterrupt/SystemExit
416  inbag and inbag.close()
417  outbag and outbag.close()
418  copied and shutil.move(f2, f)
419  raise
420  finally: bar and bar.update(bar.value, flush=True).stop()
421  inbag.close()
422  outbag.close()
423 
424 
425  @staticmethod
426  def _run_reindex(inbag, outbag, bar=None):
427  """Runs reindexing, showing optional progress bar."""
428  update_bar = noop = lambda s: None
429  indexbag, writebag = (inbag, outbag) if inbag.version == 102 else (outbag, None)
430  if bar:
431  fmt = lambda s: common.format_bytes(s, strip=False)
432  update_bar = lambda s: (setattr(bar, "afterword", fmt(s)),
433  setattr(bar, "pulse", False), bar.update(s).stop())
434  # v102: build index from inbag, write all messages to outbag.
435  # Other versions: re-build index in outbag file in-place.
436  progress = update_bar if not writebag else noop # Incremental progress during re-build
437  for offset in indexbag.reindex():
438  progress(offset)
439  if not writebag:
440  return
441 
442  progress = update_bar if bar else noop # Incremental progress during re-write
443  for (topic, msg, t, header) in indexbag.read_messages(return_connection_header=True):
444  writebag.write(topic, msg, t, connection_header=header)
445  progress(indexbag._file.tell())
446 Bag = ROS1Bag
447 
448 
449 
450 def init_node(name):
451  """
452  Initializes a ROS1 node if not already initialized.
453 
454  Blocks until ROS master available.
455  """
456  if not validate(live=True):
457  return
458 
459  global master
460  if not master:
461  master = rospy.client.get_master()
462  available = None
463  while not available:
464  try: uri = master.getUri()[-1]
465  except Exception:
466  if available is None:
467  ConsolePrinter.log(logging.ERROR,
468  "Unable to register with master. Will keep trying.")
469  available = False
470  time.sleep(SLEEP_INTERVAL)
471  else:
472  ConsolePrinter.debug("Connected to ROS master at %s.", uri)
473  available = True
474  try: rospy.get_rostime()
475  except Exception: # Init node only if not already inited
476  rospy.init_node(name, anonymous=True, disable_signals=True)
477 
478 
480  """Shuts down live ROS1 node."""
481  global master
482  if master:
483  master = None
484  rospy.signal_shutdown("Close grepros")
485 
486 
487 def validate(live=False):
488  """
489  Returns whether ROS1 environment is set, prints error if not.
490 
491  @param live whether environment must support launching a ROS node
492  """
493  VARS = ["ROS_MASTER_URI", "ROS_ROOT", "ROS_VERSION"] if live else ["ROS_VERSION"]
494  missing = [k for k in VARS if not os.getenv(k)]
495  if missing:
496  ConsolePrinter.error("ROS environment not sourced: missing %s.",
497  ", ".join(sorted(missing)))
498  if "1" != os.getenv("ROS_VERSION", "1"):
499  ConsolePrinter.error("ROS environment not supported: need ROS_VERSION=1.")
500  missing = True
501  return not missing
502 
503 
504 @memoize
505 def canonical(typename, unbounded=False):
506  """
507  Returns "pkg/Type" for "pkg/subdir/Type".
508 
509  @param unbounded drop array bounds, e.g. returning "uint8[]" for "uint8[10]"
510  """
511  if typename and typename.count("/") > 1:
512  typename = "%s/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
513  if unbounded and typename and "[" in typename:
514  typename = typename[:typename.index("[")] + "[]"
515  return typename
516 
517 
518 def create_publisher(topic, cls_or_typename, queue_size):
519  """Returns a rospy.Publisher."""
520  def pub_unregister():
521  # ROS1 prints errors when closing a publisher with subscribers
522  if not pub.get_num_connections(): super(rospy.Publisher, pub).unregister()
523 
524  cls = cls_or_typename
525  if isinstance(cls, common.TEXT_TYPES): cls = get_message_class(cls)
526  pub = rospy.Publisher(topic, cls, queue_size=queue_size)
527  pub.unregister = pub_unregister
528  return pub
529 
530 
531 def create_subscriber(topic, typename, handler, queue_size):
532  """
533  Returns a rospy.Subscriber.
534 
535  Local message packages are not required. Subscribes as AnyMsg,
536  creates message class dynamically from connection info,
537  and deserializes message before providing to handler.
538 
539  Supplemented with .get_message_class(), .get_message_definition(),
540  .get_message_type_hash(), and .get_qoses().
541 
542  The supplementary .get_message_xyz() methods should only be invoked after at least one message
543  has been received from the topic, as they get populated from live connection metadata.
544  """
545  def myhandler(msg):
546  if msg._connection_header["type"] != typename:
547  return
548  typekey = (typename, msg._connection_header["md5sum"])
549  if typekey not in TYPECLASSES:
550  typedef = msg._connection_header["message_definition"]
551  for name, cls in generate_message_classes(typename, typedef).items():
552  TYPECLASSES.setdefault((name, cls._md5sum), cls)
553  handler(TYPECLASSES[typekey]().deserialize(msg._buff))
554 
555  sub = rospy.Subscriber(topic, rospy.AnyMsg, myhandler, queue_size=queue_size)
556  sub.get_message_class = lambda: next(c for (n, h), c in TYPECLASSES.items()
557  if n == typename)
558  sub.get_message_definition = lambda: next(get_message_definition(c)
559  for (n, h), c in TYPECLASSES.items() if n == typename)
560  sub.get_message_type_hash = lambda: next(h for n, h in TYPECLASSES if n == typename)
561  sub.get_qoses = lambda: None
562  return sub
563 
564 
565 def format_message_value(msg, name, value):
566  """
567  Returns a message attribute value as string.
568 
569  Result is at least 10 chars wide if message is a ROS time/duration
570  (aligning seconds and nanoseconds).
571  """
572  LENS = {"secs": 10, "nsecs": 9}
573  v = "%s" % (value, )
574  if not isinstance(msg, genpy.TVal) or name not in LENS:
575  return v
576 
577  EXTRA = sum(v.count(x) * len(x) for x in (common.MatchMarkers.START, common.MatchMarkers.END))
578  return ("%%%ds" % (LENS[name] + EXTRA)) % v # Default %10s/%9s for secs/nsecs
579 
580 
581 @memoize
582 def generate_message_classes(typename, typedef):
583  """
584  Generates ROS message classes dynamically from given name and definition.
585 
586  Modifies `sys.path` to include the generated Python module.
587 
588  @param typename message type name like "std_msgs/String"
589  @param typedef message type definition full text, including all subtype definitions
590  @return dictionary of {typename: typeclass} for message type and all subtypes
591  """
592  with genpy_mtx: return genpy.dynamic.generate_dynamic(typename, typedef)
593 
594 
595 @memoize
596 def get_message_class(typename):
597  """Returns ROS1 message class."""
598  if typename in ROS_TIME_TYPES:
599  return next(k for k, v in ROS_TIME_CLASSES.items() if v == typename)
600  if typename in ("genpy/Time", "rospy/Time"):
601  return rospy.Time
602  if typename in ("genpy/Duration", "rospy/Duration"):
603  return rospy.Duration
604  try: return roslib.message.get_message_class(typename)
605  except Exception: return None
606 
607 
608 def get_message_definition(msg_or_type):
609  """
610  Returns ROS1 message type definition full text, including subtype definitions.
611 
612  Returns None if unknown type.
613  """
614  msg_or_cls = msg_or_type if is_ros_message(msg_or_type) else get_message_class(msg_or_type)
615  return getattr(msg_or_cls, "_full_text", None)
616 
617 
618 def get_message_type_hash(msg_or_type):
619  """Returns ROS message type MD5 hash, or "" if unknown type."""
620  msg_or_cls = msg_or_type if is_ros_message(msg_or_type) else get_message_class(msg_or_type)
621  return getattr(msg_or_cls, "_md5sum", "")
622 
623 
625  """
626  Returns OrderedDict({field name: field type name}) if ROS1 message, else {}.
627 
628  @param val ROS1 message class or instance
629  """
630  names, types = (getattr(val, k, []) for k in ("__slots__", "_slot_types"))
631  # Bug in genpy: class slot types defined as "int32", but everywhere else types use "uint32"
632  if isinstance(val, genpy.TVal): names, types = genpy.TVal.__slots__, ["uint32", "uint32"]
633  return collections.OrderedDict(zip(names, types))
634 
635 
636 def get_message_type(msg_or_cls):
637  """Returns ROS1 message type name, like "std_msgs/Header"."""
638  if is_ros_time(msg_or_cls):
639  cls = msg_or_cls if inspect.isclass(msg_or_cls) else type(msg_or_cls)
640  return "duration" if "duration" in cls.__name__.lower() else "time"
641  return msg_or_cls._type
642 
643 
644 def get_message_value(msg, name, typename):
645  """Returns object attribute value, with numeric arrays converted to lists."""
646  v = getattr(msg, name)
647  listifiable = typename.startswith(("uint8[", "char[")) and isinstance(v, bytes)
648  if six.PY2 and listifiable: # Ignore already highlighted values from Scanner
649  listifiable = v[:1] != "[" or v[-1:] != "]" or common.MatchMarkers.START not in v
650  return list(bytearray(v)) if listifiable else v
651  return list(v) if listifiable or isinstance(v, tuple) else v
652 
653 
654 def get_rostime(fallback=False):
655  """
656  Returns current ROS1 time, as rospy.Time.
657 
658  @param fallback use wall time if node not initialized
659  """
660  try: return rospy.get_rostime()
661  except Exception:
662  if fallback: return make_time(time.time())
663  raise
664 
665 
667  """
668  Returns currently available ROS1 topics, as [(topicname, typename)].
669 
670  Omits topics that the current ROS1 node itself has published.
671  """
672  result, myname = [], rospy.get_name()
673  pubs, _, _ = master.getSystemState()[-1]
674  mypubs = [t for t, nn in pubs if myname in nn and t not in ("/rosout", "/rosout_agg")]
675  for topic, typename in master.getTopicTypes()[-1]:
676  if topic not in mypubs:
677  result.append((topic, typename))
678  return result
679 
680 
681 def is_ros_message(val, ignore_time=False):
682  """
683  Returns whether value is a ROS1 message or special like ROS1 time/duration class or instance.
684 
685  @param ignore_time whether to ignore ROS1 time/duration types
686  """
687  isfunc = issubclass if inspect.isclass(val) else isinstance
688  return isfunc(val, genpy.Message if ignore_time else (genpy.Message, genpy.TVal))
689 
690 
691 def is_ros_time(val):
692  """Returns whether value is a ROS1 time/duration class or instance."""
693  return issubclass(val, genpy.TVal) if inspect.isclass(val) else isinstance(val, genpy.TVal)
694 
695 
696 def make_duration(secs=0, nsecs=0):
697  """Returns a ROS1 duration, as rospy.Duration."""
698  return rospy.Duration(secs=secs, nsecs=nsecs)
699 
700 
701 def make_time(secs=0, nsecs=0):
702  """Returns a ROS1 time, as rospy.Time."""
703  return rospy.Time(secs=secs, nsecs=nsecs)
704 
705 
707  """Returns ROS1 message as a serialized binary."""
708  with TypeMeta.make(msg) as m:
709  if m.data is not None: return m.data
710  buf = io.BytesIO()
711  msg.serialize(buf)
712  return buf.getvalue()
713 
714 
715 def deserialize_message(raw, cls_or_typename):
716  """Returns ROS1 message or service request/response instantiated from serialized binary."""
717  cls = cls_or_typename
718  if isinstance(cls, common.TEXT_TYPES): cls = get_message_class(cls)
719  return cls().deserialize(raw)
720 
721 
722 @memoize
723 def scalar(typename):
724  """
725  Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
726 
727  Returns type unchanged if already a scalar.
728  """
729  return typename[:typename.index("[")] if typename and "[" in typename else typename
730 
731 
732 def set_message_value(obj, name, value):
733  """Sets message or object attribute value."""
734  setattr(obj, name, value)
735 
736 
737 def to_duration(val):
738  """Returns value as ROS1 duration if convertible (int/float/time/datetime/decimal), else value."""
739  result = val
740  if isinstance(val, decimal.Decimal):
741  result = rospy.Duration(int(val), float(val % 1) * 10**9)
742  elif isinstance(val, datetime.datetime):
743  result = rospy.Duration(int(val.timestamp()), 1000 * val.microsecond)
744  elif isinstance(val, (float, int)):
745  result = rospy.Duration(val)
746  elif isinstance(val, rospy.Time):
747  result = rospy.Duration(val.secs, val.nsecs)
748  return result
749 
750 
751 def to_nsec(val):
752  """Returns value in nanoseconds if value is ROS time/duration, else value."""
753  return val.to_nsec() if isinstance(val, genpy.TVal) else val
754 
755 
756 def to_sec(val):
757  """Returns value in seconds if value is ROS1 time/duration, else value."""
758  return val.to_sec() if isinstance(val, genpy.TVal) else val
759 
760 
761 def to_sec_nsec(val):
762  """Returns value as (seconds, nanoseconds) if value is ROS1 time/duration, else value."""
763  return (val.secs, val.nsecs) if isinstance(val, genpy.TVal) else val
764 
765 
766 def to_time(val):
767  """Returns value as ROS1 time if convertible (int/float/duration/datetime/decimal), else value."""
768  result = val
769  if isinstance(val, decimal.Decimal):
770  result = rospy.Time(int(val), float(val % 1) * 10**9)
771  elif isinstance(val, datetime.datetime):
772  result = rospy.Time(int(val.timestamp()), 1000 * val.microsecond)
773  elif isinstance(val, six.integer_types + (float, )):
774  result = rospy.Time(val)
775  elif isinstance(val, genpy.Duration):
776  result = rospy.Time(val.secs, val.nsecs)
777  return result
778 
779 
780 __all__ = [
781  "BAG_EXTENSIONS", "ROS_ALIAS_TYPES", "ROS_TIME_CLASSES", "ROS_TIME_TYPES", "SKIP_EXTENSIONS",
782  "SLEEP_INTERVAL", "TYPECLASSES", "Bag", "ROS1Bag", "master",
783  "canonical", "create_publisher", "create_subscriber", "deserialize_message",
784  "format_message_value", "generate_message_classes", "get_message_class",
785  "get_message_definition", "get_message_fields", "get_message_type", "get_message_type_hash",
786  "get_message_value", "get_rostime", "get_topic_types", "init_node", "is_ros_message",
787  "is_ros_time", "make_duration", "make_time", "scalar", "serialize_message", "set_message_value",
788  "shutdown_node", "to_duration", "to_nsec", "to_sec", "to_sec_nsec", "to_time", "validate",
789 ]
grepros.api.parse_definition_subtypes
def parse_definition_subtypes(typedef, nesting=False)
Definition: api.py:1054
grepros.ros1.shutdown_node
def shutdown_node()
Definition: ros1.py:479
grepros.ros1.ROS1Bag._ensure_typedef
def _ensure_typedef(self, typename, typehash=None)
Definition: ros1.py:362
grepros.ros1.ROS1Bag.__topics
__topics
Definition: ros1.py:108
rosbag::Bag
grepros.ros1.ROS1Bag.get_message_definition
def get_message_definition(self, msg_or_type)
Definition: ros1.py:140
grepros.ros1.ROS1Bag.__init__
def __init__(self, *args, **kwargs)
Definition: ros1.py:99
grepros.ros1.ROS1Bag._register_write
def _register_write(self, topic, msg, raw=False, connection_header=None)
Definition: ros1.py:340
grepros.ros1.to_time
def to_time(val)
Definition: ros1.py:766
grepros.ros1.to_sec
def to_sec(val)
Definition: ros1.py:756
grepros.ros1.get_message_type_hash
def get_message_type_hash(msg_or_type)
Definition: ros1.py:618
grepros.ros1.canonical
def canonical(typename, unbounded=False)
Definition: ros1.py:505
grepros.ros1.ROS1Bag.__PARSEDS
dictionary __PARSEDS
Definition: ros1.py:96
grepros.ros1.ROS1Bag._convert_message
def _convert_message(self, msg, typename2, typehash2=None)
Definition: ros1.py:315
grepros.api.calculate_definition_hash
def calculate_definition_hash(typename, msgdef, extradefs=())
Definition: api.py:623
grepros.api.BaseBag
Definition: api.py:85
grepros.ros1.to_nsec
def to_nsec(val)
Definition: ros1.py:751
rosbag::Bag::close
void close()
grepros.ros1.to_sec_nsec
def to_sec_nsec(val)
Definition: ros1.py:761
grepros.ros1.set_message_value
def set_message_value(obj, name, value)
Definition: ros1.py:732
grepros.ros1.create_subscriber
def create_subscriber(topic, typename, handler, queue_size)
Definition: ros1.py:531
grepros.api.BaseBag.MODES
tuple MODES
Supported opening modes, overridden in subclasses.
Definition: api.py:110
grepros.ros1.ROS1Bag._run_reindex
def _run_reindex(inbag, outbag, bar=None)
Definition: ros1.py:426
grepros.ros1.serialize_message
def serialize_message(msg)
Definition: ros1.py:706
grepros.api.BaseBag.filename
def filename(self)
Definition: api.py:319
grepros.ros1.ROS1Bag.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, connection_filter=None, **__)
Definition: ros1.py:193
grepros.ros1.ROS1Bag.topics
def topics(self)
Definition: ros1.py:304
grepros.ros1.ROS1Bag._populate_meta
def _populate_meta(self)
Definition: ros1.py:327
grepros.api.BaseBag.BagMessage
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition: api.py:99
grepros.ros1.ROS1Bag.__next__
def __next__(self)
Definition: ros1.py:296
grepros.api.BaseBag.closed
def closed(self)
Definition: api.py:334
grepros.ros1.get_message_fields
def get_message_fields(val)
Definition: ros1.py:624
grepros.ros1.deserialize_message
def deserialize_message(raw, cls_or_typename)
Definition: ros1.py:715
grepros.ros1.get_message_class
def get_message_class(typename)
Definition: ros1.py:596
grepros.api.BaseBag.next
next
Definition: api.py:135
grepros.ros1.ROS1Bag.__contains__
def __contains__(self, key)
Definition: ros1.py:291
grepros.ros1.create_publisher
def create_publisher(topic, cls_or_typename, queue_size)
Definition: ros1.py:518
grepros.ros1.get_message_type
def get_message_type(msg_or_cls)
Definition: ros1.py:636
grepros.ros1.format_message_value
def format_message_value(msg, name, value)
Definition: ros1.py:565
grepros.ros1.make_duration
def make_duration(secs=0, nsecs=0)
Definition: ros1.py:696
grepros.ros1.ROS1Bag.__iterer
__iterer
Definition: ros1.py:109
grepros.api.BaseBag.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: api.py:255
grepros.ros1.ROS1Bag
Definition: ros1.py:67
grepros.ros1.ROS1Bag.open
def open(self)
Definition: ros1.py:277
grepros.ros1.ROS1Bag.closed
def closed(self)
Definition: ros1.py:310
grepros.ros1.to_duration
def to_duration(val)
Definition: ros1.py:737
grepros.ros1.ROS1Bag.get_topic_info
def get_topic_info(self, *_, **__)
Definition: ros1.py:188
grepros.ros1.ROS1Bag.get_start_time
def get_start_time(self)
Definition: ros1.py:176
grepros.ros1.get_topic_types
def get_topic_types()
Definition: ros1.py:666
grepros.ros1.get_rostime
def get_rostime(fallback=False)
Definition: ros1.py:654
grepros.api.BaseBag.mode
def mode(self)
Definition: api.py:329
grepros.ros1.init_node
def init_node(name)
Definition: ros1.py:450
grepros.ros1.is_ros_time
def is_ros_time(val)
Definition: ros1.py:691
grepros.ros1.ROS1Bag._iterer
_iterer
Definition: ros1.py:287
grepros.ros1.ROS1Bag.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: ros1.py:148
grepros.ros1.ROS1Bag.reindex_file
def reindex_file(f, progress, *args, **kwargs)
Definition: ros1.py:382
grepros.common.ProgressBar
Definition: common.py:348
grepros.api.BaseBag.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Definition: api.py:271
grepros.ros1.ROS1Bag.write
def write(self, topic, msg, t=None, raw=False, connection_header=None, **__)
Definition: ros1.py:256
grepros.ros1.ROS1Bag.__TYPEDEFS
dictionary __TYPEDEFS
{(typename, typehash): type definition text}
Definition: ros1.py:93
grepros.ros1.get_message_definition
def get_message_definition(msg_or_type)
Definition: ros1.py:608
grepros.ros1.validate
def validate(live=False)
Definition: ros1.py:487
grepros.ros1.ROS1Bag.get_message_type_hash
def get_message_type_hash(self, msg_or_type)
Definition: ros1.py:165
grepros.ros1.generate_message_classes
def generate_message_classes(typename, typedef)
Definition: ros1.py:582
grepros.ros1.get_message_value
def get_message_value(msg, name, typename)
Definition: ros1.py:644
grepros.ros1.ROS1Bag.__TYPES
dictionary __TYPES
Definition: ros1.py:90
grepros.ros1.is_ros_message
def is_ros_message(val, ignore_time=False)
Definition: ros1.py:681
grepros.ros1.ROS1Bag.get_end_time
def get_end_time(self)
Definition: ros1.py:182
grepros.ros1.scalar
def scalar(typename)
Definition: ros1.py:723
grepros.ros1.make_time
def make_time(secs=0, nsecs=0)
Definition: ros1.py:701


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29