# -*- coding: utf-8 -*-
"""
Parquet output plugin.

------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.

@author      Erki Suurjaak
@created     14.12.2021
@modified    28.12.2023
------------------------------------------------------------------------------
"""

import itertools
import json
import os
import re
import uuid

try: import pandas
except Exception: pandas = None
try: import pyarrow
except Exception: pyarrow = None
try: import pyarrow.parquet
except Exception: pass
import six

from .. import api, common
from .. common import ConsolePrinter
from .. outputs import Sink


class ParquetSink(Sink):
    """Writes messages to Apache Parquet files."""

    FILE_EXTENSIONS = (".parquet", )

    ## Number of dataframes to cache before writing, per type.
    CHUNK_SIZE = 100

    ## Mapping from pyarrow type names and aliases to pyarrow type constructors.
    ARROW_TYPES = {
        "bool":       pyarrow.bool_,     "bool_":        pyarrow.bool_,
        "float16":    pyarrow.float16,   "float64":      pyarrow.float64,
        "float32":    pyarrow.float32,   "decimal128":   pyarrow.decimal128,
        "int8":       pyarrow.int8,      "uint8":        pyarrow.uint8,
        "int16":      pyarrow.int16,     "uint16":       pyarrow.uint16,
        "int32":      pyarrow.int32,     "uint32":       pyarrow.uint32,
        "int64":      pyarrow.int64,     "uint64":       pyarrow.uint64,
        "date32":     pyarrow.date32,    "time32":       pyarrow.time32,
        "date64":     pyarrow.date64,    "time64":       pyarrow.time64,
        "timestamp":  pyarrow.timestamp, "duration":     pyarrow.duration,
        "binary":     pyarrow.binary,    "large_binary": pyarrow.large_binary,
        "string":     pyarrow.string,    "large_string": pyarrow.large_string,
        "utf8":       pyarrow.string,    "large_utf8":   pyarrow.large_utf8,
        "list":       pyarrow.list_,     "list_":        pyarrow.list_,
        "large_list": pyarrow.large_list,
    } if pyarrow else {}
    if hasattr(pyarrow, "month_day_nano_interval"): ARROW_TYPES.update({  # Py3
        "month_day_nano_interval": pyarrow.month_day_nano_interval,
    })

    ## Mapping from ROS common type names to pyarrow types.
    COMMON_TYPES = {
        "int8":    pyarrow.int8(),    "int16":   pyarrow.int16(),  "int32":   pyarrow.int32(),
        "uint8":   pyarrow.uint8(),   "uint16":  pyarrow.uint16(), "uint32":  pyarrow.uint32(),
        "int64":   pyarrow.int64(),   "uint64":  pyarrow.uint64(), "bool":    pyarrow.bool_(),
        "string":  pyarrow.string(),  "wstring": pyarrow.string(), "uint8[]": pyarrow.binary(),
        "float32": pyarrow.float32(), "float64": pyarrow.float64(),
    } if pyarrow else {}

    ## Fallback pyarrow type if mapped type not found.
    DEFAULT_TYPE = pyarrow.string() if pyarrow else None

    ## Default columns for message type tables.
    MESSAGE_TYPE_BASECOLS = [("_topic",     "string"),
                             ("_timestamp", "time"), ]

    ## Additional default columns for message type tables with nesting output.
    MESSAGE_TYPE_NESTCOLS = [("_id",          "string"),
                             ("_parent_type", "string"),
                             ("_parent_id",   "string"), ]

    ## Custom arguments for pyarrow.parquet.ParquetWriter.
    WRITER_ARGS = {"version": "2.6"}

    DEFAULT_ARGS = dict(EMIT_FIELD=(), META=False, NOEMIT_FIELD=(), WRITE_OPTIONS={},
                        VERBOSE=False)


    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the base name of Parquet files to write
        @param   args.emit_field      message fields to emit in output if not all
        @param   args.noemit_field    message fields to skip in output
        @param   args.write           base name of Parquet files to write
        @param   args.write_options   ```
                                      {"column": additional columns as {name: (rostype, value)},
                                       "type": {rostype: PyArrow type or typename like "uint8"},
                                       "writer": dictionary of arguments passed to ParquetWriter,
                                       "idgenerator": callable or iterable for producing message IDs
                                                      like uuid.uuid4 or itertools.count();
                                                      nesting uses UUID values by default,
                                       "column-k=rostype:v": one "column"-argument in flat string form,
                                       "type-k=v": one "type"-argument in flat string form,
                                       "writer-k=v": one "writer"-argument in flat string form,
                                       "nesting": "array" to recursively insert arrays
                                                  of nested types, or "all" for any nesting,
                                       "overwrite": whether to overwrite existing file
                                                    (default false)}
                                      ```
        @param   args.meta            whether to print metainfo
        @param   args.verbose         whether to print debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
        args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
        args = common.ensure_namespace(args, ParquetSink.DEFAULT_ARGS, **kwargs)
        super(ParquetSink, self).__init__(args)

        self._filebase       = args.WRITE
        self._overwrite      = (args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
        self._filenames      = {}  # {(typename, typehash): Parquet file path}
        self._caches         = {}  # {(typename, typehash): [{data}, ]}
        self._schemas        = {}  # {(typename, typehash): pyarrow.Schema}
        self._writers        = {}  # {(typename, typehash): pyarrow.parquet.ParquetWriter}
        self._extra_basecols = []  # [(name, rostype)]
        self._extra_basevals = []  # [(name, value)]
        self._patterns       = {}  # {key: [(() if any field else ('path', ), re.Pattern), ]}
        self._nesting        = args.WRITE_OPTIONS.get("nesting")
        self._idgenerator    = iter(lambda: str(uuid.uuid4()), self) if self._nesting else None

        self._close_printed = False


    def validate(self):
        """
        Returns whether required libraries are available (pandas and pyarrow),
        overwrite and nesting options are valid, and file base is writable.
        """
        if self.valid is not None: return self.valid
        ok, pandas_ok, pyarrow_ok = self._configure(), bool(pandas), bool(pyarrow)
        if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid overwrite option for Parquet: %r. "
                                 "Choose one of {true, false}.",
                                 self.args.WRITE_OPTIONS["overwrite"])
            ok = False
        if self.args.WRITE_OPTIONS.get("nesting") not in (None, "", "array", "all"):
            ConsolePrinter.error("Invalid nesting option for Parquet: %r. "
                                 "Choose one of {array, all}.",
                                 self.args.WRITE_OPTIONS["nesting"])
            ok = False
        if not pandas_ok:
            ConsolePrinter.error("pandas not available: cannot write Parquet files.")
        if not pyarrow_ok:
            ConsolePrinter.error("PyArrow not available: cannot write Parquet files.")
        if not common.verify_io(self.args.WRITE, "w"):
            ok = False
        self.valid = ok and pandas_ok and pyarrow_ok
        return self.valid


    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """Writes message to a Parquet file."""
        if not self.validate(): raise Exception("invalid")
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        self._process_type(topic, msg)
        self._process_message(topic, index, stamp, msg, match)
        self._close_printed = False


    def close(self):
        """Writes out any remaining messages, closes writers, clears structures."""
        try:
            for k, vv in list(self._caches.items()):
                vv and self._write_table(k)
            for k in list(self._writers):
                self._writers.pop(k).close()
        finally:
            if not self._close_printed and self._counts:
                self._close_printed = True
                sizes = {n: None for n in self._filenames.values()}
                for n in self._filenames.values():
                    try: sizes[n] = os.path.getsize(n)
                    except Exception as e: ConsolePrinter.warn("Error getting size of %s: %s", n, e)
                ConsolePrinter.debug("Wrote %s in %s to %s (%s):",
                                     common.plural("message", sum(self._counts.values())),
                                     common.plural("topic", self._counts),
                                     common.plural("Parquet file", sizes),
                                     common.format_bytes(sum(filter(bool, sizes.values()))))
                for (t, h), name in self._filenames.items():
                    count = sum(c for (_, t_, h_), c in self._counts.items() if (t, h) == (t_, h_))
                    ConsolePrinter.debug("- %s (%s, %s)", name,
                                         "error getting size" if sizes[name] is None else
                                         common.format_bytes(sizes[name]),
                                         common.plural("message", count))
            self._caches.clear()
            self._schemas.clear()
            self._filenames.clear()


    def _process_type(self, topic, msg, rootmsg=None):
        """Prepares Parquet schema and writer if not existing."""
        rootmsg = rootmsg or msg
        with api.TypeMeta.make(msg, root=rootmsg) as m:
            typename, typehash, typekey = (m.typename, m.typehash, m.typekey)
        if topic and (topic, typename, typehash) not in self._counts and self.args.VERBOSE:
            ConsolePrinter.debug("Adding topic %s in Parquet output.", topic)
        if typekey in self._writers: return

        basedir, basename = os.path.split(self._filebase)
        pathname = os.path.join(basedir, re.sub(r"\W", "__", "%s__%s" % (typename, typehash)))
        filename = os.path.join(pathname, basename)
        if not self._overwrite:
            filename = common.unique_path(filename)

        cols = []
        scalars = set(x for x in self.COMMON_TYPES if "[" not in x)
        fltrs = dict(include=self._patterns["print"], exclude=self._patterns["noprint"])
        for path, value, subtype in api.iter_message_fields(msg, scalars=scalars, **fltrs):
            coltype = self._make_column_type(subtype)
            cols += [(".".join(path), coltype)]
        MSGCOLS = self.MESSAGE_TYPE_BASECOLS + (self.MESSAGE_TYPE_NESTCOLS if self._nesting else [])
        cols += [(c, self._make_column_type(t, fallback="int64" if "time" == t else None))
                 for c, t in MSGCOLS + self._extra_basecols]

        if self.args.VERBOSE:
            sz = os.path.isfile(filename) and os.path.getsize(filename)
            action = "Overwriting" if sz and self._overwrite else "Adding"
            ConsolePrinter.debug("%s type %s in Parquet output.", action, typename)
        common.makedirs(pathname)

        schema = pyarrow.schema(cols)
        writer = pyarrow.parquet.ParquetWriter(filename, schema, **self.WRITER_ARGS)
        self._caches[typekey]    = []
        self._filenames[typekey] = filename
        self._schemas[typekey]   = schema
        self._writers[typekey]   = writer

        nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # for path
            subtypehash = not submsgs and self.source.get_message_type_hash(scalartype)
            if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
            [submsg] = submsgs[:1] or [self.source.get_message_class(scalartype, subtypehash)()]
            self._process_type(None, submsg, rootmsg)


    def _process_message(self, topic, index, stamp, msg, match=None,
                         rootmsg=None, parent_type=None, parent_id=None):
        """
        Converts message to a dictionary of column values, adds to type cache.

        Writes cache to disk if length reached chunk size.

        If nesting is enabled, processes nested messages for subtypes in message.
        If IDs are used, returns generated ID.
        """
        data, myid, rootmsg = {}, None, (rootmsg or msg)
        if self._idgenerator: myid = next(self._idgenerator)
        with api.TypeMeta.make(msg, topic, root=rootmsg) as m:
            typename, typekey = m.typename, m.typekey
        fltrs = dict(include=self._patterns["print"], exclude=self._patterns["noprint"])
        for p, v, t in api.iter_message_fields(msg, scalars=set(self.COMMON_TYPES), **fltrs):
            data[".".join(p)] = self._make_column_value(v, t)
        data.update(_topic=topic, _timestamp=self._make_column_value(stamp, "time"))
        if self._idgenerator: data.update(_id=myid)
        if self._nesting:
            COLS = [k for k, _ in self.MESSAGE_TYPE_NESTCOLS if "parent" in k]
            data.update(zip(COLS, [parent_type, parent_id]))
        data.update(self._extra_basevals)
        self._caches[typekey].append(data)
        super(ParquetSink, self).emit(topic, msg, stamp, match, index)

        subids = {}  # {message field path: [ids]}
        nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # for path
            if isinstance(submsgs, (list, tuple)):
                subids[path] = []
            for submsg in (submsgs if isinstance(submsgs, (list, tuple)) else [submsgs]):
                subid = self._process_message(topic, index, stamp, submsg,
                                              rootmsg=rootmsg, parent_type=typename, parent_id=myid)
                if isinstance(submsgs, (list, tuple)):
                    subids[path].append(subid)
        data.update(subids)

        if len(self._caches[typekey]) >= self.CHUNK_SIZE:
            self._write_table(typekey)
        return myid
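
    # Nesting illustration (made-up values): with "nesting" enabled, a message
    # with a field "points" of type "Point[]" yields one row per Point in the
    # Point type's own Parquet file, each carrying _parent_type and _parent_id,
    # while the array field in the parent row is populated with the generated
    # child IDs (foreign keys) instead of the message contents.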


    def _make_column_type(self, typename, fallback=None):
        """
        Returns pyarrow type for ROS type.

        @param   fallback  fallback typename to use for lookup if typename not found
        """
        noboundtype = api.canonical(typename, unbounded=True)
        scalartype  = api.scalar(typename)
        timetype    = api.get_ros_time_category(scalartype)
        coltype     = self.COMMON_TYPES.get(typename) or self.COMMON_TYPES.get(noboundtype)

        if not coltype and scalartype in self.COMMON_TYPES:
            coltype = pyarrow.list_(self.COMMON_TYPES[scalartype])
        if not coltype and timetype in self.COMMON_TYPES:
            if typename != scalartype:
                coltype = pyarrow.list_(self.COMMON_TYPES[timetype])
            else:
                coltype = self.COMMON_TYPES[timetype]
        if not coltype and fallback:
            coltype = self._make_column_type(fallback)
        if not coltype:
            coltype = self.DEFAULT_TYPE
        return coltype
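
    # For example, following COMMON_TYPES above: "float32" maps to
    # pyarrow.float32(), "uint8[]" to pyarrow.binary(), and an unlisted array
    # type like "float32[]" to pyarrow.list_(pyarrow.float32()); anything
    # still unmapped falls back to DEFAULT_TYPE, i.e. pyarrow.string().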


    def _make_column_value(self, value, typename=None):
        """Returns column value suitable for adding to Parquet file."""
        v = value
        if isinstance(v, (list, tuple)):
            noboundtype = api.canonical(typename, unbounded=True)
            if v and api.is_ros_time(v[0]):
                v = [api.to_nsec(x) for x in v]
            elif api.scalar(typename) not in api.ROS_BUILTIN_TYPES:
                v = str([api.message_to_dict(m) for m in v])
            elif pyarrow.binary() in (self.COMMON_TYPES.get(typename),
                                      self.COMMON_TYPES.get(noboundtype)):
                v = bytes(bytearray(v))  # Py2/Py3 compatible
            else:
                v = list(v)  # Ensure lists not tuples
        elif api.is_ros_time(v):
            v = api.to_nsec(v)
        elif typename and typename not in api.ROS_BUILTIN_TYPES:
            v = str(api.message_to_dict(v))
        return v
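
    # For example: ROS time/duration values become nanosecond integers, a
    # uint8[] field becomes bytes, an array of nested messages becomes a
    # string of their dict representations, and other arrays become lists.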


    def _write_table(self, typekey):
        """Writes out cached messages for type."""
        dicts = self._caches[typekey][:]
        del self._caches[typekey][:]
        mapping = {k: [d[k] for d in dicts] for k in dicts[0]}
        table = pyarrow.Table.from_pydict(mapping, self._schemas[typekey])
        self._writers[typekey].write_table(table)


    def _configure(self):
        """Parses args.WRITE_OPTIONS, returns success."""
        ok = self._configure_ids()

        def process_column(name, rostype, value):  # Parse "column-name=rostype:value"
            v, myok = value, True
            if "string" not in rostype:
                v = json.loads(v)
            if not name:
                myok = False
                ConsolePrinter.error("Invalid name option in %s=%s:%s", name, rostype, v)
            if rostype not in api.ROS_BUILTIN_TYPES:
                myok = False
                ConsolePrinter.error("Invalid type option in %s=%s:%s", name, rostype, v)
            if myok:
                self._extra_basecols.append((name, rostype))
                self._extra_basevals.append((name, value))
            return myok

        def process_type(rostype, arrowtype):  # Eval pyarrow datatype from value like "float64()"
            if arrowtype not in self.ARROW_TYPES.values():
                arrowtype = eval(compile(arrowtype, "", "eval"), {"__builtins__": self.ARROW_TYPES})
            self.COMMON_TYPES[rostype] = arrowtype
            return True

        for key, vals in [("print", self.args.EMIT_FIELD), ("noprint", self.args.NOEMIT_FIELD)]:
            self._patterns[key] = [(tuple(v.split(".")), common.wildcard_to_regex(v)) for v in vals]

        # Populate ROS type aliases like "byte" and "char"
        for rostype in list(self.COMMON_TYPES):
            alias = api.get_type_alias(rostype)
            if alias:
                self.COMMON_TYPES[alias] = self.COMMON_TYPES[rostype]
            if alias and rostype + "[]" in self.COMMON_TYPES:
                self.COMMON_TYPES[alias + "[]"] = self.COMMON_TYPES[rostype + "[]"]

        for k, v in self.args.WRITE_OPTIONS.items():
            if "column" == k and v and isinstance(v, dict):
                for name, (rostype, value) in v.items():
                    if not process_column(name, rostype, value): ok = False
            elif "type" == k and v and isinstance(v, dict):
                for name, value in v.items():
                    if not process_type(name, value): ok = False
            elif "writer" == k and v and isinstance(v, dict):
                self.WRITER_ARGS.update(v)
            elif isinstance(k, str) and "-" in k:
                category, name = k.split("-", 1)
                if category not in ("column", "type", "writer"):
                    ConsolePrinter.warn("Unknown %r option in %s=%s", category, k, v)
                    continue  # for k, v
                try:
                    if not name: raise Exception("empty name")
                    if "column" == category:    # column-name=rostype:value
                        if not process_column(name, *v.split(":", 1)): ok = False
                    elif "type" == category:    # type-rostype=arrowtype
                        process_type(name, v)
                    elif "writer" == category:  # writer-argname=argvalue
                        try: v = json.loads(v)
                        except Exception: pass
                        self.WRITER_ARGS[name] = v
                except Exception as e:
                    ok = False
                    ConsolePrinter.error("Invalid %s option in %s=%s: %s", category, k, v, e)
        return ok
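
    # Flat-form options illustration (values are made up): WRITE_OPTIONS like
    #   {"column-bag_hash": "string:26dfba2c", "type-time": "timestamp('ns')",
    #    "writer-compression": '"snappy"'}
    # is parsed into the equivalent of the dictionary forms
    #   {"column": {"bag_hash": ("string", "26dfba2c")},
    #    "type":   {"time": "timestamp('ns')"},
    #    "writer": {"compression": "snappy"}}.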


    def _configure_ids(self):
        """Configures ID generator from args.WRITE_OPTIONS, returns success."""
        ok = True

        k, v = "idgenerator", self.args.WRITE_OPTIONS.get("idgenerator")
        if k in self.args.WRITE_OPTIONS:
            val, ex, ns = v, None, dict(self.ARROW_TYPES, itertools=itertools, uuid=uuid)
            for root in v.split(".", 1)[:1]:
                try: ns[root] = common.import_item(root)  # Provide root module
                except Exception: pass
            try: common.import_item(re.sub(r"\(.+", "", v))  # Ensure nested imports
            except Exception: pass
            try: val = eval(compile(v, "", "eval"), ns)
            except Exception as e: ok, ex = False, e
            if isinstance(val, (six.binary_type, six.text_type)): ok = False

            if ok:
                try: self._idgenerator = iter(val)
                except Exception:
                    try: self._idgenerator = iter(val, self)  # (callable=val, sentinel=self)
                    except Exception as e: ok, ex = False, e
            if not ok:
                ConsolePrinter.error("Invalid value in %s=%s%s", k, v, (": %s" % ex if ex else ""))
            elif not self._nesting:
                self.MESSAGE_TYPE_BASECOLS.append(("_id", "string"))

        if ok and self._idgenerator:  # Detect given ID column type
            fval, typename, generator = next(self._idgenerator), None, self._idgenerator
            if api.is_ros_time(fval):
                typename = "time" if "time" in str(type(fval)).lower() else "duration"
            elif isinstance(fval, six.integer_types):
                typename = "int64"
            elif isinstance(fval, float):
                typename = "float64"
            elif not isinstance(fval, str):  # Cast whatever it is to string
                fval, self._idgenerator = str(fval), (str(x) for x in generator)
            if typename:
                repl = lambda n, t: (n, typename) if "_id" in n else (n, t)
                self.MESSAGE_TYPE_BASECOLS = [repl(*x) for x in self.MESSAGE_TYPE_BASECOLS]
                self.MESSAGE_TYPE_NESTCOLS = [repl(*x) for x in self.MESSAGE_TYPE_NESTCOLS]
            self._idgenerator = itertools.chain([fval], self._idgenerator)
        return ok
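
    # Idgenerator illustration (per the init() help text below): with
    #   WRITE_OPTIONS={"idgenerator": "itertools.count()"}
    # IDs are integers and the _id column becomes int64; with
    #   WRITE_OPTIONS={"idgenerator": "uuid.uuid4"}
    # the callable is wrapped with iter() and its values are cast to string.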



def init(*_, **__):
    """Adds Parquet output format support. Raises ImportWarning if libraries not available."""
    if not pandas or not pyarrow:
        ConsolePrinter.error("pandas or PyArrow not available: cannot write Parquet files.")
        raise ImportWarning()
    from .. import plugins  # Late import to avoid circular
    plugins.add_write_format("parquet", ParquetSink, "Parquet", [
        ("column-NAME=ROSTYPE:VALUE",  "additional column to add in Parquet output,\n"
                                       "like column-bag_hash=string:26dfba2c"),
        ("idgenerator=CALLABLE",       "callable or iterable for producing message IDs\n"
                                       "in Parquet output, like 'uuid.uuid4' or 'itertools.count()';\n"
                                       "nesting uses UUID values by default"),
        ("nesting=array|all",          "create tables for nested message types\n"
                                       "in Parquet output,\n"
                                       'only for arrays if "array"\n'
                                       "else for any nested types\n"
                                       "(array fields in parent will be populated\n"
                                       " with foreign keys instead of messages as JSON)"),
        ("overwrite=true|false",       "overwrite existing file in Parquet output\n"
                                       "instead of appending unique counter (default false)"),
        ("type-ROSTYPE=ARROWTYPE",     "custom mapping between ROS and pyarrow type\n"
                                       "for Parquet output, like type-time=\"timestamp('ns')\"\n"
                                       "or type-uint8[]=\"list(uint8())\""),
        ("writer-ARGNAME=ARGVALUE",    "additional arguments for Parquet output\n"
                                       "given to pyarrow.parquet.ParquetWriter"),
    ])
    plugins.add_output_label("Parquet", ["--emit-field", "--no-emit-field"])
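
# Example command-line usage (an illustrative sketch; exact flag syntax per
# grepros --help, with write options passed as KEY=VALUE after the target):
#   grepros "some text" --plugin grepros.plugins.parquet \
#           --write /tmp/dump.parquet nesting=array overwrite=true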


__all__ = ["ParquetSink", "init"]