postgres.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Postgres output plugin.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 02.12.2021
11 @modified 28.06.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 import collections
16 import json
17 
18 try:
19  import psycopg2
20  import psycopg2.extensions
21  import psycopg2.extras
22 except ImportError:
23  psycopg2 = None
24 
25 from ... import api
26 from ... common import ConsolePrinter
27 from . dbbase import BaseDataSink, quote
28 
29 
class PostgresSink(BaseDataSink):
    """
    Writes messages to a Postgres database.

    Output will have:
    - table "topics", with topic and message type names
    - table "types", with message type definitions

    plus:
    - table "pkg/MsgType" for each topic message type, with detailed fields,
      BYTEA fields for uint8[], array fields for scalar list attributes,
      and JSONB fields for lists of ROS messages;
      with foreign keys if nesting else subtype values as JSON dictionaries;
      plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
      If not nesting, only topic message type tables are created.
    - view "/full/topic/name" for each topic, selecting from the message type table

    If a message type table already exists but for a type with a different MD5 hash,
    the new table will have its MD5 hash appended to end, as "pkg/MsgType (hash)".
    """

    ## Database engine name
    ENGINE = "Postgres"

    ## Sequence length per table to reserve for inserted message IDs
    ID_SEQUENCE_STEP = 100

    ## SQL statement for selecting metainfo on pkg/MsgType table columns;
    ## joins on schema as well as name, so same-named tables in other schemas do not interfere
    SELECT_TYPE_COLUMNS = """
        SELECT c.table_name, c.column_name, c.data_type
        FROM information_schema.columns c INNER JOIN information_schema.tables t
        ON t.table_schema = c.table_schema AND t.table_name = c.table_name
        WHERE c.table_schema = current_schema() AND t.table_type = 'BASE TABLE' AND
        c.table_name LIKE '%/%'
        ORDER BY c.table_name, CAST(c.dtd_identifier AS INTEGER)
    """

    ## Topic metadata columns for pkg/MsgType tables
    MESSAGE_TYPE_TOPICCOLS = [("_topic",       "TEXT"),
                              ("_topic_id",    "BIGINT"), ]

    ## Default columns for pkg/MsgType tables, with Postgres column types
    MESSAGE_TYPE_BASECOLS  = [("_dt",          "TIMESTAMP"),
                              ("_timestamp",   "NUMERIC"),
                              ("_id",          "BIGSERIAL PRIMARY KEY"), ]

    ## Additional columns for nested-type pkg/MsgType tables (parent type name and ID)
    MESSAGE_TYPE_NESTCOLS  = [("_parent_type", "TEXT"),
                              ("_parent_id",   "BIGINT"), ]

    ## Connection string prefixes recognized as Postgres targets
    URL_PREFIXES = ("postgres://", "postgresql://")


    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single item as the database connection string
        @param   args.write           Postgres connection string like "postgresql://user@host/db"
        @param   args.write_options   ```
                                      {"commit-interval": transaction size (0 is autocommit),
                                       "dialect-file":    path to a YAML or JSON file
                                                          with additional SQL dialect options,
                                       "nesting":         "array" to recursively insert arrays
                                                          of nested types, or "all" for any nesting}
                                      ```
        @param   args.meta            whether to emit metainfo
        @param   args.verbose         whether to emit debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
        super(PostgresSink, self).__init__(args, **kwargs)
        self._id_queue = collections.defaultdict(collections.deque)  # {table name: deque([next ID, ])}


    def validate(self):
        """
        Returns whether Postgres driver is available,
        and "commit-interval" and "nesting" in args.write_options have valid value, if any,
        and database is connectable.

        Result is stored in self.valid, and later calls return the stored result.
        """
        if self.valid is not None: return self.valid
        db_ok, driver_ok, config_ok = False, bool(psycopg2), super(PostgresSink, self).validate()
        if not driver_ok:
            ConsolePrinter.error("psycopg2 not available: cannot write to Postgres.")
        else:
            try:
                # A psycopg2 connection used as context manager only ends the transaction,
                # it does not close the connection: close explicitly to avoid leaking it.
                self._connect().close()
                db_ok = True
            except Exception as e:
                ConsolePrinter.error("Error connecting Postgres: %s", e)
        self.valid = db_ok and driver_ok and config_ok
        return self.valid


    def _init_db(self):
        """
        Registers psycopg2 unicode and JSONB type handling globally,
        then runs base class database initialization
        (opening the database and populating schema if not already existing).
        """
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
        psycopg2.extras.register_default_jsonb(globally=True, loads=json.loads)
        super(PostgresSink, self)._init_db()


    def _load_schema(self):
        """
        Populates instance attributes with schema metainfo.

        Fills self._schema as {(type, md5): OrderedDict(column name: data type)}
        for existing pkg/MsgType tables that have an entry in self._types.
        """
        super(PostgresSink, self)._load_schema()
        # Index type rows by table name once, instead of scanning all types per column row;
        # setdefault keeps the first match, same as a linear search would.
        typerows = {}
        for typerow in self._types.values():
            typerows.setdefault(typerow.get("table_name"), typerow)
        self._cursor.execute(self.SELECT_TYPE_COLUMNS)
        for row in self._cursor.fetchall():
            typerow = typerows.get(row["table_name"])
            if not typerow: continue  # for row
            typekey = (typerow["type"], typerow["md5"])
            self._schema.setdefault(typekey, collections.OrderedDict())
            self._schema[typekey][row["column_name"]] = row["data_type"]


    def _connect(self):
        """Returns new database connection, with rows returned as dictionaries."""
        factory = psycopg2.extras.RealDictCursor
        return psycopg2.connect(self.args.WRITE, cursor_factory=factory)


    def _execute_insert(self, sql, args):
        """
        Executes INSERT statement, returns inserted ID.

        Statement must return a column named "id", e.g. via RETURNING.
        """
        self._cursor.execute(sql, args)
        return self._cursor.fetchone()["id"]


    def _executemany(self, sql, argses):
        """Executes SQL with all args sequences, batched via psycopg2.extras.execute_batch()."""
        psycopg2.extras.execute_batch(self._cursor, sql, argses)


    def _executescript(self, sql):
        """Executes SQL with one or more statements."""
        self._cursor.execute(sql)


    def _get_next_id(self, table):
        """
        Returns next cached ID value, re-populating empty cache from sequence.

        Reserves ID_SEQUENCE_STEP values at a time from the sequence
        backing the ID column of the given table.
        """
        if not self._id_queue.get(table):
            MAXLEN = self._get_dialect_option("maxlen_entity")
            seqbase, seqsuffix = table, "_%s_seq" % self.MESSAGE_TYPE_BASECOLS[-1][0]
            if MAXLEN: seqbase = seqbase[:MAXLEN - len(seqsuffix)]
            # Name goes inside a SQL string literal: double any single quotes
            seqname = quote(seqbase + seqsuffix).replace("'", "''")
            # Reserve the whole step in a single round-trip instead of one query per ID
            sql = ("SELECT nextval('%s') AS id FROM generate_series(1, %d) ORDER BY id"
                   % (seqname, self.ID_SEQUENCE_STEP))
            self._cursor.execute(sql)
            self._id_queue[table].extend(row["id"] for row in self._cursor.fetchall())
        return self._id_queue[table].popleft()


    def _make_column_value(self, value, typename=None):
        """
        Returns column value suitable for inserting to database.

        @param   value     field value
        @param   typename  ROS type name of the field;
                           if empty, value is wrapped as JSON as-is
        """
        if not typename:
            # Checked before any type name parsing, as typename may be None
            return psycopg2.extras.Json(value, json.dumps)

        TYPES = self._get_dialect_option("types")
        plaintype = api.scalar(typename)  # "string<=10" -> "string"
        v = value
        # Common in JSON but disallowed in Postgres.
        # NOTE(review): a NaN dict key only matches the very same float object;
        # confirm api.message_to_dict() handles NaN values accordingly.
        replace = {float("inf"): None, float("-inf"): None, float("nan"): None}
        if isinstance(v, (list, tuple)):
            if plaintype in api.ROS_TIME_TYPES:
                v = [self._convert_time_value(x, plaintype) for x in v]
            elif plaintype not in api.ROS_BUILTIN_TYPES:
                # Nested message lists go to separate tables when nesting, else to JSON
                if self._nesting: v = None
                else: v = psycopg2.extras.Json([api.message_to_dict(m, replace)
                                                for m in v], json.dumps)
            elif "BYTEA" in (TYPES.get(typename),
                             TYPES.get(api.canonical(typename, unbounded=True))):
                v = psycopg2.Binary(bytes(bytearray(v)))  # Py2/Py3 compatible
            else:
                v = list(self._convert_column_value(v, typename))  # Ensure not-tuple for psycopg2
        elif api.is_ros_time(v):
            v = self._convert_time_value(v, typename)
        elif plaintype not in api.ROS_BUILTIN_TYPES:
            v = psycopg2.extras.Json(api.message_to_dict(v, replace), json.dumps)
        else:
            v = self._convert_column_value(v, plaintype)
        return v


    def _make_db_label(self):
        """Returns formatted label for database: connection URL as-is, other targets repr'd."""
        target = self.args.WRITE
        if not target.startswith(self.URL_PREFIXES): target = repr(target)
        return target


    @classmethod
    def autodetect(cls, target):
        """Returns true if target is recognizable as a Postgres connection string."""
        return (target or "").startswith(cls.URL_PREFIXES)


