internal_subscription.py
Go to the documentation of this file.
00001 """
00002 server side implementation of a subscription object
00003 """
00004 
00005 from threading import RLock
00006 import logging
00007 # import copy
00008 # import traceback
00009 
00010 from opcua import ua
00011 
00012 
class MonitoredItemData(object):
    """Server-side bookkeeping record for a single monitored item.

    Every field starts out empty; the monitored item service fills them in
    when the item is created or modified.
    """

    def __init__(self):
        # Handles/ids: the client's handle, the handle returned when a
        # datachange callback is registered, and the server-assigned item id.
        self.client_handle = self.callback_handle = self.monitored_item_id = None
        # Monitoring mode and filter as requested by the client.
        self.mode = self.filter = None
        # Current/previous value pair, consulted when a filter is applied.
        self.mvalue = MonitoredItemValues()
        # Only populated for event monitored items.
        self.where_clause_evaluator = None
        # 0 is treated as "no limit" by the notification queue.
        self.queue_size = 0
00024 
00025 
class MonitoredItemValues(object):
    """Remember the latest value of a monitored item and the one before it."""

    def __init__(self):
        self.current_value = self.old_value = None

    def set_current_value(self, cur_val):
        """Store *cur_val* as current, demoting the previous current value."""
        self.old_value, self.current_value = self.current_value, cur_val

    def get_current_value(self):
        """Return the most recently stored value (None if never set)."""
        return self.current_value

    def get_old_value(self):
        """Return the value that was current before the last update."""
        return self.old_value
