3 SQL schema output plugin.
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
20 from ..
import __title__
24 from .. common
import ConsolePrinter, plural
25 from .. outputs
import Sink
26 from . auto.sqlbase
import SqlMixin
32 Writes SQL schema file for message type tables and topic views.
35 - table "pkg/MsgType" for each topic message type, with ordinary columns for
36 scalar fields, and structured columns for list fields;
37 plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
39 If launched with nesting-option, tables will also be created for each
42 - view "/full/topic/name" for each topic, selecting from the message type table
46 FILE_EXTENSIONS = (
".sql", )
49 MESSAGE_TYPE_BASECOLS = [(
"_topic",
"string"),
50 (
"_timestamp",
"time"), ]
53 DEFAULT_ARGS = dict(WRITE_OPTIONS={}, VERBOSE=
False)
58 @param args arguments as namespace or dictionary, case-insensitive;
59 or a single path as the file to write
60 @param args.write output file path
61 @param args.write_options ```
62 {"dialect": SQL dialect if not default,
63 "nesting": true|false to created nested type tables,
64 "overwrite": whether to overwrite existing file
67 @param args.meta whether to emit metainfo
68 @param args.verbose whether to emit debug information
69 @param kwargs any and all arguments as keyword overrides, case-insensitive
71 args = {
"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
72 args = common.ensure_namespace(args, SqlSink.DEFAULT_ARGS, **kwargs)
74 SqlMixin.__init__(self, args)
81 self.
_overwrite = (args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
87 self.
_nesting = args.WRITE_OPTIONS.get(
"nesting")
89 atexit.register(self.
close)
94 Returns whether "dialect" and "nesting" and "overwrite" parameters contain supported values
98 ok, sqlconfig_ok =
True, SqlMixin.validate(self)
99 if self.
args.WRITE_OPTIONS.get(
"nesting")
not in (
None,
"array",
"all"):
100 ConsolePrinter.error(
"Invalid nesting option for SQL: %r. "
101 "Choose one of {array,all}.",
102 self.
args.WRITE_OPTIONS[
"nesting"])
104 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
105 ConsolePrinter.error(
"Invalid overwrite option for SQL: %r. "
106 "Choose one of {true, false}.",
107 self.
args.WRITE_OPTIONS[
"overwrite"])
109 if not common.verify_io(self.
args.WRITE,
"w"):
115 def emit(self, topic, msg, stamp=None, match=None, index=None):
116 """Writes out message type CREATE TABLE statements to SQL schema file."""
117 if not self.
validate():
raise Exception(
"invalid")
118 batch = self.
source.get_batch()
128 """Rewrites out everything to SQL schema file, ensuring all source metas."""
133 for key
in sorted(self.
_types):
135 for key
in sorted(self.
_topics):
142 try: sz = common.format_bytes(os.path.getsize(self.
_filename))
143 except Exception
as e:
144 ConsolePrinter.warn(
"Error getting size of %s: %s", self.
_filename, e)
145 sz =
"error getting size"
146 ConsolePrinter.debug(
"Wrote %s and %s to SQL %s (%s).",
147 plural(
"message type table",
151 ConsolePrinter.debug(
"Wrote %s to SQL %s.",
157 super(SqlSink, self).
close()
161 """Opens output file if not already open, writes header."""
162 if self.
_file:
return
165 common.makedirs(os.path.dirname(self.
_filename))
166 if self.
args.VERBOSE:
168 action =
"Overwriting" if sz
and self.
_overwrite else "Creating"
169 ConsolePrinter.debug(
"%s %s.", action, self.
_filename)
176 """Builds and writes CREATE VIEW statement for topic if not already built."""
177 topickey = api.TypeMeta.make(msg, topic).topickey
187 Builds and writes CREATE TABLE statement for message type if not already built.
189 Builds statements recursively for nested types if configured.
191 @return built SQL, or None if already built
193 rootmsg = rootmsg
or msg
194 typekey = api.TypeMeta.make(msg, root=rootmsg).typekey
195 if typekey
in self.
_types:
198 extra_cols = [(c, self.
_make_column_type(t, fallback=
"int64" if "time" == t
else None))
201 self.
_schema[typekey] = collections.OrderedDict(self.
_types[typekey].pop(
"cols"))
205 return self.
_types[typekey][
"sql"]
209 """Builds anr writes CREATE TABLE statements for nested types."""
210 nesteds = api.iter_message_fields(msg, messages_only=
True)
if self.
_nesting else ()
211 for path, submsgs, subtype
in nesteds:
212 scalartype = api.scalar(subtype)
213 if subtype == scalartype
and "all" != self.
_nesting:
continue
216 subtypekey = (scalartype, subtypehash)
217 if subtypekey
in self.
_types:
continue
219 if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
226 """Writes header to current file."""
227 values = {
"title": __title__,
229 "args":
" ".join(main.CLI_ARGS
or []),
230 "source":
"\n\n".join(
"-- Source:\n" +
231 "\n".join(
"-- " + x
for x
in s.strip().splitlines())
233 "dt": datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M"), }
235 "-- SQL dialect: {dialect}.\n"
236 "-- Written by {title} on {dt}.\n"
237 ).format(**values).encode(
"utf-8"))
239 self.
_file.write(
"-- Command: grepros {args}.\n".format(**values).encode(
"utf-8"))
240 self.
_file.write(
"\n{source}\n\n".format(**values).encode(
"utf-8"))
244 """Writes table or view SQL statement to file."""
245 self.
_file.write(b
"\n")
246 if "table" == category:
247 self.
_file.write((
"-- Message type %(type)s (%(md5)s)\n--\n" % item).encode(
"utf-8"))
248 self.
_file.write((
"-- %s\n" %
"\n-- ".join(item[
"definition"].splitlines())).encode(
"utf-8"))
250 self.
_file.write((
'-- Topic "%(name)s": %(type)s (%(md5)s)\n' % item).encode(
"utf-8"))
251 self.
_file.write((
"%s\n\n" % item[
"sql"]).encode(
"utf-8"))
256 """Adds SQL schema output format support."""
257 from ..
import plugins
258 plugins.add_write_format(
"sql", SqlSink,
"SQL", [
259 (
"dialect=" +
"|".join(sorted(filter(bool, SqlSink.DIALECTS))),
260 "use specified SQL dialect in SQL output\n"
261 '(default "%s")' % SqlSink.DEFAULT_DIALECT),
262 (
"dialect-file=path/to/dialects.yaml",
263 "load additional SQL dialects\n"
264 "for SQL output, from a YAML or JSON file"),
265 (
"nesting=array|all",
"create tables for nested message types\n"
267 'only for arrays if "array" \n'
268 "else for any nested types"),
269 (
"overwrite=true|false",
"overwrite existing file in SQL output\n"
270 "instead of appending unique counter (default false)")
274 __all__ = [
"SqlSink",
"init"]