history_sql.py
Go to the documentation of this file.
00001 import logging
00002 from datetime import timedelta
00003 from datetime import datetime
00004 from threading import Lock
00005 import sqlite3
00006 
00007 from opcua import ua
00008 from opcua.common.utils import Buffer
00009 from opcua.common import events
00010 from opcua.server.history import HistoryStorageInterface
00011 
00012 
class HistorySQLite(HistoryStorageInterface):
    """
    History backend which stores data values and object events in a SQLite database.

    This backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
    the history database are in binary format (SQLite BLOBs).
    Note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs.

    One table is created per historized node / event source; its name is built by _get_table_name().
    """

    def __init__(self, path="history.db"):
        """
        Open the SQLite database used as history storage.

        Args:
            path: Path of the SQLite database file
        """
        self.logger = logging.getLogger(__name__)
        # retention settings, keyed by node id (variables and event sources): (period, count)
        self._datachanges_period = {}
        self._db_file = path
        # every use of the shared connection below happens while holding this lock
        self._lock = Lock()
        # names of the historized event fields (SQL columns), keyed by event source node id
        self._event_fields = {}

        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

    def new_historized_node(self, node_id, period, count=0):
        """
        Register a node for historizing and create its table.

        Args:
            node_id: Node id of the variable to historize
            period: Maximum age of stored values (subtracted from a datetime), falsy to disable
            count: Maximum number of stored values, 0 to disable
        """
        with self._lock:
            _c_new = self._conn.cursor()

            table = self._get_table_name(node_id)

            self._datachanges_period[node_id] = period, count

            # create a table for the node which will store attributes of the DataValue object
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
            try:
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
                               ' ServerTimestamp TIMESTAMP,'
                               ' SourceTimestamp TIMESTAMP,'
                               ' StatusCode INTEGER,'
                               ' Value TEXT,'
                               ' VariantType TEXT,'
                               ' VariantBinary BLOB)'.format(tn=table))

            except sqlite3.Error as e:
                # creation fails with an sqlite3.Error e.g. when the table already exists; only logged
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)

            self._conn.commit()

    def save_node_value(self, node_id, datavalue):
        """
        Store a data value of a historized node, then apply the node's period / count limits.

        Args:
            node_id: Node id the value belongs to (must have been registered before)
            datavalue: The DataValue to store

        Raises:
            KeyError: if node_id was never registered
        """
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(node_id)

            # insert the data change into the database
            try:
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
                               (
                                   datavalue.ServerTimestamp,
                                   datavalue.SourceTimestamp,
                                   datavalue.StatusCode.value,
                                   str(datavalue.Value.Value),
                                   datavalue.Value.VariantType.name,
                                   sqlite3.Binary(datavalue.Value.to_binary())
                               )
                              )
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)

            self._conn.commit()

            # get this node's period from the period dict and calculate the limit
            period, count = self._datachanges_period[node_id]

            def execute_sql_delete(condition, args):
                # condition may itself contain the "{tn}" placeholder (used by the sub-select below)
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)

                try:
                    _c_sub.execute(query, args)
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)

                self._conn.commit()

            if period:
                # after the insert, if a period was specified delete all records older than period
                date_limit = datetime.utcnow() - period
                execute_sql_delete('ServerTimestamp < ?', (date_limit,))

            if count:
                # ensure that no more than count records are stored for the specified node
                execute_sql_delete('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
                                   'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored data values of a node.

        Args:
            node_id: Node id to read the history of
            start: Start of the time range (see _get_bounds for the handling of None)
            end: End of the time range (see _get_bounds for the handling of None)
            nb_values: Maximum number of values to return, falsy for no limit

        Returns: Tuple (list of DataValue, continuation point); the continuation point is the
            ServerTimestamp of the first value that did not fit into nb_values, else None
        """
        with self._lock:
            _c_read = self._conn.cursor()

            table = self._get_table_name(node_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            cont = None
            results = []

            # select values from the database; recreate UA Variant from binary
            query = ('SELECT "ServerTimestamp", "SourceTimestamp", "StatusCode", "VariantBinary" '
                     'FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                     'ORDER BY "_Id" {dir} LIMIT ?').format(tn=table, dir=order)
            try:
                for server_ts, source_ts, status, binary in _c_read.execute(query, (start_time, end_time, limit)):

                    # rebuild the data value object
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(binary)))
                    dv.ServerTimestamp = server_ts
                    dv.SourceTimestamp = source_ts
                    dv.StatusCode = ua.StatusCode(status)

                    results.append(dv)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                # one extra row was requested (see _get_bounds); if present it is the continuation point
                if len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Register an event source for historizing and create its table.

        Args:
            source_id: Node id of the object which emits the events
            evtypes: List of event type nodes whose fields become the table columns
            period: Maximum age of stored events (subtracted from a datetime), falsy to disable
            count: Maximum number of stored events, 0 to disable
        """
        with self._lock:
            _c_new = self._conn.cursor()

            # get all fields for the event type nodes
            ev_fields = self._get_event_fields(evtypes)

            # same (period, count) layout as used for historized nodes
            self._datachanges_period[source_id] = period, count
            self._event_fields[source_id] = ev_fields

            table = self._get_table_name(source_id)

            # create a table for the event which will store fields generated by the source object's events
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
            # properties with these names
            column_defs = ['_Id INTEGER PRIMARY KEY NOT NULL', '_Timestamp TIMESTAMP', '_EventTypeName TEXT']
            event_columns = self._get_event_columns(ev_fields)
            if event_columns:  # avoid a dangling comma when the event types expose no fields
                column_defs.append(event_columns)

            try:
                _c_new.execute('CREATE TABLE "{tn}" ({co})'.format(tn=table, co=', '.join(column_defs)))

            except sqlite3.Error as e:
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)

            self._conn.commit()

    def save_event(self, event):
        """
        Store an event of a historized event source, then apply the source's period / count limits.

        Args:
            event: The event to store; its SourceNode must have been registered before

        Raises:
            KeyError: if event.SourceNode was never registered
        """
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(event.SourceNode)
            columns, placeholders, evtup = self._format_event(event)
            event_type = event.EventType  # useful for troubleshooting database

            # str() of a datetime is the same 'YYYY-MM-DD HH:MM:SS[.ffffff]' text the bounds of
            # read_event_history() are compared against
            timestamp = event.Time
            if timestamp is not None:
                timestamp = str(timestamp)

            column_sql = '"_Id", "_Timestamp", "_EventTypeName"'
            value_sql = 'NULL, ?, ?'
            if columns:  # avoid a dangling comma for events without properties
                column_sql += ', ' + columns
                value_sql += ', ' + placeholders

            # insert the event into the database; timestamp and type are bound as parameters instead
            # of being pasted into the statement as double-quoted "strings"
            try:
                _c_sub.execute(
                    'INSERT INTO "{tn}" ({co}) VALUES ({pl})'.format(tn=table, co=column_sql, pl=value_sql),
                    (timestamp, str(event_type)) + evtup)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)

            self._conn.commit()

            # get this source's limits from the period dict
            period, count = self._datachanges_period[event.SourceNode]

            def execute_sql_delete(query, args):
                try:
                    _c_sub.execute(query, args)
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s',
                                      event.SourceNode, e)

                self._conn.commit()

            if period:
                # after the insert, if a period was specified delete all records older than period
                # (UTC like everywhere else in this class; the filter uses the textual _Timestamp column
                # because the Time column holds a variant BLOB, which never compares below a text value)
                date_limit = datetime.utcnow() - period
                execute_sql_delete('DELETE FROM "{tn}" WHERE "_Timestamp" < ?'.format(tn=table),
                                   (date_limit.isoformat(' '),))

            if count:
                # ensure that no more than count events are stored; _Id grows with every insert
                execute_sql_delete('DELETE FROM "{tn}" WHERE "_Id" NOT IN '
                                   '(SELECT "_Id" FROM "{tn}" ORDER BY "_Id" DESC LIMIT ?)'.format(tn=table),
                                   (count,))

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read stored events of an event source.

        Args:
            source_id: Node id of the event source
            start: Start of the time range (see _get_bounds for the handling of None)
            end: End of the time range (see _get_bounds for the handling of None)
            nb_values: Maximum number of events to return, falsy for no limit
            evfilter: Event filter whose SelectClauses choose the returned fields

        Returns: Tuple (list of events, continuation point); the continuation point is the
            _Timestamp of the first event that did not fit into nb_values, else None
        """
        with self._lock:
            _c_read = self._conn.cursor()

            table = self._get_table_name(source_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
            clauses, clauses_str = self._get_select_clauses(source_id, evfilter)

            cont = None
            cont_timestamps = []
            results = []

            select_sql = '"_Timestamp"'
            if clauses_str:  # avoid a dangling comma when no requested field is stored
                select_sql += ', ' + clauses_str

            # select events from the database; SQL select clause is built from EventFilter and available fields
            try:
                for row in _c_read.execute(
                        'SELECT {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
                        .format(cl=select_sql, tn=table, dir=order), (start_time, end_time, limit)):

                    fdict = {}
                    cont_timestamps.append(row[0])
                    # row[1:] is in the same order as clauses
                    for name, field in zip(clauses, row[1:]):
                        if field is not None:
                            fdict[name] = ua.Variant.from_binary(Buffer(field))
                        else:
                            fdict[name] = ua.Variant(None)

                    results.append(events.Event.from_field_dict(fdict))

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            if nb_values:
                # one extra row was requested (see _get_bounds); if present it is the continuation point
                if len(results) > nb_values:
                    cont = cont_timestamps[nb_values]

                results = results[:nb_values]

            return results, cont

    def _get_table_name(self, node_id):
        """
        Build the SQL table name of a node: '<NamespaceIndex>_<Identifier>'
        """
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)

    def _get_event_fields(self, evtypes):
        """
        Get all fields from the event types that are to be historized
        Args:
            evtypes: List of event type nodes

        Returns: List of field names (without duplicates) for all event types

        """
        # get all fields from the event types that are to be historized
        ev_aggregate_fields = []
        for event_type in evtypes:
            ev_aggregate_fields.extend(events.get_event_properties_from_type_node(event_type))

        ev_fields = []
        for field in set(ev_aggregate_fields):
            name = field.get_display_name().Text
            if isinstance(name, bytes):
                # only byte strings need decoding; text strings have no decode() on Python 3
                name = name.decode('utf-8')
            if name not in ev_fields:
                # every name becomes a column, and duplicate column names make CREATE TABLE fail
                ev_fields.append(name)
        return ev_fields

    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Convert the arguments of a history read into SQL query parameters

        Args:
            start: Start datetime; None or the windows epoch means "not given" (results are read newest first)
            end: End datetime; None or the windows epoch means "not given" (one day after the current UTC time is used)
            nb_values: Maximum number of values requested, falsy for no limit

        Returns: Lower bound and upper bound as strings, sort order ("ASC" or "DESC"), SQL LIMIT value

        """
        order = "ASC"

        if start is None or start == ua.get_win_epoch():
            order = "DESC"
            start = ua.get_win_epoch()

        if end is None or end == ua.get_win_epoch():
            end = datetime.utcnow() + timedelta(days=1)

        if start < end:
            start_time = start.isoformat(' ')
            end_time = end.isoformat(' ')
        else:
            # reversed range: swap the bounds for BETWEEN and read newest first
            order = "DESC"
            start_time = end.isoformat(' ')
            end_time = start.isoformat(' ')

        if nb_values:
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
        else:
            limit = -1  # in SQLite a LIMIT of -1 returns all results

        return start_time, end_time, order, limit

    def _format_event(self, event):
        """
        Convert an event object triggered by the subscription into ordered lists for the SQL insert string

        Args:
            event: The event returned by the subscription

        Returns: String of quoted event fields (SQL column names), String of '?' placeholders,
            Tuple of variant binaries; all three are in the same (alphabetical) field order

        """
        ev_variant_dict = event.get_event_props_as_fields_dict()
        names = sorted(ev_variant_dict)  # sort alphabetically since dict is not sorted

        # note that the variants are converted to binary objects for storing in SQL BLOB format
        ev_variant_binaries = tuple(sqlite3.Binary(ev_variant_dict[name].to_binary()) for name in names)
        placeholders = ['?'] * len(names)

        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), ev_variant_binaries

    def _get_event_columns(self, ev_fields):
        """
        Build the column definitions ('"<name>" BLOB, ...') of the event fields for CREATE TABLE
        """
        # names are quoted exactly like in the INSERT / SELECT statements
        return self._list_to_sql_str(['"' + field + '" BLOB' for field in ev_fields], False)

    def _get_select_clauses(self, source_id, evfilter):
        """
        Translate the select clauses of an event filter into SQL column names

        Args:
            source_id: Node id of the event source (must have been registered before)
            evfilter: Event filter holding the SelectClauses

        Returns: List of the selected column names, the same names as quoted SQL string

        """
        s_clauses = []
        for select_clause in evfilter.SelectClauses:
            try:
                if not select_clause.BrowsePath:
                    s_clauses.append(select_clause.Attribute.name)
                else:
                    name = select_clause.BrowsePath[0].Name
                    s_clauses.append(name)
            except AttributeError:
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
                                    ' Clause: %s:', source_id, select_clause)

        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
        known_fields = self._event_fields[source_id]
        clauses = [x for x in s_clauses if x in known_fields]
        return clauses, self._list_to_sql_str(clauses)

    @staticmethod
    def _list_to_sql_str(ls, quotes=True):
        """
        Join strings into a comma separated SQL list, optionally wrapping each item in double quotes
        """
        if quotes:
            return ', '.join('"' + item + '"' for item in ls)
        return ', '.join(ls)

    def stop(self):
        """
        Close the database connection
        """
        with self._lock:
            self._conn.close()
            self.logger.info('Historizing SQL connection closed')


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23