3 Shared functionality for database sinks.
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
19 from ... common
import PATH_TYPES, ConsolePrinter, ensure_namespace, plural
20 from ... outputs
import Sink
21 from . sqlbase
import SqlMixin, quote
26 Base class for writing messages to a database.
29 - table "topics", with topic and message type names
30 - table "types", with message type definitions
33 - table "pkg/MsgType" for each topic message type, with detailed fields,
34 BYTEA fields for uint8[], array fields for scalar list attributes,
35 and JSONB fields for lists of ROS messages;
36 with foreign keys if nesting, else subtype values as JSON dictionaries;
37 plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
38 If not nesting, only topic message type tables are created.
39 - view "/full/topic/name" for each topic, selecting from the message type table
41 If a message type table already exists but for a type with a different MD5 hash,
42 the new table will have its MD5 hash appended to the end, as "pkg/MsgType (hash)".
# Default commit interval. The "commit-interval" write option is documented
# as the transaction size, with 0 meaning autocommit.
# NOTE(review): presumably this is the fallback when that option is not
# given -- confirm against the connection-opening code.
COMMIT_INTERVAL = 1000

# Topic-related metadata columns, as (column name, SQL type) pairs.
MESSAGE_TYPE_TOPICCOLS = [
    ("_topic", "TEXT"),
    ("_topic_id", "INTEGER"),
]

# Base metadata columns, as (column name, SQL type) pairs.
MESSAGE_TYPE_BASECOLS = [
    ("_dt", "TIMESTAMP"),
    ("_timestamp", "INTEGER"),
    ("_id", "INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT"),
]

# Parent-reference metadata columns, as (column name, SQL type) pairs.
# NOTE(review): presumably only added when nesting is enabled -- confirm.
MESSAGE_TYPE_NESTCOLS = [
    ("_parent_type", "TEXT"),
    ("_parent_id", "INTEGER"),
]

