subscription_service.py
Go to the documentation of this file.
00001 """
00002 server side implementation of subscription service
00003 """
00004 
00005 from threading import RLock
00006 import logging
00007 
00008 from opcua import ua
00009 from opcua.server.internal_subscription import InternalSubscription
00010 
00011 
class SubscriptionService(object):
    """
    Server-side registry of subscriptions, keyed by subscription id.

    ``self.subscriptions`` maps a subscription id to the InternalSubscription
    created for it. Every access to that mapping happens while holding
    ``self._lock`` (a re-entrant lock).
    """

    def __init__(self, loop, aspace):
        """
        Store the collaborators and start with an empty subscription table.

        :param loop: stored as ``self.loop``; not used by this class itself
        :param aspace: address space object handed to every new subscription
        """
        self.logger = logging.getLogger(__name__)
        self.loop = loop
        self.aspace = aspace
        self.subscriptions = {}
        # The counter is incremented before use, so the first id handed out is 78.
        self._sub_id_counter = 77
        self._lock = RLock()

    def create_subscription(self, params, callback):
        """
        Create, start and register a new subscription.

        The requested publishing interval, lifetime count and keep-alive
        count are echoed back unchanged as the "revised" values.

        :param params: request carrying the Requested* attributes read below
        :param callback: passed through to the new InternalSubscription
        :return: ua.CreateSubscriptionResult holding the new SubscriptionId
        """
        self.logger.info("create subscription with callback: %s", callback)
        result = ua.CreateSubscriptionResult()
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
        with self._lock:
            self._sub_id_counter += 1
            result.SubscriptionId = self._sub_id_counter

            sub = InternalSubscription(self, result, self.aspace, callback)
            sub.start()
            self.subscriptions[result.SubscriptionId] = sub

            return result

    def delete_subscriptions(self, ids):
        """
        Stop and unregister the subscriptions with the given ids.

        :param ids: iterable of subscription ids
        :return: one ua.StatusCode per id, in input order;
                 BadSubscriptionIdInvalid for ids that are not registered
        """
        self.logger.info("delete subscriptions: %s", ids)
        res = []
        for subid in ids:
            # The lock is taken per id, exactly as before, so other callers
            # can interleave between two deletions.
            with self._lock:
                sub = self.subscriptions.pop(subid, None)
                if sub is None:
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
                else:
                    sub.stop()
                    res.append(ua.StatusCode())
        return res

    def publish(self, acks):
        """
        Forward acknowledged sequence numbers to every subscription.

        Each registered subscription has ``publish`` called exactly once with
        the list of sequence numbers acknowledged for it (an empty list when
        there are none). Acks for unknown subscription ids are ignored.

        :param acks: iterable of objects with SubscriptionId and
                     SequenceNumber attributes; None is treated as empty
        """
        self.logger.info("publish request with acks %s", acks)
        # Group once instead of re-scanning acks for every subscription; this
        # also keeps a one-shot iterable from being exhausted by the first
        # subscription.
        acked = {}
        for ack in acks or ():
            acked.setdefault(ack.SubscriptionId, []).append(ack.SequenceNumber)
        with self._lock:
            for subid, sub in self.subscriptions.items():
                sub.publish(acked.get(subid, []))

    @staticmethod
    def _invalid_subscription_result(result_cls):
        """Build a ``result_cls`` instance flagged BadSubscriptionIdInvalid."""
        result = result_cls()
        result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
        return result

    def create_monitored_items(self, params):
        """
        Create monitored items on the subscription named by ``params``.

        :param params: request with SubscriptionId and ItemsToCreate
        :return: whatever the subscription's monitored item service returns,
                 or one BadSubscriptionIdInvalid MonitoredItemCreateResult per
                 requested item when the subscription is unknown
        """
        self.logger.info("create monitored items")
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                return [self._invalid_subscription_result(ua.MonitoredItemCreateResult)
                        for _ in params.ItemsToCreate]
            return sub.monitored_item_srv.create_monitored_items(params)

    def modify_monitored_items(self, params):
        """
        Modify monitored items on the subscription named by ``params``.

        :param params: request with SubscriptionId and ItemsToModify
        :return: whatever the subscription's monitored item service returns,
                 or one BadSubscriptionIdInvalid MonitoredItemModifyResult per
                 requested item when the subscription is unknown
        """
        self.logger.info("modify monitored items")
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                return [self._invalid_subscription_result(ua.MonitoredItemModifyResult)
                        for _ in params.ItemsToModify]
            return sub.monitored_item_srv.modify_monitored_items(params)

    def delete_monitored_items(self, params):
        """
        Delete monitored items from the subscription named by ``params``.

        :param params: request with SubscriptionId and MonitoredItemIds
        :return: whatever the subscription's monitored item service returns,
                 or one BadSubscriptionIdInvalid ua.StatusCode per id when the
                 subscription is unknown
        """
        self.logger.info("delete monitored items")
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                return [ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                        for _ in params.MonitoredItemIds]
            return sub.monitored_item_srv.delete_monitored_items(
                params.MonitoredItemIds)

    def republish(self, params):
        """
        Ask a subscription to republish a notification message.

        :param params: request with SubscriptionId and RetransmitSequenceNumber
        :return: the subscription's republish result, or an empty
                 ua.NotificationMessage when the subscription is unknown
        """
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                # TODO: what should I do?
                return ua.NotificationMessage()
            return sub.republish(params.RetransmitSequenceNumber)

    def trigger_event(self, event):
        """
        Hand ``event`` to the monitored item service of every subscription.

        :param event: passed unchanged to each ``trigger_event`` call
        """
        with self._lock:
            for sub in self.subscriptions.values():
                sub.monitored_item_srv.trigger_event(event)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23