3 Input sources for ROS messages.
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
15 from __future__
import print_function
22 except ImportError:
import Queue
as queue
31 from . common
import ConsolePrinter, ensure_namespace, drop_zeros
35 """Message producer base class."""
41 MESSAGE_META_TEMPLATE =
"{topic} #{index} ({type} {dt} {stamp})"
44 DEFAULT_ARGS = dict(START_TIME=
None, END_TIME=
None, UNIQUE=
False, SELECT_FIELD=(),
45 NOSELECT_FIELD=(), NTH_MESSAGE=1, NTH_INTERVAL=0)
49 @param args arguments as namespace or dictionary, case-insensitive
50 @param args.start_time earliest timestamp of messages to read
51 @param args.end_time latest timestamp of messages to read
52 @param args.unique emit messages that are unique in topic
53 @param args.select_field message fields to use for uniqueness if not all
54 @param args.noselect_field message fields to skip for uniqueness
55 @param args.nth_message read every Nth message in topic
56 @param args.nth_interval minimum time interval between messages in topic
57 @param kwargs any and all arguments as keyword overrides, case-insensitive
62 self.
_topics = collections.defaultdict(list)
65 self.
_hashes = collections.defaultdict(set)
83 """Yields messages from source, as (topic, msg, ROS time)."""
87 """Context manager entry."""
90 def __exit__(self, exc_type, exc_value, traceback):
91 """Context manager exit, closes source."""
95 """Yields messages from source, as (topic, msg, ROS time)."""
98 """Attaches sink to source"""
102 """Returns whether source prerequisites are met (like ROS environment for TopicSource)."""
107 """Shuts down input, closing any files or connections."""
114 self.
bar.pulse_pos =
None
115 self.
bar.update(flush=
True).stop()
119 """Shuts down input batch if any (like bagfile), else all input."""
123 """Returns source metainfo string."""
127 """Returns message metainfo string."""
129 meta = {k:
"" if v
is None else v
for k, v
in meta.items()}
133 """Returns source batch identifier if any (like bagfile name if BagSource)."""
136 """Returns source metainfo data dict."""
140 """Returns message metainfo data dict."""
141 with api.TypeMeta.make(msg, topic)
as m:
142 return dict(topic=topic, type=m.typename, stamp=
drop_zeros(api.to_sec(stamp)),
143 index=index, dt=
drop_zeros(common.format_stamp(api.to_sec(stamp)),
" "),
144 hash=m.typehash, schema=m.definition)
147 """Returns message type class."""
148 return api.get_message_class(typename)
151 """Returns ROS message type definition full text, including subtype definitions."""
152 return api.get_message_definition(msg_or_type)
155 """Returns ROS message type MD5 hash."""
156 return api.get_message_type_hash(msg_or_type)
159 """Returns whether message passes source filters."""
160 if self.
args.START_TIME
and stamp < self.
args.START_TIME:
162 if self.
args.END_TIME
and stamp > self.
args.END_TIME:
164 if self.
args.UNIQUE
or self.
args.NTH_MESSAGE > 1
or self.
args.NTH_INTERVAL > 0:
165 topickey = api.TypeMeta.make(msg, topic).topickey
167 if self.
args.NTH_MESSAGE > 1
and last_accepted
and index
is not None:
168 if (index - 1) % self.
args.NTH_MESSAGE:
170 if self.
args.NTH_INTERVAL > 0
and last_accepted
and stamp
is not None:
171 if api.to_sec(stamp - last_accepted[1]) < self.
args.NTH_INTERVAL:
175 msghash = api.make_message_hash(msg, include, exclude)
176 if msghash
in self.
_hashes[topickey]:
178 self.
_hashes[topickey].add(msghash)
182 """Reports match status of last produced message."""
185 """Handles exception, used by background threads."""
186 ConsolePrinter.error(text)
189 """Parses pattern arguments into re.Patterns."""
190 selects, noselects = self.
args.SELECT_FIELD, self.
args.NOSELECT_FIELD
191 for key, vals
in [(
"select", selects), (
"noselect", noselects)]:
192 self.
_patterns[key] = [(tuple(v.split(
".")), common.wildcard_to_regex(v))
for v
in vals]
197 Provides topic conditions evaluation.
199 Evaluates a set of Python expressions, with a namespace of:
200 - msg: current message being checked
201 - topic: current topic being read
202 - <topic /any/name> messages in named or wildcarded topic
204 <topic ..> gets replaced with an object with the following behavior:
205 - len(obj) -> number of messages processed in topic
206 - bool(obj) -> whether there are any messages in topic
207 - obj[pos] -> topic message at position (from latest if negative, first if positive)
208 - obj.x -> attribute x of last message
210 All conditions need to evaluate as true for a message to be processable.
211 If a condition tries to access attributes of a message not yet present,
212 condition evaluates as false.
214 If a condition topic matches more than one real topic (by wildcard or by
215 different types in one topic), evaluation is done for each set of
216 topics separately, condition passing if any set passes.
218 Example condition: `<topic */control_enable>.data and <topic */cmd_vel>.linear.x > 0`
219 `and <topic */cmd_vel>.angular.z < 0.02`.
222 TOPIC_RGX = re.compile(
r"<topic\s+([^\s><]+)\s*>")
225 DEFAULT_ARGS = dict(CONDITION=())
232 Object for <topic x> replacements in condition expressions.
234 - len(topic) -> number of messages processed in topic
235 - bool(topic) -> whether there are any messages in topic
236 - topic[x] -> history at -1 -2 for last and but one, or 0 1 for first and second
237 - topic.x -> attribute x of last message
238 - value in topic -> whether any field of last message contains value
239 - value in topic[x] -> whether any field of topic history at position contains value
252 """Returns whether value exists in last message, or raises NoMessageException."""
257 """Returns message from history at key, or Empty() if no such message."""
262 """Returns attribute value of last message, or raises NoMessageException."""
264 return getattr(self.
_lasts[-1], name)
269 Object for current topic message in condition expressions.
271 - value in msg -> whether any message field contains value
272 - msg.x -> attribute x of message
280 """Returns whether value exists in any message field."""
282 self.
_fulltext =
"\n".join(
"%s" % (v, )
for _, v, _
in
283 api.iter_message_fields(self.
_msg, flat=
True))
284 value = item
if isinstance(item, six.text_type)
else \
285 item.decode()
if isinstance(item, six.binary_type)
else str(item)
286 return re.search(re.escape(value), self.
_fulltext, re.I)
289 """Returns attribute value of message."""
290 return getattr(self.
_msg, name)
294 """Placeholder falsy object that raises NoMessageException on attribute access."""
297 def __nonzero__(self):
return False
299 def __len__(self):
return 0
304 @param args arguments as namespace or dictionary, case-insensitive
305 @param args.condition Python expressions that must evaluate as true
306 for message to be processable, see ConditionMixin
307 @param kwargs any and all arguments as keyword overrides, case-insensitive
315 self.
_lastmsgs = collections.defaultdict(collections.deque)
324 """Returns whether message passes passes current state conditions, if any."""
328 for i, (expr, code)
in enumerate(self.
_conditions.items()):
331 realcarded = {wt: [(t, n, h)
for (t, n, h)
in self.
_lastmsgs if p.match(t)]
333 variants = [[(wt, (t, n, h))
for (t, n, h)
in tt]
or [(wt, (wt,
None))]
334 for wt, tt
in realcarded.items()]
335 variants = variants
or [[
None]]
338 for remaps
in itertools.product(*variants):
339 if remaps == (
None, ): remaps = ()
342 try: result = eval(code, ns)
344 except Exception
as e:
345 ConsolePrinter.error(
'Error evaluating condition "%s": %s', expr, e)
352 """Clears cached messages."""
357 """Returns whether there are any conditions configured."""
361 """Returns a list of all topics used in conditions (may contain wildcards)."""
366 Returns whether topic is used for checking condition.
368 @param pure whether use should be solely for condition, not for matching at all
373 wildcarded = [t
for t, p
in self.
_wildcard_topics.items()
if p.match(topic)]
374 if not wildcarded:
return False
375 return all(map(self.
_topic_states.get, wildcarded))
if pure
else True
378 """Sets whether topic is purely used for conditions not matching."""
383 """Retains message for condition evaluation if in condition topic."""
385 topickey = api.TypeMeta.make(msg, topic).topickey
394 Returns Topic() by name.
396 @param remap optional remap dictionary as {topic1: (topic2, typename, typehash)}
398 if remap
and topic
in remap:
399 topickey = remap[topic]
401 topickey = next(((t, n, h)
for (t, n, h)
in self.
_lastmsgs if t == topic),
None)
402 if topickey
not in self._counts:
405 return self.
Topic(c, f, l)
408 """Parses condition expressions and populates local structures."""
409 for v
in args.CONDITION:
410 topics = list(set(self.
TOPIC_RGX.findall(v)))
413 for t
in (t
for t
in topics
if "*" in t):
415 expr = self.
TOPIC_RGX.sub(
r'get_topic("\1")', v)
418 for v
in args.CONDITION:
419 indexexprs = re.findall(self.
TOPIC_RGX.pattern +
r"\s*\[([^\]]+)\]", v)
420 for topic, indexexpr
in indexexprs:
423 index = eval(indexexpr)
424 limits[index < 0] = max(limits[index < 0], abs(index) + (index >= 0))
425 except Exception:
continue
429 """Produces messages from ROS bagfiles."""
432 MESSAGE_META_TEMPLATE =
"{topic} {index}/{total} ({type} {dt} {stamp})"
435 META_TEMPLATE =
"\nFile {file} ({size}), {tcount} topics, {mcount:,d} messages\n" \
436 "File period {startdt} - {enddt}\n" \
437 "File span {delta} ({start} - {end})"
440 DEFAULT_ARGS = dict(BAG=(), FILE=(), PATH=(), RECURSE=
False, TOPIC=(), TYPE=(),
441 SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None, END_TIME=
None,
442 START_INDEX=
None, END_INDEX=
None, CONDITION=(), AFTER=0, ORDERBY=
None,
443 DECOMPRESS=
False, REINDEX=
False, WRITE=(), PROGRESS=
False,
448 @param args arguments as namespace or dictionary, case-insensitive;
449 or a single path as the ROS bagfile to read,
450 or a stream to read from,
451 or one or more {@link grepros.api.Bag Bag} instances
454 Bag-specific arguments:
455 @param args.file names of ROS bagfiles to read if not all in directory,
456 or a stream to read from;
457 or one or more {@link grepros.api.Bag Bag} instances
458 @param args.path paths to scan if not current directory
459 @param args.recurse recurse into subdirectories when looking for bagfiles
460 @param args.orderby "topic" or "type" if any to group results by
461 @param args.decompress decompress archived bags to file directory
462 @param args.reindex make a copy of unindexed bags and reindex them (ROS1 only)
463 @param args.write outputs, to skip in input files
464 @param args.bag one or more {@link grepros.api.Bag Bag} instances
468 @param args.topic ROS topics to read if not all
469 @param args.type ROS message types to read if not all
470 @param args.skip_topic ROS topics to skip
471 @param args.skip_type ROS message types to skip
472 @param args.start_time earliest timestamp of messages to read
473 @param args.end_time latest timestamp of messages to read
474 @param args.start_index message index within topic to start from
475 @param args.end_index message index within topic to stop at
476 @param args.unique emit messages that are unique in topic
477 @param args.select_field message fields to use for uniqueness if not all
478 @param args.noselect_field message fields to skip for uniqueness
479 @param args.nth_message read every Nth message in topic
480 @param args.nth_interval minimum time interval between messages in topic
481 @param args.condition Python expressions that must evaluate as true
482 for message to be processable, see ConditionMixin
483 @param args.progress whether to print progress bar
484 @param args.stop_on_error stop execution on any error like unknown message type
485 @param kwargs any and all arguments as keyword overrides, case-insensitive
488 is_bag = isinstance(args,
api.Bag)
or \
489 common.is_iterable(args)
and all(isinstance(x,
api.Bag)
for x
in args)
490 args = {
"FILE": str(args)}
if isinstance(args, common.PATH_TYPES)
else \
491 {
"FILE": args}
if common.is_stream(args)
else {}
if is_bag
else args
493 super(BagSource, self).
__init__(args)
494 ConditionMixin.__init__(self, args)
504 self.
_bag0 = ([args0]
if isinstance(args0,
api.Bag)
else args0)
if is_bag
else None
507 """Yields messages from ROS bagfiles, as (topic, msg, ROS time)."""
508 if not self.
validate():
raise Exception(
"invalid")
516 if "topic" == self.
args.ORDERBY:
517 topicsets = [{n: tt}
for n, tt
in sorted(self.
_topics.items())]
518 elif "type" == self.
args.ORDERBY:
520 for n, tt
in self.
_topics.items():
521 for t
in tt: typetopics.setdefault(t, []).append(n)
522 topicsets = [{n: [t]
for n
in nn}
for t, nn
in sorted(typetopics.items())]
526 for topics
in topicsets:
527 for topic, msg, stamp, index
in self.
_produce(topics)
if topics
else ():
539 """Returns whether ROS environment is set and arguments valid, prints error if not."""
542 if not self.
_bag0 and self.
args.FILE
and os.path.isfile(self.
args.FILE[0]) \
543 and not common.verify_io(self.
args.FILE[0],
"r"):
544 ConsolePrinter.error(
"File not readable.")
546 if not self.
_bag0 and common.is_stream(self.
args.FILE) \
547 and not any(c.STREAMABLE
for c
in api.Bag.READER_CLASSES):
548 ConsolePrinter.error(
"Bag format does not support reading streams.")
550 if self.
_bag0 and not any(x.mode
in (
"r",
"a")
for x
in self.
_bag0):
551 ConsolePrinter.error(
"Bag not in read mode.")
554 ConsolePrinter.error(
"Cannot use topics in conditions and bag order by %s.",
560 """Closes current bag, if any."""
563 ConditionMixin.close_batch(self)
564 super(BagSource, self).
close()
567 """Closes current bag, if any."""
572 self.
bar.update(flush=
True)
574 ConditionMixin.close_batch(self)
577 """Returns bagfile metainfo string."""
581 """Returns message metainfo string."""
583 meta = {k:
"" if v
is None else v
for k, v
in meta.items()}
587 """Returns name of current bagfile, or self if reading stream."""
591 """Returns bagfile metainfo data dict."""
592 if self.
_meta is not None:
594 mcount = self.
_bag.get_message_count()
595 start, end = (self.
_bag.get_start_time(), self.
_bag.get_end_time())
if mcount
else (
"",
"")
596 delta = common.format_timedelta(datetime.timedelta(seconds=(end
or 0) - (start
or 0)))
598 mcount=mcount, tcount=len(self.
topics), delta=delta,
600 startdt=
drop_zeros(common.format_stamp(start))
if start !=
"" else "",
601 enddt=
drop_zeros(common.format_stamp(end))
if end !=
"" else "")
605 """Returns message metainfo data dict."""
608 result.update(total=self.
topics[(topic, result[
"type"], result[
"hash"])])
609 if callable(getattr(self.
_bag,
"get_qoses",
None)):
610 result.update(qoses=self.
_bag.get_qoses(topic, result[
"type"]))
614 """Returns ROS message type class."""
616 api.get_message_class(typename)
619 """Returns ROS message type definition full text, including subtype definitions."""
621 api.get_message_definition(msg_or_type)
624 """Returns ROS message type MD5 hash."""
626 api.get_message_type_hash(msg_or_type)
629 """Reports match status of last produced message."""
635 """Returns whether message passes source filters."""
636 topickey = api.TypeMeta.make(msg, topic).topickey
637 if self.
args.START_INDEX
and index
is not None:
639 START = self.
args.START_INDEX
640 MIN = max(0, START + (self.
topics[topickey]
if START < 0
else 0))
643 if self.
args.END_INDEX
and index
is not None:
645 END = self.
args.END_INDEX
646 MAX = END + (self.
topics[topickey]
if END < 0
else 0)
649 if not super(BagSource, self).
is_processable(topic, msg, stamp, index):
651 return ConditionMixin.is_processable(self, topic, msg, stamp, index)
655 Yields messages from current ROS bagfile, as (topic, msg, ROS time, index in topic).
657 @param topics {topic: [typename, ]}
660 counts = collections.Counter()
661 for topic, msg, stamp
in self.
_bag.read_messages(list(topics), start_time):
664 typename = api.get_message_type(msg)
665 if topics
and typename
not in topics[topic]:
670 topickey = api.TypeMeta.make(msg, topic, self).topickey
671 counts[topickey] += 1; self.
_counts[topickey] += 1
673 if not self.
_sticky and counts[topickey] != self.
_counts[topickey]:
677 self.
bar and self.
bar.update(value=sum(self.
_counts.values()))
678 yield topic, msg, stamp, self.
_counts[topickey]
680 if self.
args.NTH_MESSAGE > 1
or self.
args.NTH_INTERVAL > 0:
684 and (len(self.
_topics) > 1
or len(next(iter(self.
_topics.values()))) > 1):
687 continue_from = stamp + api.make_duration(nsecs=1)
688 for entry
in self.
_produce({topic: typename}, continue_from):
695 """Yields Bag instances from configured arguments."""
697 for bag
in self.
_bag0:
702 names, paths = self.
args.FILE, self.
args.PATH
703 exts, skip_exts = api.BAG_EXTENSIONS, api.SKIP_EXTENSIONS
704 exts = list(exts) + [
"%s%s" % (a, b)
for a
in exts
for b
in common.Decompressor.EXTENSIONS]
707 for filename
in common.find_files(names, paths, exts, skip_exts, self.
args.RECURSE):
711 fullname = os.path.realpath(os.path.abspath(filename))
712 skip = common.Decompressor.make_decompressed_name(fullname)
in encountereds
713 encountereds.add(fullname)
718 encountereds.add(self.
_bag.filename)
722 """Initializes progress bar, if any, for current bag."""
723 if self.
args.PROGRESS
and not self.
bar:
726 self.
bar.afterword = os.path.basename(self.
_filename or "<stream>")
727 self.
bar.max = sum(sum(c
for (t, n, _), c
in self.
topics.items()
728 if c
and t == t_
and n
in nn)
729 for t_, nn
in self.
_topics.items())
730 self.
bar.update(value=0)
733 """Retrieves total message counts if not retrieved."""
735 has_ensure = common.has_arg(self.
_bag.get_topic_info,
"ensure_types")
736 kws = dict(ensure_types=
False)
if has_ensure
else {}
737 for (t, n, h), c
in self.
_bag.get_topic_info(**kws).items():
738 self.
topics[(t, n, h)] = c
742 """Opens bag and populates bag-specific argument state, returns success."""
753 if bag
is not None and bag.mode
not in (
"r",
"a"):
754 ConsolePrinter.warn(
"Cannot read %s: bag in write mode.", bag)
757 if filename
and self.
args.WRITE \
758 and any(os.path.realpath(x[0]) == os.path.realpath(filename)
759 for x
in self.
args.WRITE):
762 if filename
and common.Decompressor.is_compressed(filename):
763 if self.
args.DECOMPRESS:
764 filename = common.Decompressor.decompress(filename, self.
args.PROGRESS)
765 else:
raise Exception(
"decompression not enabled")
766 bag =
api.Bag(filename, mode=
"r", reindex=self.
args.REINDEX,
767 progress=self.
args.PROGRESS)
if bag
is None else bag
768 bag.stop_on_error = self.
args.STOP_ON_ERROR
770 except Exception
as e:
771 ConsolePrinter.error(
"\nError opening %r: %s", filename
or bag, e)
772 if self.
args.STOP_ON_ERROR:
raise
779 kws = dict(ensure_types=
False)
if common.has_arg(bag.get_topic_info,
"ensure_types")
else {}
780 for (t, n, h), c
in bag.get_topic_info(counts=
False, **kws).items():
781 dct.setdefault(t, []).append(n)
782 self.
topics[(t, n, h)] = c
787 dct = common.filter_dict(dct, self.
args.TOPIC, self.
args.TYPE)
788 dct = common.filter_dict(dct, self.
args.SKIP_TOPIC, self.
args.SKIP_TYPE, reverse=
True)
790 matches = [t
for p
in [common.wildcard_to_regex(topic, end=
True)]
for t
in fulldct
791 if t == topic
or "*" in topic
and p.match(t)]
792 for topic
in matches:
794 dct.setdefault(topic, fulldct[topic])
799 if args.START_TIME
is not None:
800 args.START_TIME = api.make_bag_time(args.START_TIME, bag)
801 if args.END_TIME
is not None:
802 args.END_TIME = api.make_bag_time(args.END_TIME, bag)
807 """Produces messages from live ROS topics."""
813 DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None,
814 END_TIME=
None, START_INDEX=
None, END_INDEX=
None, CONDITION=(),
815 QUEUE_SIZE_IN=10, ROS_TIME_IN=
False, PROGRESS=
False, STOP_ON_ERROR=
False)
819 @param args arguments as namespace or dictionary, case-insensitive
820 @param args.topic ROS topics to read if not all
821 @param args.type ROS message types to read if not all
822 @param args.skip_topic ROS topics to skip
823 @param args.skip_type ROS message types to skip
824 @param args.start_time earliest timestamp of messages to read
825 @param args.end_time latest timestamp of messages to read
826 @param args.start_index message index within topic to start from
827 @param args.end_index message index within topic to stop at
828 @param args.unique emit messages that are unique in topic
829 @param args.select_field message fields to use for uniqueness if not all
830 @param args.noselect_field message fields to skip for uniqueness
831 @param args.nth_message read every Nth message in topic
832 @param args.nth_interval minimum time interval between messages in topic
833 @param args.condition Python expressions that must evaluate as true
834 for message to be processable, see ConditionMixin
835 @param args.queue_size_in subscriber queue size (default 10)
836 @param args.ros_time_in stamp messages with ROS time instead of wall time
837 @param args.progress whether to print progress bar
838 @param args.stop_on_error stop execution on any error like unknown message type
839 @param kwargs any and all arguments as keyword overrides, case-insensitive
842 super(TopicSource, self).
__init__(args)
843 ConditionMixin.__init__(self, args)
851 """Yields messages from subscribed ROS topics, as (topic, msg, ROS time)."""
853 if not self.
validate():
raise Exception(
"invalid")
856 self.
_queue = queue.Queue()
865 topic, msg, stamp = self.
_queue.get()
869 topickey = api.TypeMeta.make(msg, topic, self).topickey
877 if self.
args.NTH_MESSAGE > 1
or self.
args.NTH_INTERVAL > 0:
883 """Attaches sink to source and blocks until connected to ROS live."""
884 if not self.
validate():
raise Exception(
"invalid")
885 super(TopicSource, self).
bind(sink)
889 """Returns whether ROS environment is set, prints error if not."""
894 """Shuts down subscribers and stops producing messages."""
896 for k
in list(self.
_subs):
897 self.
_subs.pop(k).unregister()
900 ConditionMixin.close_batch(self)
901 super(TopicSource, self).
close()
904 """Returns source metainfo data dict."""
905 ENV = {k: os.getenv(k)
for k
in (
"ROS_MASTER_URI",
"ROS_DOMAIN_ID")
if os.getenv(k)}
906 return dict(ENV, tcount=len(self.
topics))
909 """Returns message metainfo data dict."""
910 result = super(TopicSource, self).
get_message_meta(topic, msg, stamp, index)
911 topickey = (topic, result[
"type"], result[
"hash"])
912 if topickey
in self.
_subs:
913 result.update(qoses=self.
_subs[topickey].get_qoses())
917 """Returns message type class, from active subscription if available."""
918 sub = next((s
for (t, n, h), s
in self.
_subs.items()
919 if n == typename
and typehash
in (s.get_message_type_hash(),
None)),
None)
920 return sub
and sub.get_message_class()
or api.get_message_class(typename)
923 """Returns ROS message type definition full text, including subtype definitions."""
924 if api.is_ros_message(msg_or_type):
925 return api.get_message_definition(msg_or_type)
926 sub = next((s
for (t, n, h), s
in self.
_subs.items()
if n == msg_or_type),
None)
927 return sub
and sub.get_message_definition()
or api.get_message_definition(msg_or_type)
930 """Returns ROS message type MD5 hash."""
931 if api.is_ros_message(msg_or_type):
932 return api.get_message_type_hash(msg_or_type)
933 sub = next((s
for (t, n, h), s
in self.
_subs.items()
if n == msg_or_type),
None)
934 return sub
and sub.get_message_type_hash()
or api.get_message_type_hash(msg_or_type)
937 """Returns source metainfo string."""
939 result =
"\nROS%s live" % api.ROS_VERSION
940 if "ROS_MASTER_URI" in metadata:
941 result +=
", ROS master %s" % metadata[
"ROS_MASTER_URI"]
942 if "ROS_DOMAIN_ID" in metadata:
943 result +=
", ROS domain ID %s" % metadata[
"ROS_DOMAIN_ID"]
944 result +=
", %s initially" % common.plural(
"topic", metadata[
"tcount"])
948 """Returns whether message passes source filters."""
949 if self.
args.START_INDEX
and index
is not None:
950 if max(0, self.
args.START_INDEX) >= index:
952 if self.
args.END_INDEX
and index
is not None:
953 if 0 < self.
args.END_INDEX < index:
955 if not super(TopicSource, self).
is_processable(topic, msg, stamp, index):
957 return ConditionMixin.is_processable(self, topic, msg, stamp, index)
960 """Refreshes topics and subscriptions from ROS live."""
961 for topic, typename
in api.get_topic_types():
962 dct = common.filter_dict({topic: [typename]}, self.
args.TOPIC, self.
args.TYPE)
963 if not common.filter_dict(dct, self.
args.SKIP_TOPIC, self.
args.SKIP_TYPE, reverse=
True):
965 if api.ROS2
and api.get_message_class(typename)
is None:
966 msg =
"Error loading type %s in topic %s." % (typename, topic)
967 if self.
args.STOP_ON_ERROR:
raise Exception(msg)
968 ConsolePrinter.warn(msg, __once=
True)
970 topickey = (topic, typename,
None)
971 if topickey
in self.
topics:
974 handler = functools.partial(self.
_on_message, topic)
976 sub = api.create_subscriber(topic, typename, handler,
977 queue_size=self.
args.QUEUE_SIZE_IN)
978 except Exception
as e:
979 ConsolePrinter.warn(
"Error subscribing to topic %s: %%r" % topic,
981 if self.
args.STOP_ON_ERROR:
raise
983 self.
_subs[topickey] = sub
984 self.
topics[topickey] =
None
987 """Initializes progress bar, if any."""
988 if self.
args.PROGRESS
and not self.
bar:
990 aftertemplate=
" {afterword}", pulse=
True)
994 """Updates progress bar, if any."""
996 afterword =
"ROS%s live, %s" % (api.ROS_VERSION, common.plural(
"message", count))
997 self.
bar.afterword, self.
bar.max = afterword, count
999 self.
bar.pause, self.
bar.pulse_pos =
True,
None
1000 self.
bar.update(count)
1003 """Adjusts start/end time filter values to current time."""
1004 if self.
args.START_TIME
is not None:
1005 self.
args.START_TIME = api.make_live_time(self.
args.START_TIME)
1006 if self.
args.END_TIME
is not None:
1007 self.
args.END_TIME = api.make_live_time(self.
args.END_TIME)
1010 """Periodically refreshes topics and subscriptions from ROS live."""
1014 except Exception
as e: self.
thread_excepthook(
"Error refreshing live topics: %r" % e, e)
1018 """Subscription callback handler, queues message for yielding."""
1019 stamp = api.get_rostime()
if self.
args.ROS_TIME_IN
else api.make_time(time.time())
1024 """Produces messages from iterable or pushed data."""
1027 DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None,
1028 END_TIME=
None, START_INDEX=
None, END_INDEX=
None, UNIQUE=
False,
1029 SELECT_FIELD=(), NOSELECT_FIELD=(), NTH_MESSAGE=1, NTH_INTERVAL=0,
1030 CONDITION=(), ITERABLE=
None)
1034 @param args arguments as namespace or dictionary, case-insensitive;
1035 or iterable yielding messages
1036 @param args.topic ROS topics to read if not all
1037 @param args.type ROS message types to read if not all
1038 @param args.skip_topic ROS topics to skip
1039 @param args.skip_type ROS message types to skip
1040 @param args.start_time earliest timestamp of messages to read
1041 @param args.end_time latest timestamp of messages to read
1042 @param args.start_index message index within topic to start from
1043 @param args.end_index message index within topic to stop at
1044 @param args.unique emit messages that are unique in topic
1045 @param args.select_field message fields to use for uniqueness if not all
1046 @param args.noselect_field message fields to skip for uniqueness
1047 @param args.nth_message read every Nth message in topic
1048 @param args.nth_interval minimum time interval between messages in topic
1049 @param args.condition Python expressions that must evaluate as true
1050 for message to be processable, see ConditionMixin
1051 @param args.iterable iterable yielding (topic, msg, stamp) or (topic, msg);
1052 yielding `None` signals end of content
1053 @param kwargs any and all arguments as keyword overrides, case-insensitive
1055 if common.is_iterable(args)
and not isinstance(args, dict):
1058 super(AppSource, self).
__init__(args)
1059 ConditionMixin.__init__(self, args)
1067 Yields messages from iterable or pushed data, as (topic, msg, ROS timestamp).
1069 Blocks until a message is available, or source is closed.
1071 def generate(iterable):
1072 for x
in iterable:
yield x
1073 feeder = generate(self.
args.ITERABLE)
if self.
args.ITERABLE
else None
1076 item = self.
_queue.get()
if not feeder
or self.
_queue.qsize()
else next(feeder,
None)
1077 if item
is None:
break
1079 if len(item) > 2: topic, msg, stamp = item[:3]
1080 else: (topic, msg), stamp = item[:2], api.get_rostime(fallback=
True)
1081 topickey = api.TypeMeta.make(msg, topic, self).topickey
1089 if self.
args.NTH_MESSAGE > 1
or self.
args.NTH_INTERVAL > 0:
1094 """Closes current read() yielding, if any."""
1101 Returns (topic, msg, stamp) from push queue, or `None` if no queue
1102 or message in queue is condition topic only.
1105 try: item = self.
_queue.get(block=
False)
1106 except queue.Empty:
pass
1107 if item
is None:
return None
1109 topic, msg, stamp = item
1110 topickey = api.TypeMeta.make(msg, topic, self).topickey
1116 """Registers message produced from read_queue()."""
1117 if self.
args.NTH_MESSAGE > 1
or self.
args.NTH_INTERVAL > 0:
1118 topickey = api.TypeMeta.make(msg, topic).topickey
1121 def push(self, topic, msg=None, stamp=None):
1123 Pushes a message to be yielded from read().
1125 @param topic topic name, or `None` to signal end of content
1126 @param msg ROS message
1127 @param stamp message ROS timestamp, defaults to current wall time if `None`
1129 if topic
is None: self.
_queue.put(
None)
1130 else: self.
_queue.put((topic, msg, stamp
or api.get_rostime(fallback=
True)))
1133 """Returns whether message passes source filters."""
1134 dct = common.filter_dict({topic: [api.get_message_type(msg)]},
1136 if not common.filter_dict(dct, self.
args.SKIP_TOPIC, self.
args.SKIP_TYPE, reverse=
True):
1138 if self.
args.START_INDEX
and index
is not None:
1139 if max(0, self.
args.START_INDEX) >= index:
1141 if self.
args.END_INDEX
and index
is not None:
1142 if 0 < self.
args.END_INDEX < index:
1144 if not super(AppSource, self).
is_processable(topic, msg, stamp, index):
1146 return ConditionMixin.is_processable(self, topic, msg, stamp, index)
1149 """Adjusts start/end time filter values to current time."""
1150 if self.
args.START_TIME
is not None:
1151 self.
args.START_TIME = api.make_live_time(self.
args.START_TIME)
1152 if self.
args.END_TIME
is not None:
1153 self.
args.END_TIME = api.make_live_time(self.
args.END_TIME)
1156 __all__ = [
"AppSource",
"BagSource",
"ConditionMixin",
"Source",
"TopicSource"]