internal_server.py
Go to the documentation of this file.
1 """
2 Internal server implementing opc-ua interface.
3 Can be used on server side or to implement binary/https opc-ua servers
4 """
5 
6 from datetime import datetime
7 from copy import copy, deepcopy
8 from datetime import timedelta
9 from os import path
10 import logging
11 from threading import Lock
12 from enum import Enum
13 try:
14  from urllib.parse import urlparse
15 except ImportError:
16  from urlparse import urlparse
17 
18 
19 from opcua import ua
20 from opcua.common import utils
21 from opcua.common.callback import (CallbackType, ServerItemCallback,
22  CallbackDispatcher)
23 from opcua.common.node import Node
24 from opcua.server.history import HistoryManager
25 from opcua.server.address_space import AddressSpace
26 from opcua.server.address_space import AttributeService
27 from opcua.server.address_space import ViewService
28 from opcua.server.address_space import NodeManagementService
29 from opcua.server.address_space import MethodService
30 from opcua.server.subscription_service import SubscriptionService
31 from opcua.server.standard_address_space import standard_address_space
32 from opcua.server.users import User
33 from opcua.common import xmlimporter
34 
35 
class SessionState(Enum):
    """
    States an InternalSession goes through.

    A session starts as Created, becomes Activated by activate_session()
    and ends as Closed once close_session() has been called.
    """
    Created = 0
    Activated = 1
    Closed = 2
40 
41 
class ServerDesc(object):
    """
    Entry of the known-servers table.

    Server holds the description of the server; Capabilities holds the
    optional extra data supplied when the entry is created (None by default).
    """

    def __init__(self, serv, cap=None):
        self.Server, self.Capabilities = serv, cap
46 
47 
48 class InternalServer(object):
49 
50  def __init__(self, shelffile=None):
51  self.logger = logging.getLogger(__name__)
52 
53  self.server_callback_dispatcher = CallbackDispatcher()
54 
55  self.endpoints = []
57  self.allow_remote_admin = True
58  self.disabled_clock = False # for debugging we may want to disable clock that writes too much in log
59  self._known_servers = {} # used if we are a discovery server
60 
66 
67  self.load_standard_address_space(shelffile)
68 
69  self.loop = utils.ThreadLoop()
72 
74 
75  # create a session to use on server side
76  self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
77 
78  self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
79  self.setup_nodes()
80 
def setup_nodes(self):
    """
    Set up some nodes as defined by spec

    Currently this writes the Server NamespaceArray node, through the
    internal session, with a single entry: the OPC Foundation namespace uri.
    """
    namespace_array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
    namespace_array = Node(self.isession, namespace_array_id)
    namespace_array.set_value(["http://opcfoundation.org/UA/"])
88 
def load_standard_address_space(self, shelffile=None):
    """
    Fill the address space with the standard nodes.

    If shelffile names an existing python shelf, the address space is
    loaded from it. Otherwise the nodes are created from the code generated
    from xml and, when shelffile was supplied, a shelf is written so that it
    can be used at next start up.
    """
    if shelffile:
        # in windows the file extension is also needed for the check
        candidates = (shelffile, shelffile + ".dat")
        if any(path.isfile(candidate) for candidate in candidates):
            # import address space from shelf
            self.aspace.load_aspace_shelf(shelffile)
            return

    # import address space from code generated from xml
    # (importing directly from the NodeSet2 xml through xmlimporter.XmlImporter
    # is possible too, but has a performance impact so it is not done here)
    standard_address_space.fill_address_space(self.node_mgt_service)

    # a cache file was requested but did not exist yet: build it for next start up
    if shelffile:
        self.aspace.make_aspace_shelf(shelffile)
108 
def load_address_space(self, path):
    """
    Ask the address space to load its content from path
    """
    address_space = self.aspace
    address_space.load(path)
114 
def dump_address_space(self, path):
    """
    Ask the address space to dump its current content to path
    """
    address_space = self.aspace
    address_space.dump(path)
