history.py
Go to the documentation of this file.
00001 import logging
00002 from datetime import timedelta
00003 from datetime import datetime
00004 
00005 from opcua import Subscription
00006 from opcua import ua
00007 from opcua.common import utils
00008 
00009 
class UaNodeAlreadyHistorizedError(ua.UaError):
    """
    Raised by a history backend when it is asked to historize a node
    or an event source for which it already keeps a history
    """
00012 
00013 
class HistoryStorageInterface(object):

    """
    Contract every history backend has to fulfil.
    All methods raise NotImplementedError here; a backend overrides each of them
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Hook invoked when historizing starts for a new node
        Returns None
        """
        raise NotImplementedError

    def save_node_value(self, node_id, datavalue):
        """
        Hook invoked when a historized node got a new value which
        has to be stored in the history
        Returns None
        """
        raise NotImplementedError

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Hook invoked when a client sends a history read request for a node
        When start or end is missing, nb_values is used to limit the query
        nb_values is the maximum number of values to read. Ignored if 0
        Both start time and end time are inclusive
        Returns a list of DataValues together with a continuation point, which
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
        """
        raise NotImplementedError

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Hook invoked when the server enables historizing of events
        Returns None
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Hook invoked when a new event was generated and has to be stored in the history
        Returns None
        """
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Hook invoked when a client sends a history read request for events
        Both start time and end time are inclusive
        Returns a list of Events together with a continuation point, which
        is None if all events are read or the ServerTimeStamp of the last rejected event
        """
        raise NotImplementedError

    def stop(self):
        """
        Hook invoked on server shutdown
        A backend can use it to close database connections etc.
        """
        raise NotImplementedError
00075 
00076 
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary

    Values are kept per node id and events per source node id, each in a list
    in arrival order. Nothing is written to disk.
    """

    def __init__(self):
        # node id -> list of DataValues in arrival order
        self._datachanges = {}
        # node id -> (period, count) retention settings
        self._datachanges_period = {}
        # source node id -> list of events in arrival order
        self._events = {}
        # source node id -> (period, count) retention settings
        self._events_periods = {}

    @staticmethod
    def _trim(items, period, count, get_time):
        """
        Apply the retention settings to items, in place

        items is a list in arrival order, get_time returns the timestamp of an entry.
        If period is set, leading entries older than period are removed.
        If count is set and the list holds more than count entries, the first one
        is removed (the list grows by one entry per save, so one removal is enough).
        """
        if period:
            now = datetime.utcnow()
            while items and now - get_time(items[0]) > period:
                items.pop(0)
        if count and len(items) > count:
            items.pop(0)

    @staticmethod
    def _select(items, start, end, nb_values, get_time):
        """
        Pick the entries of items matching a history read request

        A missing start or end is replaced by ua.get_win_epoch(), which is the
        marker for "not specified". Depending on the bounds:
          - start not specified: entries are returned newest first; the end bound
            is not applied in this case
          - end not specified: entries at or after start, oldest first
          - start > end: entries between end and start (inclusive), newest first
          - otherwise: entries between start and end (inclusive), oldest first
        Returns (results, cont). If nb_values is set and more entries match, only
        the first nb_values are returned and cont is the timestamp of the first
        entry that was left out, so that a follow-up read starting at cont resumes
        without skipping an entry. Otherwise cont is None.
        """
        epoch = ua.get_win_epoch()
        if start is None:
            start = epoch
        if end is None:
            end = epoch
        if start == epoch:
            results = [item for item in reversed(items) if start <= get_time(item)]
        elif end == epoch:
            results = [item for item in items if start <= get_time(item)]
        elif start > end:
            results = [item for item in reversed(items) if end <= get_time(item) <= start]
        else:
            results = [item for item in items if start <= get_time(item) <= end]
        cont = None
        if nb_values and len(results) > nb_values:
            # index nb_values is the first entry not returned; nb_values + 1 would
            # skip it and is out of range when exactly one entry is left over
            cont = get_time(results[nb_values])
            results = results[:nb_values]
        return results, cont

    def new_historized_node(self, node_id, period, count=0):
        """
        Start keeping a history for node_id with the given retention settings
        Raises UaNodeAlreadyHistorizedError if the node is already historized
        """
        if node_id in self._datachanges:
            raise UaNodeAlreadyHistorizedError(node_id)
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        """
        Append datavalue to the history of node_id, then drop entries which
        exceed the retention settings of that node
        Raises KeyError if node_id is not historized
        """
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        self._trim(data, period, count, lambda dv: dv.ServerTimestamp)

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return (DataValues, continuation point) for node_id, selected on ServerTimestamp
        An error is logged and ([], None) returned if node_id is not historized
        """
        if node_id not in self._datachanges:
            logging.getLogger(__name__).error(
                "Error attempt to read history for a node which is not historized")
            return [], None
        return self._select(self._datachanges[node_id], start, end, nb_values,
                            lambda dv: dv.ServerTimestamp)

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Start keeping an event history for source_id with the given retention settings
        evtypes is not used by this backend
        Raises UaNodeAlreadyHistorizedError if the source is already historized
        """
        if source_id in self._events:
            raise UaNodeAlreadyHistorizedError(source_id)
        self._events[source_id] = []
        self._events_periods[source_id] = period, count

    def save_event(self, event):
        """
        Append event to the history of its SourceNode, then drop entries which
        exceed the retention settings of that source
        Raises KeyError if event.SourceNode is not historized
        """
        evts = self._events[event.SourceNode]
        evts.append(event)
        period, count = self._events_periods[event.SourceNode]
        # events are aged on Time, the same timestamp read_event_history selects on
        self._trim(evts, period, count, lambda ev: ev.Time)

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Return (events, continuation point) for source_id, selected on the event Time
        evfilter is not used by this backend
        An error is logged and ([], None) returned if source_id is not historized
        """
        if source_id not in self._events:
            logging.getLogger(__name__).error(
                "Error attempt to read event history for a node which does not historize events")
            return [], None
        return self._select(self._events[source_id], start, end, nb_values,
                            lambda ev: ev.Time)

    def stop(self):
        """
        Nothing to release for an in-memory history
        """
        pass
00172 
00173 
class SubHandler(object):
    """
    Subscription handler which hands every notification it receives
    over to a history storage
    """

    def __init__(self, storage):
        # backend receiving the save_node_value() and save_event() calls
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Save the Value of the monitored item carried by data under the id of node
        val is not used
        """
        save = self.storage.save_node_value
        node_id = node.nodeid
        datavalue = data.monitored_item.Value
        save(node_id, datavalue)

    def event_notification(self, event):
        """
        Save the received event
        """
        save = self.storage.save_event
        save(event)
