5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
28 import builtin_interfaces.msg
30 except Exception: numpy =
None
34 import rclpy.executors
35 import rclpy.serialization
37 import rosidl_parser.parser
38 import rosidl_parser.definition
39 import rosidl_runtime_py.utilities
43 from . common
import PATH_TYPES, ConsolePrinter, MatchMarkers, memoize
47 BAG_EXTENSIONS = (
".db3", )
53 ROS_TIME_TYPES = [
"builtin_interfaces/Time",
"builtin_interfaces/Duration"]
56 ROS_TIME_CLASSES = {rclpy.time.Time:
"builtin_interfaces/Time",
57 builtin_interfaces.msg.Time:
"builtin_interfaces/Time",
58 rclpy.duration.Duration:
"builtin_interfaces/Duration",
59 builtin_interfaces.msg.Duration:
"builtin_interfaces/Duration"}
62 ROS_TIME_MESSAGES = {rclpy.time.Time: builtin_interfaces.msg.Time,
63 rclpy.duration.Duration: builtin_interfaces.msg.Duration}
66 ROS_ALIAS_TYPES = {
"byte":
"uint8",
"char":
"int8"}
69 DDS_TYPES = {
"boolean":
"bool",
74 "unsigned short":
"uint16",
76 "unsigned long":
"uint32",
78 "unsigned long long":
"uint64", }
92 """ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface."""
99 CREATE TABLE IF NOT EXISTS messages (
100 id INTEGER PRIMARY KEY,
101 topic_id INTEGER NOT NULL,
102 timestamp INTEGER NOT NULL,
106 CREATE TABLE IF NOT EXISTS topics (
107 id INTEGER PRIMARY KEY,
110 serialization_format TEXT NOT NULL,
111 offered_qos_profiles TEXT NOT NULL
114 CREATE INDEX IF NOT EXISTS timestamp_idx ON messages (timestamp ASC);
116 PRAGMA journal_mode=WAL;
117 PRAGMA synchronous=NORMAL;
123 @param filename bag file path to open
124 @param mode file will be overwritten if "w"
126 if not isinstance(filename, PATH_TYPES):
127 raise ValueError(
"invalid filename %r" % type(filename))
128 if mode
not in self.
MODES:
raise ValueError(
"invalid mode %r" % mode)
145 Returns the number of messages in the bag.
147 @param topic_filters list of topics or a single topic to filter by, if any
150 sql, where =
"SELECT COUNT(*) AS count FROM messages",
""
153 topics = topic_filters
154 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
155 topic_ids = [x[
"id"]
for (topic, _), x
in self.
_topics.items()
if topic
in topics]
156 where =
" WHERE topic_id IN (%s)" %
", ".join(map(str, topic_ids))
157 return self.
_db.execute(sql + where).fetchone()[
"count"]
162 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
164 row = self.
_db.execute(
"SELECT MIN(timestamp) AS val FROM messages").fetchone()
165 if row[
"val"]
is None:
return None
166 secs, nsecs = divmod(row[
"val"], 10**9)
167 return secs + nsecs / 1E9
172 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
174 row = self.
_db.execute(
"SELECT MAX(timestamp) AS val FROM messages").fetchone()
175 if row[
"val"]
is None:
return None
176 secs, nsecs = divmod(row[
"val"], 10**9)
177 return secs + nsecs / 1E9
183 Returns topic and message type metainfo as {(topic, typename, typehash): count}.
185 Can skip retrieving message counts, as this requires a full table scan.
186 Can skip looking up message type classes, as those might be unavailable in ROS2 environment.
188 @param counts whether to return actual message counts instead of None
189 @param ensure_types whether to look up type classes instead of returning typehash as None
199 Returns thorough metainfo on topic and message types.
201 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
203 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
204 msg_types as dict of {typename: typehash},
205 topics as a dict of {topic: TopicTuple() namedtuple}.
207 topics = topic_filters
208 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
209 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_counts)):
211 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
214 msgtypes = {n: h
for t, n, h
in counts}
218 """Returns median value from given sorted numbers."""
220 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
221 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
223 for (t, n, _), c
in sorted(counts.items(), key=
lambda x: x[0][:2]):
224 if topics
and t
not in topics:
continue
227 args = (self.
_topics[(t, n)][
"id"], )
228 cursor = self.
_db.execute(
"SELECT timestamp FROM messages WHERE topic_id = ?", args)
229 stamps = sorted(x[
"timestamp"] / 1E9
for x
in cursor)
230 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
231 freq = 1.0 / mymedian
if mymedian
else None
234 if not topics
or set(topics) == set(t
for t, _, _
in self.
_counts): self.
_ttinfo = result
239 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
240 topickey = (topic, typename)
242 topicrow = self.
_topics[topickey]
244 if topicrow.get(
"offered_qos_profiles"):
245 self.
_qoses[topickey] = yaml.safe_load(topicrow[
"offered_qos_profiles"])
246 except Exception
as e:
247 ConsolePrinter.warn(
"Error parsing quality of service for topic %r: %r", topic, e)
248 self.
_qoses.setdefault(topickey,
None)
249 return self.
_qoses[topickey]
253 """Returns ROS2 message type class, or None if unknown message type for bag."""
255 if any(n == typename
for _, n,
in self.
_topics)
and typehash
is not None \
256 and not any((n, h) == (typename, typehash)
for _, n, h
in self.
_counts):
258 if any((typename, typehash)
in [(n, h), (n,
None)]
for _, n, h
in self.
_counts):
265 Returns ROS2 message type definition full text, including subtype definitions.
267 Returns None if unknown message type for bag.
270 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
271 if any(n == typename
for _, n, _
in self.
_counts):
277 """Returns ROS2 message type MD5 hash, or None if unknown message type for bag."""
278 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
280 return next((h
for _, n, h
in self.
_counts if n == typename),
None)
283 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__):
285 Yields messages from the bag, optionally filtered by topic and timestamp.
287 @param topics list of topics or a single topic to filter by, if any
288 @param start_time earliest timestamp of message to return, as ROS time or convertible
289 (int/float/duration/datetime/decimal)
290 @param end_time latest timestamp of message to return, as ROS time or convertible
291 (int/float/duration/datetime/decimal)
292 @param raw if True, then returned messages are tuples of
293 (typename, bytes, typehash, typeclass).
294 If message type unavailable, returns None for hash and class.
295 @return BagMessage namedtuples of
296 (topic, message, timestamp as rclpy.time.Time)
298 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
299 if "w" == self.
_mode:
raise io.UnsupportedOperation(
"read")
302 if not self.
_topics or (topics
is not None and not topics):
305 sql, exprs, args =
"SELECT * FROM messages", [], ()
307 topics = topics
if isinstance(topics, (list, tuple))
else [topics]
308 topic_ids = [x[
"id"]
for (topic, _), x
in self.
_topics.items()
if topic
in topics]
309 exprs += [
"topic_id IN (%s)" %
", ".join(map(str, topic_ids))]
310 if start_time
is not None:
311 exprs += [
"timestamp >= ?"]
313 if end_time
is not None:
314 exprs += [
"timestamp <= ?"]
316 sql += ((
" WHERE " +
" AND ".join(exprs))
if exprs
else "")
317 sql +=
" ORDER BY timestamp"
319 topicmap = {v[
"id"]: v
for v
in self.
_topics.values()}
321 topicset = set(topics
or [t
for t, _
in self.
_topics])
322 typehashes = {n: h
for _, n, h
in self.
_counts}
323 for row
in self.
_db.execute(sql, args):
324 tdata = topicmap[row[
"topic_id"]]
325 topic, typename = tdata[
"name"],
canonical(tdata[
"type"])
326 if not raw
and msgtypes.get(typename, typename)
is None:
continue
327 if typehashes.get(typename)
is None:
329 selector = (h
for t, n, h
in self.
_counts if (t, n) == (topic, typename))
330 typehash = typehashes[typename] =
next(selector,
None)
331 else: typehash = typehashes[typename]
334 cls = msgtypes.get(typename)
or \
336 if raw: msg = (typename, row[
"data"], typehash
or None, cls)
337 else: msg = rclpy.serialization.deserialize_message(row[
"data"], cls)
338 except Exception
as e:
339 reportfunc = ConsolePrinter.error
if self.
_stop_on_error else ConsolePrinter.warn
340 reportfunc(
"Error loading type %s in topic %s: %%s" % (typename, topic),
341 "message class not found." if cls
is None else e,
344 if raw: msg = (typename, row[
"data"], typehash
or None, msgtypes.get(typename))
345 elif set(n
for n, c
in msgtypes.items()
if c
is None) == topicset:
348 stamp = rclpy.time.Time(nanoseconds=row[
"timestamp"])
350 api.TypeMeta.make(msg, topic, self)
356 def write(self, topic, msg, t=None, raw=False, qoses=None, **__):
358 Writes a message to the bag.
360 @param topic name of topic
361 @param msg ROS2 message
362 @param t message timestamp if not using wall time, as ROS time or convertible
363 (int/float/duration/datetime/decimal)
364 @param qoses topic Quality-of-Service settings, if any, as a list of dicts
366 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
367 if "r" == self.
_mode:
raise io.UnsupportedOperation(
"write")
371 typename, binary, typehash = msg[:3]
376 topickey = (topic, typename)
377 cursor = self.
_db.cursor()
378 if topickey
not in self.
_topics:
380 sql =
"INSERT INTO topics (name, type, serialization_format, offered_qos_profiles) " \
381 "VALUES (?, ?, ?, ?)"
382 qoses = yaml.safe_dump(qoses)
if isinstance(qoses, list)
else ""
383 args = (topic, full_typename,
"cdr", qoses)
384 cursor.execute(sql, args)
385 tdata = {
"id": cursor.lastrowid,
"name": topic,
"type": full_typename,
386 "serialization_format":
"cdr",
"offered_qos_profiles": qoses}
389 timestamp = (time.time_ns()
if hasattr(time,
"time_ns")
else int(time.time() * 10**9)) \
391 sql =
"INSERT INTO messages (topic_id, timestamp, data) VALUES (?, ?, ?)"
392 args = (self.
_topics[topickey][
"id"], timestamp, binary)
393 cursor.execute(sql, args)
394 countkey = (topic, typename, typehash)
395 if self.
_counts.get(countkey, self)
is not None:
401 """Opens the bag file if not already open."""
406 """Closes the bag file."""
416 """Returns whether file is closed."""
422 """Returns the list of topics in bag, in alphabetic order."""
423 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
428 """Returns bag file path."""
434 """Returns current file size in bytes (including journaling files)."""
436 for suffix
in (
"-journal",
"-wal")
if result
else ():
438 result += os.path.getsize(path)
if os.path.isfile(path)
else 0
444 """Returns file open mode."""
449 """Returns whether bag contains given topic."""
450 return any(key == t
for t, _, _
in self.
_topics)
454 """Retrieves next message from bag as (topic, message, timestamp)."""
455 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
461 """Opens bag database if not open, populates schema if specified."""
465 self.
_db = sqlite3.connect(self.
_filename, detect_types=sqlite3.PARSE_DECLTYPES,
466 isolation_level=
None, check_same_thread=
False)
467 self.
_db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
473 """Populates local topic struct from database, if not already available."""
475 for row
in self.
_db.execute(
"SELECT * FROM topics ORDER BY id"):
476 topickey = (topic, typename) = row[
"name"],
canonical(row[
"type"])
477 self.
_topics[(topic, typename)] = row
478 self.
_counts[(topic, typename,
None)] =
None
482 """Populates local counts values from database, if not already available."""
483 if not self.
_db or all(v
is not None for v
in self.
_counts.values()) \
486 topickeys = {self.
_topics[(t, n)][
"id"]: (t, n, h)
for (t, n, h)
in self.
_counts}
488 for row
in self.
_db.execute(
"SELECT topic_id, COUNT(*) AS count FROM messages "
489 "GROUP BY topic_id").fetchall():
490 if row[
"topic_id"]
in topickeys:
491 self.
_counts[topickeys[row[
"topic_id"]]] = row[
"count"]
496 Populates local type definitions and classes from database, if not already available.
498 @param topics selected topics to ensure types for, if not all
500 if not self.
_db or (
not topics
and topics
is not None)
or not self.
_has_table(
"topics") \
501 or not any(h
is None for t, _, h
in self.
_counts if topics
is None or t
in topics):
504 for countkey, count
in list(self.
_counts.items()):
505 (topic, typename, typehash) = countkey
506 if typehash
is None and (topics
is None or topic
in topics):
509 self.
_counts[(topic, typename, typehash)] = count
513 """Returns whether specified table exists in database."""
514 sql =
"SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?"
515 return bool(self.
_db.execute(sql, (
"table", name)).fetchone())
521 """Initializes a ROS2 node if not already initialized."""
522 global node, context, executor
527 while context
and context.ok():
528 executor.spin_once(timeout_sec=1)
530 context = rclpy.Context()
531 try: rclpy.init(context=context)
532 except Exception:
pass
533 node_name =
"%s_%s_%s" % (name, os.getpid(), int(time.time() * 1000))
534 node = rclpy.create_node(node_name, context=context, use_global_arguments=
False,
535 enable_rosout=
False, start_parameter_services=
False)
536 executor = rclpy.executors.MultiThreadedExecutor(context=context)
537 executor.add_node(node)
538 spinner = threading.Thread(target=spin_loop)
539 spinner.daemon =
True
544 """Shuts down live ROS2 node."""
545 global node, context, executor
547 context_, executor_ = context, executor
548 context = executor = node =
None
555 Returns whether ROS2 environment is set, prints error if not.
557 @param live whether environment must support launching a ROS node
559 missing = [k
for k
in [
"ROS_VERSION"]
if not os.getenv(k)]
561 ConsolePrinter.error(
"ROS environment not sourced: missing %s.",
562 ", ".join(sorted(missing)))
563 if "2" != os.getenv(
"ROS_VERSION",
"2"):
564 ConsolePrinter.error(
"ROS environment not supported: need ROS_VERSION=2.")
572 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
574 Converts DDS types like "octet" to "byte", and "sequence<uint8, 100>" to "uint8[100]".
576 @param unbounded drop constraints like array and string bounds,
577 e.g. returning "uint8[]" for "uint8[10]" and "string" for "string<=8"
579 if not typename:
return typename
580 is_array, bound, dimension =
False,
"",
""
583 match = re.match(
"sequence<(.+)>", typename)
586 typename = match.group(1)
587 match = re.match(
r"([^,]+)?,\s?(\d+)", typename)
589 typename = match.group(1)
590 if match.lastindex > 1: dimension = match.group(2)
592 match = re.match(
"(w?string)<(.+)>", typename)
594 typename, bound = match.groups()
597 dimension = typename[typename.index(
"[") + 1:typename.index(
"]")]
598 typename, is_array = typename[:typename.index(
"[")],
True
601 typename, bound = typename.split(
"<=")
603 if typename.count(
"/") > 1:
604 typename =
"%s/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
606 if unbounded: suffix =
"[]" if is_array
else ""
607 else: suffix = (
"<=%s" % bound
if bound
else "") + (
"[%s]" % dimension
if is_array
else "")
608 return DDS_TYPES.get(typename, typename) + suffix
612 """Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister()."""
613 cls = cls_or_typename
615 qos = rclpy.qos.QoSProfile(depth=queue_size)
616 pub = node.create_publisher(cls, topic, qos)
617 pub.get_num_connections = pub.get_subscription_count
618 pub.unregister = pub.destroy
624 Returns an rclpy.Subscription.
626 Supplemented with .get_message_class(), .get_message_definition(),
627 .get_message_type_hash(), .get_qoses(), and.unregister().
629 cls = typename = cls_or_typename
633 qos = rclpy.qos.QoSProfile(depth=queue_size)
634 qoses = [x.qos_profile
for x
in node.get_publishers_info_by_topic(topic)
636 rels, durs = zip(*[(x.reliability, x.durability)
for x
in qoses])
if qoses
else ([], [])
638 if rels: qos.reliability = max(rels)
639 if durs: qos.durability = max(durs)
642 sub = node.create_subscription(cls, topic, handler, qos)
643 sub.get_message_class =
lambda: cls
646 sub.get_qoses =
lambda: qosdicts
647 sub.unregister = sub.destroy
653 Returns a message attribute value as string.
655 Result is at least 10 chars wide if message is a ROS2 time/duration
656 (aligning seconds and nanoseconds).
658 LENS = {
"sec": 13,
"nanosec": 9}
660 if not isinstance(msg, tuple(ROS_TIME_CLASSES))
or name
not in LENS:
663 EXTRA = sum(v.count(x) * len(x)
for x
in (MatchMarkers.START, MatchMarkers.END))
664 return (
"%%%ds" % (LENS[name] + EXTRA)) % v
669 """Returns ROS2 message class, or None if unknown type."""
671 except Exception:
return None
676 Returns ROS2 message type definition full text, including subtype definitions.
678 Returns None if unknown type.
680 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
685 """Returns ROS2 message type MD5 hash, or "" if unknown type."""
686 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
692 """Returns ROS2 message type definition full text, or None on error (internal caching method)."""
694 texts, pkg = collections.OrderedDict(), typename.rsplit(
"/", 1)[0]
696 typepath = rosidl_runtime_py.get_interface_path(
make_full_typename(typename) +
".msg")
697 with open(typepath)
as f:
698 texts[typename] = f.read()
701 for line
in texts[typename].splitlines():
702 if not line
or not line[0].isalpha():
704 linetype =
scalar(
canonical(re.sub(
r"^([a-zA-Z][^\s]+)(.+)",
r"\1", line)))
705 if linetype
in api.ROS_BUILTIN_TYPES:
707 linetype = linetype
if "/" in linetype
else "std_msgs/Header" \
708 if "Header" == linetype
else "%s/%s" % (pkg, linetype)
710 if linedef: texts[linetype] = linedef
712 basedef = texts.pop(next(iter(texts)))
713 subdefs = [
"%s\nMSG: %s\n%s" % (
"=" * 80, k, v)
for k, v
in texts.items()]
714 return basedef + (
"\n" if subdefs
else "") +
"\n".join(subdefs)
715 except Exception
as e:
716 ConsolePrinter.warn(
"Error collecting type definition of %s: %s", typename, e)
723 Returns ROS2 message type definition parsed from IDL file.
728 def format_comment(text):
729 """Returns annotation text formatted with comment prefixes and escapes."""
730 ESCAPES = {
"\n":
"\\n",
"\t":
"\\t",
"\x07":
"\\a",
731 "\x08":
"\\b",
"\x0b":
"\\v",
"\x0c":
"\\f"}
732 repl =
lambda m: ESCAPES[m.group(0)]
733 return "#" +
"\n#".join(re.sub(
"|".join(map(re.escape, ESCAPES)), repl, l)
734 for l
in text.split(
"\\n"))
736 def format_type(typeobj, msgpackage, constant=False):
737 """Returns canonical type name, like "uint8" or "string<=5" or "nav_msgs/Path"."""
739 if isinstance(typeobj, rosidl_parser.definition.AbstractNestedType):
741 valuetype = format_type(typeobj.value_type, msgpackage, constant)
742 size, bounding =
"",
""
743 if isinstance(typeobj, rosidl_parser.definition.Array):
745 elif typeobj.has_maximum_size():
746 size = typeobj.maximum_size
747 if isinstance(typeobj, rosidl_parser.definition.BoundedSequence):
749 result =
"%s[%s%s]" % (valuetype, bounding, size)
750 elif isinstance(typeobj, rosidl_parser.definition.AbstractWString):
752 elif isinstance(typeobj, rosidl_parser.definition.AbstractString):
754 elif isinstance(typeobj, rosidl_parser.definition.NamespacedType):
755 nameparts = typeobj.namespaced_name()
757 if nameparts[0].value == msgpackage
or "std_msgs/Header" == result:
758 result =
canonical(
"/".join(nameparts[-1:]))
760 result = DDS_TYPES.get(typeobj.typename, typeobj.typename)
762 if isinstance(typeobj, rosidl_parser.definition.AbstractGenericString) \
763 and typeobj.has_maximum_size()
and not constant:
764 result +=
"<=%s" % typeobj.maximum_size
768 def get_comments(obj):
769 """Returns all comments for annotatable object, as [text, ]."""
770 return [v.get(
"text",
"")
for v
in obj.get_annotation_values(
"verbatim")
771 if "comment" == v.get(
"language")]
773 typepath = rosidl_runtime_py.get_interface_path(
make_full_typename(typename) +
".idl")
774 with open(typepath)
as f:
775 idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
776 msgidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Message)[0]
777 package = msgidl.structure.namespaced_type.namespaces[0]
778 DUMMY = rosidl_parser.definition.EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME
782 lines.extend(map(format_comment, get_comments(msgidl.structure)))
784 if lines
and msgidl.constants: lines.append(
"")
786 for c
in msgidl.constants:
787 ctype = format_type(c.type, package, constant=
True)
788 lines.extend(map(format_comment, get_comments(c)))
789 lines.append(
"%s %s=%s" % (ctype, c.name, c.value))
791 if not (len(msgidl.structure.members) == 1
and DUMMY == msgidl.structure[0].name):
793 if msgidl.constants
and msgidl.structure.members: lines.append(
"")
795 for m
in msgidl.structure.members:
796 lines.extend(map(format_comment, get_comments(m)))
797 lines.append(
"%s %s" % (format_type(m.type, package), m.name))
798 return "\n".join(lines)
803 """Returns ROS2 message type MD5 hash (internal caching method)."""
805 return "" if msgdef
is None else api.calculate_definition_hash(typename, msgdef)
809 """Returns OrderedDict({field name: field type name}) if ROS2 message, else {}."""
811 fields = ((k,
canonical(v))
for k, v
in val.get_fields_and_field_types().items())
812 return collections.OrderedDict(fields)
816 """Returns ROS2 message type name, like "std_msgs/Header"."""
817 cls = msg_or_cls
if inspect.isclass(msg_or_cls)
else type(msg_or_cls)
818 return canonical(
"%s/%s" % (cls.__module__.split(
".")[0], cls.__name__))
822 """Returns object attribute value, with numeric arrays converted to lists."""
823 v, scalartype = getattr(msg, name),
scalar(typename)
824 if isinstance(v, (bytes, array.array)): v = list(v)
825 elif numpy
and isinstance(v, (numpy.generic, numpy.ndarray)):
827 if v
and isinstance(v, (list, tuple))
and scalartype
in (
"byte",
"uint8"):
828 if isinstance(v[0], bytes):
829 v = list(map(ord, v))
830 elif scalartype == typename:
832 return list(v)
if isinstance(v, tuple)
else v
837 Returns current ROS2 time, as rclpy.time.Time.
839 @param fallback use wall time if node not initialized
841 try:
return node.get_clock().now()
843 if fallback:
return make_time(time.time())
849 Returns currently available ROS2 topics, as [(topicname, typename)].
851 Omits topics that the current ROS2 node itself has published.
854 myname, myns = node.get_name(), node.get_namespace()
856 for topic, typenames
in node.get_publisher_names_and_types_by_node(myname, myns):
857 mytypes.setdefault(topic, []).extend(typenames)
858 for t
in (
"/parameter_events",
"/rosout"):
860 for topic, typenames
in node.get_topic_names_and_types():
861 for typename
in typenames:
862 if topic
not in mytypes
or typename
not in mytypes[topic]:
869 Returns whether value is a ROS2 message or special like ROS2 time/duration class or instance.
871 @param ignore_time whether to ignore ROS2 time/duration types
873 is_message = rosidl_runtime_py.utilities.is_message(val)
874 if is_message
and ignore_time: is_message =
not is_ros_time(val)
879 """Returns whether value is a ROS2 time/duration class or instance."""
880 if inspect.isclass(val):
return issubclass(val, tuple(ROS_TIME_CLASSES))
881 return isinstance(val, tuple(ROS_TIME_CLASSES))
885 """Returns an rclpy.duration.Duration."""
886 return rclpy.duration.Duration(seconds=secs, nanoseconds=nsecs)
890 """Returns a ROS2 time, as rclpy.time.Time."""
891 return rclpy.time.Time(seconds=secs, nanoseconds=nsecs)
895 """Returns "pkg/msg/Type" for "pkg/Type"."""
896 if "/msg/" in typename
or "/" not in typename:
898 return "%s/msg/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
903 Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
905 @param queue_size QoSProfile.depth
907 qos = rclpy.qos.QoSProfile(depth=queue_size)
908 infos = node.get_publishers_info_by_topic(topic)
909 rels, durs = zip(*[(x.qos_profile.reliability, x.qos_profile.durability)
910 for x
in infos
if canonical(x.topic_type) == typename])
912 if rels: qos.reliability = max(rels)
913 if durs: qos.durability = max(durs)
918 """Returns rclpy.qos.QoSProfile as dictionary."""
921 QOS_TYPES = (bool, int, enum.Enum) + tuple(ROS_TIME_CLASSES)
922 for name
in (n
for n
in dir(qos)
if not n.startswith(
"_")):
923 val = getattr(qos, name)
924 if name.startswith(
"_")
or not isinstance(val, QOS_TYPES):
926 if isinstance(val, enum.Enum):
928 elif isinstance(val, tuple(ROS_TIME_CLASSES)):
935 """Returns ROS2 message as a serialized binary."""
936 with api.TypeMeta.make(msg)
as m:
937 if m.data
is not None:
return m.data
938 return rclpy.serialization.serialize_message(msg)
942 """Returns ROS2 message or service request/response instantiated from serialized binary."""
943 cls = cls_or_typename
945 return rclpy.serialization.deserialize_message(raw, cls)
951 Returns unbounded scalar type from ROS2 message data type
953 Like "uint8" from "uint8[]", or "string" from "string<=10[<=5]".
954 Returns type unchanged if not a collection or bounded type.
956 if typename
and "[" in typename: typename = typename[:typename.index(
"[")]
957 if typename
and "<=" in typename: typename = typename[:typename.index(
"<=")]
962 """Sets message or object attribute value."""
965 fieldmap = obj.get_fields_and_field_types()
967 name = obj.__slots__[list(fieldmap).index(name)]
968 setattr(obj, name, value)
973 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
975 @param val ROS2 time/duration object from `rclpy` or `builtin_interfaces`
976 @param to_message whether to convert from `rclpy` to `builtin_interfaces` or vice versa
977 @param clock_type ClockType for converting to `rclpy.Time`, defaults to `ROS_TIME`
978 @return value converted to appropriate type, or original value if not convertible
980 to_message, clock_type = bool(to_message), (clock_type
or rclpy.clock.ClockType.ROS_TIME)
981 if isinstance(val, tuple(ROS_TIME_CLASSES)):
982 rcl_cls = next(k
for k, v
in ROS_TIME_MESSAGES.items()
if isinstance(val, (k, v)))
983 is_rcl = isinstance(val, tuple(ROS_TIME_MESSAGES))
984 name =
"to_msg" if to_message
and is_rcl
else "from_msg" if to_message == is_rcl
else None
985 args = [val] + ([clock_type]
if rcl_cls
is rclpy.time.Time
and "from_msg" == name
else [])
986 return getattr(rcl_cls, name)(*args)
if name
else val
992 Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
994 Convertible types: int/float/time/datetime/decimal/builtin_interfaces.Time.
997 if isinstance(val, decimal.Decimal):
999 elif isinstance(val, datetime.datetime):
1000 result =
make_duration(int(val.timestamp()), 1000 * val.microsecond)
1001 elif isinstance(val, (float, int)):
1003 elif isinstance(val, rclpy.time.Time):
1005 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1011 """Returns value in nanoseconds if value is ROS2 time/duration, else value."""
1012 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1014 if hasattr(val,
"nanoseconds"):
1015 return val.nanoseconds
1016 return val.sec * 10**9 + val.nanosec
1020 """Returns value in seconds if value is ROS2 time/duration, else value."""
1021 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1023 if hasattr(val,
"nanoseconds"):
1024 secs, nsecs = divmod(val.nanoseconds, 10**9)
1025 return secs + nsecs / 1E9
1026 return val.sec + val.nanosec / 1E9
1030 """Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value."""
1031 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1033 if hasattr(val,
"seconds_nanoseconds"):
1034 return val.seconds_nanoseconds()
1035 if hasattr(val,
"nanoseconds"):
1036 return divmod(val.nanoseconds, 10**9)
1037 return (val.sec, val.nanosec)
1042 Returns value as ROS2 time if convertible, else value.
1044 Convertible types: int/float/duration/datetime/decimal/builtin_interfaces.Time.
1047 if isinstance(val, decimal.Decimal):
1048 result =
make_time(int(val), float(val % 1) * 10**9)
1049 elif isinstance(val, datetime.datetime):
1050 result =
make_time(int(val.timestamp()), 1000 * val.microsecond)
1051 elif isinstance(val, (float, int)):
1053 elif isinstance(val, rclpy.duration.Duration):
1054 result =
make_time(nsecs=val.nanoseconds)
1055 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1056 result =
make_time(val.sec, val.nanosec)
1061 "BAG_EXTENSIONS",
"DDS_TYPES",
"ROS_ALIAS_TYPES",
"ROS_TIME_CLASSES",
"ROS_TIME_MESSAGES",
1062 "ROS_TIME_TYPES",
"SKIP_EXTENSIONS",
"Bag",
"ROS2Bag",
"context",
"executor",
"node",
1063 "canonical",
"create_publisher",
"create_subscriber",
"deserialize_message",
1064 "format_message_value",
"get_message_class",
"get_message_definition",
1065 "get_message_definition_idl",
"get_message_fields",
"get_message_type",
1066 "get_message_type_hash",
"get_message_value",
"get_rostime",
"get_topic_types",
"init_node",
1067 "is_ros_message",
"is_ros_time",
"make_duration",
"make_full_typename",
"make_subscriber_qos",
1068 "make_time",
"qos_to_dict",
"scalar",
"serialize_message",
"set_message_value",
"shutdown_node",
1069 "time_message",
"to_duration",
"to_nsec",
"to_sec",
"to_sec_nsec",
"to_time",
"validate",