120 
def start(self):
    """
    Start the internal server.

    Every configured endpoint's server is recorded in the known-servers
    table (keyed by its ApplicationUri), the thread loop is started, the
    ServerStatus State (0) and StartTime nodes are written and, unless the
    clock is disabled, the periodic CurrentTime update is kicked off.
    """
    self.logger.info("starting internal server")
    self._known_servers.update(
        (endpoint.Server.ApplicationUri, ServerDesc(endpoint.Server))
        for endpoint in self.endpoints
    )
    self.loop.start()

    state_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State))
    state_node.set_value(0, ua.VariantType.Int32)
    start_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime))
    start_time_node.set_value(datetime.utcnow())

    if self.disabled_clock:
        return
    self._set_current_time()
130 
def stop(self):
    """
    Stop the internal server.

    The internal session is closed first, then the thread loop and finally
    the history manager are stopped.
    """
    log = self.logger
    log.info("stopping internal server")

    internal_session = self.isession
    internal_session.close_session()

    self.loop.stop()
    self.history_manager.stop()
136 
def _set_current_time(self):
    """
    Write the current UTC time to the CurrentTime node and schedule the
    next update on the loop with a delay of 1 (presumably seconds).
    """
    now = datetime.utcnow()
    self.current_time_node.set_value(now)
    # re-arm ourselves so the node keeps being refreshed
    reschedule = self.loop.call_later
    reschedule(1, self._set_current_time)
140 
142  self._channel_id_counter += 1
143  return self._channel_id_counter
144 
def add_endpoint(self, endpoint):
    """
    Append endpoint to the list of endpoints served by this server
    """
    known_endpoints = self.endpoints
    known_endpoints.append(endpoint)
147 
def get_endpoints(self, params=None, sockname=None):
    """
    Return the endpoints served by this server.

    params is accepted for interface compatibility and is not used here.

    sockname is an optional socket address tuple whose first two items are
    host and port. When given, shallow copies of the endpoints are returned
    with the network location of their EndpointUrl replaced by that
    host/port, so that the client gets the ip address it has access to.
    IPv6 hosts are wrapped in square brackets, otherwise the resulting
    "host:port" would be ambiguous and the url invalid.

    A new list is always returned; the stored endpoints are not modified.
    """
    self.logger.info("get endpoint")
    if not sockname:
        return self.endpoints[:]

    host, port = sockname[0], sockname[1]
    if ":" in host and not host.startswith("["):
        # IPv6 literal: must be bracketed inside a url network location
        host = "[" + host + "]"
    netloc = host + ":" + str(port)

    edps = []
    for edp in self.endpoints:
        edp1 = copy(edp)
        url = urlparse(edp1.EndpointUrl)._replace(netloc=netloc)
        edp1.EndpointUrl = url.geturl()
        edps.append(edp1)
    return edps
161 
def find_servers(self, params):
    """
    Return the descriptions of the known servers.

    With an empty params.ServerUris every known server is returned.
    Otherwise a server is returned when its ApplicationUri, split on ":",
    starts with the ":"-separated components of at least one requested uri.
    """
    known = self._known_servers
    if not params.ServerUris:
        return [entry.Server for entry in known.values()]

    def is_requested(entry):
        app_uri = entry.Server.ApplicationUri.split(":")
        requested = (uri.split(":") for uri in params.ServerUris)
        return any(app_uri[:len(prefix)] == prefix for prefix in requested)

    return [entry.Server for entry in known.values() if is_requested(entry)]
174 
def register_server(self, server, conf=None):
    """
    Record server in the known-servers table.

    An ApplicationDescription is built from the fields of server and stored,
    together with conf, under server.ServerUri. An existing entry with the
    same uri is replaced.
    """
    appdesc = ua.ApplicationDescription()
    # fields carrying the same name on both objects
    for field in ("ProductUri", "DiscoveryUrls", "GatewayServerUri"):
        setattr(appdesc, field, getattr(server, field))
    # FIXME: select discovery uri using reachability from client network
    appdesc.ApplicationUri = server.ServerUri
    appdesc.ApplicationType = server.ServerType
    # FIXME: select name from client locale
    appdesc.ApplicationName = server.ServerNames[0]

    self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)
