history_sql.py
Go to the documentation of this file.
1 import logging
2 from datetime import timedelta
3 from datetime import datetime
4 from threading import Lock
5 import sqlite3
6 
7 from opcua import ua
8 from opcua.common.utils import Buffer
9 from opcua.common import events
10 from opcua.server.history import HistoryStorageInterface
11 
12 
14  """
15  history backend which stores data values and object events in a SQLite database
16  this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
17  the history database are in binary format (SQLite BLOBs)
18  note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
19  """
20 
21  def __init__(self, path="history.db"):
22  self.logger = logging.getLogger(__name__)
24  self._db_file = path
25  self._lock = Lock()
26  self._event_fields = {}
27 
28  self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
29 
def new_historized_node(self, node_id, period, count=0):
    """
    Register a node for data change historizing and create the table that stores its values

    Args:
        node_id: Id of the node to historize; key of the retention settings and source of the table name
        period: Retention period stored for the node
        count: Maximum number of records stored for the node
    """
    # one row per DataValue
    # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
    create_stmt = ('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
                   ' ServerTimestamp TIMESTAMP,'
                   ' SourceTimestamp TIMESTAMP,'
                   ' StatusCode INTEGER,'
                   ' Value TEXT,'
                   ' VariantType TEXT,'
                   ' VariantBinary BLOB)')

    with self._lock:
        cursor = self._conn.cursor()
        table_name = self._get_table_name(node_id)

        # remembered so that every save can clean up old records
        self._datachanges_period[node_id] = (period, count)

        try:
            cursor.execute(create_stmt.format(tn=table_name))
        except sqlite3.Error as err:
            # a failed creation is only reported at info level and does not abort the registration
            self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, err)

        self._conn.commit()
