inputs.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Input sources for ROS messages.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 23.10.2021
11 @modified 30.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 from __future__ import print_function
16 import collections
17 import datetime
18 import functools
19 import itertools
20 import os
21 try: import queue # Py3
22 except ImportError: import Queue as queue # Py2
23 import re
24 import threading
25 import time
26 
27 import six
28 
29 from . import api
30 from . import common
31 from . common import ConsolePrinter, ensure_namespace, drop_zeros
32 
33 
34 class Source(object):
35  """Message producer base class."""
36 
37 
38  class SourceMessage(api.Bag.BagMessage): pass
39 
40 
41  MESSAGE_META_TEMPLATE = "{topic} #{index} ({type} {dt} {stamp})"
42 
43 
44  DEFAULT_ARGS = dict(START_TIME=None, END_TIME=None, UNIQUE=False, SELECT_FIELD=(),
45  NOSELECT_FIELD=(), NTH_MESSAGE=1, NTH_INTERVAL=0)
46 
47  def __init__(self, args=None, **kwargs):
48  """
49  @param args arguments as namespace or dictionary, case-insensitive
50  @param args.start_time earliest timestamp of messages to read
51  @param args.end_time latest timestamp of messages to read
52  @param args.unique emit messages that are unique in topic
53  @param args.select_field message fields to use for uniqueness if not all
54  @param args.noselect_field message fields to skip for uniqueness
55  @param args.nth_message read every Nth message in topic
56  @param args.nth_interval minimum time interval between messages in topic
57  @param kwargs any and all arguments as keyword overrides, case-insensitive
58  """
59  # {key: [(() if any field else ('nested', 'path') or re.Pattern, re.Pattern), ]}
60  self._patterns = {}
61  # {topic: ["pkg/MsgType", ]} searched in current source
62  self._topics = collections.defaultdict(list)
63  self._counts = collections.Counter() # {(topic, typename, typehash): count processed}
64  # {(topic, typename, typehash): (message hash over all fields used in matching)}
65  self._hashes = collections.defaultdict(set)
66  self._processables = {} # {(topic, typename, typehash): (index, stamp) of last processable}
67 
68  self.args = ensure_namespace(args, Source.DEFAULT_ARGS, **kwargs)
69 
70  self.sink = None
71 
72  self.topics = {}
73 
74  self.bar = None
75 
76  self.valid = None
77 
78  self.preprocess = True
79 
80  self._parse_patterns()
81 
82  def __iter__(self):
83  """Yields messages from source, as (topic, msg, ROS time)."""
84  return self.read()
85 
86  def __enter__(self):
87  """Context manager entry."""
88  return self
89 
90  def __exit__(self, exc_type, exc_value, traceback):
91  """Context manager exit, closes source."""
92  self.close()
93 
94  def read(self):
95  """Yields messages from source, as (topic, msg, ROS time)."""
96 
97  def bind(self, sink):
98  """Attaches sink to source"""
99  self.sink = sink
100 
101  def validate(self):
102  """Returns whether source prerequisites are met (like ROS environment for TopicSource)."""
103  if self.valid is None: self.valid = True
104  return self.valid
105 
106  def close(self):
107  """Shuts down input, closing any files or connections."""
108  self.topics.clear()
109  self._topics.clear()
110  self._counts.clear()
111  self._hashes.clear()
112  self._processables.clear()
113  if self.bar:
114  self.bar.pulse_pos = None
115  self.bar.update(flush=True).stop()
116  self.bar = None
117 
118  def close_batch(self):
119  """Shuts down input batch if any (like bagfile), else all input."""
120  self.close()
121 
122  def format_meta(self):
123  """Returns source metainfo string."""
124  return ""
125 
126  def format_message_meta(self, topic, msg, stamp, index=None):
127  """Returns message metainfo string."""
128  meta = self.get_message_meta(topic, msg, stamp, index)
129  meta = {k: "" if v is None else v for k, v in meta.items()}
130  return self.MESSAGE_META_TEMPLATE.format(**meta)
131 
132  def get_batch(self):
133  """Returns source batch identifier if any (like bagfile name if BagSource)."""
134 
135  def get_meta(self):
136  """Returns source metainfo data dict."""
137  return {}
138 
139  def get_message_meta(self, topic, msg, stamp, index=None):
140  """Returns message metainfo data dict."""
141  with api.TypeMeta.make(msg, topic) as m:
142  return dict(topic=topic, type=m.typename, stamp=drop_zeros(api.to_sec(stamp)),
143  index=index, dt=drop_zeros(common.format_stamp(api.to_sec(stamp)), " "),
144  hash=m.typehash, schema=m.definition)
145 
146  def get_message_class(self, typename, typehash=None):
147  """Returns message type class."""
148  return api.get_message_class(typename)
149 
150  def get_message_definition(self, msg_or_type):
151  """Returns ROS message type definition full text, including subtype definitions."""
152  return api.get_message_definition(msg_or_type)
153 
154  def get_message_type_hash(self, msg_or_type):
155  """Returns ROS message type MD5 hash."""
156  return api.get_message_type_hash(msg_or_type)
157 
158  def is_processable(self, topic, msg, stamp, index=None):
159  """Returns whether message passes source filters."""
160  if self.args.START_TIME and stamp < self.args.START_TIME:
161  return False
162  if self.args.END_TIME and stamp > self.args.END_TIME:
163  return False
164  if self.args.UNIQUE or self.args.NTH_MESSAGE > 1 or self.args.NTH_INTERVAL > 0:
165  topickey = api.TypeMeta.make(msg, topic).topickey
166  last_accepted = self._processables.get(topickey)
167  if self.args.NTH_MESSAGE > 1 and last_accepted and index is not None:
168  if (index - 1) % self.args.NTH_MESSAGE:
169  return False
170  if self.args.NTH_INTERVAL > 0 and last_accepted and stamp is not None:
171  if api.to_sec(stamp - last_accepted[1]) < self.args.NTH_INTERVAL:
172  return False
173  if self.args.UNIQUE:
174  include, exclude = self._patterns["select"], self._patterns["noselect"]
175  msghash = api.make_message_hash(msg, include, exclude)
176  if msghash in self._hashes[topickey]:
177  return False
178  self._hashes[topickey].add(msghash)
179  return True
180 
181  def notify(self, status):
182  """Reports match status of last produced message."""
183 
184  def thread_excepthook(self, text, exc):
185  """Handles exception, used by background threads."""
186  ConsolePrinter.error(text)
187 
188  def _parse_patterns(self):
189  """Parses pattern arguments into re.Patterns."""
190  selects, noselects = self.args.SELECT_FIELD, self.args.NOSELECT_FIELD
191  for key, vals in [("select", selects), ("noselect", noselects)]:
192  self._patterns[key] = [(tuple(v.split(".")), common.wildcard_to_regex(v)) for v in vals]
193 
194 
195 class ConditionMixin(object):
196  """
197  Provides topic conditions evaluation.
198 
199  Evaluates a set of Python expressions, with a namespace of:
200  - msg: current message being checked
201  - topic: current topic being read
202  - <topic /any/name> messages in named or wildcarded topic
203 
204  <topic ..> gets replaced with an object with the following behavior:
205  - len(obj) -> number of messages processed in topic
206  - bool(obj) -> whether there are any messages in topic
207  - obj[pos] -> topic message at position (from latest if negative, first if positive)
208  - obj.x -> attribute x of last message
209 
210  All conditions need to evaluate as true for a message to be processable.
211  If a condition tries to access attributes of a message not yet present,
212  condition evaluates as false.
213 
214  If a condition topic matches more than one real topic (by wildcard or by
215  different types in one topic), evaluation is done for each set of
216  topics separately, condition passing if any set passes.
217 
218  Example condition: `<topic */control_enable>.data and <topic */cmd_vel>.linear.x > 0`
219  `and <topic */cmd_vel>.angular.z < 0.02`.
220  """
221 
222  TOPIC_RGX = re.compile(r"<topic\s+([^\s><]+)\s*>") # "<topic /some/thing>"
223 
224 
225  DEFAULT_ARGS = dict(CONDITION=())
226 
227  class NoMessageException(Exception): pass
228 
229 
230  class Topic(object):
231  """
232  Object for <topic x> replacements in condition expressions.
233 
234  - len(topic) -> number of messages processed in topic
235  - bool(topic) -> whether there are any messages in topic
236  - topic[x] -> history at -1 -2 for last and but one, or 0 1 for first and second
237  - topic.x -> attribute x of last message
238  - value in topic -> whether any field of last message contains value
239  - value in topic[x] -> whether any field of topic history at position contains value
240  """
241 
242  def __init__(self, count, firsts, lasts):
243  self._count = count
244  self._firsts = firsts
245  self._lasts = lasts
246 
247  def __bool__(self): return bool(self._count)
248  def __nonzero__(self): return bool(self._count)
249  def __len__(self): return self._count
250 
251  def __contains__(self, item):
252  """Returns whether value exists in last message, or raises NoMessageException."""
253  if not self._lasts: raise ConditionMixin.NoMessageException()
254  return item in ConditionMixin.Message(self._lasts[-1])
255 
256  def __getitem__(self, key):
257  """Returns message from history at key, or Empty() if no such message."""
258  try: return ConditionMixin.Message((self._lasts if key < 0 else self._firsts)[key])
259  except IndexError: return ConditionMixin.Empty()
260 
261  def __getattr__(self, name):
262  """Returns attribute value of last message, or raises NoMessageException."""
263  if not self._lasts: raise ConditionMixin.NoMessageException()
264  return getattr(self._lasts[-1], name)
265 
266 
267  class Message(object):
268  """
269  Object for current topic message in condition expressions.
270 
271  - value in msg -> whether any message field contains value
272  - msg.x -> attribute x of message
273  """
274 
275  def __init__(self, msg):
276  self._msg = msg
277  self._fulltext = None
278 
279  def __contains__(self, item):
280  """Returns whether value exists in any message field."""
281  if not self._fulltext:
282  self._fulltext = "\n".join("%s" % (v, ) for _, v, _ in
283  api.iter_message_fields(self._msg, flat=True))
284  value = item if isinstance(item, six.text_type) else \
285  item.decode() if isinstance(item, six.binary_type) else str(item)
286  return re.search(re.escape(value), self._fulltext, re.I)
287 
288  def __getattr__(self, name):
289  """Returns attribute value of message."""
290  return getattr(self._msg, name)
291 
292 
293  class Empty(object):
294  """Placeholder falsy object that raises NoMessageException on attribute access."""
295  def __getattr__(self, name): raise ConditionMixin.NoMessageException()
296  def __bool__(self): return False
297  def __nonzero__(self): return False
298  def __contains__(self, item): return False
299  def __len__(self): return 0
300 
301 
302  def __init__(self, args=None, **kwargs):
303  """
304  @param args arguments as namespace or dictionary, case-insensitive
305  @param args.condition Python expressions that must evaluate as true
306  for message to be processable, see ConditionMixin
307  @param kwargs any and all arguments as keyword overrides, case-insensitive
308  """
309  self._topic_states = {} # {topic: whether only used for condition, not matching}
310  self._topics_per_condition = [] # [[topics in 1st condition], ]
311  self._wildcard_topics = {} # {"/my/*/topic": re.Pattern}
312  # {(topic, typename, typehash): [1st, 2nd, ..]}
313  self._firstmsgs = collections.defaultdict(collections.deque)
314  # {(topic, typename, typehash): [.., last]}
315  self._lastmsgs = collections.defaultdict(collections.deque)
316  # {topic: (max positive index + 1, max abs(negative index) or 1)}
317  self._topic_limits = collections.defaultdict(lambda: [1, 1])
318 
319 
320  self._conditions = collections.OrderedDict()
321  self._configure_conditions(ensure_namespace(args, ConditionMixin.DEFAULT_ARGS, **kwargs))
322 
323  def is_processable(self, topic, msg, stamp, index=None):
324  """Returns whether message passes passes current state conditions, if any."""
325  result = True
326  if not self._conditions:
327  return result
328  for i, (expr, code) in enumerate(self._conditions.items()):
329  topics = self._topics_per_condition[i]
330  wildcarded = [t for t in topics if t in self._wildcard_topics]
331  realcarded = {wt: [(t, n, h) for (t, n, h) in self._lastmsgs if p.match(t)]
332  for wt in wildcarded for p in [self._wildcard_topics[wt]]}
333  variants = [[(wt, (t, n, h)) for (t, n, h) in tt] or [(wt, (wt, None))]
334  for wt, tt in realcarded.items()]
335  variants = variants or [[None]] # Ensure one iteration if no wildcards to combine
336 
337  result = False
338  for remaps in itertools.product(*variants): # [(wildcard1, realname1), (wildcard2, ..]
339  if remaps == (None, ): remaps = ()
340  getter = functools.partial(self._get_topic_instance, remap=dict(remaps))
341  ns = {"topic": topic, "msg": ConditionMixin.Message(msg), "get_topic": getter}
342  try: result = eval(code, ns)
343  except self.NoMessageException: pass
344  except Exception as e:
345  ConsolePrinter.error('Error evaluating condition "%s": %s', expr, e)
346  raise
347  if result: break # for remaps
348  if not result: break # for i,
349  return result
350 
351  def close_batch(self):
352  """Clears cached messages."""
353  self._firstmsgs.clear()
354  self._lastmsgs.clear()
355 
356  def has_conditions(self):
357  """Returns whether there are any conditions configured."""
358  return bool(self._conditions)
359 
361  """Returns a list of all topics used in conditions (may contain wildcards)."""
362  return list(self._topic_states)
363 
364  def is_conditions_topic(self, topic, pure=True):
365  """
366  Returns whether topic is used for checking condition.
367 
368  @param pure whether use should be solely for condition, not for matching at all
369  """
370  if not self._conditions: return False
371  if topic in self._topic_states:
372  return self._topic_states[topic] if pure else True
373  wildcarded = [t for t, p in self._wildcard_topics.items() if p.match(topic)]
374  if not wildcarded: return False
375  return all(map(self._topic_states.get, wildcarded)) if pure else True
376 
377  def conditions_set_topic_state(self, topic, pure):
378  """Sets whether topic is purely used for conditions not matching."""
379  if topic in self._topic_states:
380  self._topic_states[topic] = pure
381 
382  def conditions_register_message(self, topic, msg):
383  """Retains message for condition evaluation if in condition topic."""
384  if self.is_conditions_topic(topic, pure=False):
385  topickey = api.TypeMeta.make(msg, topic).topickey
386  self._lastmsgs[topickey].append(msg)
387  if len(self._lastmsgs[topickey]) > self._topic_limits[topic][-1]:
388  self._lastmsgs[topickey].popleft()
389  if len(self._firstmsgs[topickey]) < self._topic_limits[topic][0]:
390  self._firstmsgs[topickey].append(msg)
391 
392  def _get_topic_instance(self, topic, remap=None):
393  """
394  Returns Topic() by name.
395 
396  @param remap optional remap dictionary as {topic1: (topic2, typename, typehash)}
397  """
398  if remap and topic in remap:
399  topickey = remap[topic]
400  else:
401  topickey = next(((t, n, h) for (t, n, h) in self._lastmsgs if t == topic), None)
402  if topickey not in self._counts:
403  return self.Empty()
404  c, f, l = (d[topickey] for d in (self._counts, self._firstmsgs, self._lastmsgs))
405  return self.Topic(c, f, l)
406 
407  def _configure_conditions(self, args):
408  """Parses condition expressions and populates local structures."""
409  for v in args.CONDITION:
410  topics = list(set(self.TOPIC_RGX.findall(v)))
411  self._topic_states.update({t: True for t in topics})
412  self._topics_per_condition.append(topics)
413  for t in (t for t in topics if "*" in t):
414  self._wildcard_topics[t] = common.wildcard_to_regex(t, end=True)
415  expr = self.TOPIC_RGX.sub(r'get_topic("\1")', v)
416  self._conditions[expr] = compile(expr, "", "eval")
417 
418  for v in args.CONDITION: # Set history length from <topic x>[index]
419  indexexprs = re.findall(self.TOPIC_RGX.pattern + r"\s*\[([^\]]+)\]", v)
420  for topic, indexexpr in indexexprs:
421  limits = self._topic_limits[topic]
422  try:
423  index = eval(indexexpr) # If integer, set history limits
424  limits[index < 0] = max(limits[index < 0], abs(index) + (index >= 0))
425  except Exception: continue # for topic
426 
427 
429  """Produces messages from ROS bagfiles."""
430 
431 
432  MESSAGE_META_TEMPLATE = "{topic} {index}/{total} ({type} {dt} {stamp})"
433 
434 
435  META_TEMPLATE = "\nFile {file} ({size}), {tcount} topics, {mcount:,d} messages\n" \
436  "File period {startdt} - {enddt}\n" \
437  "File span {delta} ({start} - {end})"
438 
439 
440  DEFAULT_ARGS = dict(BAG=(), FILE=(), PATH=(), RECURSE=False, TOPIC=(), TYPE=(),
441  SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=None, END_TIME=None,
442  START_INDEX=None, END_INDEX=None, CONDITION=(), AFTER=0, ORDERBY=None,
443  DECOMPRESS=False, REINDEX=False, WRITE=(), PROGRESS=False,
444  STOP_ON_ERROR=False)
445 
446  def __init__(self, args=None, **kwargs):
447  """
448  @param args arguments as namespace or dictionary, case-insensitive;
449  or a single path as the ROS bagfile to read,
450  or a stream to read from,
451  or one or more {@link grepros.api.Bag Bag} instances
452  <!--sep-->
453 
454  Bag-specific arguments:
455  @param args.file names of ROS bagfiles to read if not all in directory,
456  or a stream to read from;
457  or one or more {@link grepros.api.Bag Bag} instances
458  @param args.path paths to scan if not current directory
459  @param args.recurse recurse into subdirectories when looking for bagfiles
460  @param args.orderby "topic" or "type" if any to group results by
461  @param args.decompress decompress archived bags to file directory
462  @param args.reindex make a copy of unindexed bags and reindex them (ROS1 only)
463  @param args.write outputs, to skip in input files
464  @param args.bag one or more {@link grepros.api.Bag Bag} instances
465  <!--sep-->
466 
467  General arguments:
468  @param args.topic ROS topics to read if not all
469  @param args.type ROS message types to read if not all
470  @param args.skip_topic ROS topics to skip
471  @param args.skip_type ROS message types to skip
472  @param args.start_time earliest timestamp of messages to read
473  @param args.end_time latest timestamp of messages to read
474  @param args.start_index message index within topic to start from
475  @param args.end_index message index within topic to stop at
476  @param args.unique emit messages that are unique in topic
477  @param args.select_field message fields to use for uniqueness if not all
478  @param args.noselect_field message fields to skip for uniqueness
479  @param args.nth_message read every Nth message in topic
480  @param args.nth_interval minimum time interval between messages in topic
481  @param args.condition Python expressions that must evaluate as true
482  for message to be processable, see ConditionMixin
483  @param args.progress whether to print progress bar
484  @param args.stop_on_error stop execution on any error like unknown message type
485  @param kwargs any and all arguments as keyword overrides, case-insensitive
486  """
487  args0 = args
488  is_bag = isinstance(args, api.Bag) or \
489  common.is_iterable(args) and all(isinstance(x, api.Bag) for x in args)
490  args = {"FILE": str(args)} if isinstance(args, common.PATH_TYPES) else \
491  {"FILE": args} if common.is_stream(args) else {} if is_bag else args
492  args = ensure_namespace(args, BagSource.DEFAULT_ARGS, **kwargs)
493  super(BagSource, self).__init__(args)
494  ConditionMixin.__init__(self, args)
495  self._args0 = common.structcopy(self.args) # Original arguments
496  self._status = None # Match status of last produced message
497  self._sticky = False # Reading a single topic until all after-context emitted
498  self._totals_ok = False # Whether message count totals have been retrieved (ROS2 optimize)
499  self._types_ok = False # Whether type definitions have been retrieved (ROS2 optimize)
500  self._running = False
501  self._bag = None # Current bag object instance
502  self._filename = None # Current bagfile path
503  self._meta = None # Cached get_meta()
504  self._bag0 = ([args0] if isinstance(args0, api.Bag) else args0) if is_bag else None
505 
506  def read(self):
507  """Yields messages from ROS bagfiles, as (topic, msg, ROS time)."""
508  if not self.validate(): raise Exception("invalid")
509  self._running = True
510 
511  for _ in self._produce_bags():
512  if not self._running:
513  break # for _
514 
515  topicsets = [self._topics]
516  if "topic" == self.args.ORDERBY: # Group output by sorted topic names
517  topicsets = [{n: tt} for n, tt in sorted(self._topics.items())]
518  elif "type" == self.args.ORDERBY: # Group output by sorted type names
519  typetopics = {}
520  for n, tt in self._topics.items():
521  for t in tt: typetopics.setdefault(t, []).append(n)
522  topicsets = [{n: [t] for n in nn} for t, nn in sorted(typetopics.items())]
523 
524  self._types_ok = False
525  self._init_progress()
526  for topics in topicsets:
527  for topic, msg, stamp, index in self._produce(topics) if topics else ():
528  self.conditions_register_message(topic, msg)
529  if not self.is_conditions_topic(topic, pure=True) \
530  and (not self.preprocess or self.is_processable(topic, msg, stamp, index)):
531  yield self.SourceMessage(topic, msg, stamp)
532  if not self._running:
533  break # for topics
534  self._counts and self.sink and self.sink.flush()
535  self.close_batch()
536  self._running = False
537 
538  def validate(self):
539  """Returns whether ROS environment is set and arguments valid, prints error if not."""
540  if self.valid is not None: return self.valid
541  self.valid = api.validate()
542  if not self._bag0 and self.args.FILE and os.path.isfile(self.args.FILE[0]) \
543  and not common.verify_io(self.args.FILE[0], "r"):
544  ConsolePrinter.error("File not readable.")
545  self.valid = False
546  if not self._bag0 and common.is_stream(self.args.FILE) \
547  and not any(c.STREAMABLE for c in api.Bag.READER_CLASSES):
548  ConsolePrinter.error("Bag format does not support reading streams.")
549  self.valid = False
550  if self._bag0 and not any(x.mode in ("r", "a") for x in self._bag0):
551  ConsolePrinter.error("Bag not in read mode.")
552  self.valid = False
553  if self.args.ORDERBY and self.conditions_get_topics():
554  ConsolePrinter.error("Cannot use topics in conditions and bag order by %s.",
555  self.args.ORDERBY)
556  self.valid = False
557  return self.valid
558 
559  def close(self):
560  """Closes current bag, if any."""
561  self._running = False
562  if self._bag and not self._bag0: self._bag.close()
563  ConditionMixin.close_batch(self)
564  super(BagSource, self).close()
565 
566  def close_batch(self):
567  """Closes current bag, if any."""
568  if self._bag0: self._running = False
569  elif self._bag: self._bag.close()
570  self._bag = None
571  if self.bar:
572  self.bar.update(flush=True)
573  self.bar = None
574  ConditionMixin.close_batch(self)
575 
576  def format_meta(self):
577  """Returns bagfile metainfo string."""
578  return self.META_TEMPLATE.format(**self.get_meta())
579 
580  def format_message_meta(self, topic, msg, stamp, index=None):
581  """Returns message metainfo string."""
582  meta = self.get_message_meta(topic, msg, stamp, index)
583  meta = {k: "" if v is None else v for k, v in meta.items()}
584  return self.MESSAGE_META_TEMPLATE.format(**meta)
585 
586  def get_batch(self):
587  """Returns name of current bagfile, or self if reading stream."""
588  return self._filename if self._filename is not None else self
589 
590  def get_meta(self):
591  """Returns bagfile metainfo data dict."""
592  if self._meta is not None:
593  return self._meta
594  mcount = self._bag.get_message_count()
595  start, end = (self._bag.get_start_time(), self._bag.get_end_time()) if mcount else ("", "")
596  delta = common.format_timedelta(datetime.timedelta(seconds=(end or 0) - (start or 0)))
597  self._meta = dict(file=self._filename, size=common.format_bytes(self._bag.size),
598  mcount=mcount, tcount=len(self.topics), delta=delta,
599  start=drop_zeros(start), end=drop_zeros(end),
600  startdt=drop_zeros(common.format_stamp(start)) if start != "" else "",
601  enddt=drop_zeros(common.format_stamp(end)) if end != "" else "")
602  return self._meta
603 
604  def get_message_meta(self, topic, msg, stamp, index=None):
605  """Returns message metainfo data dict."""
606  self._ensure_totals()
607  result = super(BagSource, self).get_message_meta(topic, msg, stamp, index)
608  result.update(total=self.topics[(topic, result["type"], result["hash"])])
609  if callable(getattr(self._bag, "get_qoses", None)):
610  result.update(qoses=self._bag.get_qoses(topic, result["type"]))
611  return result
612 
613  def get_message_class(self, typename, typehash=None):
614  """Returns ROS message type class."""
615  return self._bag.get_message_class(typename, typehash) or \
616  api.get_message_class(typename)
617 
618  def get_message_definition(self, msg_or_type):
619  """Returns ROS message type definition full text, including subtype definitions."""
620  return self._bag.get_message_definition(msg_or_type) or \
621  api.get_message_definition(msg_or_type)
622 
623  def get_message_type_hash(self, msg_or_type):
624  """Returns ROS message type MD5 hash."""
625  return self._bag.get_message_type_hash(msg_or_type) or \
626  api.get_message_type_hash(msg_or_type)
627 
628  def notify(self, status):
629  """Reports match status of last produced message."""
630  self._status = bool(status)
631  if status and not self._totals_ok:
632  self._ensure_totals()
633 
634  def is_processable(self, topic, msg, stamp, index=None):
635  """Returns whether message passes source filters."""
636  topickey = api.TypeMeta.make(msg, topic).topickey
637  if self.args.START_INDEX and index is not None:
638  self._ensure_totals()
639  START = self.args.START_INDEX
640  MIN = max(0, START + (self.topics[topickey] if START < 0 else 0))
641  if MIN >= index:
642  return False
643  if self.args.END_INDEX and index is not None:
644  self._ensure_totals()
645  END = self.args.END_INDEX
646  MAX = END + (self.topics[topickey] if END < 0 else 0)
647  if MAX < index:
648  return False
649  if not super(BagSource, self).is_processable(topic, msg, stamp, index):
650  return False
651  return ConditionMixin.is_processable(self, topic, msg, stamp, index)
652 
653  def _produce(self, topics, start_time=None):
654  """
655  Yields messages from current ROS bagfile, as (topic, msg, ROS time, index in topic).
656 
657  @param topics {topic: [typename, ]}
658  """
659  if not self._running or not self._bag: return
660  counts = collections.Counter()
661  for topic, msg, stamp in self._bag.read_messages(list(topics), start_time):
662  if not self._running or not self._bag:
663  break # for topic,
664  typename = api.get_message_type(msg)
665  if topics and typename not in topics[topic]:
666  continue # for topic
667  if api.ROS2 and not self._types_ok:
668  self.topics, self._types_ok = self._bag.get_topic_info(counts=False), True
669 
670  topickey = api.TypeMeta.make(msg, topic, self).topickey
671  counts[topickey] += 1; self._counts[topickey] += 1
672  # Skip messages already processed during sticky
673  if not self._sticky and counts[topickey] != self._counts[topickey]:
674  continue # for topic
675 
676  self._status = None
677  self.bar and self.bar.update(value=sum(self._counts.values()))
678  yield topic, msg, stamp, self._counts[topickey]
679 
680  if self.args.NTH_MESSAGE > 1 or self.args.NTH_INTERVAL > 0:
681  self._processables[topickey] = (self._counts[topickey], stamp)
682  if self._status and self.args.AFTER and not self._sticky \
683  and not self.has_conditions() \
684  and (len(self._topics) > 1 or len(next(iter(self._topics.values()))) > 1):
685  # Stick to one topic until trailing messages have been emitted
686  self._sticky = True
687  continue_from = stamp + api.make_duration(nsecs=1)
688  for entry in self._produce({topic: typename}, continue_from):
689  yield entry
690  self._sticky = False
691  if not self._running or not self._bag:
692  break # for topic
693 
694  def _produce_bags(self):
695  """Yields Bag instances from configured arguments."""
696  if self._bag0:
697  for bag in self._bag0:
698  if self._configure(bag=bag):
699  yield self._bag
700  return
701 
702  names, paths = self.args.FILE, self.args.PATH
703  exts, skip_exts = api.BAG_EXTENSIONS, api.SKIP_EXTENSIONS
704  exts = list(exts) + ["%s%s" % (a, b) for a in exts for b in common.Decompressor.EXTENSIONS]
705 
706  encountereds = set()
707  for filename in common.find_files(names, paths, exts, skip_exts, self.args.RECURSE):
708  if not self._running:
709  break # for filename
710 
711  fullname = os.path.realpath(os.path.abspath(filename))
712  skip = common.Decompressor.make_decompressed_name(fullname) in encountereds
713  encountereds.add(fullname)
714 
715  if skip or not self._configure(filename):
716  continue # for filename
717 
718  encountereds.add(self._bag.filename)
719  yield self._bag
720 
721  def _init_progress(self):
722  """Initializes progress bar, if any, for current bag."""
723  if self.args.PROGRESS and not self.bar:
724  self._ensure_totals()
725  self.bar = common.ProgressBar(aftertemplate=" {afterword} ({value:,d}/{max:,d})")
726  self.bar.afterword = os.path.basename(self._filename or "<stream>")
727  self.bar.max = sum(sum(c for (t, n, _), c in self.topics.items()
728  if c and t == t_ and n in nn)
729  for t_, nn in self._topics.items())
730  self.bar.update(value=0)
731 
732  def _ensure_totals(self):
733  """Retrieves total message counts if not retrieved."""
734  if not self._totals_ok: # ROS2 bag probably
735  has_ensure = common.has_arg(self._bag.get_topic_info, "ensure_types")
736  kws = dict(ensure_types=False) if has_ensure else {}
737  for (t, n, h), c in self._bag.get_topic_info(**kws).items():
738  self.topics[(t, n, h)] = c
739  self._totals_ok = True
740 
741  def _configure(self, filename=None, bag=None):
742  """Opens bag and populates bag-specific argument state, returns success."""
743  self._meta = None
744  self._bag = None
745  self._filename = None
746  self._sticky = False
747  self._totals_ok = False
748  self._counts.clear()
749  self._processables.clear()
750  self._hashes.clear()
751  self.topics.clear()
752 
753  if bag is not None and bag.mode not in ("r", "a"):
754  ConsolePrinter.warn("Cannot read %s: bag in write mode.", bag)
755  return False
756 
757  if filename and self.args.WRITE \
758  and any(os.path.realpath(x[0]) == os.path.realpath(filename)
759  for x in self.args.WRITE):
760  return False
761  try:
762  if filename and common.Decompressor.is_compressed(filename):
763  if self.args.DECOMPRESS:
764  filename = common.Decompressor.decompress(filename, self.args.PROGRESS)
765  else: raise Exception("decompression not enabled")
766  bag = api.Bag(filename, mode="r", reindex=self.args.REINDEX,
767  progress=self.args.PROGRESS) if bag is None else bag
768  bag.stop_on_error = self.args.STOP_ON_ERROR
769  bag.open()
770  except Exception as e:
771  ConsolePrinter.error("\nError opening %r: %s", filename or bag, e)
772  if self.args.STOP_ON_ERROR: raise
773  return False
774 
775  self._bag = bag
776  self._filename = bag.filename
777 
778  dct = fulldct = {} # {topic: [typename, ]}
779  kws = dict(ensure_types=False) if common.has_arg(bag.get_topic_info, "ensure_types") else {}
780  for (t, n, h), c in bag.get_topic_info(counts=False, **kws).items():
781  dct.setdefault(t, []).append(n)
782  self.topics[(t, n, h)] = c
783  self._totals_ok = not any(v is None for v in self.topics.values())
784  for topic in self.conditions_get_topics():
785  self.conditions_set_topic_state(topic, True)
786 
787  dct = common.filter_dict(dct, self.args.TOPIC, self.args.TYPE)
788  dct = common.filter_dict(dct, self.args.SKIP_TOPIC, self.args.SKIP_TYPE, reverse=True)
789  for topic in self.conditions_get_topics(): # Add topics used in conditions
790  matches = [t for p in [common.wildcard_to_regex(topic, end=True)] for t in fulldct
791  if t == topic or "*" in topic and p.match(t)]
792  for topic in matches:
793  self.conditions_set_topic_state(topic, topic not in dct)
794  dct.setdefault(topic, fulldct[topic])
795  self._topics = dct
796  self._meta = self.get_meta()
797 
798  args = self.args = common.structcopy(self._args0)
799  if args.START_TIME is not None:
800  args.START_TIME = api.make_bag_time(args.START_TIME, bag)
801  if args.END_TIME is not None:
802  args.END_TIME = api.make_bag_time(args.END_TIME, bag)
803  return True
804 
805 
807  """Produces messages from live ROS topics."""
808 
809 
810  MASTER_INTERVAL = 2
811 
812 
813  DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=None,
814  END_TIME=None, START_INDEX=None, END_INDEX=None, CONDITION=(),
815  QUEUE_SIZE_IN=10, ROS_TIME_IN=False, PROGRESS=False, STOP_ON_ERROR=False)
816 
817  def __init__(self, args=None, **kwargs):
818  """
819  @param args arguments as namespace or dictionary, case-insensitive
820  @param args.topic ROS topics to read if not all
821  @param args.type ROS message types to read if not all
822  @param args.skip_topic ROS topics to skip
823  @param args.skip_type ROS message types to skip
824  @param args.start_time earliest timestamp of messages to read
825  @param args.end_time latest timestamp of messages to read
826  @param args.start_index message index within topic to start from
827  @param args.end_index message index within topic to stop at
828  @param args.unique emit messages that are unique in topic
829  @param args.select_field message fields to use for uniqueness if not all
830  @param args.noselect_field message fields to skip for uniqueness
831  @param args.nth_message read every Nth message in topic
832  @param args.nth_interval minimum time interval between messages in topic
833  @param args.condition Python expressions that must evaluate as true
834  for message to be processable, see ConditionMixin
835  @param args.queue_size_in subscriber queue size (default 10)
836  @param args.ros_time_in stamp messages with ROS time instead of wall time
837  @param args.progress whether to print progress bar
838  @param args.stop_on_error stop execution on any error like unknown message type
839  @param kwargs any and all arguments as keyword overrides, case-insensitive
840  """
841  args = ensure_namespace(args, TopicSource.DEFAULT_ARGS, **kwargs)
842  super(TopicSource, self).__init__(args)
843  ConditionMixin.__init__(self, args)
844  self._running = False # Whether is in process of yielding messages from topics
845  self._queue = None # [(topic, msg, ROS time)]
846  self._subs = {} # {(topic, typename, typehash): ROS subscriber}
847 
848  self._configure()
849 
850  def read(self):
851  """Yields messages from subscribed ROS topics, as (topic, msg, ROS time)."""
852  if not self._running:
853  if not self.validate(): raise Exception("invalid")
854  api.init_node()
855  self._running = True
856  self._queue = queue.Queue()
857  self.refresh_topics()
858  t = threading.Thread(target=self._run_refresh)
859  t.daemon = True
860  t.start()
861 
862  total = 0
863  self._init_progress()
864  while self._running:
865  topic, msg, stamp = self._queue.get()
866  total += bool(topic)
867  self._update_progress(total, running=self._running and bool(topic))
868  if topic:
869  topickey = api.TypeMeta.make(msg, topic, self).topickey
870  self._counts[topickey] += 1
871  self.conditions_register_message(topic, msg)
872  if self.is_conditions_topic(topic, pure=True): continue # while
873 
874  if not self.preprocess \
875  or self.is_processable(topic, msg, stamp, self._counts[topickey]):
876  yield self.SourceMessage(topic, msg, stamp)
877  if self.args.NTH_MESSAGE > 1 or self.args.NTH_INTERVAL > 0:
878  self._processables[topickey] = (self._counts[topickey], stamp)
879  self._queue = None
880  self._running = False
881 
882  def bind(self, sink):
883  """Attaches sink to source and blocks until connected to ROS live."""
884  if not self.validate(): raise Exception("invalid")
885  super(TopicSource, self).bind(sink)
886  api.init_node()
887 
888  def validate(self):
889  """Returns whether ROS environment is set, prints error if not."""
890  if self.valid is None: self.valid = api.validate(live=True)
891  return self.valid
892 
893  def close(self):
894  """Shuts down subscribers and stops producing messages."""
895  self._running = False
896  for k in list(self._subs):
897  self._subs.pop(k).unregister()
898  self._queue and self._queue.put((None, None, None)) # Wake up iterator
899  self._queue = None
900  ConditionMixin.close_batch(self)
901  super(TopicSource, self).close()
902 
903  def get_meta(self):
904  """Returns source metainfo data dict."""
905  ENV = {k: os.getenv(k) for k in ("ROS_MASTER_URI", "ROS_DOMAIN_ID") if os.getenv(k)}
906  return dict(ENV, tcount=len(self.topics))
907 
908  def get_message_meta(self, topic, msg, stamp, index=None):
909  """Returns message metainfo data dict."""
910  result = super(TopicSource, self).get_message_meta(topic, msg, stamp, index)
911  topickey = (topic, result["type"], result["hash"])
912  if topickey in self._subs:
913  result.update(qoses=self._subs[topickey].get_qoses())
914  return result
915 
916  def get_message_class(self, typename, typehash=None):
917  """Returns message type class, from active subscription if available."""
918  sub = next((s for (t, n, h), s in self._subs.items()
919  if n == typename and typehash in (s.get_message_type_hash(), None)), None)
920  return sub and sub.get_message_class() or api.get_message_class(typename)
921 
922  def get_message_definition(self, msg_or_type):
923  """Returns ROS message type definition full text, including subtype definitions."""
924  if api.is_ros_message(msg_or_type):
925  return api.get_message_definition(msg_or_type)
926  sub = next((s for (t, n, h), s in self._subs.items() if n == msg_or_type), None)
927  return sub and sub.get_message_definition() or api.get_message_definition(msg_or_type)
928 
929  def get_message_type_hash(self, msg_or_type):
930  """Returns ROS message type MD5 hash."""
931  if api.is_ros_message(msg_or_type):
932  return api.get_message_type_hash(msg_or_type)
933  sub = next((s for (t, n, h), s in self._subs.items() if n == msg_or_type), None)
934  return sub and sub.get_message_type_hash() or api.get_message_type_hash(msg_or_type)
935 
936  def format_meta(self):
937  """Returns source metainfo string."""
938  metadata = self.get_meta()
939  result = "\nROS%s live" % api.ROS_VERSION
940  if "ROS_MASTER_URI" in metadata:
941  result += ", ROS master %s" % metadata["ROS_MASTER_URI"]
942  if "ROS_DOMAIN_ID" in metadata:
943  result += ", ROS domain ID %s" % metadata["ROS_DOMAIN_ID"]
944  result += ", %s initially" % common.plural("topic", metadata["tcount"])
945  return result
946 
947  def is_processable(self, topic, msg, stamp, index=None):
948  """Returns whether message passes source filters."""
949  if self.args.START_INDEX and index is not None:
950  if max(0, self.args.START_INDEX) >= index:
951  return False
952  if self.args.END_INDEX and index is not None:
953  if 0 < self.args.END_INDEX < index:
954  return False
955  if not super(TopicSource, self).is_processable(topic, msg, stamp, index):
956  return False
957  return ConditionMixin.is_processable(self, topic, msg, stamp, index)
958 
959  def refresh_topics(self):
960  """Refreshes topics and subscriptions from ROS live."""
961  for topic, typename in api.get_topic_types():
962  dct = common.filter_dict({topic: [typename]}, self.args.TOPIC, self.args.TYPE)
963  if not common.filter_dict(dct, self.args.SKIP_TOPIC, self.args.SKIP_TYPE, reverse=True):
964  continue # for topic, typename
965  if api.ROS2 and api.get_message_class(typename) is None:
966  msg = "Error loading type %s in topic %s." % (typename, topic)
967  if self.args.STOP_ON_ERROR: raise Exception(msg)
968  ConsolePrinter.warn(msg, __once=True)
969  continue # for topic, typename
970  topickey = (topic, typename, None)
971  if topickey in self.topics:
972  continue # for topic, typename
973 
974  handler = functools.partial(self._on_message, topic)
975  try:
976  sub = api.create_subscriber(topic, typename, handler,
977  queue_size=self.args.QUEUE_SIZE_IN)
978  except Exception as e:
979  ConsolePrinter.warn("Error subscribing to topic %s: %%r" % topic,
980  e, __once=True)
981  if self.args.STOP_ON_ERROR: raise
982  continue # for topic, typename
983  self._subs[topickey] = sub
984  self.topics[topickey] = None
985 
986  def _init_progress(self):
987  """Initializes progress bar, if any."""
988  if self.args.PROGRESS and not self.bar:
989  self.bar = common.ProgressBar(afterword="ROS%s live" % api.ROS_VERSION,
990  aftertemplate=" {afterword}", pulse=True)
991  self.bar.start()
992 
993  def _update_progress(self, count, running=True):
994  """Updates progress bar, if any."""
995  if self.bar:
996  afterword = "ROS%s live, %s" % (api.ROS_VERSION, common.plural("message", count))
997  self.bar.afterword, self.bar.max = afterword, count
998  if not running:
999  self.bar.pause, self.bar.pulse_pos = True, None
1000  self.bar.update(count)
1001 
1002  def _configure(self):
1003  """Adjusts start/end time filter values to current time."""
1004  if self.args.START_TIME is not None:
1005  self.args.START_TIME = api.make_live_time(self.args.START_TIME)
1006  if self.args.END_TIME is not None:
1007  self.args.END_TIME = api.make_live_time(self.args.END_TIME)
1008 
1009  def _run_refresh(self):
1010  """Periodically refreshes topics and subscriptions from ROS live."""
1011  time.sleep(self.MASTER_INTERVAL)
1012  while self._running:
1013  try: self.refresh_topics()
1014  except Exception as e: self.thread_excepthook("Error refreshing live topics: %r" % e, e)
1015  time.sleep(self.MASTER_INTERVAL)
1016 
1017  def _on_message(self, topic, msg):
1018  """Subscription callback handler, queues message for yielding."""
1019  stamp = api.get_rostime() if self.args.ROS_TIME_IN else api.make_time(time.time())
1020  self._queue and self._queue.put((topic, msg, stamp))
1021 
1022 
1024  """Produces messages from iterable or pushed data."""
1025 
1026 
1027  DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=None,
1028  END_TIME=None, START_INDEX=None, END_INDEX=None, UNIQUE=False,
1029  SELECT_FIELD=(), NOSELECT_FIELD=(), NTH_MESSAGE=1, NTH_INTERVAL=0,
1030  CONDITION=(), ITERABLE=None)
1031 
1032  def __init__(self, args=None, **kwargs):
1033  """
1034  @param args arguments as namespace or dictionary, case-insensitive;
1035  or iterable yielding messages
1036  @param args.topic ROS topics to read if not all
1037  @param args.type ROS message types to read if not all
1038  @param args.skip_topic ROS topics to skip
1039  @param args.skip_type ROS message types to skip
1040  @param args.start_time earliest timestamp of messages to read
1041  @param args.end_time latest timestamp of messages to read
1042  @param args.start_index message index within topic to start from
1043  @param args.end_index message index within topic to stop at
1044  @param args.unique emit messages that are unique in topic
1045  @param args.select_field message fields to use for uniqueness if not all
1046  @param args.noselect_field message fields to skip for uniqueness
1047  @param args.nth_message read every Nth message in topic
1048  @param args.nth_interval minimum time interval between messages in topic
1049  @param args.condition Python expressions that must evaluate as true
1050  for message to be processable, see ConditionMixin
1051  @param args.iterable iterable yielding (topic, msg, stamp) or (topic, msg);
1052  yielding `None` signals end of content
1053  @param kwargs any and all arguments as keyword overrides, case-insensitive
1054  """
1055  if common.is_iterable(args) and not isinstance(args, dict):
1056  args = ensure_namespace(None, iterable=args)
1057  args = ensure_namespace(args, AppSource.DEFAULT_ARGS, **kwargs)
1058  super(AppSource, self).__init__(args)
1059  ConditionMixin.__init__(self, args)
1060  self._queue = queue.Queue() # [(topic, msg, ROS time)]
1061  self._reading = False
1062 
1063  self._configure()
1064 
1065  def read(self):
1066  """
1067  Yields messages from iterable or pushed data, as (topic, msg, ROS timestamp).
1068 
1069  Blocks until a message is available, or source is closed.
1070  """
1071  def generate(iterable):
1072  for x in iterable: yield x
1073  feeder = generate(self.args.ITERABLE) if self.args.ITERABLE else None
1074  self._reading = True
1075  while self._reading:
1076  item = self._queue.get() if not feeder or self._queue.qsize() else next(feeder, None)
1077  if item is None: break # while
1078 
1079  if len(item) > 2: topic, msg, stamp = item[:3]
1080  else: (topic, msg), stamp = item[:2], api.get_rostime(fallback=True)
1081  topickey = api.TypeMeta.make(msg, topic, self).topickey
1082  self._counts[topickey] += 1
1083  self.conditions_register_message(topic, msg)
1084  if self.is_conditions_topic(topic, pure=True): continue # while
1085 
1086  if not self.preprocess \
1087  or self.is_processable(topic, msg, stamp, self._counts[topickey]):
1088  yield self.SourceMessage(topic, msg, stamp)
1089  if self.args.NTH_MESSAGE > 1 or self.args.NTH_INTERVAL > 0:
1090  self._processables[topickey] = (self._counts[topickey], stamp)
1091  self._reading = False
1092 
1093  def close(self):
1094  """Closes current read() yielding, if any."""
1095  if self._reading:
1096  self._reading = False
1097  self._queue.put(None)
1098 
1099  def read_queue(self):
1100  """
1101  Returns (topic, msg, stamp) from push queue, or `None` if no queue
1102  or message in queue is condition topic only.
1103  """
1104  item = None
1105  try: item = self._queue.get(block=False)
1106  except queue.Empty: pass
1107  if item is None: return None
1108 
1109  topic, msg, stamp = item
1110  topickey = api.TypeMeta.make(msg, topic, self).topickey
1111  self._counts[topickey] += 1
1112  self.conditions_register_message(topic, msg)
1113  return None if self.is_conditions_topic(topic, pure=True) else (topic, msg, stamp)
1114 
1115  def mark_queue(self, topic, msg, stamp):
1116  """Registers message produced from read_queue()."""
1117  if self.args.NTH_MESSAGE > 1 or self.args.NTH_INTERVAL > 0:
1118  topickey = api.TypeMeta.make(msg, topic).topickey
1119  self._processables[topickey] = (self._counts[topickey], stamp)
1120 
1121  def push(self, topic, msg=None, stamp=None):
1122  """
1123  Pushes a message to be yielded from read().
1124 
1125  @param topic topic name, or `None` to signal end of content
1126  @param msg ROS message
1127  @param stamp message ROS timestamp, defaults to current wall time if `None`
1128  """
1129  if topic is None: self._queue.put(None)
1130  else: self._queue.put((topic, msg, stamp or api.get_rostime(fallback=True)))
1131 
1132  def is_processable(self, topic, msg, stamp, index=None):
1133  """Returns whether message passes source filters."""
1134  dct = common.filter_dict({topic: [api.get_message_type(msg)]},
1135  self.args.TOPIC, self.args.TYPE)
1136  if not common.filter_dict(dct, self.args.SKIP_TOPIC, self.args.SKIP_TYPE, reverse=True):
1137  return False
1138  if self.args.START_INDEX and index is not None:
1139  if max(0, self.args.START_INDEX) >= index:
1140  return False
1141  if self.args.END_INDEX and index is not None:
1142  if 0 < self.args.END_INDEX < index:
1143  return False
1144  if not super(AppSource, self).is_processable(topic, msg, stamp, index):
1145  return False
1146  return ConditionMixin.is_processable(self, topic, msg, stamp, index)
1147 
1148  def _configure(self):
1149  """Adjusts start/end time filter values to current time."""
1150  if self.args.START_TIME is not None:
1151  self.args.START_TIME = api.make_live_time(self.args.START_TIME)
1152  if self.args.END_TIME is not None:
1153  self.args.END_TIME = api.make_live_time(self.args.END_TIME)
1154 
1155 
1156 __all__ = ["AppSource", "BagSource", "ConditionMixin", "Source", "TopicSource"]
grepros.inputs.BagSource.META_TEMPLATE
string META_TEMPLATE
Template for bag metainfo header.
Definition: inputs.py:435
grepros.inputs.BagSource._produce_bags
def _produce_bags(self)
Definition: inputs.py:694
grepros.inputs.BagSource.__init__
def __init__(self, args=None, **kwargs)
Definition: inputs.py:446
grepros.inputs.BagSource._status
_status
Definition: inputs.py:496
grepros.inputs.BagSource.close_batch
def close_batch(self)
Definition: inputs.py:566
grepros.inputs.ConditionMixin.Message.__init__
def __init__(self, msg)
Definition: inputs.py:275
grepros.inputs.ConditionMixin.Topic._count
_count
Definition: inputs.py:243
grepros.inputs.Source.__enter__
def __enter__(self)
Definition: inputs.py:86
grepros.inputs.ConditionMixin._firstmsgs
_firstmsgs
Definition: inputs.py:313
grepros.inputs.TopicSource._on_message
def _on_message(self, topic, msg)
Definition: inputs.py:1017
grepros.inputs.TopicSource.is_processable
def is_processable(self, topic, msg, stamp, index=None)
Definition: inputs.py:947
grepros.inputs.TopicSource.refresh_topics
def refresh_topics(self)
Definition: inputs.py:959
grepros.inputs.Source.get_message_definition
def get_message_definition(self, msg_or_type)
Definition: inputs.py:150
grepros.inputs.BagSource.read
def read(self)
Definition: inputs.py:506
grepros.inputs.Source.format_message_meta
def format_message_meta(self, topic, msg, stamp, index=None)
Definition: inputs.py:126
grepros.inputs.TopicSource.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: inputs.py:916
grepros.inputs.Source._topics
_topics
Definition: inputs.py:62
grepros.inputs.ConditionMixin.Topic.__getitem__
def __getitem__(self, key)
Definition: inputs.py:256
grepros.inputs.ConditionMixin.is_conditions_topic
def is_conditions_topic(self, topic, pure=True)
Definition: inputs.py:364
grepros.inputs.TopicSource._subs
_subs
Definition: inputs.py:846
grepros.inputs.TopicSource.get_message_definition
def get_message_definition(self, msg_or_type)
Definition: inputs.py:922
grepros.inputs.BagSource._filename
_filename
Definition: inputs.py:502
grepros.inputs.BagSource.get_message_type_hash
def get_message_type_hash(self, msg_or_type)
Definition: inputs.py:623
grepros.inputs.ConditionMixin._wildcard_topics
_wildcard_topics
Definition: inputs.py:311
grepros.common.drop_zeros
def drop_zeros(v, replace="")
Definition: common.py:646
grepros.inputs.TopicSource
Definition: inputs.py:806
grepros.inputs.AppSource.close
def close(self)
Definition: inputs.py:1093
grepros.inputs.BagSource.get_meta
def get_meta(self)
Definition: inputs.py:590
grepros.inputs.Source.sink
sink
outputs.Sink instance bound to this source
Definition: inputs.py:70
grepros.inputs.ConditionMixin.Message.__contains__
def __contains__(self, item)
Definition: inputs.py:279
grepros.inputs.BagSource._bag
_bag
Definition: inputs.py:501
grepros.inputs.TopicSource.get_message_meta
def get_message_meta(self, topic, msg, stamp, index=None)
Definition: inputs.py:908
grepros.inputs.ConditionMixin.Topic.__nonzero__
def __nonzero__(self)
Definition: inputs.py:248
grepros.inputs.AppSource._queue
_queue
Definition: inputs.py:1060
grepros.inputs.ConditionMixin.Message
Definition: inputs.py:267
grepros.inputs.BagSource.notify
def notify(self, status)
Definition: inputs.py:628
grepros.inputs.ConditionMixin.Topic.__init__
def __init__(self, count, firsts, lasts)
Definition: inputs.py:242
grepros.inputs.Source.get_message_type_hash
def get_message_type_hash(self, msg_or_type)
Definition: inputs.py:154
grepros.inputs.Source.get_message_meta
def get_message_meta(self, topic, msg, stamp, index=None)
Definition: inputs.py:139
grepros.inputs.ConditionMixin.TOPIC_RGX
TOPIC_RGX
Definition: inputs.py:222
grepros.inputs.Source.SourceMessage
Returned from read() as (topic name, ROS message, ROS timestamp object).
Definition: inputs.py:38
grepros.inputs.ConditionMixin._configure_conditions
def _configure_conditions(self, args)
Definition: inputs.py:407
grepros.inputs.ConditionMixin.conditions_set_topic_state
def conditions_set_topic_state(self, topic, pure)
Definition: inputs.py:377
grepros.inputs.BagSource._totals_ok
_totals_ok
Definition: inputs.py:498
grepros.inputs.Source.get_batch
def get_batch(self)
Definition: inputs.py:132
grepros.inputs.ConditionMixin.Topic.__bool__
def __bool__(self)
Definition: inputs.py:247
grepros.inputs.AppSource.read
def read(self)
Definition: inputs.py:1065
grepros.inputs.ConditionMixin.Message._fulltext
_fulltext
Definition: inputs.py:277
grepros.inputs.AppSource.read_queue
def read_queue(self)
Definition: inputs.py:1099
grepros.inputs.TopicSource._run_refresh
def _run_refresh(self)
Definition: inputs.py:1009
grepros.inputs.BagSource.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: inputs.py:613
grepros.inputs.AppSource
Definition: inputs.py:1023
grepros.inputs.BagSource._bag0
_bag0
Definition: inputs.py:504
grepros.inputs.TopicSource.validate
def validate(self)
Definition: inputs.py:888
grepros.inputs.AppSource.is_processable
def is_processable(self, topic, msg, stamp, index=None)
Definition: inputs.py:1132
grepros.inputs.ConditionMixin._lastmsgs
_lastmsgs
Definition: inputs.py:315
grepros.inputs.TopicSource.bind
def bind(self, sink)
Definition: inputs.py:882
grepros.inputs.ConditionMixin.Topic.__contains__
def __contains__(self, item)
Definition: inputs.py:251
grepros.inputs.ConditionMixin
Definition: inputs.py:195
grepros.inputs.Source.__exit__
def __exit__(self, exc_type, exc_value, traceback)
Definition: inputs.py:90
grepros.inputs.ConditionMixin.Topic._lasts
_lasts
Definition: inputs.py:245
grepros.inputs.Source._processables
_processables
Definition: inputs.py:66
grepros.inputs.ConditionMixin.conditions_get_topics
def conditions_get_topics(self)
Definition: inputs.py:360
grepros.inputs.BagSource._types_ok
_types_ok
Definition: inputs.py:499
grepros.inputs.ConditionMixin.Topic._firsts
_firsts
Definition: inputs.py:244
grepros.inputs.ConditionMixin.NoMessageException
Definition: inputs.py:227
grepros.inputs.TopicSource._configure
def _configure(self)
Definition: inputs.py:1002
grepros.inputs.Source.preprocess
preprocess
Apply all filter arguments when reading, not only topic and type.
Definition: inputs.py:78
grepros.inputs.TopicSource._update_progress
def _update_progress(self, count, running=True)
Definition: inputs.py:993
grepros.inputs.Source._patterns
_patterns
Definition: inputs.py:60
grepros.inputs.ConditionMixin.Message.__getattr__
def __getattr__(self, name)
Definition: inputs.py:288
grepros.inputs.BagSource._sticky
_sticky
Definition: inputs.py:497
grepros.inputs.ConditionMixin.is_processable
def is_processable(self, topic, msg, stamp, index=None)
Definition: inputs.py:323
grepros.inputs.ConditionMixin.Topic.__len__
def __len__(self)
Definition: inputs.py:249
grepros.inputs.TopicSource.read
def read(self)
Definition: inputs.py:850
grepros.inputs.TopicSource.__init__
def __init__(self, args=None, **kwargs)
Definition: inputs.py:817
grepros.inputs.Source.close_batch
def close_batch(self)
Definition: inputs.py:118
grepros.inputs.AppSource.mark_queue
def mark_queue(self, topic, msg, stamp)
Definition: inputs.py:1115
grepros.common.ensure_namespace
def ensure_namespace(val, defaults=None, dashify=("WRITE_OPTIONS",), **kwargs)
Definition: common.py:658
grepros.inputs.Source.__init__
def __init__(self, args=None, **kwargs)
Definition: inputs.py:47
grepros.inputs.ConditionMixin.Empty
Definition: inputs.py:293
grepros.inputs.TopicSource.format_meta
def format_meta(self)
Definition: inputs.py:936
grepros.inputs.Source.valid
valid
Result of validate()
Definition: inputs.py:76
grepros.inputs.BagSource.is_processable
def is_processable(self, topic, msg, stamp, index=None)
Definition: inputs.py:634
grepros.inputs.BagSource.get_batch
def get_batch(self)
Definition: inputs.py:586
grepros.inputs.Source.thread_excepthook
def thread_excepthook(self, text, exc)
Definition: inputs.py:184
grepros.inputs.Source.__iter__
def __iter__(self)
Definition: inputs.py:82
grepros.inputs.AppSource.push
def push(self, topic, msg=None, stamp=None)
Definition: inputs.py:1121
grepros.inputs.AppSource._reading
_reading
Definition: inputs.py:1061
grepros.inputs.Source.args
args
Definition: inputs.py:68
grepros.inputs.ConditionMixin.Topic.__getattr__
def __getattr__(self, name)
Definition: inputs.py:261
grepros.inputs.Source.notify
def notify(self, status)
Definition: inputs.py:181
grepros.inputs.TopicSource._running
_running
Definition: inputs.py:844
grepros.inputs.BagSource.format_meta
def format_meta(self)
Definition: inputs.py:576
grepros.inputs.ConditionMixin.Empty.__contains__
def __contains__(self, item)
Definition: inputs.py:298
grepros.inputs.BagSource._running
_running
Definition: inputs.py:500
grepros.inputs.BagSource.get_message_meta
def get_message_meta(self, topic, msg, stamp, index=None)
Definition: inputs.py:604
grepros.inputs.BagSource._args0
_args0
Definition: inputs.py:495
grepros.inputs.BagSource._init_progress
def _init_progress(self)
Definition: inputs.py:721
grepros.inputs.Source.MESSAGE_META_TEMPLATE
string MESSAGE_META_TEMPLATE
Template for message metainfo line.
Definition: inputs.py:41
grepros.inputs.BagSource._ensure_totals
def _ensure_totals(self)
Definition: inputs.py:732
grepros.api.Bag
Definition: api.py:350
grepros.inputs.Source.get_message_class
def get_message_class(self, typename, typehash=None)
Definition: inputs.py:146
grepros.inputs.ConditionMixin.Empty.__bool__
def __bool__(self)
Definition: inputs.py:296
grepros.inputs.ConditionMixin._get_topic_instance
def _get_topic_instance(self, topic, remap=None)
Definition: inputs.py:392
grepros.inputs.BagSource.format_message_meta
def format_message_meta(self, topic, msg, stamp, index=None)
Definition: inputs.py:580
grepros.inputs.Source.validate
def validate(self)
Definition: inputs.py:101
grepros.inputs.TopicSource._queue
_queue
Definition: inputs.py:845
grepros.inputs.AppSource.__init__
def __init__(self, args=None, **kwargs)
Definition: inputs.py:1032
grepros.inputs.BagSource.validate
def validate(self)
Definition: inputs.py:538
grepros.inputs.TopicSource.get_meta
def get_meta(self)
Definition: inputs.py:903
grepros.inputs.TopicSource.close
def close(self)
Definition: inputs.py:893
grepros.inputs.TopicSource.MASTER_INTERVAL
int MASTER_INTERVAL
Seconds between refreshing available topics from ROS master.
Definition: inputs.py:810
grepros.inputs.ConditionMixin._topic_limits
_topic_limits
Definition: inputs.py:317
grepros.inputs.Source.bar
bar
ProgressBar instance, if any.
Definition: inputs.py:74
grepros.inputs.BagSource
Definition: inputs.py:428
grepros.inputs.BagSource._produce
def _produce(self, topics, start_time=None)
Definition: inputs.py:653
grepros.common.ProgressBar
Definition: common.py:348
grepros.inputs.TopicSource.get_message_type_hash
def get_message_type_hash(self, msg_or_type)
Definition: inputs.py:929
grepros.inputs.Source.get_meta
def get_meta(self)
Definition: inputs.py:135
grepros.inputs.Source.is_processable
def is_processable(self, topic, msg, stamp, index=None)
Definition: inputs.py:158
grepros.inputs.Source
Definition: inputs.py:34
grepros.inputs.ConditionMixin.Topic
Definition: inputs.py:230
grepros.inputs.Source.close
def close(self)
Definition: inputs.py:106
grepros.inputs.ConditionMixin.Message._msg
_msg
Definition: inputs.py:276
grepros.inputs.ConditionMixin.Empty.__getattr__
def __getattr__(self, name)
Definition: inputs.py:295
grepros.inputs.ConditionMixin.has_conditions
def has_conditions(self)
Definition: inputs.py:356
grepros.inputs.AppSource._configure
def _configure(self)
Definition: inputs.py:1148
grepros.inputs.ConditionMixin.conditions_register_message
def conditions_register_message(self, topic, msg)
Definition: inputs.py:382
grepros.inputs.TopicSource._init_progress
def _init_progress(self)
Definition: inputs.py:986
grepros.inputs.Source._parse_patterns
def _parse_patterns(self)
Definition: inputs.py:188
grepros.inputs.Source.format_meta
def format_meta(self)
Definition: inputs.py:122
grepros.inputs.ConditionMixin.close_batch
def close_batch(self)
Definition: inputs.py:351
grepros.inputs.ConditionMixin._topic_states
_topic_states
Definition: inputs.py:309
grepros.inputs.BagSource._meta
_meta
Definition: inputs.py:503
grepros.inputs.BagSource.close
def close(self)
Definition: inputs.py:559
grepros.inputs.BagSource._configure
def _configure(self, filename=None, bag=None)
Definition: inputs.py:741
grepros.inputs.BagSource.get_message_definition
def get_message_definition(self, msg_or_type)
Definition: inputs.py:618
grepros.inputs.ConditionMixin._conditions
_conditions
{condition with <topic x> as get_topic("x"): compiled code object}
Definition: inputs.py:320
grepros.inputs.ConditionMixin.__init__
def __init__(self, args=None, **kwargs)
Definition: inputs.py:302
grepros.inputs.Source._counts
_counts
Definition: inputs.py:63
grepros.inputs.Source._hashes
_hashes
Definition: inputs.py:65
grepros.inputs.ConditionMixin._topics_per_condition
_topics_per_condition
Definition: inputs.py:310
grepros.inputs.Source.topics
topics
All topics in source, as {(topic, typenane, typehash): total message count or None}.
Definition: inputs.py:72
grepros.inputs.Source.bind
def bind(self, sink)
Definition: inputs.py:97
grepros.inputs.Source.read
def read(self)
Definition: inputs.py:94


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29