00041 
00042 
class MonitoredItemService(object):

    """
    implement monitoreditem service for 1 subscription

    Three tables are maintained and mutated under ``self._lock``:

    * ``_monitored_items``: monitored item id -> MonitoredItemData
    * ``_monitored_events``: node id -> list of monitored item ids
    * ``_monitored_datachange``: address space callback handle -> monitored item id
    """

    def __init__(self, isub, aspace):
        self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
        self.isub = isub
        self.aspace = aspace
        self._lock = RLock()
        self._monitored_items = {}
        self._monitored_events = {}
        self._monitored_datachange = {}
        self._monitored_item_counter = 111

    def delete_all_monitored_items(self):
        """Delete every monitored item currently registered in this service."""
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])

    def create_monitored_items(self, params):
        """Create one monitored item per entry of ``params.ItemsToCreate``.

        Items targeting the EventNotifier attribute become event monitored
        items, everything else becomes a datachange monitored item.
        Returns the list of create results, in request order.
        """
        results = []
        for item in params.ItemsToCreate:
            with self._lock:
                if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
                    result = self._create_events_monitored_item(item)
                else:
                    result = self._create_data_change_monitored_item(item)
            results.append(result)
        return results

    def modify_monitored_items(self, params):
        """Apply every entry of ``params.ItemsToModify``; return the results in order."""
        return [self._modify_monitored_item(item) for item in params.ItemsToModify]

    def trigger_datachange(self, handle, nodeid, attr):
        """Read the current attribute value and push it through the datachange path."""
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
        variant = self.aspace.get_attribute_value(nodeid, attr)
        self.datachange_callback(handle, variant)

    def _modify_monitored_item(self, params):
        """Update filter and queue size of one monitored item.

        Returns a MonitoredItemModifyResult. When ``params.MonitoredItemId``
        is unknown its StatusCode is set to BadMonitoredItemIdInvalid.
        """
        result = ua.MonitoredItemModifyResult()
        with self._lock:
            # _monitored_items is keyed by monitored item id, no scan needed
            mdata = self._monitored_items.get(params.MonitoredItemId)
            if mdata is None:
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
                return result
            requested = params.RequestedParameters
            result.RevisedSamplingInterval = requested.SamplingInterval
            result.RevisedQueueSize = requested.QueueSize
            # a missing filter means "keep the current one"
            if requested.Filter is not None:
                mdata.filter = requested.Filter
            mdata.queue_size = requested.QueueSize
        return result

    def _commit_monitored_item(self, result, mdata):
        """Register *mdata* under the result's id, but only if creation succeeded."""
        if result.StatusCode.is_good():
            self._monitored_items[result.MonitoredItemId] = mdata
            self._monitored_item_counter += 1

    def _make_monitored_item_common(self, params):
        """Build the create result and data record shared by both item kinds.

        Allocates the next monitored item id. The counter is bumped here and
        again on commit, so ids are unique but not consecutive.
        """
        result = ua.MonitoredItemCreateResult()
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
        self._monitored_item_counter += 1
        result.MonitoredItemId = self._monitored_item_counter
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)

        mdata = MonitoredItemData()
        mdata.mode = params.MonitoringMode
        mdata.client_handle = params.RequestedParameters.ClientHandle
        mdata.monitored_item_id = result.MonitoredItemId
        mdata.queue_size = params.RequestedParameters.QueueSize
        mdata.filter = params.RequestedParameters.Filter

        return result, mdata

    def _create_events_monitored_item(self, params):
        """Create an event monitored item for the node named in *params*.

        The result carries BadServiceUnsupported when the node does not allow
        event subscription and BadMonitoredItemFilterInvalid when the request
        has no usable event filter.
        """
        self.logger.info("request to subscribe to events for node %s and attribute %s",
                         params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        ev_notify_byte = self.aspace.get_attribute_value(
            params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value

        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
            return result
        # Without a filter carrying a WhereClause there is nothing to build the
        # evaluator from; report it instead of failing the whole request.
        where_clause = getattr(mdata.filter, "WhereClause", None)
        if where_clause is None:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemFilterInvalid)
            return result
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, where_clause)
        self._commit_monitored_item(result, mdata)
        self._monitored_events.setdefault(params.ItemToMonitor.NodeId, []).append(result.MonitoredItemId)
        return result

    def _create_data_change_monitored_item(self, params):
        """Create a datachange monitored item and emit its initial value."""
        self.logger.info("request to subscribe to datachange for node %s and attribute %s",
                         params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        result.FilterResult = params.RequestedParameters.Filter
        result.StatusCode, handle = self.aspace.add_datachange_callback(
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)

        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
        mdata.callback_handle = handle
        self._commit_monitored_item(result, mdata)
        if result.StatusCode.is_good():
            self._monitored_datachange[handle] = result.MonitoredItemId
            # force data change event generation
            self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
        return result

    def delete_monitored_items(self, ids):
        """Delete the monitored items in *ids*; return one StatusCode per id."""
        self.logger.debug("delete monitored items %s", ids)
        with self._lock:
            return [self._delete_monitored_items(mid) for mid in ids]

    def _delete_monitored_items(self, mid):
        """Remove one monitored item from all tables (caller holds the lock).

        Returns BadMonitoredItemIdInvalid for an unknown id, a default
        StatusCode otherwise.
        """
        if mid not in self._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        # iterate over snapshots: entries are removed from the dicts below
        for node_id, item_ids in list(self._monitored_events.items()):
            if mid in item_ids:
                item_ids.remove(mid)
                if not item_ids:
                    del self._monitored_events[node_id]
                break
        for handle, item_id in list(self._monitored_datachange.items()):
            if item_id == mid:
                self.aspace.delete_datachange_callback(handle)
                del self._monitored_datachange[handle]
                break
        del self._monitored_items[mid]
        return ua.StatusCode()

    def datachange_callback(self, handle, value, error=None):
        """Address space callback: queue a datachange notification.

        With *error* set, a status change is queued instead. Otherwise the
        value is recorded and, if the item's filter lets it pass, a
        MonitoredItemNotification is enqueued on the subscription.
        """
        if error:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
                             handle, error)
            self.trigger_statuschange(error)
        else:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
                             handle, value.Value)
            event = ua.MonitoredItemNotification()
            with self._lock:
                mid = self._monitored_datachange[handle]
                mdata = self._monitored_items[mid]
                mdata.mvalue.set_current_value(value.Value.Value)
                if mdata.filter is not None:
                    deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
                else:
                    deadband_flag_pass = True
                if deadband_flag_pass:
                    event.ClientHandle = mdata.client_handle
                    event.Value = value
                    self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)

    def deadband_callback(self, values, flt):
        """Return True when the change in *values* exceeds ``flt.DeadbandValue``.

        A first value (no old value recorded yet) always passes.
        """
        previous = values.get_old_value()
        if previous is None:
            return True
        return bool(abs(values.get_current_value() - previous) > flt.DeadbandValue)

    def trigger_event(self, event):
        """Dispatch *event* to every monitored item subscribed to its source node.

        Returns False when nothing is subscribed to ``event.SourceNode``,
        None otherwise.
        """
        with self._lock:
            if event.SourceNode not in self._monitored_events:
                self.logger.debug("%s has no subscription for events %s from node: %s",
                                  self, event, event.SourceNode)
                return False
            self.logger.debug("%s has subscription for events %s from node: %s",
                              self, event, event.SourceNode)
            mids = self._monitored_events[event.SourceNode]
            for mid in mids:
                self._trigger_event(event, mid)

    def _trigger_event(self, event, mid):
        """Enqueue *event* for monitored item *mid* if its where clause accepts it."""
        if mid not in self._monitored_items:
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s",
                              mid, event, self)
            return
        mdata = self._monitored_items[mid]
        if not mdata.where_clause_evaluator.eval(event):
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
            return
        fieldlist = ua.EventFieldList()
        fieldlist.ClientHandle = mdata.client_handle
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
        self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)

    def trigger_statuschange(self, code):
        """Queue a status change notification on the owning subscription."""
        self.isub.enqueue_statuschange(code)
