sql.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 SQL schema output plugin.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 20.12.2021
11 @modified 28.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 import atexit
16 import collections
17 import datetime
18 import os
19 
20 from .. import __title__
21 from .. import api
22 from .. import common
23 from .. import main
24 from .. common import ConsolePrinter, plural
25 from .. outputs import Sink
26 from . auto.sqlbase import SqlMixin
27 
28 
29 
31  """
32  Writes SQL schema file for message type tables and topic views.
33 
34  Output will have:
35  - table "pkg/MsgType" for each topic message type, with ordinary columns for
36  scalar fields, and structured columns for list fields;
37  plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
38 
39  If launched with nesting-option, tables will also be created for each
40  nested message type.
41 
42  - view "/full/topic/name" for each topic, selecting from the message type table
43  """
44 
45 
46  FILE_EXTENSIONS = (".sql", )
47 
48 
49  MESSAGE_TYPE_BASECOLS = [("_topic", "string"),
50  ("_timestamp", "time"), ]
51 
52 
53  DEFAULT_ARGS = dict(WRITE_OPTIONS={}, VERBOSE=False)
54 
55 
56  def __init__(self, args=None, **kwargs):
57  """
58  @param args arguments as namespace or dictionary, case-insensitive;
59  or a single path as the file to write
60  @param args.write output file path
61  @param args.write_options ```
62  {"dialect": SQL dialect if not default,
63  "nesting": true|false to created nested type tables,
64  "overwrite": whether to overwrite existing file
65  (default false)}
66  ```
67  @param args.meta whether to emit metainfo
68  @param args.verbose whether to emit debug information
69  @param kwargs any and all arguments as keyword overrides, case-insensitive
70  """
71  args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
72  args = common.ensure_namespace(args, SqlSink.DEFAULT_ARGS, **kwargs)
73  super(SqlSink, self).__init__(args)
74  SqlMixin.__init__(self, args)
75 
76  self._filename = None # Unique output filename
77  self._file = None # Open file() object
78  self._batch = None # Current source batch
79  self._nested_types = {} # {(typename, typehash): "CREATE TABLE .."}
80  self._batch_metas = [] # [source batch metainfo string, ]
81  self._overwrite = (args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
82  self._close_printed = False
83 
84  # Whether to create tables for nested message types,
85  # "array" if to do this only for arrays of nested types, or
86  # "all" for any nested type, including those fully flattened into parent fields.
87  self._nesting = args.WRITE_OPTIONS.get("nesting")
88 
89  atexit.register(self.close)
90 
91 
92  def validate(self):
93  """
94  Returns whether "dialect" and "nesting" and "overwrite" parameters contain supported values
95  and file is writable.
96  """
97  if self.valid is not None: return self.valid
98  ok, sqlconfig_ok = True, SqlMixin.validate(self)
99  if self.args.WRITE_OPTIONS.get("nesting") not in (None, "array", "all"):
100  ConsolePrinter.error("Invalid nesting option for SQL: %r. "
101  "Choose one of {array,all}.",
102  self.args.WRITE_OPTIONS["nesting"])
103  ok = False
104  if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
105  ConsolePrinter.error("Invalid overwrite option for SQL: %r. "
106  "Choose one of {true, false}.",
107  self.args.WRITE_OPTIONS["overwrite"])
108  ok = False
109  if not common.verify_io(self.args.WRITE, "w"):
110  ok = False
111  self.valid = sqlconfig_ok and ok
112  return self.valid
113 
114 
115  def emit(self, topic, msg, stamp=None, match=None, index=None):
116  """Writes out message type CREATE TABLE statements to SQL schema file."""
117  if not self.validate(): raise Exception("invalid")
118  batch = self.source.get_batch()
119  if not self._batch_metas or batch != self._batch:
120  self._batch = batch
121  self._batch_metas.append(self.source.format_meta())
122  self._ensure_open()
123  self._process_type(msg)
124  self._process_topic(topic, msg)
125 
126 
127  def close(self):
128  """Rewrites out everything to SQL schema file, ensuring all source metas."""
129  try:
130  if self._file:
131  self._file.seek(0)
132  self._write_header()
133  for key in sorted(self._types):
134  self._write_entity("table", self._types[key])
135  for key in sorted(self._topics):
136  self._write_entity("view", self._topics[key])
137  self._file.close()
138  self._file = None
139  finally:
140  if not self._close_printed and self._types:
141  self._close_printed = True
142  try: sz = common.format_bytes(os.path.getsize(self._filename))
143  except Exception as e:
144  ConsolePrinter.warn("Error getting size of %s: %s", self._filename, e)
145  sz = "error getting size"
146  ConsolePrinter.debug("Wrote %s and %s to SQL %s (%s).",
147  plural("message type table",
148  len(self._types) - len(self._nested_types)),
149  plural("topic view", self._topics), self._filename, sz)
150  if self._nested_types:
151  ConsolePrinter.debug("Wrote %s to SQL %s.",
152  plural("nested message type table", self._nested_types),
153  self._filename)
154  self._nested_types.clear()
155  del self._batch_metas[:]
156  SqlMixin.close(self)
157  super(SqlSink, self).close()
158 
159 
160  def _ensure_open(self):
161  """Opens output file if not already open, writes header."""
162  if self._file: return
163 
164  self._filename = self.args.WRITE if self._overwrite else common.unique_path(self.args.WRITE)
165  common.makedirs(os.path.dirname(self._filename))
166  if self.args.VERBOSE:
167  sz = os.path.exists(self._filename) and os.path.getsize(self._filename)
168  action = "Overwriting" if sz and self._overwrite else "Creating"
169  ConsolePrinter.debug("%s %s.", action, self._filename)
170  self._file = open(self._filename, "wb")
171  self._write_header()
172  self._close_printed = False
173 
174 
175  def _process_topic(self, topic, msg):
176  """Builds and writes CREATE VIEW statement for topic if not already built."""
177  topickey = api.TypeMeta.make(msg, topic).topickey
178  if topickey in self._topics:
179  return
180 
181  self._topics[topickey] = self._make_topic_data(topic, msg)
182  self._write_entity("view", self._topics[topickey])
183 
184 
185  def _process_type(self, msg, rootmsg=None):
186  """
187  Builds and writes CREATE TABLE statement for message type if not already built.
188 
189  Builds statements recursively for nested types if configured.
190 
191  @return built SQL, or None if already built
192  """
193  rootmsg = rootmsg or msg
194  typekey = api.TypeMeta.make(msg, root=rootmsg).typekey
195  if typekey in self._types:
196  return None
197 
198  extra_cols = [(c, self._make_column_type(t, fallback="int64" if "time" == t else None))
199  for c, t in self.MESSAGE_TYPE_BASECOLS]
200  self._types[typekey] = self._make_type_data(msg, extra_cols, rootmsg)
201  self._schema[typekey] = collections.OrderedDict(self._types[typekey].pop("cols"))
202 
203  self._write_entity("table", self._types[typekey])
204  if self._nesting: self._process_nested(msg, rootmsg)
205  return self._types[typekey]["sql"]
206 
207 
208  def _process_nested(self, msg, rootmsg):
209  """Builds anr writes CREATE TABLE statements for nested types."""
210  nesteds = api.iter_message_fields(msg, messages_only=True) if self._nesting else ()
211  for path, submsgs, subtype in nesteds:
212  scalartype = api.scalar(subtype)
213  if subtype == scalartype and "all" != self._nesting: continue # for path
214 
215  subtypehash = self.source.get_message_type_hash(scalartype)
216  subtypekey = (scalartype, subtypehash)
217  if subtypekey in self._types: continue # for path
218 
219  if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
220  [submsg] = submsgs[:1] or [self.source.get_message_class(scalartype, subtypehash)()]
221  subsql = self._process_type(submsg, rootmsg)
222  if subsql: self._nested_types[subtypekey] = subsql
223 
224 
225  def _write_header(self):
226  """Writes header to current file."""
227  values = {"title": __title__,
228  "dialect": self._dialect,
229  "args": " ".join(main.CLI_ARGS or []),
230  "source": "\n\n".join("-- Source:\n" +
231  "\n".join("-- " + x for x in s.strip().splitlines())
232  for s in self._batch_metas),
233  "dt": datetime.datetime.now().strftime("%Y-%m-%d %H:%M"), }
234  self._file.write((
235  "-- SQL dialect: {dialect}.\n"
236  "-- Written by {title} on {dt}.\n"
237  ).format(**values).encode("utf-8"))
238  if values["args"]:
239  self._file.write("-- Command: grepros {args}.\n".format(**values).encode("utf-8"))
240  self._file.write("\n{source}\n\n".format(**values).encode("utf-8"))
241 
242 
243  def _write_entity(self, category, item):
244  """Writes table or view SQL statement to file."""
245  self._file.write(b"\n")
246  if "table" == category:
247  self._file.write(("-- Message type %(type)s (%(md5)s)\n--\n" % item).encode("utf-8"))
248  self._file.write(("-- %s\n" % "\n-- ".join(item["definition"].splitlines())).encode("utf-8"))
249  else:
250  self._file.write(('-- Topic "%(name)s": %(type)s (%(md5)s)\n' % item).encode("utf-8"))
251  self._file.write(("%s\n\n" % item["sql"]).encode("utf-8"))
252 
253 
254 
def init(*_, **__):
    """Registers the "sql" write format and its command-line options with plugins."""
    from .. import plugins  # Late import to avoid circular
    dialect_choices = "|".join(sorted(filter(bool, SqlSink.DIALECTS)))
    options = [
        ("dialect=" + dialect_choices,
         "use specified SQL dialect in SQL output\n"
         '(default "%s")' % SqlSink.DEFAULT_DIALECT),
        ("dialect-file=path/to/dialects.yaml",
         "load additional SQL dialects\n"
         "for SQL output, from a YAML or JSON file"),
        ("nesting=array|all",
         "create tables for nested message types\n"
         "in SQL output,\n"
         'only for arrays if "array" \n'
         "else for any nested types"),
        ("overwrite=true|false",
         "overwrite existing file in SQL output\n"
         "instead of appending unique counter (default false)"),
    ]
    plugins.add_write_format("sql", SqlSink, "SQL", options)


__all__ = ["SqlSink", "init"]
grepros.api.get_message_class
def get_message_class(typename)
Definition: api.py:727
grepros.plugins.sql.SqlSink.validate
def validate(self)
Definition: sql.py:92
grepros.plugins.sql.SqlSink.emit
def emit(self, topic, msg, stamp=None, match=None, index=None)
Definition: sql.py:115
generate_msgs.plural
def plural(word, items)
Definition: generate_msgs.py:405
grepros.outputs.Sink.args
args
Definition: outputs.py:50
grepros.plugins.sql.SqlSink._process_nested
def _process_nested(self, msg, rootmsg)
Definition: sql.py:208
grepros.api.get_message_type_hash
def get_message_type_hash(msg_or_type)
Definition: api.py:741
grepros.plugins.sql.SqlSink._batch
_batch
Definition: sql.py:78
grepros.plugins.sql.SqlSink._filename
_filename
Definition: sql.py:76
grepros.plugins.auto.sqlbase.SqlMixin._make_column_type
def _make_column_type(self, typename, fallback=None)
Definition: sqlbase.py:317
grepros.plugins.sql.SqlSink._close_printed
_close_printed
Definition: sql.py:82
grepros.plugins.sql.SqlSink._overwrite
_overwrite
Definition: sql.py:81
grepros.plugins.sql.SqlSink._write_header
def _write_header(self)
Definition: sql.py:225
grepros.outputs.Sink
Definition: outputs.py:32
grepros.plugins.auto.sqlbase.SqlMixin._dialect
_dialect
Definition: sqlbase.py:54
grepros.plugins.auto.sqlbase.SqlMixin._nesting
_nesting
Definition: sqlbase.py:55
grepros.plugins.sql.SqlSink.__init__
def __init__(self, args=None, **kwargs)
Definition: sql.py:56
grepros.plugins.sql.SqlSink._file
_file
Definition: sql.py:77
grepros.outputs.Sink.valid
valid
Result of validate()
Definition: outputs.py:52
grepros.outputs.Sink.source
source
inputs.Source instance bound to this sink
Definition: outputs.py:54
grepros.plugins.auto.sqlbase.SqlMixin._topics
_topics
Definition: sqlbase.py:50
grepros.plugins.sql.SqlSink._process_topic
def _process_topic(self, topic, msg)
Definition: sql.py:175
grepros.plugins.auto.sqlbase.SqlMixin._make_topic_data
def _make_topic_data(self, topic, msg, exclude_cols=())
Definition: sqlbase.py:122
grepros.plugins.sql.SqlSink._write_entity
def _write_entity(self, category, item)
Definition: sql.py:243
grepros.outputs.Sink.validate
def validate(self)
Definition: outputs.py:88
grepros.plugins.sql.SqlSink.MESSAGE_TYPE_BASECOLS
list MESSAGE_TYPE_BASECOLS
Default columns for message type tables, as [(column name, ROS type)].
Definition: sql.py:49
grepros.plugins.sql.SqlSink.close
def close(self)
Definition: sql.py:127
grepros.plugins.sql.SqlSink._batch_metas
_batch_metas
Definition: sql.py:80
grepros.plugins.sql.SqlSink._process_type
def _process_type(self, msg, rootmsg=None)
Definition: sql.py:185
grepros.outputs.Sink.close
def close(self)
Definition: outputs.py:93
grepros.plugins.sql.init
def init(*_, **__)
Definition: sql.py:255
grepros.plugins.sql.SqlSink
Definition: sql.py:30
grepros.plugins.auto.sqlbase.SqlMixin._make_type_data
def _make_type_data(self, msg, extra_cols=(), rootmsg=None)
Definition: sqlbase.py:152
grepros.plugins.auto.sqlbase.SqlMixin
Definition: sqlbase.py:25
grepros.plugins.auto.sqlbase.SqlMixin._schema
_schema
Definition: sqlbase.py:52
grepros.plugins.sql.SqlSink._ensure_open
def _ensure_open(self)
Definition: sql.py:160
grepros.plugins.auto.sqlbase.SqlMixin._types
_types
Definition: sqlbase.py:51
grepros.plugins.sql.SqlSink._nested_types
_nested_types
Definition: sql.py:79


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29