186 
def register_server2(self, params):
    """
    Register params.Server together with params.DiscoveryConfiguration
    by delegating to register_server()
    """
    server = params.Server
    discovery_conf = params.DiscoveryConfiguration
    return self.register_server(server, discovery_conf)
189 
def create_session(self, name, user=User.Anonymous, external=False):
    """
    Create and return a new InternalSession on this server, sharing its
    address space and subscription service.
    """
    session = InternalSession(
        self,
        self.aspace,
        self.subscription_service,
        name,
        user=user,
        external=external,
    )
    return session
192 
def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
    """
    Set attribute Historizing of node to True and start storing data for history

    The HistoryRead bit is set in both the AccessLevel and UserAccessLevel
    attributes of node; period and count are handed over unchanged to the
    history manager.
    """
    node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))

    history_read = ua.AccessLevel.HistoryRead
    for attribute in (ua.AttributeIds.AccessLevel, ua.AttributeIds.UserAccessLevel):
        node.set_attr_bit(attribute, history_read)

    self.history_manager.historize_data_change(node, period, count)
201 
203  """
204  Set attribute Historizing of node to False and stop storing data for history
205  """
206  node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
207  node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
208  node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
209  self.history_manager.dehistorize(node)
210 
def enable_history_event(self, source, period=timedelta(days=7), count=0):
    """
    Set attribute History Read of object events to True and start storing data for history

    Raises ua.UaError if the event notifier of source does not contain
    SubscribeToEvents. period and count are handed over unchanged to the
    history manager.
    """
    notifier = source.get_event_notifier()
    if ua.EventNotifier.SubscribeToEvents not in notifier:
        raise ua.UaError("Node does not generate events", notifier)

    history_read = ua.EventNotifier.HistoryRead
    already_flagged = history_read in notifier
    if not already_flagged:
        # only write the notifier back when it actually changed
        notifier.append(history_read)
        source.set_event_notifier(notifier)

    self.history_manager.historize_event(source, period, count)
224 
def disable_history_event(self, source):
    """
    Clear the HistoryRead bit of the EventNotifier attribute of source
    and stop storing data for history
    """
    notifier_attribute = ua.AttributeIds.EventNotifier
    history_read = ua.EventNotifier.HistoryRead
    source.unset_attr_bit(notifier_attribute, history_read)
    self.history_manager.dehistorize(source)
231 
def subscribe_server_callback(self, event, handle):
    """
    Register handle as listener for event on the server callback dispatcher
    """
    dispatcher = self.server_callback_dispatcher
    dispatcher.addListener(event, handle)
237 
def unsubscribe_server_callback(self, event, handle):
    """
    Remove handle from the listeners of event on the server callback dispatcher
    """
    dispatcher = self.server_callback_dispatcher
    dispatcher.removeListener(event, handle)
243 
244 
245 class InternalSession(object):
246  _counter = 10
247  _auth_counter = 1000
248 
249  def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
250  self.logger = logging.getLogger(__name__)
251  self.iserver = internal_server
252  self.external = external # define if session is external, we need to copy some objects if it is internal
253  self.aspace = aspace
254  self.subscription_service = submgr
255  self.name = name
256  self.user = user
257  self.nonce = None
258  self.state = SessionState.Created
260  InternalSession._counter += 1
262  InternalSession._auth_counter += 1
263  self.subscriptions = []
264  self.logger.info("Created internal session %s", self.name)
265  self._lock = Lock()
266 
def __str__(self):
    """
    Return a one-line description with name, user, session id and
    authentication token of the session.
    """
    template = "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})"
    fields = (self.name, self.user, self.session_id, self.authentication_token)
    return template.format(*fields)
270 
def get_endpoints(self, params=None, sockname=None):
    """
    Return the endpoints of the internal server this session belongs to
    """
    server = self.iserver
    return server.get_endpoints(params, sockname)
