opcua/server/server.py
Go to the documentation of this file.
1 """
2 High level interface to pure python OPC-UA server
3 """
4 
5 import logging
6 from datetime import timedelta
7 try:
8  from urllib.parse import urlparse
9 except ImportError:
10  from urlparse import urlparse
11 
12 
13 from opcua import ua
14 # from opcua.binary_server import BinaryServer
15 from opcua.server.binary_server_asyncio import BinaryServer
16 from opcua.server.internal_server import InternalServer
17 from opcua.server.event_generator import EventGenerator
18 from opcua.common.node import Node
19 from opcua.common.subscription import Subscription
20 from opcua.common import xmlimporter
21 from opcua.common.manage_nodes import delete_nodes
22 from opcua.client.client import Client
23 from opcua.crypto import security_policies
24 from opcua.common.event_objects import BaseEvent
25 from opcua.common.shortcuts import Shortcuts
26 from opcua.common.xmlexporter import XmlExporter
27 from opcua.common.ua_utils import get_nodes_of_namespace
# Crypto support is optional: importing opcua.crypto.uacrypto fails with
# ImportError when its dependency is not installed. In that case the flag
# below is cleared so callers can detect that certificates/keys are unusable.
use_crypto = True
try:
    from opcua.crypto import uacrypto
except ImportError:
    print("cryptography is not installed, use of crypto disabled")
    use_crypto = False
