2 server side implementation of a subscription object 5 from threading
import RLock
46 implement monitoreditem service for 1 subscription 50 self.
logger = logging.getLogger(__name__ +
"." + str(isub.data.SubscriptionId))
64 for item
in params.ItemsToCreate:
66 if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
70 results.append(result)
75 for item
in params.ItemsToModify:
80 self.logger.debug(
"triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
81 variant = self.aspace.get_attribute_value(nodeid, attr)
86 for mdata
in self._monitored_items.values():
88 if mdata.monitored_item_id == params.MonitoredItemId:
89 result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
90 result.RevisedQueueSize = params.RequestedParameters.QueueSize
91 if params.RequestedParameters.Filter
is not None:
92 mdata.filter = params.RequestedParameters.Filter
93 mdata.queue_size = params.RequestedParameters.QueueSize
96 result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
100 if result.StatusCode.is_good():
106 result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
107 result.RevisedQueueSize = params.RequestedParameters.QueueSize
110 self.logger.debug(
"Creating MonitoredItem with id %s", result.MonitoredItemId)
113 mdata.mode = params.MonitoringMode
114 mdata.client_handle = params.RequestedParameters.ClientHandle
115 mdata.monitored_item_id = result.MonitoredItemId
116 mdata.queue_size = params.RequestedParameters.QueueSize
117 mdata.filter = params.RequestedParameters.Filter
122 self.logger.info(
"request to subscribe to events for node %s and attribute %s",
123 params.ItemToMonitor.NodeId,
124 params.ItemToMonitor.AttributeId)
127 ev_notify_byte = self.aspace.get_attribute_value(
128 params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
130 if ev_notify_byte
is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
131 result.StatusCode =
ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
138 self.
_monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
142 self.logger.info(
"request to subscribe to datachange for node %s and attribute %s",
143 params.ItemToMonitor.NodeId,
144 params.ItemToMonitor.AttributeId)
147 result.FilterResult = params.RequestedParameters.Filter
148 result.StatusCode, handle = self.aspace.add_datachange_callback(
151 self.logger.debug(
"adding callback return status %s and handle %s", result.StatusCode, handle)
152 mdata.callback_handle = handle
154 if result.StatusCode.is_good():
157 self.
trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
161 self.logger.debug(
"delete monitored items %s", ids)
170 return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
171 for k, v
in self._monitored_events.items():
175 self._monitored_events.pop(k)
177 for k, v
in self._monitored_datachange.items():
179 self.aspace.delete_datachange_callback(k)
180 self._monitored_datachange.pop(k)
182 self._monitored_items.pop(mid)
187 self.logger.info(
"subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
191 self.logger.info(
"subscription %s: datachange callback called with handle '%s' and value '%s'", self,
197 mdata.mvalue.set_current_value(value.Value.Value)
198 if mdata.filter
is not None:
201 deadband_flag_pass =
True 202 if deadband_flag_pass:
203 event.ClientHandle = mdata.client_handle
205 self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
208 if (values.get_old_value()
is None)
or \
209 ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
217 self.logger.debug(
"%s has no subscription for events %s from node: %s",
218 self, event, event.SourceNode)
220 self.logger.debug(
"%s has subscription for events %s from node: %s",
221 self, event, event.SourceNode)
228 self.logger.debug(
"Could not find monitored items for id %s for event %s in subscription %s",
232 if not mdata.where_clause_evaluator.eval(event):
233 self.logger.info(
"%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
236 fieldlist.ClientHandle = mdata.client_handle
237 fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
238 self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
241 self.isub.enqueue_statuschange(code)
246 def __init__(self, subservice, data, addressspace, callback):
247 self.
logger = logging.getLogger(__name__)
266 return "Subscription(id:{0})".format(self.data.SubscriptionId)
269 self.logger.debug(
"starting subscription %s", self.data.SubscriptionId)
273 self.logger.debug(
"stopping subscription %s", self.data.SubscriptionId)
275 self.monitored_item_srv.delete_all_monitored_items()
279 self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self.
_sub_loop)
292 self.logger.debug(
"keep alive count %s is > than max keep alive count %s, sending publish event",
300 self.logger.warning(
"Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)",
303 self.monitored_item_srv.trigger_statuschange(
ua.StatusCode(ua.StatusCodes.BadTimeout))
311 if result
is not None:
316 result.SubscriptionId = self.data.SubscriptionId
323 if len(result.NotificationMessage.NotificationData) != 0:
326 result.MoreNotifications =
False 327 result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
333 notif.MonitoredItems = [item
for sublist
in self._triggered_datachanges.values()
for item
in sublist]
335 self.logger.debug(
"sending datachanges notification with %s events", len(notif.MonitoredItems))
336 result.NotificationMessage.NotificationData.append(notif)
341 notif.Events = [item
for sublist
in self._triggered_events.values()
for item
in sublist]
343 result.NotificationMessage.NotificationData.append(notif)
344 self.logger.debug(
"sending event notification with %s events", len(notif.Events))
349 notif.Status = self._triggered_statuschanges.pop(0)
350 result.NotificationMessage.NotificationData.append(notif)
351 self.logger.debug(
"sending event notification %s", notif.Status)
354 self.logger.info(
"publish request with acks %s", acks)
359 self._not_acknowledged_results.pop(nb)
362 self.logger.info(
"re-publish request for ack %s in subscription %s", nb, self)
365 self.logger.info(
"re-publishing ack %s in subscription %s", nb, self)
368 self.logger.info(
"Error request to re-published non existing ack %s in subscription %s", nb, self)
378 self._triggered_statuschanges.append(code)
382 queue[mid] = [eventdata]
385 if len(queue[mid]) >= size:
387 queue[mid].append(eventdata)
402 except Exception
as ex:
403 self.logger.exception(
"Exception while evaluating WhereClause %s for event %s: %s",
411 ops = el.FilterOperands
412 if el.FilterOperator == ua.FilterOperator.Equals:
414 elif el.FilterOperator == ua.FilterOperator.IsNull:
415 return self.
_eval_op(ops[0], event)
is None 416 elif el.FilterOperator == ua.FilterOperator.GreaterThan:
418 elif el.FilterOperator == ua.FilterOperator.LessThan:
420 elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
422 elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
424 elif el.FilterOperator == ua.FilterOperator.Like:
425 return self._likeoperator(self.
_eval_op(ops[0], event), self.
_eval_el(ops[1], event))
426 elif el.FilterOperator == ua.FilterOperator.Not:
427 return not self.
_eval_op(ops[0], event)
428 elif el.FilterOperator == ua.FilterOperator.Between:
430 elif el.FilterOperator == ua.FilterOperator.InList:
431 return self.
_eval_op(ops[0], event)
in [self.
_eval_op(op, event)
for op
in ops[1:]]
432 elif el.FilterOperator == ua.FilterOperator.And:
435 elif el.FilterOperator == ua.FilterOperator.Or:
437 elif el.FilterOperator == ua.FilterOperator.Cast:
438 self.logger.warn(
"Cast operand not implemented, assuming True")
440 elif el.FilterOperator == ua.FilterOperator.OfType:
441 return event.EventType == self.
_eval_op(ops[0], event)
444 self.logger.warning(
"WhereClause not implemented for element: %s", el)
445 raise NotImplementedError
448 raise NotImplementedError
453 return self.
_eval_el(op.Index, event)
456 return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
458 return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
463 return getattr(event, op.BrowsePath[0].Name)
466 return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
468 return op.Value.Value
470 self.logger.warning(
"Where clause element % is not of a known type", op)
471 raise NotImplementedError
def _modify_monitored_item(self, params)
def has_published_results(self)
def datachange_callback(self, handle, value, error=None)
def _enqueue_event(self, mid, eventdata, size, queue)
def publish_results(self)
def __init__(self, subservice, data, addressspace, callback)
def deadband_callback(self, values, flt)
def _trigger_event(self, event, mid)
def set_current_value(self, cur_val)
def __init__(self, isub, aspace)
def _delete_monitored_items(self, mid)
def create_monitored_items(self, params)
def _create_data_change_monitored_item(self, params)
def _create_events_monitored_item(self, params)
def trigger_event(self, event)
def _make_monitored_item_common(self, params)
def _pop_triggered_statuschanges(self, result)
def enqueue_statuschange(self, code)
def _pop_triggered_datachanges(self, result)
def _commit_monitored_item(self, result, mdata)
def trigger_statuschange(self, code)
def enqueue_datachange_event(self, mid, eventdata, maxsize)
def _pop_triggered_events(self, result)
def _eval_op(self, op, event)
_not_acknowledged_results
def delete_monitored_items(self, ids)
def _pop_publish_result(self)
def trigger_datachange(self, handle, nodeid, attr)
def modify_monitored_items(self, params)
def delete_all_monitored_items(self)
def _subscription_loop(self)
def enqueue_event(self, mid, eventdata, maxsize)
def _like_operator(self, string, pattern)
def get_current_value(self)
def _eval_el(self, index, event)
def __init__(self, logger, aspace, whereclause)