5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
23 from ... common
import ConsolePrinter, format_bytes, makedirs, verify_io
24 from ... outputs
import RolloverSinkMixin
25 from . dbbase
import BaseDataSink, quote
30 Writes messages to an SQLite database.
33 - table "messages", with all messages as serialized binary data
34 - table "types", with message definitions
35 - table "topics", with topic information
38 - table "pkg/MsgType" for each message type, with detailed fields,
39 and JSON fields for arrays of nested subtypes,
40 with foreign keys if nesting else subtype values as JSON dictionaries;
41 plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
43 If launched with nesting-option, tables will also be created for each
46 - view "/topic/full/name" for each topic,
47 selecting from the message type table
55 FILE_EXTENSIONS = (
".sqlite",
".sqlite3")
61 DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
66 @param args arguments as namespace or dictionary, case-insensitive;
or a single path as the name of the SQLite file to write
68 @param args.write name of SQLite file to write, will be appended to if exists
69 @param args.write_options ```
70 {"commit-interval": transaction size (0 is autocommit),
71 "message-yaml": populate messages.yaml (default true),
"nesting": "array" to recursively insert arrays
of nested types, or "all" for any nesting,
74 "overwrite": whether to overwrite existing file
76 "rollover-size": bytes limit for individual output files,
77 "rollover-count": message limit for individual output files,
78 "rollover-duration": time span limit for individual output files,
79 as ROS duration or convertible seconds,
80 "rollover-template": output filename template, supporting
81 strftime format codes like "%H-%M-%S"
82 and "%(index)s" as output file index}
84 @param args.meta whether to emit metainfo
85 @param args.verbose whether to emit debug information
86 @param kwargs any and all arguments as keyword overrides, case-insensitive
88 super(SqliteSink, self).
__init__(args, **kwargs)
89 RolloverSinkMixin.__init__(self, args)
91 self.
_do_yaml = (self.
args.WRITE_OPTIONS.get(
"message-yaml") !=
"false")
92 self.
_overwrite = (self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
Returns whether "commit-interval" and "nesting" in args.write_options have valid values, if any,
99 and file is writable; parses "message-yaml" and "overwrite" from args.write_options.
102 ok = all([super(SqliteSink, self).
validate(), RolloverSinkMixin.validate(self)])
103 if self.
args.WRITE_OPTIONS.get(
"message-yaml")
not in (
None,
True,
False,
"true",
"false"):
104 ConsolePrinter.error(
"Invalid message-yaml option for %s: %r. "
105 "Choose one of {true, false}.",
106 self.
ENGINE, self.
args.WRITE_OPTIONS[
"message-yaml"])
108 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
109 ConsolePrinter.error(
"Invalid overwrite option for %s: %r. "
110 "Choose one of {true, false}.",
111 self.
ENGINE, self.
args.WRITE_OPTIONS[
"overwrite"])
def emit(self, topic, msg, stamp=None, match=None, index=None):
    """
    Writes message to database.

    Invokes the rollover mixin's `ensure_rollover` first, then delegates
    the write itself to the parent sink's `emit`.

    @param   topic  passed to both the rollover check and the parent `emit`
                    (presumably the ROS topic name; confirm against caller)
    @param   msg    passed to both the rollover check and the parent `emit`
                    (presumably the ROS message instance; confirm against caller)
    @param   stamp  passed to both the rollover check and the parent `emit`
                    (presumably the message timestamp; confirm against caller)
    @param   match  forwarded unchanged to the parent `emit` only
    @param   index  forwarded unchanged to the parent `emit` only
    """
    # Rollover check runs before the write; presumably so that a message
    # crossing a rollover limit lands in the next output file. TODO confirm.
    RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
    super(SqliteSink, self).emit(topic, msg, stamp, match, index)
128 """Returns current file size in bytes, including journals, or None if size lookup failed."""
130 for f
in (
"%s%s" % (self.
filename, x)
for x
in (
"",
"-journal",
"-wal")):
131 try: os.path.isfile(f)
and sizes.append(os.path.getsize(f))
132 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", f, e)
133 return sum(sizes)
if sizes
else None
137 """Opens the database file and populates schema if not already existing."""
138 for t
in (dict, list, tuple): sqlite3.register_adapter(t, json.dumps)
139 for t
in six.integer_types:
140 sqlite3.register_adapter(t,
lambda x: str(x)
if abs(x) > self.
MAX_INT else x)
141 sqlite3.register_converter(
"JSON", json.loads)
143 if self.
args.VERBOSE:
145 action =
"Overwriting" if sz
and self.
_overwrite else \
146 "Appending to" if sz
else "Creating"
147 ConsolePrinter.debug(
"%s SQLite output %s%s.", action, self.
filename,
153 """Populates instance attributes with schema metainfo."""
155 for row
in self.
db.execute(
"SELECT name FROM sqlite_master "
156 "WHERE type = 'table' AND name LIKE '%/%'"):
157 cols = self.
db.execute(
"PRAGMA table_info(%s)" %
quote(row[
"name"])).fetchall()
158 typerow = next((x
for x
in self.
_types.values()
159 if x[
"table_name"] == row[
"name"]),
None)
160 if not typerow:
continue
161 typekey = (typerow[
"type"], typerow[
"md5"])
162 self.
_schema[typekey] = collections.OrderedDict([(c[
"name"], c)
for c
in cols])
166 """Inserts message to messages-table, and to pkg/MsgType tables."""
167 with api.TypeMeta.make(msg, topic)
as m:
168 topic_id, typename = self.
_topics[m.topickey][
"id"], m.typename
169 margs = dict(dt=api.to_datetime(stamp), timestamp=api.to_nsec(stamp),
170 topic=topic, name=topic, topic_id=topic_id, type=typename,
171 yaml=str(msg)
if self.
_do_yaml else "", data=api.serialize_message(msg))
177 """Returns new database connection."""
180 db = sqlite3.connect(self.
filename, check_same_thread=
False,
181 detect_types=sqlite3.PARSE_DECLTYPES)
183 db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
188 """Executes INSERT statement, returns inserted ID."""
189 return self.
_cursor.execute(sql, args).lastrowid
193 """Executes SQL with all args sequences."""
194 self.
_cursor.executemany(sql, argses)
198 """Executes SQL with one or more statements."""
199 self.
_cursor.executescript(sql)
203 """Returns next ID value for table, using simple auto-increment."""
205 sql =
"SELECT COALESCE(MAX(_id), 0) AS id FROM %s" %
quote(table)
212 """Returns formatted label for database, with file path and size."""
214 return "%s (%s)" % (self.
filename,
"error getting size" if sz
is None else sz)
219 """Adds SQLite output format support."""
220 from ...
import plugins
221 plugins.add_write_format(
"sqlite", SqliteSink,
"SQLite", [
222 (
"commit-interval=NUM",
"transaction size for SQLite output\n"
223 "(default 1000, 0 is autocommit)"),
224 (
"dialect-file=path/to/dialects.yaml",
225 "load additional SQL dialect options\n"
226 "for SQLite output\n"
227 "from a YAML or JSON file"),
228 (
"message-yaml=true|false",
"whether to populate table field messages.yaml\n"
229 "in SQLite output (default true)"),
230 (
"nesting=array|all",
"create tables for nested message types\n"
231 "in SQLite output,\n"
232 'only for arrays if "array" \n'
233 "else for any nested types\n"
234 "(array fields in parent will be populated \n"
235 " with foreign keys instead of messages as JSON)"),
236 (
"overwrite=true|false",
"overwrite existing file in SQLite output\n"
237 "instead of appending to file (default false)")
238 ] + RolloverSinkMixin.get_write_options(
"SQLite"))
# Names exported by `from <this module> import *`.
__all__ = ["SqliteSink", "init"]