dbbase.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Shared functionality for database sinks.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 11.12.2021
11 @modified 27.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 import atexit
16 import collections
17 
18 from ... import api
19 from ... common import PATH_TYPES, ConsolePrinter, ensure_namespace, plural
20 from ... outputs import Sink
21 from . sqlbase import SqlMixin, quote
22 
23 
25  """
26  Base class for writing messages to a database.
27 
28  Output will have:
29  - table "topics", with topic and message type names
30  - table "types", with message type definitions
31 
32  plus:
33  - table "pkg/MsgType" for each topic message type, with detailed fields,
34  BYTEA fields for uint8[], array fields for scalar list attributes,
35  and JSONB fields for lists of ROS messages;
36  with foreign keys if nesting else subtype values as JSON dictionaries;
37  plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
38  If not nesting, only topic message type tables are created.
39  - view "/full/topic/name" for each topic, selecting from the message type table
40 
41  If a message type table already exists but for a type with a different MD5 hash,
42  the new table will have its MD5 hash appended to end, as "pkg/MsgType (hash)".
43  """
44 
45 
# Database engine name, overridden in subclasses
ENGINE = None

# Number of queued statements that triggers a commit; 0 is autocommit
COMMIT_INTERVAL = 1000

# Default topic-related columns for pkg/MsgType tables
MESSAGE_TYPE_TOPICCOLS = [
    ("_topic",    "TEXT"),
    ("_topic_id", "INTEGER"),
]

# Default columns for pkg/MsgType tables; the ID column is kept last,
# as message insertion slices it off when nesting is not enabled
MESSAGE_TYPE_BASECOLS = [
    ("_dt",        "TIMESTAMP"),
    ("_timestamp", "INTEGER"),
    ("_id",        "INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT"),
]

# Additional default columns for pkg/MsgType tables with nesting output
MESSAGE_TYPE_NESTCOLS = [
    ("_parent_type", "TEXT"),
    ("_parent_id",   "INTEGER"),
]

# Constructor argument defaults
DEFAULT_ARGS = {"META": False, "WRITE_OPTIONS": {}, "VERBOSE": False}
65 
66 
def __init__(self, args=None, **kwargs):
    """
    Initializes sink state; the database is opened later, on first emit.

    @param   args                 arguments as namespace or dictionary, case-insensitive;
                                  or a single item as the database connection string
    @param   args.write           database connection string
    @param   args.write_options   ```
                                  {"commit-interval": transaction size (0 is autocommit),
                                   "nesting": "array" to recursively insert arrays
                                              of nested types, or "all" for any nesting)}
                                  ```
    @param   args.meta            whether to emit metainfo
    @param   args.verbose         whether to emit debug information
    @param   kwargs               any and all arguments as keyword overrides, case-insensitive
    """
    if isinstance(args, PATH_TYPES):
        # A bare path-like argument is taken as the connection string
        args = {"WRITE": str(args)}
    args = ensure_namespace(args, BaseDataSink.DEFAULT_ARGS, **kwargs)
    super(BaseDataSink, self).__init__(args)
    SqlMixin.__init__(self, args)

    # Database connection
    self.db = None

    # Database cursor or connection
    self._cursor = None
    # Override SqlMixin._dialect
    self._dialect = self.ENGINE.lower()
    self._close_printed = False

    # Whether to create tables and rows for nested message types,
    # "array" if to do this only for arrays of nested types, or
    # "all" for any nested type, including those fully flattened into parent fields.
    # In parent, nested arrays are inserted as foreign keys instead of formatted values.
    self._nesting = args.WRITE_OPTIONS.get("nesting")

    # {topickey/typekey: whether existence checks are done}
    self._checkeds = {}
    # {SQL: [(args), ]}
    self._sql_queue = {}
    # {(typename, typehash): count}
    self._nested_counts = {}

    # Make sure pending statements get flushed on interpreter exit
    atexit.register(self.close)
