library.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 grepros library interface.
4 
5 Source classes:
6 
7 - {@link grepros.inputs.AppSource AppSource}: produces messages from iterable or pushed data
8 - {@link grepros.inputs.BagSource BagSource}: produces messages from ROS bagfiles
9 - {@link grepros.inputs.TopicSource TopicSource}: produces messages from live ROS topics
10 
11 Sink classes:
12 
13 - {@link grepros.outputs.AppSink AppSink}: provides messages to callback function
14 - {@link grepros.outputs.BagSink BagSink}: writes messages to bagfile
15 - {@link grepros.outputs.ConsoleSink ConsoleSink}: prints messages to console
16 - {@link grepros.plugins.auto.csv.CsvSink CsvSink}: writes messages to CSV files, each topic to a separate file
17 - {@link grepros.plugins.auto.html.HtmlSink HtmlSink}: writes messages to an HTML file
18 - {@link grepros.plugins.mcap.McapSink McapSink}: writes messages to MCAP file
19 - {@link grepros.outputs.MultiSink MultiSink}: combines any number of sinks
20 - {@link grepros.plugins.parquet.ParquetSink ParquetSink}: writes messages to Apache Parquet files
21 - {@link grepros.plugins.auto.postgres.PostgresSink PostgresSink}: writes messages to a Postgres database
22 - {@link grepros.plugins.auto.sqlite.SqliteSink SqliteSink}: writes messages to an SQLite database
23 - {@link grepros.plugins.sql.SqlSink SqlSink}: writes SQL schema file for message type tables and topic views
24 - {@link grepros.outputs.TopicSink TopicSink}: publishes messages to ROS topics
25 
26 {@link grepros.api.BaseBag Bag}: generic bag interface.
27 {@link grepros.search.Scanner Scanner}: ROS message grepper.
28 
29 Format-specific bag classes:
30 
31 - {@link grepros.ros1.ROS1Bag ROS1Bag}: ROS1 bag reader and writer in .bag format
32 - {@link grepros.ros2.ROS2Bag ROS2Bag}: ROS2 bag reader and writer in .db3 SQLite format
33 - {@link grepros.plugins.embag.EmbagReader EmbagReader}: ROS1 bag reader
34  using the <a href="https://github.com/embarktrucks/embag">embag</a> library
35 - {@link grepros.plugins.mcap.McapBag McapBag}: ROS1/ROS2 bag reader and writer in MCAP format
36 
37 Output sink `write_options` arguments can be given with underscores
38 instead of dashes, e.g. `"rollover_size"` instead of `"rollover-size"`.
39 
40 ------------------------------------------------------------------------------
41 This file is part of grepros - grep for ROS bag files and live topics.
42 Released under the BSD License.
43 
44 @author Erki Suurjaak
45 @created 09.12.2022
46 @modified 28.12.2023
47 ------------------------------------------------------------------------------
48 """
49 
50 from . plugins.auto.csv import CsvSink
51 from . plugins.auto.html import HtmlSink
52 from . plugins.auto.postgres import PostgresSink
53 from . plugins.auto.sqlite import SqliteSink
54 from . plugins.mcap import McapBag, McapSink
55 from . plugins.parquet import ParquetSink
56 from . plugins.sql import SqlSink
57 
58 from . api import Bag
59 from . inputs import AppSource, BagSource, Source, TopicSource
60 from . outputs import AppSink, BagSink, ConsoleSink, MultiSink, Sink, TopicSink
61 from . search import Scanner
62 from . import api
63 from . import common
64 from . import main
65 from . import plugins
66 
67 
68 _inited = False
69 
70 
71 def grep(args=None, **kwargs):
72  """
73  Yields matching messages from specified source.
74 
75  Initializes grepros if not already initialized.
76 
77  Read from bagfiles: `grep(file="2022-10-*.bag", pattern="cpu")`.
78 
79  Read from live topics: `grep(live=True, pattern="cpu")`.
80 
81 
82  @param args arguments as namespace or dictionary, case-insensitive;
83  or a single path as the ROS bagfile to read,
84  or one or more {@link grepros.api.Bag Bag} instances
85  @param kwargs any and all arguments as keyword overrides, case-insensitive
86  <!--sep-->
87 
88  Bag source:
89  @param args.file names of ROS bagfiles to read if not all in directory
90  @param args.path paths to scan if not current directory
91  @param args.recurse recurse into subdirectories when looking for bagfiles
92  @param args.decompress decompress archived bags to file directory
93  @param args.reindex make a copy of unindexed bags and reindex them (ROS1 only)
94  @param args.orderby "topic" or "type" if any to group results by
95  @param args.bag one or more {@link grepros.api.Bag Bag} instances
96  <!--sep-->
97 
98  Live source:
99  @param args.live whether reading messages from live ROS topics
100  @param args.queue_size_in subscriber queue size (default 10)
101  @param args.ros_time_in stamp messages with ROS time instead of wall time
102  <!--sep-->
103 
104  App source:
105  @param args.app whether reading messages from iterable or pushed data;
106  may contain the iterable itself
107  @param args.iterable iterable yielding (topic, msg, stamp) or (topic, msg);
108  yielding `None` signals end of content
109  Any source:
110  @param args.topic ROS topics to read if not all
111  @param args.type ROS message types to read if not all
112  @param args.skip_topic ROS topics to skip
113  @param args.skip_type ROS message types to skip
114  @param args.start_time earliest timestamp of messages to read
115  @param args.end_time latest timestamp of messages to read
116  @param args.start_index message index within topic to start from
117  @param args.end_index message index within topic to stop at
118 
119  @param args.nth_message read every Nth message in topic
120  @param args.nth_interval minimum time interval between messages in topic
121 
122  @param args.select_field message fields to use in matching if not all
123  @param args.noselect_field message fields to skip in matching
124 
125  @param args.unique emit messages that are unique in topic
126  (select_field and noselect_field apply if specified)
127  @param args.condition Python expressions that must evaluate as true
128  for message to be processable, see ConditionMixin
129  <!--sep-->
130 
131  Search&zwj;:
132  @param args.pattern pattern(s) to find in message field values
133  @param args.fixed_string pattern contains ordinary strings, not regular expressions
134  @param args.case use case-sensitive matching in pattern
135  @param args.invert select messages not matching pattern
136 
137  @param args.nth_match emit every Nth match in topic
138  @param args.max_count number of matched messages to emit (per file if bag input)
139  @param args.max_per_topic number of matched messages to emit from each topic
140  @param args.max_topics number of topics to print matches from
141 
142  @param args.before number of messages of leading context to emit before match
143  @param args.after number of messages of trailing context to emit after match
144  @param args.context number of messages of leading and trailing context
145  to emit around match
146 
147  @param args.highlight highlight matched values
148  @param args.match_wrapper string to wrap around matched values,
149  both sides if one value, start and end if more than one,
150  or no wrapping if zero values
151 
152  @return {@link grepros.Scanner.GrepMessage GrepMessage} namedtuples
153  of (topic, message, timestamp, match, index)
154  """
155  DEFAULT_ARGS = dict(FILE=[], LIVE=False, APP=False, ITERABLE=None,
156  COLOR="never", HIGHLIGHT=False)
157 
158  args0 = args
159  is_bag = isinstance(args, Bag) or \
160  common.is_iterable(args) and all(isinstance(x, Bag) for x in args)
161  args = {"FILE": str(args)} if isinstance(args, common.PATH_TYPES) else \
162  {} if is_bag or isinstance(args, Source) else args
163  args = common.ensure_namespace(args, DEFAULT_ARGS, **kwargs)
164  main.validate_args(main.process_args(args))
165  if not _inited: init(args)
166 
167  if common.is_iterable(args.APP) and not common.is_iterable(args.ITERABLE):
168  args.APP, args.ITERABLE = True, args.APP
169  src = args0 if isinstance(args0, Source) else \
170  TopicSource(args) if args.LIVE else \
171  AppSource(args) if args.APP else \
172  BagSource(args0, **vars(args)) if is_bag else BagSource(args)
173  src.validate()
174 
175  try:
176  for x in Scanner(args).find(src): yield x
177  finally:
178  if not isinstance(args0, (Bag, Source)): src.close()
179 
180 
181 def source(args=None, **kwargs):
182  """
183  Convenience for creating a {@link grepros.inputs.Source Source} instance from arguments.
184 
185  Initializes grepros if not already initialized.
186 
187  @param args arguments as namespace or dictionary, case-insensitive;
188  or a single path as the ROS bagfile to read
189  @param kwargs any and all arguments as keyword overrides, case-insensitive
190  @param args.file one or more names of ROS bagfiles to read from
191  @param args.live read messages from live ROS topics instead
192  @param args.app read messages from iterable or pushed data instead;
193  may contain the iterable itself
194  <!--sep-->
195 
196  Bag source:
197  @param args.file names of ROS bagfiles to read if not all in directory
198  @param args.path paths to scan if not current directory
199  @param args.recurse recurse into subdirectories when looking for bagfiles
200  @param args.orderby "topic" or "type" if any to group results by
201  @param args.decompress decompress archived bags to file directory
202  @param args.reindex make a copy of unindexed bags and reindex them (ROS1 only)
203  @param args.progress whether to print progress bar
204  <!--sep-->
205 
206  Live source:
207  @param args.queue_size_in subscriber queue size (default 10)
208  @param args.ros_time_in stamp messages with ROS time instead of wall time
209  @param args.progress whether to print progress bar
210  <!--sep-->
211 
212  App source:
213  @param args.iterable iterable yielding (topic, msg, stamp) or (topic, msg);
214  yielding `None` signals end of content
215  <!--sep-->
216 
217  Any source:
218  @param args.topic ROS topics to read if not all
219  @param args.type ROS message types to read if not all
220  @param args.skip_topic ROS topics to skip
221  @param args.skip_type ROS message types to skip
222  @param args.start_time earliest timestamp of messages to read
223  @param args.end_time latest timestamp of messages to read
224  @param args.start_index message index within topic to start from
225  @param args.end_index message index within topic to stop at
226  @param args.unique emit messages that are unique in topic
227  @param args.select_field message fields to use for uniqueness if not all
228  @param args.noselect_field message fields to skip for uniqueness
229  @param args.nth_message read every Nth message in topic
230  @param args.nth_interval minimum time interval between messages in topic
231  @param args.condition Python expressions that must evaluate as true
232  for message to be processable, see ConditionMixin
233  """
234  DEFAULT_ARGS = dict(FILE=[], LIVE=False, APP=False, ITERABLE=None)
235  args = {"FILE": str(args)} if isinstance(args, common.PATH_TYPES) else args
236  args = common.ensure_namespace(args, DEFAULT_ARGS, **kwargs)
237  if not _inited: init(args)
238 
239  if common.is_iterable(args.APP) and not common.is_iterable(args.ITERABLE):
240  args.APP, args.ITERABLE = True, args.APP
241  result = (TopicSource if args.LIVE else AppSource if args.APP else BagSource)(args)
242  result.validate()
243  return result
244 
245 
246 def sink(args=None, **kwargs):
247  """
248  Convenience for creating a {@link grepros.outputs.Sink Sink} instance from arguments,
249  {@link grepros.outputs.MultiSink MultiSink} if several outputs.
250 
251  Initializes grepros if not already initialized.
252 
253  @param args arguments as namespace or dictionary, case-insensitive;
254  or a single item as sink target like bag filename
255  @param kwargs any and all arguments as keyword overrides, case-insensitive
256  @param args.app provide messages to given callback function
257  @param args.console print matches to console
258  @param args.publish publish matches to live topics
259  @param args.write file or other target like Postgres database to write,
260  as "target", or ["target", dict(format="format", ..)]
261  or [[..target1..], [..target2..], ..]
262  @param args.write_options format-specific options like
263  {"overwrite": whether to overwrite existing file
264  (default false)}
265  <!--sep-->
266 
267  Console sink:
268  @param args.line_prefix print source prefix like bag filename on each message line
269  @param args.max_field_lines maximum number of lines to print per field
270  @param args.start_line message line number to start output from
271  @param args.end_line message line number to stop output at
272  @param args.max_message_lines maximum number of lines to output per message
273  @param args.lines_around_match number of message lines around matched fields to output
274  @param args.matched_fields_only output only the fields where match was found
275  @param args.wrap_width character width to wrap message YAML output at
276  @param args.match_wrapper string to wrap around matched values,
277  both sides if one value, start and end if more than one,
278  or no wrapping if zero values
279  <!--sep-->
280 
281  Console / HTML sink:
282  @param args.color False or "never" for not using colors in replacements
283  @param args.highlight highlight matched values (default true)
284  @param args.emit_field message fields to emit if not all
285  @param args.noemit_field message fields to skip in output
286  @param args.max_field_lines maximum number of lines to output per field
287  @param args.start_line message line number to start output from
288  @param args.end_line message line number to stop output at
289  @param args.max_message_lines maximum number of lines to output per message
290  @param args.lines_around_match number of message lines around matched fields to output
291  @param args.matched_fields_only output only the fields where match was found
292  @param args.wrap_width character width to wrap message YAML output at
293  @param args.match_wrapper string to wrap around matched values,
294  both sides if one value, start and end if more than one,
295  or no wrapping if zero values
296  <!--sep-->
297 
298  Topic sink:
299  @param args.queue_size_out publisher queue size (default 10)
300  @param args.publish_prefix output topic prefix, prepended to input topic
301  @param args.publish_suffix output topic suffix, appended to output topic
302  @param args.publish_fixname single output topic name to publish to,
303  overrides prefix and suffix if given
304  <!--sep-->
305 
306  App sink:
307  @param args.emit callback(topic, msg, stamp, highlighted msg, index in topic)
308  if any
309  @param args.metaemit callback(metadata dict) if any,
310  invoked before first emit from source batch
311  <!--sep-->
312 
313  Any sink:
314  @param args.meta whether to print metainfo
315  @param args.verbose whether to print debug information
316  """
317  DEFAULT_ARGS = dict(CONSOLE=False, PUBLISH=False, WRITE=[], APP=False, EMIT=None, METAEMIT=None)
318 
319  result = None
320  args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
321  args = common.ensure_namespace(args, DEFAULT_ARGS, **kwargs)
322  if not _inited: init(args)
323 
324  if args.WRITE:
325  if isinstance(args.WRITE, common.PATH_TYPES):
326  args.WRITE = [[args.WRITE]] # Nest deeper, single file given
327  elif isinstance(args.WRITE, (list, tuple)) and isinstance(args.WRITE[0], common.PATH_TYPES):
328  args.WRITE = [args.WRITE] # Nest deeper, must have been single [target, ..opts]
329  if callable(args.APP) and not callable(args.EMIT): args.APP, args.EMIT = True, args.APP
330 
331  multisink = MultiSink(args)
332  multisink.validate()
333  result = multisink.sinks[0] if len(multisink.sinks) == 1 else multisink
334  return result
335 
336 
337 def init(args=None, **kwargs):
338  """
339  Initializes ROS version bindings, loads all built-in plugins if dependencies available.
340 
341  @param args
342  @param args.plugin one or more extra plugins to load,
343  as full names or instances of Python module/class
344  @param kwargs any and all arguments as keyword overrides, case-insensitive
345  """
346  global _inited
347  args = common.ensure_namespace(args, {"PLUGIN": []}, **kwargs)
348  if _inited:
349  if args: plugins.configure(args)
350  return
351 
352  common.ConsolePrinter.configure(color=None, apimode=True)
353  api.validate()
354  plugins.init(args)
355  for x in (plugins.mcap, plugins.parquet, plugins.sql):
356  try: plugins.configure(PLUGIN=x)
357  except Exception: pass
358  Bag.READER_CLASSES.add(McapBag) # Ensure MCAP files at least get recognized,
359  Bag.WRITER_CLASSES.add(McapBag) # even if loading them will fail when dependencies missing
360  if args.PLUGIN: plugins.configure(args)
361  # Switch message metadata cache to constrain on total number instead of time
362  api.TypeMeta.LIFETIME, api.TypeMeta.POPULATION = 0, 100
363  _inited = True
364 
365 
366 
367 __all__ = [
368  "AppSink", "AppSource", "Bag", "BagSink", "BagSource", "ConsoleSink", "CsvSink", "HtmlSink",
369  "McapBag", "McapSink", "MultiSink", "ParquetSink", "PostgresSink", "Scanner", "Sink", "Source",
370  "SqliteSink", "SqlSink", "TopicSink", "TopicSource",
371  "grep", "init", "sink", "source",
372 ]
grepros.outputs.MultiSink
Definition: outputs.py:850
grepros.inputs.TopicSource
Definition: inputs.py:806
grepros.search.Scanner
Definition: search.py:26
grepros.inputs.AppSource
Definition: inputs.py:1023
grepros.library.grep
def grep(args=None, **kwargs)
Definition: library.py:71
grepros.library.init
def init(args=None, **kwargs)
Definition: library.py:337
grepros.inputs.BagSource
Definition: inputs.py:428
grepros.library.source
def source(args=None, **kwargs)
Definition: library.py:181
grepros.library.sink
def sink(args=None, **kwargs)
Definition: library.py:246


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29