00001 """
00002 server side implementation of a subscription object
00003 """
00004
00005 from threading import RLock
00006 import logging
00007
00008
00009
00010 from opcua import ua
00011
00012
00013 class MonitoredItemData(object):
00014
00015 def __init__(self):
00016 self.client_handle = None
00017 self.callback_handle = None
00018 self.monitored_item_id = None
00019 self.mode = None
00020 self.filter = None
00021 self.mvalue = MonitoredItemValues()
00022 self.where_clause_evaluator = None
00023 self.queue_size = 0
00024
00025
00026 class MonitoredItemValues(object):
00027
00028 def __init__(self):
00029 self.current_value = None
00030 self.old_value = None
00031
00032 def set_current_value(self, cur_val):
00033 self.old_value = self.current_value
00034 self.current_value = cur_val
00035
00036 def get_current_value(self):
00037 return self.current_value
00038
00039 def get_old_value(self):
00040 return self.old_value
00041
00042
00043 class MonitoredItemService(object):
00044
00045 """
00046 implement monitoreditem service for 1 subscription
00047 """
00048
00049 def __init__(self, isub, aspace):
00050 self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
00051 self.isub = isub
00052 self.aspace = aspace
00053 self._lock = RLock()
00054 self._monitored_items = {}
00055 self._monitored_events = {}
00056 self._monitored_datachange = {}
00057 self._monitored_item_counter = 111
00058
00059 def delete_all_monitored_items(self):
00060 self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
00061
00062 def create_monitored_items(self, params):
00063 results = []
00064 for item in params.ItemsToCreate:
00065 with self._lock:
00066 if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
00067 result = self._create_events_monitored_item(item)
00068 else:
00069 result = self._create_data_change_monitored_item(item)
00070 results.append(result)
00071 return results
00072
00073 def modify_monitored_items(self, params):
00074 results = []
00075 for item in params.ItemsToModify:
00076 results.append(self._modify_monitored_item(item))
00077 return results
00078
00079 def trigger_datachange(self, handle, nodeid, attr):
00080 self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
00081 variant = self.aspace.get_attribute_value(nodeid, attr)
00082 self.datachange_callback(handle, variant)
00083
00084 def _modify_monitored_item(self, params):
00085 with self._lock:
00086 for mdata in self._monitored_items.values():
00087 result = ua.MonitoredItemModifyResult()
00088 if mdata.monitored_item_id == params.MonitoredItemId:
00089 result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
00090 result.RevisedQueueSize = params.RequestedParameters.QueueSize
00091 if params.RequestedParameters.Filter is not None:
00092 mdata.filter = params.RequestedParameters.Filter
00093 mdata.queue_size = params.RequestedParameters.QueueSize
00094 return result
00095 result = ua.MonitoredItemModifyResult()
00096 result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
00097 return result
00098
00099 def _commit_monitored_item(self, result, mdata):
00100 if result.StatusCode.is_good():
00101 self._monitored_items[result.MonitoredItemId] = mdata
00102 self._monitored_item_counter += 1
00103
00104 def _make_monitored_item_common(self, params):
00105 result = ua.MonitoredItemCreateResult()
00106 result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
00107 result.RevisedQueueSize = params.RequestedParameters.QueueSize
00108 self._monitored_item_counter += 1
00109 result.MonitoredItemId = self._monitored_item_counter
00110 self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
00111
00112 mdata = MonitoredItemData()
00113 mdata.mode = params.MonitoringMode
00114 mdata.client_handle = params.RequestedParameters.ClientHandle
00115 mdata.monitored_item_id = result.MonitoredItemId
00116 mdata.queue_size = params.RequestedParameters.QueueSize
00117 mdata.filter = params.RequestedParameters.Filter
00118
00119 return result, mdata
00120
00121 def _create_events_monitored_item(self, params):
00122 self.logger.info("request to subscribe to events for node %s and attribute %s",
00123 params.ItemToMonitor.NodeId,
00124 params.ItemToMonitor.AttributeId)
00125
00126 result, mdata = self._make_monitored_item_common(params)
00127 ev_notify_byte = self.aspace.get_attribute_value(
00128 params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
00129
00130 if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
00131 result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
00132 return result
00133
00134 mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
00135 self._commit_monitored_item(result, mdata)
00136 if params.ItemToMonitor.NodeId not in self._monitored_events:
00137 self._monitored_events[params.ItemToMonitor.NodeId] = []
00138 self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
00139 return result
00140
00141 def _create_data_change_monitored_item(self, params):
00142 self.logger.info("request to subscribe to datachange for node %s and attribute %s",
00143 params.ItemToMonitor.NodeId,
00144 params.ItemToMonitor.AttributeId)
00145
00146 result, mdata = self._make_monitored_item_common(params)
00147 result.FilterResult = params.RequestedParameters.Filter
00148 result.StatusCode, handle = self.aspace.add_datachange_callback(
00149 params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
00150
00151 self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
00152 mdata.callback_handle = handle
00153 self._commit_monitored_item(result, mdata)
00154 if result.StatusCode.is_good():
00155 self._monitored_datachange[handle] = result.MonitoredItemId
00156
00157 self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
00158 return result
00159
00160 def delete_monitored_items(self, ids):
00161 self.logger.debug("delete monitored items %s", ids)
00162 with self._lock:
00163 results = []
00164 for mid in ids:
00165 results.append(self._delete_monitored_items(mid))
00166 return results
00167
00168 def _delete_monitored_items(self, mid):
00169 if mid not in self._monitored_items:
00170 return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
00171 for k, v in self._monitored_events.items():
00172 if mid in v:
00173 v.remove(mid)
00174 if not v:
00175 self._monitored_events.pop(k)
00176 break
00177 for k, v in self._monitored_datachange.items():
00178 if v == mid:
00179 self.aspace.delete_datachange_callback(k)
00180 self._monitored_datachange.pop(k)
00181 break
00182 self._monitored_items.pop(mid)
00183 return ua.StatusCode()
00184
00185 def datachange_callback(self, handle, value, error=None):
00186 if error:
00187 self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
00188 handle, error)
00189 self.trigger_statuschange(error)
00190 else:
00191 self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
00192 handle, value.Value)
00193 event = ua.MonitoredItemNotification()
00194 with self._lock:
00195 mid = self._monitored_datachange[handle]
00196 mdata = self._monitored_items[mid]
00197 mdata.mvalue.set_current_value(value.Value.Value)
00198 if mdata.filter is not None:
00199 deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
00200 else:
00201 deadband_flag_pass = True
00202 if deadband_flag_pass:
00203 event.ClientHandle = mdata.client_handle
00204 event.Value = value
00205 self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
00206
00207 def deadband_callback(self, values, flt):
00208 if (values.get_old_value() is None) or \
00209 ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
00210 return True
00211 else:
00212 return False
00213
00214 def trigger_event(self, event):
00215 with self._lock:
00216 if event.SourceNode not in self._monitored_events:
00217 self.logger.debug("%s has no subscription for events %s from node: %s",
00218 self, event, event.SourceNode)
00219 return False
00220 self.logger.debug("%s has subscription for events %s from node: %s",
00221 self, event, event.SourceNode)
00222 mids = self._monitored_events[event.SourceNode]
00223 for mid in mids:
00224 self._trigger_event(event, mid)
00225
00226 def _trigger_event(self, event, mid):
00227 if mid not in self._monitored_items:
00228 self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s",
00229 mid, event, self)
00230 return
00231 mdata = self._monitored_items[mid]
00232 if not mdata.where_clause_evaluator.eval(event):
00233 self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
00234 return
00235 fieldlist = ua.EventFieldList()
00236 fieldlist.ClientHandle = mdata.client_handle
00237 fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
00238 self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
00239
00240 def trigger_statuschange(self, code):
00241 self.isub.enqueue_statuschange(code)
00242
00243
00244 class InternalSubscription(object):
00245
00246 def __init__(self, subservice, data, addressspace, callback):
00247 self.logger = logging.getLogger(__name__)
00248 self.aspace = addressspace
00249 self.subservice = subservice
00250 self.data = data
00251 self.callback = callback
00252 self.monitored_item_srv = MonitoredItemService(self, addressspace)
00253 self.task = None
00254 self._lock = RLock()
00255 self._triggered_datachanges = {}
00256 self._triggered_events = {}
00257 self._triggered_statuschanges = []
00258 self._notification_seq = 1
00259 self._not_acknowledged_results = {}
00260 self._startup = True
00261 self._keep_alive_count = 0
00262 self._publish_cycles_count = 0
00263 self._stopev = False
00264
00265 def __str__(self):
00266 return "Subscription(id:{0})".format(self.data.SubscriptionId)
00267
00268 def start(self):
00269 self.logger.debug("starting subscription %s", self.data.SubscriptionId)
00270 self._subscription_loop()
00271
00272 def stop(self):
00273 self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
00274 self._stopev = True
00275 self.monitored_item_srv.delete_all_monitored_items()
00276
00277 def _subscription_loop(self):
00278 if not self._stopev:
00279 self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)
00280
00281 def _sub_loop(self):
00282 if self._stopev:
00283 return
00284 self.publish_results()
00285 self._subscription_loop()
00286
00287 def has_published_results(self):
00288 with self._lock:
00289 if self._startup or self._triggered_datachanges or self._triggered_events:
00290 return True
00291 if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
00292 self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
00293 self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
00294 return True
00295 self._keep_alive_count += 1
00296 return False
00297
00298 def publish_results(self):
00299 if self._publish_cycles_count > self.data.RevisedLifetimeCount:
00300 self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)",
00301 self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
00302
00303 self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
00304 self._stopev = True
00305 result = None
00306 with self._lock:
00307 if self.has_published_results():
00308
00309 self._publish_cycles_count += 1
00310 result = self._pop_publish_result()
00311 if result is not None:
00312 self.callback(result)
00313
00314 def _pop_publish_result(self):
00315 result = ua.PublishResult()
00316 result.SubscriptionId = self.data.SubscriptionId
00317 self._pop_triggered_datachanges(result)
00318 self._pop_triggered_events(result)
00319 self._pop_triggered_statuschanges(result)
00320 self._keep_alive_count = 0
00321 self._startup = False
00322 result.NotificationMessage.SequenceNumber = self._notification_seq
00323 if len(result.NotificationMessage.NotificationData) != 0:
00324 self._notification_seq += 1
00325 self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
00326 result.MoreNotifications = False
00327 result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
00328 return result
00329
00330 def _pop_triggered_datachanges(self, result):
00331 if self._triggered_datachanges:
00332 notif = ua.DataChangeNotification()
00333 notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
00334 self._triggered_datachanges = {}
00335 self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
00336 result.NotificationMessage.NotificationData.append(notif)
00337
00338 def _pop_triggered_events(self, result):
00339 if self._triggered_events:
00340 notif = ua.EventNotificationList()
00341 notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
00342 self._triggered_events = {}
00343 result.NotificationMessage.NotificationData.append(notif)
00344 self.logger.debug("sending event notification with %s events", len(notif.Events))
00345
00346 def _pop_triggered_statuschanges(self, result):
00347 if self._triggered_statuschanges:
00348 notif = ua.StatusChangeNotification()
00349 notif.Status = self._triggered_statuschanges.pop(0)
00350 result.NotificationMessage.NotificationData.append(notif)
00351 self.logger.debug("sending event notification %s", notif.Status)
00352
00353 def publish(self, acks):
00354 self.logger.info("publish request with acks %s", acks)
00355 with self._lock:
00356 self._publish_cycles_count = 0
00357 for nb in acks:
00358 if nb in self._not_acknowledged_results:
00359 self._not_acknowledged_results.pop(nb)
00360
00361 def republish(self, nb):
00362 self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
00363 with self._lock:
00364 if nb in self._not_acknowledged_results:
00365 self.logger.info("re-publishing ack %s in subscription %s", nb, self)
00366 return self._not_acknowledged_results[nb].NotificationMessage
00367 else:
00368 self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
00369 return ua.NotificationMessage()
00370
00371 def enqueue_datachange_event(self, mid, eventdata, maxsize):
00372 self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
00373
00374 def enqueue_event(self, mid, eventdata, maxsize):
00375 self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
00376
00377 def enqueue_statuschange(self, code):
00378 self._triggered_statuschanges.append(code)
00379
00380 def _enqueue_event(self, mid, eventdata, size, queue):
00381 if mid not in queue:
00382 queue[mid] = [eventdata]
00383 return
00384 if size != 0:
00385 if len(queue[mid]) >= size:
00386 queue[mid].pop(0)
00387 queue[mid].append(eventdata)
00388
00389
00390 class WhereClauseEvaluator(object):
00391 def __init__(self, logger, aspace, whereclause):
00392 self.logger = logger
00393 self.elements = whereclause.Elements
00394 self._aspace = aspace
00395
00396 def eval(self, event):
00397 if not self.elements:
00398 return True
00399
00400 try:
00401 res = self._eval_el(0, event)
00402 except Exception as ex:
00403 self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s",
00404 self.elements, event, ex)
00405 return False
00406 return res
00407
00408 def _eval_el(self, index, event):
00409 el = self.elements[index]
00410
00411 ops = el.FilterOperands
00412 if el.FilterOperator == ua.FilterOperator.Equals:
00413 return self._eval_op(ops[0], event) == self._eval_el(ops[1], event)
00414 elif el.FilterOperator == ua.FilterOperator.IsNull:
00415 return self._eval_op(ops[0], event) is None
00416 elif el.FilterOperator == ua.FilterOperator.GreaterThan:
00417 return self._eval_op(ops[0], event) > self._eval_el(ops[1], event)
00418 elif el.FilterOperator == ua.FilterOperator.LessThan:
00419 return self._eval_op(ops[0], event) < self._eval_el(ops[1], event)
00420 elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
00421 return self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
00422 elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
00423 return self._eval_op(ops[0], event) <= self._eval_el(ops[1], event)
00424 elif el.FilterOperator == ua.FilterOperator.Like:
00425 return self._likeoperator(self._eval_op(ops[0], event), self._eval_el(ops[1], event))
00426 elif el.FilterOperator == ua.FilterOperator.Not:
00427 return not self._eval_op(ops[0], event)
00428 elif el.FilterOperator == ua.FilterOperator.Between:
00429 return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
00430 elif el.FilterOperator == ua.FilterOperator.InList:
00431 return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
00432 elif el.FilterOperator == ua.FilterOperator.And:
00433 self.elements(ops[0].Index)
00434 return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
00435 elif el.FilterOperator == ua.FilterOperator.Or:
00436 return self._eval_op(ops[0], event) or self._eval_el(ops[1], event)
00437 elif el.FilterOperator == ua.FilterOperator.Cast:
00438 self.logger.warn("Cast operand not implemented, assuming True")
00439 return True
00440 elif el.FilterOperator == ua.FilterOperator.OfType:
00441 return event.EventType == self._eval_op(ops[0], event)
00442 else:
00443
00444 self.logger.warning("WhereClause not implemented for element: %s", el)
00445 raise NotImplementedError
00446
00447 def _like_operator(self, string, pattern):
00448 raise NotImplementedError
00449
00450 def _eval_op(self, op, event):
00451
00452 if type(op) is ua.ElementOperand:
00453 return self._eval_el(op.Index, event)
00454 elif type(op) is ua.AttributeOperand:
00455 if op.BrowsePath:
00456 return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
00457 else:
00458 return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
00459
00460 elif type(op) is ua.SimpleAttributeOperand:
00461 if op.BrowsePath:
00462
00463 return getattr(event, op.BrowsePath[0].Name)
00464 else:
00465
00466 return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
00467 elif type(op) is ua.LiteralOperand:
00468 return op.Value.Value
00469 else:
00470 self.logger.warning("Where clause element % is not of a known type", op)
00471 raise NotImplementedError
00472
00473
00474