104 
105 
def validate(self):
    """
    Returns whether args.write_options has valid values, if any.

    Checks parameters "commit-interval" and "nesting", and the SQL dialect file;
    prints an error for each invalid option.
    """
    dialect_ok = SqlMixin._validate_dialect_file(self)
    options = self.args.WRITE_OPTIONS

    interval_ok = True
    if "commit-interval" in options:
        try:
            interval_ok = int(options["commit-interval"]) >= 0
        except Exception:
            # Anything not convertible to integer counts as invalid
            interval_ok = False
        if not interval_ok:
            ConsolePrinter.error("Invalid commit-interval option for %s: %r.",
                                 self.ENGINE, options["commit-interval"])

    nesting_ok = True
    if options.get("nesting") not in (None, False, "", "array", "all"):
        ConsolePrinter.error("Invalid nesting option for %s: %r. "
                             "Choose one of {array,all}.",
                             self.ENGINE, options["nesting"])
        nesting_ok = False

    return interval_ok and nesting_ok and dialect_ok
125 
126 
def emit(self, topic, msg, stamp=None, match=None, index=None):
    """
    Writes message to database, opening the database first if not yet open.

    Raises a plain Exception if sink options are invalid.
    """
    if not self.validate():
        raise Exception("invalid")
    if not self.db: self._init_db()

    stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
    # Type and topic need to be in place before the message row can refer to them
    self._process_type(msg)
    self._process_topic(topic, msg)
    self._process_message(topic, msg, stamp)
    super(BaseDataSink, self).emit(topic, msg, stamp, match, index)
137 
138 
def close(self):
    """
    Closes database connection, if any, emits metainfo.

    Summary is printed only if any messages have been counted, and only once
    until the database is next opened. Cached state is cleared and parent
    classes are closed also if closing the output raises.
    """
    try:
        self.close_output()
    finally:
        if not self._close_printed and self._counts:
            self._close_printed = True
            if not hasattr(self, "format_output_meta"):
                label = self._make_db_label()
                ConsolePrinter.debug("Wrote %s in %s to %s database %s.",
                                     plural("message", sum(self._counts.values())),
                                     plural("topic", self._counts), self.ENGINE, label)
            else:
                ConsolePrinter.debug("Wrote %s database for %s",
                                     self.ENGINE, self.format_output_meta())
            if self._nested_counts:
                ConsolePrinter.debug("Wrote %s in %s to %s.",
                    plural("nested message", sum(self._nested_counts.values())),
                    plural("nested message type", self._nested_counts), self.ENGINE
                )
        for cache in (self._checkeds, self._nested_counts):
            cache.clear()
        SqlMixin.close(self)
        super(BaseDataSink, self).close()
162 
163 
def close_output(self):
    """
    Closes database connection, if any, executing any pending statements.

    Cursor and connection are released also if executing the pending statements
    or committing raises; the error is propagated to caller after cleanup.
    Does nothing if database is not open.
    """
    if not self.db:
        return
    try:
        # Flush everything queued for batch execution
        for sql in list(self._sql_queue):
            self._executemany(sql, self._sql_queue.pop(sql))
        self.db.commit()
    finally:
        # Drop references first, so that a later close() will not retry
        # on a half-closed connection even if closing itself fails
        cursor, db = self._cursor, self.db
        self._cursor = self.db = None
        try:
            # Cursor can be missing if opening the database failed midway
            if cursor is not None: cursor.close()
        finally:
            db.close()
174 
175 
176  def _init_db(self):
177  """Opens database connection, and populates schema if not already existing."""
178  for d in (self._checkeds, self._topics, self._types): d.clear()
179  self._close_printed = False
180 
181  if "commit-interval" in self.args.WRITE_OPTIONS:
182  self.COMMIT_INTERVAL = int(self.args.WRITE_OPTIONS["commit-interval"])
183  self.db = self._connect()
184  self._cursor = self._make_cursor()
185  self._executescript(self._get_dialect_option("base_schema"))
186  self.db.commit()
187  self._load_schema()
188  TYPECOLS = self.MESSAGE_TYPE_TOPICCOLS + self.MESSAGE_TYPE_BASECOLS
189  if self._nesting: TYPECOLS += self.MESSAGE_TYPE_NESTCOLS
190  self._ensure_columns(TYPECOLS)
191 
192 
193  def _load_schema(self):
194  """Populates instance attributes with schema metainfo."""
195  self._cursor.execute("SELECT * FROM topics")
196  for row in self._cursor.fetchall():
197  topickey = (row["name"], row["md5"])
198  self._topics[topickey] = row
199 
200  self._cursor.execute("SELECT * FROM types")
201  for row in self._cursor.fetchall():
202  typekey = (row["type"], row["md5"])
203  self._types[typekey] = row
204 
205 
def _process_topic(self, topic, msg):
    """
    Inserts topics-row and creates view `/topic/name` if not already existing.

    Does nothing if the topic has already been checked since opening the database.
    Commits after adding a new topic unless in autocommit mode.
    """
    topickey = api.TypeMeta.make(msg, topic).topickey
    if topickey in self._checkeds:
        return

    if topickey not in self._topics:
        skip_cols = list(self.MESSAGE_TYPE_TOPICCOLS)
        if self._nesting:
            skip_cols += self.MESSAGE_TYPE_NESTCOLS
        topicdata = self._make_topic_data(topic, msg, skip_cols)
        self._topics[topickey] = topicdata
        self._executescript(topicdata["sql"])

        insert_sql, insert_args = self._make_topic_insert_sql(topic, msg)
        if self.args.VERBOSE and topickey not in self._counts:
            ConsolePrinter.debug("Adding topic %s in %s output.", topic, self.ENGINE)
        self._topics[topickey]["id"] = self._execute_insert(insert_sql, insert_args)

        if self.COMMIT_INTERVAL:
            self.db.commit()

    self._checkeds[topickey] = True