53 
def save_node_value(self, node_id, datavalue):
    """
    Store one data change of a historized node and apply the node's retention settings

    Args:
        node_id: Id of the node the value belongs to
        datavalue: DataValue to store; timestamps, status code and the variant are written to the node's table
    """
    with self._lock:
        cursor = self._conn.cursor()
        table_name = self._get_table_name(node_id)

        def purge(where, params):
            # run one cleanup DELETE on this node's table; failures are logged and the commit always happens
            statement = ('DELETE FROM "{tn}" WHERE ' + where).format(tn=table_name)
            try:
                cursor.execute(statement, params)
            except sqlite3.Error as err:
                self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, err)
            self._conn.commit()

        # insert the data change into the database
        try:
            variant = datavalue.Value
            record = (
                datavalue.ServerTimestamp,
                datavalue.SourceTimestamp,
                datavalue.StatusCode.value,
                str(variant.Value),  # readable copy of the value
                variant.VariantType.name,
                sqlite3.Binary(variant.to_binary()),  # the data that is read back later
            )
            cursor.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table_name), record)
        except sqlite3.Error as err:
            self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, err)

        self._conn.commit()

        # retention settings stored when the node was registered
        period, count = self._datachanges_period[node_id]

        if period:
            # drop everything that is older than the retention period
            purge('ServerTimestamp < ?', (datetime.utcnow() - period,))

        if count:
            # once more than count records exist, drop the records carrying the oldest server timestamp
            purge('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
                  'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
99 
def read_node_history(self, node_id, start, end, nb_values):
    """
    Read stored data values of a node

    Args:
        node_id: Id of the historized node
        start: Start of the requested time range, handed to _get_bounds
        end: End of the requested time range, handed to _get_bounds
        nb_values: Maximum number of values to return, a false value returns everything

    Returns: Tuple of (list of DataValues, continuation timestamp or None)

    """
    with self._lock:
        cursor = self._conn.cursor()
        table_name = self._get_table_name(node_id)
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

        query = ('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                 'ORDER BY "_Id" {dir} LIMIT ?').format(tn=table_name, dir=order)
        values = []

        try:
            for row in cursor.execute(query, (start_time, end_time, limit)):
                # column 6 is VariantBinary, columns 1-3 are server timestamp, source timestamp, status code
                datavalue = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
                datavalue.ServerTimestamp = row[1]
                datavalue.SourceTimestamp = row[2]
                datavalue.StatusCode = ua.StatusCode(row[3])
                values.append(datavalue)
        except sqlite3.Error as err:
            # a failing query is logged; whatever was read so far is returned
            self.logger.error('Historizing SQL Read Error for %s: %s', node_id, err)

        continuation = None
        if nb_values:
            # a row beyond nb_values means more data exists; its timestamp is the continuation point
            if len(values) > nb_values:
                continuation = values[nb_values].ServerTimestamp
            values = values[:nb_values]

        return values, continuation
134 
def new_historized_event(self, source_id, evtypes, period, count=0):
    """
    Register an event source for historizing and create the table that stores its events

    Args:
        source_id: Id of the node emitting the events; key of the stored settings and source of the table name
        evtypes: List of event type nodes whose fields become the table columns
        period: Retention period stored for the event source
        count: Accepted for interface compatibility, not used by this method
    """
    with self._lock:
        cursor = self._conn.cursor()

        # collect the field names of all event types and remember them together with the period
        field_names = self._get_event_fields(evtypes)
        self._datachanges_period[source_id] = period
        self._event_fields[source_id] = field_names

        table_name = self._get_table_name(source_id)
        column_defs = self._get_event_columns(field_names)

        # the table stores the fields generated by the source object's events
        # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
        # properties with these names
        statement = (
            'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
            .format(tn=table_name, co=column_defs))
        try:
            cursor.execute(statement)
        except sqlite3.Error as err:
            # a failed creation is only reported at info level and does not abort the registration
            self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, err)

        self._conn.commit()
160 
def save_event(self, event):
    """
    Store one event in the table of its source node and purge events older than the stored period

    Args:
        event: Event to store; SourceNode selects the table and the retention period, Time and EventType
            fill the _Timestamp and _EventTypeName columns, the event properties are stored as binary variants
    """
    with self._lock:
        _c_sub = self._conn.cursor()

        table = self._get_table_name(event.SourceNode)
        columns, placeholders, evtup = self._format_event(event)
        event_type = event.EventType  # useful for troubleshooting database

        # the bookkeeping columns are always present, the property columns only if the event has properties
        # (an empty column list would otherwise leave a dangling comma in the statement)
        column_sql = '"_Id", "_Timestamp", "_EventTypeName"'
        value_sql = 'NULL, ?, ?'
        if columns:
            column_sql += ', ' + columns
            value_sql += ', ' + placeholders

        # insert the event into the database; timestamp and type name are bound as parameters instead of being
        # pasted into the statement between double quotes, the stored text is the same str() form as before
        try:
            _c_sub.execute(
                'INSERT INTO "{tn}" ({co}) VALUES ({va})'.format(tn=table, co=column_sql, va=value_sql),
                (str(event.Time), str(event_type)) + tuple(evtup))

        except sqlite3.Error as e:
            self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)

        self._conn.commit()

        # get this node's period from the period dict and calculate the limit
        period = self._datachanges_period[event.SourceNode]

        if period:
            # after the insert, if a period was specified delete all records older than period
            # NOTE(review): utcnow() matches save_node_value and _get_bounds; this assumes event.Time
            # is a naive UTC timestamp - confirm against the event producer
            date_limit = datetime.utcnow() - period

            # the filter has to use _Timestamp (text): the Time property column holds a binary variant
            # and SQLite never sorts a BLOB below a text value, so filtering on it deletes nothing
            try:
                _c_sub.execute('DELETE FROM "{tn}" WHERE "_Timestamp" < ?'.format(tn=table),
                               (date_limit.isoformat(' '),))
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s',
                                  event.SourceNode, e)

            self._conn.commit()
195 
def read_event_history(self, source_id, start, end, nb_values, evfilter):
    """
    Read stored events of an event source

    Args:
        source_id: Id of the node that emitted the events
        start: Start of the requested time range, handed to _get_bounds
        end: End of the requested time range, handed to _get_bounds
        nb_values: Maximum number of events to return, a false value returns everything
        evfilter: Event filter whose select clauses decide which fields are read

    Returns: Tuple of (list of events, continuation timestamp or None)

    """
    with self._lock:
        cursor = self._conn.cursor()
        table_name = self._get_table_name(source_id)
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
        clauses, clauses_str = self._get_select_clauses(source_id, evfilter)

        # the select list is built from the EventFilter and the fields available for this source
        query = (
            'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
            .format(cl=clauses_str, tn=table_name, dir=order))

        timestamps = []
        found = []

        try:
            for row in cursor.execute(query, (start_time, end_time, limit)):
                timestamps.append(row[0])

                # the remaining columns line up with the select clauses; NULL becomes an empty variant
                fields = {}
                for name, blob in zip(clauses, row[1:]):
                    if blob is None:
                        fields[name] = ua.Variant(None)
                    else:
                        fields[name] = ua.Variant.from_binary(Buffer(blob))

                found.append(events.Event.from_field_dict(fields))
        except sqlite3.Error as err:
            # a failing query is logged; whatever was read so far is returned
            self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, err)

        continuation = None
        if nb_values:
            # a row beyond nb_values means more events exist; its timestamp is the continuation point
            if len(found) > nb_values:
                continuation = timestamps[nb_values]
            found = found[:nb_values]

        return found, continuation
