2 high level interface to subscriptions 6 from threading
import Lock
11 from opcua
import Node
16 Subscription Handler. To receive events from server for a subscription 17 This class is just a sample class. Any class that has these methods can be used 22 Deprecated, use datachange_notification 28 called for every datachange notification from server 34 called for every event notification from server 40 called for every status change notification from server 45 class SubscriptionItemData(object):
47 To store useful data from a monitored item 59 To be sent to clients for every datachange notification from server 61 def __init__(self, subscription_data, monitored_item):
72 Subscription object returned by Server or Client objects. 73 The object represents a subscription to an opc-ua server. 74 This is a high level class, especially subscribe_data_change 75 and subscribe_events methods. If more control is necessary look at 76 code and/or use create_monitored_items method. 80 self.
logger = logging.getLogger(__name__)
99 Delete subscription on server. This is automatically done by Client and Server classes on exit 105 self.logger.info(
"Publish callback called with result: %s", publishresult)
109 for notif
in publishresult.NotificationMessage.NotificationData:
117 self.logger.warning(
"Notification type not supported yet for notification %s", notif)
121 ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
122 self.server.publish([ack])
125 for item
in datachange.MonitoredItems:
128 self.logger.warning(
"Received a notification for unknown handle: %s", item.ClientHandle)
131 if hasattr(self.
_handler,
"datachange_notification"):
134 self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
136 self.logger.exception(
"Exception calling data change handler")
137 elif hasattr(self.
_handler,
"data_change"):
138 self.logger.warning(
"data_change method is deprecated, use datachange_notification")
140 self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
142 self.logger.exception(
"Exception calling deprecated data change handler")
144 self.logger.error(
"DataChange subscription created but handler has no datachange_notification method")
147 for event
in eventlist.Events:
150 result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
151 result.server_handle = data.server_handle
152 if hasattr(self.
_handler,
"event_notification"):
154 self._handler.event_notification(result)
156 self.logger.exception(
"Exception calling event handler")
157 elif hasattr(self.
_handler,
"event"):
159 self._handler.event(data.server_handle, result)
161 self.logger.exception(
"Exception calling deprecated event handler")
163 self.logger.error(
"Event subscription created but handler has no event_notification method")
167 self._handler.status_change_notification(status.Status)
169 self.logger.exception(
"Exception calling status change handler")
173 Subscribe for data change events for a node or list of nodes. 174 default attribute is Value. 175 Return a handle which can be used to unsubscribe 176 If more control is necessary use create_monitored_items method 178 return self.
_subscribe(nodes, attr, queuesize=0)
180 def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
182 Subscribe to events from a node. Default node is Server node. 183 In most servers the server node is the only one you can subscribe to. 184 if evtypes is not provided, evtype defaults to BaseEventType 185 if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types 186 Return a handle which can be used to unsubscribe 188 sourcenode = Node(self.
server, sourcenode)
191 if not type(evtypes)
in (list, tuple):
194 evtypes = [Node(self.
server, evtype)
for evtype
in evtypes]
196 evfilter = events.get_filter_from_event_type(evtypes)
197 return self.
_subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
199 def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
201 if isinstance(nodes, collections.Iterable):
220 rv.NodeId = node.nodeid
221 rv.AttributeId = attr
227 mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
228 mparams.QueueSize = queuesize
229 mparams.DiscardOldest =
True 231 mparams.Filter = mfilter
233 mir.ItemToMonitor = rv
234 mir.MonitoringMode = ua.MonitoringMode.Reporting
235 mir.RequestedParameters = mparams
240 low level method to have full control over subscription parameters 241 Client handle must be unique since it will be used as key for internal registration of data 245 params.ItemsToCreate = monitored_items
246 params.TimestampsToReturn = ua.TimestampsToReturn.Both
251 for mi
in monitored_items:
253 data.client_handle = mi.RequestedParameters.ClientHandle
254 data.node = Node(self.
server, mi.ItemToMonitor.NodeId)
255 data.attribute = mi.ItemToMonitor.AttributeId
257 data.mfilter = mi.RequestedParameters.Filter
259 results = self.server.create_monitored_items(params)
263 for idx, result
in enumerate(results):
264 mi = params.ItemsToCreate[idx]
265 if not result.StatusCode.is_good():
267 mids.append(result.StatusCode)
270 data.server_handle = result.MonitoredItemId
271 mids.append(result.MonitoredItemId)
276 unsubscribe from datachange or events using the handle returned while subscribing 277 if you delete subscription, you do not need to unsubscribe 281 params.MonitoredItemIds = [handle]
282 results = self.server.delete_monitored_items(params)
285 for k, v
in self._monitoreditems_map.items():
286 if v.server_handle == handle:
292 Modify a monitored item. 293 :param handle: Handle returned when originally subscribing 294 :param new_samp_time: New wanted sample time 295 :param new_queuesize: New wanted queuesize, default is 0 296 :param mod_filter_val: New deadband filter value 297 :return: Return a Modify Monitored Item Result 303 if mod_filter_val
is None:
305 elif mod_filter_val < 0:
306 mod_filter = item_to_change.mfilter
310 mod_filter.DeadbandType = 1
311 mod_filter.DeadbandValue = mod_filter_val
313 modif_item.MonitoredItemId = handle
318 params.ItemsToModify.append(modif_item)
319 results = self.server.modify_monitored_items(params)
320 item_to_change.mfilter = results[0].FilterResult
327 req_params.QueueSize = new_queuesize
328 req_params.Filter = mod_filter
329 req_params.SamplingInterval = new_samp_time
332 def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
334 Method to create a subscription with a Deadband Value. 335 Default deadband value type is absolute. 336 Return a handle which can be used to unsubscribe 337 :param var: Variable to which you want to subscribe 338 :param deadband_val: Absolute float value 339 :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband 340 :param queuesize: Wanted queue size, default is 0 344 deadband_filter.DeadbandType = deadbandtype
345 deadband_filter.DeadbandValue = deadband_val
346 return self.
_subscribe(var, attr, deadband_filter, queuesize)
def event_notification(self, event)
def data_change(self, handle, node, val, attr)
def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None)
def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1)
def _make_monitored_item_request(self, node, attr, mfilter, queuesize)
def unsubscribe(self, handle)
def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value)
def __init__(self, subscription_data, monitored_item)
def publish_callback(self, publishresult)
def _call_status(self, status)
def create_monitored_items(self, monitored_items)
def _call_event(self, eventlist)
def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value)
def datachange_notification(self, node, val, data)
def __init__(self, server, params, handler)
def status_change_notification(self, status)
def _call_datachange(self, datachange)
def _subscribe(self, nodes, attr, mfilter=None, queuesize=0)
def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter)