history.py
Go to the documentation of this file.
1 import logging
2 from datetime import timedelta
3 from datetime import datetime
4 
5 from opcua import Subscription
6 from opcua import ua
7 from opcua.common import utils
8 
9 
class UaNodeAlreadyHistorizedError(ua.UaError):
    """Raised by a history backend when a node or event source is registered for historizing a second time."""
12 
13 
15 
16  """
17  Interface of a history backend.
18  Must be implemented by backends
19  """
20 
def new_historized_node(self, node_id, period, count=0):
    """
    Hook invoked when a new node is to be historized.

    Backends must override this method; the interface itself only raises
    NotImplementedError. Implementations return None.
    """
    raise NotImplementedError()
27 
def save_node_value(self, node_id, datavalue):
    """
    Hook invoked when the value of a historized node has changed and
    should be saved in history.

    Backends must override this method; the interface itself only raises
    NotImplementedError. Implementations return None.
    """
    raise NotImplementedError()
34 
def read_node_history(self, node_id, start, end, nb_values):
    """
    Hook invoked when a client makes a history read request for a node.

    If start or end is missing, nb_values is used to limit the query.
    nb_values is the maximum number of values to read; it is ignored if 0.
    Start time and end time are inclusive.

    Implementations return a list of DataValues together with a
    continuation point. The continuation point is None when everything
    has been read, otherwise the ServerTimestamp of the last rejected
    DataValue.

    Backends must override this method; the interface itself only raises
    NotImplementedError.
    """
    raise NotImplementedError()
45 
def new_historized_event(self, source_id, evtypes, period, count=0):
    """
    Hook invoked when historization of events is enabled on the server side.

    Backends must override this method; the interface itself only raises
    NotImplementedError. Implementations return None.
    """
    raise NotImplementedError()
52 
def save_event(self, event):
    """
    Hook invoked when a new event has been generated and should be saved
    in history.

    Backends must override this method; the interface itself only raises
    NotImplementedError. Implementations return None.
    """
    raise NotImplementedError()
59 
def read_event_history(self, source_id, start, end, nb_values, evfilter):
    """
    Hook invoked when a client makes a history read request for events.

    Start time and end time are inclusive.

    Implementations return a list of Events together with a continuation
    point. The continuation point is None when all events have been read,
    otherwise the ServerTimestamp of the last rejected event.

    Backends must override this method; the interface itself only raises
    NotImplementedError.
    """
    raise NotImplementedError()
68 
def stop(self):
    """
    Hook invoked when the server shuts down.

    Can be used to close database connections etc. Backends must override
    this method; the interface itself only raises NotImplementedError.
    """
    raise NotImplementedError()
75 
76 
78  """
79  Very minimal history backend storing data in memory using a Python dictionary
80  """
81 
def __init__(self):
    """
    Create the empty in-memory stores.

    Every store is a plain dict keyed by node id (data changes) or by
    event source id (events).
    """
    # node_id -> list of saved values, in the order they were saved
    self._datachanges = {}
    # node_id -> (period, count) retention settings. new_historized_node()
    # and save_node_value() index this dict, so it must exist before the
    # first node is registered.
    self._datachanges_period = {}
    # source_id -> list of saved events, in the order they were saved
    self._events = {}
    # source_id -> (period, count) retention settings
    self._events_periods = {}
87 
def new_historized_node(self, node_id, period, count=0):
    """
    Start keeping history for ``node_id``.

    An empty value list is created for the node and the ``(period, count)``
    pair is remembered; save_node_value() reads that pair back to prune
    old values.

    Raises UaNodeAlreadyHistorizedError if ``node_id`` is already registered.
    """
    already_registered = node_id in self._datachanges
    if already_registered:
        raise UaNodeAlreadyHistorizedError(node_id)
    self._datachanges[node_id] = list()
    self._datachanges_period[node_id] = (period, count)
