sqlite.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 SQLite output plugin.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 03.12.2021
11 @modified 27.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 import collections
16 import json
17 import os
18 import sqlite3
19 
20 import six
21 
22 from ... import api
23 from ... common import ConsolePrinter, format_bytes, makedirs, verify_io
24 from ... outputs import RolloverSinkMixin
25 from . dbbase import BaseDataSink, quote
26 
27 
29  """
30  Writes messages to an SQLite database.
31 
32  Output will have:
33  - table "messages", with all messages as serialized binary data
34  - table "types", with message definitions
35  - table "topics", with topic information
36 
37  plus:
38  - table "pkg/MsgType" for each message type, with detailed fields,
39  and JSON fields for arrays of nested subtypes,
40  with foreign keys if nesting else subtype values as JSON dictionaries;
41  plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
42 
43  If launched with nesting-option, tables will also be created for each
44  nested message type.
45 
46  - view "/topic/full/name" for each topic,
47  selecting from the message type table
48 
49  """
50 
51 
52  ENGINE = "SQLite"
53 
54 
55  FILE_EXTENSIONS = (".sqlite", ".sqlite3")
56 
57 
58  MAX_INT = 2**63 - 1
59 
60 
61  DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)
62 
63 
64  def __init__(self, args=None, **kwargs):
65  """
66  @param args arguments as namespace or dictionary, case-insensitive;
67  or a single path as the name of SQLitefile to write
68  @param args.write name of SQLite file to write, will be appended to if exists
69  @param args.write_options ```
70  {"commit-interval": transaction size (0 is autocommit),
71  "message-yaml": populate messages.yaml (default true),
72  "nesting": "array" to recursively insert arrays
73  of nested types, or "all" for any nesting),
74  "overwrite": whether to overwrite existing file
75  (default false),
76  "rollover-size": bytes limit for individual output files,
77  "rollover-count": message limit for individual output files,
78  "rollover-duration": time span limit for individual output files,
79  as ROS duration or convertible seconds,
80  "rollover-template": output filename template, supporting
81  strftime format codes like "%H-%M-%S"
82  and "%(index)s" as output file index}
83  ```
84  @param args.meta whether to emit metainfo
85  @param args.verbose whether to emit debug information
86  @param kwargs any and all arguments as keyword overrides, case-insensitive
87  """
88  super(SqliteSink, self).__init__(args, **kwargs)
89  RolloverSinkMixin.__init__(self, args)
90 
91  self._do_yaml = (self.args.WRITE_OPTIONS.get("message-yaml") != "false")
92  self._overwrite = (self.args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
93  self._id_counters = {} # {table next: max ID}
94 
95 
96  def validate(self):
97  """
98  Returns whether "commit-interval" and "nesting" in args.write_options have valid value, if any,
99  and file is writable; parses "message-yaml" and "overwrite" from args.write_options.
100  """
101  if self.valid is not None: return self.valid
102  ok = all([super(SqliteSink, self).validate(), RolloverSinkMixin.validate(self)])
103  if self.args.WRITE_OPTIONS.get("message-yaml") not in (None, True, False, "true", "false"):
104  ConsolePrinter.error("Invalid message-yaml option for %s: %r. "
105  "Choose one of {true, false}.",
106  self.ENGINE, self.args.WRITE_OPTIONS["message-yaml"])
107  ok = False
108  if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
109  ConsolePrinter.error("Invalid overwrite option for %s: %r. "
110  "Choose one of {true, false}.",
111  self.ENGINE, self.args.WRITE_OPTIONS["overwrite"])
112  ok = False
113  if not verify_io(self.args.WRITE, "w"):
114  ok = False
115  self.valid = ok
116  return self.valid
117 
118 
119  def emit(self, topic, msg, stamp=None, match=None, index=None):
120  """Writes message to database."""
121  stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
122  RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
123  super(SqliteSink, self).emit(topic, msg, stamp, match, index)
124 
125 
126  @property
127  def size(self):
128  """Returns current file size in bytes, including journals, or None if size lookup failed."""
129  sizes = []
130  for f in ("%s%s" % (self.filename, x) for x in ("", "-journal", "-wal")):
131  try: os.path.isfile(f) and sizes.append(os.path.getsize(f))
132  except Exception as e: ConsolePrinter.warn("Error getting size of %s: %s", f, e)
133  return sum(sizes) if sizes else None
134 
135 
136  def _init_db(self):
137  """Opens the database file and populates schema if not already existing."""
138  for t in (dict, list, tuple): sqlite3.register_adapter(t, json.dumps)
139  for t in six.integer_types:
140  sqlite3.register_adapter(t, lambda x: str(x) if abs(x) > self.MAX_INT else x)
141  sqlite3.register_converter("JSON", json.loads)
142  self.filename = self.filename or self.make_filename()
143  if self.args.VERBOSE:
144  sz = self.size
145  action = "Overwriting" if sz and self._overwrite else \
146  "Appending to" if sz else "Creating"
147  ConsolePrinter.debug("%s SQLite output %s%s.", action, self.filename,
148  (" (%s)" % format_bytes(sz)) if sz else "")
149  super(SqliteSink, self)._init_db()
150 
151 
152  def _load_schema(self):
153  """Populates instance attributes with schema metainfo."""
154  super(SqliteSink, self)._load_schema()
155  for row in self.db.execute("SELECT name FROM sqlite_master "
156  "WHERE type = 'table' AND name LIKE '%/%'"):
157  cols = self.db.execute("PRAGMA table_info(%s)" % quote(row["name"])).fetchall()
158  typerow = next((x for x in self._types.values()
159  if x["table_name"] == row["name"]), None)
160  if not typerow: continue # for row
161  typekey = (typerow["type"], typerow["md5"])
162  self._schema[typekey] = collections.OrderedDict([(c["name"], c) for c in cols])
163 
164 
165  def _process_message(self, topic, msg, stamp):
166  """Inserts message to messages-table, and to pkg/MsgType tables."""
167  with api.TypeMeta.make(msg, topic) as m:
168  topic_id, typename = self._topics[m.topickey]["id"], m.typename
169  margs = dict(dt=api.to_datetime(stamp), timestamp=api.to_nsec(stamp),
170  topic=topic, name=topic, topic_id=topic_id, type=typename,
171  yaml=str(msg) if self._do_yaml else "", data=api.serialize_message(msg))
172  self._ensure_execute(self._get_dialect_option("insert_message"), margs)
173  super(SqliteSink, self)._process_message(topic, msg, stamp)
174 
175 
176  def _connect(self):
177  """Returns new database connection."""
178  makedirs(os.path.dirname(self.filename))
179  if self._overwrite: open(self.filename, "w").close()
180  db = sqlite3.connect(self.filename, check_same_thread=False,
181  detect_types=sqlite3.PARSE_DECLTYPES)
182  if not self.COMMIT_INTERVAL: db.isolation_level = None
183  db.row_factory = lambda cursor, row: dict(sqlite3.Row(cursor, row))
184  return db
185 
186 
187  def _execute_insert(self, sql, args):
188  """Executes INSERT statement, returns inserted ID."""
189  return self._cursor.execute(sql, args).lastrowid
190 
191 
192  def _executemany(self, sql, argses):
193  """Executes SQL with all args sequences."""
194  self._cursor.executemany(sql, argses)
195 
196 
197  def _executescript(self, sql):
198  """Executes SQL with one or more statements."""
199  self._cursor.executescript(sql)
200 
201 
202  def _get_next_id(self, table):
203  """Returns next ID value for table, using simple auto-increment."""
204  if not self._id_counters.get(table):
205  sql = "SELECT COALESCE(MAX(_id), 0) AS id FROM %s" % quote(table)
206  self._id_counters[table] = self.db.execute(sql).fetchone()["id"]
207  self._id_counters[table] += 1
208  return self._id_counters[table]
209 
210 
211  def _make_db_label(self):
212  """Returns formatted label for database, with file path and size."""
213  sz = self.size
214  return "%s (%s)" % (self.filename, "error getting size" if sz is None else sz)
215 
216 
217 
def init(*_, **__):
    """
    Adds SQLite output format support.

    Registers the "sqlite" write format with its sink class and the descriptions
    of supported write options, including the rollover options.
    """
    from ... import plugins  # Late import to avoid circular

    own_options = [
        ("commit-interval=NUM",
         "transaction size for SQLite output\n"
         "(default 1000, 0 is autocommit)"),
        ("dialect-file=path/to/dialects.yaml",
         "load additional SQL dialect options\n"
         "for SQLite output\n"
         "from a YAML or JSON file"),
        ("message-yaml=true|false",
         "whether to populate table field messages.yaml\n"
         "in SQLite output (default true)"),
        ("nesting=array|all",
         "create tables for nested message types\n"
         "in SQLite output,\n"
         'only for arrays if "array" \n'
         "else for any nested types\n"
         "(array fields in parent will be populated \n"
         " with foreign keys instead of messages as JSON)"),
        ("overwrite=true|false",
         "overwrite existing file in SQLite output\n"
         "instead of appending to file (default false)"),
    ]
    all_options = own_options + RolloverSinkMixin.get_write_options("SQLite")
    plugins.add_write_format("sqlite", SqliteSink, "SQLite", all_options)
239 
240 
241 __all__ = ["SqliteSink", "init"]
grepros.plugins.auto.dbbase.BaseDataSink.db
db
Database connection.
Definition: dbbase.py:87
grepros.plugins.auto.sqlite.SqliteSink._make_db_label
def _make_db_label(self)
Definition: sqlite.py:211
grepros.plugins.auto.sqlite.SqliteSink.MAX_INT
int MAX_INT
Maximum integer size supported in SQLite, higher values inserted as string.
Definition: sqlite.py:58
grepros.outputs.Sink.args
args
Definition: outputs.py:50
grepros.plugins.auto.sqlite.SqliteSink.size
def size(self)
Definition: sqlite.py:127
grepros.plugins.auto.sqlite.SqliteSink._overwrite
_overwrite
Definition: sqlite.py:92
grepros.plugins.auto.sqlite.SqliteSink
Definition: sqlite.py:28
grepros.plugins.auto.sqlite.SqliteSink._executemany
def _executemany(self, sql, argses)
Definition: sqlite.py:192
grepros.plugins.auto.sqlite.SqliteSink._process_message
def _process_message(self, topic, msg, stamp)
Definition: sqlite.py:165
grepros.common.verify_io
def verify_io(f, mode)
Definition: common.py:1019
grepros.common.format_bytes
def format_bytes(size, precision=2, inter=" ", strip=True)
Definition: common.py:779
grepros.plugins.auto.sqlite.SqliteSink._get_next_id
def _get_next_id(self, table)
Definition: sqlite.py:202
grepros.plugins.auto.sqlite.init
def init(*_, **__)
Definition: sqlite.py:218
grepros.plugins.auto.sqlbase.SqlMixin._get_dialect_option
def _get_dialect_option(self, option)
Definition: sqlbase.py:371
grepros.plugins.auto.dbbase.BaseDataSink._ensure_execute
def _ensure_execute(self, sql, args)
Definition: dbbase.py:354
grepros.plugins.auto.sqlite.SqliteSink.validate
def validate(self)
Definition: sqlite.py:96
grepros.plugins.auto.dbbase.BaseDataSink.ENGINE
ENGINE
Database engine name, overridden in subclasses.
Definition: dbbase.py:46
grepros.plugins.auto.sqlite.SqliteSink._do_yaml
_do_yaml
Definition: sqlite.py:91
grepros.plugins.auto.sqlite.SqliteSink._load_schema
def _load_schema(self)
Definition: sqlite.py:152
grepros.outputs.Sink.valid
valid
Result of validate()
Definition: outputs.py:52
grepros.plugins.auto.sqlbase.SqlMixin._topics
_topics
Definition: sqlbase.py:50
grepros.plugins.auto.sqlite.SqliteSink._id_counters
_id_counters
Definition: sqlite.py:93
grepros.outputs.RolloverSinkMixin.make_filename
def make_filename(self)
Definition: outputs.py:446
grepros.plugins.auto.sqlite.SqliteSink._execute_insert
def _execute_insert(self, sql, args)
Definition: sqlite.py:187
grepros.plugins.auto.sqlite.SqliteSink.emit
def emit(self, topic, msg, stamp=None, match=None, index=None)
Definition: sqlite.py:119
grepros.plugins.auto.sqlite.SqliteSink.__init__
def __init__(self, args=None, **kwargs)
Definition: sqlite.py:64
grepros.outputs.RolloverSinkMixin.filename
filename
Current output file path.
Definition: outputs.py:371
grepros.plugins.auto.dbbase.BaseDataSink.COMMIT_INTERVAL
int COMMIT_INTERVAL
Number of emits between commits; 0 is autocommit.
Definition: dbbase.py:49
grepros.common.makedirs
def makedirs(path)
Definition: common.py:856
grepros.outputs.RolloverSinkMixin
Definition: outputs.py:323
grepros.plugins.auto.dbbase.BaseDataSink
Definition: dbbase.py:24
grepros.plugins.auto.sqlbase.quote
def quote(name, force=False)
Definition: sqlbase.py:631
grepros.plugins.auto.sqlite.SqliteSink._executescript
def _executescript(self, sql)
Definition: sqlite.py:197
grepros.plugins.auto.dbbase.BaseDataSink._cursor
_cursor
Definition: dbbase.py:89
grepros.outputs.RolloverSinkMixin.size
def size(self)
Definition: outputs.py:491
grepros.plugins.auto.dbbase.BaseDataSink.close
def close(self)
Definition: dbbase.py:139
grepros.plugins.auto.sqlbase.SqlMixin._schema
_schema
Definition: sqlbase.py:52
grepros.outputs.Sink._ensure_stamp_index
def _ensure_stamp_index(self, topic, msg, stamp=None, index=None)
Definition: outputs.py:115
grepros.plugins.auto.sqlite.SqliteSink._connect
def _connect(self)
Definition: sqlite.py:176
grepros.plugins.auto.sqlbase.SqlMixin._types
_types
Definition: sqlbase.py:51
grepros.plugins.auto.sqlite.SqliteSink._init_db
def _init_db(self)
Definition: sqlite.py:136


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29