2 from datetime
import timedelta
3 from datetime
import datetime
4 from threading
import Lock
15 history backend which stores data values and object events in a SQLite database 16 this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in 17 the history database are in binary format (SQLite BLOBs) 18 note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs 22 self.
logger = logging.getLogger(__name__)
28 self.
_conn = sqlite3.connect(self.
_db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=
False)
32 _c_new = self._conn.cursor()
41 _c_new.execute(
'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,' 42 ' ServerTimestamp TIMESTAMP,' 43 ' SourceTimestamp TIMESTAMP,' 44 ' StatusCode INTEGER,' 47 ' VariantBinary BLOB)'.format(tn=table))
49 except sqlite3.Error
as e:
50 self.logger.info(
'Historizing SQL Table Creation Error for %s: %s', node_id, e)
56 _c_sub = self._conn.cursor()
62 _c_sub.execute(
'INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
64 datavalue.ServerTimestamp,
65 datavalue.SourceTimestamp,
66 datavalue.StatusCode.value,
67 str(datavalue.Value.Value),
68 datavalue.Value.VariantType.name,
69 sqlite3.Binary(datavalue.Value.to_binary())
72 except sqlite3.Error
as e:
73 self.logger.error(
'Historizing SQL Insert Error for %s: %s', node_id, e)
80 def execute_sql_delete(condition, args):
81 query = (
'DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
84 _c_sub.execute(query, args)
85 except sqlite3.Error
as e:
86 self.logger.error(
'Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
92 date_limit = datetime.utcnow() - period
93 execute_sql_delete(
'ServerTimestamp < ?', (date_limit,))
97 execute_sql_delete(
'ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? ' 98 'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
102 _c_read = self._conn.cursor()
105 start_time, end_time, order, limit = self.
_get_bounds(start, end, nb_values)
112 for row
in _c_read.execute(
'SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? ' 113 'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
114 (start_time, end_time, limit,)):
118 dv.ServerTimestamp = row[1]
119 dv.SourceTimestamp = row[2]
124 except sqlite3.Error
as e:
125 self.logger.error(
'Historizing SQL Read Error for %s: %s', node_id, e)
128 if len(results) > nb_values:
129 cont = results[nb_values].ServerTimestamp
131 results = results[:nb_values]
137 _c_new = self._conn.cursor()
153 'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})' 154 .format(tn=table, co=columns))
156 except sqlite3.Error
as e:
157 self.logger.info(
'Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
163 _c_sub = self._conn.cursor()
167 event_type = event.EventType
172 'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})' 173 .format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
175 except sqlite3.Error
as e:
176 self.logger.error(
'Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
185 date_limit = datetime.now() - period
188 _c_sub.execute(
'DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
189 (date_limit.isoformat(
' '),))
190 except sqlite3.Error
as e:
191 self.logger.error(
'Historizing SQL Delete Old Data Error for events from %s: %s',
198 _c_read = self._conn.cursor()
201 start_time, end_time, order, limit = self.
_get_bounds(start, end, nb_values)
210 for row
in _c_read.execute(
211 'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?' 212 .format(cl=clauses_str, tn=table, dir=order), (start_time, end_time, limit)):
215 cont_timestamps.append(row[0])
216 for i, field
in enumerate(row[1:]):
217 if field
is not None:
218 fdict[clauses[i]] = ua.Variant.from_binary(
Buffer(field))
222 results.append(events.Event.from_field_dict(fdict))
224 except sqlite3.Error
as e:
225 self.logger.error(
'Historizing SQL Read Error events for node %s: %s', source_id, e)
228 if len(results) > nb_values:
229 cont = cont_timestamps[nb_values]
231 results = results[:nb_values]
236 return str(node_id.NamespaceIndex) +
'_' + str(node_id.Identifier)
240 Get all fields from the event types that are to be historized 242 evtypes: List of event type nodes 244 Returns: List of fields for all event types 248 ev_aggregate_fields = []
249 for event_type
in evtypes:
250 ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
253 for field
in set(ev_aggregate_fields):
254 ev_fields.append(field.get_display_name().Text.decode(encoding=
'utf-8'))
261 if start
is None or start == ua.get_win_epoch():
263 start = ua.get_win_epoch()
265 if end
is None or end == ua.get_win_epoch():
266 end = datetime.utcnow() + timedelta(days=1)
269 start_time = start.isoformat(
' ')
270 end_time = end.isoformat(
' ')
273 start_time = end.isoformat(
' ')
274 end_time = start.isoformat(
' ')
277 limit = nb_values + 1
281 return start_time, end_time, order, limit
285 Convert an event object triggered by the subscription into ordered lists for the SQL insert string 288 event: The event returned by the subscription 290 Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries 294 ev_variant_binaries = []
296 ev_variant_dict = event.get_event_props_as_fields_dict()
297 names = list(ev_variant_dict.keys())
303 variant = ev_variant_dict[name]
304 placeholders.append(
'?')
305 ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
311 for field
in ev_fields:
312 fields.append(field +
' BLOB')
317 for select_clause
in evfilter.SelectClauses:
319 if not select_clause.BrowsePath:
320 s_clauses.append(select_clause.Attribute.name)
322 name = select_clause.BrowsePath[0].Name
323 s_clauses.append(name)
324 except AttributeError:
325 self.logger.warning(
'Historizing SQL OPC UA Select Clause Warning for node %s,' 326 ' Clause: %s:', source_id, select_clause)
329 clauses = [x
for x
in s_clauses
if x
in self.
_event_fields[source_id]]
337 sql_str +=
'"' + item +
'", ' 339 sql_str += item +
', ' 345 self.logger.info(
'Historizing SQL connection closed')
def read_node_history(self, node_id, start, end, nb_values)
def _format_event(self, event)
def new_historized_event(self, source_id, evtypes, period, count=0)
def save_event(self, event)
def save_node_value(self, node_id, datavalue)
def new_historized_node(self, node_id, period, count=0)
def _get_table_name(self, node_id)
def _get_event_columns(self, ev_fields)
def _list_to_sql_str(ls, quotes=True)
def __init__(self, path="history.db")
def read_event_history(self, source_id, start, end, nb_values, evfilter)
def _get_select_clauses(self, source_id, evfilter)
def _get_event_fields(self, evtypes)
def _get_bounds(start, end, nb_values)