231 
232 
def _process_type(self, msg, rootmsg=None):
    """
    Creates types-row and pkg/MsgType table if not already existing.

    If nesting is enabled, processes nested message types recursively,
    and saves the names of their tables in the types-row as "nested_tables".

    @param   msg      message instance whose type to process
    @param   rootmsg  top-level message that `msg` belongs to, defaults to `msg`
    @return           types-row, or None if type has already been checked
    """
    rootmsg = rootmsg or msg
    with api.TypeMeta.make(msg, root=rootmsg) as meta:
        typename, typekey = meta.typename, meta.typekey
    if typekey in self._checkeds:
        return None

    if typekey not in self._types:
        if self.args.VERBOSE and typekey not in self._schema:
            ConsolePrinter.debug("Adding type %s in %s output.", typename, self.ENGINE)
        extra_cols = self.MESSAGE_TYPE_TOPICCOLS + self.MESSAGE_TYPE_BASECOLS
        if self._nesting:
            extra_cols += self.MESSAGE_TYPE_NESTCOLS
        typedata = self._make_type_data(msg, extra_cols)
        # Column definitions are kept in schema cache, not in types-row
        self._schema[typekey] = collections.OrderedDict(typedata.pop("cols"))
        self._types[typekey] = typedata

        self._executescript(typedata["sql"])
        insert_sql, insert_args = self._make_type_insert_sql(msg)
        typedata["id"] = self._execute_insert(insert_sql, insert_args)

    typerow = self._types[typekey]
    nested_tables = typerow.get("nested_tables") or {}
    if self._nesting:
        nesteds = api.iter_message_fields(msg, messages_only=True)
    else:
        nesteds = ()
    for path, submsgs, subtype in nesteds:
        scalartype = api.scalar(subtype)
        if "all" != self._nesting and subtype == scalartype:
            continue  # for path: single nested message, and nesting is arrays-only

        # Type hash is looked up only if there is no message value to take it from
        subtypehash = not submsgs and self.source.get_message_type_hash(scalartype)
        if not isinstance(submsgs, (list, tuple)):
            submsgs = [submsgs]
        if submsgs:
            submsg = submsgs[0]
        else:
            # Empty list: make a blank instance to derive table structure from
            submsg = self.source.get_message_class(scalartype, subtypehash)()
        subdata = self._process_type(submsg, rootmsg)
        if subdata:
            nested_tables[".".join(path)] = subdata["table_name"]

    if nested_tables:
        typerow["nested_tables"] = nested_tables
        sets, where = {"nested_tables": nested_tables}, {"id": typerow["id"]}
        update_sql, update_args = self._make_update_sql("types", sets, where)
        self._cursor.execute(update_sql, update_args)
    self._checkeds[typekey] = True
    return typerow
