internal_server.py
Go to the documentation of this file.
00001 """
00002 Internal server implementing opc-ua interface.
00003 Can be used on server side or to implement binary/https opc-ua servers
00004 """
00005 
00006 from datetime import datetime
00007 from copy import copy, deepcopy
00008 from datetime import timedelta
00009 from os import path
00010 import logging
00011 from threading import Lock
00012 from enum import Enum
00013 try:
00014     from urllib.parse import urlparse
00015 except ImportError:
00016     from urlparse import urlparse
00017 
00018 
00019 from opcua import ua
00020 from opcua.common import utils
00021 from opcua.common.callback import (CallbackType, ServerItemCallback,
00022                                    CallbackDispatcher)
00023 from opcua.common.node import Node
00024 from opcua.server.history import HistoryManager
00025 from opcua.server.address_space import AddressSpace
00026 from opcua.server.address_space import AttributeService
00027 from opcua.server.address_space import ViewService
00028 from opcua.server.address_space import NodeManagementService
00029 from opcua.server.address_space import MethodService
00030 from opcua.server.subscription_service import SubscriptionService
00031 from opcua.server.standard_address_space import standard_address_space
00032 from opcua.server.users import User
00033 from opcua.common import xmlimporter
00034 
00035 
class SessionState(Enum):
    """
    Life-cycle states of an InternalSession.

    A session starts as Created, becomes Activated once activate_session()
    succeeds and ends up Closed after close_session().
    """
    Created = 0    # initial state set by InternalSession.__init__
    Activated = 1  # set by InternalSession.activate_session
    Closed = 2     # set by InternalSession.close_session
00040 
00041 
class ServerDesc(object):
    """
    Small record pairing a server description with optional extra data.

    InternalServer keeps instances of this class in its table of known
    servers; ``Server`` is the description handed back by find_servers()
    and ``Capabilities`` holds whatever was supplied as ``cap`` (None when
    nothing was given).
    """

    def __init__(self, serv, cap=None):
        self.Server, self.Capabilities = serv, cap