93 
def save_node_value(self, node_id, datavalue):
    """
    Append ``datavalue`` to the history of ``node_id`` and prune old entries.

    Pruning uses the ``(period, count)`` pair stored for the node:
    - if period is truthy, leading values whose ServerTimestamp is older
      than ``now - period`` are dropped;
    - if count is truthy and the list is longer than count, the oldest
      value is dropped (one value per call).

    Raises KeyError if ``node_id`` has not been registered.
    """
    history = self._datachanges[node_id]
    max_age, max_count = self._datachanges_period[node_id]
    history.append(datavalue)
    now = datetime.utcnow()
    if max_age:
        # values are appended in arrival order, so expired ones sit at the front
        while history and now - history[0].ServerTimestamp > max_age:
            del history[0]
    if max_count and len(history) > max_count:
        del history[0]
104 
def read_node_history(self, node_id, start, end, nb_values):
    """
    Return the stored values of ``node_id`` selected by ``start``/``end``.

    A missing (None) start or end is replaced by ``ua.get_win_epoch()``.
    The selection then depends on the bounds:
    - start is the epoch: every value with ServerTimestamp >= start, in
      reverse stored order (``end`` is not applied in this branch);
    - end is the epoch: every value with ServerTimestamp >= start, in
      stored order;
    - start > end: values with end <= ServerTimestamp <= start, in reverse
      stored order;
    - otherwise: values with start <= ServerTimestamp <= end, in stored
      order.

    nb_values limits the number of returned values; it is ignored if 0.

    Returns ``(values, continuation)``. ``continuation`` is None unless the
    selection was truncated, in which case it is the ServerTimestamp of
    the first value that was not returned. For a node that is not
    historized an error is printed and ``([], None)`` is returned.
    """
    if node_id not in self._datachanges:
        print("Error attempt to read history for a node which is not historized")
        return [], None

    epoch = ua.get_win_epoch()
    if start is None:
        start = epoch
    if end is None:
        end = epoch

    history = self._datachanges[node_id]
    if start == epoch:
        results = [dv for dv in reversed(history) if start <= dv.ServerTimestamp]
    elif end == epoch:
        results = [dv for dv in history if start <= dv.ServerTimestamp]
    elif start > end:
        results = [dv for dv in reversed(history) if end <= dv.ServerTimestamp <= start]
    else:
        results = [dv for dv in history if start <= dv.ServerTimestamp <= end]

    cont = None
    if nb_values and len(results) > nb_values:
        # results[nb_values] is the first value left out of the answer.
        # Indexing nb_values + 1 skipped that value and raised IndexError
        # when exactly one value was left over.
        cont = results[nb_values].ServerTimestamp
        results = results[:nb_values]
    return results, cont
128 
def new_historized_event(self, source_id, evtypes, period, count=0):
    """
    Start keeping event history for ``source_id``.

    An empty event list is created for the source and the
    ``(period, count)`` pair is remembered; save_event() reads that pair
    back to prune old events. ``evtypes`` is accepted but not used by
    this backend.

    Raises UaNodeAlreadyHistorizedError if ``source_id`` is already registered.
    """
    already_registered = source_id in self._events
    if already_registered:
        raise UaNodeAlreadyHistorizedError(source_id)
    self._events[source_id] = list()
    self._events_periods[source_id] = (period, count)
134 
def save_event(self, event):
    """
    Append ``event`` to the history of its source node and prune old entries.

    The history list is selected by ``event.SourceNode``. Pruning uses the
    ``(period, count)`` pair stored for that source:
    - if period is truthy, leading events whose ``Time`` is older than
      ``now - period`` are dropped;
    - if count is truthy and the list is longer than count, the oldest
      event is dropped (one event per call).

    Raises KeyError if the source has not been registered.
    """
    evts = self._events[event.SourceNode]
    evts.append(event)
    period, count = self._events_periods[event.SourceNode]
    now = datetime.utcnow()
    if period:
        # Age is taken from ``Time``, the same field read_event_history()
        # filters on; the previous ``ServerTimestamp`` lookup did not match
        # any attribute the rest of this backend reads from events.
        while evts and now - evts[0].Time > period:
            evts.pop(0)
    if count and len(evts) > count:
        evts.pop(0)
