search.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Search core.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 28.09.2021
11 @modified 23.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 import copy
16 import collections
17 import re
18 
19 import six
20 
21 from . import api
22 from . import common
23 from . import inputs
24 
25 
class Scanner(object):
    """
    ROS message grepper.

    In highlighted results, message field values that match search criteria are modified
    to wrap the matching parts in {@link grepros.common.MatchMarkers MatchMarkers} tags,
    with numeric field values converted to strings beforehand.
    """

    ## Namedtuple of (topic name, ROS message, ROS time object, message if matched, index in topic).
    GrepMessage = collections.namedtuple("BagMessage", "topic message timestamp match index")

    ## Match patterns for global any-match, as [(field path or (), re.Pattern), ]
    ANY_MATCHES = [((), re.compile("(.*)", re.DOTALL)), ((), re.compile("(.?)", re.DOTALL))]

    ## Constructor argument defaults
    DEFAULT_ARGS = dict(PATTERN=(), CASE=False, FIXED_STRING=False, INVERT=False, HIGHLIGHT=False,
                        NTH_MATCH=1, BEFORE=0, AFTER=0, CONTEXT=0, MAX_COUNT=0,
                        MAX_PER_TOPIC=0, MAX_TOPICS=0, SELECT_FIELD=(), NOSELECT_FIELD=(),
                        MATCH_WRAPPER="**")


    def __init__(self, args=None, **kwargs):
        """
        @param   args                     arguments as namespace or dictionary, case-insensitive
        @param   args.pattern             pattern(s) to find in message field values
        @param   args.fixed_string        pattern contains ordinary strings, not regular expressions
        @param   args.case                use case-sensitive matching in pattern
        @param   args.invert              select messages not matching pattern
        @param   args.highlight           highlight matched values
        @param   args.before              number of messages of leading context to emit before match
        @param   args.after               number of messages of trailing context to emit after match
        @param   args.context             number of messages of leading and trailing context to emit
                                          around match, overrides args.before and args.after
        @param   args.max_count           number of matched messages to emit (per file if bag input)
        @param   args.max_per_topic       number of matched messages to emit from each topic
        @param   args.max_topics          number of topics to emit matches from
        @param   args.nth_match           emit every Nth match in topic
        @param   args.select_field        message fields to use in matching if not all
        @param   args.noselect_field      message fields to skip in matching
        @param   args.match_wrapper       string to wrap around matched values in find() and match(),
                                          both sides if one value, start and end if more than one,
                                          or no wrapping if zero values (default "**")
        @param   kwargs                   any and all arguments as keyword overrides, case-insensitive
        <!--sep-->

        Additional arguments when using match() or find(grepros.api.Bag):

        @param   args.topic               ROS topics to read if not all
        @param   args.type                ROS message types to read if not all
        @param   args.skip_topic          ROS topics to skip
        @param   args.skip_type           ROS message types to skip
        @param   args.start_time          earliest timestamp of messages to read
        @param   args.end_time            latest timestamp of messages to read
        @param   args.start_index         message index within topic to start from
        @param   args.end_index           message index within topic to stop at
        @param   args.unique              emit messages that are unique in topic
        @param   args.nth_message         read every Nth message in topic
        @param   args.nth_interval        minimum time interval between messages in topic
        @param   args.condition           Python expressions that must evaluate as true
                                          for message to be processable, see ConditionMixin
        @param   args.progress            whether to print progress bar
        @param   args.stop_on_error       stop execution on any error like unknown message type
        """
        # {key: [(() if any field else ('nested', 'path') or re.Pattern, re.Pattern), ]}
        self._patterns = {}
        # {(topic, typename, typehash): {message ID: message}}
        self._messages = collections.defaultdict(collections.OrderedDict)
        # {(topic, typename, typehash): {message ID: ROS time}}
        self._stamps = collections.defaultdict(collections.OrderedDict)
        # {(topic, typename, typehash): {None: processed, True: matched, False: emitted as context}}
        self._counts = collections.defaultdict(collections.Counter)
        # {(topic, typename, typehash): {message ID: True if matched else False if emitted else None}}
        self._statuses = collections.defaultdict(collections.OrderedDict)
        # Patterns to check in message plaintext and skip full matching if not found;
        # must exist before _parse_patterns() runs, as that clears and refills the list
        self._brute_prechecks = []
        self._idcounter   = 0      # Counter for unique message IDs
        self._highlight   = None   # Highlight matched values in message fields
        self._passthrough = False  # Emit messages without pattern-matching and highlighting

        ## Source instance
        self.source = None
        ## Sink instance
        self.sink = None

        self.args = common.ensure_namespace(args, Scanner.DEFAULT_ARGS, **kwargs)
        # CONTEXT is shorthand for equal leading and trailing context
        if self.args.CONTEXT: self.args.BEFORE = self.args.AFTER = self.args.CONTEXT
        self._parse_patterns()


    def find(self, source, highlight=None):
        """
        Yields matched and context messages from source.

        @param   source     inputs.Source or api.Bag instance
        @param   highlight  whether to highlight matched values in message fields,
                            defaults to flag from constructor
        @return             GrepMessage namedtuples of
                            (topic, message, timestamp, match, index in topic),
                            where match is matched optionally highlighted message
                            or `None` if yielding a context message
        """
        if isinstance(source, api.Bag):
            source = inputs.BagSource(source, **vars(self.args))
        self._prepare(source, highlight=highlight)
        for topic, msg, stamp, matched, index in self._generate():
            yield self.GrepMessage(topic, msg, stamp, matched, index)


    def match(self, topic, msg, stamp, highlight=None):
        """
        Returns matched message if message matches search filters.

        @param   topic      topic name
        @param   msg        ROS message
        @param   stamp      message ROS timestamp
        @param   highlight  whether to highlight matched values in message fields,
                            defaults to flag from constructor
        @return             original or highlighted message on match else `None`
        """
        result = None
        if not isinstance(self.source, inputs.AppSource):
            self._prepare(inputs.AppSource(self.args), highlight=highlight)
        if self._highlight != bool(highlight): self._configure_flags(highlight=highlight)

        self.source.push(topic, msg, stamp)
        item = self.source.read_queue()
        if item is not None:
            msgid = self._idcounter = self._idcounter + 1
            topickey = api.TypeMeta.make(msg, topic).topickey
            self._register_message(topickey, msgid, msg, stamp)
            matched = self._is_processable(topic, msg, stamp) and self.get_match(msg)

            self.source.notify(matched)
            if matched:
                # Nth-match check must use the match count from before this message
                is_nth = not self._counts[topickey][True] % (self.args.NTH_MATCH or 1)
                self._statuses[topickey][msgid] = True
                self._counts[topickey][True] += 1
                if is_nth: result = matched  # Otherwise not NTH_MATCH, skip emitting
            self._prune_data(topickey)
            self.source.mark_queue(topic, msg, stamp)
        return result


    def work(self, source, sink):
        """
        Greps messages yielded from source and emits matched content to sink.

        @param   source  inputs.Source or api.Bag instance
        @param   sink    outputs.Sink instance
        @return          count matched
        """
        if isinstance(source, api.Bag):
            source = inputs.BagSource(source, **vars(self.args))
        self._prepare(source, sink, highlight=self.args.HIGHLIGHT)
        total_matched = 0
        for topic, msg, stamp, matched, index in self._generate():
            sink.emit_meta()
            sink.emit(topic, msg, stamp, matched, index)
            total_matched += bool(matched)  # Context messages have matched=None
        return total_matched


    def __enter__(self):
        """Context manager entry, does nothing, returns self."""
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """
        Context manager exit, does nothing.

        Returns None, so that any exception raised inside the with-block propagates
        to the caller; a truthy return value would silently suppress it.
        """
        return None


    def _generate(self):
        """
        Yields matched and context messages from source.

        @return  tuples of (topic, msg, stamp, matched optionally highlighted msg, index in topic)
        """
        batch_matched, batch = False, None
        for topic, msg, stamp in self.source.read():
            if batch != self.source.get_batch():
                # New batch from source: start counting and context history afresh
                batch, batch_matched = self.source.get_batch(), False
                if self._counts: self._clear_data()

            msgid = self._idcounter = self._idcounter + 1
            topickey = api.TypeMeta.make(msg, topic).topickey
            self._register_message(topickey, msgid, msg, stamp)
            matched = self._is_processable(topic, msg, stamp) and self.get_match(msg)

            self.source.notify(matched)
            if matched and not self._counts[topickey][True] % (self.args.NTH_MATCH or 1):
                self._statuses[topickey][msgid] = True
                self._counts[topickey][True] += 1
                for x in self._generate_context(topickey, before=True): yield x
                yield (topic, msg, stamp, matched, self._counts[topickey][None])
            elif matched:  # Not NTH_MATCH, skip emitting
                self._statuses[topickey][msgid] = True
                self._counts[topickey][True] += 1
            elif (self.args.AFTER
                  and self._has_in_window(topickey, self.args.AFTER + 1, status=True)):
                # Unmatched message within trailing context range of an earlier match
                for x in self._generate_context(topickey, before=False): yield x
            batch_matched = batch_matched or bool(matched)

            self._prune_data(topickey)
            if batch_matched and self._is_max_done():
                if self.sink: self.sink.flush()
                self.source.close_batch()


    def _is_processable(self, topic, msg, stamp):
        """
        Returns whether processing current message in topic is acceptable:
        that topic or total maximum count has not been reached,
        and current message in topic is in configured range, if any.
        """
        topickey = api.TypeMeta.make(msg, topic).topickey
        if (self.args.MAX_COUNT
                and sum(x[True] for x in self._counts.values()) >= self.args.MAX_COUNT):
            return False
        if self.args.MAX_PER_TOPIC and self._counts[topickey][True] >= self.args.MAX_PER_TOPIC:
            return False
        if self.args.MAX_TOPICS:
            topics_matched = [k for k, vv in self._counts.items() if vv[True]]
            if topickey not in topics_matched and len(topics_matched) >= self.args.MAX_TOPICS:
                return False
        if (self.source
                and not self.source.is_processable(topic, msg, stamp,
                                                   self._counts[topickey][None])):
            return False
        return True


    def _generate_context(self, topickey, before=False):
        """
        Yields before/after context for latest match.

        @param   topickey  (topic, typename, typehash) of the topic to yield context for
        @param   before    whether to yield leading context for the newest message,
                           or trailing context for an earlier match
        @return            tuples of (topic, msg, stamp, None, index in topic)
        """
        # Leading window includes the current matched message itself, hence +1
        count = self.args.BEFORE + 1 if before else self.args.AFTER
        candidates = list(self._statuses[topickey])[-count:]
        current_index = self._counts[topickey][None]
        for i, msgid in enumerate(candidates) if count else ():
            if self._statuses[topickey][msgid] is None:  # Neither matched nor emitted yet
                idx = current_index + i - (len(candidates) - 1 if before else 1)
                msg, stamp = self._messages[topickey][msgid], self._stamps[topickey][msgid]
                self._counts[topickey][False] += 1
                yield topickey[0], msg, stamp, None, idx
                self._statuses[topickey][msgid] = False


    def _clear_data(self):
        """Clears local structures."""
        for d in (self._counts, self._messages, self._stamps, self._statuses):
            d.clear()
        api.TypeMeta.clear()


    def _prepare(self, source, sink=None, highlight=None):
        """Clears local structures, binds and registers source and sink, if any."""
        self._clear_data()
        self.source, self.sink = source, sink
        source.bind(sink)
        if sink: sink.bind(source)
        source.preprocess = False
        self._configure_flags(highlight=highlight)


    def _prune_data(self, topickey):
        """Drops history older than context window."""
        WINDOW = max(self.args.BEFORE, self.args.AFTER) + 1
        for dct in (self._messages, self._stamps, self._statuses):
            while len(dct[topickey]) > WINDOW:
                msgid = next(iter(dct[topickey]))  # Oldest entry first
                value = dct[topickey].pop(msgid)
                if dct is self._messages: api.TypeMeta.discard(value)


    def _parse_patterns(self):
        """Parses pattern arguments into re.Patterns."""
        NOBRUTE_SIGILS = r"\A", r"\Z", "?("  # Regex specials ruling out brute precheck
        BRUTE, FLAGS = not self.args.INVERT, re.DOTALL | (0 if self.args.CASE else re.I)
        self._patterns.clear()
        del self._brute_prechecks[:]
        contents = []
        for v in self.args.PATTERN:
            # Pattern can be given as "field.path=value", to match in specific fields only
            split = v.find("=", 1, -1)
            v, path = (v[split + 1:], v[:split]) if split > 0 else (v, ())
            # Special case if '' or "": add pattern for matching empty string
            v = "|^$" if v in ("''", '""') else (re.escape(v) if self.args.FIXED_STRING else v)
            path = re.compile(r"(^|\.)%s($|\.)" % ".*".join(map(re.escape, path.split("*")))) \
                   if path else ()
            contents.append((path, re.compile("(%s)" % v, FLAGS)))
            if BRUTE and (self.args.FIXED_STRING or not any(x in v for x in NOBRUTE_SIGILS)):
                self._brute_prechecks.append(re.compile(v, re.I | re.M))
        if not self.args.PATTERN:  # Add match-all pattern
            contents.append(self.ANY_MATCHES[0])
        self._patterns["content"] = contents

        selects, noselects = self.args.SELECT_FIELD, self.args.NOSELECT_FIELD
        for key, vals in [("select", selects), ("noselect", noselects)]:
            self._patterns[key] = [(tuple(v.split(".")), common.wildcard_to_regex(v)) for v in vals]


    def _register_message(self, topickey, msgid, msg, stamp):
        """Registers message with local structures."""
        self._counts[topickey][None] += 1
        self._messages[topickey][msgid] = msg
        self._stamps  [topickey][msgid] = stamp
        self._statuses[topickey][msgid] = None


    def _configure_flags(self, highlight=None):
        """Sets highlight and passthrough flags from current settings."""
        # Explicit argument wins, then a non-highlighting sink forces off, then constructor flag
        self._highlight = bool(highlight if highlight is not None else
                               False if self.sink and not self.sink.is_highlighting() else
                               self.args.HIGHLIGHT)
        self._passthrough = not self._highlight and not self._patterns["select"] \
                            and not self._patterns["noselect"] and not self.args.INVERT \
                            and set(self._patterns["content"]) <= set(self.ANY_MATCHES)


    def _is_max_done(self):
        """Returns whether max match count has been reached (and message after-context emitted)."""
        result, is_maxed = False, False
        if self.args.MAX_COUNT:
            is_maxed = sum(vv[True] for vv in self._counts.values()) >= self.args.MAX_COUNT
        if not is_maxed and self.args.MAX_PER_TOPIC:
            count_required = self.args.MAX_TOPICS or len(self.source.topics)
            count_maxed = sum(vv[True] >= self.args.MAX_PER_TOPIC
                              or vv[None] >= (self.source.topics.get(k) or 0)
                              for k, vv in self._counts.items())
            is_maxed = (count_maxed >= count_required)
        if is_maxed:
            # Not done while any topic still has a match inside its trailing context window
            result = not self.args.AFTER or \
                     not any(self._has_in_window(k, self.args.AFTER, status=True, full=True)
                             for k in self._counts)
        return result


    def _has_in_window(self, topickey, length, status, full=False):
        """
        Returns whether given status exists in recent message window.

        @param   topickey  (topic, typename, typehash) of the topic to check
        @param   length    number of latest messages to look at; result is False if zero
        @param   status    status value to look for: True, False or None
        @param   full      whether to return False if topic has fewer than `length` messages
        """
        if not length or full and len(self._statuses[topickey]) < length:
            return False
        return status in list(self._statuses[topickey].values())[-length:]


    def get_match(self, msg):
        """
        Returns transformed message if all patterns find a match in message, else None.

        Matching field values are converted to strings and surrounded by markers.
        Returns original message if any-match and sink does not require highlighting.
        """

        def wrap_matches(v, top, is_collection=False):
            """Returns string with matching parts wrapped in marker tags; updates `matched`."""
            spans = []
            # Omit collection brackets from match unless empty: allow matching "[]"
            v1 = v2 = v[1:-1] if is_collection and v != "[]" else v
            topstr = ".".join(top)
            for i, (path, p) in enumerate(self._patterns["content"]):
                if path and not path.search(topstr): continue  # for
                matches = [next(p.finditer(v1), None)] if self.args.INVERT else list(p.finditer(v1))
                # Join consecutive zero-length matches, extend remaining zero-lengths to end of value
                matchspans = common.merge_spans([x.span() for x in matches if x], join_blanks=True)
                matchspans = [(a, b if a != b else len(v1)) for a, b in matchspans]
                if matchspans:
                    matched[i] = True
                    spans.extend(matchspans)
            if any(WRAPS):
                spans = common.merge_spans(spans) if not self.args.INVERT else \
                        [] if spans else [(0, len(v1))] if v1 or not is_collection else []
                for a, b in reversed(spans):  # Work from last to first, indices stay the same
                    v2 = v2[:a] + WRAPS[0] + v2[a:b] + WRAPS[1] + v2[b:]
            return "[%s]" % v2 if is_collection and v != "[]" else v2

        def process_message(obj, top=()):
            """Recursively converts field values to pattern-matched strings; updates `matched`."""
            LISTIFIABLES = (bytes, tuple) if six.PY3 else (tuple, )
            selects, noselects = self._patterns["select"], self._patterns["noselect"]
            fieldmap = fieldmap0 = api.get_message_fields(obj)  # Returns obj if not ROS message
            if fieldmap != obj:
                fieldmap = api.filter_fields(fieldmap, top, include=selects, exclude=noselects)
            for k, t in fieldmap.items() if fieldmap != obj else ():
                v, path = api.get_message_value(obj, k, t), top + (k, )
                is_collection = isinstance(v, (list, tuple))
                if api.is_ros_message(v):
                    process_message(v, path)
                elif v and is_collection and api.scalar(t) not in api.ROS_NUMERIC_TYPES:
                    api.set_message_value(obj, k, [process_message(x, path) for x in v])
                else:
                    v1 = str(list(v) if isinstance(v, LISTIFIABLES) else v)
                    v2 = wrap_matches(v1, path, is_collection)
                    if len(v1) != len(v2):  # Value was wrapped: replace field with marked string
                        api.set_message_value(obj, k, v2)
            if not api.is_ros_message(obj):
                v1 = str(list(obj) if isinstance(obj, LISTIFIABLES) else obj)
                v2 = wrap_matches(v1, top)
                obj = v2 if len(v1) != len(v2) else obj
            if (not top and not matched and not selects and not fieldmap0
                    and not self.args.INVERT
                    and set(self._patterns["content"]) <= set(self.ANY_MATCHES)):
                # Ensure Empty any-match
                matched.update({i: True for i, _ in enumerate(self._patterns["content"])})
            return obj

        if self._passthrough: return msg

        if self._brute_prechecks:
            text = "\n".join("%r" % (v, ) for _, v, _ in api.iter_message_fields(msg, flat=True))
            if not all(p.search(text) for p in self._brute_prechecks):
                return None  # Skip detailed matching if patterns not present at all

        # Start and end markers: fixed tags if emitting to sink, else configured wrapper string(s)
        WRAPS = [] if not self._highlight else self.args.MATCH_WRAPPER if not self.sink else \
                (common.MatchMarkers.START, common.MatchMarkers.END)
        WRAPS = WRAPS if isinstance(WRAPS, (list, tuple)) else [] if WRAPS is None else [WRAPS]
        WRAPS = ((WRAPS or [""]) * 2)[:2]

        result, matched = copy.deepcopy(msg), {}  # {pattern index: True}
        process_message(result)
        yes = not matched if self.args.INVERT else len(matched) == len(self._patterns["content"])
        return (result if self._highlight else msg) if yes else None
