Go to the documentation of this file.00001 """
00002 server side implementation of subscription service
00003 """
00004
00005 from threading import RLock
00006 import logging
00007
00008 from opcua import ua
00009 from opcua.server.internal_subscription import InternalSubscription
00010
00011
00012 class SubscriptionService(object):
00013
00014 def __init__(self, loop, aspace):
00015 self.logger = logging.getLogger(__name__)
00016 self.loop = loop
00017 self.aspace = aspace
00018 self.subscriptions = {}
00019 self._sub_id_counter = 77
00020 self._lock = RLock()
00021
00022 def create_subscription(self, params, callback):
00023 self.logger.info("create subscription with callback: %s", callback)
00024 result = ua.CreateSubscriptionResult()
00025 result.RevisedPublishingInterval = params.RequestedPublishingInterval
00026 result.RevisedLifetimeCount = params.RequestedLifetimeCount
00027 result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
00028 with self._lock:
00029 self._sub_id_counter += 1
00030 result.SubscriptionId = self._sub_id_counter
00031
00032 sub = InternalSubscription(self, result, self.aspace, callback)
00033 sub.start()
00034 self.subscriptions[result.SubscriptionId] = sub
00035
00036 return result
00037
00038 def delete_subscriptions(self, ids):
00039 self.logger.info("delete subscriptions: %s", ids)
00040 res = []
00041 for i in ids:
00042 with self._lock:
00043 if i not in self.subscriptions:
00044 res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
00045 else:
00046 sub = self.subscriptions.pop(i)
00047 sub.stop()
00048 res.append(ua.StatusCode())
00049 return res
00050
00051 def publish(self, acks):
00052 self.logger.info("publish request with acks %s", acks)
00053 with self._lock:
00054 for subid, sub in self.subscriptions.items():
00055 sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
00056
00057 def create_monitored_items(self, params):
00058 self.logger.info("create monitored items")
00059 with self._lock:
00060 if params.SubscriptionId not in self.subscriptions:
00061 res = []
00062 for _ in params.ItemsToCreate:
00063 response = ua.MonitoredItemCreateResult()
00064 response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
00065 res.append(response)
00066 return res
00067 return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
00068
00069 def modify_monitored_items(self, params):
00070 self.logger.info("modify monitored items")
00071 with self._lock:
00072 if params.SubscriptionId not in self.subscriptions:
00073 res = []
00074 for _ in params.ItemsToModify:
00075 result = ua.MonitoredItemModifyResult()
00076 result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
00077 res.append(result)
00078 return res
00079 return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
00080
00081 def delete_monitored_items(self, params):
00082 self.logger.info("delete monitored items")
00083 with self._lock:
00084 if params.SubscriptionId not in self.subscriptions:
00085 res = []
00086 for _ in params.MonitoredItemIds:
00087 res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
00088 return res
00089 return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
00090 params.MonitoredItemIds)
00091
00092 def republish(self, params):
00093 with self._lock:
00094 if params.SubscriptionId not in self.subscriptions:
00095
00096 return ua.NotificationMessage()
00097 return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
00098
00099 def trigger_event(self, event):
00100 with self._lock:
00101 for sub in self.subscriptions.values():
00102 sub.monitored_item_srv.trigger_event(event)