client.py
Go to the documentation of this file.
00001 from __future__ import division  # support for python2
00002 from threading import Thread, Condition
00003 import logging
00004 try:
00005     from urllib.parse import urlparse
00006 except ImportError:  # support for python2
00007     from urlparse import urlparse
00008 
00009 from opcua import ua
00010 from opcua.client.ua_client import UaClient
00011 from opcua.common.node import Node
00012 from opcua.common.manage_nodes import delete_nodes
00013 from opcua.common.subscription import Subscription
00014 from opcua.common import utils
00015 from opcua.crypto import security_policies
00016 from opcua.common.shortcuts import Shortcuts
00017 use_crypto = True
00018 try:
00019     from opcua.crypto import uacrypto
00020 except ImportError:
00021     print("cryptography is not installed, use of crypto disabled")
00022     use_crypto = False
00023 
00024 
class KeepAlive(Thread):

    """
    Background thread used by Client to keep the session open.

    OPCUA defines timeouts both for sessions and for the secure channel.
    This thread periodically renews the secure channel and reads the
    server state until stop() is called.
    """

    def __init__(self, client, timeout):
        """
        :param client: Client object whose secure channel is renewed.
        :param timeout: Period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the channel every `timeout` milliseconds until stopped.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the lock: a stop()
                # issued just before we start waiting can then never be
                # missed (which would delay shutdown by a whole period).
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # flag and notification are published atomically with respect
            # to the check done in run()
            self._dostop = True
            self._cond.notify_all()
00068 
00069 
00070 class Client(object):
00071 
00072     """
00073     High level client to connect to an OPC-UA server.
00074 
00075     This class makes it easy to connect and browse address space.
00076     It attemps to expose as much functionality as possible
00077     but if you want more flexibility it is possible and adviced to
00078     use UaClient object, available as self.uaclient
00079     which offers the raw OPC-UA services interface.
00080     """
00081 
00082     def __init__(self, url, timeout=4):
00083         """
00084 
00085         :param url: url of the server.
00086             if you are unsure of url, write at least hostname
00087             and port and call get_endpoints
00088 
00089         :param timeout:
00090             Each request sent to the server expects an answer within this
00091             time. The timeout is specified in seconds.
00092         """
00093         self.logger = logging.getLogger(__name__)
00094         self.server_url = urlparse(url)
00095         self.name = "Pure Python Client"
00096         self.description = self.name
00097         self.application_uri = "urn:freeopcua:client"
00098         self.product_uri = "urn:freeopcua.github.no:client"
00099         self.security_policy = ua.SecurityPolicy()
00100         self.secure_channel_id = None
00101         self.secure_channel_timeout = 3600000 # 1 hour
00102         self.session_timeout = 3600000 # 1 hour
00103         self._policy_ids = []
00104         self.uaclient = UaClient(timeout)
00105         self.user_certificate = None
00106         self.user_private_key = None
00107         self._session_counter = 1
00108         self.keepalive = None
00109         self.nodes = Shortcuts(self.uaclient)
00110 
00111     def __enter__(self):
00112         self.connect()
00113         return self
00114 
00115     def __exit__(self, exc_type, exc_value, traceback):
00116         self.disconnect()
00117 
00118     @staticmethod
00119     def find_endpoint(endpoints, security_mode, policy_uri):
00120         """
00121         Find endpoint with required security mode and policy URI
00122         """
00123         for ep in endpoints:
00124             if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
00125                     ep.SecurityMode == security_mode and
00126                     ep.SecurityPolicyUri == policy_uri):
00127                 return ep
00128         raise ua.UaError("No matching endpoints: {0}, {1}".format(
00129                          security_mode, policy_uri))
00130 
00131     def set_security_string(self, string):
00132         """
00133         Set SecureConnection mode. String format:
00134         Policy,Mode,certificate,private_key[,server_private_key]
00135         where Policy is Basic128Rsa15 or Basic256,
00136             Mode is Sign or SignAndEncrypt
00137             certificate, private_key and server_private_key are
00138                 paths to .pem or .der files
00139         Call this before connect()
00140         """
00141         if not string:
00142             return
00143         parts = string.split(',')
00144         if len(parts) < 4:
00145             raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
00146         policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
00147         mode = getattr(ua.MessageSecurityMode, parts[1])
00148         return self.set_security(policy_class, parts[2], parts[3],
00149                                  parts[4] if len(parts) >= 5 else None, mode)
00150 
00151     def set_security(self, policy, certificate_path, private_key_path,
00152                      server_certificate_path=None,
00153                      mode=ua.MessageSecurityMode.SignAndEncrypt):
00154         """
00155         Set SecureConnection mode.
00156         Call this before connect()
00157         """
00158         if server_certificate_path is None:
00159             # load certificate from server's list of endpoints
00160             endpoints = self.connect_and_get_server_endpoints()
00161             endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
00162             server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
00163         else:
00164             server_cert = uacrypto.load_certificate(server_certificate_path)
00165         cert = uacrypto.load_certificate(certificate_path)
00166         pk = uacrypto.load_private_key(private_key_path)
00167         self.security_policy = policy(server_cert, cert, pk, mode)
00168         self.uaclient.set_security(self.security_policy)
00169 
00170     def load_client_certificate(self, path):
00171         """
00172         load our certificate from file, either pem or der
00173         """
00174         self.user_certificate = uacrypto.load_certificate(path)
00175 
00176     def load_private_key(self, path):
00177         """
00178         Load user private key. This is used for authenticating using certificate
00179         """
00180         self.user_private_key = uacrypto.load_private_key(path)
00181 
00182     def connect_and_get_server_endpoints(self):
00183         """
00184         Connect, ask server for endpoints, and disconnect
00185         """
00186         self.connect_socket()
00187         self.send_hello()
00188         self.open_secure_channel()
00189         endpoints = self.get_endpoints()
00190         self.close_secure_channel()
00191         self.disconnect_socket()
00192         return endpoints
00193 
00194     def connect_and_find_servers(self):
00195         """
00196         Connect, ask server for a list of known servers, and disconnect
00197         """
00198         self.connect_socket()
00199         self.send_hello()
00200         self.open_secure_channel()  # spec says it should not be necessary to open channel
00201         servers = self.find_servers()
00202         self.close_secure_channel()
00203         self.disconnect_socket()
00204         return servers
00205 
00206     def connect_and_find_servers_on_network(self):
00207         """
00208         Connect, ask server for a list of known servers on network, and disconnect
00209         """
00210         self.connect_socket()
00211         self.send_hello()
00212         self.open_secure_channel()
00213         servers = self.find_servers_on_network()
00214         self.close_secure_channel()
00215         self.disconnect_socket()
00216         return servers
00217 
00218     def connect(self):
00219         """
00220         High level method
00221         Connect, create and activate session
00222         """
00223         self.connect_socket()
00224         self.send_hello()
00225         self.open_secure_channel()
00226         self.create_session()
00227         self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)
00228 
00229     def disconnect(self):
00230         """
00231         High level method
00232         Close session, secure channel and socket
00233         """
00234         self.close_session()
00235         self.close_secure_channel()
00236         self.disconnect_socket()
00237 
00238     def connect_socket(self):
00239         """
00240         connect to socket defined in url
00241         """
00242         self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
00243 
00244     def disconnect_socket(self):
00245         self.uaclient.disconnect_socket()
00246 
00247     def send_hello(self):
00248         """
00249         Send OPC-UA hello to server
00250         """
00251         ack = self.uaclient.send_hello(self.server_url.geturl())
00252         # FIXME check ack
00253 
00254     def open_secure_channel(self, renew=False):
00255         """
00256         Open secure channel, if renew is True, renew channel
00257         """
00258         params = ua.OpenSecureChannelParameters()
00259         params.ClientProtocolVersion = 0
00260         params.RequestType = ua.SecurityTokenRequestType.Issue
00261         if renew:
00262             params.RequestType = ua.SecurityTokenRequestType.Renew
00263         params.SecurityMode = self.security_policy.Mode
00264         params.RequestedLifetime = self.secure_channel_timeout
00265         nonce = utils.create_nonce(self.security_policy.symmetric_key_size)   # length should be equal to the length of key of symmetric encryption
00266         params.ClientNonce = nonce      # this nonce is used to create a symmetric key
00267         result = self.uaclient.open_secure_channel(params)
00268         self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
00269         self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
00270 
00271     def close_secure_channel(self):
00272         return self.uaclient.close_secure_channel()
00273 
00274     def get_endpoints(self):
00275         params = ua.GetEndpointsParameters()
00276         params.EndpointUrl = self.server_url.geturl()
00277         return self.uaclient.get_endpoints(params)
00278 
00279     def register_server(self, server, discovery_configuration=None):
00280         """
00281         register a server to discovery server
00282         if discovery_configuration is provided, the newer register_server2 service call is used
00283         """
00284         serv = ua.RegisteredServer()
00285         serv.ServerUri = server.application_uri
00286         serv.ProductUri = server.product_uri
00287         serv.DiscoveryUrls = [server.endpoint.geturl()]
00288         serv.ServerType = server.application_type
00289         serv.ServerNames = [ua.LocalizedText(server.name)]
00290         serv.IsOnline = True
00291         if discovery_configuration:
00292             params = ua.RegisterServer2Parameters()
00293             params.Server = serv
00294             params.DiscoveryConfiguration = discovery_configuration
00295             return self.uaclient.register_server2(params)
00296         else:
00297             return self.uaclient.register_server(serv)
00298 
00299     def find_servers(self, uris=None):
00300         """
00301         send a FindServer request to the server. The answer should be a list of
00302         servers the server knows about
00303         A list of uris can be provided, only server having matching uris will be returned
00304         """
00305         if uris is None:
00306             uris = []
00307         params = ua.FindServersParameters()
00308         params.EndpointUrl = self.server_url.geturl()
00309         params.ServerUris = uris
00310         return self.uaclient.find_servers(params)
00311 
00312     def find_servers_on_network(self):
00313         params = ua.FindServersOnNetworkParameters()
00314         return self.uaclient.find_servers_on_network(params)
00315 
00316     def create_session(self):
00317         """
00318         send a CreateSessionRequest to server with reasonable parameters.
00319         If you want o modify settings look at code of this methods
00320         and make your own
00321         """
00322         desc = ua.ApplicationDescription()
00323         desc.ApplicationUri = self.application_uri
00324         desc.ProductUri = self.product_uri
00325         desc.ApplicationName = ua.LocalizedText(self.name)
00326         desc.ApplicationType = ua.ApplicationType.Client
00327 
00328         params = ua.CreateSessionParameters()
00329         nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
00330         params.ClientNonce = nonce
00331         params.ClientCertificate = self.security_policy.client_certificate
00332         params.ClientDescription = desc
00333         params.EndpointUrl = self.server_url.geturl()
00334         params.SessionName = self.description + " Session" + str(self._session_counter)
00335         params.RequestedSessionTimeout = 3600000
00336         params.MaxResponseMessageSize = 0  # means no max size
00337         response = self.uaclient.create_session(params)
00338         if self.security_policy.client_certificate is None:
00339             data = nonce
00340         else:
00341             data = self.security_policy.client_certificate + nonce
00342         self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
00343         self._server_nonce = response.ServerNonce
00344         if not self.security_policy.server_certificate:
00345             self.security_policy.server_certificate = response.ServerCertificate
00346         elif self.security_policy.server_certificate != response.ServerCertificate:
00347             raise ua.UaError("Server certificate mismatch")
00348         # remember PolicyId's: we will use them in activate_session()
00349         ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
00350         self._policy_ids = ep.UserIdentityTokens
00351         self.session_timeout = response.RevisedSessionTimeout
00352         self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
00353         self.keepalive.start()
00354         return response
00355 
00356     def server_policy_id(self, token_type, default):
00357         """
00358         Find PolicyId of server's UserTokenPolicy by token_type.
00359         Return default if there's no matching UserTokenPolicy.
00360         """
00361         for policy in self._policy_ids:
00362             if policy.TokenType == token_type:
00363                 return policy.PolicyId
00364         return default
00365 
00366     def server_policy_uri(self, token_type):
00367         """
00368         Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
00369         If SecurityPolicyUri is empty, use default SecurityPolicyUri
00370         of the endpoint
00371         """
00372         for policy in self._policy_ids:
00373             if policy.TokenType == token_type:
00374                 if policy.SecurityPolicyUri:
00375                     return policy.SecurityPolicyUri
00376                 else:   # empty URI means "use this endpoint's policy URI"
00377                     return self.security_policy.URI
00378         return self.security_policy.URI
00379 
00380     def activate_session(self, username=None, password=None, certificate=None):
00381         """
00382         Activate session using either username and password or private_key
00383         """
00384         params = ua.ActivateSessionParameters()
00385         challenge = b""
00386         if self.security_policy.server_certificate is not None:
00387             challenge += self.security_policy.server_certificate
00388         if self._server_nonce is not None:
00389             challenge += self._server_nonce
00390         params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
00391         params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
00392         params.LocaleIds.append("en")
00393         if not username and not certificate:
00394             self._add_anonymous_auth(params)
00395         elif certificate:
00396             self._add_certificate_auth(params, certificate, challenge)
00397         else:
00398             self._add_user_auth(params, username, password)
00399         return self.uaclient.activate_session(params)
00400 
00401     def _add_anonymous_auth(self, params):
00402         params.UserIdentityToken = ua.AnonymousIdentityToken()
00403         params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
00404 
00405     def _add_certificate_auth(self, params, certificate, challenge):
00406         params.UserIdentityToken = ua.X509IdentityToken()
00407         params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
00408         params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
00409         # specs part 4, 5.6.3.1: the data to sign is created by appending
00410         # the last serverNonce to the serverCertificate
00411         sig = uacrypto.sign_sha1(self.user_private_key, challenge)
00412         params.UserTokenSignature = ua.SignatureData()
00413         params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
00414         params.UserTokenSignature.Signature = sig
00415 
00416     def _add_user_auth(self, params, username, password):
00417         params.UserIdentityToken = ua.UserNameIdentityToken()
00418         params.UserIdentityToken.UserName = username
00419         policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
00420         if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
00421             # see specs part 4, 7.36.3: if the token is NOT encrypted,
00422             # then the password only contains UTF-8 encoded password
00423             # and EncryptionAlgorithm is null
00424             if self.server_url.password:
00425                 self.logger.warning("Sending plain-text password")
00426                 params.UserIdentityToken.Password = password
00427             params.UserIdentityToken.EncryptionAlgorithm = ''
00428         elif self.server_url.password:
00429             data, uri = self._encrypt_password(password, policy_uri)
00430             params.UserIdentityToken.Password = data 
00431             params.UserIdentityToken.EncryptionAlgorithm = uri
00432         params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
00433 
00434     def _encrypt_password(self, password, policy_uri):
00435         pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
00436         # see specs part 4, 7.36.3: if the token is encrypted, password
00437         # shall be converted to UTF-8 and serialized with server nonce
00438         passwd = password.encode("utf8")
00439         if self._server_nonce is not None:
00440             passwd += self._server_nonce
00441         etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
00442         data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
00443         return data, uri
00444 
00445     def close_session(self):
00446         """
00447         Close session
00448         """
00449         if self.keepalive:
00450             self.keepalive.stop()
00451         return self.uaclient.close_session(True)
00452 
00453     def get_root_node(self):
00454         return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
00455 
00456     def get_objects_node(self):
00457         return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
00458 
00459     def get_server_node(self):
00460         return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))
00461 
00462     def get_node(self, nodeid):
00463         """
00464         Get node using NodeId object or a string representing a NodeId
00465         """
00466         return Node(self.uaclient, nodeid)
00467 
00468     def create_subscription(self, period, handler):
00469         """
00470         Create a subscription.
00471         returns a Subscription object which allow
00472         to subscribe to events or data on server
00473         handler argument is a class with data_change and/or event methods.
00474         period argument is either a publishing interval in seconds or a 
00475         CreateSubscriptionParameters instance. The second option should be used,
00476         if the opcua-server has problems with the default options.
00477         These methods will be called when notfication from server are received.
00478         See example-client.py.
00479         Do not do expensive/slow or network operation from these methods
00480         since they are called directly from receiving thread. This is a design choice,
00481         start another thread if you need to do such a thing.  
00482         """
00483         
00484         if isinstance(period, ua.CreateSubscriptionParameters):
00485              return Subscription(self.uaclient, period, handler)
00486         params = ua.CreateSubscriptionParameters()
00487         params.RequestedPublishingInterval = period
00488         params.RequestedLifetimeCount = 10000
00489         params.RequestedMaxKeepAliveCount = 3000
00490         params.MaxNotificationsPerPublish = 10000
00491         params.PublishingEnabled = True
00492         params.Priority = 0
00493         return Subscription(self.uaclient, params, handler)
00494 
00495     def get_namespace_array(self):
00496         ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
00497         return ns_node.get_value()
00498 
00499     def get_namespace_index(self, uri):
00500         uries = self.get_namespace_array()
00501         return uries.index(uri)
00502 
00503     def delete_nodes(self, nodes, recursive=False):
00504         return delete_nodes(self.uaclient, nodes, recursive)
00505             


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23