273 
def create_session(self, params, sockname=None):
    """
    Build and return the CreateSessionResult describing this session.

    The requested session timeout is accepted as is, a fresh 32 byte nonce
    is generated and remembered on the session, and the server endpoints
    are resolved for sockname.
    """
    self.logger.info("Create session request")

    # new nonce for every request; kept on the session as well
    self.nonce = utils.create_nonce(32)

    result = ua.CreateSessionResult()
    result.SessionId = self.session_id
    result.AuthenticationToken = self.authentication_token
    result.RevisedSessionTimeout = params.RequestedSessionTimeout
    result.MaxRequestMessageSize = 65536
    result.ServerNonce = self.nonce
    result.ServerEndpoints = self.get_endpoints(sockname=sockname)
    return result
287 
def close_session(self, delete_subs=True):
    """
    Close this session.

    The session state is set to Closed. When delete_subs is true (the
    default) every subscription created through this session is deleted;
    when it is false the subscriptions are left untouched, as requested
    by the caller.
    """
    self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
    self.state = SessionState.Closed
    if delete_subs:
        # pass a copy: delete_subscriptions() removes ids from self.subscriptions
        self.delete_subscriptions(self.subscriptions[:])
292 
def activate_session(self, params):
    """
    Activate this session and return the ActivateSessionResult.

    Raises utils.ServiceError(BadSessionIdInvalid) unless the session is
    still in the Created state. A new nonce is generated, one default
    StatusCode is reported per client software certificate and the session
    becomes Activated.

    NOTE(review): a UserNameIdentityToken whose UserName is "admin" or
    "Admin" gets the Admin user whenever allow_remote_admin is set on the
    server; no password check is visible in this method.
    """
    self.logger.info("activate session")
    if self.state != SessionState.Created:
        raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

    self.nonce = utils.create_nonce(32)
    result = ua.ActivateSessionResult()
    result.ServerNonce = self.nonce
    result.Results.extend(ua.StatusCode() for _ in params.ClientSoftwareCertificates)
    self.state = SessionState.Activated

    token = params.UserIdentityToken
    if isinstance(token, ua.UserNameIdentityToken):
        wants_admin = token.UserName in ("admin", "Admin")
        if self.iserver.allow_remote_admin and wants_admin:
            self.user = User.Admin

    self.logger.info("Activated internal session %s for user %s", self.name, self.user)
    return result
309 
def read(self, params):
    """
    Read attributes through the attribute service of the server.

    External sessions get the results as returned by the service; for
    internal sessions every returned value is deep-copied first.
    """
    values = self.iserver.attribute_service.read(params)
    return values if self.external else list(map(deepcopy, values))
315 
def history_read(self, params):
    """
    Read history through the history manager of the server
    """
    history_manager = self.iserver.history_manager
    return history_manager.read_history(params)
318 
def write(self, params):
    """
    Write attributes through the attribute service of the server, on
    behalf of the user of this session.

    For internal sessions params.NodesToWrite is replaced by deep copies
    of its items before the write is performed.
    """
    internal = not self.external
    if internal:
        # If session is internal we need to store a copy of the object, not a reference,
        # otherwise users may change it and we will not generate expected events
        params.NodesToWrite = list(map(deepcopy, params.NodesToWrite))
    attribute_service = self.iserver.attribute_service
    return attribute_service.write(params, self.user)
325 
def browse(self, params):
    """
    Browse through the view service of the server
    """
    view_service = self.iserver.view_service
    return view_service.browse(params)
328 
330  return self.iserver.view_service.translate_browsepaths_to_nodeids(params)
331 
def add_nodes(self, params):
    """
    Add nodes through the node management service of the server, on
    behalf of the user of this session
    """
    node_mgt = self.iserver.node_mgt_service
    return node_mgt.add_nodes(params, self.user)
334 
def delete_nodes(self, params):
    """
    Delete nodes through the node management service of the server, on
    behalf of the user of this session
    """
    node_mgt = self.iserver.node_mgt_service
    return node_mgt.delete_nodes(params, self.user)