278 
279 
280  def _process_message(self, topic, msg, stamp):
281  """
282  Inserts pkg/MsgType row for this message.
283 
284  Inserts sub-rows for subtypes in message if nesting enabled.
285  Commits transaction if interval due.
286  """
287  self._populate_type(topic, msg, stamp)
288  if self.COMMIT_INTERVAL:
289  do_commit = sum(len(v) for v in self._sql_queue.values()) >= self.COMMIT_INTERVAL
290  for sql in list(self._sql_queue) if do_commit else ():
291  self._executemany(sql, self._sql_queue.pop(sql))
292  do_commit and self.db.commit()
293 
294 
def _populate_type(self, topic, msg, stamp,
                   rootmsg=None, parent_type=None, parent_id=None):
    """
    Inserts pkg/MsgType row for message.

    If nesting is enabled, inserts sub-rows for subtypes in message,
    and updates the row with IDs of sub-rows inserted for nested arrays.

    @param   rootmsg      top-level message that `msg` belongs to, defaults to `msg`
    @param   parent_type  type name of parent message, if `msg` is a nested message
    @param   parent_id    row ID of parent message, if `msg` is a nested message
    @return               ID of inserted row if nesting is enabled, else None
    """
    rootmsg = rootmsg or msg
    with api.TypeMeta.make(msg, root=rootmsg) as meta:
        typename, typekey = meta.typename, meta.typekey
    with api.TypeMeta.make(rootmsg) as rootmeta:
        topic_id = self._topics[rootmeta.topickey]["id"]
    table_name = self._types[typekey]["table_name"]

    # Row ID is needed upfront only if sub-rows need to refer to it
    myid = self._get_next_id(table_name) if self._nesting else None
    # Last base column is the ID column, given explicitly only when nesting
    coldefs = self.MESSAGE_TYPE_TOPICCOLS + self.MESSAGE_TYPE_BASECOLS[:-1]
    colvals = [topic, topic_id, api.to_datetime(stamp), api.to_nsec(stamp)]
    if self._nesting:
        coldefs += self.MESSAGE_TYPE_BASECOLS[-1:] + self.MESSAGE_TYPE_NESTCOLS
        colvals += [myid, parent_type, parent_id]
    colnames = [name for name, _ in coldefs]
    extra_cols = list(zip(colnames, colvals))
    insert_sql, insert_args = self._make_message_insert_sql(topic, msg, extra_cols)
    self._ensure_execute(insert_sql, insert_args)
    if parent_type:
        self._nested_counts[typekey] = self._nested_counts.get(typekey, 0) + 1

    subids = {}  # {message field path: [ids]}
    if self._nesting:
        nesteds = api.iter_message_fields(msg, messages_only=True)
    else:
        nesteds = ()
    for subpath, submsgs, subtype in nesteds:
        scalartype = api.scalar(subtype)
        if "all" != self._nesting and subtype == scalartype:
            continue  # for subpath: single nested message, and nesting is arrays-only

        is_array = isinstance(submsgs, (list, tuple))
        if is_array:
            subids[subpath] = []
        for submsg in (submsgs if is_array else [submsgs]):
            subid = self._populate_type(topic, submsg, stamp, rootmsg, typename, myid)
            if is_array:
                subids[subpath].append(subid)

    if subids:
        sets = {".".join(path): ids for path, ids in subids.items()}
        where = {"_id": myid}
        update_sql, update_args = self._make_update_sql(table_name, sets, where)
        self._ensure_execute(update_sql, update_args)
    return myid