234 
235  def _get_table_name(self, node_id):
236  return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
237 
238  def _get_event_fields(self, evtypes):
239  """
240  Get all fields from the event types that are to be historized
241  Args:
242  evtypes: List of event type nodes
243 
244  Returns: List of fields for all event types
245 
246  """
247  # get all fields from the event types that are to be historized
248  ev_aggregate_fields = []
249  for event_type in evtypes:
250  ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
251 
252  ev_fields = []
253  for field in set(ev_aggregate_fields):
254  ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
255  return ev_fields
256 
257  @staticmethod
258  def _get_bounds(start, end, nb_values):
259  order = "ASC"
260 
261  if start is None or start == ua.get_win_epoch():
262  order = "DESC"
263  start = ua.get_win_epoch()
264 
265  if end is None or end == ua.get_win_epoch():
266  end = datetime.utcnow() + timedelta(days=1)
267 
268  if start < end:
269  start_time = start.isoformat(' ')
270  end_time = end.isoformat(' ')
271  else:
272  order = "DESC"
273  start_time = end.isoformat(' ')
274  end_time = start.isoformat(' ')
275 
276  if nb_values:
277  limit = nb_values + 1 # add 1 to the number of values for retrieving a continuation point
278  else:
279  limit = -1 # in SQLite a LIMIT of -1 returns all results
280 
281  return start_time, end_time, order, limit
282 
283  def _format_event(self, event):
284  """
285  Convert an event object triggered by the subscription into ordered lists for the SQL insert string
286 
287  Args:
288  event: The event returned by the subscription
289 
290  Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
291 
292  """
293  placeholders = []
294  ev_variant_binaries = []
295 
296  ev_variant_dict = event.get_event_props_as_fields_dict()
297  names = list(ev_variant_dict.keys())
298  names.sort() # sort alphabetically since dict is not sorted
299 
300  # split dict into two synchronized lists which will be converted to SQL strings
301  # note that the variants are converted to binary objects for storing in SQL BLOB format
302  for name in names:
303  variant = ev_variant_dict[name]
304  placeholders.append('?')
305  ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
306 
307  return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
308 
309  def _get_event_columns(self, ev_fields):
310  fields = []
311  for field in ev_fields:
312  fields.append(field + ' BLOB')
313  return self._list_to_sql_str(fields, False)
314 
315  def _get_select_clauses(self, source_id, evfilter):
316  s_clauses = []
317  for select_clause in evfilter.SelectClauses:
318  try:
319  if not select_clause.BrowsePath:
320  s_clauses.append(select_clause.Attribute.name)
321  else:
322  name = select_clause.BrowsePath[0].Name
323  s_clauses.append(name)
324  except AttributeError:
325  self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
326  ' Clause: %s:', source_id, select_clause)
327 
328  # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
329  clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
330  return clauses, self._list_to_sql_str(clauses)
331 
332  @staticmethod
333  def _list_to_sql_str(ls, quotes=True):
334  sql_str = ''
335  for item in ls:
336  if quotes:
337  sql_str += '"' + item + '", '
338  else:
339  sql_str += item + ', '
340  return sql_str[:-2] # remove trailing space and comma for SQL syntax
341 
def stop(self):
    """
    Close the database connection

    The lock is held while closing so that the close does not run concurrently with the
    other methods that take the same lock.
    """
    self._lock.acquire()
    try:
        self._conn.close()
        self.logger.info('Historizing SQL connection closed')
    finally:
        self._lock.release()
def read_node_history(self, node_id, start, end, nb_values)
Definition: history_sql.py:100
def new_historized_event(self, source_id, evtypes, period, count=0)
Definition: history_sql.py:135
def save_node_value(self, node_id, datavalue)
Definition: history_sql.py:54
def new_historized_node(self, node_id, period, count=0)
Definition: history_sql.py:30
def _get_event_columns(self, ev_fields)
Definition: history_sql.py:309
def _list_to_sql_str(ls, quotes=True)
Definition: history_sql.py:333
def __init__(self, path="history.db")
Definition: history_sql.py:21
def read_event_history(self, source_id, start, end, nb_values, evfilter)
Definition: history_sql.py:196
def _get_select_clauses(self, source_id, evfilter)
Definition: history_sql.py:315
def _get_bounds(start, end, nb_values)
Definition: history_sql.py:258


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44