# Constructor argument defaults: META (whether to emit metainfo),
# WRITE_OPTIONS (options dictionary such as "commit-interval" and "nesting"),
# VERBOSE (whether to emit debug information).
DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)
69 @param args arguments as namespace or dictionary, case-insensitive;
70 or a single item as the database connection string
71 @param args.write database connection string
72 @param args.write_options ```
73 {"commit-interval": transaction size (0 is autocommit),
74 "nesting": "array" to recursively insert arrays
75 of nested types, or "all" for any nesting}
77 @param args.meta whether to emit metainfo
78 @param args.verbose whether to emit debug information
79 @param kwargs any and all arguments as keyword overrides, case-insensitive
81 args = {
"WRITE": str(args)}
if isinstance(args, PATH_TYPES)
else args
83 super(BaseDataSink, self).
__init__(args)
84 SqlMixin.__init__(self, args)
97 self.
_nesting = args.WRITE_OPTIONS.get(
"nesting")
103 atexit.register(self.
close)
108 Returns whether args.write_options has valid values, if any.
110 Checks parameters "commit-interval" and "nesting".
112 ok, sqlconfig_ok =
True, SqlMixin._validate_dialect_file(self)
113 if "commit-interval" in self.
args.WRITE_OPTIONS:
114 try: ok = int(self.
args.WRITE_OPTIONS[
"commit-interval"]) >= 0
115 except Exception: ok =
False
117 ConsolePrinter.error(
"Invalid commit-interval option for %s: %r.",
118 self.
ENGINE, self.
args.WRITE_OPTIONS[
"commit-interval"])
119 if self.
args.WRITE_OPTIONS.get(
"nesting")
not in (
None,
False,
"",
"array",
"all"):
120 ConsolePrinter.error(
"Invalid nesting option for %s: %r. "
121 "Choose one of {array,all}.",
122 self.
ENGINE, self.
args.WRITE_OPTIONS[
"nesting"])
124 return ok
and sqlconfig_ok
# Writes one message to the database.
# Parameters topic, msg, stamp, match and index are forwarded unchanged
# to the superclass emit() at the end of this method.
# Raises Exception("invalid") if validate() returns a falsy value.
127 def emit(self, topic, msg, stamp=None, match=None, index=None):
128 """Writes message to database."""
# Guard: refuse to write anything if validation fails.
129 if not self.
validate():
raise Exception(
"invalid")
# NOTE(review): the embedded listing numbers jump from 129 to 136 here, so
# statements between the validation guard and the superclass call appear to be
# missing from this view -- confirm against the original file.
# Hand over to the parent sink's emit() with the same arguments.
136 super(BaseDataSink, self).
emit(topic, msg, stamp, match, index)
140 """Closes database connection, if any, emits metainfo."""
145 if hasattr(self,
"format_output_meta"):
146 ConsolePrinter.debug(
"Wrote %s database for %s",
147 self.
ENGINE, self.format_output_meta())
150 ConsolePrinter.debug(
"Wrote %s in %s to %s database %s.",
154 ConsolePrinter.debug(
"Wrote %s in %s to %s.",
161 super(BaseDataSink, self).
close()
165 """Closes database connection, if any, executing any pending statements."""
177 """Opens database connection, and populates schema if not already existing."""
181 if "commit-interval" in self.
args.WRITE_OPTIONS:
194 """Populates instance attributes with schema metainfo."""
195 self.
_cursor.execute(
"SELECT * FROM topics")
196 for row
in self.
_cursor.fetchall():
197 topickey = (row[
"name"], row[
"md5"])
200 self.
_cursor.execute(
"SELECT * FROM types")
201 for row
in self.
_cursor.fetchall():
202 typekey = (row[
"type"], row[
"md5"])
203 self.
_types[typekey] = row
208 Inserts topics-row and creates view `/topic/name` if not already existing.
210 Also creates types-row and pkg/MsgType table for this message if not existing.
211 If nesting enabled, creates types recursively.
213 topickey = api.TypeMeta.make(msg, topic).topickey
217 if topickey
not in self.
_topics:
225 if self.
args.VERBOSE
and topickey
not in self.
_counts:
226 ConsolePrinter.debug(
"Adding topic %s in %s output.", topic, self.
ENGINE)
235 Creates types-row and pkg/MsgType table if not already existing.
237 @return created types-row, or None if already existed
239 rootmsg = rootmsg
or msg
240 with api.TypeMeta.make(msg, root=rootmsg)
as m:
241 typename, typekey = (m.typename, m.typekey)
245 if typekey
not in self.
_types:
246 if self.
args.VERBOSE
and typekey
not in self.
_schema:
247 ConsolePrinter.debug(
"Adding type %s in %s output.", typename, self.
ENGINE)
251 self.
_schema[typekey] = collections.OrderedDict(tdata.pop(
"cols"))
252 self.
_types[typekey] = tdata
259 nested_tables = self.
_types[typekey].get(
"nested_tables")
or {}
260 nesteds = api.iter_message_fields(msg, messages_only=
True)
if self.
_nesting else ()
261 for path, submsgs, subtype
in nesteds:
262 scalartype = api.scalar(subtype)
263 if subtype == scalartype
and "all" != self.
_nesting:
267 if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
270 if subdata: nested_tables[
".".join(path)] = subdata[
"table_name"]
272 self.
_types[typekey][
"nested_tables"] = nested_tables
273 sets, where = {
"nested_tables": nested_tables}, {
"id": self.
_types[typekey][
"id"]}
275 self.
_cursor.execute(sql, args)
277 return self.
_types[typekey]
282 Inserts pkg/MsgType row for this message.
284 Inserts sub-rows for subtypes in message if nesting enabled.
285 Commits transaction if interval due.
290 for sql
in list(self.
_sql_queue)
if do_commit
else ():
292 do_commit
and self.
db.commit()
296 rootmsg=None, parent_type=None, parent_id=None):
298 Inserts pkg/MsgType row for message.
300 If nesting is enabled, inserts sub-rows for subtypes in message,
301 and returns inserted ID.
303 rootmsg = rootmsg
or msg
304 with api.TypeMeta.make(msg, root=rootmsg)
as m:
305 typename, typekey = m.typename, m.typekey
306 with api.TypeMeta.make(rootmsg)
as m:
307 topic_id = self.
_topics[m.topickey][
"id"]
308 table_name = self.
_types[typekey][
"table_name"]
312 colvals = [topic, topic_id, api.to_datetime(stamp), api.to_nsec(stamp)]
315 colvals += [myid, parent_type, parent_id]
316 extra_cols = list(zip([c
for c, _
in coldefs], colvals))
322 nesteds = api.iter_message_fields(msg, messages_only=
True)
if self.
_nesting else ()
323 for subpath, submsgs, subtype
in nesteds:
324 scalartype = api.scalar(subtype)
325 if subtype == scalartype
and "all" != self.
_nesting:
327 if isinstance(submsgs, (list, tuple)):
329 for submsg
in submsgs
if isinstance(submsgs, (list, tuple))
else [submsgs]:
330 subid = self.
_populate_type(topic, submsg, stamp, rootmsg, typename, myid)
331 if isinstance(submsgs, (list, tuple)):
332 subids[subpath].append(subid)
334 sets, where = {
".".join(p): subids[p]
for p
in subids}, {
"_id": myid}
341 """Adds specified columns to any type tables lacking them."""
343 for typekey, typecols
in ((k, v)
for k, v
in self.
_schema.items()
if k
in self.
_types):
344 table_name = self.
_types[typekey][
"table_name"]
345 for c, t
in ((c, t)
for c, t
in cols
if c
not in typecols):
346 sql =
"ALTER TABLE %s ADD COLUMN %s %s;" % (
quote(table_name), c, t)
355 """Executes SQL if in autocommit mode, else caches arguments for batch execution."""
356 args = tuple(args)
if isinstance(args, list)
else args
358 self.
_sql_queue.setdefault(sql, []).append(args)
360 self.
_cursor.execute(sql, args)
364 """Returns new database connection."""
365 raise NotImplementedError()
369 """Executes INSERT statement, returns inserted ID."""
370 raise NotImplementedError()
374 """Executes SQL with all args sequences."""
375 raise NotImplementedError()
379 """Executes SQL with one or more statements."""
380 raise NotImplementedError()
384 """Returns next ID value for table."""
385 raise NotImplementedError()
389 """Returns new database cursor."""
390 return self.
db.cursor()
394 """Returns formatted label for database."""
395 return self.
args.WRITE
398 __all__ = [
"BaseDataSink"]