grepros.plugins.parquet.ParquetSink Class Reference
[Inheritance diagram omitted: grepros.plugins.parquet.ParquetSink derives from grepros.outputs.Sink.]

Public Member Functions

def __init__ (self, args=None, **kwargs)
 
def close (self)
 
def emit (self, topic, msg, stamp=None, match=None, index=None)
 
def validate (self)
 
- Public Member Functions inherited from grepros.outputs.Sink
def __enter__ (self)
 
def __exit__ (self, exc_type, exc_value, traceback)
 
def autodetect (cls, target)
 
def bind (self, source)
 
def emit_meta (self)
 
def flush (self)
 
def is_highlighting (self)
 
def thread_excepthook (self, text, exc)
 

Public Attributes

 MESSAGE_TYPE_BASECOLS
 
 MESSAGE_TYPE_NESTCOLS
 
 valid
 
- Public Attributes inherited from grepros.outputs.Sink
 args
 
 source
 inputs.Source instance bound to this sink More...
 
 valid
 Result of validate() More...
 

Static Public Attributes

dictionary ARROW_TYPES
 Mapping from pyarrow type names and aliases to pyarrow type constructors. More...
 
int CHUNK_SIZE = 100
 Number of dataframes to cache before writing, per type. More...
 
dictionary COMMON_TYPES
 Mapping from ROS common type names to pyarrow type instances. More...
 
 DEFAULT_ARGS
 Constructor argument defaults. More...
 
 DEFAULT_TYPE = pyarrow.string() if pyarrow else None
 Fallback pyarrow type if mapped type not found. More...
 
tuple FILE_EXTENSIONS = (".parquet", )
 Auto-detection file extensions. More...
 
list MESSAGE_TYPE_BASECOLS
 Default columns for message type tables. More...
 
list MESSAGE_TYPE_NESTCOLS
 Additional default columns for message type tables with nesting output. More...
 
dictionary WRITER_ARGS = {"version": "2.6"}
 Custom arguments for pyarrow.parquet.ParquetWriter. More...
 
- Static Public Attributes inherited from grepros.outputs.Sink
 DEFAULT_ARGS = dict(META=False)
 Constructor argument defaults. More...
 
tuple FILE_EXTENSIONS = ()
 Auto-detection file extensions for subclasses, as (".ext", ) More...
 

Private Member Functions

def _configure (self)
 
def _configure_ids (self)
 
def _make_column_type (self, typename, fallback=None)
 
def _make_column_value (self, value, typename=None)
 
def _process_message (self, topic, index, stamp, msg, match=None, rootmsg=None, parent_type=None, parent_id=None)
 
def _process_type (self, topic, msg, rootmsg=None)
 
def _write_table (self, typekey)
 

Private Attributes

 _caches
 
 _close_printed
 
 _extra_basecols
 
 _extra_basevals
 
 _filebase
 
 _filenames
 
 _idgenerator
 
 _nesting
 
 _overwrite
 
 _patterns
 
 _schemas
 
 _writers
 

Detailed Description

Writes messages to Apache Parquet files.

Definition at line 34 of file parquet.py.

Constructor & Destructor Documentation

◆ __init__()

def grepros.plugins.parquet.ParquetSink.__init__ (   self,
  args = None,
**  kwargs 
)
@param   args                 arguments as namespace or dictionary, case-insensitive;
                      or a single path as the base name of Parquet files to write
@param   args.emit_field      message fields to emit in output if not all
@param   args.noemit_field    message fields to skip in output
@param   args.write           base name of Parquet files to write
@param   args.write_options   ```
                      {"column": additional columns as {name: (rostype, value)},
                       "type": {rostype: PyArrow type or typename like "uint8"},
                       "writer": dictionary of arguments passed to ParquetWriter,
                       "idgenerator": callable or iterable for producing message IDs
                                      like uuid.uuid4 or itertools.count();
                                      nesting uses UUID values by default,
                       "column-k=rostype:v": one "column"-argument
                                             in flat string form,
                       "type-k=v": one "type"-argument in flat string form,
                       "writer-k=v": one "writer"-argument in flat string form,
                       "nesting": "array" to recursively insert arrays
                                  of nested types, or "all" for any nesting,
                       "overwrite": whether to overwrite existing file
                                    (default false)}
                      ```
@param   args.meta            whether to print metainfo
@param   args.verbose         whether to print debug information
@param   kwargs               any and all arguments as keyword overrides, case-insensitive

Reimplemented from grepros.outputs.Sink.

Definition at line 94 of file parquet.py.
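
To illustrate the argument layout described above, here is a minimal sketch of an args dictionary with write_options in nested form. The file path, the extra column name and value, and the type override are made-up examples and not defaults of the class. The resulting dictionary is what one would presumably pass as `ParquetSink(args)`.

```python
import itertools

# Nested form of write_options, using the keys documented for __init__.
# All concrete values below are illustrative assumptions.
write_options = {
    # additional column as {name: (rostype, value)}
    "column": {"bag_hash": ("string", "26dfba2c")},
    # {rostype: PyArrow typename}
    "type": {"char": "uint8"},
    # arguments passed on to ParquetWriter
    "writer": {"version": "2.6"},
    # iterable producing message IDs
    "idgenerator": itertools.count(),
    # "array" or "all"
    "nesting": "array",
    "overwrite": True,
}

args = {
    "write": "output/messages.parquet",  # hypothetical base name of files
    "write_options": write_options,
    "meta": False,
    "verbose": False,
}
```

Since keyword arguments act as overrides, the same settings could likely also be given as keywords, such as `write="output/messages.parquet"`.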

Member Function Documentation

◆ _configure()

def grepros.plugins.parquet.ParquetSink._configure (   self)
private
Parses args.WRITE_OPTIONS, returns success.

Definition at line 356 of file parquet.py.
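
The flat string forms listed for args.write_options ("column-k=rostype:v", "type-k=v", "writer-k=v") carry the same information as the nested dictionaries. The following sketch shows one way such keys could be folded into nested form. The function name `fold_flat_options` and its exact behavior are assumptions for illustration, not the code of this method.

```python
CATEGORIES = ("column", "type", "writer")


def fold_flat_options(options):
    """Returns options with flat "category-name" keys folded into nested dicts."""
    nested = {category: {} for category in CATEGORIES}
    other = {}
    for key, value in options.items():
        # Split on the first dash only, so names may contain dashes themselves
        category, sep, name = key.partition("-")
        if key in CATEGORIES and isinstance(value, dict):
            nested[key].update(value)  # already given in nested form
        elif sep and category in CATEGORIES and name:
            if category == "column":
                # Flat column value has the form "rostype:value"
                rostype, _, colvalue = value.partition(":")
                value = (rostype, colvalue)
            nested[category][name] = value
        else:
            other[key] = value  # unrelated option like "nesting"
    return {**other, **nested}


folded = fold_flat_options({
    "column-bag_hash": "string:26dfba2c",
    "type-char": "uint8",
    "type": {"time": "float64"},
    "writer-version": "2.6",
    "nesting": "all",
})
```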

◆ _configure_ids()

def grepros.plugins.parquet.ParquetSink._configure_ids (   self)
private
Configures ID generator from args.WRITE_OPTIONS, returns success.

Definition at line 421 of file parquet.py.
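
The "idgenerator" option accepts either a callable or an iterable. A minimal sketch of how both kinds could be normalized into a single zero-argument ID source is given below. The helper `make_id_source` is hypothetical, and the UUID default mirrors only what the constructor documentation states for nesting.

```python
import itertools
import uuid


def make_id_source(idgenerator=None):
    """Returns a zero-argument function producing the next message ID."""
    if idgenerator is None:
        # Assumed default: random UUID strings
        return lambda: str(uuid.uuid4())
    if callable(idgenerator):
        # Callable like uuid.uuid4: invoked once per message
        return idgenerator
    # Iterable like itertools.count(): consumed one item per message
    iterator = iter(idgenerator)
    return lambda: next(iterator)


next_id = make_id_source(itertools.count(10))
first, second = next_id(), next_id()
```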

◆ _make_column_type()

def grepros.plugins.parquet.ParquetSink._make_column_type (   self,
  typename,
  fallback = None 
)
private
Returns pyarrow type for ROS type.

@param  fallback  fallback typename to use for lookup if typename not found

Definition at line 301 of file parquet.py.
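
A lookup with a fallback typename can be sketched as follows. To keep the example runnable without pyarrow, plain strings stand in for pyarrow types; the function name `resolve_column_type`, the `overrides` parameter and the lookup order are assumptions, not the behavior of this method.

```python
# String stand-ins for pyarrow types, so the sketch runs without pyarrow.
COMMON_TYPES = {"int32": "pa.int32", "float64": "pa.float64", "string": "pa.string"}
DEFAULT_TYPE = "pa.string"  # mirrors the documented string fallback


def resolve_column_type(typename, fallback=None, overrides=None):
    """Returns type for typename, else for fallback, else the default type."""
    overrides = overrides or {}
    for name in (typename, fallback):
        if name in overrides:      # user-given "type" option takes priority
            return overrides[name]
        if name in COMMON_TYPES:   # then the common ROS type mapping
            return COMMON_TYPES[name]
    return DEFAULT_TYPE
```

With these stand-ins, an unknown type such as "my_pkg/Custom" resolves to the fallback when one is given, and to the default type otherwise.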

◆ _make_column_value()

def grepros.plugins.parquet.ParquetSink._make_column_value (   self,
  value,
  typename = None 
)
private
Returns column value suitable for adding to Parquet file.

Definition at line 326 of file parquet.py.

◆ _process_message()

def grepros.plugins.parquet.ParquetSink._process_message (   self,
  topic,
  index,
  stamp,
  msg,
  match = None,
  rootmsg = None,
  parent_type = None,
  parent_id = None 
)
private
Converts message to pandas dataframe, adds to cache.

Writes the cache to disk once its length reaches the chunk size.

If nesting is enabled, processes nested messages for subtypes in message.
If IDs are used, returns generated ID.

Definition at line 255 of file parquet.py.
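
The caching behavior described above, where rows are collected per type and written out once the chunk size is reached, can be sketched in plain Python. The class `ChunkedCache` is a hypothetical stand-in: it keeps written chunks in memory rather than in Parquet files, and uses dictionaries in place of dataframes.

```python
from collections import defaultdict


class ChunkedCache:
    """Collects rows per type, writing them out in chunks."""

    def __init__(self, chunk_size=100):
        self.chunk_size = chunk_size
        self.caches = defaultdict(list)   # {typekey: [row, ]}
        self.written = defaultdict(list)  # {typekey: [chunk, ]}, stand-in for files

    def add(self, typekey, row):
        """Caches row, flushing the cache if chunk size is reached."""
        self.caches[typekey].append(row)
        if len(self.caches[typekey]) >= self.chunk_size:
            self.flush(typekey)

    def flush(self, typekey):
        """Writes out cached rows for type, if any."""
        rows = self.caches[typekey]
        if rows:
            self.written[typekey].append(list(rows))
            rows.clear()

    def close(self):
        """Writes out any remaining rows for all types."""
        for typekey in list(self.caches):
            self.flush(typekey)


cache = ChunkedCache(chunk_size=3)
for i in range(7):
    cache.add("std_msgs/Int32", {"_topic": "/numbers", "data": i})
chunks_before_close = len(cache.written["std_msgs/Int32"])
cache.close()
chunk_lengths = [len(chunk) for chunk in cache.written["std_msgs/Int32"]]
```

Seven rows with a chunk size of three give two full chunks during emitting, and a final partial chunk on close.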

◆ _process_type()

def grepros.plugins.parquet.ParquetSink._process_type (   self,
  topic,
  msg,
  rootmsg = None 
)
private
Prepares Parquet schema and writer if not existing.

Definition at line 206 of file parquet.py.

◆ _write_table()

def grepros.plugins.parquet.ParquetSink._write_table (   self,
  typekey 
)
private
Writes out cached messages for type.

Definition at line 347 of file parquet.py.

◆ close()

def grepros.plugins.parquet.ParquetSink.close (   self)
Writes out any remaining messages, closes writers, clears structures.

Reimplemented from grepros.outputs.Sink.

Definition at line 176 of file parquet.py.

◆ emit()

def grepros.plugins.parquet.ParquetSink.emit (   self,
  topic,
  msg,
  stamp = None,
  match = None,
  index = None 
)
Writes message to a Parquet file.

Reimplemented from grepros.outputs.Sink.

Definition at line 167 of file parquet.py.

◆ validate()

def grepros.plugins.parquet.ParquetSink.validate (   self)
Returns whether the required libraries (pandas and pyarrow) are available,
the overwrite option is valid, and the file base is writable.

Reimplemented from grepros.outputs.Sink.

Definition at line 140 of file parquet.py.
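
A check for optional libraries can be made without importing them. The sketch below uses the standard `importlib.util.find_spec`; the function name `check_requirements` and the accepted overwrite values are assumptions, and the writability check of the file base is left out.

```python
import importlib.util


def check_requirements(modules=("pandas", "pyarrow"), overwrite=False):
    """Returns whether all modules are importable and overwrite is valid."""
    # find_spec returns None for a top-level module that cannot be found
    missing = [name for name in modules if importlib.util.find_spec(name) is None]
    # Assumed valid values: booleans and their lowercase string forms
    overwrite_ok = overwrite in (True, False, "true", "false")
    return not missing and overwrite_ok
```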

Member Data Documentation

◆ _caches

grepros.plugins.parquet.ParquetSink._caches
private

Definition at line 128 of file parquet.py.

◆ _close_printed

grepros.plugins.parquet.ParquetSink._close_printed
private

Definition at line 137 of file parquet.py.

◆ _extra_basecols

grepros.plugins.parquet.ParquetSink._extra_basecols
private

Definition at line 131 of file parquet.py.

◆ _extra_basevals

grepros.plugins.parquet.ParquetSink._extra_basevals
private

Definition at line 132 of file parquet.py.

◆ _filebase

grepros.plugins.parquet.ParquetSink._filebase
private

Definition at line 125 of file parquet.py.

◆ _filenames

grepros.plugins.parquet.ParquetSink._filenames
private

Definition at line 127 of file parquet.py.

◆ _idgenerator

grepros.plugins.parquet.ParquetSink._idgenerator
private

Definition at line 135 of file parquet.py.

◆ _nesting

grepros.plugins.parquet.ParquetSink._nesting
private

Definition at line 134 of file parquet.py.

◆ _overwrite

grepros.plugins.parquet.ParquetSink._overwrite
private

Definition at line 126 of file parquet.py.

◆ _patterns

grepros.plugins.parquet.ParquetSink._patterns
private

Definition at line 133 of file parquet.py.

◆ _schemas

grepros.plugins.parquet.ParquetSink._schemas
private

Definition at line 129 of file parquet.py.

◆ _writers

grepros.plugins.parquet.ParquetSink._writers
private

Definition at line 130 of file parquet.py.

◆ ARROW_TYPES

dictionary grepros.plugins.parquet.ParquetSink.ARROW_TYPES
static
Initial value:
= {
"bool": pyarrow.bool_, "bool_": pyarrow.bool_,
"float16": pyarrow.float16, "float64": pyarrow.float64,
"float32": pyarrow.float32, "decimal128": pyarrow.decimal128,
"int8": pyarrow.int8, "uint8": pyarrow.uint8,
"int16": pyarrow.int16, "uint16": pyarrow.uint16,
"int32": pyarrow.int32, "uint32": pyarrow.uint32,
"int64": pyarrow.int64, "uint64": pyarrow.uint64,
"date32": pyarrow.date32, "time32": pyarrow.time32,
"date64": pyarrow.date64, "time64": pyarrow.time64,
"timestamp": pyarrow.timestamp, "duration": pyarrow.duration,
"binary": pyarrow.binary, "large_binary": pyarrow.large_binary,
"string": pyarrow.string, "large_string": pyarrow.large_string,
"utf8": pyarrow.string, "large_utf8": pyarrow.large_utf8,
"list": pyarrow.list_, "list_": pyarrow.list_,
"large_list": pyarrow.large_list,
} if pyarrow else {}

Mapping from pyarrow type names and aliases to pyarrow type constructors.

Definition at line 44 of file parquet.py.

◆ CHUNK_SIZE

int grepros.plugins.parquet.ParquetSink.CHUNK_SIZE = 100
static

Number of dataframes to cache before writing, per type.

Definition at line 41 of file parquet.py.

◆ COMMON_TYPES

dictionary grepros.plugins.parquet.ParquetSink.COMMON_TYPES
static
Initial value:
= {
"int8": pyarrow.int8(), "int16": pyarrow.int16(), "int32": pyarrow.int32(),
"uint8": pyarrow.uint8(), "uint16": pyarrow.uint16(), "uint32": pyarrow.uint32(),
"int64": pyarrow.int64(), "uint64": pyarrow.uint64(), "bool": pyarrow.bool_(),
"string": pyarrow.string(), "wstring": pyarrow.string(), "uint8[]": pyarrow.binary(),
"float32": pyarrow.float32(), "float64": pyarrow.float64(),
} if pyarrow else {}

Mapping from ROS common type names to pyarrow type instances.

Definition at line 66 of file parquet.py.

◆ DEFAULT_ARGS

grepros.plugins.parquet.ParquetSink.DEFAULT_ARGS
static
Initial value:
= dict(EMIT_FIELD=(), META=False, NOEMIT_FIELD=(), WRITE_OPTIONS={},
VERBOSE=False)

Constructor argument defaults.

Definition at line 90 of file parquet.py.

◆ DEFAULT_TYPE

grepros.plugins.parquet.ParquetSink.DEFAULT_TYPE = pyarrow.string() if pyarrow else None
static

Fallback pyarrow type if mapped type not found.

Definition at line 75 of file parquet.py.

◆ FILE_EXTENSIONS

tuple grepros.plugins.parquet.ParquetSink.FILE_EXTENSIONS = (".parquet", )
static

Auto-detection file extensions.

Definition at line 38 of file parquet.py.

◆ MESSAGE_TYPE_BASECOLS [1/2]

list grepros.plugins.parquet.ParquetSink.MESSAGE_TYPE_BASECOLS
static
Initial value:
= [("_topic", "string"),
("_timestamp", "time"), ]

Default columns for message type tables.

Definition at line 78 of file parquet.py.

◆ MESSAGE_TYPE_BASECOLS [2/2]

grepros.plugins.parquet.ParquetSink.MESSAGE_TYPE_BASECOLS

Definition at line 459 of file parquet.py.

◆ MESSAGE_TYPE_NESTCOLS [1/2]

list grepros.plugins.parquet.ParquetSink.MESSAGE_TYPE_NESTCOLS
static
Initial value:
= [("_id", "string"),
("_parent_type", "string"),
("_parent_id", "string"), ]

Additional default columns for message type tables with nesting output.

Definition at line 82 of file parquet.py.
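
The three nesting columns allow rows of nested message types to be linked back to their parent rows. The following sketch shows the idea with plain dictionaries, where a nested message is represented as a (typename, fields) tuple. The function `add_rows` and this message representation are hypothetical; whether the parent row also retains the nested value is not covered here, and the base columns _topic and _timestamp are omitted for brevity.

```python
import itertools


def add_rows(tables, ids, typename, fields, parent_type=None, parent_id=None):
    """Adds a row for message and its nested messages, returns message ID."""
    myid = str(next(ids))  # the _id column is declared as string
    row = {"_id": myid, "_parent_type": parent_type, "_parent_id": parent_id}
    for name, value in fields.items():
        if isinstance(value, tuple):
            # Nested message: goes to its own type table, linked to this row
            subtype, subfields = value
            add_rows(tables, ids, subtype, subfields, typename, myid)
        else:
            row[name] = value
    tables.setdefault(typename, []).append(row)
    return myid


tables = {}
point = ("geometry_msgs/Point", {"x": 1.0, "y": 2.0, "z": 0.0})
add_rows(tables, itertools.count(1), "geometry_msgs/Pose", {"position": point})
```

The Point row can then be joined to its Pose row by matching _parent_type and _parent_id against the parent table and its _id column.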

◆ MESSAGE_TYPE_NESTCOLS [2/2]

grepros.plugins.parquet.ParquetSink.MESSAGE_TYPE_NESTCOLS

Definition at line 460 of file parquet.py.

◆ valid

grepros.plugins.parquet.ParquetSink.valid

Definition at line 163 of file parquet.py.

◆ WRITER_ARGS

dictionary grepros.plugins.parquet.ParquetSink.WRITER_ARGS = {"version": "2.6"}
static

Custom arguments for pyarrow.parquet.ParquetWriter.

Definition at line 87 of file parquet.py.


The documentation for this class was generated from the following file:

parquet.py

grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:30