def init(*_, **__):
    """
    Registers Postgres as a write format in the plugins registry,
    together with descriptions of its write options.
    """
    from ... import plugins  # Late import to avoid circular

    commit_opt = ("commit-interval=NUM",
                  "transaction size for Postgres output\n"
                  "(default 1000, 0 is autocommit)")
    dialect_opt = ("dialect-file=path/to/dialects.yaml",
                   "load additional SQL dialect options\n"
                   "for Postgres output\n"
                   "from a YAML or JSON file")
    nesting_opt = ("nesting=array|all",
                   "create tables for nested message types\n"
                   "in Postgres output,\n"
                   'only for arrays if "array" \n'
                   "else for any nested types\n"
                   "(array fields in parent will be populated \n"
                   " with foreign keys instead of messages as JSON)")

    option_specs = [commit_opt, dialect_opt, nesting_opt]
    plugins.add_write_format("postgres", PostgresSink, "Postgres", option_specs)


# Public names exported by `from ... import *`
__all__ = [
    "PostgresSink",
    "init",
]
grepros.plugins.auto.postgres.PostgresSink._executemany
def _executemany(self, sql, argses)
Definition: postgres.py:149
grepros.plugins.auto.dbbase.BaseDataSink.MESSAGE_TYPE_BASECOLS
list MESSAGE_TYPE_BASECOLS
Default columns for pkg/MsgType tables.
Definition: dbbase.py:55
grepros.outputs.Sink.args
args
Definition: outputs.py:50
grepros.plugins.auto.sqlbase.SqlMixin._convert_column_value
def _convert_column_value(self, value, typename)
Definition: sqlbase.py:345
grepros.plugins.auto.postgres.PostgresSink._connect
def _connect(self)
Definition: postgres.py:137
grepros.plugins.auto.postgres.PostgresSink.SELECT_TYPE_COLUMNS
string SELECT_TYPE_COLUMNS
SQL statement for selecting metainfo on pkg/MsgType table columns.
Definition: postgres.py:58
grepros.plugins.auto.postgres.PostgresSink
Definition: postgres.py:30
grepros.plugins.auto.postgres.PostgresSink.autodetect
def autodetect(cls, target)
Definition: postgres.py:211
grepros.plugins.auto.dbbase.BaseDataSink._connect
def _connect(self)
Definition: dbbase.py:363
grepros.plugins.auto.postgres.PostgresSink._id_queue
_id_queue
Definition: postgres.py:94
grepros.plugins.auto.sqlbase.SqlMixin._get_dialect_option
def _get_dialect_option(self, option)
Definition: sqlbase.py:371
grepros.plugins.auto.postgres.init
def init(*_, **__)
Definition: postgres.py:217
grepros.outputs.Sink.valid
valid
Result of validate()
Definition: outputs.py:52
grepros.plugins.auto.postgres.PostgresSink._get_next_id
def _get_next_id(self, table)
Definition: postgres.py:159
grepros.plugins.auto.postgres.PostgresSink._init_db
def _init_db(self)
Definition: postgres.py:116
grepros.plugins.auto.postgres.PostgresSink.__init__
def __init__(self, args=None, **kwargs)
Definition: postgres.py:79
grepros.plugins.auto.postgres.PostgresSink.ID_SEQUENCE_STEP
int ID_SEQUENCE_STEP
Sequence length per table to reserve for inserted message IDs.
Definition: postgres.py:55
grepros.plugins.auto.sqlbase.SqlMixin._convert_time_value
def _convert_time_value(self, value, typename)
Definition: sqlbase.py:358
grepros.plugins.auto.dbbase.BaseDataSink
Definition: dbbase.py:24
grepros.plugins.auto.postgres.PostgresSink.validate
def validate(self)
Definition: postgres.py:97
grepros.plugins.auto.sqlbase.quote
def quote(name, force=False)
Definition: sqlbase.py:631
grepros.plugins.auto.postgres.PostgresSink._executescript
def _executescript(self, sql)
Definition: postgres.py:154
grepros.plugins.auto.postgres.PostgresSink._make_db_label
def _make_db_label(self)
Definition: postgres.py:203
grepros.plugins.auto.postgres.PostgresSink._make_column_value
def _make_column_value(self, value, typename=None)
Definition: postgres.py:172
grepros.plugins.auto.postgres.PostgresSink._execute_insert
def _execute_insert(self, sql, args)
Definition: postgres.py:143
grepros.plugins.auto.dbbase.BaseDataSink._cursor
_cursor
Definition: dbbase.py:89
grepros.plugins.auto.sqlbase.SqlMixin._schema
_schema
Definition: sqlbase.py:52
grepros.plugins.auto.sqlbase.SqlMixin._types
_types
Definition: sqlbase.py:51
grepros.plugins.auto.dbbase.BaseDataSink._nesting
_nesting
Definition: dbbase.py:97
grepros.plugins.auto.postgres.PostgresSink._load_schema
def _load_schema(self)
Definition: postgres.py:124


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29