00183 
00184 
class HistoryManager(object):
    """
    Historizes nodes and event sources of a server and answers history requests

    Changes are collected through one internal subscription, created on first use,
    and written to the active storage (a HistoryDict unless set_storage is called).
    """

    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        self.storage = HistoryDict()
        # internal subscription, created by the first historize_* call
        self._sub = None
        # historized node / event source -> handle returned by the subscription
        self._handlers = {}

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing

        Call this before historizing anything: the subscription handler created by
        the first historize_* call keeps the storage which was active at that time.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """
        Create the internal subscription delivering notifications to handler
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the nodes' data changes and store the data in the active storage.

        period and count are handed to the storage as retention settings.
        Raises ua.UaError if the node is already historized by this manager.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {0} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7), count=0):
        """
        Subscribe to the source nodes' events and store the data in the active storage.

        period and count are handed to the storage as retention settings.
        Raises ua.UaError if events of the source are already historized by this manager.

        SQL Implementation
        The default is to historize every event type the source generates, custom event properties are included. At
        this time there is no way to historize a specific event type. The user software can filter out events which are
        not desired when reading.

        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
        must be deleted manually so that a new table with the custom event fields can be created.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {0} are already historized".format(source))

        # get list of all event types that the source node generates; change this to only historize specific events
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        self.storage.new_historized_event(source.nodeid, event_types, period, count)

        handler = self._sub.subscribe_events(source, event_types)
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove subscription to the node/source which is being historized
        An error is logged if the node/source is not historized by this manager

        SQL Implementation
        Only the subscriptions is removed. The historical data remains.
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryReadResult per entry of params.NodesToRead, in the same order
        """
        return [self._read_history(params.HistoryReadDetails, rv) for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        determine if the history read is for a data changes or events; then read the history for that node

        Any other kind of details yields a result with status BadNotImplemented
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        """
        Read the data values of rv.NodeId from the storage
        Returns the DataValues and the continuation point packed as a DateTime
        (or whatever falsy value the storage returned if there is none)
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        """
        Read the events of source rv.NodeId from the storage
        Returns one HistoryEventFieldList per event, built from the select clauses
        of details.Filter, and the continuation point packed as a DateTime
        (or whatever falsy value the storage returned if there is none)
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))

        evts, cont = self.storage.read_event_history(rv.NodeId,
                                                     starttime,
                                                     details.EndTime,
                                                     details.NumValuesPerNode,
                                                     details.Filter)
        results = []
        for ev in evts:
            field_list = ua.HistoryEventFieldList()
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
            results.append(field_list)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        return results, cont

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryUpdateResult with status BadNotWritable per entry of
        params.HistoryUpdateDetails
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the result itself, not the list it is collected in
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped
        """
        self.storage.stop()


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23