00001 from __future__ import division
00002 from threading import Thread, Condition
00003 import logging
00004 try:
00005 from urllib.parse import urlparse
00006 except ImportError:
00007 from urlparse import urlparse
00008
00009 from opcua import ua
00010 from opcua.client.ua_client import UaClient
00011 from opcua.common.node import Node
00012 from opcua.common.manage_nodes import delete_nodes
00013 from opcua.common.subscription import Subscription
00014 from opcua.common import utils
00015 from opcua.crypto import security_policies
00016 from opcua.common.shortcuts import Shortcuts
00017 use_crypto = True
00018 try:
00019 from opcua.crypto import uacrypto
00020 except ImportError:
00021 print("cryptography is not installed, use of crypto disabled")
00022 use_crypto = False
00023
00024
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel.

    The thread periodically renews the secure channel and reads the
    server state node until stop() is called.
    """

    def __init__(self, client, timeout):
        """
        :param client: object used by run(); it must provide get_node()
            and open_secure_channel(renew=True).
        :param timeout: period between two renewals, in milliseconds.
            A value of 0 is replaced by 3600000 (one hour).
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # A period of 0 would turn the loop in run() into a busy loop,
        # so fall back to one hour.
        if self.timeout == 0:
            self.timeout = 3600000

    def run(self):
        """
        Renew the secure channel every `timeout` milliseconds until stopped.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the condition lock:
                # stop() sets it under the same lock, so a notification can
                # never slip in between this test and the wait below.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000.0)
                if self._dostop:
                    break
            # Network calls are done outside the lock so that stop() never
            # blocks behind a pending request.
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        Does not wait for the thread to exit.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            self._dostop = True
            self._cond.notify_all()
00068
00069
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use the UaClient object, available as self.uaclient,
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # requested lifetime, revised by open_secure_channel()
        self.session_timeout = 3600000  # requested timeout, revised by create_session()
        self._policy_ids = []
        # Defined here so that activate_session() can be called before
        # create_session() without raising AttributeError.
        self._server_nonce = None
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI.

        Only endpoints whose URL starts with ua.OPC_TCP_SCHEME are considered.
        Raises ua.UaError if no endpoint matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {0}, {1}".format(
            security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Raises ua.UaError if fewer than 4 fields are given.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # The optional fifth field is forwarded as server_certificate_path.
        server_certificate_path = parts[4] if len(parts) >= 5 else None
        return self.set_security(policy_class, parts[2], parts[3],
                                 server_certificate_path, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        If server_certificate_path is None, the server certificate is taken
        from the matching endpoint reported by the server, which requires a
        temporary connection.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from the server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # always release the socket, even if a request above failed
            self.disconnect_socket()
        return endpoints

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # always release the socket, even if a request above failed
            self.disconnect_socket()
        return servers

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # always release the socket, even if a request above failed
            self.disconnect_socket()
        return servers

    def connect(self):
        """
        High level method
        Connect, create and activate session.

        The user name and password are taken from the url given to the
        constructor, the user certificate from load_client_certificate().
        If the hello, secure channel or session creation step fails, the
        socket is closed again before the exception is re-raised.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            self.disconnect_socket()  # do not leave a half-open connection behind
            raise
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket.
        The socket is closed even if closing the session or channel fails.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        Close the socket opened by connect_socket()
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # The acknowledge returned by the server is currently not used.
        self.uaclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel.
        Stores the lifetime revised by the server in secure_channel_timeout.
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask the server for its endpoints, using our url as EndpointUrl
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.uaclient.register_server2(params)
        return self.uaclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own.

        The requested session timeout is self.session_timeout, which is
        then replaced by the value revised by the server.
        Raises ua.UaError if the server certificate differs from the one
        already configured in the security policy.
        Starts the KeepAlive thread and returns the server response.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0
        response = self.uaclient.create_session(params)

        # The server signs our certificate (if any) followed by our nonce.
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")

        # remember the user token policies: they are used in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        # Renew at 70% of the shorter of the two lifetimes, i.e. well
        # before either the session or the secure channel expires.
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                # only the first matching policy is looked at
                if policy.SecurityPolicyUri:
                    return policy.SecurityPolicyUri
                return self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key.

        A certificate takes precedence over a user name; if neither is
        given the session is activated anonymously.
        """
        params = ua.ActivateSessionParameters()
        # The data to sign is the server certificate followed by the last
        # server nonce, each only if available.
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            self._add_anonymous_auth(params)
        elif certificate:
            self._add_certificate_auth(params, certificate, challenge)
        else:
            self._add_user_auth(params, username, password)
        return self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        """
        Fill params with an anonymous identity token
        """
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Fill params with an X509 identity token and sign challenge
        with the user private key (see load_private_key)
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # challenge is the server certificate followed by the server nonce,
        # as built in activate_session()
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username, password):
        """
        Fill params with a user name identity token.

        The password is sent in plain text when the user token policy has
        no security policy (or the None policy), encrypted otherwise.
        No password is set if password is empty or None.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The password argument is used here, not the password in the
        # url, so that a password given directly to activate_session()
        # is actually sent.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # token is NOT encrypted: the password is sent as is and
            # EncryptionAlgorithm is left empty
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")

    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Returns a tuple (encrypted data, encryption algorithm uri).
        """
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
        # The token to encrypt is the UTF-8 password followed by the
        # server nonce (if any), serialized as a byte string.
        passwd = password.encode("utf8")
        if self._server_nonce is not None:
            passwd += self._server_nonce
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
        return data, uri

    def close_session(self):
        """
        Close session and stop the keepalive thread, if any
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """
        Get the Root folder node
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get the Objects folder node
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get the Server node
        """
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server.

        period argument is either a publishing interval or a
        CreateSubscriptionParameters instance. A plain number is passed
        unchanged as RequestedPublishingInterval, which OPC-UA expresses
        in milliseconds. The second option should be used
        if the opcua-server has problems with the default options.

        handler argument is a class with data_change and/or event methods.
        These methods will be called when notifications from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        if isinstance(period, ua.CreateSubscriptionParameters):
            return Subscription(self.uaclient, period, handler)
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 10000
        params.RequestedMaxKeepAliveCount = 3000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Read the value of the server's NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the server's namespace array.
        Raises ValueError if the array is a list that does not contain uri.
        """
        uries = self.get_namespace_array()
        return uries.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes on the server, see opcua.common.manage_nodes.delete_nodes
        """
        return delete_nodes(self.uaclient, nodes, recursive)
00505