00001 """
Internal server implementing the opc-ua interface.
Can be used on the server side or to implement binary/https opc-ua servers
00004 """
00005
00006 from datetime import datetime
00007 from copy import copy, deepcopy
00008 from datetime import timedelta
00009 from os import path
00010 import logging
00011 from threading import Lock
00012 from enum import Enum
00013 try:
00014 from urllib.parse import urlparse
00015 except ImportError:
00016 from urlparse import urlparse
00017
00018
00019 from opcua import ua
00020 from opcua.common import utils
00021 from opcua.common.callback import (CallbackType, ServerItemCallback,
00022 CallbackDispatcher)
00023 from opcua.common.node import Node
00024 from opcua.server.history import HistoryManager
00025 from opcua.server.address_space import AddressSpace
00026 from opcua.server.address_space import AttributeService
00027 from opcua.server.address_space import ViewService
00028 from opcua.server.address_space import NodeManagementService
00029 from opcua.server.address_space import MethodService
00030 from opcua.server.subscription_service import SubscriptionService
00031 from opcua.server.standard_address_space import standard_address_space
00032 from opcua.server.users import User
00033 from opcua.common import xmlimporter
00034
00035
class SessionState(Enum):
    """
    Lifecycle state of an InternalSession.
    """
    # assigned when an InternalSession object is constructed
    Created = 0
    # assigned by InternalSession.activate_session()
    Activated = 1
    # assigned by InternalSession.close_session()
    Closed = 2
00040
00041
class ServerDesc(object):
    """
    Small record holding a server description and its optional capabilities.

    Server: the description object of the server (whatever the caller passes)
    Capabilities: optional extra information about the server, None by default
    """

    def __init__(self, serv, cap=None):
        self.Server, self.Capabilities = serv, cap
