------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.
------------------------------------------------------------------------------

import itertools
import json
import os
import re
import uuid

import six
try: import pandas
except Exception: pandas = None
try: import pyarrow
except Exception: pyarrow = None
try: import pyarrow.parquet
except Exception: pass

from .. import api, common
from .. common import ConsolePrinter
from .. outputs import Sink

35 """Writes messages to Apache Parquet files."""
38 FILE_EXTENSIONS = (
".parquet", )
45 "bool": pyarrow.bool_,
"bool_": pyarrow.bool_,
46 "float16": pyarrow.float16,
"float64": pyarrow.float64,
47 "float32": pyarrow.float32,
"decimal128": pyarrow.decimal128,
48 "int8": pyarrow.int8,
"uint8": pyarrow.uint8,
49 "int16": pyarrow.int16,
"uint16": pyarrow.uint16,
50 "int32": pyarrow.int32,
"uint32": pyarrow.uint32,
51 "int64": pyarrow.int64,
"uint64": pyarrow.uint64,
52 "date32": pyarrow.date32,
"time32": pyarrow.time32,
53 "date64": pyarrow.date64,
"time64": pyarrow.time64,
54 "timestamp": pyarrow.timestamp,
"duration": pyarrow.duration,
55 "binary": pyarrow.binary,
"large_binary": pyarrow.large_binary,
56 "string": pyarrow.string,
"large_string": pyarrow.large_string,
57 "utf8": pyarrow.string,
"large_utf8": pyarrow.large_utf8,
58 "list": pyarrow.list_,
"list_": pyarrow.list_,
59 "large_list": pyarrow.large_list,
61 if hasattr(pyarrow,
"month_day_nano_interval"): ARROW_TYPES.update({
62 "month_day_nano_interval": pyarrow.month_day_nano_interval,
67 "int8": pyarrow.int8(),
"int16": pyarrow.int16(),
"int32": pyarrow.int32(),
68 "uint8": pyarrow.uint8(),
"uint16": pyarrow.uint16(),
"uint32": pyarrow.uint32(),
69 "int64": pyarrow.int64(),
"uint64": pyarrow.uint64(),
"bool": pyarrow.bool_(),
70 "string": pyarrow.string(),
"wstring": pyarrow.string(),
"uint8[]": pyarrow.binary(),
71 "float32": pyarrow.float32(),
"float64": pyarrow.float64(),
75 DEFAULT_TYPE = pyarrow.string()
if pyarrow
else None
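
    # Illustrative lookups (not from the original source):
    #   COMMON_TYPES.get("float32")    -> pyarrow.float32()
    #   COMMON_TYPES.get("uint8[]")    -> pyarrow.binary()
    #   COMMON_TYPES.get("my_pkg/Msg") -> None, so DEFAULT_TYPE (string) applies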
    ## Additional columns added to every message type table
    MESSAGE_TYPE_BASECOLS = [("_topic",      "string"),
                             ("_timestamp",  "time"), ]

    ## Additional columns added to message type tables when nesting is enabled
    MESSAGE_TYPE_NESTCOLS = [("_id",          "string"),
                             ("_parent_type", "string"),
                             ("_parent_id",   "string"), ]

    ## Number of messages to cache before writing out, per type
    CHUNK_SIZE = 100

    ## Default arguments given to pyarrow.parquet.ParquetWriter
    WRITER_ARGS = {"version": "2.6"}
    ## Constructor argument defaults
    DEFAULT_ARGS = dict(EMIT_FIELD=(), META=False, NOEMIT_FIELD=(), WRITE_OPTIONS={},
                        VERBOSE=False)
    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the base name of Parquet files to write
        @param   args.emit_field      message fields to emit in output if not all
        @param   args.noemit_field    message fields to skip in output
        @param   args.write           base name of Parquet files to write
        @param   args.write_options   ```
                                      {"column":      additional columns as {name: (rostype, value)},
                                       "type":        {rostype: PyArrow type or typename like "uint8"},
                                       "writer":      dictionary of arguments passed to ParquetWriter,
                                       "idgenerator": callable or iterable for producing message IDs
                                                      like uuid.uuid4 or itertools.count();
                                                      nesting uses UUID values by default,
                                       "column-k=rostype:v": one "column"-argument in flat string form,
                                       "type-k=v":    one "type"-argument in flat string form,
                                       "writer-k=v":  one "writer"-argument in flat string form,
                                       "nesting":     "array" to recursively insert arrays
                                                      of nested types, or "all" for any nesting,
                                       "overwrite":   whether to overwrite existing file
                                                      (default false)}
                                      ```
        @param   args.meta            whether to print metainfo
        @param   args.verbose         whether to print debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
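        # Usage sketch (illustrative values, not from the original source):
        #   sink = ParquetSink("/tmp/out.parquet")
        #   sink = ParquetSink(write="/tmp/out.parquet",
        #                      write_options={"nesting": "array", "overwrite": "true"})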
        args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
        args = common.ensure_namespace(args, ParquetSink.DEFAULT_ARGS, **kwargs)
        super(ParquetSink, self).__init__(args)

        self._overwrite = (args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
    def validate(self):
        """
        Returns whether required libraries (pandas and pyarrow) are available,
        the overwrite option is valid, and the file base is writable.
        """
        ok, pandas_ok, pyarrow_ok = self._configure(), bool(pandas), bool(pyarrow)
        if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid overwrite option for Parquet: %r. "
                                 "Choose one of {true, false}.",
                                 self.args.WRITE_OPTIONS["overwrite"])
            ok = False
        if self.args.WRITE_OPTIONS.get("nesting") not in (None, "", "array", "all"):
            ConsolePrinter.error("Invalid nesting option for Parquet: %r. "
                                 "Choose one of {array, all}.",
                                 self.args.WRITE_OPTIONS["nesting"])
            ok = False
        if not pandas_ok:
            ConsolePrinter.error("pandas not available: cannot write Parquet files.")
        if not pyarrow_ok:
            ConsolePrinter.error("PyArrow not available: cannot write Parquet files.")
        if not common.verify_io(self.args.WRITE, "w"):
            ok = False
        self.valid = ok and pandas_ok and pyarrow_ok
        return self.valid
    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """Writes message to a Parquet file."""
        if not self.validate(): raise Exception("invalid")
        self._process_type(topic, msg)
        self._process_message(topic, msg, stamp, match, index)
177 """Writes out any remaining messages, closes writers, clears structures."""
179 for k, vv
in list(self.
_caches.items()):
186 sizes = {n:
None for n
in self.
_filenames.values()}
188 try: sizes[n] = os.path.getsize(n)
189 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", n, e)
190 ConsolePrinter.debug(
"Wrote %s in %s to %s (%s):",
191 common.plural(
"message", sum(self.
_counts.values())),
192 common.plural(
"topic", self.
_counts),
193 common.plural(
"Parquet file", sizes),
194 common.format_bytes(sum(filter(bool, sizes.values()))))
196 count = sum(c
for (_, t_, h_), c
in self.
_counts.items()
if (t, h) == (t_, h_))
197 ConsolePrinter.debug(
"- %s (%s, %s)", name,
198 "error getting size" if sizes[name]
is None else
199 common.format_bytes(sizes[name]),
200 common.plural(
"message", count))
207 """Prepares Parquet schema and writer if not existing."""
208 rootmsg = rootmsg
or msg
209 with api.TypeMeta.make(msg, root=rootmsg)
as m:
210 typename, typehash, typekey = (m.typename, m.typehash, m.typekey)
211 if topic
and (topic, typename, typehash)
not in self.
_counts and self.
args.VERBOSE:
212 ConsolePrinter.debug(
"Adding topic %s in Parquet output.", topic)
215 basedir, basename = os.path.split(self.
_filebase)
216 pathname = os.path.join(basedir, re.sub(
r"\W",
"__",
"%s__%s" % (typename, typehash)))
217 filename = os.path.join(pathname, basename)
219 filename = common.unique_path(filename)
        cols = []
        scalars = set(x for x in self.COMMON_TYPES if "[" not in x)
        fltrs = dict(include=[r for _, r in self._patterns["print"]],
                     exclude=[r for _, r in self._patterns["noprint"]])
        for path, value, subtype in api.iter_message_fields(msg, scalars=scalars, **fltrs):
            coltype = self._make_column_type(subtype)
            cols += [(".".join(path), coltype)]
        cols += [(c, self._make_column_type(t, fallback="int64" if "time" == t else None))
                 for c, t in self.MESSAGE_TYPE_BASECOLS]
        if self._nesting:
            cols += [(c, self._make_column_type(t)) for c, t in self.MESSAGE_TYPE_NESTCOLS]
        if self.args.VERBOSE:
            sz = os.path.isfile(filename) and os.path.getsize(filename)
            action = "Overwriting" if sz and self._overwrite else "Adding"
            ConsolePrinter.debug("%s type %s in Parquet output.", action, typename)
        common.makedirs(pathname)

        schema = pyarrow.schema(cols)
        writer = pyarrow.parquet.ParquetWriter(filename, schema, **self.WRITER_ARGS)
        self._caches[typekey]    = []
        self._schemas[typekey]   = schema
        self._writers[typekey]   = writer
        self._filenames[typekey] = filename
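
        # For reference (illustrative, not from the original source): for a
        # std_msgs/Header-like type this could yield a schema such as
        #   pyarrow.schema([("seq", pyarrow.uint32()), ("frame_id", pyarrow.string()),
        #                   ("_topic", pyarrow.string()), ("_timestamp", pyarrow.int64())])
        # with "_timestamp" falling back to int64 per MESSAGE_TYPE_BASECOLS above.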
        nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # Only arrays of nested types unless nesting all
            if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
            for submsg in submsgs[:1]:  # One sample suffices to prepare the nested type
                self._process_type(None, submsg, rootmsg)
    def _process_message(self, topic, msg, stamp=None, match=None, index=None,
                         rootmsg=None, parent_type=None, parent_id=None):
        """
        Converts message to pandas dataframe, adds to cache.

        Writes cache to disk if length has reached chunk size.

        If nesting is enabled, processes nested messages for subtypes in message.
        If IDs are used, returns generated ID.
        """
        data, myid, rootmsg = {}, None, (rootmsg or msg)
        if self._idgenerator: myid = next(self._idgenerator)
        with api.TypeMeta.make(msg, topic, root=rootmsg) as m:
            typename, typekey = m.typename, m.typekey
        fltrs = dict(include=[r for _, r in self._patterns["print"]],
                     exclude=[r for _, r in self._patterns["noprint"]])
        for p, v, t in api.iter_message_fields(msg, scalars=set(self.COMMON_TYPES), **fltrs):
            data[".".join(p)] = self._make_column_value(v, t)
        data.update(_topic=topic, _timestamp=self._make_column_value(stamp, "time"))
        if self._nesting:
            COLS = [c for c, _ in self.MESSAGE_TYPE_NESTCOLS if "_id" != c]
            data["_id"] = myid
            data.update(zip(COLS, [parent_type, parent_id]))
        self._caches[typekey].append(data)
        super(ParquetSink, self).emit(topic, msg, stamp, match, index)
        subids = {}  # {message field path: [IDs of nested messages]}
        nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # Single nested messages only processed when nesting all
            if isinstance(submsgs, (list, tuple)):
                subids[path] = []
            for submsg in submsgs if isinstance(submsgs, (list, tuple)) else [submsgs]:
                subid = self._process_message(None, submsg, stamp,
                                              rootmsg=rootmsg, parent_type=typename, parent_id=myid)
                if isinstance(submsgs, (list, tuple)):
                    subids[path].append(subid)
        for path, ids in subids.items():  # Replace nested messages with foreign keys in parent
            data[".".join(path)] = str(ids)
        if len(self._caches[typekey]) >= self.CHUNK_SIZE:
            self._write_table(typekey)
        return myid
    def _make_column_type(self, typename, fallback=None):
        """
        Returns pyarrow type for ROS type.

        @param   fallback  fallback typename to use for lookup if typename not found
        """
        noboundtype = api.canonical(typename, unbounded=True)
        scalartype = api.scalar(typename)
        timetype = api.get_ros_time_category(scalartype)
        coltype = self.COMMON_TYPES.get(typename) or self.COMMON_TYPES.get(noboundtype)
        if not coltype and scalartype in self.COMMON_TYPES:
            coltype = pyarrow.list_(self.COMMON_TYPES[scalartype])
        if not coltype and timetype in self.COMMON_TYPES:
            if typename != scalartype:  # List of time values
                coltype = pyarrow.list_(self.COMMON_TYPES[timetype])
            else:
                coltype = self.COMMON_TYPES[timetype]
        if not coltype and fallback:
            coltype = self._make_column_type(fallback)
        if not coltype:
            coltype = self.DEFAULT_TYPE
        return coltype
327 """Returns column value suitable for adding to Parquet file."""
329 if isinstance(v, (list, tuple)):
330 noboundtype = api.canonical(typename, unbounded=
True)
331 if v
and api.is_ros_time(v[0]):
332 v = [api.to_nsec(x)
for x
in v]
333 elif api.scalar(typename)
not in api.ROS_BUILTIN_TYPES:
334 v = str([api.message_to_dict(m)
for m
in v])
335 elif pyarrow.binary()
in (self.
COMMON_TYPES.get(typename),
337 v = bytes(bytearray(v))
340 elif api.is_ros_time(v):
342 elif typename
and typename
not in api.ROS_BUILTIN_TYPES:
343 v = str(api.message_to_dict(v))
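
    # Conversion examples (illustrative, not from the original source);
    # "sink" denotes a hypothetical ParquetSink instance:
    #   sink._make_column_value([1, 2, 3], "uint8[]")        -> b"\x01\x02\x03"
    #   sink._make_column_value(rospy.Time(4, 500), "time")  -> 4000000500
    #   sink._make_column_value((0.5, 1.5), "float64[]")     -> [0.5, 1.5]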
348 """Writes out cached messages for type."""
349 dicts = self.
_caches[typekey][:]
351 mapping = {k: [d[k]
for d
in dicts]
for k
in dicts[0]}
352 table = pyarrow.Table.from_pydict(mapping, self.
_schemas[typekey])
353 self.
_writers[typekey].write_table(table)
357 """Parses args.WRITE_OPTIONS, returns success."""
360 def process_column(name, rostype, value):
361 v, myok = value,
True
362 if "string" not in rostype:
366 ConsolePrinter.error(
"Invalid name option in %s=%s:%s", name, rostype, v)
367 if rostype
not in api.ROS_BUILTIN_TYPES:
369 ConsolePrinter.error(
"Invalid type option in %s=%s:%s", name, rostype, v)
375 def process_type(rostype, arrowtype):
377 arrowtype = eval(compile(arrowtype,
"",
"eval"), {
"__builtins__": self.
ARROW_TYPES})
380 for key, vals
in [(
"print", self.
args.EMIT_FIELD), (
"noprint", self.
args.NOEMIT_FIELD)]:
381 self.
_patterns[key] = [(tuple(v.split(
".")), common.wildcard_to_regex(v))
for v
in vals]
385 alias = api.get_type_alias(rostype)
        for k, v in self.args.WRITE_OPTIONS.items():
            if "column" == k and v and isinstance(v, dict):
                for name, (rostype, value) in v.items():
                    if not process_column(name, rostype, value): ok = False
            elif "type" == k and v and isinstance(v, dict):
                for name, value in v.items():
                    if not process_type(name, value): ok = False
            elif "writer" == k and v and isinstance(v, dict):
                self.WRITER_ARGS.update(v)
            elif isinstance(k, str) and "-" in k:
                category, name = k.split("-", 1)
                if category not in ("column", "type", "writer"):
                    ConsolePrinter.warn("Unknown %r option in %s=%s", category, k, v)
                    continue
                try:
                    if not name: raise Exception("empty name")
                    if "column" == category:
                        if not process_column(name, *v.split(":", 1)): ok = False
                    elif "type" == category:
                        if not process_type(name, v): ok = False
                    elif "writer" == category:
                        try: v = json.loads(v)
                        except Exception: pass  # Keep raw string if not valid JSON
                        self.WRITER_ARGS[name] = v
                except Exception as e:
                    ok = False
                    ConsolePrinter.error("Invalid %s option in %s=%s: %s", category, k, v, e)
        return ok
422 """Configures ID generator from args.WRITE_OPTIONS, returns success."""
425 k, v =
"idgenerator", self.
args.WRITE_OPTIONS.get(
"idgenerator")
426 if k
in self.
args.WRITE_OPTIONS:
427 val, ex, ns = v,
None, dict(self.
ARROW_TYPES, itertools=itertools, uuid=uuid)
428 for root
in v.split(
".", 1)[:1]:
429 try: ns[root] = common.import_item(root)
430 except Exception:
pass
431 try: common.import_item(re.sub(
r"\(.+",
"", v))
432 except Exception:
pass
433 try: val = eval(compile(v,
"",
"eval"), ns)
434 except Exception
as e: ok, ex =
False, e
435 if isinstance(val, (six.binary_type, six.text_type)): ok =
False
439 except Exception
as e:
441 except Exception
as e: ok, ex =
False, e
443 ConsolePrinter.error(
"Invalid value in %s=%s%s", k, v, (
": %s" % ex
if ex
else ""))
        if ok and self._idgenerator:  # Peek first value to determine type of "_id" columns
            generator = self._idgenerator
            fval = next(generator)
            typename = "string"
            if api.is_ros_time(fval):
                typename = "time" if "time" in str(type(fval)).lower() else "duration"
            elif isinstance(fval, six.integer_types):
                typename = "int64"
            elif isinstance(fval, float):
                typename = "float64"
            elif not isinstance(fval, str):
                # Unrecognized type: fall back to stringifying all generated IDs
                fval, self._idgenerator = str(fval), (str(x) for x in generator)
            self._idgenerator = itertools.chain([fval], self._idgenerator)  # Put peeked value back
            repl = lambda n, t: (n, typename) if "_id" in n else (n, t)
            self.MESSAGE_TYPE_NESTCOLS = [repl(n, t) for n, t in self.MESSAGE_TYPE_NESTCOLS]
        return ok
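
    # Illustrative option behavior (values from the option help; not from the
    # original source):
    #   idgenerator=itertools.count()  -> integer IDs 0, 1, 2, ..., "_id" typed int64
    #   idgenerator=uuid.uuid4         -> UUID values stringified, "_id" typed string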
467 """Adds Parquet output format support. Raises ImportWarning if libraries not available."""
468 if not pandas
or not pyarrow:
469 ConsolePrinter.error(
"pandas or PyArrow not available: cannot write Parquet files.")
470 raise ImportWarning()
471 from ..
import plugins
472 plugins.add_write_format(
"parquet", ParquetSink,
"Parquet", [
473 (
"column-NAME=ROSTYPE:VALUE",
"additional column to add in Parquet output,\n"
474 "like column-bag_hash=string:26dfba2c"),
475 (
"idgenerator=CALLABLE",
"callable or iterable for producing message IDs \n"
476 "in Parquet output, like 'uuid.uuid4' or 'itertools.count()';\n"
477 "nesting uses UUID values by default"),
478 (
"nesting=array|all",
"create tables for nested message types\n"
479 "in Parquet output,\n"
480 'only for arrays if "array" \n'
481 "else for any nested types\n"
482 "(array fields in parent will be populated \n"
483 " with foreign keys instead of messages as JSON)"),
484 (
"overwrite=true|false",
"overwrite existing file in Parquet output\n"
485 "instead of appending unique counter (default false)"),
486 (
"type-ROSTYPE=ARROWTYPE",
"custom mapping between ROS and pyarrow type\n"
487 "for Parquet output, like type-time=\"timestamp('ns')\"\n"
488 "or type-uint8[]=\"list(uint8())\""),
489 (
"writer-ARGNAME=ARGVALUE",
"additional arguments for Parquet output\n"
490 "given to pyarrow.parquet.ParquetWriter"),
492 plugins.add_output_label(
"Parquet", [
"--emit-field",
"--no-emit-field"])

__all__ = ["ParquetSink", "init"]