00242 
00243 
class InternalSubscription(object):
    """Server-side state of one subscription.

    Notifications are queued by the monitored item service through the
    ``enqueue_*`` methods and flushed periodically by ``publish_results``,
    which hands a PublishResult to the ``callback`` given at construction.
    """

    def __init__(self, subservice, data, addressspace, callback):
        self.logger = logging.getLogger(__name__)
        self.aspace = addressspace
        self.subservice = subservice
        self.data = data
        self.callback = callback
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
        self.task = None
        self._lock = RLock()
        self._triggered_datachanges = {}
        self._triggered_events = {}
        self._triggered_statuschanges = []
        self._notification_seq = 1
        self._not_acknowledged_results = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._stopev = False

    def __str__(self):
        return "Subscription(id:{0})".format(self.data.SubscriptionId)

    def start(self):
        """Schedule the first publishing cycle."""
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        self._subscription_loop()

    def stop(self):
        """Stop the publishing cycle and delete all monitored items."""
        self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
        self._stopev = True
        self.monitored_item_srv.delete_all_monitored_items()

    def _subscription_loop(self):
        """Schedule ``_sub_loop`` one publishing interval from now, unless stopped."""
        if not self._stopev:
            # RevisedPublishingInterval is divided by 1000 to get the delay
            self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)

    def _sub_loop(self):
        """One publishing cycle: publish, then re-arm the timer."""
        if self._stopev:
            return
        self.publish_results()
        self._subscription_loop()

    def has_published_results(self):
        """Tell whether a publish result must be produced in this cycle.

        True at startup, when datachanges, events or status changes are
        pending, or when the keep alive count exceeded its maximum. Otherwise
        the keep alive count is incremented and False is returned.
        """
        with self._lock:
            # status changes are notifications too: without this test a queued
            # status change would wait for an unrelated datachange/keep-alive
            if (self._startup or self._triggered_datachanges or self._triggered_events
                    or self._triggered_statuschanges):
                return True
            if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
                self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
                                  self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
                return True
            self._keep_alive_count += 1
            return False

    def publish_results(self):
        """Run one publish cycle and forward the result, if any, to the callback.

        When the publish cycle count exceeds the revised lifetime count, a
        BadTimeout status change is queued and the subscription is flagged as
        stopped.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)",
                                self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be send since we do not have publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
            self._stopev = True
        result = None
        with self._lock:
            if self.has_published_results():
                # FIXME: should we pop a publish request here? or we do not care?
                self._publish_cycles_count += 1
                result = self._pop_publish_result()
        # the callback is invoked outside the lock
        if result is not None:
            self.callback(result)

    def _pop_publish_result(self):
        """Drain the pending queues into a new PublishResult (caller holds the lock).

        The sequence number is only consumed, and the result only kept for
        republishing, when the message actually carries notification data.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if result.NotificationMessage.NotificationData:
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result):
        """Move all queued datachange notifications into *result*."""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result):
        """Move all queued event notifications into *result*."""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result):
        """Move the oldest queued status change, if any, into *result*."""
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, acks):
        """Handle a publish request: reset the cycle count and drop acknowledged results."""
        self.logger.info("publish request with acks %s", acks)
        with self._lock:
            self._publish_cycles_count = 0
            for nb in acks:
                self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """Return the stored notification message with sequence number *nb*.

        An empty NotificationMessage is returned when *nb* is unknown.
        """
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        with self._lock:
            if nb in self._not_acknowledged_results:
                self.logger.info("re-publishing ack %s in subscription %s", nb, self)
                return self._not_acknowledged_results[nb].NotificationMessage
            else:
                self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
                return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid, eventdata, maxsize):
        """Queue a datachange notification for monitored item *mid*."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid, eventdata, maxsize):
        """Queue an event notification for monitored item *mid*."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """Queue a status change notification."""
        self._triggered_statuschanges.append(code)

    def _enqueue_event(self, mid, eventdata, size, queue):
        """Append *eventdata* to ``queue[mid]``, keeping at most *size* entries.

        A *size* of 0 means unbounded; otherwise the oldest entry is dropped
        once the per-item list is full.
        """
        pending = queue.get(mid)
        if pending is None:
            queue[mid] = [eventdata]
            return
        if size != 0 and len(pending) >= size:
            pending.pop(0)
        pending.append(eventdata)
00388 
00389 
class WhereClauseEvaluator(object):
    """Evaluate the where clause of an event filter against events.

    ``whereclause.Elements`` is a list of filter elements; each element has a
    ``FilterOperator`` and a list of ``FilterOperands``. An operand is a
    literal, an attribute of the event, or a reference (by index) to another
    element of the same list.
    """

    def __init__(self, logger, aspace, whereclause):
        self.logger = logger
        self.elements = whereclause.Elements
        self._aspace = aspace

    def eval(self, event):
        """Return whether *event* passes the where clause.

        An empty clause accepts every event. Any exception raised during
        evaluation is logged and counts as "does not pass".
        """
        if not self.elements:
            return True
        # spec says we should only evaluate first element, which may use other elements
        try:
            res = self._eval_el(0, event)
        except Exception as ex:
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s",
                                  self.elements, event, ex)
            return False
        return res

    def _eval_el(self, index, event):
        """Evaluate the filter element at position *index* for *event*.

        Operands are always resolved through ``_eval_op``, which follows
        element references itself; *index* must be an integer position in
        ``self.elements``.

        Raises NotImplementedError for operators that are not supported.
        """
        el = self.elements[index]
        ops = el.FilterOperands  # just to make code more readable
        operator = el.FilterOperator
        if operator == ua.FilterOperator.Equals:
            return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
        elif operator == ua.FilterOperator.IsNull:
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
        elif operator == ua.FilterOperator.GreaterThan:
            return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
        elif operator == ua.FilterOperator.LessThan:
            return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
        elif operator == ua.FilterOperator.GreaterThanOrEqual:
            return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
        elif operator == ua.FilterOperator.LessThanOrEqual:
            return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
        elif operator == ua.FilterOperator.Like:
            return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
        elif operator == ua.FilterOperator.Not:
            return not self._eval_op(ops[0], event)
        elif operator == ua.FilterOperator.Between:
            # ops[0] is the tested value, ops[1] the lower and ops[2] the upper bound
            return self._eval_op(ops[1], event) <= self._eval_op(ops[0], event) <= self._eval_op(ops[2], event)
        elif operator == ua.FilterOperator.InList:
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
        elif operator == ua.FilterOperator.And:
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
        elif operator == ua.FilterOperator.Or:
            return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
        elif operator == ua.FilterOperator.Cast:
            self.logger.warning("Cast operand not implemented, assuming True")
            return True
        elif operator == ua.FilterOperator.OfType:
            return event.EventType == self._eval_op(ops[0], event)
        else:
            # TODO: implement missing operators
            self.logger.warning("WhereClause not implemented for element: %s", el)
            raise NotImplementedError

    def _like_operator(self, string, pattern):
        """Pattern matching for the Like operator; not implemented yet."""
        raise NotImplementedError

    def _eval_op(self, op, event):
        """Resolve one filter operand to a Python value for *event*.

        Raises NotImplementedError for an operand of unknown type.
        """
        # seems spec says we should return Null if issues
        if isinstance(op, ua.ElementOperand):
            return self._eval_el(op.Index, event)
        elif isinstance(op, ua.AttributeOperand):
            # FIXME: check, this is probably broken
            if op.BrowsePath:
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
            else:
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
        elif isinstance(op, ua.SimpleAttributeOperand):
            if op.BrowsePath:
                # we only support depth of 1
                return getattr(event, op.BrowsePath[0].Name)
            else:
                # TODO: write code for index range.... but does it make any sense
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
        elif isinstance(op, ua.LiteralOperand):
            return op.Value.Value
        else:
            self.logger.warning("Where clause element %s is not of a known type", op)
            raise NotImplementedError
00472 
00473 
00474 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23