34 
35 
class Server(object):

    """
    High level Server class

    This class creates an opcua server with default values

    Create your own namespace and then populate your server address space
    using get_root_node() or get_objects_node() to get Node objects,
    and get_event_generator() to fire events.
    Then start server. See example_server.py
    All methods are threadsafe

    If you need more flexibility you can call the Ua Service methods
    directly on the iserver or iserver.isession object members.

    During startup the standard address space will be constructed, which may be
    time-consuming when running a server on a less powerful device (e.g. a
    Raspberry Pi). In order to improve startup performance, an optional path to a
    cache file can be passed to the server constructor.
    If the parameter is defined, the address space will be loaded from the
    cache file or the file will be created if it does not exist yet.
    As a result the first startup will be even slower due to the cache file
    generation but all further start ups will be significantly faster.

    :ivar application_uri:
    :vartype application_uri: uri
    :ivar product_uri:
    :vartype product_uri: uri
    :ivar name:
    :vartype name: string
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
    :vartype default_timeout: int
    :ivar iserver: internal server object
    :vartype iserver: InternalServer
    :ivar bserver: binary protocol server
    :vartype bserver: BinaryServer
    :ivar nodes: shortcuts to common nodes
    :vartype nodes: Shortcuts

    """

    def __init__(self, shelffile=None, iserver=None):
        """
        Create a server with default settings.

        :param shelffile: optional cache file path, forwarded to InternalServer
            when no ``iserver`` is supplied
        :param iserver: optional already-built internal server to wrap
        """
        self.logger = logging.getLogger(__name__)
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.name = "FreeOpcUa Python Server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000
        if iserver is not None:
            self.iserver = iserver
        else:
            self.iserver = InternalServer(shelffile)
        self.bserver = None
        # Discovery state must exist before stop()/register_to_discovery()
        # touch it; the period default mirrors register_to_discovery().
        self._discovery_clients = {}
        self._discovery_period = 60
        self.certificate = None
        self.private_key = None
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)

        # setup some expected values
        # NOTE(review): restored from upstream python-opcua (this statement was
        # dropped from the listing) -- confirm against the packaged source.
        self.register_namespace(self.application_uri)
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        sa_node.set_value([self.application_uri])

    def __enter__(self):
        """Start the server when entering a ``with`` block and return it."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the server when leaving a ``with`` block."""
        self.stop()

    def load_certificate(self, path):
        """
        load server certificate from file, either pem or der

        Relies on the optional ``uacrypto`` module having been imported.
        """
        self.certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        load server private key from file

        Relies on the optional ``uacrypto`` module having been imported.
        """
        self.private_key = uacrypto.load_private_key(path)

    def disable_clock(self, val=True):
        """
        for debugging you may want to disable the clock that writes every
        second to the address space
        """
        self.iserver.disabled_clock = val

    def set_application_uri(self, uri):
        """
        Set application/server URI.
        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"
        """
        self.application_uri = uri

    def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: optional list of server uris to filter on (default: none)
        """
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(params)

    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio thread to
        re-register every period seconds
        if period is 0 registration is not automatically renewed
        """
        # FIXME: have a period per discovery
        previous = self._discovery_clients.pop(url, None)
        if previous is not None:
            previous.disconnect()
        # Only remember the client once it is connected and registered, so a
        # failed attempt does not leave a dead client behind for renewal/stop.
        client = Client(url)
        client.connect()
        client.register_server(self)
        self._discovery_clients[url] = client
        self._discovery_period = period
        if period:
            self.iserver.loop.call_soon(self._renew_registration)

    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        stop registration to the discovery server at url

        Raises KeyError if no registration exists for url.
        """
        # FIXME: is there really no way to deregister?
        # Forget the client as well, otherwise the periodic renewal would keep
        # using a disconnected client.
        self._discovery_clients.pop(url).disconnect()

    def _renew_registration(self):
        """
        Re-register to every known discovery server and schedule the next
        renewal. The chain ends when renewal is disabled (period 0) or no
        discovery client is left.
        """
        if not self._discovery_period or not self._discovery_clients:
            return
        # iterate over a snapshot: the dict may be changed while we renew
        for url, client in list(self._discovery_clients.items()):
            try:
                client.register_server(self)
            except Exception:
                # one unreachable discovery server must not stop the renewals
                self.logger.exception("could not renew registration to discovery server %s", url)
        self.iserver.loop.call_later(self._discovery_period, self._renew_registration)

    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Create a client to discovery server and return it
        """
        client = Client(url)
        client.connect()
        return client

    def allow_remote_admin(self, allow):
        """
        Enable or disable the builtin Admin user from network clients
        """
        self.iserver.allow_remote_admin = allow

    def set_endpoint(self, url):
        """
        Set the endpoint url the server will listen on (stored parsed)
        """
        self.endpoint = urlparse(url)

    def get_endpoints(self):
        """
        Return the endpoints known by the internal server
        """
        return self.iserver.get_endpoints()

    def _setup_server_nodes(self):
        """
        Publish the endpoints and build the list of security policies.
        """
        # to be called just before starting server since it needs all parameters to be setup
        self._set_endpoints()
        # NOTE(review): restored from upstream python-opcua (this statement was
        # dropped from the listing). It offers the default policy factory and
        # resets the list so repeated start() calls do not accumulate entries.
        self._policies = [ua.SecurityPolicyFactory()]
        if self.certificate and self.private_key:
            secured = (
                (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
                (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
                (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
                (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
            )
            for policy, mode in secured:
                self._set_endpoints(policy, mode)
                self._policies.append(
                    ua.SecurityPolicyFactory(policy, mode, self.certificate, self.private_key)
                )

    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description for the given security policy/mode and
        add it to the internal server.
        """
        idtoken = ua.UserTokenPolicy()
        idtoken.PolicyId = 'anonymous'
        idtoken.TokenType = ua.UserTokenType.Anonymous

        idtoken2 = ua.UserTokenPolicy()
        idtoken2.PolicyId = 'certificate_basic256'
        idtoken2.TokenType = ua.UserTokenType.Certificate

        idtoken3 = ua.UserTokenPolicy()
        idtoken3.PolicyId = 'certificate_basic128'
        idtoken3.TokenType = ua.UserTokenType.Certificate

        idtoken4 = ua.UserTokenPolicy()
        idtoken4.PolicyId = 'username'
        idtoken4.TokenType = ua.UserTokenType.UserName

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self.application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())

        edp = ua.EndpointDescription()
        edp.EndpointUrl = self.endpoint.geturl()
        edp.Server = appdesc
        if self.certificate:
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        edp.SecurityMode = mode
        edp.SecurityPolicyUri = policy.URI
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)

    def set_server_name(self, name):
        """
        Set the application name used in the endpoint descriptions
        """
        self.name = name

    def start(self):
        """
        Start to listen on network
        """
        self._setup_server_nodes()
        self.iserver.start()
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
        self.bserver.set_policies(self._policies)
        self.bserver.start()

    def stop(self):
        """
        Stop server
        """
        for client in self._discovery_clients.values():
            client.disconnect()
        # disconnected clients are useless; dropping them also ends the
        # periodic registration renewal
        self._discovery_clients.clear()
        # bserver only exists once start() has been called
        if self.bserver is not None:
            self.bserver.stop()
        self.iserver.stop()

    def get_root_node(self):
        """
        Get Root node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get Objects node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get Server node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get a specific node using NodeId object or a string representing a NodeId
        """
        return Node(self.iserver.isession, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allows
        to subscribe to events or data on server

        :param period: requested publishing interval of the subscription
        :param handler: object passed to Subscription to receive notifications
        """
        # the parameter object has to be created before it is filled in
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def get_namespace_array(self):
        """
        get all namespaces defined in server
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def register_namespace(self, uri):
        """
        Register a new namespace and return its index. If the uri is already
        registered, its existing index is returned.
        Nodes should be in a custom namespace, not 0.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1

    def get_namespace_index(self, uri):
        """
        get index of a namespace using its uri

        Raises ValueError if the uri is not registered.
        """
        uries = self.get_namespace_array()
        return uries.index(uri)

    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Returns an event generator using an event type from address space.
        Use this object to fire events
        """
        if not etype:
            etype = BaseEvent()
        return EventGenerator(self.iserver.isession, etype, source)

    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
        """Create a custom data type below basetype; see _create_custom_type."""
        if properties is None:
            properties = []
        return self._create_custom_type(idx, name, basetype, properties, [], [])

    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
        """Create a custom event type below basetype; see _create_custom_type."""
        if properties is None:
            properties = []
        return self._create_custom_type(idx, name, basetype, properties, [], [])

    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
        """Create a custom object type below basetype; see _create_custom_type."""
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)

    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
    # return self._create_custom_type(idx, name, basetype, properties)

    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
        """Create a custom variable type below basetype; see _create_custom_type."""
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)

    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a new type node below basetype and populate it.

        Args:
            idx: namespace index for the new nodes
            name: name of the new type
            basetype: Node, ua.NodeId or value accepted by ua.NodeId()
            properties: sequences (name, variant type[, data type])
            variables: sequences (name, variant type[, data type])
            methods: sequences of four items forwarded to add_method()

        Returns: the node of the new type
        """
        if isinstance(basetype, Node):
            base_t = basetype
        elif isinstance(basetype, ua.NodeId):
            base_t = Node(self.iserver.isession, basetype)
        else:
            base_t = Node(self.iserver.isession, ua.NodeId(basetype))

        custom_t = base_t.add_object_type(idx, name)
        for prop in properties:
            # third item (data type) is optional
            datatype = prop[2] if len(prop) > 2 else None
            custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
        for variable in variables:
            datatype = variable[2] if len(variable) > 2 else None
            custom_t.add_variable(idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)
        for method in methods:
            custom_t.add_method(idx, method[0], method[1], method[2], method[3])

        return custom_t

    def import_xml(self, path):
        """
        Import nodes defined in xml
        """
        importer = xmlimporter.XmlImporter(self)
        return importer.import_xml(path)

    def export_xml(self, nodes, path):
        """
        Export defined nodes to xml
        """
        exp = XmlExporter(self)
        exp.build_etree(nodes)
        return exp.write_xml(path)

    def export_xml_by_ns(self, path, namespaces=None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.
        Args:
            path: name of the xml file to write
            namespaces: list of string uris or int indexes of the namespaces to export;
                if not provided, all namespaces are used except 0

        Returns: the result of export_xml()
        """
        if namespaces is None:
            namespaces = []
        nodes = get_nodes_of_namespace(self, namespaces)
        return self.export_xml(nodes, path)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete the given nodes from the address space, optionally recursively
        """
        return delete_nodes(self.iserver.isession, nodes, recursive)

    @staticmethod
    def _as_node_list(node):
        """Return node as a list: lists/tuples are copied, anything else is wrapped."""
        if isinstance(node, (list, tuple)):
            return list(node)
        return [node]

    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Start historizing supplied nodes; see history module
        Args:
            node: node or list of nodes that can be historized (variables/properties)
            period: time delta to store the history; older data will be deleted from the storage
            count: number of changes to store in the history

        Returns:
        """
        for item in self._as_node_list(node):
            self.iserver.enable_history_data_change(item, period, count)

    def dehistorize_node_data_change(self, node):
        """
        Stop historizing supplied nodes; see history module
        Args:
            node: node or list of nodes that can be historized (UA variables/properties)

        Returns:
        """
        for item in self._as_node_list(node):
            self.iserver.disable_history_data_change(item)

    def historize_node_event(self, node, period=timedelta(days=7), count=0):
        """
        Start historizing events from node (typically a UA object); see history module
        Args:
            node: node or list of nodes that can be historized (UA objects)
            period: time delta to store the history; older data will be deleted from the storage
            count: number of events to store in the history

        Returns:
        """
        for item in self._as_node_list(node):
            self.iserver.enable_history_event(item, period, count)

    def dehistorize_node_event(self, node):
        """
        Stop historizing events from node (typically a UA object); see history module
        Args:
            node: node or list of nodes that can be historized (UA objects)

        Returns:
        """
        for item in self._as_node_list(node):
            self.iserver.disable_history_event(item)

    def subscribe_server_callback(self, event, handle):
        """Forward a server callback subscription to the internal server."""
        self.iserver.subscribe_server_callback(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """Forward a server callback unsubscription to the internal server."""
        self.iserver.unsubscribe_server_callback(event, handle)

    def link_method(self, node, callback):
        """
        Link a python function to a UA method in the address space; required when a UA method has been imported
        to the address space via XML; the python executable must be linked manually
        Args:
            node: UA method node
            callback: python function that the UA method will call

        Returns:
        """
        self.iserver.isession.add_method_callback(node.nodeid, callback)
def dehistorize_node_data_change(self, node)
def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None)
def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None)
def _create_custom_type(self, idx, name, basetype, properties, variables, methods)
def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60)
def __exit__(self, exc_type, exc_value, traceback)
def get_nodes_of_namespace(server, namespaces=None)
Definition: ua_utils.py:236
def historize_node_event(self, node, period=timedelta(days=7), count=0)
def export_xml(self, nodes, path)
def __init__(self, shelffile=None, iserver=None)
def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None)
def link_method(self, node, callback)
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_)
def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None)
def create_subscription(self, period, handler)
def dehistorize_node_event(self, node)
def unregister_to_discovery(self, url="opc.tcp://localhost:4840")
def export_xml_by_ns(self, path, namespaces=None)
def get_client_to_discovery(self, url="opc.tcp://localhost:4840")
def unsubscribe_server_callback(self, event, handle)
def subscribe_server_callback(self, event, handle)
def get_event_generator(self, etype=None, source=ua.ObjectIds.Server)
def disable_clock(self, val=True)
def delete_nodes(self, nodes, recursive=False)
def historize_node_data_change(self, node, period=timedelta(days=7), count=0)
def find_servers(self, uris=None)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44