2 server side implementation of subscription service 5 from threading
import RLock
15 self.
logger = logging.getLogger(__name__)
23 self.logger.info(
"create subscription with callback: %s", callback)
25 result.RevisedPublishingInterval = params.RequestedPublishingInterval
26 result.RevisedLifetimeCount = params.RequestedLifetimeCount
27 result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
39 self.logger.info(
"delete subscriptions: %s", ids)
44 res.append(
ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
46 sub = self.subscriptions.pop(i)
52 self.logger.info(
"publish request with acks %s", acks)
54 for subid, sub
in self.subscriptions.items():
55 sub.publish([ack.SequenceNumber
for ack
in acks
if ack.SubscriptionId == subid])
58 self.logger.info(
"create monitored items")
62 for _
in params.ItemsToCreate:
64 response.StatusCode =
ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
67 return self.
subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
70 self.logger.info(
"modify monitored items")
74 for _
in params.ItemsToModify:
76 result.StatusCode =
ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
79 return self.
subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
82 self.logger.info(
"delete monitored items")
86 for _
in params.MonitoredItemIds:
87 res.append(
ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
89 return self.
subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
90 params.MonitoredItemIds)
101 for sub
in self.subscriptions.values():
102 sub.monitored_item_srv.trigger_event(event)
def delete_subscriptions(self, ids)
def delete_monitored_items(self, params)
def trigger_event(self, event)
def create_subscription(self, params, callback)
def republish(self, params)
def __init__(self, loop, aspace)
def create_monitored_items(self, params)
def modify_monitored_items(self, params)