subscription.py
Go to the documentation of this file.
1 """
2 high level interface to subscriptions
3 """
4 import time
5 import logging
6 from threading import Lock
7 import collections
8 
9 from opcua import ua
10 from opcua.common import events
11 from opcua import Node
12 
13 
class SubHandler(object):
    """
    Sample subscription handler.

    An instance is handed to a subscription to receive notifications coming
    from the server. This class only illustrates the expected method names;
    any object exposing these methods can be used instead. Every method here
    is a no-op meant to be overridden.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification instead.
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        Called for every data change notification from the server.
        """
        pass

    def event_notification(self, event):
        """
        Called for every event notification from the server.
        """
        pass

    def status_change_notification(self, status):
        """
        Called for every status change notification from the server.
        """
        pass
43 
44 
class SubscriptionItemData(object):
    """
    Plain record holding the useful data of one monitored item.

    All fields start as None and are filled in by the code that registers
    the monitored item.
    """

    def __init__(self):
        # node being monitored
        self.node = None
        # handle chosen on the client side for this item
        self.client_handle = None
        # handle assigned by the server for this item
        self.server_handle = None
        # attribute id being monitored
        self.attribute = None
        # monitoring filter associated with the item, if any
        self.mfilter = None
55 
56 
class DataChangeNotif(object):
    """
    Object passed to clients for every data change notification from server.

    It bundles the locally stored subscription data with the monitored item
    notification it belongs to.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        return "DataChangeNotification({0}, {1})".format(self.subscription_data, self.monitored_item)

    # repr and str intentionally share one implementation
    __repr__ = __str__
68 
69 
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially the subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    the code and/or use the create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and start publishing.

        :param server: object used to talk to the server; must provide
            create_subscription, publish and the monitored item services
        :param params: subscription parameters passed to create_subscription;
            RequestedPublishingInterval is reused as sampling interval
        :param handler: object receiving the notifications (see SubHandler)
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        # client handle -> SubscriptionItemData, guarded by self._lock
        self._monitoreditems_map = {}
        self._lock = Lock()
        # stays None until the server answered; publish_callback waits on it
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class

        # Launching two publish requests is a heuristic. We try to ensure
        # that the server always has at least one publish request in the queue,
        # even after it just replied to a publish request.
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch the notifications of one publish result to the handler,
        then acknowledge the message and queue a new publish request.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # a notification may arrive before __init__ stored the subscription id
        while self.subscription_id is None:
            time.sleep(0.01)

        # tolerate a message carrying no notification data at all
        for notif in publishresult.NotificationMessage.NotificationData or ():
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Forward every monitored item of a data change notification to the handler.
        Exceptions raised by the handler are logged, never propagated.
        """
        for item in datachange.MonitoredItems:
            with self._lock:
                if item.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                    continue
                data = self._monitoreditems_map[item.ClientHandle]
            # the handler is called outside of the lock
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Forward every event of an event notification list to the handler.
        Exceptions raised by the handler are logged, never propagated.
        """
        for event in eventlist.Events:
            with self._lock:
                # same policy as _call_datachange: an unknown handle must not
                # abort the publish callback before the acknowledgement is sent
                if event.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                    continue
                data = self._monitoreditems_map[event.ClientHandle]
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Forward a status change notification to the handler; any exception
        (including a missing handler method) is logged.
        """
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evtypes is not provided, evtype defaults to BaseEventType
        if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
        if evfilter is provided it is used as is and evtypes is ignored
        Return a handle which can be used to unsubscribe
        """
        sourcenode = Node(self.server, sourcenode)

        if evfilter is None:
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]

            evtypes = [Node(self.server, evtype) for evtype in evtypes]

            evfilter = events.get_filter_from_event_type(evtypes)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node.

        Returns the list produced by create_monitored_items when nodes is an
        iterable. For a single node, returns the single entry; if that entry
        is a StatusCode it is checked first.
        """
        # collections.Iterable is gone since Python 3.10; keep a fallback
        # for interpreters without collections.abc
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        is_list = isinstance(nodes, Iterable)
        nodes = list(nodes) if is_list else [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build the create request for one node/attribute, allocating a new
        client handle for it.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        Returns a list with, for every request in order, the server handle
        (MonitoredItemId) on success or the StatusCode on failure.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both

        # insert monitored item into map to avoid notification arrive before result return
        # server_handle is left as None in purpose as we don't get it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                # TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        results = self.server.create_monitored_items(params)
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            # drop the first local entry registered for this server handle
            for k, v in self._monitoreditems_map.items():
                if v.server_handle == handle:
                    del self._monitoreditems_map[k]
                    return

    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.
        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value; None removes the
            filter, a negative value (default) keeps the current filter
        :return: Return a Modify Monitored Item Result
        :raises ValueError: if no monitored item is registered for handle
        """
        item_to_change = None
        with self._lock:
            for candidate in self._monitoreditems_map.values():
                if candidate.server_handle == handle:
                    item_to_change = candidate
                    break
        if item_to_change is None:
            # fail before talking to the server instead of hitting an unbound local later
            raise ValueError("The monitored item was not found.")

        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
            mod_filter.DeadbandType = 1
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        # keep the item's own client handle so its notifications stay
        # routed to the same entry of the internal map
        modif_item.RequestedParameters = self._modify_monitored_item_request(
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle)
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = self.server.modify_monitored_items(params)
        item_to_change.mfilter = results[0].FilterResult
        return results

    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle=None):
        """
        Build the monitoring parameters of a modify request.

        :param client_handle: client handle of the item being modified. When
            omitted, the most recently allocated client handle is used, which
            is the historical behaviour of this method.
        """
        req_params = ua.MonitoringParameters()
        if client_handle is None:
            with self._lock:
                client_handle = self._client_handle
        req_params.ClientHandle = client_handle
        req_params.QueueSize = new_queuesize
        req_params.Filter = mod_filter
        req_params.SamplingInterval = new_samp_time
        return req_params

    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Method to create a subscription with a Deadband Value.
        Default deadband value type is absolute.
        Return a handle which can be used to unsubscribe
        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute to monitor, default is Value
        """
        deadband_filter = ua.DataChangeFilter()
        deadband_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
        deadband_filter.DeadbandType = deadbandtype
        deadband_filter.DeadbandValue = deadband_val  # absolute float value or from 0 to 100 for percentage deadband
        return self._subscribe(var, attr, deadband_filter, queuesize)
def data_change(self, handle, node, val, attr)
Definition: subscription.py:20
def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None)
def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1)
def _make_monitored_item_request(self, node, attr, mfilter, queuesize)
def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value)
def __init__(self, subscription_data, monitored_item)
Definition: subscription.py:61
def publish_callback(self, publishresult)
def create_monitored_items(self, monitored_items)
def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value)
def datachange_notification(self, node, val, data)
Definition: subscription.py:26
def __init__(self, server, params, handler)
Definition: subscription.py:79
def status_change_notification(self, status)
Definition: subscription.py:38
def _call_datachange(self, datachange)
def _subscribe(self, nodes, attr, mfilter=None, queuesize=0)
def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44