00001 import logging
00002 from datetime import timedelta
00003 from datetime import datetime
00004 from threading import Lock
00005 import sqlite3
00006
00007 from opcua import ua
00008 from opcua.common.utils import Buffer
00009 from opcua.common import events
00010 from opcua.server.history import HistoryStorageInterface
00011
00012
00013 class HistorySQLite(HistoryStorageInterface):
00014 """
00015 history backend which stores data values and object events in a SQLite database
00016 this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
00017 the history database are in binary format (SQLite BLOBs)
00018 note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
00019 """
00020
00021 def __init__(self, path="history.db"):
00022 self.logger = logging.getLogger(__name__)
00023 self._datachanges_period = {}
00024 self._db_file = path
00025 self._lock = Lock()
00026 self._event_fields = {}
00027
00028 self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
00029
00030 def new_historized_node(self, node_id, period, count=0):
00031 with self._lock:
00032 _c_new = self._conn.cursor()
00033
00034 table = self._get_table_name(node_id)
00035
00036 self._datachanges_period[node_id] = period, count
00037
00038
00039
00040 try:
00041 _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
00042 ' ServerTimestamp TIMESTAMP,'
00043 ' SourceTimestamp TIMESTAMP,'
00044 ' StatusCode INTEGER,'
00045 ' Value TEXT,'
00046 ' VariantType TEXT,'
00047 ' VariantBinary BLOB)'.format(tn=table))
00048
00049 except sqlite3.Error as e:
00050 self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
00051
00052 self._conn.commit()
00053
00054 def save_node_value(self, node_id, datavalue):
00055 with self._lock:
00056 _c_sub = self._conn.cursor()
00057
00058 table = self._get_table_name(node_id)
00059
00060
00061 try:
00062 _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
00063 (
00064 datavalue.ServerTimestamp,
00065 datavalue.SourceTimestamp,
00066 datavalue.StatusCode.value,
00067 str(datavalue.Value.Value),
00068 datavalue.Value.VariantType.name,
00069 sqlite3.Binary(datavalue.Value.to_binary())
00070 )
00071 )
00072 except sqlite3.Error as e:
00073 self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
00074
00075 self._conn.commit()
00076
00077
00078 period, count = self._datachanges_period[node_id]
00079
00080 def execute_sql_delete(condition, args):
00081 query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
00082
00083 try:
00084 _c_sub.execute(query, args)
00085 except sqlite3.Error as e:
00086 self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
00087
00088 self._conn.commit()
00089
00090 if period:
00091
00092 date_limit = datetime.utcnow() - period
00093 execute_sql_delete('ServerTimestamp < ?', (date_limit,))
00094
00095 if count:
00096
00097 execute_sql_delete('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
00098 'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
00099
00100 def read_node_history(self, node_id, start, end, nb_values):
00101 with self._lock:
00102 _c_read = self._conn.cursor()
00103
00104 table = self._get_table_name(node_id)
00105 start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
00106
00107 cont = None
00108 results = []
00109
00110
00111 try:
00112 for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
00113 'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
00114 (start_time, end_time, limit,)):
00115
00116
00117 dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
00118 dv.ServerTimestamp = row[1]
00119 dv.SourceTimestamp = row[2]
00120 dv.StatusCode = ua.StatusCode(row[3])
00121
00122 results.append(dv)
00123
00124 except sqlite3.Error as e:
00125 self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
00126
00127 if nb_values:
00128 if len(results) > nb_values:
00129 cont = results[nb_values].ServerTimestamp
00130
00131 results = results[:nb_values]
00132
00133 return results, cont
00134
00135 def new_historized_event(self, source_id, evtypes, period, count=0):
00136 with self._lock:
00137 _c_new = self._conn.cursor()
00138
00139
00140 ev_fields = self._get_event_fields(evtypes)
00141
00142 self._datachanges_period[source_id] = period
00143 self._event_fields[source_id] = ev_fields
00144
00145 table = self._get_table_name(source_id)
00146 columns = self._get_event_columns(ev_fields)
00147
00148
00149
00150
00151 try:
00152 _c_new.execute(
00153 'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
00154 .format(tn=table, co=columns))
00155
00156 except sqlite3.Error as e:
00157 self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
00158
00159 self._conn.commit()
00160
00161 def save_event(self, event):
00162 with self._lock:
00163 _c_sub = self._conn.cursor()
00164
00165 table = self._get_table_name(event.SourceNode)
00166 columns, placeholders, evtup = self._format_event(event)
00167 event_type = event.EventType
00168
00169
00170 try:
00171 _c_sub.execute(
00172 'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})'
00173 .format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
00174
00175 except sqlite3.Error as e:
00176 self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
00177
00178 self._conn.commit()
00179
00180
00181 period = self._datachanges_period[event.SourceNode]
00182
00183 if period:
00184
00185 date_limit = datetime.now() - period
00186
00187 try:
00188 _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
00189 (date_limit.isoformat(' '),))
00190 except sqlite3.Error as e:
00191 self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s',
00192 event.SourceNode, e)
00193
00194 self._conn.commit()
00195
00196 def read_event_history(self, source_id, start, end, nb_values, evfilter):
00197 with self._lock:
00198 _c_read = self._conn.cursor()
00199
00200 table = self._get_table_name(source_id)
00201 start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
00202 clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
00203
00204 cont = None
00205 cont_timestamps = []
00206 results = []
00207
00208
00209 try:
00210 for row in _c_read.execute(
00211 'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
00212 .format(cl=clauses_str, tn=table, dir=order), (start_time, end_time, limit)):
00213
00214 fdict = {}
00215 cont_timestamps.append(row[0])
00216 for i, field in enumerate(row[1:]):
00217 if field is not None:
00218 fdict[clauses[i]] = ua.Variant.from_binary(Buffer(field))
00219 else:
00220 fdict[clauses[i]] = ua.Variant(None)
00221
00222 results.append(events.Event.from_field_dict(fdict))
00223
00224 except sqlite3.Error as e:
00225 self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
00226
00227 if nb_values:
00228 if len(results) > nb_values:
00229 cont = cont_timestamps[nb_values]
00230
00231 results = results[:nb_values]
00232
00233 return results, cont
00234
00235 def _get_table_name(self, node_id):
00236 return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
00237
00238 def _get_event_fields(self, evtypes):
00239 """
00240 Get all fields from the event types that are to be historized
00241 Args:
00242 evtypes: List of event type nodes
00243
00244 Returns: List of fields for all event types
00245
00246 """
00247
00248 ev_aggregate_fields = []
00249 for event_type in evtypes:
00250 ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
00251
00252 ev_fields = []
00253 for field in set(ev_aggregate_fields):
00254 ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
00255 return ev_fields
00256
00257 @staticmethod
00258 def _get_bounds(start, end, nb_values):
00259 order = "ASC"
00260
00261 if start is None or start == ua.get_win_epoch():
00262 order = "DESC"
00263 start = ua.get_win_epoch()
00264
00265 if end is None or end == ua.get_win_epoch():
00266 end = datetime.utcnow() + timedelta(days=1)
00267
00268 if start < end:
00269 start_time = start.isoformat(' ')
00270 end_time = end.isoformat(' ')
00271 else:
00272 order = "DESC"
00273 start_time = end.isoformat(' ')
00274 end_time = start.isoformat(' ')
00275
00276 if nb_values:
00277 limit = nb_values + 1
00278 else:
00279 limit = -1
00280
00281 return start_time, end_time, order, limit
00282
00283 def _format_event(self, event):
00284 """
00285 Convert an event object triggered by the subscription into ordered lists for the SQL insert string
00286
00287 Args:
00288 event: The event returned by the subscription
00289
00290 Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
00291
00292 """
00293 placeholders = []
00294 ev_variant_binaries = []
00295
00296 ev_variant_dict = event.get_event_props_as_fields_dict()
00297 names = list(ev_variant_dict.keys())
00298 names.sort()
00299
00300
00301
00302 for name in names:
00303 variant = ev_variant_dict[name]
00304 placeholders.append('?')
00305 ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
00306
00307 return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
00308
00309 def _get_event_columns(self, ev_fields):
00310 fields = []
00311 for field in ev_fields:
00312 fields.append(field + ' BLOB')
00313 return self._list_to_sql_str(fields, False)
00314
00315 def _get_select_clauses(self, source_id, evfilter):
00316 s_clauses = []
00317 for select_clause in evfilter.SelectClauses:
00318 try:
00319 if not select_clause.BrowsePath:
00320 s_clauses.append(select_clause.Attribute.name)
00321 else:
00322 name = select_clause.BrowsePath[0].Name
00323 s_clauses.append(name)
00324 except AttributeError:
00325 self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
00326 ' Clause: %s:', source_id, select_clause)
00327
00328
00329 clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
00330 return clauses, self._list_to_sql_str(clauses)
00331
00332 @staticmethod
00333 def _list_to_sql_str(ls, quotes=True):
00334 sql_str = ''
00335 for item in ls:
00336 if quotes:
00337 sql_str += '"' + item + '", '
00338 else:
00339 sql_str += item + ', '
00340 return sql_str[:-2]
00341
00342 def stop(self):
00343 with self._lock:
00344 self._conn.close()
00345 self.logger.info('Historizing SQL connection closed')