5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
35 from . api
import TypeMeta, calculate_definition_hash, parse_definition_subtypes
36 from . common
import ConsolePrinter, memoize
40 BAG_EXTENSIONS = (
".bag",
".bag.active")
43 SKIP_EXTENSIONS = (
".bag.orig.active", )
46 ROS_TIME_TYPES = [
"time",
"duration"]
49 ROS_TIME_CLASSES = {rospy.Time:
"time", rospy.Duration:
"duration"}
52 ROS_ALIAS_TYPES = {
"byte":
"int8",
"char":
"uint8"}
64 genpy_mtx = threading.RLock()
69 ROS1 bag reader and writer.
71 Extends `rosbag.Bag` with more conveniences, smooths over the rosbag bug of ignoring
72 topic and time filters in format v1.2, and smooths over the rosbag bug
73 of yielding messages of wrong type, if message types in different topics
74 have different packages but identical fields and hashes.
76 Does **not** smooth over the rosbag bug of writing different types to one topic.
78 rosbag does allow writing messages of different types to one topic,
79 just like live ROS topics can have multiple message types published
80 to one topic. And their serialized bytes will actually be in the bag,
81 but rosbag will only register the first type for this topic (unless it is
82 explicitly given another connection header with metadata on the other type).
84 All messages yielded will be deserialized by rosbag as that first type,
85 and whether reading will raise an exception or not depends on whether
86 the other type has enough bytes to be deserialized as that first type.
101 @param f bag file path, or a stream object
102 @param mode mode to open bag in, defaults to "r" (read mode)
103 @param reindex if true and bag is unindexed, make a copy
104 of the file (unless unindexed format) and reindex original
105 @param progress show progress bar with reindexing status
106 @param kwargs additional keyword arguments for `rosbag.Bag`, like `compression`
110 f, args = (args[0]
if args
else kwargs.pop(
"f")), args[1:]
111 mode, args = (args[0]
if args
else kwargs.pop(
"mode",
"r")), args[1:]
112 if mode
not in self.
MODES:
raise ValueError(
"invalid mode %r" % mode)
114 kwargs.setdefault(
"skip_index",
True)
115 reindex, progress = (kwargs.pop(k,
False)
for k
in (
"reindex",
"progress"))
116 getargspec = getattr(inspect,
"getfullargspec", getattr(inspect,
"getargspec",
None))
117 for n
in set(kwargs) - set(getargspec(rosbag.Bag.__init__).args): kwargs.pop(n)
119 if common.is_stream(f):
120 if not common.verify_io(f, mode):
121 raise io.UnsupportedOperation({
"r":
"read",
"w":
"write",
"a":
"append"}[mode])
122 super(ROS1Bag, self).
__init__(f, mode, *args, **kwargs)
127 if "a" == mode
and (
not os.path.exists(f)
or not os.path.getsize(f)):
129 os.path.exists(f)
and os.remove(f)
132 super(ROS1Bag, self).
__init__(f, mode, *args, **kwargs)
133 except rosbag.ROSBagUnindexedException:
134 if not reindex:
raise
135 Bag.reindex_file(f, progress, *args, **kwargs)
136 super(ROS1Bag, self).
__init__(f, mode, *args, **kwargs)
141 """Returns ROS1 message type definition full text from bag, including subtype definitions."""
143 return self.
__TYPEDEFS.get((msg_or_type._type, msg_or_type._md5sum))
144 typename = msg_or_type
145 return next((d
for (n, h), d
in self.
__TYPEDEFS.items()
if n == typename),
None)
150 Returns rospy message class for typename, or None if unknown type.
152 Generates class dynamically if not already generated.
154 @param typehash message type definition hash, if any
157 typehash = typehash
or next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
158 typekey = (typename, typehash)
161 self.
__TYPES[(n, c._md5sum)] = c
162 return self.
__TYPES.get(typekey)
166 """Returns ROS1 message type MD5 hash, or None if unknown type."""
168 typename = msg_or_type
169 typehash =
next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
172 typehash =
next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
177 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
179 except Exception:
return None
183 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
185 except Exception:
return None
189 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
194 raw=False, connection_filter=None, **__):
196 Yields messages from the bag, optionally filtered by topic, timestamp and connection details.
198 @param topics list of topics or a single topic.
199 If an empty list is given, all topics will be read.
200 @param start_time earliest timestamp of messages to return,
201 as ROS time or convertible (int/float/duration/datetime/decimal)
202 @param end_time latest timestamp of messages to return,
203 as ROS time or convertible (int/float/duration/datetime/decimal)
204 @param connection_filter function to filter connections to include
205 @param raw if true, then returned messages are tuples of
206 (typename, bytes, typehash, typeclass)
207 or (typename, bytes, typehash, position, typeclass),
208 depending on file format
209 @return BagMessage namedtuples of
210 (topic, message, timestamp as rospy.Time)
212 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
213 if "w" == self._mode:
raise io.UnsupportedOperation(
"read")
216 for n, h
in self.
__TYPEDEFS: hashtypes.setdefault(h, []).append(n)
217 read_topics = topics
if isinstance(topics, (dict, list, set, tuple))
else \
218 [topics]
if topics
else None
219 dupes = {t: (n, h)
for t, n, h
in self.
__topics
220 if (read_topics
is None or t
in read_topics)
and len(hashtypes.get(h, [])) > 1}
223 if self.version != 102
or (
not topics
and start_time
is None and end_time
is None):
224 in_range =
lambda *_:
True
225 else: in_range =
lambda t, s: ((
not read_topics
or t
in read_topics)
and
226 (start_time
is None or s >= start_time)
and
227 (end_time
is None or s <= end_time))
229 start_time, end_time = map(to_time, (start_time, end_time))
230 kwargs = dict(topics=topics, start_time=start_time, end_time=end_time,
231 connection_filter=connection_filter, raw=raw)
233 for topic, msg, stamp
in super(ROS1Bag, self).
read_messages(**kwargs):
234 if in_range(topic, stamp):
235 TypeMeta.make(msg, topic, self)
240 for topic, msg, stamp
in super(ROS1Bag, self).
read_messages(**kwargs):
241 if not in_range(topic, stamp):
continue
245 typename, typehash = (msg[0], msg[2])
if raw
else (msg._type, msg._md5sum)
246 if dupes[topic] != (typename, typehash):
251 TypeMeta.make(msg, topic, self)
256 def write(self, topic, msg, t=None, raw=False, connection_header=None, **__):
258 Writes a message to the bag.
260 Populates connection header if topic already in bag but with a different message type.
262 @param topic name of topic
263 @param msg ROS1 message
264 @param t message timestamp if not using current wall time,
265 as ROS time or convertible (int/float/duration/datetime/decimal)
266 @param raw if true, `msg` is in raw format,
267 (typename, bytes, typehash, typeclass)
268 @param connection_header custom connection record for topic,
269 as {"topic", "type", "md5sum", "message_definition"}
271 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
272 if "r" == self._mode:
raise io.UnsupportedOperation(
"write")
274 return super(ROS1Bag, self).
write(topic, msg,
to_time(t), raw, connection_header)
278 """Opens the bag file if not already open."""
280 self._open(self.
filename, self.
mode, allow_unindexed=
True)
284 """Closes the bag file."""
286 super(ROS1Bag, self).
close()
292 """Returns whether bag contains given topic."""
293 return any(key == t
for t, _, _
in self.
__topics)
297 """Retrieves next message from bag as (topic, message, timestamp)."""
298 if self.
closed:
raise ValueError(
"I/O operation on closed file.")
305 """Returns the list of topics in bag, in alphabetic order."""
306 return sorted((t
for t, _, _
in self.
__topics), key=str.lower)
311 """Returns whether file is closed."""
312 return not self._file
316 """Returns message converted to given type; fields must match."""
320 v1 = v2 = getattr(msg, fname)
321 if ftypename != fields2.get(fname, ftypename):
323 setattr(msg2, fname, v2)
328 """Populates topics and message type definitions and hashes."""
329 result = collections.Counter()
330 counts = collections.Counter()
331 for c
in self._chunks:
332 for c_id, count
in c.connection_counts.items():
333 counts[c_id] += count
334 for c
in self._connections.values():
335 result[(c.topic, c.datatype, c.md5sum)] += counts[c.id]
336 self.
__TYPEDEFS[(c.datatype, c.md5sum)] = c.msg_def
341 """Registers message in local metadata, writes connection header if new type for topic."""
342 if raw: typename, typehash, typeclass = msg[0], msg[2], msg[3]
343 else: typename, typehash, typeclass = msg._type, msg._md5sum, type(msg)
344 topickey, typekey = (topic, typename, typehash), (typename, typehash)
347 and any(topic == t
and typekey != (n, h)
for t, n, h
in self.
__topics):
349 if not connection_header:
350 connection_header = {
"topic": topic,
"type": typename,
"md5sum": typehash,
351 "message_definition": typeclass._full_text}
352 conn_id = len(self._connections)
353 connection_info = rosbag.bag._ConnectionInfo(conn_id, topic, connection_header)
354 self._write_connection_record(connection_info, encrypt=
False)
355 self._connections[conn_id] = self._topic_connections[topic] = connection_info
357 self.
__TYPES.setdefault(typekey, typeclass)
358 self.
__TYPEDEFS.setdefault(typekey, typeclass._full_text)
363 """Parses subtype definition from any full definition where available, if not loaded."""
364 typehash = typehash
or next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
365 typekey = (typename, typehash)
367 for (roottype, roothash), rootdef
in list(self.
__TYPEDEFS.items()):
368 rootkey = (roottype, roothash)
373 self.
__TYPEDEFS.update(((n, subhashes[n]), d)
for n, d
in subdefs)
374 self.
__PARSEDS.update(((n, h),
True)
for n, h
in subhashes.items())
384 Reindexes bag, making a backup copy in file directory.
386 @param progress show progress bar for reindexing status
388 KWS = [
"mode",
"compression",
"chunk_threshold",
389 "allow_unindexed",
"options",
"skip_index"]
390 kwargs.update(zip(KWS, args), allow_unindexed=
True)
391 copied, bar, f2 =
False,
None,
None
393 fmt =
lambda s: common.format_bytes(s, strip=
False)
394 name, size = os.path.basename(f), os.path.getsize(f)
395 aftertemplate =
" Reindexing %s (%s): {afterword}" % (name, fmt(size))
398 ConsolePrinter.warn(
"Unindexed bag %s, reindexing.", f)
399 bar
and bar.update(0).start()
402 inplace = (inbag.version > 102)
404 f2 =
"%s.orig%s" % os.path.splitext(f)
408 inbag, outbag =
None,
None
409 outkwargs = dict(kwargs, mode=
"a" if inplace
else "w", allow_unindexed=
True)
411 inbag =
rosbag.Bag(f2, **dict(kwargs, mode=
"r"))
413 Bag._run_reindex(inbag, outbag, bar)
414 bar
and bar.update(bar.max)
415 except BaseException:
416 inbag
and inbag.close()
417 outbag
and outbag.close()
418 copied
and shutil.move(f2, f)
420 finally: bar
and bar.update(bar.value, flush=
True).stop()
427 """Runs reindexing, showing optional progress bar."""
428 update_bar = noop =
lambda s:
None
429 indexbag, writebag = (inbag, outbag)
if inbag.version == 102
else (outbag,
None)
431 fmt =
lambda s: common.format_bytes(s, strip=
False)
432 update_bar =
lambda s: (setattr(bar,
"afterword", fmt(s)),
433 setattr(bar,
"pulse",
False), bar.update(s).stop())
436 progress = update_bar
if not writebag
else noop
437 for offset
in indexbag.reindex():
442 progress = update_bar
if bar
else noop
443 for (topic, msg, t, header)
in indexbag.read_messages(return_connection_header=
True):
444 writebag.write(topic, msg, t, connection_header=header)
445 progress(indexbag._file.tell())
452 Initializes a ROS1 node if not already initialized.
454 Blocks until ROS master available.
461 master = rospy.client.get_master()
464 try: uri = master.getUri()[-1]
466 if available
is None:
467 ConsolePrinter.log(logging.ERROR,
468 "Unable to register with master. Will keep trying.")
470 time.sleep(SLEEP_INTERVAL)
472 ConsolePrinter.debug(
"Connected to ROS master at %s.", uri)
474 try: rospy.get_rostime()
476 rospy.init_node(name, anonymous=
True, disable_signals=
True)
480 """Shuts down live ROS1 node."""
484 rospy.signal_shutdown(
"Close grepros")
489 Returns whether ROS1 environment is set, prints error if not.
491 @param live whether environment must support launching a ROS node
493 VARS = [
"ROS_MASTER_URI",
"ROS_ROOT",
"ROS_VERSION"]
if live
else [
"ROS_VERSION"]
494 missing = [k
for k
in VARS
if not os.getenv(k)]
496 ConsolePrinter.error(
"ROS environment not sourced: missing %s.",
497 ", ".join(sorted(missing)))
498 if "1" != os.getenv(
"ROS_VERSION",
"1"):
499 ConsolePrinter.error(
"ROS environment not supported: need ROS_VERSION=1.")
507 Returns "pkg/Type" for "pkg/subdir/Type".
509 @param unbounded drop array bounds, e.g. returning "uint8[]" for "uint8[10]"
511 if typename
and typename.count(
"/") > 1:
512 typename =
"%s/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
513 if unbounded
and typename
and "[" in typename:
514 typename = typename[:typename.index(
"[")] +
"[]"
519 """Returns a rospy.Publisher."""
520 def pub_unregister():
522 if not pub.get_num_connections(): super(rospy.Publisher, pub).unregister()
524 cls = cls_or_typename
526 pub = rospy.Publisher(topic, cls, queue_size=queue_size)
527 pub.unregister = pub_unregister
533 Returns a rospy.Subscriber.
535 Local message packages are not required. Subscribes as AnyMsg,
536 creates message class dynamically from connection info,
537 and deserializes message before providing to handler.
539 Supplemented with .get_message_class(), .get_message_definition(),
540 .get_message_type_hash(), and .get_qoses().
542 The supplementary .get_message_xyz() methods should only be invoked after at least one message
543 has been received from the topic, as they get populated from live connection metadata.
546 if msg._connection_header[
"type"] != typename:
548 typekey = (typename, msg._connection_header[
"md5sum"])
549 if typekey
not in TYPECLASSES:
550 typedef = msg._connection_header[
"message_definition"]
552 TYPECLASSES.setdefault((name, cls._md5sum), cls)
553 handler(TYPECLASSES[typekey]().deserialize(msg._buff))
555 sub = rospy.Subscriber(topic, rospy.AnyMsg, myhandler, queue_size=queue_size)
556 sub.get_message_class =
lambda: next(c
for (n, h), c
in TYPECLASSES.items()
559 for (n, h), c
in TYPECLASSES.items()
if n == typename)
560 sub.get_message_type_hash =
lambda: next(h
for n, h
in TYPECLASSES
if n == typename)
561 sub.get_qoses =
lambda:
None
567 Returns a message attribute value as string.
569 Result is at least 10 chars wide if message is a ROS time/duration
570 (aligning seconds and nanoseconds).
572 LENS = {
"secs": 10,
"nsecs": 9}
574 if not isinstance(msg, genpy.TVal)
or name
not in LENS:
577 EXTRA = sum(v.count(x) * len(x)
for x
in (common.MatchMarkers.START, common.MatchMarkers.END))
578 return (
"%%%ds" % (LENS[name] + EXTRA)) % v
584 Generates ROS message classes dynamically from given name and definition.
586 Modifies `sys.path` to include the generated Python module.
588 @param typename message type name like "std_msgs/String"
589 @param typedef message type definition full text, including all subtype definitions
590 @return dictionary of {typename: typeclass} for message type and all subtypes
592 with genpy_mtx:
return genpy.dynamic.generate_dynamic(typename, typedef)
597 """Returns ROS1 message class."""
598 if typename
in ROS_TIME_TYPES:
599 return next(k
for k, v
in ROS_TIME_CLASSES.items()
if v == typename)
600 if typename
in (
"genpy/Time",
"rospy/Time"):
602 if typename
in (
"genpy/Duration",
"rospy/Duration"):
603 return rospy.Duration
604 try:
return roslib.message.get_message_class(typename)
605 except Exception:
return None
610 Returns ROS1 message type definition full text, including subtype definitions.
612 Returns None if unknown type.
615 return getattr(msg_or_cls,
"_full_text",
None)
619 """Returns ROS message type MD5 hash, or "" if unknown type."""
621 return getattr(msg_or_cls,
"_md5sum",
"")
626 Returns OrderedDict({field name: field type name}) if ROS1 message, else {}.
628 @param val ROS1 message class or instance
630 names, types = (getattr(val, k, [])
for k
in (
"__slots__",
"_slot_types"))
632 if isinstance(val, genpy.TVal): names, types = genpy.TVal.__slots__, [
"uint32",
"uint32"]
633 return collections.OrderedDict(zip(names, types))
637 """Returns ROS1 message type name, like "std_msgs/Header"."""
639 cls = msg_or_cls
if inspect.isclass(msg_or_cls)
else type(msg_or_cls)
640 return "duration" if "duration" in cls.__name__.lower()
else "time"
641 return msg_or_cls._type
645 """Returns object attribute value, with numeric arrays converted to lists."""
646 v = getattr(msg, name)
647 listifiable = typename.startswith((
"uint8[",
"char["))
and isinstance(v, bytes)
648 if six.PY2
and listifiable:
649 listifiable = v[:1] !=
"[" or v[-1:] !=
"]" or common.MatchMarkers.START
not in v
650 return list(bytearray(v))
if listifiable
else v
651 return list(v)
if listifiable
or isinstance(v, tuple)
else v
656 Returns current ROS1 time, as rospy.Time.
658 @param fallback use wall time if node not initialized
660 try:
return rospy.get_rostime()
662 if fallback:
return make_time(time.time())
668 Returns currently available ROS1 topics, as [(topicname, typename)].
670 Omits topics that the current ROS1 node itself has published.
672 result, myname = [], rospy.get_name()
673 pubs, _, _ = master.getSystemState()[-1]
674 mypubs = [t
for t, nn
in pubs
if myname
in nn
and t
not in (
"/rosout",
"/rosout_agg")]
675 for topic, typename
in master.getTopicTypes()[-1]:
676 if topic
not in mypubs:
677 result.append((topic, typename))
683 Returns whether value is a ROS1 message or special like ROS1 time/duration class or instance.
685 @param ignore_time whether to ignore ROS1 time/duration types
687 isfunc = issubclass
if inspect.isclass(val)
else isinstance
688 return isfunc(val, genpy.Message
if ignore_time
else (genpy.Message, genpy.TVal))
692 """Returns whether value is a ROS1 time/duration class or instance."""
693 return issubclass(val, genpy.TVal)
if inspect.isclass(val)
else isinstance(val, genpy.TVal)
697 """Returns a ROS1 duration, as rospy.Duration."""
698 return rospy.Duration(secs=secs, nsecs=nsecs)
702 """Returns a ROS1 time, as rospy.Time."""
703 return rospy.Time(secs=secs, nsecs=nsecs)
707 """Returns ROS1 message as a serialized binary."""
708 with TypeMeta.make(msg)
as m:
709 if m.data
is not None:
return m.data
712 return buf.getvalue()
716 """Returns ROS1 message or service request/response instantiated from serialized binary."""
717 cls = cls_or_typename
719 return cls().deserialize(raw)
725 Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
727 Returns type unchanged if already a scalar.
729 return typename[:typename.index(
"[")]
if typename
and "[" in typename
else typename
733 """Sets message or object attribute value."""
734 setattr(obj, name, value)
738 """Returns value as ROS1 duration if convertible (int/float/time/datetime/decimal), else value."""
740 if isinstance(val, decimal.Decimal):
741 result = rospy.Duration(int(val), float(val % 1) * 10**9)
742 elif isinstance(val, datetime.datetime):
743 result = rospy.Duration(int(val.timestamp()), 1000 * val.microsecond)
744 elif isinstance(val, (float, int)):
745 result = rospy.Duration(val)
746 elif isinstance(val, rospy.Time):
747 result = rospy.Duration(val.secs, val.nsecs)
752 """Returns value in nanoseconds if value is ROS time/duration, else value."""
753 return val.to_nsec()
if isinstance(val, genpy.TVal)
else val
757 """Returns value in seconds if value is ROS1 time/duration, else value."""
758 return val.to_sec()
if isinstance(val, genpy.TVal)
else val
762 """Returns value as (seconds, nanoseconds) if value is ROS1 time/duration, else value."""
763 return (val.secs, val.nsecs)
if isinstance(val, genpy.TVal)
else val
767 """Returns value as ROS1 time if convertible (int/float/duration/datetime/decimal), else value."""
769 if isinstance(val, decimal.Decimal):
770 result = rospy.Time(int(val), float(val % 1) * 10**9)
771 elif isinstance(val, datetime.datetime):
772 result = rospy.Time(int(val.timestamp()), 1000 * val.microsecond)
773 elif isinstance(val, six.integer_types + (float, )):
774 result = rospy.Time(val)
775 elif isinstance(val, genpy.Duration):
776 result = rospy.Time(val.secs, val.nsecs)
781 "BAG_EXTENSIONS",
"ROS_ALIAS_TYPES",
"ROS_TIME_CLASSES",
"ROS_TIME_TYPES",
"SKIP_EXTENSIONS",
782 "SLEEP_INTERVAL",
"TYPECLASSES",
"Bag",
"ROS1Bag",
"master",
783 "canonical",
"create_publisher",
"create_subscriber",
"deserialize_message",
784 "format_message_value",
"generate_message_classes",
"get_message_class",
785 "get_message_definition",
"get_message_fields",
"get_message_type",
"get_message_type_hash",
786 "get_message_value",
"get_rostime",
"get_topic_types",
"init_node",
"is_ros_message",
787 "is_ros_time",
"make_duration",
"make_time",
"scalar",
"serialize_message",
"set_message_value",
788 "shutdown_node",
"to_duration",
"to_nsec",
"to_sec",
"to_sec_nsec",
"to_time",
"validate",