337 
def add_references(self, params):
    """
    Add references through the node management service of the server, on
    behalf of the user of this session
    """
    node_mgt = self.iserver.node_mgt_service
    return node_mgt.add_references(params, self.user)
340 
def delete_references(self, params):
    """
    Delete references through the node management service of the server,
    on behalf of the user of this session
    """
    node_mgt = self.iserver.node_mgt_service
    return node_mgt.delete_references(params, self.user)
343 
def add_method_callback(self, methodid, callback):
    """
    Register callback for methodid on the address space of this session
    """
    address_space = self.aspace
    return address_space.add_method_callback(methodid, callback)
346 
def call(self, params):
    """
    Call methods through the method service of the server
    """
    method_service = self.iserver.method_service
    return method_service.call(params)
349 
def create_subscription(self, params, callback):
    """
    Create a subscription through the subscription service and remember
    its id on this session. Returns the result of the service.
    """
    created = self.subscription_service.create_subscription(params, callback)
    new_id = created.SubscriptionId
    with self._lock:
        self.subscriptions.append(new_id)
    return created
355 
def create_monitored_items(self, params):
    """
    Create monitored items through the subscription service, then notify
    the ItemSubscriptionCreated listeners of the server with the request
    and its result. Returns the result of the service.
    """
    outcome = self.subscription_service.create_monitored_items(params)
    dispatcher = self.iserver.server_callback_dispatcher
    dispatcher.dispatch(CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, outcome))
    return outcome
361 
def modify_monitored_items(self, params):
    """
    Modify monitored items through the subscription service, then notify
    the ItemSubscriptionModified listeners of the server with the request
    and its result. Returns the result of the service.
    """
    outcome = self.subscription_service.modify_monitored_items(params)
    dispatcher = self.iserver.server_callback_dispatcher
    dispatcher.dispatch(CallbackType.ItemSubscriptionModified, ServerItemCallback(params, outcome))
    return outcome
367 
def republish(self, params):
    """
    Forward a republish request to the subscription service
    """
    service = self.subscription_service
    return service.republish(params)
370 
def delete_subscriptions(self, ids):
    """
    Forget the given subscription ids on this session and delete the
    subscriptions through the subscription service.

    Ids that are not registered on this session are skipped locally, but
    the complete ids are still handed to the service. Returns the result
    of the service.
    """
    for sub_id in ids:
        with self._lock:
            try:
                self.subscriptions.remove(sub_id)
            except ValueError:
                # id is not registered on this session
                pass
    return self.subscription_service.delete_subscriptions(ids)
377 
def delete_monitored_items(self, params):
    """
    Delete monitored items through the subscription service, then notify
    the ItemSubscriptionDeleted listeners of the server with the request
    and its result. Returns the result of the service.
    """
    outcome = self.subscription_service.delete_monitored_items(params)
    dispatcher = self.iserver.server_callback_dispatcher
    dispatcher.dispatch(CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, outcome))
    return outcome
383 
def publish(self, acks=None):
    """
    Forward a publish request with the given acknowledgements to the
    subscription service; without acks an empty list is sent.
    """
    acknowledgements = [] if acks is None else acks
    return self.subscription_service.publish(acknowledgements)
def subscribe_server_callback(self, event, handle)
def create_session(self, name, user=User.Anonymous, external=False)
def load_standard_address_space(self, shelffile=None)
def __init__(self, serv, cap=None)
def add_method_callback(self, methodid, callback)
def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False)
def unsubscribe_server_callback(self, event, handle)
def enable_history_event(self, source, period=timedelta(days=7), count=0)
def enable_history_data_change(self, node, period=timedelta(days=7), count=0)
def create_subscription(self, params, callback)
def get_endpoints(self, params=None, sockname=None)
def register_server(self, server, conf=None)
def get_endpoints(self, params=None, sockname=None)
def create_session(self, params, sockname=None)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44