internal_subscription.py
Go to the documentation of this file.
1 """
2 server side implementation of a subscription object
3 """
4 
5 from threading import RLock
6 import logging
7 # import copy
8 # import traceback
9 
10 from opcua import ua
11 
12 
13 class MonitoredItemData(object):
14 
15  def __init__(self):
16  self.client_handle = None
17  self.callback_handle = None
18  self.monitored_item_id = None
19  self.mode = None
20  self.filter = None
23  self.queue_size = 0
24 
25 
26 class MonitoredItemValues(object):
27 
28  def __init__(self):
29  self.current_value = None
30  self.old_value = None
31 
32  def set_current_value(self, cur_val):
33  self.old_value = self.current_value
34  self.current_value = cur_val
35 
36  def get_current_value(self):
37  return self.current_value
38 
39  def get_old_value(self):
40  return self.old_value
41 
42 
43 class MonitoredItemService(object):
44 
45  """
46  implement monitoreditem service for 1 subscription
47  """
48 
49  def __init__(self, isub, aspace):
50  self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
51  self.isub = isub
52  self.aspace = aspace
53  self._lock = RLock()
54  self._monitored_items = {}
58 
60  self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
61 
62  def create_monitored_items(self, params):
63  results = []
64  for item in params.ItemsToCreate:
65  with self._lock:
66  if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
67  result = self._create_events_monitored_item(item)
68  else:
69  result = self._create_data_change_monitored_item(item)
70  results.append(result)
71  return results
72 
73  def modify_monitored_items(self, params):
74  results = []
75  for item in params.ItemsToModify:
76  results.append(self._modify_monitored_item(item))
77  return results
78 
79  def trigger_datachange(self, handle, nodeid, attr):
80  self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
81  variant = self.aspace.get_attribute_value(nodeid, attr)
82  self.datachange_callback(handle, variant)
83 
84  def _modify_monitored_item(self, params):
85  with self._lock:
86  for mdata in self._monitored_items.values():
88  if mdata.monitored_item_id == params.MonitoredItemId:
89  result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
90  result.RevisedQueueSize = params.RequestedParameters.QueueSize
91  if params.RequestedParameters.Filter is not None:
92  mdata.filter = params.RequestedParameters.Filter
93  mdata.queue_size = params.RequestedParameters.QueueSize
94  return result
96  result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
97  return result
98 
99  def _commit_monitored_item(self, result, mdata):
100  if result.StatusCode.is_good():
101  self._monitored_items[result.MonitoredItemId] = mdata
102  self._monitored_item_counter += 1
103 
104  def _make_monitored_item_common(self, params):
106  result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
107  result.RevisedQueueSize = params.RequestedParameters.QueueSize
108  self._monitored_item_counter += 1
109  result.MonitoredItemId = self._monitored_item_counter
110  self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
111 
112  mdata = MonitoredItemData()
113  mdata.mode = params.MonitoringMode
114  mdata.client_handle = params.RequestedParameters.ClientHandle
115  mdata.monitored_item_id = result.MonitoredItemId
116  mdata.queue_size = params.RequestedParameters.QueueSize
117  mdata.filter = params.RequestedParameters.Filter
118 
119  return result, mdata
120 
121  def _create_events_monitored_item(self, params):
122  self.logger.info("request to subscribe to events for node %s and attribute %s",
123  params.ItemToMonitor.NodeId,
124  params.ItemToMonitor.AttributeId)
125 
126  result, mdata = self._make_monitored_item_common(params)
127  ev_notify_byte = self.aspace.get_attribute_value(
128  params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
129 
130  if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
131  result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
132  return result
133  # result.FilterResult = ua.EventFilterResult() # spec says we can ignore if not error
134  mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
135  self._commit_monitored_item(result, mdata)
136  if params.ItemToMonitor.NodeId not in self._monitored_events:
137  self._monitored_events[params.ItemToMonitor.NodeId] = []
138  self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
139  return result
140 
142  self.logger.info("request to subscribe to datachange for node %s and attribute %s",
143  params.ItemToMonitor.NodeId,
144  params.ItemToMonitor.AttributeId)
145 
146  result, mdata = self._make_monitored_item_common(params)
147  result.FilterResult = params.RequestedParameters.Filter
148  result.StatusCode, handle = self.aspace.add_datachange_callback(
149  params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
150 
151  self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
152  mdata.callback_handle = handle
153  self._commit_monitored_item(result, mdata)
154  if result.StatusCode.is_good():
155  self._monitored_datachange[handle] = result.MonitoredItemId
156  # force data change event generation
157  self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
158  return result
159 
160  def delete_monitored_items(self, ids):
161  self.logger.debug("delete monitored items %s", ids)
162  with self._lock:
163  results = []
164  for mid in ids:
165  results.append(self._delete_monitored_items(mid))
166  return results
167 
168  def _delete_monitored_items(self, mid):
169  if mid not in self._monitored_items:
170  return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
171  for k, v in self._monitored_events.items():
172  if mid in v:
173  v.remove(mid)
174  if not v:
175  self._monitored_events.pop(k)
176  break
177  for k, v in self._monitored_datachange.items():
178  if v == mid:
179  self.aspace.delete_datachange_callback(k)
180  self._monitored_datachange.pop(k)
181  break
182  self._monitored_items.pop(mid)
183  return ua.StatusCode()
184 
185  def datachange_callback(self, handle, value, error=None):
186  if error:
187  self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
188  handle, error)
189  self.trigger_statuschange(error)
190  else:
191  self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
192  handle, value.Value)
194  with self._lock:
195  mid = self._monitored_datachange[handle]
196  mdata = self._monitored_items[mid]
197  mdata.mvalue.set_current_value(value.Value.Value)
198  if mdata.filter is not None:
199  deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
200  else:
201  deadband_flag_pass = True
202  if deadband_flag_pass:
203  event.ClientHandle = mdata.client_handle
204  event.Value = value
205  self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
206 
207  def deadband_callback(self, values, flt):
208  if (values.get_old_value() is None) or \
209  ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
210  return True
211  else:
212  return False
213 
214  def trigger_event(self, event):
215  with self._lock:
216  if event.SourceNode not in self._monitored_events:
217  self.logger.debug("%s has no subscription for events %s from node: %s",
218  self, event, event.SourceNode)
219  return False
220  self.logger.debug("%s has subscription for events %s from node: %s",
221  self, event, event.SourceNode)
222  mids = self._monitored_events[event.SourceNode]
223  for mid in mids:
224  self._trigger_event(event, mid)
225 
226  def _trigger_event(self, event, mid):
227  if mid not in self._monitored_items:
228  self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s",
229  mid, event, self)
230  return
231  mdata = self._monitored_items[mid]
232  if not mdata.where_clause_evaluator.eval(event):
233  self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
234  return
235  fieldlist = ua.EventFieldList()
236  fieldlist.ClientHandle = mdata.client_handle
237  fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
238  self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
239 
240  def trigger_statuschange(self, code):
241  self.isub.enqueue_statuschange(code)
242 
243 
244 class InternalSubscription(object):
245 
246  def __init__(self, subservice, data, addressspace, callback):
247  self.logger = logging.getLogger(__name__)
248  self.aspace = addressspace
249  self.subservice = subservice
250  self.data = data
251  self.callback = callback
252  self.monitored_item_srv = MonitoredItemService(self, addressspace)
253  self.task = None
254  self._lock = RLock()
260  self._startup = True
263  self._stopev = False
264 
265  def __str__(self):
266  return "Subscription(id:{0})".format(self.data.SubscriptionId)
267 
268  def start(self):
269  self.logger.debug("starting subscription %s", self.data.SubscriptionId)
270  self._subscription_loop()
271 
272  def stop(self):
273  self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
274  self._stopev = True
275  self.monitored_item_srv.delete_all_monitored_items()
276 
278  if not self._stopev:
279  self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)
280 
281  def _sub_loop(self):
282  if self._stopev:
283  return
284  self.publish_results()
285  self._subscription_loop()
286 
288  with self._lock:
289  if self._startup or self._triggered_datachanges or self._triggered_events:
290  return True
291  if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
292  self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
293  self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
294  return True
295  self._keep_alive_count += 1
296  return False
297 
298  def publish_results(self):
299  if self._publish_cycles_count > self.data.RevisedLifetimeCount:
300  self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)",
301  self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
302  # FIXME this will never be send since we do not have publish request anyway
303  self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
304  self._stopev = True
305  result = None
306  with self._lock:
307  if self.has_published_results():
308  # FIXME: should we pop a publish request here? or we do not care?
309  self._publish_cycles_count += 1
310  result = self._pop_publish_result()
311  if result is not None:
312  self.callback(result)
313 
315  result = ua.PublishResult()
316  result.SubscriptionId = self.data.SubscriptionId
317  self._pop_triggered_datachanges(result)
318  self._pop_triggered_events(result)
319  self._pop_triggered_statuschanges(result)
320  self._keep_alive_count = 0
321  self._startup = False
322  result.NotificationMessage.SequenceNumber = self._notification_seq
323  if len(result.NotificationMessage.NotificationData) != 0:
324  self._notification_seq += 1
325  self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
326  result.MoreNotifications = False
327  result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
328  return result
329 
330  def _pop_triggered_datachanges(self, result):
331  if self._triggered_datachanges:
332  notif = ua.DataChangeNotification()
333  notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
334  self._triggered_datachanges = {}
335  self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
336  result.NotificationMessage.NotificationData.append(notif)
337 
338  def _pop_triggered_events(self, result):
339  if self._triggered_events:
340  notif = ua.EventNotificationList()
341  notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
342  self._triggered_events = {}
343  result.NotificationMessage.NotificationData.append(notif)
344  self.logger.debug("sending event notification with %s events", len(notif.Events))
345 
346  def _pop_triggered_statuschanges(self, result):
347  if self._triggered_statuschanges:
349  notif.Status = self._triggered_statuschanges.pop(0)
350  result.NotificationMessage.NotificationData.append(notif)
351  self.logger.debug("sending event notification %s", notif.Status)
352 
353  def publish(self, acks):
354  self.logger.info("publish request with acks %s", acks)
355  with self._lock:
356  self._publish_cycles_count = 0
357  for nb in acks:
358  if nb in self._not_acknowledged_results:
359  self._not_acknowledged_results.pop(nb)
360 
361  def republish(self, nb):
362  self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
363  with self._lock:
364  if nb in self._not_acknowledged_results:
365  self.logger.info("re-publishing ack %s in subscription %s", nb, self)
366  return self._not_acknowledged_results[nb].NotificationMessage
367  else:
368  self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
369  return ua.NotificationMessage()
370 
371  def enqueue_datachange_event(self, mid, eventdata, maxsize):
372  self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
373 
374  def enqueue_event(self, mid, eventdata, maxsize):
375  self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
376 
377  def enqueue_statuschange(self, code):
378  self._triggered_statuschanges.append(code)
379 
380  def _enqueue_event(self, mid, eventdata, size, queue):
381  if mid not in queue:
382  queue[mid] = [eventdata]
383  return
384  if size != 0:
385  if len(queue[mid]) >= size:
386  queue[mid].pop(0)
387  queue[mid].append(eventdata)
388 
389 
390 class WhereClauseEvaluator(object):
391  def __init__(self, logger, aspace, whereclause):
392  self.logger = logger
393  self.elements = whereclause.Elements
394  self._aspace = aspace
395 
396  def eval(self, event):
397  if not self.elements:
398  return True
399  # spec says we should only evaluate first element, which may use other elements
400  try:
401  res = self._eval_el(0, event)
402  except Exception as ex:
403  self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s",
404  self.elements, event, ex)
405  return False
406  return res
407 
408  def _eval_el(self, index, event):
409  el = self.elements[index]
410  # ops = [self._eval_op(op, event) for op in el.FilterOperands]
411  ops = el.FilterOperands # just to make code more readable
412  if el.FilterOperator == ua.FilterOperator.Equals:
413  return self._eval_op(ops[0], event) == self._eval_el(ops[1], event)
414  elif el.FilterOperator == ua.FilterOperator.IsNull:
415  return self._eval_op(ops[0], event) is None # FIXME: might be too strict
416  elif el.FilterOperator == ua.FilterOperator.GreaterThan:
417  return self._eval_op(ops[0], event) > self._eval_el(ops[1], event)
418  elif el.FilterOperator == ua.FilterOperator.LessThan:
419  return self._eval_op(ops[0], event) < self._eval_el(ops[1], event)
420  elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
421  return self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
422  elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
423  return self._eval_op(ops[0], event) <= self._eval_el(ops[1], event)
424  elif el.FilterOperator == ua.FilterOperator.Like:
425  return self._likeoperator(self._eval_op(ops[0], event), self._eval_el(ops[1], event))
426  elif el.FilterOperator == ua.FilterOperator.Not:
427  return not self._eval_op(ops[0], event)
428  elif el.FilterOperator == ua.FilterOperator.Between:
429  return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
430  elif el.FilterOperator == ua.FilterOperator.InList:
431  return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
432  elif el.FilterOperator == ua.FilterOperator.And:
433  self.elements(ops[0].Index)
434  return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
435  elif el.FilterOperator == ua.FilterOperator.Or:
436  return self._eval_op(ops[0], event) or self._eval_el(ops[1], event)
437  elif el.FilterOperator == ua.FilterOperator.Cast:
438  self.logger.warn("Cast operand not implemented, assuming True")
439  return True
440  elif el.FilterOperator == ua.FilterOperator.OfType:
441  return event.EventType == self._eval_op(ops[0], event)
442  else:
443  # TODO: implement missing operators
444  self.logger.warning("WhereClause not implemented for element: %s", el)
445  raise NotImplementedError
446 
447  def _like_operator(self, string, pattern):
448  raise NotImplementedError
449 
450  def _eval_op(self, op, event):
451  # seems spec says we should return Null if issues
452  if type(op) is ua.ElementOperand:
453  return self._eval_el(op.Index, event)
454  elif type(op) is ua.AttributeOperand:
455  if op.BrowsePath:
456  return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
457  else:
458  return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
459  # FIXME: check, this is probably broken
460  elif type(op) is ua.SimpleAttributeOperand:
461  if op.BrowsePath:
462  # we only support depth of 1
463  return getattr(event, op.BrowsePath[0].Name)
464  else:
465  # TODO: write code for index range.... but doe it make any sense
466  return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
467  elif type(op) is ua.LiteralOperand:
468  return op.Value.Value
469  else:
470  self.logger.warning("Where clause element % is not of a known type", op)
471  raise NotImplementedError
472 
473 
474 
def __init__(self, subservice, data, addressspace, callback)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44