ros2.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 ROS2 interface.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 02.11.2021
11 @modified 27.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 import array
16 import collections
17 import datetime
18 import decimal
19 import enum
20 import inspect
21 import io
22 import os
23 import re
24 import sqlite3
25 import threading
26 import time
27 
28 import builtin_interfaces.msg
29 try: import numpy
30 except Exception: numpy = None
31 import rclpy
32 import rclpy.clock
33 import rclpy.duration
34 import rclpy.executors
35 import rclpy.serialization
36 import rclpy.time
37 import rosidl_parser.parser
38 import rosidl_parser.definition
39 import rosidl_runtime_py.utilities
40 import yaml
41 
42 from . import api
43 from . common import PATH_TYPES, ConsolePrinter, MatchMarkers, memoize
44 
45 
46 
47 BAG_EXTENSIONS = (".db3", )
48 
49 
50 SKIP_EXTENSIONS = ()
51 
52 
53 ROS_TIME_TYPES = ["builtin_interfaces/Time", "builtin_interfaces/Duration"]
54 
55 
56 ROS_TIME_CLASSES = {rclpy.time.Time: "builtin_interfaces/Time",
57  builtin_interfaces.msg.Time: "builtin_interfaces/Time",
58  rclpy.duration.Duration: "builtin_interfaces/Duration",
59  builtin_interfaces.msg.Duration: "builtin_interfaces/Duration"}
60 
61 
62 ROS_TIME_MESSAGES = {rclpy.time.Time: builtin_interfaces.msg.Time,
63  rclpy.duration.Duration: builtin_interfaces.msg.Duration}
64 
65 
66 ROS_ALIAS_TYPES = {"byte": "uint8", "char": "int8"}
67 
68 
69 DDS_TYPES = {"boolean": "bool",
70  "float": "float32",
71  "double": "float64",
72  "octet": "byte",
73  "short": "int16",
74  "unsigned short": "uint16",
75  "long": "int32",
76  "unsigned long": "uint32",
77  "long long": "int64",
78  "unsigned long long": "uint64", }
79 
80 
81 node = None
82 
83 
84 context = None
85 
86 
87 executor = None
88 
89 
90 
92  """ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface."""
93 
94 
95  STREAMABLE = False
96 
97 
98  CREATE_SQL = """
99 CREATE TABLE IF NOT EXISTS messages (
100  id INTEGER PRIMARY KEY,
101  topic_id INTEGER NOT NULL,
102  timestamp INTEGER NOT NULL,
103  data BLOB NOT NULL
104 );
105 
106 CREATE TABLE IF NOT EXISTS topics (
107  id INTEGER PRIMARY KEY,
108  name TEXT NOT NULL,
109  type TEXT NOT NULL,
110  serialization_format TEXT NOT NULL,
111  offered_qos_profiles TEXT NOT NULL
112 );
113 
114 CREATE INDEX IF NOT EXISTS timestamp_idx ON messages (timestamp ASC);
115 
116 PRAGMA journal_mode=WAL;
117 PRAGMA synchronous=NORMAL;
118  """
119 
120 
121  def __init__(self, filename, mode="a", *_, **__):
122  """
123  @param filename bag file path to open
124  @param mode file will be overwritten if "w"
125  """
126  if not isinstance(filename, PATH_TYPES):
127  raise ValueError("invalid filename %r" % type(filename))
128  if mode not in self.MODES: raise ValueError("invalid mode %r" % mode)
129 
130  self._db = None # sqlite3.Connection instance
131  self._mode = mode
132  self._topics = {} # {(topic, typename): {id, name, type}}
133  self._counts = {} # {(topic, typename, typehash): message count}
134  self._qoses = {} # {(topic, typename): [{qos profile dict}]}
135  self._iterer = None # Generator from read_messages() for next()
136  self._ttinfo = None # Cached result for get_type_and_topic_info()
137  self._filename = str(filename)
138  self._stop_on_error = True
139 
140  self._ensure_open(populate=("r" != mode))
141 
142 
143  def get_message_count(self, topic_filters=None):
144  """
145  Returns the number of messages in the bag.
146 
147  @param topic_filters list of topics or a single topic to filter by, if any
148  """
149  if self._db and self._has_table("messages"):
150  sql, where = "SELECT COUNT(*) AS count FROM messages", ""
151  if topic_filters:
152  self._ensure_topics()
153  topics = topic_filters
154  topics = topics if isinstance(topics, (dict, list, set, tuple)) else [topics]
155  topic_ids = [x["id"] for (topic, _), x in self._topics.items() if topic in topics]
156  where = " WHERE topic_id IN (%s)" % ", ".join(map(str, topic_ids))
157  return self._db.execute(sql + where).fetchone()["count"]
158  return None
159 
160 
161  def get_start_time(self):
162  """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
163  if self._db and self._has_table("messages"):
164  row = self._db.execute("SELECT MIN(timestamp) AS val FROM messages").fetchone()
165  if row["val"] is None: return None
166  secs, nsecs = divmod(row["val"], 10**9)
167  return secs + nsecs / 1E9
168  return None
169 
170 
171  def get_end_time(self):
172  """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
173  if self._db and self._has_table("messages"):
174  row = self._db.execute("SELECT MAX(timestamp) AS val FROM messages").fetchone()
175  if row["val"] is None: return None
176  secs, nsecs = divmod(row["val"], 10**9)
177  return secs + nsecs / 1E9
178  return None
179 
180 
181  def get_topic_info(self, counts=True, ensure_types=True):
182  """
183  Returns topic and message type metainfo as {(topic, typename, typehash): count}.
184 
185  Can skip retrieving message counts, as this requires a full table scan.
186  Can skip looking up message type classes, as those might be unavailable in ROS2 environment.
187 
188  @param counts whether to return actual message counts instead of None
189  @param ensure_types whether to look up type classes instead of returning typehash as None
190  """
191  self._ensure_topics()
192  if counts: self._ensure_counts()
193  if ensure_types: self._ensure_types()
194  return dict(self._counts)
195 
196 
197  def get_type_and_topic_info(self, topic_filters=None):
198  """
199  Returns thorough metainfo on topic and message types.
200 
201  @param topic_filters list of topics or a single topic to filter returned topics-dict by,
202  if any
203  @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
204  msg_types as dict of {typename: typehash},
205  topics as a dict of {topic: TopicTuple() namedtuple}.
206  """
207  topics = topic_filters
208  topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
209  if self._ttinfo and (not topics or set(topics) == set(t for t, _, _ in self._counts)):
210  return self._ttinfo
211  if self.closed: raise ValueError("I/O operation on closed file.")
212 
213  counts = self.get_topic_info()
214  msgtypes = {n: h for t, n, h in counts}
215  topicdict = {}
216 
217  def median(vals):
218  """Returns median value from given sorted numbers."""
219  vlen = len(vals)
220  return None if not vlen else vals[vlen // 2] if vlen % 2 else \
221  float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
222 
223  for (t, n, _), c in sorted(counts.items(), key=lambda x: x[0][:2]):
224  if topics and t not in topics: continue # for
225  mymedian = None
226  if c > 1:
227  args = (self._topics[(t, n)]["id"], )
228  cursor = self._db.execute("SELECT timestamp FROM messages WHERE topic_id = ?", args)
229  stamps = sorted(x["timestamp"] / 1E9 for x in cursor)
230  mymedian = median(sorted(s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])))
231  freq = 1.0 / mymedian if mymedian else None
232  topicdict[t] = self.TopicTuple(n, c, len(self.get_qoses(t, n) or []), freq)
233  result = self.TypesAndTopicsTuple(msgtypes, topicdict)
234  if not topics or set(topics) == set(t for t, _, _ in self._counts): self._ttinfo = result
235  return result
236 
237 
238  def get_qoses(self, topic, typename):
239  """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
240  topickey = (topic, typename)
241  if topickey not in self._qoses and topickey in self._topics:
242  topicrow = self._topics[topickey]
243  try:
244  if topicrow.get("offered_qos_profiles"):
245  self._qoses[topickey] = yaml.safe_load(topicrow["offered_qos_profiles"])
246  except Exception as e:
247  ConsolePrinter.warn("Error parsing quality of service for topic %r: %r", topic, e)
248  self._qoses.setdefault(topickey, None)
249  return self._qoses[topickey]
250 
251 
252  def get_message_class(self, typename, typehash=None):
253  """Returns ROS2 message type class, or None if unknown message type for bag."""
254  self._ensure_topics()
255  if any(n == typename for _, n, in self._topics) and typehash is not None \
256  and not any((n, h) == (typename, typehash) for _, n, h in self._counts):
257  self._ensure_types([t for t, n in self._topics if n == typename])
258  if any((typename, typehash) in [(n, h), (n, None)] for _, n, h in self._counts):
259  return get_message_class(typename)
260  return None
261 
262 
263  def get_message_definition(self, msg_or_type):
264  """
265  Returns ROS2 message type definition full text, including subtype definitions.
266 
267  Returns None if unknown message type for bag.
268  """
269  self._ensure_topics()
270  typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
271  if any(n == typename for _, n, _ in self._counts):
272  return get_message_definition(msg_or_type)
273  return None
274 
275 
276  def get_message_type_hash(self, msg_or_type):
277  """Returns ROS2 message type MD5 hash, or None if unknown message type for bag."""
278  typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
279  self._ensure_types([t for t, n in self._topics if n == typename])
280  return next((h for _, n, h in self._counts if n == typename), None)
281 
282 
283  def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__):
284  """
285  Yields messages from the bag, optionally filtered by topic and timestamp.
286 
287  @param topics list of topics or a single topic to filter by, if any
288  @param start_time earliest timestamp of message to return, as ROS time or convertible
289  (int/float/duration/datetime/decimal)
290  @param end_time latest timestamp of message to return, as ROS time or convertible
291  (int/float/duration/datetime/decimal)
292  @param raw if True, then returned messages are tuples of
293  (typename, bytes, typehash, typeclass).
294  If message type unavailable, returns None for hash and class.
295  @return BagMessage namedtuples of
296  (topic, message, timestamp as rclpy.time.Time)
297  """
298  if self.closed: raise ValueError("I/O operation on closed file.")
299  if "w" == self._mode: raise io.UnsupportedOperation("read")
300 
301  self._ensure_topics()
302  if not self._topics or (topics is not None and not topics):
303  return
304 
305  sql, exprs, args = "SELECT * FROM messages", [], ()
306  if topics:
307  topics = topics if isinstance(topics, (list, tuple)) else [topics]
308  topic_ids = [x["id"] for (topic, _), x in self._topics.items() if topic in topics]
309  exprs += ["topic_id IN (%s)" % ", ".join(map(str, topic_ids))]
310  if start_time is not None:
311  exprs += ["timestamp >= ?"]
312  args += (to_nsec(to_time(start_time)), )
313  if end_time is not None:
314  exprs += ["timestamp <= ?"]
315  args += (to_nsec(to_time(end_time)), )
316  sql += ((" WHERE " + " AND ".join(exprs)) if exprs else "")
317  sql += " ORDER BY timestamp"
318 
319  topicmap = {v["id"]: v for v in self._topics.values()}
320  msgtypes = {} # {typename: cls or None if unavailable}
321  topicset = set(topics or [t for t, _ in self._topics])
322  typehashes = {n: h for _, n, h in self._counts} # {typename: typehash or None or ""}
323  for row in self._db.execute(sql, args):
324  tdata = topicmap[row["topic_id"]]
325  topic, typename = tdata["name"], canonical(tdata["type"])
326  if not raw and msgtypes.get(typename, typename) is None: continue # for row
327  if typehashes.get(typename) is None:
328  self._ensure_types([topic])
329  selector = (h for t, n, h in self._counts if (t, n) == (topic, typename))
330  typehash = typehashes[typename] = next(selector, None)
331  else: typehash = typehashes[typename]
332 
333  try:
334  cls = msgtypes.get(typename) or \
335  msgtypes.setdefault(typename, get_message_class(typename))
336  if raw: msg = (typename, row["data"], typehash or None, cls)
337  else: msg = rclpy.serialization.deserialize_message(row["data"], cls)
338  except Exception as e:
339  reportfunc = ConsolePrinter.error if self._stop_on_error else ConsolePrinter.warn
340  reportfunc("Error loading type %s in topic %s: %%s" % (typename, topic),
341  "message class not found." if cls is None else e,
342  __once=not self._stop_on_error)
343  if self._stop_on_error: raise
344  if raw: msg = (typename, row["data"], typehash or None, msgtypes.get(typename))
345  elif set(n for n, c in msgtypes.items() if c is None) == topicset:
346  break # for row
347  continue # for row
348  stamp = rclpy.time.Time(nanoseconds=row["timestamp"])
349 
350  api.TypeMeta.make(msg, topic, self)
351  yield self.BagMessage(topic, msg, stamp)
352  if not self._db: # Bag has been closed in the meantime
353  break # for row
354 
355 
356  def write(self, topic, msg, t=None, raw=False, qoses=None, **__):
357  """
358  Writes a message to the bag.
359 
360  @param topic name of topic
361  @param msg ROS2 message
362  @param t message timestamp if not using wall time, as ROS time or convertible
363  (int/float/duration/datetime/decimal)
364  @param qoses topic Quality-of-Service settings, if any, as a list of dicts
365  """
366  if self.closed: raise ValueError("I/O operation on closed file.")
367  if "r" == self._mode: raise io.UnsupportedOperation("write")
368 
369  self._ensure_topics()
370  if raw:
371  typename, binary, typehash = msg[:3]
372  else:
373  typename = get_message_type(msg)
374  typehash = get_message_type_hash(msg)
375  binary = serialize_message(msg)
376  topickey = (topic, typename)
377  cursor = self._db.cursor()
378  if topickey not in self._topics:
379  full_typename = make_full_typename(typename)
380  sql = "INSERT INTO topics (name, type, serialization_format, offered_qos_profiles) " \
381  "VALUES (?, ?, ?, ?)"
382  qoses = yaml.safe_dump(qoses) if isinstance(qoses, list) else ""
383  args = (topic, full_typename, "cdr", qoses)
384  cursor.execute(sql, args)
385  tdata = {"id": cursor.lastrowid, "name": topic, "type": full_typename,
386  "serialization_format": "cdr", "offered_qos_profiles": qoses}
387  self._topics[topickey] = tdata
388 
389  timestamp = (time.time_ns() if hasattr(time, "time_ns") else int(time.time() * 10**9)) \
390  if t is None else to_nsec(to_time(t))
391  sql = "INSERT INTO messages (topic_id, timestamp, data) VALUES (?, ?, ?)"
392  args = (self._topics[topickey]["id"], timestamp, binary)
393  cursor.execute(sql, args)
394  countkey = (topic, typename, typehash)
395  if self._counts.get(countkey, self) is not None:
396  self._counts[countkey] = self._counts.get(countkey, 0) + 1
397  self._ttinfo = None
398 
399 
400  def open(self):
401  """Opens the bag file if not already open."""
402  self._ensure_open()
403 
404 
405  def close(self):
406  """Closes the bag file."""
407  if self._db:
408  self._db.close()
409  self._db = None
410  self._mode = None
411  self._iterer = None
412 
413 
414  @property
415  def closed(self):
416  """Returns whether file is closed."""
417  return not self._db
418 
419 
420  @property
421  def topics(self):
422  """Returns the list of topics in bag, in alphabetic order."""
423  return sorted((t for t, _, _ in self._topics), key=str.lower)
424 
425 
426  @property
427  def filename(self):
428  """Returns bag file path."""
429  return self._filename
430 
431 
432  @property
433  def size(self):
434  """Returns current file size in bytes (including journaling files)."""
435  result = os.path.getsize(self._filename) if os.path.isfile(self._filename) else None
436  for suffix in ("-journal", "-wal") if result else ():
437  path = "%s%s" % (self._filename, suffix)
438  result += os.path.getsize(path) if os.path.isfile(path) else 0
439  return result
440 
441 
442  @property
443  def mode(self):
444  """Returns file open mode."""
445  return self._mode
446 
447 
448  def __contains__(self, key):
449  """Returns whether bag contains given topic."""
450  return any(key == t for t, _, _ in self._topics)
451 
452 
453  def __next__(self):
454  """Retrieves next message from bag as (topic, message, timestamp)."""
455  if self.closed: raise ValueError("I/O operation on closed file.")
456  if self._iterer is None: self._iterer = self.read_messages()
457  return next(self._iterer)
458 
459 
460  def _ensure_open(self, populate=False):
461  """Opens bag database if not open, populates schema if specified."""
462  if not self._db:
463  if "w" == self._mode and os.path.exists(self._filename):
464  os.remove(self._filename)
465  self._db = sqlite3.connect(self._filename, detect_types=sqlite3.PARSE_DECLTYPES,
466  isolation_level=None, check_same_thread=False)
467  self._db.row_factory = lambda cursor, row: dict(sqlite3.Row(cursor, row))
468  if populate:
469  self._db.executescript(self.CREATE_SQL)
470 
471 
472  def _ensure_topics(self):
473  """Populates local topic struct from database, if not already available."""
474  if not self._db or self._topics or not self._has_table("topics"): return
475  for row in self._db.execute("SELECT * FROM topics ORDER BY id"):
476  topickey = (topic, typename) = row["name"], canonical(row["type"])
477  self._topics[(topic, typename)] = row
478  self._counts[(topic, typename, None)] = None
479 
480 
481  def _ensure_counts(self):
482  """Populates local counts values from database, if not already available."""
483  if not self._db or all(v is not None for v in self._counts.values()) \
484  or not self._has_table("messages"): return
485  self._ensure_topics()
486  topickeys = {self._topics[(t, n)]["id"]: (t, n, h) for (t, n, h) in self._counts}
487  self._counts.clear()
488  for row in self._db.execute("SELECT topic_id, COUNT(*) AS count FROM messages "
489  "GROUP BY topic_id").fetchall():
490  if row["topic_id"] in topickeys:
491  self._counts[topickeys[row["topic_id"]]] = row["count"]
492 
493 
494  def _ensure_types(self, topics=None):
495  """
496  Populates local type definitions and classes from database, if not already available.
497 
498  @param topics selected topics to ensure types for, if not all
499  """
500  if not self._db or (not topics and topics is not None) or not self._has_table("topics") \
501  or not any(h is None for t, _, h in self._counts if topics is None or t in topics):
502  return
503  self._ensure_topics()
504  for countkey, count in list(self._counts.items()):
505  (topic, typename, typehash) = countkey
506  if typehash is None and (topics is None or topic in topics):
507  typehash = get_message_type_hash(typename)
508  self._counts.pop(countkey)
509  self._counts[(topic, typename, typehash)] = count
510 
511 
512  def _has_table(self, name):
513  """Returns whether specified table exists in database."""
514  sql = "SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?"
515  return bool(self._db.execute(sql, ("table", name)).fetchone())
516 Bag = ROS2Bag
517 
518 
519 
520 def init_node(name):
521  """Initializes a ROS2 node if not already initialized."""
522  global node, context, executor
523  if node or not validate(live=True):
524  return
525 
526  def spin_loop():
527  while context and context.ok():
528  executor.spin_once(timeout_sec=1)
529 
530  context = rclpy.Context()
531  try: rclpy.init(context=context)
532  except Exception: pass # Must not be called twice at runtime
533  node_name = "%s_%s_%s" % (name, os.getpid(), int(time.time() * 1000))
534  node = rclpy.create_node(node_name, context=context, use_global_arguments=False,
535  enable_rosout=False, start_parameter_services=False)
536  executor = rclpy.executors.MultiThreadedExecutor(context=context)
537  executor.add_node(node)
538  spinner = threading.Thread(target=spin_loop)
539  spinner.daemon = True
540  spinner.start()
541 
542 
544  """Shuts down live ROS2 node."""
545  global node, context, executor
546  if context:
547  context_, executor_ = context, executor
548  context = executor = node = None
549  executor_.shutdown()
550  context_.shutdown()
551 
552 
553 def validate(live=False):
554  """
555  Returns whether ROS2 environment is set, prints error if not.
556 
557  @param live whether environment must support launching a ROS node
558  """
559  missing = [k for k in ["ROS_VERSION"] if not os.getenv(k)]
560  if missing:
561  ConsolePrinter.error("ROS environment not sourced: missing %s.",
562  ", ".join(sorted(missing)))
563  if "2" != os.getenv("ROS_VERSION", "2"):
564  ConsolePrinter.error("ROS environment not supported: need ROS_VERSION=2.")
565  missing = True
566  return not missing
567 
568 
569 @memoize
570 def canonical(typename, unbounded=False):
571  """
572  Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
573 
574  Converts DDS types like "octet" to "byte", and "sequence<uint8, 100>" to "uint8[100]".
575 
576  @param unbounded drop constraints like array and string bounds,
577  e.g. returning "uint8[]" for "uint8[10]" and "string" for "string<=8"
578  """
579  if not typename: return typename
580  is_array, bound, dimension = False, "", ""
581 
582  if "<" in typename:
583  match = re.match("sequence<(.+)>", typename)
584  if match: # "sequence<uint8, 100>" or "sequence<uint8>"
585  is_array = True
586  typename = match.group(1)
587  match = re.match(r"([^,]+)?,\s?(\d+)", typename)
588  if match: # sequence<uint8, 10>
589  typename = match.group(1)
590  if match.lastindex > 1: dimension = match.group(2)
591 
592  match = re.match("(w?string)<(.+)>", typename)
593  if match: # string<5>
594  typename, bound = match.groups()
595 
596  if "[" in typename: # "string<=5[<=10]" or "string<=5[10]" or "byte[10]" or "byte[]"
597  dimension = typename[typename.index("[") + 1:typename.index("]")]
598  typename, is_array = typename[:typename.index("[")], True
599 
600  if "<=" in typename: # "string<=5"
601  typename, bound = typename.split("<=")
602 
603  if typename.count("/") > 1:
604  typename = "%s/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
605 
606  if unbounded: suffix = "[]" if is_array else ""
607  else: suffix = ("<=%s" % bound if bound else "") + ("[%s]" % dimension if is_array else "")
608  return DDS_TYPES.get(typename, typename) + suffix
609 
610 
611 def create_publisher(topic, cls_or_typename, queue_size):
612  """Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister()."""
613  cls = cls_or_typename
614  if isinstance(cls, str): cls = get_message_class(cls)
615  qos = rclpy.qos.QoSProfile(depth=queue_size)
616  pub = node.create_publisher(cls, topic, qos)
617  pub.get_num_connections = pub.get_subscription_count
618  pub.unregister = pub.destroy
619  return pub
620 
621 
622 def create_subscriber(topic, cls_or_typename, handler, queue_size):
623  """
624  Returns an rclpy.Subscription.
625 
626  Supplemented with .get_message_class(), .get_message_definition(),
627  .get_message_type_hash(), .get_qoses(), and.unregister().
628  """
629  cls = typename = cls_or_typename
630  if isinstance(cls, str): cls = get_message_class(cls)
631  else: typename = get_message_type(cls)
632 
633  qos = rclpy.qos.QoSProfile(depth=queue_size)
634  qoses = [x.qos_profile for x in node.get_publishers_info_by_topic(topic)
635  if canonical(x.topic_type) == typename]
636  rels, durs = zip(*[(x.reliability, x.durability) for x in qoses]) if qoses else ([], [])
637  # If subscription demands stricter QoS than publisher offers, no messages are received
638  if rels: qos.reliability = max(rels) # DEFAULT < RELIABLE < BEST_EFFORT
639  if durs: qos.durability = max(durs) # DEFAULT < TRANSIENT_LOCAL < VOLATILE
640 
641  qosdicts = [qos_to_dict(x) for x in qoses] or None
642  sub = node.create_subscription(cls, topic, handler, qos)
643  sub.get_message_class = lambda: cls
644  sub.get_message_definition = lambda: get_message_definition(cls)
645  sub.get_message_type_hash = lambda: get_message_type_hash(cls)
646  sub.get_qoses = lambda: qosdicts
647  sub.unregister = sub.destroy
648  return sub
649 
650 
651 def format_message_value(msg, name, value):
652  """
653  Returns a message attribute value as string.
654 
655  Result is at least 10 chars wide if message is a ROS2 time/duration
656  (aligning seconds and nanoseconds).
657  """
658  LENS = {"sec": 13, "nanosec": 9}
659  v = "%s" % (value, )
660  if not isinstance(msg, tuple(ROS_TIME_CLASSES)) or name not in LENS:
661  return v
662 
663  EXTRA = sum(v.count(x) * len(x) for x in (MatchMarkers.START, MatchMarkers.END))
664  return ("%%%ds" % (LENS[name] + EXTRA)) % v # Default %10s/%9s for secs/nanosecs
665 
666 
667 @memoize
668 def get_message_class(typename):
669  """Returns ROS2 message class, or None if unknown type."""
670  try: return rosidl_runtime_py.utilities.get_message(make_full_typename(typename))
671  except Exception: return None
672 
673 
674 def get_message_definition(msg_or_type):
675  """
676  Returns ROS2 message type definition full text, including subtype definitions.
677 
678  Returns None if unknown type.
679  """
680  typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
681  return _get_message_definition(canonical(typename))
682 
683 
684 def get_message_type_hash(msg_or_type):
685  """Returns ROS2 message type MD5 hash, or "" if unknown type."""
686  typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
687  return _get_message_type_hash(canonical(typename))
688 
689 
690 @memoize
692  """Returns ROS2 message type definition full text, or None on error (internal caching method)."""
693  try:
694  texts, pkg = collections.OrderedDict(), typename.rsplit("/", 1)[0]
695  try:
696  typepath = rosidl_runtime_py.get_interface_path(make_full_typename(typename) + ".msg")
697  with open(typepath) as f:
698  texts[typename] = f.read()
699  except Exception: # .msg file unavailable: parse IDL
700  texts[typename] = get_message_definition_idl(typename)
701  for line in texts[typename].splitlines():
702  if not line or not line[0].isalpha():
703  continue # for line
704  linetype = scalar(canonical(re.sub(r"^([a-zA-Z][^\s]+)(.+)", r"\1", line)))
705  if linetype in api.ROS_BUILTIN_TYPES:
706  continue # for line
707  linetype = linetype if "/" in linetype else "std_msgs/Header" \
708  if "Header" == linetype else "%s/%s" % (pkg, linetype)
709  linedef = None if linetype in texts else get_message_definition(linetype)
710  if linedef: texts[linetype] = linedef
711 
712  basedef = texts.pop(next(iter(texts)))
713  subdefs = ["%s\nMSG: %s\n%s" % ("=" * 80, k, v) for k, v in texts.items()]
714  return basedef + ("\n" if subdefs else "") + "\n".join(subdefs)
715  except Exception as e:
716  ConsolePrinter.warn("Error collecting type definition of %s: %s", typename, e)
717  return None
718 
719 
720 @memoize
722  """
723  Returns ROS2 message type definition parsed from IDL file.
724 
725  @since version 0.4.2
726  """
727 
728  def format_comment(text):
729  """Returns annotation text formatted with comment prefixes and escapes."""
730  ESCAPES = {"\n": "\\n", "\t": "\\t", "\x07": "\\a",
731  "\x08": "\\b", "\x0b": "\\v", "\x0c": "\\f"}
732  repl = lambda m: ESCAPES[m.group(0)]
733  return "#" + "\n#".join(re.sub("|".join(map(re.escape, ESCAPES)), repl, l)
734  for l in text.split("\\n"))
735 
736  def format_type(typeobj, msgpackage, constant=False):
737  """Returns canonical type name, like "uint8" or "string<=5" or "nav_msgs/Path"."""
738  result = None
739  if isinstance(typeobj, rosidl_parser.definition.AbstractNestedType):
740  # Array, BoundedSequence, UnboundedSequence
741  valuetype = format_type(typeobj.value_type, msgpackage, constant)
742  size, bounding = "", ""
743  if isinstance(typeobj, rosidl_parser.definition.Array):
744  size = typeobj.size
745  elif typeobj.has_maximum_size():
746  size = typeobj.maximum_size
747  if isinstance(typeobj, rosidl_parser.definition.BoundedSequence):
748  bounding = "<="
749  result = "%s[%s%s]" % (valuetype, bounding, size) # type[], type[N], type[<=N]
750  elif isinstance(typeobj, rosidl_parser.definition.AbstractWString):
751  result = "wstring"
752  elif isinstance(typeobj, rosidl_parser.definition.AbstractString):
753  result = "string"
754  elif isinstance(typeobj, rosidl_parser.definition.NamespacedType):
755  nameparts = typeobj.namespaced_name()
756  result = canonical("/".join(nameparts))
757  if nameparts[0].value == msgpackage or "std_msgs/Header" == result:
758  result = canonical("/".join(nameparts[-1:])) # Omit package if local or Header
759  else: # Primitive like int8
760  result = DDS_TYPES.get(typeobj.typename, typeobj.typename)
761 
762  if isinstance(typeobj, rosidl_parser.definition.AbstractGenericString) \
763  and typeobj.has_maximum_size() and not constant: # Constants get parsed into "string<=N"
764  result += "<=%s" % typeobj.maximum_size
765 
766  return result
767 
768  def get_comments(obj):
769  """Returns all comments for annotatable object, as [text, ]."""
770  return [v.get("text", "") for v in obj.get_annotation_values("verbatim")
771  if "comment" == v.get("language")]
772 
773  typepath = rosidl_runtime_py.get_interface_path(make_full_typename(typename) + ".idl")
774  with open(typepath) as f:
775  idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
776  msgidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Message)[0]
777  package = msgidl.structure.namespaced_type.namespaces[0]
778  DUMMY = rosidl_parser.definition.EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME
779 
780  lines = []
781  # Add general comments
782  lines.extend(map(format_comment, get_comments(msgidl.structure)))
783  # Add blank line between general comments and constants
784  if lines and msgidl.constants: lines.append("")
785  # Add constants
786  for c in msgidl.constants:
787  ctype = format_type(c.type, package, constant=True)
788  lines.extend(map(format_comment, get_comments(c)))
789  lines.append("%s %s=%s" % (ctype, c.name, c.value))
790  # Parser adds dummy placeholder if constants-only message
791  if not (len(msgidl.structure.members) == 1 and DUMMY == msgidl.structure[0].name):
792  # Add blank line between constants and fields
793  if msgidl.constants and msgidl.structure.members: lines.append("")
794  # Add fields
795  for m in msgidl.structure.members:
796  lines.extend(map(format_comment, get_comments(m)))
797  lines.append("%s %s" % (format_type(m.type, package), m.name))
798  return "\n".join(lines)
799 
800 
801 @memoize
803  """Returns ROS2 message type MD5 hash (internal caching method)."""
804  msgdef = get_message_definition(typename)
805  return "" if msgdef is None else api.calculate_definition_hash(typename, msgdef)
806 
807 
809  """Returns OrderedDict({field name: field type name}) if ROS2 message, else {}."""
810  if not is_ros_message(val): return {}
811  fields = ((k, canonical(v)) for k, v in val.get_fields_and_field_types().items())
812  return collections.OrderedDict(fields)
813 
814 
815 def get_message_type(msg_or_cls):
816  """Returns ROS2 message type name, like "std_msgs/Header"."""
817  cls = msg_or_cls if inspect.isclass(msg_or_cls) else type(msg_or_cls)
818  return canonical("%s/%s" % (cls.__module__.split(".")[0], cls.__name__))
819 
820 
821 def get_message_value(msg, name, typename):
822  """Returns object attribute value, with numeric arrays converted to lists."""
823  v, scalartype = getattr(msg, name), scalar(typename)
824  if isinstance(v, (bytes, array.array)): v = list(v)
825  elif numpy and isinstance(v, (numpy.generic, numpy.ndarray)):
826  v = v.tolist() # Returns value as Python type, either scalar or list
827  if v and isinstance(v, (list, tuple)) and scalartype in ("byte", "uint8"):
828  if isinstance(v[0], bytes):
829  v = list(map(ord, v)) # In ROS2, a byte array like [0, 1] is [b"\0", b"\1"]
830  elif scalartype == typename:
831  v = v[0] # In ROS2, single byte values are given as bytes()
832  return list(v) if isinstance(v, tuple) else v
833 
834 
835 def get_rostime(fallback=False):
836  """
837  Returns current ROS2 time, as rclpy.time.Time.
838 
839  @param fallback use wall time if node not initialized
840  """
841  try: return node.get_clock().now()
842  except Exception:
843  if fallback: return make_time(time.time())
844  raise
845 
846 
848  """
849  Returns currently available ROS2 topics, as [(topicname, typename)].
850 
851  Omits topics that the current ROS2 node itself has published.
852  """
853  result = []
854  myname, myns = node.get_name(), node.get_namespace()
855  mytypes = {} # {topic: [typename, ]}
856  for topic, typenames in node.get_publisher_names_and_types_by_node(myname, myns):
857  mytypes.setdefault(topic, []).extend(typenames)
858  for t in ("/parameter_events", "/rosout"): # Published by all nodes
859  mytypes.pop(t, None)
860  for topic, typenames in node.get_topic_names_and_types(): # [(topicname, [typename, ])]
861  for typename in typenames:
862  if topic not in mytypes or typename not in mytypes[topic]:
863  result += [(topic, canonical(typename))]
864  return result
865 
866 
867 def is_ros_message(val, ignore_time=False):
868  """
869  Returns whether value is a ROS2 message or special like ROS2 time/duration class or instance.
870 
871  @param ignore_time whether to ignore ROS2 time/duration types
872  """
873  is_message = rosidl_runtime_py.utilities.is_message(val)
874  if is_message and ignore_time: is_message = not is_ros_time(val)
875  return is_message
876 
877 
878 def is_ros_time(val):
879  """Returns whether value is a ROS2 time/duration class or instance."""
880  if inspect.isclass(val): return issubclass(val, tuple(ROS_TIME_CLASSES))
881  return isinstance(val, tuple(ROS_TIME_CLASSES))
882 
883 
884 def make_duration(secs=0, nsecs=0):
885  """Returns an rclpy.duration.Duration."""
886  return rclpy.duration.Duration(seconds=secs, nanoseconds=nsecs)
887 
888 
889 def make_time(secs=0, nsecs=0):
890  """Returns a ROS2 time, as rclpy.time.Time."""
891  return rclpy.time.Time(seconds=secs, nanoseconds=nsecs)
892 
893 
894 def make_full_typename(typename):
895  """Returns "pkg/msg/Type" for "pkg/Type"."""
896  if "/msg/" in typename or "/" not in typename:
897  return typename
898  return "%s/msg/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
899 
900 
901 def make_subscriber_qos(topic, typename, queue_size=10):
902  """
903  Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
904 
905  @param queue_size QoSProfile.depth
906  """
907  qos = rclpy.qos.QoSProfile(depth=queue_size)
908  infos = node.get_publishers_info_by_topic(topic)
909  rels, durs = zip(*[(x.qos_profile.reliability, x.qos_profile.durability)
910  for x in infos if canonical(x.topic_type) == typename])
911  # If subscription demands stricter QoS than publisher offers, no messages are received
912  if rels: qos.reliability = max(rels) # DEFAULT < RELIABLE < BEST_EFFORT
913  if durs: qos.durability = max(durs) # DEFAULT < TRANSIENT_LOCAL < VOLATILE
914  return qos
915 
916 
917 def qos_to_dict(qos):
918  """Returns rclpy.qos.QoSProfile as dictionary."""
919  result = {}
920  if qos:
921  QOS_TYPES = (bool, int, enum.Enum) + tuple(ROS_TIME_CLASSES)
922  for name in (n for n in dir(qos) if not n.startswith("_")):
923  val = getattr(qos, name)
924  if name.startswith("_") or not isinstance(val, QOS_TYPES):
925  continue # for name
926  if isinstance(val, enum.Enum):
927  val = val.value
928  elif isinstance(val, tuple(ROS_TIME_CLASSES)):
929  val = dict(zip(["sec", "nsec"], to_sec_nsec(val)))
930  result[name] = val
931  return [result]
932 
933 
935  """Returns ROS2 message as a serialized binary."""
936  with api.TypeMeta.make(msg) as m:
937  if m.data is not None: return m.data
938  return rclpy.serialization.serialize_message(msg)
939 
940 
941 def deserialize_message(raw, cls_or_typename):
942  """Returns ROS2 message or service request/response instantiated from serialized binary."""
943  cls = cls_or_typename
944  if isinstance(cls, str): cls = get_message_class(cls)
945  return rclpy.serialization.deserialize_message(raw, cls)
946 
947 
948 @memoize
949 def scalar(typename):
950  """
951  Returns unbounded scalar type from ROS2 message data type
952 
953  Like "uint8" from "uint8[]", or "string" from "string<=10[<=5]".
954  Returns type unchanged if not a collection or bounded type.
955  """
956  if typename and "[" in typename: typename = typename[:typename.index("[")]
957  if typename and "<=" in typename: typename = typename[:typename.index("<=")]
958  return typename
959 
960 
961 def set_message_value(obj, name, value):
962  """Sets message or object attribute value."""
963  if is_ros_message(obj):
964  # Bypass setter as it does type checking
965  fieldmap = obj.get_fields_and_field_types()
966  if name in fieldmap:
967  name = obj.__slots__[list(fieldmap).index(name)]
968  setattr(obj, name, value)
969 
970 
971 def time_message(val, to_message=True, clock_type=None):
972  """
973  Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
974 
975  @param val ROS2 time/duration object from `rclpy` or `builtin_interfaces`
976  @param to_message whether to convert from `rclpy` to `builtin_interfaces` or vice versa
977  @param clock_type ClockType for converting to `rclpy.Time`, defaults to `ROS_TIME`
978  @return value converted to appropriate type, or original value if not convertible
979  """
980  to_message, clock_type = bool(to_message), (clock_type or rclpy.clock.ClockType.ROS_TIME)
981  if isinstance(val, tuple(ROS_TIME_CLASSES)):
982  rcl_cls = next(k for k, v in ROS_TIME_MESSAGES.items() if isinstance(val, (k, v)))
983  is_rcl = isinstance(val, tuple(ROS_TIME_MESSAGES))
984  name = "to_msg" if to_message and is_rcl else "from_msg" if to_message == is_rcl else None
985  args = [val] + ([clock_type] if rcl_cls is rclpy.time.Time and "from_msg" == name else [])
986  return getattr(rcl_cls, name)(*args) if name else val
987  return val
988 
989 
990 def to_duration(val):
991  """
992  Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
993 
994  Convertible types: int/float/time/datetime/decimal/builtin_interfaces.Time.
995  """
996  result = val
997  if isinstance(val, decimal.Decimal):
998  result = make_duration(int(val), float(val % 1) * 10**9)
999  elif isinstance(val, datetime.datetime):
1000  result = make_duration(int(val.timestamp()), 1000 * val.microsecond)
1001  elif isinstance(val, (float, int)):
1002  result = make_duration(val)
1003  elif isinstance(val, rclpy.time.Time):
1004  result = make_duration(nsecs=val.nanoseconds)
1005  elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1006  result = make_duration(val.sec, val.nanosec)
1007  return result
1008 
1009 
1010 def to_nsec(val):
1011  """Returns value in nanoseconds if value is ROS2 time/duration, else value."""
1012  if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1013  return val
1014  if hasattr(val, "nanoseconds"): # rclpy.Time/Duration
1015  return val.nanoseconds
1016  return val.sec * 10**9 + val.nanosec # builtin_interfaces.msg.Time/Duration
1017 
1018 
1019 def to_sec(val):
1020  """Returns value in seconds if value is ROS2 time/duration, else value."""
1021  if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1022  return val
1023  if hasattr(val, "nanoseconds"): # rclpy.Time/Duration
1024  secs, nsecs = divmod(val.nanoseconds, 10**9)
1025  return secs + nsecs / 1E9
1026  return val.sec + val.nanosec / 1E9 # builtin_interfaces.msg.Time/Duration
1027 
1028 
1029 def to_sec_nsec(val):
1030  """Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value."""
1031  if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1032  return val
1033  if hasattr(val, "seconds_nanoseconds"): # rclpy.Time
1034  return val.seconds_nanoseconds()
1035  if hasattr(val, "nanoseconds"): # rclpy.Duration
1036  return divmod(val.nanoseconds, 10**9)
1037  return (val.sec, val.nanosec) # builtin_interfaces.msg.Time/Duration
1038 
1039 
1040 def to_time(val):
1041  """
1042  Returns value as ROS2 time if convertible, else value.
1043 
1044  Convertible types: int/float/duration/datetime/decimal/builtin_interfaces.Time.
1045  """
1046  result = val
1047  if isinstance(val, decimal.Decimal):
1048  result = make_time(int(val), float(val % 1) * 10**9)
1049  elif isinstance(val, datetime.datetime):
1050  result = make_time(int(val.timestamp()), 1000 * val.microsecond)
1051  elif isinstance(val, (float, int)):
1052  result = make_time(val)
1053  elif isinstance(val, rclpy.duration.Duration):
1054  result = make_time(nsecs=val.nanoseconds)
1055  elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1056  result = make_time(val.sec, val.nanosec)
1057  return result
1058 
1059 
1060 __all__ = [
1061  "BAG_EXTENSIONS", "DDS_TYPES", "ROS_ALIAS_TYPES", "ROS_TIME_CLASSES", "ROS_TIME_MESSAGES",
1062  "ROS_TIME_TYPES", "SKIP_EXTENSIONS", "Bag", "ROS2Bag", "context", "executor", "node",
1063  "canonical", "create_publisher", "create_subscriber", "deserialize_message",
1064  "format_message_value", "get_message_class", "get_message_definition",
1065  "get_message_definition_idl", "get_message_fields", "get_message_type",
1066  "get_message_type_hash", "get_message_value", "get_rostime", "get_topic_types", "init_node",
1067  "is_ros_message", "is_ros_time", "make_duration", "make_full_typename", "make_subscriber_qos",
1068  "make_time", "qos_to_dict", "scalar", "serialize_message", "set_message_value", "shutdown_node",
1069  "time_message", "to_duration", "to_nsec", "to_sec", "to_sec_nsec", "to_time", "validate",
1070 ]
grepros.ros2.ROS2Bag._has_table
def _has_table(self, name)
Definition: ros2.py:512
grepros.ros2.format_message_value
def format_message_value(msg, name, value)
Definition: ros2.py:651
grepros.ros2.ROS2Bag.get_message_type_hash
def get_message_type_hash(self, msg_or_type)
Definition: ros2.py:276
grepros.ros2.ROS2Bag._iterer
_iterer
Definition: ros2.py:135
grepros.ros2.ROS2Bag._mode
_mode
Definition: ros2.py:131
grepros.ros2.deserialize_message
def deserialize_message(raw, cls_or_typename)
Definition: ros2.py:941
grepros.ros2.create_publisher
def create_publisher(topic, cls_or_typename, queue_size)
Definition: ros2.py:611
grepros.ros2.ROS2Bag.open
def open(self)
Definition: ros2.py:400
grepros.ros2.serialize_message
def serialize_message(msg)
Definition: ros2.py:934
grepros.ros2.ROS2Bag.mode
def mode(self)
Definition: ros2.py:443
grepros.api.BaseBag.get_qoses
def get_qoses(self, topic, typename)
Definition: api.py:247
grepros.ros2.set_message_value
def set_message_value(obj, name, value)
Definition: ros2.py:961
grepros.ros2.ROS2Bag._ttinfo
_ttinfo
Definition: ros2.py:136
grepros.ros2.ROS2Bag.topics
def topics(self)
Definition: ros2.py:421
grepros.api.BaseBag.get_topic_info
def get_topic_info(self, counts=True)
Definition: api.py:227
grepros.ros2.to_sec_nsec
def to_sec_nsec(val)
Definition: ros2.py:1029
grepros.ros2.ROS2Bag.__init__
def __init__(self, filename, mode="a", *_, **__)
Definition: ros2.py:121
grepros.ros2.ROS2Bag.closed
def closed(self)
Definition: ros2.py:415
grepros.ros2.time_message
def time_message(val, to_message=True, clock_type=None)
Definition: ros2.py:971
grepros.ros2.get_message_type
def get_message_type(msg_or_cls)
Definition: ros2.py:815
grepros.api.BaseBag
Definition: api.py:85
grepros.ros2.ROS2Bag.__contains__
def __contains__(self, key)
Definition: ros2.py:448
grepros.ros2.ROS2Bag._db
_db
Definition: ros2.py:130
grepros.ros2.ROS2Bag._counts
_counts
Definition: ros2.py:133
grepros.api.BaseBag.MODES
tuple MODES
Supported opening modes, overridden in subclasses.
Definition: api.py:110
grepros.ros2.make_duration
def make_duration(secs=0, nsecs=0)
Definition: ros2.py:884
grepros.ros2.ROS2Bag._topics
_topics
Definition: ros2.py:132
grepros.api.BaseBag.BagMessage
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition: api.py:99
grepros.ros2.get_message_value
def get_message_value(msg, name, typename)
Definition: ros2.py:821
grepros.ros2.is_ros_message
def is_ros_message(val, ignore_time=False)
Definition: ros2.py:867
grepros.api.BaseBag.closed
def closed(self)
Definition: api.py:334
grepros.ros2.to_duration
def to_duration(val)
Definition: ros2.py:990
grepros.ros2.ROS2Bag.write
def write(self, topic, msg, t=None, raw=False, qoses=None, **__)
Definition: ros2.py:356
grepros.api.BaseBag.next
next
Definition: api.py:135
grepros.ros2.ROS2Bag._filename
_filename
Definition: ros2.py:137
grepros.ros2.shutdown_node
def shutdown_node()
Definition: ros2.py:543
grepros.ros2.ROS2Bag._ensure_open
def _ensure_open(self, populate=False)
Definition: ros2.py:460
grepros.ros2.get_message_class
def get_message_class(typename)
Definition: ros2.py:668
grepros.ros2.ROS2Bag.get_qoses
def get_qoses(self, topic, typename)
Definition: ros2.py:238
grepros.ros2.ROS2Bag.size
def size(self)
Definition: ros2.py:433
grepros.ros2.to_sec
def to_sec(val)
Definition: ros2.py:1019
grepros.ros2.get_rostime
def get_rostime(fallback=False)
Definition: ros2.py:835
grepros.ros2.ROS2Bag.filename
def filename(self)
Definition: ros2.py:427
grepros.api.BaseBag.TopicTuple
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
Definition: api.py:103
grepros.ros2.ROS2Bag.get_type_and_topic_info
def get_type_and_topic_info(self, topic_filters=None)
Definition: ros2.py:197
grepros.ros2.get_message_fields
def get_message_fields(val)
Definition: ros2.py:808
grepros.ros2.ROS2Bag.CREATE_SQL
string CREATE_SQL
ROS2 bag SQLite schema.
Definition: ros2.py:98
grepros.ros2.canonical
def canonical(typename, unbounded=False)
Definition: ros2.py:570
grepros.ros2.ROS2Bag._qoses
_qoses
Definition: ros2.py:134
grepros.ros2.init_node
def init_node(name)
Definition: ros2.py:520
grepros.ros2.ROS2Bag._ensure_topics
def _ensure_topics(self)
Definition: ros2.py:472
grepros.ros2._get_message_definition
def _get_message_definition(typename)
Definition: ros2.py:691
grepros.ros2.create_subscriber
def create_subscriber(topic, cls_or_typename, handler, queue_size)
Definition: ros2.py:622
grepros.ros2.make_subscriber_qos
def make_subscriber_qos(topic, typename, queue_size=10)
Definition: ros2.py:901
grepros.ros2._get_message_type_hash
def _get_message_type_hash(typename)
Definition: ros2.py:802
grepros.ros2.ROS2Bag.close
def close(self)
Definition: ros2.py:405
grepros.api.BaseBag.TypesAndTopicsTuple
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
Definition: api.py:107
grepros.ros2.qos_to_dict
def qos_to_dict(qos)
Definition: ros2.py:917
grepros.ros2.to_nsec
def to_nsec(val)
Definition: ros2.py:1010
grepros.ros2.scalar
def scalar(typename)
Definition: ros2.py:949
grepros.ros2.get_message_definition
def get_message_definition(msg_or_type)
Definition: ros2.py:674
grepros.ros2.make_time
def make_time(secs=0, nsecs=0)
Definition: ros2.py:889
grepros.api.BaseBag.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Definition: api.py:271
grepros.ros2.ROS2Bag.get_message_definition
def get_message_definition(self, msg_or_type)
Definition: ros2.py:263
grepros.ros2.ROS2Bag._ensure_types
def _ensure_types(self, topics=None)
Definition: ros2.py:494
grepros.ros2.ROS2Bag.get_start_time
def get_start_time(self)
Definition: ros2.py:161
grepros.ros2.ROS2Bag.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: ros2.py:252
grepros.ros2.ROS2Bag.__next__
def __next__(self)
Definition: ros2.py:453
grepros.ros2.ROS2Bag.get_end_time
def get_end_time(self)
Definition: ros2.py:171
grepros.ros2.validate
def validate(live=False)
Definition: ros2.py:553
grepros.ros2.make_full_typename
def make_full_typename(typename)
Definition: ros2.py:894
grepros.ros2.ROS2Bag.read_messages
def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Definition: ros2.py:283
grepros.ros2.get_message_definition_idl
def get_message_definition_idl(typename)
Definition: ros2.py:721
grepros.ros2.to_time
def to_time(val)
Definition: ros2.py:1040
grepros.ros2.ROS2Bag.get_message_count
def get_message_count(self, topic_filters=None)
Definition: ros2.py:143
grepros.ros2.ROS2Bag._stop_on_error
_stop_on_error
Definition: ros2.py:138
grepros.ros2.ROS2Bag.get_topic_info
def get_topic_info(self, counts=True, ensure_types=True)
Definition: ros2.py:181
grepros.ros2.ROS2Bag
Definition: ros2.py:91
grepros.ros2.ROS2Bag._ensure_counts
def _ensure_counts(self)
Definition: ros2.py:481
grepros.ros2.get_topic_types
def get_topic_types()
Definition: ros2.py:847
grepros.ros2.is_ros_time
def is_ros_time(val)
Definition: ros2.py:878
grepros.ros2.get_message_type_hash
def get_message_type_hash(msg_or_type)
Definition: ros2.py:684


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29