subscription.py
Go to the documentation of this file.
00001 """
00002 high level interface to subscriptions
00003 """
00004 import time
00005 import logging
00006 from threading import Lock
00007 import collections
00008 
00009 from opcua import ua
00010 from opcua.common import events
00011 from opcua import Node
00012 
00013 
class SubHandler(object):
    """
    Example subscription handler.

    A Subscription forwards the notifications it receives from the server
    to a handler object. This class only illustrates the expected method
    names and signatures; any object providing these methods can be used
    instead. Every method here does nothing and returns None.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated callback for data changes, use datachange_notification.

        :param handle: server handle of the monitored item
        :param node: node the monitored item belongs to
        :param val: new value
        :param attr: id of the monitored attribute
        """

    def datachange_notification(self, node, val, data):
        """
        Callback invoked for each data change notification sent by the server.

        :param node: node the monitored item belongs to
        :param val: new value
        :param data: object bundling the stored item data and the raw notification
        """

    def event_notification(self, event):
        """
        Callback invoked for each event notification sent by the server.

        :param event: the event built from the received event fields
        """

    def status_change_notification(self, status):
        """
        Callback invoked for each status change notification sent by the server.

        :param status: status carried by the notification
        """
00043 
00044 
class SubscriptionItemData(object):
    """
    Plain record holding what is known about one monitored item.

    All fields start as None and are filled in by whoever creates the record.
    """
    def __init__(self):
        # node:          node being monitored
        # client_handle: handle chosen on the client side for the item
        # server_handle: handle assigned by the server for the item
        # attribute:     id of the monitored attribute
        # mfilter:       monitoring filter attached to the item, if any
        for field in ("node", "client_handle", "server_handle", "attribute", "mfilter"):
            setattr(self, field, None)
00055 
00056 
class DataChangeNotif(object):
    """
    Object handed to clients with every data change notification from the server.

    It pairs the locally stored subscription data of the item with the
    monitored item notification that was received.
    """
    def __init__(self, subscription_data, monitored_item):
        self.subscription_data = subscription_data
        self.monitored_item = monitored_item

    def __str__(self):
        # Same text is used for str() and repr().
        template = "DataChangeNotification({0}, {1})"
        parts = (self.subscription_data, self.monitored_item)
        return template.format(*parts)
    __repr__ = __str__