145 
def read_event_history(self, source_id, start, end, nb_values, evfilter):
    """
    Return the stored events of ``source_id`` selected by ``start``/``end``.

    A missing (None) start or end is replaced by ``ua.get_win_epoch()``.
    The selection then depends on the bounds:
    - start is the epoch: every event with Time >= start, in reverse
      stored order (``end`` is not applied in this branch);
    - end is the epoch: every event with Time >= start, in stored order;
    - start > end: events with end <= Time <= start, in reverse stored
      order;
    - otherwise: events with start <= Time <= end, in stored order.

    nb_values limits the number of returned events; it is ignored if 0.
    ``evfilter`` is accepted but not used by this backend.

    Returns ``(events, continuation)``. ``continuation`` is None unless the
    selection was truncated, in which case it is the Time of the first
    event that was not returned. For a source that does not historize
    events an error is printed and ``([], None)`` is returned.
    """
    if source_id not in self._events:
        print("Error attempt to read event history for a node which does not historize events")
        return [], None

    epoch = ua.get_win_epoch()
    if start is None:
        start = epoch
    if end is None:
        end = epoch

    history = self._events[source_id]
    if start == epoch:
        results = [ev for ev in reversed(history) if start <= ev.Time]
    elif end == epoch:
        results = [ev for ev in history if start <= ev.Time]
    elif start > end:
        results = [ev for ev in reversed(history) if end <= ev.Time <= start]
    else:
        results = [ev for ev in history if start <= ev.Time <= end]

    cont = None
    if nb_values and len(results) > nb_values:
        # results[nb_values] is the first event left out of the answer.
        # Indexing nb_values + 1 skipped that event and raised IndexError
        # when exactly one event was left over.
        cont = results[nb_values].Time
        results = results[:nb_values]
    return results, cont
169 
def stop(self):
    """Do nothing and return None; this backend performs no shutdown work."""
    return None
