2 Internal server implementing opcu-ua interface. 3 Can be used on server side or to implement binary/https opc-ua servers 6 from datetime
import datetime
7 from copy
import copy, deepcopy
8 from datetime
import timedelta
11 from threading
import Lock
14 from urllib.parse
import urlparse
16 from urlparse
import urlparse
51 self.
logger = logging.getLogger(__name__)
69 self.
loop = utils.ThreadLoop()
83 Set up some nodes as defined by spec 85 uries = [
"http://opcfoundation.org/UA/"]
87 ns_node.set_value(uries)
91 shelffile_win = shelffile
93 shelffile_win +=
".dat" 95 if shelffile
and (path.isfile(shelffile)
or path.isfile(shelffile_win)):
97 self.aspace.load_aspace_shelf(shelffile)
107 self.aspace.make_aspace_shelf(shelffile)
111 Load address space from path 113 self.aspace.load(path)
117 Dump current address space to path 119 self.aspace.dump(path)
122 self.logger.info(
"starting internal server")
126 Node(self.
isession,
ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
127 Node(self.
isession,
ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
132 self.logger.info(
"stopping internal server")
133 self.isession.close_session()
135 self.history_manager.stop()
138 self.current_time_node.set_value(datetime.utcnow())
146 self.endpoints.append(endpoint)
149 self.logger.info(
"get endpoint")
155 url = urlparse(edp1.EndpointUrl)
156 url = url._replace(netloc=sockname[0] +
":" + str(sockname[1]))
157 edp1.EndpointUrl = url.geturl()
163 if not params.ServerUris:
164 return [desc.Server
for desc
in self._known_servers.values()]
166 for serv
in self._known_servers.values():
167 serv_uri = serv.Server.ApplicationUri.split(
":")
168 for uri
in params.ServerUris:
170 if serv_uri[:len(uri)] == uri:
171 servers.append(serv.Server)
177 appdesc.ApplicationUri = server.ServerUri
178 appdesc.ProductUri = server.ProductUri
180 appdesc.ApplicationName = server.ServerNames[0]
181 appdesc.ApplicationType = server.ServerType
182 appdesc.DiscoveryUrls = server.DiscoveryUrls
184 appdesc.GatewayServerUri = server.GatewayServerUri
188 return self.
register_server(params.Server, params.DiscoveryConfiguration)
195 Set attribute Historizing of node to True and start storing data for history 197 node.set_attribute(ua.AttributeIds.Historizing,
ua.DataValue(
True))
198 node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
199 node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
200 self.history_manager.historize_data_change(node, period, count)
204 Set attribute Historizing of node to False and stop storing data for history 206 node.set_attribute(ua.AttributeIds.Historizing,
ua.DataValue(
False))
207 node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
208 node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
209 self.history_manager.dehistorize(node)
213 Set attribute History Read of object events to True and start storing data for history 215 event_notifier = source.get_event_notifier()
216 if ua.EventNotifier.SubscribeToEvents
not in event_notifier:
217 raise ua.UaError(
"Node does not generate events", event_notifier)
219 if ua.EventNotifier.HistoryRead
not in event_notifier:
220 event_notifier.append(ua.EventNotifier.HistoryRead)
221 source.set_event_notifier(event_notifier)
223 self.history_manager.historize_event(source, period, count)
227 Set attribute History Read of node to False and stop storing data for history 229 source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
230 self.history_manager.dehistorize(source)
234 Create a subscription from event to handle 236 self.server_callback_dispatcher.addListener(event, handle)
240 Remove a subscription from event to handle 242 self.server_callback_dispatcher.removeListener(event, handle)
249 def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
250 self.
logger = logging.getLogger(__name__)
260 InternalSession._counter += 1
262 InternalSession._auth_counter += 1
264 self.logger.info(
"Created internal session %s", self.
name)
268 return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format(
272 return self.iserver.get_endpoints(params, sockname)
275 self.logger.info(
"Create session request")
280 result.RevisedSessionTimeout = params.RequestedSessionTimeout
281 result.MaxRequestMessageSize = 65536
282 self.
nonce = utils.create_nonce(32)
283 result.ServerNonce = self.
nonce 284 result.ServerEndpoints = self.
get_endpoints(sockname=sockname)
289 self.logger.info(
"close session %s with subscriptions %s", self, self.
subscriptions)
290 self.
state = SessionState.Closed
294 self.logger.info(
"activate session")
296 if self.
state != SessionState.Created:
297 raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
298 self.
nonce = utils.create_nonce(32)
299 result.ServerNonce = self.
nonce 300 for _
in params.ClientSoftwareCertificates:
302 self.
state = SessionState.Activated
303 id_token = params.UserIdentityToken
305 if self.iserver.allow_remote_admin
and id_token.UserName
in (
"admin",
"Admin"):
306 self.
user = User.Admin
307 self.logger.info(
"Activated internal session %s for user %s", self.
name, self.
user)
311 results = self.iserver.attribute_service.read(params)
314 return [deepcopy(dv)
for dv
in results]
317 return self.iserver.history_manager.read_history(params)
323 params.NodesToWrite = [deepcopy(ntw)
for ntw
in params.NodesToWrite]
324 return self.iserver.attribute_service.write(params, self.
user)
327 return self.iserver.view_service.browse(params)
330 return self.iserver.view_service.translate_browsepaths_to_nodeids(params)
333 return self.iserver.node_mgt_service.add_nodes(params, self.
user)
336 return self.iserver.node_mgt_service.delete_nodes(params, self.
user)
339 return self.iserver.node_mgt_service.add_references(params, self.
user)
342 return self.iserver.node_mgt_service.delete_references(params, self.
user)
345 return self.aspace.add_method_callback(methodid, callback)
348 return self.iserver.method_service.call(params)
351 result = self.subscription_service.create_subscription(params, callback)
353 self.subscriptions.append(result.SubscriptionId)
357 subscription_result = self.subscription_service.create_monitored_items(params)
358 self.iserver.server_callback_dispatcher.dispatch(
359 CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
360 return subscription_result
363 subscription_result = self.subscription_service.modify_monitored_items(params)
364 self.iserver.server_callback_dispatcher.dispatch(
365 CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
366 return subscription_result
369 return self.subscription_service.republish(params)
375 self.subscriptions.remove(i)
376 return self.subscription_service.delete_subscriptions(ids)
379 subscription_result = self.subscription_service.delete_monitored_items(params)
380 self.iserver.server_callback_dispatcher.dispatch(
381 CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
382 return subscription_result
387 return self.subscription_service.publish(acks)
def publish(self, acks=None)
def activate_session(self, params)
def _set_current_time(self)
def subscribe_server_callback(self, event, handle)
def dump_address_space(self, path)
def create_session(self, name, user=User.Anonymous, external=False)
def load_standard_address_space(self, shelffile=None)
def create_monitored_items(self, params)
def delete_monitored_items(self, params)
def delete_nodes(self, params)
def __init__(self, shelffile=None)
def disable_history_event(self, source)
def history_read(self, params)
def add_endpoint(self, endpoint)
def __init__(self, serv, cap=None)
def modify_monitored_items(self, params)
def add_method_callback(self, methodid, callback)
def close_session(self, delete_subs=True)
def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False)
def unsubscribe_server_callback(self, event, handle)
def enable_history_event(self, source, period=timedelta(days=7), count=0)
def enable_history_data_change(self, node, period=timedelta(days=7), count=0)
server_callback_dispatcher
def republish(self, params)
def create_subscription(self, params, callback)
def get_endpoints(self, params=None, sockname=None)
def register_server(self, server, conf=None)
def add_nodes(self, params)
def find_servers(self, params)
def disable_history_data_change(self, node)
def load_address_space(self, path)
def register_server2(self, params)
def add_references(self, params)
def get_new_channel_id(self)
def translate_browsepaths_to_nodeids(self, params)
def get_endpoints(self, params=None, sockname=None)
def delete_subscriptions(self, ids)
def delete_references(self, params)
def create_session(self, params, sockname=None)