443 
444 
445 __all__ = ["Scanner"]
grepros.search.Scanner._clear_data
def _clear_data(self)
Definition: search.py:274
grepros.search.Scanner.sink
sink
Sink instance.
Definition: search.py:109
grepros.search.Scanner._statuses
_statuses
Definition: search.py:99
grepros.search.Scanner
Definition: search.py:26
grepros.search.Scanner._has_in_window
def _has_in_window(self, topickey, length, status, full=False)
Definition: search.py:362
grepros.search.Scanner._prepare
def _prepare(self, source, sink=None, highlight=None)
Definition: search.py:281
grepros.search.Scanner._highlight
_highlight
Definition: search.py:103
grepros.search.Scanner.work
def work(self, source, sink)
Definition: search.py:172
grepros.search.Scanner._brute_prechecks
_brute_prechecks
Definition: search.py:101
grepros.search.Scanner.source
source
Source instance.
Definition: search.py:107
grepros.search.Scanner.__enter__
def __enter__(self)
Definition: search.py:191
grepros.inputs.AppSource
Definition: inputs.py:1023
grepros.search.Scanner._messages
_messages
Definition: search.py:93
grepros.search.Scanner._passthrough
_passthrough
Definition: search.py:104
grepros.search.Scanner._is_max_done
def _is_max_done(self)
Definition: search.py:344
grepros.search.Scanner.match
def match(self, topic, msg, stamp, highlight=None)
Definition: search.py:135
grepros.search.Scanner._configure_flags
def _configure_flags(self, highlight=None)
Definition: search.py:334
grepros.search.Scanner._register_message
def _register_message(self, topickey, msgid, msg, stamp)
Definition: search.py:326
grepros.search.Scanner._patterns
_patterns
Definition: search.py:91
grepros.search.Scanner._generate_context
def _generate_context(self, topickey, before=False)
Definition: search.py:260
grepros.search.Scanner._is_processable
def _is_processable(self, topic, msg, stamp)
Definition: search.py:238
grepros.search.Scanner.args
args
Definition: search.py:111
grepros.search.Scanner.__init__
def __init__(self, args=None, **kwargs)
Definition: search.py:48
grepros.search.Scanner.GrepMessage
GrepMessage
Namedtuple of (topic name, ROS message, ROS time object, message if matched, index in topic).
Definition: search.py:36
grepros.search.Scanner._idcounter
_idcounter
Definition: search.py:102
grepros.search.Scanner.find
def find(self, source, highlight=None)
Definition: search.py:116
grepros.api.Bag
Definition: api.py:350
grepros.search.Scanner._stamps
_stamps
Definition: search.py:95
grepros.search.Scanner._parse_patterns
def _parse_patterns(self)
Definition: search.py:300
grepros.search.Scanner._generate
def _generate(self)
Definition: search.py:201
grepros.search.Scanner.ANY_MATCHES
list ANY_MATCHES
Match patterns for global any-match.
Definition: search.py:39
grepros.search.Scanner._counts
_counts
Definition: search.py:97
grepros.inputs.BagSource
Definition: inputs.py:428
grepros.search.Scanner._prune_data
def _prune_data(self, topickey)
Definition: search.py:290
grepros.search.Scanner.get_match
def get_match(self, msg)
Definition: search.py:369
grepros.search.Scanner.__exit__
def __exit__(self, exc_type, exc_value, traceback)
Definition: search.py:196


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29