2 from datetime
import timedelta
3 from datetime
import datetime
5 from opcua
import Subscription
17 Interface of a history backend. 18 Must be implemented by backends 23 Called when a new node is to be historized 26 raise NotImplementedError
30 Called when the value of a historized node has changed and should be saved in history 33 raise NotImplementedError
37 Called when a client makes a history read request for a node 38 if start or end is missing then nb_values is used to limit query 39 nb_values is the max number of values to read. Ignored if 0 40 Start time and end time are inclusive 41 Returns a list of DataValues and a continuation point which 42 is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue 44 raise NotImplementedError
48 Called when historization of events is enabled on server side 51 raise NotImplementedError
55 Called when a new event has been generated and should be saved in history 58 raise NotImplementedError
62 Called when a client makes a history read request for events 63 Start time and end time are inclusive 64 Returns a list of Events and a continuation point which 65 is None if all events are read or the ServerTimeStamp of the last rejected event 67 raise NotImplementedError
71 Called when the server shuts down 72 Can be used to close database connections etc. 74 raise NotImplementedError
79 Very minimal history backend storing data in memory using a Python dictionary 97 data.append(datavalue)
98 now = datetime.utcnow()
100 while len(data)
and now - data[0].ServerTimestamp > period:
102 if count
and len(data) > count:
108 print(
"Error attempt to read history for a node which is not historized")
112 start = ua.get_win_epoch()
114 end = ua.get_win_epoch()
115 if start == ua.get_win_epoch():
116 results = [dv
for dv
in reversed(self.
_datachanges[node_id])
if start <= dv.ServerTimestamp]
117 elif end == ua.get_win_epoch():
118 results = [dv
for dv
in self.
_datachanges[node_id]
if start <= dv.ServerTimestamp]
120 results = [dv
for dv
in reversed(self.
_datachanges[node_id])
if end <= dv.ServerTimestamp <= start]
123 results = [dv
for dv
in self.
_datachanges[node_id]
if start <= dv.ServerTimestamp <= end]
124 if nb_values
and len(results) > nb_values:
125 cont = results[nb_values + 1].ServerTimestamp
126 results = results[:nb_values]
136 evts = self.
_events[event.SourceNode]
139 now = datetime.utcnow()
141 while len(evts)
and now - evts[0].ServerTimestamp > period:
143 if count
and len(evts) > count:
148 if source_id
not in self.
_events:
149 print(
"Error attempt to read event history for a node which does not historize events")
153 start = ua.get_win_epoch()
155 end = ua.get_win_epoch()
156 if start == ua.get_win_epoch():
157 results = [ev
for ev
in reversed(self.
_events[source_id])
if start <= ev.Time]
158 elif end == ua.get_win_epoch():
159 results = [ev
for ev
in self.
_events[source_id]
if start <= ev.Time]
161 results = [ev
for ev
in reversed(self.
_events[source_id])
if end <= ev.Time <= start]
164 results = [ev
for ev
in self.
_events[source_id]
if start <= ev.Time <= end]
165 if nb_values
and len(results) > nb_values:
166 cont = results[nb_values + 1].Time
167 results = results[:nb_values]
174 class SubHandler(object):
179 self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
182 self.storage.save_event(event)
187 self.
logger = logging.getLogger(__name__)
195 set the desired HistoryStorageInterface which History Manager will use for historizing 201 params.RequestedPublishingInterval = 10
202 params.RequestedLifetimeCount = 3000
203 params.RequestedMaxKeepAliveCount = 10000
204 params.MaxNotificationsPerPublish = 0
205 params.PublishingEnabled =
True 207 return Subscription(self.iserver.isession, params, handler)
211 Subscribe to the nodes' data changes and store the data in the active storage. 216 raise ua.UaError(
"Node {0} is already historized".format(node))
217 self.storage.new_historized_node(node.nodeid, period, count)
218 handler = self._sub.subscribe_data_change(node)
223 Subscribe to the source nodes' events and store the data in the active storage. 226 The default is to historize every event type the source generates; custom event properties are included. At 227 this time there is no way to historize a specific event type. The user software can filter out events which are 228 not desired when reading. 230 Note that adding custom events to a source node AFTER historizing has been activated is not supported at this 231 time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL, the table 232 must be deleted manually so that a new table with the custom event fields can be created. 237 raise ua.UaError(
"Events from {0} are already historized".format(source))
240 event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
242 self.storage.new_historized_event(source.nodeid, event_types, period, count)
244 handler = self._sub.subscribe_events(source, event_types)
249 Remove subscription to the node/source which is being historized 252 Only the subscription is removed. The historical data remains. 255 self._sub.unsubscribe(self.
_handlers[node])
258 self.logger.error(
"History Manager isn't subscribed to %s", node)
262 Read history for a node 263 This is part of the AttributeService, but implemented as its own service 264 since it requires more logic than other attribute service methods 268 for rv
in params.NodesToRead:
275 determine if the history read is for data changes or events; then read the history for that node 279 if details.IsReadModified:
285 result.HistoryData.DataValues = dv
286 result.ContinuationPoint = cont
293 result.HistoryData.Events = ev
294 result.ContinuationPoint = cont
298 result.StatusCode =
ua.StatusCode(ua.StatusCodes.BadNotImplemented)
302 starttime = details.StartTime
303 if rv.ContinuationPoint:
308 starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
310 dv, cont = self.storage.read_node_history(rv.NodeId,
313 details.NumValuesPerNode)
315 cont = ua.ua_binary.Primitives.DateTime.pack(cont)
321 starttime = details.StartTime
322 if rv.ContinuationPoint:
327 starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
329 evts, cont = self.storage.read_event_history(rv.NodeId,
332 details.NumValuesPerNode,
337 field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
338 results.append(field_list)
340 cont = ua.ua_binary.Primitives.DateTime.pack(cont)
345 Update history for a node 346 This is part of the AttributeService, but implemented as its own service 347 since it requires more logic than other attribute service methods 350 for _
in params.HistoryUpdateDetails:
353 result.StatusCode =
ua.StatusCode(ua.StatusCodes.BadNotWritable)
354 results.append(results)
359 call stop methods of active storage interface whenever the server is stopped
def new_historized_event(self, source_id, evtypes, period, count=0)
def update_history(self, params)
def read_event_history(self, source_id, start, end, nb_values, evfilter)
def new_historized_node(self, node_id, period, count=0)
def event_notification(self, event)
def historize_event(self, source, period=timedelta(days=7), count=0)
def set_storage(self, storage)
def new_historized_event(self, source_id, evtypes, period, count=0)
def save_event(self, event)
def __init__(self, iserver)
def read_event_history(self, source_id, start, end, nb_values, evfilter)
def _read_history(self, details, rv)
def _read_datavalue_history(self, rv, details)
def read_node_history(self, node_id, start, end, nb_values)
def datachange_notification(self, node, val, data)
def _create_subscription(self, handler)
def new_historized_node(self, node_id, period, count=0)
def __init__(self, storage)
def read_node_history(self, node_id, start, end, nb_values)
def read_history(self, params)
def _read_event_history(self, rv, details)
def save_event(self, event)
def dehistorize(self, node)
def historize_data_change(self, node, period=timedelta(days=7), count=0)
def save_node_value(self, node_id, datavalue)
def save_node_value(self, node_id, datavalue)