00046 
00047 
class InternalServer(object):
    """
    Core of the server: owns the address space, the services working on it,
    the subscription service, the history manager and an internal admin
    session used for server-side access to the nodes.
    """

    def __init__(self, shelffile=None):
        """
        Build all services and populate the standard address space.

        shelffile: optional path of a shelf cache of the standard address
        space, see load_standard_address_space().
        """
        self.logger = logging.getLogger(__name__)

        self.server_callback_dispatcher = CallbackDispatcher()

        self.endpoints = []
        self._channel_id_counter = 5
        self.allow_remote_admin = True
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
        self._known_servers = {}  # used if we are a discovery server

        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)

        # the address space must be filled before sessions and nodes are created
        self.load_standard_address_space(shelffile)

        self.loop = utils.ThreadLoop()
        self.asyncio_transports = []
        self.subscription_service = SubscriptionService(self.loop, self.aspace)

        self.history_manager = HistoryManager(self)

        # create a session to use on server side
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)

        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
        self.setup_nodes()

    def setup_nodes(self):
        """
        Set up some nodes as defined by spec
        """
        uries = ["http://opcfoundation.org/UA/"]
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        ns_node.set_value(uries)

    def load_standard_address_space(self, shelffile=None):
        """
        Fill the address space with the standard nodes.

        If shelffile names an existing shelf (with or without the ".dat"
        extension) the address space is loaded from it. Otherwise it is
        filled from the generated code and, when shelffile was given, a
        shelf is written there for the next start up.
        """
        # check for a python shelf file, in windows the file extension is also needed for the check
        shelffile_win = shelffile
        if shelffile_win:
            shelffile_win += ".dat"

        if shelffile and (path.isfile(shelffile) or path.isfile(shelffile_win)):
            # import address space from shelf
            self.aspace.load_aspace_shelf(shelffile)
        else:
            # import address space from code generated from xml
            standard_address_space.fill_address_space(self.node_mgt_service)
            # import address space directly from xml, this has performance impact so disabled
            # importer = xmlimporter.XmlImporter(self.node_mgt_service)
            # importer.import_xml("/path/to/python-opcua/schemas/Opc.Ua.NodeSet2.xml", self)

            # if a cache file was supplied a shelve of the standard address space can now be built for next start up
            if shelffile:
                self.aspace.make_aspace_shelf(shelffile)

    def load_address_space(self, path):
        """
        Load address space from path
        """
        self.aspace.load(path)

    def dump_address_space(self, path):
        """
        Dump current address space to path
        """
        self.aspace.dump(path)

    def start(self):
        """
        Start the thread loop, publish server status and start the clock.

        Every registered endpoint's server is also recorded as a known
        server so that find_servers() can return it.
        """
        self.logger.info("starting internal server")
        for edp in self.endpoints:
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
        self.loop.start()
        # NOTE(review): 0 is presumably the "Running" server state - confirm against the spec
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
        if not self.disabled_clock:
            self._set_current_time()

    def stop(self):
        """
        Close the internal session, then stop the loop and the history manager
        """
        self.logger.info("stopping internal server")
        self.isession.close_session()
        self.loop.stop()
        self.history_manager.stop()

    def _set_current_time(self):
        """
        Write the current UTC time to the CurrentTime node and re-schedule
        itself on the loop one second later
        """
        self.current_time_node.set_value(datetime.utcnow())
        self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """
        Increment the channel id counter and return its new value
        """
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """
        Register endpoint so that get_endpoints() returns it
        """
        self.endpoints.append(endpoint)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the list of registered endpoints.

        params is accepted but not used.
        sockname, when given, is a (host, port, ...) sequence. In that case
        shallow copies of the endpoints are returned whose EndpointUrl has
        its network location replaced by that host and port. Without
        sockname a copy of the endpoint list itself is returned.
        """
        self.logger.info("get endpoint")
        if not sockname:
            return self.endpoints[:]

        # return to client the ip address it has access to
        host, port = sockname[0], sockname[1]
        if ":" in host and not host.startswith("["):
            # an IPv6 literal must be enclosed in brackets inside a URL,
            # otherwise its colons cannot be told apart from the port separator
            host = "[" + host + "]"
        netloc = host + ":" + str(port)

        edps = []
        for edp in self.endpoints:
            # copy so that the registered endpoint keeps its original url
            edp1 = copy(edp)
            url = urlparse(edp1.EndpointUrl)
            edp1.EndpointUrl = url._replace(netloc=netloc).geturl()
            edps.append(edp1)
        return edps

    def find_servers(self, params):
        """
        Return the descriptions of the known servers.

        When params.ServerUris is empty all known servers are returned.
        Otherwise a server is returned if its ApplicationUri, split on ":",
        starts with the components of at least one requested uri.
        """
        if not params.ServerUris:
            return [desc.Server for desc in self._known_servers.values()]
        servers = []
        for serv in self._known_servers.values():
            serv_uri = serv.Server.ApplicationUri.split(":")
            for uri in params.ServerUris:
                uri = uri.split(":")
                if serv_uri[:len(uri)] == uri:
                    servers.append(serv.Server)
                    break  # one matching uri is enough, do not add the server twice
        return servers

    def register_server(self, server, conf=None):
        """
        Record server in the table of known servers.

        An ApplicationDescription is built from the fields of server and
        stored, together with conf, under server.ServerUri. An existing
        entry with the same uri is replaced.
        """
        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationUri = server.ServerUri
        appdesc.ProductUri = server.ProductUri
        # FIXME: select name from client locale
        appdesc.ApplicationName = server.ServerNames[0]
        appdesc.ApplicationType = server.ServerType
        appdesc.DiscoveryUrls = server.DiscoveryUrls
        # FIXME: select discovery uri using reachability from client network
        appdesc.GatewayServerUri = server.GatewayServerUri
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)

    def register_server2(self, params):
        """
        Same as register_server(), taking server and discovery configuration from params
        """
        return self.register_server(params.Server, params.DiscoveryConfiguration)

    def create_session(self, name, user=User.Anonymous, external=False):
        """
        Create and return a new InternalSession bound to this server
        """
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)

    def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.historize_data_change(node, period, count)

    def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.dehistorize(node)

    def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Set attribute History Read of object events to True and start storing data for history

        Raises ua.UaError if source does not have SubscribeToEvents in its event notifier.
        """
        event_notifier = source.get_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
            raise ua.UaError("Node does not generate events", event_notifier)

        # only write the attribute back when the flag was actually missing
        if ua.EventNotifier.HistoryRead not in event_notifier:
            event_notifier.append(ua.EventNotifier.HistoryRead)
            source.set_event_notifier(event_notifier)

        self.history_manager.historize_event(source, period, count)

    def disable_history_event(self, source):
        """
        Set attribute History Read of node to False and stop storing data for history
        """
        source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
        self.history_manager.dehistorize(source)

    def subscribe_server_callback(self, event, handle):
        """
        Create a subscription from event to handle
        """
        self.server_callback_dispatcher.addListener(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """
        Remove a subscription from event to handle
        """
        self.server_callback_dispatcher.removeListener(event, handle)
00243 
00244 
class InternalSession(object):
    """
    One session on an InternalServer.

    Most methods simply forward the request to the matching service of the
    internal server. The session additionally keeps track of its state, its
    user and the ids of the subscriptions created through it.
    """
    # class wide counters used to hand out session ids and authentication tokens
    _counter = 10
    _auth_counter = 1000

    def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
        """
        internal_server: the InternalServer owning this session
        aspace: address space, used for registering method callbacks
        submgr: subscription service used for all subscription requests
        name: session name, only used for logging and printing
        user: user the session runs as
        external: True when the session is external, see read() and write()
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.external = external  # define if session is external, we need to copy some objects if it is internal
        self.aspace = aspace
        self.subscription_service = submgr
        self.name = name
        self.user = user
        self.nonce = None
        self.state = SessionState.Created
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.authentication_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.subscriptions = []
        self.logger.info("Created internal session %s", self.name)
        self._lock = Lock()  # protects self.subscriptions

    def __str__(self):
        return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format(
            self.name, self.user, self.session_id, self.authentication_token)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the endpoints of the internal server, see InternalServer.get_endpoints
        """
        return self.iserver.get_endpoints(params, sockname)

    def create_session(self, params, sockname=None):
        """
        Build and return the CreateSessionResult describing this session.

        The requested session timeout is accepted as is and a new server
        nonce is generated and remembered.
        """
        self.logger.info("Create session request")

        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.authentication_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)

        return result

    def close_session(self, delete_subs=True):
        """
        Mark the session as closed.

        delete_subs: when True (the default) all subscriptions created
        through this session are deleted too; when False they are left
        untouched.
        """
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
        self.state = SessionState.Closed
        if delete_subs:
            # pass a copy, delete_subscriptions() modifies self.subscriptions
            self.delete_subscriptions(self.subscriptions[:])

    def activate_session(self, params):
        """
        Activate the session and return the ActivateSessionResult.

        Raises utils.ServiceError(BadSessionIdInvalid) if the session is not
        in the Created state, i.e. it was already activated or closed.
        """
        self.logger.info("activate session")
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        # one default status code per software certificate sent by the client
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        self.state = SessionState.Activated
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            # NOTE(review): only the user name is looked at here, no password is checked in this method
            if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
                self.user = User.Admin
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result

    def read(self, params):
        """
        Read attributes. An internal session gets deep copies of the
        results, an external session gets the result objects themselves.
        """
        results = self.iserver.attribute_service.read(params)
        if self.external:
            return results
        return [deepcopy(dv) for dv in results]

    def history_read(self, params):
        """
        Forward the request to the history manager of the internal server
        """
        return self.iserver.history_manager.read_history(params)

    def write(self, params):
        """
        Write attributes as the user of this session.

        For an internal session params.NodesToWrite is replaced by a list of
        deep copies before the write is performed.
        """
        if not self.external:
            # If session is internal we need to store a copy of object, not a reference,
            # otherwise users may change it and we will not generate expected events
            params.NodesToWrite = [deepcopy(ntw) for ntw in params.NodesToWrite]
        return self.iserver.attribute_service.write(params, self.user)

    def browse(self, params):
        return self.iserver.view_service.browse(params)

    def translate_browsepaths_to_nodeids(self, params):
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    def add_nodes(self, params):
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    def delete_nodes(self, params):
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    def add_references(self, params):
        return self.iserver.node_mgt_service.add_references(params, self.user)

    def delete_references(self, params):
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        return self.iserver.method_service.call(params)

    def create_subscription(self, params, callback):
        """
        Create a subscription and remember its id as belonging to this session
        """
        result = self.subscription_service.create_subscription(params, callback)
        with self._lock:
            self.subscriptions.append(result.SubscriptionId)
        return result

    def create_monitored_items(self, params):
        """
        Create monitored items and notify the ItemSubscriptionCreated server callbacks
        """
        subscription_result = self.subscription_service.create_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
        return subscription_result

    def modify_monitored_items(self, params):
        """
        Modify monitored items and notify the ItemSubscriptionModified server callbacks
        """
        subscription_result = self.subscription_service.modify_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
        return subscription_result

    def republish(self, params):
        return self.subscription_service.republish(params)

    def delete_subscriptions(self, ids):
        """
        Forget the given subscription ids and delete them in the subscription service.

        Ids that were not created through this session are still passed on
        to the subscription service.
        """
        with self._lock:
            for i in ids:
                if i in self.subscriptions:
                    self.subscriptions.remove(i)
        return self.subscription_service.delete_subscriptions(ids)

    def delete_monitored_items(self, params):
        """
        Delete monitored items and notify the ItemSubscriptionDeleted server callbacks
        """
        subscription_result = self.subscription_service.delete_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
        return subscription_result

    def publish(self, acks=None):
        """
        Forward a publish request; acks defaults to an empty list
        """
        if acks is None:
            acks = []
        return self.subscription_service.publish(acks)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23