Postgres output plugin.

------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.
------------------------------------------------------------------------------
20 import psycopg2.extensions
21 import psycopg2.extras
26 from ... common
import ConsolePrinter
27 from . dbbase
import BaseDataSink, quote
Writes messages to a Postgres database.

Output will have:
- table "topics", with topic and message type names
- table "types", with message type definitions
- table "pkg/MsgType" for each topic message type, with detailed fields,
  BYTEA fields for uint8[], array fields for scalar list attributes,
  and JSONB fields for lists of ROS messages;
  with foreign keys if nesting, else subtype values as JSON dictionaries;
  plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
  If not nesting, only topic message type tables are created.
- view "/full/topic/name" for each topic, selecting from the message type table
If a message type table already exists but for a type with a different MD5 hash,
the new table will have its MD5 hash appended to the end of its name, as "pkg/MsgType (hash)".
# Step size for ID sequences.
# NOTE(review): presumably the number of IDs reserved per nextval() call and cached
# in the per-sequence ID queue before the sequence is queried again; confirm against
# the sequence-creation SQL and the next-ID helper.
ID_SEQUENCE_STEP = 100
58 SELECT_TYPE_COLUMNS =
"""
59 SELECT c.table_name, c.column_name, c.data_type
60 FROM information_schema.columns c INNER JOIN information_schema.tables t
61 ON t.table_name = c.table_name
62 WHERE c.table_schema = current_schema() AND t.table_type = 'BASE TABLE' AND
63 c.table_name LIKE '%/%'
64 ORDER BY c.table_name, CAST(c.dtd_identifier AS INTEGER)
# Topic metadata columns for "pkg/MsgType" tables, as (column name, SQL type) pairs:
# "_topic" holds the topic name; "_topic_id" presumably references the row ID
# in the "topics" table (TODO confirm).
MESSAGE_TYPE_TOPICCOLS = [("_topic",    "TEXT"),
                          ("_topic_id", "BIGINT"), ]
# Base metadata columns for "pkg/MsgType" tables, as (column name, SQL type) pairs.
# "_id" is the table's auto-incrementing primary key; "_dt" and "_timestamp"
# presumably hold the message timestamp as a datetime and as a number (TODO confirm).
MESSAGE_TYPE_BASECOLS = [("_dt",        "TIMESTAMP"),
                         ("_timestamp", "NUMERIC"),
                         ("_id",        "BIGSERIAL PRIMARY KEY"), ]
# Parent-link columns for nested-type tables, as (column name, SQL type) pairs,
# relevant when the "nesting" write option is set.
# NOTE(review): presumably "_parent_type" names the parent table and "_parent_id"
# is the parent row's "_id"; confirm against the insert logic.
MESSAGE_TYPE_NESTCOLS = [("_parent_type", "TEXT"),
                         ("_parent_id",   "BIGINT"), ]
@param args arguments as namespace or dictionary, case-insensitive;
            or a single item as the database connection string
@param args.write Postgres connection string like "postgresql://user@host/db"
@param args.write_options ```
    {"commit-interval": transaction size (0 is autocommit),
     "dialect-file": path to a YAML or JSON file with additional SQL dialect options,
     "nesting": "array" to recursively insert arrays
                of nested types, or "all" for any nesting}
    ```
@param args.meta whether to emit metainfo
@param args.verbose whether to emit debug information
@param kwargs any and all arguments as keyword overrides, case-insensitive
93 super(PostgresSink, self).
__init__(args, **kwargs)
94 self.
_id_queue = collections.defaultdict(collections.deque)
Returns whether the Postgres driver is available,
and "commit-interval" and "nesting" in args.write_options have valid values, if any,
and the database is connectable.
104 db_ok, driver_ok, config_ok =
False, bool(psycopg2), super(PostgresSink, self).
validate()
106 ConsolePrinter.error(
"psycopg2 not available: cannot write to Postgres.")
110 except Exception
as e:
111 ConsolePrinter.error(
"Error connecting Postgres: %s", e)
112 self.
valid = db_ok
and driver_ok
and config_ok
117 """Opens the database file, and populates schema if not already existing."""
118 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
119 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
120 psycopg2.extras.register_default_jsonb(globally=
True, loads=json.loads)
121 super(PostgresSink, self).
_init_db()
125 """Populates instance attributes with schema metainfo."""
128 for row
in self.
_cursor.fetchall():
129 typerow = next((x
for x
in self.
_types.values()
130 if x[
"table_name"] == row[
"table_name"]),
None)
131 if not typerow:
continue
132 typekey = (typerow[
"type"], typerow[
"md5"])
133 self.
_schema.setdefault(typekey, collections.OrderedDict())
134 self.
_schema[typekey][row[
"column_name"]] = row[
"data_type"]
138 """Returns new database connection."""
139 factory = psycopg2.extras.RealDictCursor
140 return psycopg2.connect(self.
args.WRITE, cursor_factory=factory)
144 """Executes INSERT statement, returns inserted ID."""
145 self.
_cursor.execute(sql, args)
146 return self.
_cursor.fetchone()[
"id"]
150 """Executes SQL with all args sequences."""
151 psycopg2.extras.execute_batch(self.
_cursor, sql, argses)
155 """Executes SQL with one or more statements."""
160 """Returns next cached ID value, re-populating empty cache from sequence."""
164 if MAXLEN: seqbase = seqbase[:MAXLEN - len(seqsuffix)]
165 sql =
"SELECT nextval('%s') AS id" %
quote(seqbase + seqsuffix)
173 """Returns column value suitable for inserting to database."""
175 plaintype = api.scalar(typename)
178 replace = {float(
"inf"):
None, float(
"-inf"):
None, float(
"nan"):
None}
180 v = psycopg2.extras.Json(v, json.dumps)
181 elif isinstance(v, (list, tuple)):
182 scalartype = api.scalar(typename)
183 if scalartype
in api.ROS_TIME_TYPES:
185 elif scalartype
not in api.ROS_BUILTIN_TYPES:
187 else: v = psycopg2.extras.Json([api.message_to_dict(m, replace)
188 for m
in v], json.dumps)
189 elif "BYTEA" in (TYPES.get(typename),
190 TYPES.get(api.canonical(typename, unbounded=
True))):
191 v = psycopg2.Binary(bytes(bytearray(v)))
194 elif api.is_ros_time(v):
196 elif plaintype
not in api.ROS_BUILTIN_TYPES:
197 v = psycopg2.extras.Json(api.message_to_dict(v, replace), json.dumps)
204 """Returns formatted label for database."""
205 target = self.
args.WRITE
206 if not target.startswith((
"postgres://",
"postgresql://")): target = repr(target)
212 """Returns true if target is recognizable as a Postgres connection string."""
213 return (target
or "").startswith((
"postgres://",
"postgresql://"))
218 """Adds Postgres output format support."""
219 from ...
import plugins
220 plugins.add_write_format(
"postgres", PostgresSink,
"Postgres", [
221 (
"commit-interval=NUM",
"transaction size for Postgres output\n"
222 "(default 1000, 0 is autocommit)"),
223 (
"dialect-file=path/to/dialects.yaml",
224 "load additional SQL dialect options\n"
225 "for Postgres output\n"
226 "from a YAML or JSON file"),
227 (
"nesting=array|all",
"create tables for nested message types\n"
228 "in Postgres output,\n"
229 'only for arrays if "array" \n'
230 "else for any nested types\n"
231 "(array fields in parent will be populated \n"
232 " with foreign keys instead of messages as JSON)"),
# Public API of this module: the Postgres sink class and the plugin init function.
__all__ = ["PostgresSink", "init"]