338 
339 
340  def _ensure_columns(self, cols):
341  """Adds specified columns to any type tables lacking them."""
342  sqls = []
343  for typekey, typecols in ((k, v) for k, v in self._schema.items() if k in self._types):
344  table_name = self._types[typekey]["table_name"]
345  for c, t in ((c, t) for c, t in cols if c not in typecols):
346  sql = "ALTER TABLE %s ADD COLUMN %s %s;" % (quote(table_name), c, t)
347  sqls.append(sql)
348  typecols[c] = t
349  if sqls:
350  self._executescript("\n".join(sqls))
351  self.db.commit()
352 
353 
354  def _ensure_execute(self, sql, args):
355  """Executes SQL if in autocommit mode, else caches arguments for batch execution."""
356  args = tuple(args) if isinstance(args, list) else args
357  if self.COMMIT_INTERVAL:
358  self._sql_queue.setdefault(sql, []).append(args)
359  else:
360  self._cursor.execute(sql, args)
361 
362 
363  def _connect(self):
364  """Returns new database connection."""
365  raise NotImplementedError()
366 
367 
368  def _execute_insert(self, sql, args):
369  """Executes INSERT statement, returns inserted ID."""
370  raise NotImplementedError()
371 
372 
373  def _executemany(self, sql, argses):
374  """Executes SQL with all args sequences."""
375  raise NotImplementedError()
376 
377 
378  def _executescript(self, sql):
379  """Executes SQL with one or more statements."""
380  raise NotImplementedError()
381 
382 
383  def _get_next_id(self, table):
384  """Returns next ID value for table."""
385  raise NotImplementedError()
386 
387 
388  def _make_cursor(self):
389  """Returns new database cursor."""
390  return self.db.cursor()
391 
392 
393  def _make_db_label(self):
394  """Returns formatted label for database."""
395  return self.args.WRITE
396 
397 
398 __all__ = ["BaseDataSink"]
grepros.api.get_message_class
def get_message_class(typename)
Definition: api.py:727
grepros.plugins.auto.dbbase.BaseDataSink.MESSAGE_TYPE_BASECOLS
list MESSAGE_TYPE_BASECOLS
Default columns for pkg/MsgType tables.
Definition: dbbase.py:55
grepros.plugins.auto.dbbase.BaseDataSink.db
db
Database connection.
Definition: dbbase.py:87
grepros.plugins.auto.sqlbase.SqlMixin._make_type_insert_sql
def _make_type_insert_sql(self, msg)
Definition: sqlbase.py:200
grepros.plugins.auto.dbbase.BaseDataSink.emit
def emit(self, topic, msg, stamp=None, match=None, index=None)
Definition: dbbase.py:127
grepros.plugins.auto.dbbase.BaseDataSink._dialect
_dialect
Definition: dbbase.py:90
grepros.plugins.auto.dbbase.BaseDataSink._checkeds
_checkeds
Definition: dbbase.py:99
generate_msgs.plural
def plural(word, items)
Definition: generate_msgs.py:405
grepros.outputs.Sink.args
args
Definition: outputs.py:50
grepros.plugins.auto.dbbase.BaseDataSink.MESSAGE_TYPE_TOPICCOLS
list MESSAGE_TYPE_TOPICCOLS
Default topic-related columns for pkg/MsgType tables.
Definition: dbbase.py:52
grepros.plugins.auto.sqlbase.SqlMixin._make_message_insert_sql
def _make_message_insert_sql(self, topic, msg, extra_cols=())
Definition: sqlbase.py:211
grepros.plugins.auto.dbbase.BaseDataSink.validate
def validate(self)
Definition: dbbase.py:106
grepros.plugins.auto.dbbase.BaseDataSink._init_db
def _init_db(self)
Definition: dbbase.py:176
grepros.api.get_message_type_hash
def get_message_type_hash(msg_or_type)
Definition: api.py:741
grepros.plugins.auto.dbbase.BaseDataSink._sql_queue
_sql_queue
Definition: dbbase.py:100
grepros.plugins.auto.dbbase.BaseDataSink._execute_insert
def _execute_insert(self, sql, args)
Definition: dbbase.py:368
grepros.plugins.auto.dbbase.BaseDataSink._process_type
def _process_type(self, msg, rootmsg=None)
Definition: dbbase.py:233
grepros.plugins.auto.dbbase.BaseDataSink._connect
def _connect(self)
Definition: dbbase.py:363
grepros.outputs.Sink
Definition: outputs.py:32
grepros.plugins.auto.dbbase.BaseDataSink.__init__
def __init__(self, args=None, **kwargs)
Definition: dbbase.py:67
grepros.plugins.auto.sqlbase.SqlMixin._get_dialect_option
def _get_dialect_option(self, option)
Definition: sqlbase.py:371
grepros.plugins.auto.dbbase.BaseDataSink.MESSAGE_TYPE_NESTCOLS
list MESSAGE_TYPE_NESTCOLS
Additional default columns for pkg/MsgType tables with nesting output.
Definition: dbbase.py:60
grepros.plugins.auto.dbbase.BaseDataSink._ensure_execute
def _ensure_execute(self, sql, args)
Definition: dbbase.py:354
grepros.plugins.auto.dbbase.BaseDataSink.ENGINE
ENGINE
Database engine name, overridden in subclasses.
Definition: dbbase.py:46
grepros.outputs.Sink.source
source
inputs.Source instance bound to this sink
Definition: outputs.py:54
grepros.plugins.auto.sqlbase.SqlMixin._topics
_topics
Definition: sqlbase.py:50
grepros.plugins.auto.sqlbase.SqlMixin._make_topic_data
def _make_topic_data(self, topic, msg, exclude_cols=())
Definition: sqlbase.py:122
grepros.outputs.Sink._counts
_counts
Definition: outputs.py:48
grepros.outputs.Sink.validate
def validate(self)
Definition: outputs.py:88
grepros.common.ensure_namespace
def ensure_namespace(val, defaults=None, dashify=("WRITE_OPTIONS",), **kwargs)
Definition: common.py:658
grepros.plugins.auto.dbbase.BaseDataSink._executemany
def _executemany(self, sql, argses)
Definition: dbbase.py:373
grepros.plugins.auto.dbbase.BaseDataSink._make_db_label
def _make_db_label(self)
Definition: dbbase.py:393
grepros.plugins.auto.dbbase.BaseDataSink.COMMIT_INTERVAL
int COMMIT_INTERVAL
Number of emits between commits; 0 is autocommit.
Definition: dbbase.py:49
grepros.outputs.Sink.close
def close(self)
Definition: outputs.py:93
grepros.plugins.auto.dbbase.BaseDataSink._get_next_id
def _get_next_id(self, table)
Definition: dbbase.py:383
grepros.plugins.auto.dbbase.BaseDataSink._make_cursor
def _make_cursor(self)
Definition: dbbase.py:388
grepros.plugins.auto.sqlbase.SqlMixin._make_update_sql
def _make_update_sql(self, table, values, where=())
Definition: sqlbase.py:238
grepros.plugins.auto.dbbase.BaseDataSink.close_output
def close_output(self)
Definition: dbbase.py:164
grepros.plugins.auto.dbbase.BaseDataSink
Definition: dbbase.py:24
grepros.plugins.auto.dbbase.BaseDataSink._process_topic
def _process_topic(self, topic, msg)
Definition: dbbase.py:206
grepros.plugins.auto.dbbase.BaseDataSink._close_printed
_close_printed
Definition: dbbase.py:91
grepros.plugins.auto.sqlbase.quote
def quote(name, force=False)
Definition: sqlbase.py:631
grepros.plugins.auto.dbbase.BaseDataSink._nested_counts
_nested_counts
Definition: dbbase.py:101
grepros.plugins.auto.dbbase.BaseDataSink._ensure_columns
def _ensure_columns(self, cols)
Definition: dbbase.py:340
grepros.plugins.auto.dbbase.BaseDataSink._cursor
_cursor
Definition: dbbase.py:89
grepros.plugins.auto.dbbase.BaseDataSink._process_message
def _process_message(self, topic, msg, stamp)
Definition: dbbase.py:280
grepros.plugins.auto.sqlbase.SqlMixin._make_type_data
def _make_type_data(self, msg, extra_cols=(), rootmsg=None)
Definition: sqlbase.py:152
grepros.plugins.auto.sqlbase.SqlMixin._make_topic_insert_sql
def _make_topic_insert_sql(self, topic, msg)
Definition: sqlbase.py:189
grepros.plugins.auto.sqlbase.SqlMixin
Definition: sqlbase.py:25
grepros.plugins.auto.dbbase.BaseDataSink.close
def close(self)
Definition: dbbase.py:139
grepros.plugins.auto.sqlbase.SqlMixin._schema
_schema
Definition: sqlbase.py:52
grepros.outputs.Sink._ensure_stamp_index
def _ensure_stamp_index(self, topic, msg, stamp=None, index=None)
Definition: outputs.py:115
grepros.plugins.auto.dbbase.BaseDataSink._load_schema
def _load_schema(self)
Definition: dbbase.py:193
grepros.plugins.auto.dbbase.BaseDataSink._populate_type
def _populate_type(self, topic, msg, stamp, rootmsg=None, parent_type=None, parent_id=None)
Definition: dbbase.py:295
grepros.plugins.auto.sqlbase.SqlMixin._types
_types
Definition: sqlbase.py:51
grepros.plugins.auto.dbbase.BaseDataSink._nesting
_nesting
Definition: dbbase.py:97
grepros.plugins.auto.dbbase.BaseDataSink._executescript
def _executescript(self, sql)
Definition: dbbase.py:378


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29