00068 
00069 
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and queue the first publish requests.

        :param server: object used for all service calls (create_subscription,
            publish, delete_subscriptions, create/delete/modify_monitored_items)
        :param params: subscription parameters forwarded to server.create_subscription;
            RequestedPublishingInterval is also used as sampling interval for new items
        :param handler: object receiving the notifications, see SubHandler for the
            expected method names
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # Counter used to generate client handles; incremented before each use.
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        # client handle -> SubscriptionItemData
        self._monitoreditems_map = {}
        self._lock = Lock()
        # Stays None until the server answered; publish_callback waits on it.
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class

        # Launching two publish requests is a heuristic. We try to ensure
        # that the server always has at least one publish request in the queue,
        # even after it just replied to a publish request.
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch every notification of a publish result to the handler,
        then acknowledge the message and queue a new publish request.

        :param publishresult: publish result whose NotificationMessage is processed
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # The callback is registered before __init__ has stored the id
        # returned by create_subscription, so wait until it is available.
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Forward each monitored item of a data change notification to the handler.

        Items with an unknown client handle are logged and skipped. Exceptions
        raised by the handler are logged and do not interrupt processing.
        """
        for item in datachange.MonitoredItems:
            # Only the map lookup is done under the lock; the handler is
            # called without holding it.
            with self._lock:
                data = self._monitoreditems_map.get(item.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Build an Event from each entry of an event notification list and
        forward it to the handler.

        Entries with an unknown client handle are logged and skipped, like in
        _call_datachange, so that one stray notification cannot abort the
        publish callback before it acknowledges the message.
        """
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(event.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Forward a status change notification to the handler.

        Any exception (including a handler without the method) is logged.
        """
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evtypes is not provided, evtype defaults to BaseEventType
        if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
        if evfilter is provided it is used as is and evtypes is ignored
        Return a handle which can be used to unsubscribe
        """
        sourcenode = Node(self.server, sourcenode)

        if evfilter is None:
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]

            evtypes = [Node(self.server, evtype) for evtype in evtypes]

            evfilter = events.get_filter_from_event_type(evtypes)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node.

        :param nodes: a single node or an iterable of nodes
        :param attr: id of the attribute to monitor
        :param mfilter: optional monitoring filter applied to every item
        :param queuesize: requested queue size for every item
        :return: for an iterable, the list returned by create_monitored_items
            (handles or status codes); for a single node, its handle
        :raise: for a single node, whatever StatusCode.check() raises when
            the creation failed
        """
        # collections.Iterable was removed in Python 3.10; the ABC lives in
        # collections.abc. Fall back to the old location for Python 2.
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        is_list = isinstance(nodes, Iterable)
        nodes = list(nodes) if is_list else [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build a MonitoredItemCreateRequest for one node with a fresh client handle.

        The sampling interval is taken from the subscription's
        RequestedPublishingInterval and the monitoring mode is Reporting.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        :param monitored_items: list of MonitoredItemCreateRequest
        :return: list with, for every request and in the same order, the
            MonitoredItemId on success or the StatusCode on failure
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both

        # insert monitored item into map to avoid notification arrive before result return
        # server_handle is left as None in purpose as we don't get it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                #TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        try:
            results = self.server.create_monitored_items(params)
        except Exception:
            # The request failed as a whole: drop the entries registered
            # above so they do not stay in the map forever, then re-raise.
            with self._lock:
                for mi in monitored_items:
                    self._monitoreditems_map.pop(mi.RequestedParameters.ClientHandle, None)
            raise
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            # Iterate over a snapshot so the map can be modified safely.
            for client_handle, data in list(self._monitoreditems_map.items()):
                if data.server_handle == handle:
                    del self._monitoreditems_map[client_handle]
                    return

    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.
        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. None sends no filter,
            a negative value (default) re-sends the filter currently stored for
            the item, any other value creates a new filter with DeadbandType 1
        :return: Return a Modify Monitored Item Result
        :raise ValueError: if handle does not belong to an item of this subscription
        """
        item_to_change = None
        with self._lock:
            for item in self._monitoreditems_map.values():
                if item.server_handle == handle:
                    item_to_change = item
                    break
        if item_to_change is None:
            # Fail before contacting the server instead of crashing later
            # on an unbound local variable.
            raise ValueError("No monitored item with handle {0} in this subscription".format(handle))
        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
            mod_filter.DeadbandType = 1
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        # Send the item's own client handle: the requested parameters carry a
        # ClientHandle, and notifications are looked up by that handle.
        modif_item.RequestedParameters = self._modify_monitored_item_request(new_queuesize, new_samp_time,
                                                                             mod_filter, item_to_change.client_handle)
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = self.server.modify_monitored_items(params)
        item_to_change.mfilter = results[0].FilterResult
        return results

    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle=None):
        """
        Build the MonitoringParameters of a modify request.

        :param new_queuesize: requested queue size
        :param new_samp_time: requested sampling interval
        :param mod_filter: filter to send, may be None
        :param client_handle: client handle of the item being modified; when
            omitted the most recently generated client handle is used, which
            is the historical behaviour of this helper
        """
        req_params = ua.MonitoringParameters()
        if client_handle is None:
            with self._lock:
                client_handle = self._client_handle
        req_params.ClientHandle = client_handle
        req_params.QueueSize = new_queuesize
        req_params.Filter = mod_filter
        req_params.SamplingInterval = new_samp_time
        return req_params

    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Method to create a subscription with a Deadband Value.
        Default deadband value type is absolute.
        Return a handle which can be used to unsubscribe
        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute to monitor, default is Value
        """
        deadband_filter = ua.DataChangeFilter()
        deadband_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
        deadband_filter.DeadbandType = deadbandtype
        deadband_filter.DeadbandValue = deadband_val  # absolute float value or from 0 to 100 for percentage deadband
        return self._subscribe(var, attr, deadband_filter, queuesize)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23