00001 """
00002 high level interface to subscriptions
00003 """
00004 import time
00005 import logging
00006 from threading import Lock
00007 import collections
00008
00009 from opcua import ua
00010 from opcua.common import events
00011 from opcua import Node
00012
00013
00014 class SubHandler(object):
00015 """
00016 Subscription Handler. To receive events from server for a subscription
00017 This class is just a sample class. Whatever class having these methods can be used
00018 """
00019
00020 def data_change(self, handle, node, val, attr):
00021 """
00022 Deprecated, use datachange_notification
00023 """
00024 pass
00025
00026 def datachange_notification(self, node, val, data):
00027 """
00028 called for every datachange notification from server
00029 """
00030 pass
00031
00032 def event_notification(self, event):
00033 """
00034 called for every event notification from server
00035 """
00036 pass
00037
00038 def status_change_notification(self, status):
00039 """
00040 called for every status change notification from server
00041 """
00042 pass
00043
00044
00045 class SubscriptionItemData(object):
00046 """
00047 To store useful data from a monitored item
00048 """
00049 def __init__(self):
00050 self.node = None
00051 self.client_handle = None
00052 self.server_handle = None
00053 self.attribute = None
00054 self.mfilter = None
00055
00056
00057 class DataChangeNotif(object):
00058 """
00059 To be send to clients for every datachange notification from server
00060 """
00061 def __init__(self, subscription_data, monitored_item):
00062 self.monitored_item = monitored_item
00063 self.subscription_data = subscription_data
00064
00065 def __str__(self):
00066 return "DataChangeNotification({0}, {1})".format(self.subscription_data, self.monitored_item)
00067 __repr__ = __str__
00068
00069
00070 class Subscription(object):
00071 """
00072 Subscription object returned by Server or Client objects.
00073 The object represent a subscription to an opc-ua server.
00074 This is a high level class, especially subscribe_data_change
00075 and subscribe_events methods. If more control is necessary look at
00076 code and/or use create_monitored_items method.
00077 """
00078
00079 def __init__(self, server, params, handler):
00080 self.logger = logging.getLogger(__name__)
00081 self.server = server
00082 self._client_handle = 200
00083 self._handler = handler
00084 self.parameters = params
00085 self._monitoreditems_map = {}
00086 self._lock = Lock()
00087 self.subscription_id = None
00088 response = self.server.create_subscription(params, self.publish_callback)
00089 self.subscription_id = response.SubscriptionId
00090
00091
00092
00093
00094 self.server.publish()
00095 self.server.publish()
00096
00097 def delete(self):
00098 """
00099 Delete subscription on server. This is automatically done by Client and Server classes on exit
00100 """
00101 results = self.server.delete_subscriptions([self.subscription_id])
00102 results[0].check()
00103
00104 def publish_callback(self, publishresult):
00105 self.logger.info("Publish callback called with result: %s", publishresult)
00106 while self.subscription_id is None:
00107 time.sleep(0.01)
00108
00109 for notif in publishresult.NotificationMessage.NotificationData:
00110 if isinstance(notif, ua.DataChangeNotification):
00111 self._call_datachange(notif)
00112 elif isinstance(notif, ua.EventNotificationList):
00113 self._call_event(notif)
00114 elif isinstance(notif, ua.StatusChangeNotification):
00115 self._call_status(notif)
00116 else:
00117 self.logger.warning("Notification type not supported yet for notification %s", notif)
00118
00119 ack = ua.SubscriptionAcknowledgement()
00120 ack.SubscriptionId = self.subscription_id
00121 ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
00122 self.server.publish([ack])
00123
00124 def _call_datachange(self, datachange):
00125 for item in datachange.MonitoredItems:
00126 with self._lock:
00127 if item.ClientHandle not in self._monitoreditems_map:
00128 self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
00129 continue
00130 data = self._monitoreditems_map[item.ClientHandle]
00131 if hasattr(self._handler, "datachange_notification"):
00132 event_data = DataChangeNotif(data, item)
00133 try:
00134 self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
00135 except Exception:
00136 self.logger.exception("Exception calling data change handler")
00137 elif hasattr(self._handler, "data_change"):
00138 self.logger.warning("data_change method is deprecated, use datachange_notification")
00139 try:
00140 self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
00141 except Exception:
00142 self.logger.exception("Exception calling deprecated data change handler")
00143 else:
00144 self.logger.error("DataChange subscription created but handler has no datachange_notification method")
00145
00146 def _call_event(self, eventlist):
00147 for event in eventlist.Events:
00148 with self._lock:
00149 data = self._monitoreditems_map[event.ClientHandle]
00150 result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
00151 result.server_handle = data.server_handle
00152 if hasattr(self._handler, "event_notification"):
00153 try:
00154 self._handler.event_notification(result)
00155 except Exception:
00156 self.logger.exception("Exception calling event handler")
00157 elif hasattr(self._handler, "event"):
00158 try:
00159 self._handler.event(data.server_handle, result)
00160 except Exception:
00161 self.logger.exception("Exception calling deprecated event handler")
00162 else:
00163 self.logger.error("Event subscription created but handler has no event_notification method")
00164
00165 def _call_status(self, status):
00166 try:
00167 self._handler.status_change_notification(status.Status)
00168 except Exception:
00169 self.logger.exception("Exception calling status change handler")
00170
00171 def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
00172 """
00173 Subscribe for data change events for a node or list of nodes.
00174 default attribute is Value.
00175 Return a handle which can be used to unsubscribe
00176 If more control is necessary use create_monitored_items method
00177 """
00178 return self._subscribe(nodes, attr, queuesize=0)
00179
00180 def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
00181 """
00182 Subscribe to events from a node. Default node is Server node.
00183 In most servers the server node is the only one you can subscribe to.
00184 if evtypes is not provided, evtype defaults to BaseEventType
00185 if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
00186 Return a handle which can be used to unsubscribe
00187 """
00188 sourcenode = Node(self.server, sourcenode)
00189
00190 if evfilter is None:
00191 if not type(evtypes) in (list, tuple):
00192 evtypes = [evtypes]
00193
00194 evtypes = [Node(self.server, evtype) for evtype in evtypes]
00195
00196 evfilter = events.get_filter_from_event_type(evtypes)
00197 return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
00198
00199 def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
00200 is_list = True
00201 if isinstance(nodes, collections.Iterable):
00202 nodes = list(nodes)
00203 else:
00204 nodes = [nodes]
00205 is_list = False
00206 mirs = []
00207 for node in nodes:
00208 mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
00209 mirs.append(mir)
00210
00211 mids = self.create_monitored_items(mirs)
00212 if is_list:
00213 return mids
00214 if type(mids[0]) == ua.StatusCode:
00215 mids[0].check()
00216 return mids[0]
00217
00218 def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
00219 rv = ua.ReadValueId()
00220 rv.NodeId = node.nodeid
00221 rv.AttributeId = attr
00222
00223 mparams = ua.MonitoringParameters()
00224 with self._lock:
00225 self._client_handle += 1
00226 mparams.ClientHandle = self._client_handle
00227 mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
00228 mparams.QueueSize = queuesize
00229 mparams.DiscardOldest = True
00230 if mfilter:
00231 mparams.Filter = mfilter
00232 mir = ua.MonitoredItemCreateRequest()
00233 mir.ItemToMonitor = rv
00234 mir.MonitoringMode = ua.MonitoringMode.Reporting
00235 mir.RequestedParameters = mparams
00236 return mir
00237
00238 def create_monitored_items(self, monitored_items):
00239 """
00240 low level method to have full control over subscription parameters
00241 Client handle must be unique since it will be used as key for internal registration of data
00242 """
00243 params = ua.CreateMonitoredItemsParameters()
00244 params.SubscriptionId = self.subscription_id
00245 params.ItemsToCreate = monitored_items
00246 params.TimestampsToReturn = ua.TimestampsToReturn.Both
00247
00248
00249
00250 with self._lock:
00251 for mi in monitored_items:
00252 data = SubscriptionItemData()
00253 data.client_handle = mi.RequestedParameters.ClientHandle
00254 data.node = Node(self.server, mi.ItemToMonitor.NodeId)
00255 data.attribute = mi.ItemToMonitor.AttributeId
00256
00257 data.mfilter = mi.RequestedParameters.Filter
00258 self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
00259 results = self.server.create_monitored_items(params)
00260 mids = []
00261
00262 with self._lock:
00263 for idx, result in enumerate(results):
00264 mi = params.ItemsToCreate[idx]
00265 if not result.StatusCode.is_good():
00266 del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
00267 mids.append(result.StatusCode)
00268 continue
00269 data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
00270 data.server_handle = result.MonitoredItemId
00271 mids.append(result.MonitoredItemId)
00272 return mids
00273
00274 def unsubscribe(self, handle):
00275 """
00276 unsubscribe to datachange or events using the handle returned while subscribing
00277 if you delete subscription, you do not need to unsubscribe
00278 """
00279 params = ua.DeleteMonitoredItemsParameters()
00280 params.SubscriptionId = self.subscription_id
00281 params.MonitoredItemIds = [handle]
00282 results = self.server.delete_monitored_items(params)
00283 results[0].check()
00284 with self._lock:
00285 for k, v in self._monitoreditems_map.items():
00286 if v.server_handle == handle:
00287 del(self._monitoreditems_map[k])
00288 return
00289
00290 def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
00291 """
00292 Modify a monitored item.
00293 :param handle: Handle returned when originally subscribing
00294 :param new_samp_time: New wanted sample time
00295 :param new_queuesize: New wanted queuesize, default is 0
00296 :param mod_filter_val: New deadband filter value
00297 :return: Return a Modify Monitored Item Result
00298 """
00299 for monitored_item_index in self._monitoreditems_map:
00300 if self._monitoreditems_map[monitored_item_index].server_handle == handle:
00301 item_to_change = self._monitoreditems_map[monitored_item_index]
00302 break
00303 if mod_filter_val is None:
00304 mod_filter = None
00305 elif mod_filter_val < 0:
00306 mod_filter = item_to_change.mfilter
00307 else:
00308 mod_filter = ua.DataChangeFilter()
00309 mod_filter.Trigger = ua.DataChangeTrigger(1)
00310 mod_filter.DeadbandType = 1
00311 mod_filter.DeadbandValue = mod_filter_val
00312 modif_item = ua.MonitoredItemModifyRequest()
00313 modif_item.MonitoredItemId = handle
00314 modif_item.RequestedParameters = self._modify_monitored_item_request(new_queuesize, new_samp_time,
00315 mod_filter)
00316 params = ua.ModifyMonitoredItemsParameters()
00317 params.SubscriptionId = self.subscription_id
00318 params.ItemsToModify.append(modif_item)
00319 results = self.server.modify_monitored_items(params)
00320 item_to_change.mfilter = results[0].FilterResult
00321 return results
00322
00323 def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter):
00324 req_params = ua.MonitoringParameters()
00325 with self._lock:
00326 req_params.ClientHandle = self._client_handle
00327 req_params.QueueSize = new_queuesize
00328 req_params.Filter = mod_filter
00329 req_params.SamplingInterval = new_samp_time
00330 return req_params
00331
00332 def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
00333 """
00334 Method to create a subscription with a Deadband Value.
00335 Default deadband value type is absolute.
00336 Return a handle which can be used to unsubscribe
00337 :param var: Variable to which you want to subscribe
00338 :param deadband_val: Absolute float value
00339 :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
00340 :param queuesize: Wanted queue size, default is 1
00341 """
00342 deadband_filter = ua.DataChangeFilter()
00343 deadband_filter.Trigger = ua.DataChangeTrigger(1)
00344 deadband_filter.DeadbandType = deadbandtype
00345 deadband_filter.DeadbandValue = deadband_val
00346 return self._subscribe(var, attr, deadband_filter, queuesize)