00046
00047
class InternalServer(object):
    """
    Core server object: owns the address space, the services working on it,
    the subscription service, the history manager and an internal admin
    session (isession) used to access the server's own nodes.
    """

    def __init__(self, shelffile=None):
        """
        Create the address space, the services and the internal session.

        shelffile: optional path forwarded to load_standard_address_space()
        """
        self.logger = logging.getLogger(__name__)

        self.server_callback_dispatcher = CallbackDispatcher()

        self.endpoints = []
        self._channel_id_counter = 5
        # when True, activate_session() grants User.Admin to user names "admin"/"Admin"
        self.allow_remote_admin = True
        # when True, start() does not schedule the periodic CurrentTime update
        self.disabled_clock = False
        # uri -> ServerDesc, filled by start() and register_server(), read by find_servers()
        self._known_servers = {}

        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)

        self.load_standard_address_space(shelffile)

        self.loop = utils.ThreadLoop()
        self.asyncio_transports = []
        self.subscription_service = SubscriptionService(self.loop, self.aspace)

        self.history_manager = HistoryManager(self)

        # internal session with admin rights, used by the server itself
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)

        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
        self.setup_nodes()

    def setup_nodes(self):
        """
        Set up some nodes as defined by spec
        """
        uries = ["http://opcfoundation.org/UA/"]
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        ns_node.set_value(uries)

    def load_standard_address_space(self, shelffile=None):
        """
        Populate the address space with the standard nodes.

        If shelffile is given and exists on disk (either as given or with a
        ".dat" suffix), the address space is loaded from that shelf.
        Otherwise it is filled from the generated standard address space
        code and, if shelffile was given, a shelf is written for next time.
        """
        # the shelf may exist on disk with an extra ".dat" extension
        shelffile_win = shelffile
        if shelffile_win:
            shelffile_win += ".dat"

        if shelffile and (path.isfile(shelffile) or path.isfile(shelffile_win)):
            # import address space from the shelf
            self.aspace.load_aspace_shelf(shelffile)
        else:
            # import address space from generated code
            standard_address_space.fill_address_space(self.node_mgt_service)

            # a cache file name was supplied but no cache exists yet: build it
            if shelffile:
                self.aspace.make_aspace_shelf(shelffile)

    def load_address_space(self, path):
        """
        Load address space from path
        """
        self.aspace.load(path)

    def dump_address_space(self, path):
        """
        Dump current address space to path
        """
        self.aspace.dump(path)

    def start(self):
        """
        Start the internal server.

        Registers the server of every endpoint as a known server, starts the
        thread loop, writes the server status State and StartTime nodes and,
        unless disabled_clock is set, starts the periodic CurrentTime update.
        """
        self.logger.info("starting internal server")
        for edp in self.endpoints:
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
        self.loop.start()
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
        if not self.disabled_clock:
            self._set_current_time()

    def stop(self):
        """
        Close the internal session, then stop the thread loop and the history manager
        """
        self.logger.info("stopping internal server")
        self.isession.close_session()
        self.loop.stop()
        self.history_manager.stop()

    def _set_current_time(self):
        """
        Write the current UTC time to the CurrentTime node and re-schedule
        itself on the loop in 1 (unit defined by loop.call_later)
        """
        self.current_time_node.set_value(datetime.utcnow())
        self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """
        Increment the channel id counter and return the new value
        """
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """
        Append endpoint to the list of endpoints of this server
        """
        self.endpoints.append(endpoint)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the list of endpoints.

        params: not used by this implementation
        sockname: optional (host, port) pair. When given, shallow copies of
        the endpoints are returned with the network location of their
        EndpointUrl replaced by "host:port". Otherwise a copy of the endpoint
        list is returned.
        """
        self.logger.info("get endpoint")
        if sockname:
            # rewrite the url of each endpoint without touching the stored one
            edps = []
            for edp in self.endpoints:
                edp1 = copy(edp)
                url = urlparse(edp1.EndpointUrl)
                url = url._replace(netloc=sockname[0] + ":" + str(sockname[1]))
                edp1.EndpointUrl = url.geturl()
                edps.append(edp1)
            return edps
        return self.endpoints[:]

    def find_servers(self, params):
        """
        Return the descriptions of known servers.

        If params.ServerUris is empty all known servers are returned.
        Otherwise a server is returned when the ":"-separated components of
        one of the requested uris are a prefix of the components of its
        ApplicationUri.
        """
        if not params.ServerUris:
            return [desc.Server for desc in self._known_servers.values()]
        servers = []
        for serv in self._known_servers.values():
            serv_uri = serv.Server.ApplicationUri.split(":")
            for uri in params.ServerUris:
                uri = uri.split(":")
                if serv_uri[:len(uri)] == uri:
                    servers.append(serv.Server)
                    # one matching uri is enough, do not add the server twice
                    break
        return servers

    def register_server(self, server, conf=None):
        """
        Build an ApplicationDescription from server and store it, together
        with conf, as a known server under server.ServerUri.

        Raises IndexError if server.ServerNames is empty.
        """
        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationUri = server.ServerUri
        appdesc.ProductUri = server.ProductUri
        # only the first of the server names is kept
        appdesc.ApplicationName = server.ServerNames[0]
        appdesc.ApplicationType = server.ServerType
        appdesc.DiscoveryUrls = server.DiscoveryUrls
        appdesc.GatewayServerUri = server.GatewayServerUri
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)

    def register_server2(self, params):
        """
        Same as register_server, taking server and configuration from params
        """
        return self.register_server(params.Server, params.DiscoveryConfiguration)

    def create_session(self, name, user=User.Anonymous, external=False):
        """
        Create and return a new InternalSession on this server
        """
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)

    def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.historize_data_change(node, period, count)

    def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.dehistorize(node)

    def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Set attribute History Read of object events to True and start storing data for history

        Raises ua.UaError if source does not have SubscribeToEvents in its
        event notifier.
        """
        event_notifier = source.get_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
            raise ua.UaError("Node does not generate events", event_notifier)

        if ua.EventNotifier.HistoryRead not in event_notifier:
            # NOTE(review): get_event_notifier() is expected to return a set,
            # which has add() but no append(); confirm against Node. Both
            # container kinds are accepted so this cannot fail with
            # AttributeError.
            if hasattr(event_notifier, "add"):
                event_notifier.add(ua.EventNotifier.HistoryRead)
            else:
                event_notifier.append(ua.EventNotifier.HistoryRead)
            source.set_event_notifier(event_notifier)

        self.history_manager.historize_event(source, period, count)

    def disable_history_event(self, source):
        """
        Set attribute History Read of node to False and stop storing data for history
        """
        source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
        self.history_manager.dehistorize(source)

    def subscribe_server_callback(self, event, handle):
        """
        Create a subscription from event to handle
        """
        self.server_callback_dispatcher.addListener(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """
        Remove a subscription from event to handle
        """
        self.server_callback_dispatcher.removeListener(event, handle)
00243
00244
class InternalSession(object):
    """
    Session on an InternalServer.

    Most methods forward the request to the matching service of the
    internal server; the session itself keeps track of its state, its user
    and the ids of the subscriptions created through it.
    """
    # class-wide counters used to number session ids and authentication tokens
    _counter = 10
    _auth_counter = 1000

    def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
        """
        internal_server: the InternalServer this session belongs to
        aspace: address space object, used by add_method_callback()
        submgr: subscription service used for all subscription requests
        name: session name, used in log messages and __str__
        user: user of the session, passed to the write and node management services
        external: when False, read() and write() deep-copy the exchanged values
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.external = external
        self.aspace = aspace
        self.subscription_service = submgr
        self.name = name
        self.user = user
        self.nonce = None
        self.state = SessionState.Created
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.authentication_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        # ids of the subscriptions created through this session
        self.subscriptions = []
        self.logger.info("Created internal session %s", self.name)
        # protects self.subscriptions
        self._lock = Lock()

    def __str__(self):
        return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format(
            self.name, self.user, self.session_id, self.authentication_token)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the endpoints of the internal server, see InternalServer.get_endpoints
        """
        return self.iserver.get_endpoints(params, sockname)

    def create_session(self, params, sockname=None):
        """
        Build the CreateSessionResult for this session.

        A new 32 byte nonce is generated and stored in self.nonce. The
        requested session timeout is accepted as is.
        """
        self.logger.info("Create session request")

        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.authentication_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)

        return result

    def close_session(self, delete_subs=True):
        """
        Mark the session as closed and delete all its subscriptions.

        NOTE(review): delete_subs is currently ignored, subscriptions are
        always deleted. Keeping them alive would need something else to
        clean them up later - confirm before honouring the flag.
        """
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
        self.state = SessionState.Closed
        # pass a copy: delete_subscriptions() removes ids from self.subscriptions
        self.delete_subscriptions(self.subscriptions[:])

    def activate_session(self, params):
        """
        Activate the session and return the ActivateSessionResult.

        Raises utils.ServiceError(BadSessionIdInvalid) if the session is not
        in state Created, i.e. was already activated or closed.
        """
        self.logger.info("activate session")
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        # one default status code per client software certificate
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        self.state = SessionState.Activated
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            # NOTE(review): admin rights are granted on the user name alone,
            # no password is checked here; set allow_remote_admin to False
            # on the server to disable this.
            if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
                self.user = User.Admin
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result

    def read(self, params):
        """
        Read attributes through the attribute service.

        Sessions that are not external get deep copies of the results.
        """
        results = self.iserver.attribute_service.read(params)
        if self.external:
            return results
        # presumably so that callers cannot modify the objects held by the
        # address space through the returned values
        return [deepcopy(dv) for dv in results]

    def history_read(self, params):
        return self.iserver.history_manager.read_history(params)

    def write(self, params):
        """
        Write attributes through the attribute service as self.user.

        For sessions that are not external, params.NodesToWrite is replaced
        by deep copies of its items before writing.
        """
        if not self.external:
            # store copies, not references; presumably so that later changes
            # made by the caller to its objects do not alter the stored values
            params.NodesToWrite = [deepcopy(ntw) for ntw in params.NodesToWrite]
        return self.iserver.attribute_service.write(params, self.user)

    def browse(self, params):
        return self.iserver.view_service.browse(params)

    def translate_browsepaths_to_nodeids(self, params):
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    def add_nodes(self, params):
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    def delete_nodes(self, params):
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    def add_references(self, params):
        return self.iserver.node_mgt_service.add_references(params, self.user)

    def delete_references(self, params):
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        return self.iserver.method_service.call(params)

    def create_subscription(self, params, callback):
        """
        Create a subscription and remember its id in self.subscriptions
        """
        result = self.subscription_service.create_subscription(params, callback)
        with self._lock:
            self.subscriptions.append(result.SubscriptionId)
        return result

    def create_monitored_items(self, params):
        """
        Create monitored items and dispatch an ItemSubscriptionCreated server callback
        """
        subscription_result = self.subscription_service.create_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
        return subscription_result

    def modify_monitored_items(self, params):
        """
        Modify monitored items and dispatch an ItemSubscriptionModified server callback
        """
        subscription_result = self.subscription_service.modify_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
        return subscription_result

    def republish(self, params):
        return self.subscription_service.republish(params)

    def delete_subscriptions(self, ids):
        """
        Forget the given subscription ids and delete them in the subscription service.

        Ids not known to this session are still passed to the subscription service.
        """
        # take the lock once for the whole batch instead of once per id
        with self._lock:
            for i in ids:
                if i in self.subscriptions:
                    self.subscriptions.remove(i)
        return self.subscription_service.delete_subscriptions(ids)

    def delete_monitored_items(self, params):
        """
        Delete monitored items and dispatch an ItemSubscriptionDeleted server callback
        """
        subscription_result = self.subscription_service.delete_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
        return subscription_result

    def publish(self, acks=None):
        """
        Forward a publish request; acks defaults to an empty list
        """
        if acks is None:
            acks = []
        return self.subscription_service.publish(acks)