172 
173 
class SubHandler(object):
    """
    Subscription callback object that forwards every notification it
    receives to a history storage backend.
    """

    def __init__(self, storage):
        """Remember ``storage``, the backend all notifications are written to."""
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Save the changed value of ``node``.

        The stored value is ``data.monitored_item.Value``; ``val`` is not used.
        """
        save = self.storage.save_node_value
        node_id = node.nodeid
        changed_value = data.monitored_item.Value
        save(node_id, changed_value)

    def event_notification(self, event):
        """Save ``event`` in the storage backend."""
        save = self.storage.save_event
        save(event)
183 
184 
class HistoryManager(object):
    """
    Coordinate historizing of node values and events.

    The manager owns a single internal subscription whose notifications
    are written to the active storage backend through SubHandler, and it
    answers history read and history update requests.
    """

    def __init__(self, iserver):
        """
        iserver: internal server object; its ``isession`` attribute is used
        when the internal subscription is created.
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        # NOTE(review): this listing skips one original line at this point,
        # which presumably assigned a default storage backend -- confirm
        # against the original file. Until set_storage() is called no
        # backend is active.
        self.storage = None
        self._sub = None  # created by the first historize_* call
        self._handlers = {}  # node/source -> handle returned by subscribe_*

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing

        The SubHandler built by the first historize_* call keeps a reference
        to the storage that is active at that moment, so call this before
        historizing anything.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """
        Create the internal subscription that feeds ``handler``.
        """
        # The parameter object has to be created before its fields are
        # filled in; without it the assignments below raise NameError.
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the nodes' data changes and store the data in the active storage.

        Raises ua.UaError if ``node`` is already historized by this manager.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {0} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7), count=0):
        """
        Subscribe to the source nodes' events and store the data in the active storage.

        SQL Implementation
        The default is to historize every event type the source generates, custom event properties are included. At
        this time there is no way to historize a specific event type. The user software can filter out events which are
        not desired when reading.

        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
        must be deleted manually so that a new table with the custom event fields can be created.

        Raises ua.UaError if events from ``source`` are already historized by this manager.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {0} are already historized".format(source))

        # get list of all event types that the source node generates; change this to only historize specific events
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        self.storage.new_historized_event(source.nodeid, event_types, period, count)

        handler = self._sub.subscribe_events(source, event_types)
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove subscription to the node/source which is being historized

        SQL Implementation
        Only the subscriptions is removed. The historical data remains.

        If ``node`` is not historized by this manager an error is logged
        and nothing else happens.
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for every entry of ``params.NodesToRead``.

        This is part of the AttributeService, but implemented as its own
        service since it requires more logic than other attribute service
        methods. Returns one result per requested node, in request order.
        """
        return [self._read_history(params.HistoryReadDetails, rv) for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        determine if the history read is for a data changes or events; then read the history for that node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    @staticmethod
    def _resolve_start_time(rv, details):
        """
        Return the start time of a history read.

        Spec says we should ignore details if cont point is present but
        they also say we can use cont point as timestamp to enable a
        stateless implementation. This is contradictory, so we assume
        details is sent correctly with the continuation point and only
        take the start time from the continuation point when there is one.
        """
        if rv.ContinuationPoint:
            return ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
        return details.StartTime

    def _read_datavalue_history(self, rv, details):
        """
        Read data value history for ``rv.NodeId`` from the storage.

        Returns the values and the continuation point; a truthy
        continuation point is packed as a DateTime.
        """
        starttime = self._resolve_start_time(rv, details)

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        """
        Read event history for ``rv.NodeId`` from the storage.

        Every stored event is converted to a HistoryEventFieldList using
        the select clauses of ``details.Filter``. Returns the field lists
        and the continuation point; a truthy continuation point is packed
        as a DateTime.
        """
        starttime = self._resolve_start_time(rv, details)

        evts, cont = self.storage.read_event_history(rv.NodeId,
                                                     starttime,
                                                     details.EndTime,
                                                     details.NumValuesPerNode,
                                                     details.Filter)
        results = []
        for ev in evts:
            field_list = ua.HistoryEventFieldList()
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
            results.append(field_list)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        return results, cont

    def update_history(self, params):
        """
        Answer a history update request.

        This is part of the AttributeService, but implemented as its own
        service since it requires more logic than other attribute service
        methods. Rewriting history is not accepted: one BadNotWritable
        result is returned per entry of ``params.HistoryUpdateDetails``.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the per-item result, not the result list itself
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped

        Nothing is done when no storage has been set.
        """
        if self.storage is not None:
            self.storage.stop()
def new_historized_event(self, source_id, evtypes, period, count=0)
Definition: history.py:46
def update_history(self, params)
Definition: history.py:343
def read_event_history(self, source_id, start, end, nb_values, evfilter)
Definition: history.py:146
def new_historized_node(self, node_id, period, count=0)
Definition: history.py:21
def event_notification(self, event)
Definition: history.py:181
def historize_event(self, source, period=timedelta(days=7), count=0)
Definition: history.py:221
def set_storage(self, storage)
Definition: history.py:193
def new_historized_event(self, source_id, evtypes, period, count=0)
Definition: history.py:129
def save_event(self, event)
Definition: history.py:135
def __init__(self, iserver)
Definition: history.py:186
def read_event_history(self, source_id, start, end, nb_values, evfilter)
Definition: history.py:60
def _read_history(self, details, rv)
Definition: history.py:273
def _read_datavalue_history(self, rv, details)
Definition: history.py:301
def read_node_history(self, node_id, start, end, nb_values)
Definition: history.py:35
def datachange_notification(self, node, val, data)
Definition: history.py:178
def _create_subscription(self, handler)
Definition: history.py:199
def new_historized_node(self, node_id, period, count=0)
Definition: history.py:88
def __init__(self, storage)
Definition: history.py:175
def read_node_history(self, node_id, start, end, nb_values)
Definition: history.py:105
def read_history(self, params)
Definition: history.py:260
def _read_event_history(self, rv, details)
Definition: history.py:320
def historize_data_change(self, node, period=timedelta(days=7), count=0)
Definition: history.py:209
def save_node_value(self, node_id, datavalue)
Definition: history.py:94
def save_node_value(self, node_id, datavalue)
Definition: history.py:28


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44