subscription_service.py
Go to the documentation of this file.
1 """
2 server side implementation of subscription service
3 """
4 
5 from threading import RLock
6 import logging
7 
8 from opcua import ua
9 from opcua.server.internal_subscription import InternalSubscription
10 
11 
class SubscriptionService(object):
    """Server-side registry and dispatcher for subscriptions.

    Active ``InternalSubscription`` objects are stored in
    ``self.subscriptions`` keyed by their subscription id. Every method
    that reads or mutates that mapping does so while holding
    ``self._lock`` (a re-entrant lock).
    """

    def __init__(self, loop, aspace):
        """Set up an empty service.

        :param loop: stored as ``self.loop``; not used by this class itself.
        :param aspace: address space handed to every new subscription.
        """
        self.logger = logging.getLogger(__name__)
        self.loop = loop
        self.aspace = aspace
        self.subscriptions = {}
        # The counter is incremented before use, so the first id is 78.
        self._sub_id_counter = 77
        self._lock = RLock()

    def create_subscription(self, params, callback):
        """Create, start and register a new subscription.

        The requested publishing interval, lifetime count and keep-alive
        count from ``params`` are copied unchanged into the result.

        :param params: request carrying ``RequestedPublishingInterval``,
            ``RequestedLifetimeCount`` and ``RequestedMaxKeepAliveCount``.
        :param callback: passed through to the ``InternalSubscription``.
        :return: the ``ua.CreateSubscriptionResult`` with the assigned id.
        """
        self.logger.info("create subscription with callback: %s", callback)
        # The result object must exist before its fields are filled in;
        # without this line every call fails with NameError.
        result = ua.CreateSubscriptionResult()
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
        with self._lock:
            self._sub_id_counter += 1
            result.SubscriptionId = self._sub_id_counter

            sub = InternalSubscription(self, result, self.aspace, callback)
            sub.start()
            self.subscriptions[result.SubscriptionId] = sub

            return result

    def delete_subscriptions(self, ids):
        """Stop and unregister the subscriptions listed in ``ids``.

        :param ids: iterable of subscription ids.
        :return: one ``ua.StatusCode`` per id, in order;
            ``BadSubscriptionIdInvalid`` for ids that are not registered.
        """
        self.logger.info("delete subscriptions: %s", ids)
        res = []
        for i in ids:
            with self._lock:
                if i not in self.subscriptions:
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
                else:
                    sub = self.subscriptions.pop(i)
                    sub.stop()
                    res.append(ua.StatusCode())
        return res

    def publish(self, acks):
        """Forward a publish request to every registered subscription.

        Each subscription receives the list of acknowledged sequence
        numbers whose ``SubscriptionId`` matches its own id (possibly an
        empty list).

        :param acks: iterable of objects with ``SubscriptionId`` and
            ``SequenceNumber`` attributes.
        """
        self.logger.info("publish request with acks %s", acks)
        with self._lock:
            for subid, sub in self.subscriptions.items():
                sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])

    def create_monitored_items(self, params):
        """Create monitored items on the subscription named in ``params``.

        :param params: request with ``SubscriptionId`` and ``ItemsToCreate``.
        :return: the target subscription's result, or, for an unknown
            subscription id, one ``ua.MonitoredItemCreateResult`` per
            requested item with status ``BadSubscriptionIdInvalid``.
        """
        self.logger.info("create monitored items")
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                res = []
                for _ in params.ItemsToCreate:
                    response = ua.MonitoredItemCreateResult()
                    response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                    res.append(response)
                return res
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)

    def modify_monitored_items(self, params):
        """Modify monitored items on the subscription named in ``params``.

        :param params: request with ``SubscriptionId`` and ``ItemsToModify``.
        :return: the target subscription's result, or, for an unknown
            subscription id, one ``ua.MonitoredItemModifyResult`` per
            requested item with status ``BadSubscriptionIdInvalid``.
        """
        self.logger.info("modify monitored items")
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                res = []
                for _ in params.ItemsToModify:
                    # A fresh result per item; without this line the
                    # error branch fails with NameError.
                    result = ua.MonitoredItemModifyResult()
                    result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                    res.append(result)
                return res
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)

    def delete_monitored_items(self, params):
        """Delete monitored items from the subscription named in ``params``.

        :param params: request with ``SubscriptionId`` and
            ``MonitoredItemIds``.
        :return: the target subscription's result, or, for an unknown
            subscription id, one ``BadSubscriptionIdInvalid`` status code
            per requested item id.
        """
        self.logger.info("delete monitored items")
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                res = []
                for _ in params.MonitoredItemIds:
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
                return res
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
                params.MonitoredItemIds)

    def republish(self, params):
        """Ask a subscription to republish a previously sent notification.

        :param params: request with ``SubscriptionId`` and
            ``RetransmitSequenceNumber``.
        :return: whatever the subscription's ``republish`` returns, or an
            empty ``ua.NotificationMessage`` for an unknown subscription id.
        """
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                # TODO: what should I do?
                return ua.NotificationMessage()
            return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)

    def trigger_event(self, event):
        """Pass ``event`` to the monitored-item service of every subscription."""
        with self._lock:
            for sub in self.subscriptions.values():
                sub.monitored_item_srv.trigger_event(event)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44