00001 import logging
00002 from datetime import timedelta
00003 from datetime import datetime
00004
00005 from opcua import Subscription
00006 from opcua import ua
00007 from opcua.common import utils
00008
00009
class UaNodeAlreadyHistorizedError(ua.UaError):
    """Raised when historizing is requested for a node or event source which is already historized."""
00012
00013
class HistoryStorageInterface(object):
    """
    Contract every history backend has to fulfil.

    This class only declares the methods; each of them raises
    NotImplementedError until a backend overrides it.
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Hook invoked when historizing of a new node is requested.

        node_id identifies the node; period and count are handed over
        unchanged from the caller (the in-memory backend of this module uses
        them as maximum age and maximum number of stored values).
        Returns None
        """
        raise NotImplementedError()

    def save_node_value(self, node_id, datavalue):
        """
        Hook invoked when the value of a historized node has changed and the
        new datavalue should be stored in history.
        Returns None
        """
        raise NotImplementedError()

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Hook invoked when a client makes a history read request for a node.

        If start or end is missing, nb_values is used to limit the query.
        nb_values is the maximum number of values to read; it is ignored if 0.
        Start time and end time are inclusive.
        Returns a list of DataValues and a continuation point, which is None
        if all values were read or the ServerTimestamp of the last rejected
        DataValue otherwise.
        """
        raise NotImplementedError()

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Hook invoked when historizing of events is enabled on the server side.
        Returns None
        """
        raise NotImplementedError()

    def save_event(self, event):
        """
        Hook invoked when a new event has been generated and should be stored
        in history.
        Returns None
        """
        raise NotImplementedError()

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Hook invoked when a client makes a history read request for events.

        Start time and end time are inclusive.
        Returns a list of Events and a continuation point, which is None if
        all events were read or the ServerTimestamp of the last rejected event
        otherwise.
        """
        raise NotImplementedError()

    def stop(self):
        """
        Hook invoked when the server shuts down.
        Can be used to close database connections etc.
        """
        raise NotImplementedError()
00075
00076
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using Python dictionaries.

    Data values are kept per node id and events per source node id, each in a
    plain list in the order in which they were saved. Nothing is persisted.
    """

    # shared logger, a class attribute so it is available without relying on __init__
    _logger = logging.getLogger(__name__)

    def __init__(self):
        self._datachanges = {}  # node id -> list of saved data values
        self._datachanges_period = {}  # node id -> (period, count)
        self._events = {}  # source node id -> list of saved events
        self._events_periods = {}  # source node id -> (period, count)

    def new_historized_node(self, node_id, period, count=0):
        """
        Register node_id for historizing.

        period is the maximum age of stored values and count the maximum
        number of stored values; a falsy value disables the respective limit.
        Raises UaNodeAlreadyHistorizedError if node_id is already registered.
        """
        if node_id in self._datachanges:
            raise UaNodeAlreadyHistorizedError(node_id)
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        """
        Append datavalue to the history of node_id and drop values exceeding
        the limits given at registration. Raises KeyError if node_id has not
        been registered with new_historized_node.
        """
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        self._prune(data, period, count, "ServerTimestamp")

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return (values, continuation point) for node_id.

        An unknown node_id is logged and answered with ([], None).
        See _query for the selection rules.
        """
        if node_id not in self._datachanges:
            self._logger.error("Error attempt to read history for a node which is not historized")
            return [], None
        return self._query(self._datachanges[node_id], start, end, nb_values, "ServerTimestamp")

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Register source_id for event historizing.

        evtypes is accepted for interface compatibility but not used by this
        backend. period and count have the same meaning as in
        new_historized_node.
        Raises UaNodeAlreadyHistorizedError if source_id is already registered.
        """
        if source_id in self._events:
            raise UaNodeAlreadyHistorizedError(source_id)
        self._events[source_id] = []
        self._events_periods[source_id] = period, count

    def save_event(self, event):
        """
        Append event to the history of its SourceNode and drop events exceeding
        the limits given at registration. Raises KeyError if the source has not
        been registered with new_historized_event.
        """
        evts = self._events[event.SourceNode]
        evts.append(event)
        period, count = self._events_periods[event.SourceNode]
        # events are aged on their Time field, the same field read_event_history
        # filters on
        self._prune(evts, period, count, "Time")

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Return (events, continuation point) for source_id.

        An unknown source_id is logged and answered with ([], None).
        evfilter is not evaluated by this backend. See _query for the
        selection rules.
        """
        if source_id not in self._events:
            self._logger.error("Error attempt to read event history for a node which does not historize events")
            return [], None
        return self._query(self._events[source_id], start, end, nb_values, "Time")

    def stop(self):
        """Nothing to release for the in-memory backend."""
        pass

    @staticmethod
    def _prune(items, period, count, time_attr):
        """Remove the oldest entries of items (in place) which exceed period or count."""
        if period:
            now = datetime.utcnow()
            while items and now - getattr(items[0], time_attr) > period:
                items.pop(0)
        if count:
            excess = len(items) - count
            if excess > 0:
                del items[:excess]

    @staticmethod
    def _query(items, start, end, nb_values, time_attr):
        """
        Select entries of items by their time_attr timestamp.

        A start or end of None is replaced by ua.get_win_epoch(), which is the
        marker for "not given". The selection then is:
          - start not given: every entry at or after the epoch, newest first
            (end is not applied in this case)
          - end not given: every entry at or after start, oldest first
          - start > end: entries within [end, start], newest first
          - otherwise: entries within [start, end], oldest first
        If nb_values is non-zero and more entries match, only the first
        nb_values are returned together with the timestamp of the first entry
        left out as continuation point; otherwise the continuation point is None.
        """
        epoch = ua.get_win_epoch()
        if start is None:
            start = epoch
        if end is None:
            end = epoch

        if start == epoch:
            results = [it for it in reversed(items) if start <= getattr(it, time_attr)]
        elif end == epoch:
            results = [it for it in items if start <= getattr(it, time_attr)]
        elif start > end:
            results = [it for it in reversed(items) if end <= getattr(it, time_attr) <= start]
        else:
            results = [it for it in items if start <= getattr(it, time_attr) <= end]

        cont = None
        if nb_values and len(results) > nb_values:
            # results[nb_values] is the first entry not returned; the next read
            # has to resume exactly there
            cont = getattr(results[nb_values], time_attr)
            results = results[:nb_values]
        return results, cont
00172
00173
class SubHandler(object):
    """Subscription handler which hands every notification over to a history storage."""

    def __init__(self, storage):
        # storage object providing save_node_value() and save_event()
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """Store the data value carried by the monitored item under the id of node."""
        save = self.storage.save_node_value
        save(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        """Store the received event."""
        save = self.storage.save_event
        save(event)
00183
00184
class HistoryManager(object):
    """
    Feed a history storage from subscriptions and answer history read/update
    requests from it.

    The storage defaults to the in-memory HistoryDict and can be replaced with
    set_storage.
    """

    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        self.storage = HistoryDict()
        self._sub = None  # created on the first historize_* call
        self._handlers = {}  # node -> handle returned by the subscription

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing

        Note: the subscription handler is created with the storage that is
        active at the first historize_* call, so call this before historizing.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """Create the internal subscription which delivers notifications to handler."""
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the nodes' data changes and store the data in the active storage.

        period and count are passed to the storage's new_historized_node.
        Raises ua.UaError if node is already historized by this manager.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {0} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7), count=0):
        """
        Subscribe to the source nodes' events and store the data in the active storage.

        SQL Implementation
        The default is to historize every event type the source generates, custom event properties are included. At
        this time there is no way to historize a specific event type. The user software can filter out events which are
        not desired when reading.

        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
        must be deleted manually so that a new table with the custom event fields can be created.

        Raises ua.UaError if events of source are already historized by this manager.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {0} are already historized".format(source))

        # every event type referenced by the source through GeneratesEvent is historized
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        self.storage.new_historized_event(source.nodeid, event_types, period, count)

        handler = self._sub.subscribe_events(source, event_types)
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove subscription to the node/source which is being historized

        SQL Implementation
        Only the subscriptions is removed. The historical data remains.

        A node which is not historized is only reported through the logger.
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one result per entry of params.NodesToRead, in the same order.
        """
        return [self._read_history(params.HistoryReadDetails, rv) for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        determine if the history read is for a data changes or events; then read the history for that node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                # same stored values are returned, only wrapped in the modified-data type
                result.HistoryData = ua.HistoryModifiedData()
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # any other kind of history read details is not handled
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        """
        Read data values for rv.NodeId from the storage.

        Returns the values and the continuation point packed as binary
        DateTime (or the falsy value returned by the storage).
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # the continuation point is a packed timestamp; it replaces the
            # requested start time while the rest of details is used as sent
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        return dv, cont

    def _read_event_history(self, rv, details):
        """
        Read events for rv.NodeId from the storage.

        Returns a list of HistoryEventFieldList built from the select clauses
        of details.Filter and the continuation point packed as binary DateTime
        (or the falsy value returned by the storage).
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # the continuation point is a packed timestamp; it replaces the
            # requested start time while the rest of details is used as sent
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))

        evts, cont = self.storage.read_event_history(rv.NodeId,
                                                     starttime,
                                                     details.EndTime,
                                                     details.NumValuesPerNode,
                                                     details.Filter)
        results = []
        for ev in evts:
            field_list = ua.HistoryEventFieldList()
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
            results.append(field_list)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        return results, cont

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryUpdateResult per entry of params.HistoryUpdateDetails.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # updating history is not supported: every request gets BadNotWritable
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped
        """
        self.storage.stop()