1 from __future__
import division
2 from threading
import Thread, Condition
5 from urllib.parse
import urlparse
7 from urlparse
import urlparse
21 print(
"cryptography is not installed, use of crypto disabled")
28 Used by Client to keep the session open. 29 OPCUA defines timeout both for sessions and secure channel 34 :param session_timeout: Timeout to re-new the session 38 self.
logger = logging.getLogger(__name__)
50 self.logger.debug(
"starting keepalive thread with period of %s milliseconds", self.
timeout)
51 server_state = self.client.get_node(
ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
54 self._cond.wait(self.
timeout / 1000)
57 self.logger.debug(
"renewing channel")
58 self.client.open_secure_channel(renew=
True)
59 val = server_state.get_value()
60 self.logger.debug(
"server state is: %s ", val)
61 self.logger.debug(
"keepalive thread has stopped")
64 self.logger.debug(
"stoping keepalive thread")
67 self._cond.notify_all()
73 High level client to connect to an OPC-UA server. 75 This class makes it easy to connect and browse address space. 76 It attempts to expose as much functionality as possible 77 but if you want more flexibility it is possible and advised to 78 use UaClient object, available as self.uaclient 79 which offers the raw OPC-UA services interface. 85 :param url: url of the server. 86 if you are unsure of url, write at least hostname 87 and port and call get_endpoints 90 Each request sent to the server expects an answer within this 91 time. The timeout is specified in seconds. 93 self.
logger = logging.getLogger(__name__)
95 self.
name =
"Pure Python Client" 115 def __exit__(self, exc_type, exc_value, traceback):
121 Find endpoint with required security mode and policy URI 124 if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME)
and 125 ep.SecurityMode == security_mode
and 126 ep.SecurityPolicyUri == policy_uri):
128 raise ua.UaError(
"No matching endpoints: {0}, {1}".format(
129 security_mode, policy_uri))
133 Set SecureConnection mode. String format: 134 Policy,Mode,certificate,private_key[,server_private_key] 135 where Policy is Basic128Rsa15 or Basic256, 136 Mode is Sign or SignAndEncrypt 137 certificate, private_key and server_private_key are 138 paths to .pem or .der files 139 Call this before connect() 143 parts = string.split(
',')
145 raise ua.UaError(
'Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
146 policy_class = getattr(security_policies,
'SecurityPolicy' + parts[0])
148 return self.
set_security(policy_class, parts[2], parts[3],
149 parts[4]
if len(parts) >= 5
else None, mode)
151 def set_security(self, policy, certificate_path, private_key_path,
152 server_certificate_path=
None,
153 mode=ua.MessageSecurityMode.SignAndEncrypt):
155 Set SecureConnection mode. 156 Call this before connect() 158 if server_certificate_path
is None:
161 endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
162 server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
164 server_cert = uacrypto.load_certificate(server_certificate_path)
165 cert = uacrypto.load_certificate(certificate_path)
166 pk = uacrypto.load_private_key(private_key_path)
172 load our certificate from file, either pem or der 178 Load user private key. This is used for authenticating using certificate 184 Connect, ask server for endpoints, and disconnect 196 Connect, ask server for a list of known servers, and disconnect 208 Connect, ask server for a list of known servers on network, and disconnect 221 Connect, create and activate session 232 Close session, secure channel and socket 240 connect to socket defined in url 242 self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
245 self.uaclient.disconnect_socket()
249 Send OPC-UA hello to server 251 ack = self.uaclient.send_hello(self.server_url.geturl())
256 Open secure channel, if renew is True, renew channel 259 params.ClientProtocolVersion = 0
260 params.RequestType = ua.SecurityTokenRequestType.Issue
262 params.RequestType = ua.SecurityTokenRequestType.Renew
263 params.SecurityMode = self.security_policy.Mode
265 nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
266 params.ClientNonce = nonce
267 result = self.uaclient.open_secure_channel(params)
268 self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
272 return self.uaclient.close_secure_channel()
276 params.EndpointUrl = self.server_url.geturl()
277 return self.uaclient.get_endpoints(params)
281 register a server to discovery server 282 if discovery_configuration is provided, the newer register_server2 service call is used 285 serv.ServerUri = server.application_uri
286 serv.ProductUri = server.product_uri
287 serv.DiscoveryUrls = [server.endpoint.geturl()]
288 serv.ServerType = server.application_type
291 if discovery_configuration:
294 params.DiscoveryConfiguration = discovery_configuration
295 return self.uaclient.register_server2(params)
297 return self.uaclient.register_server(serv)
301 send a FindServer request to the server. The answer should be a list of 302 servers the server knows about 303 A list of uris can be provided, only servers having matching uris will be returned 308 params.EndpointUrl = self.server_url.geturl()
309 params.ServerUris = uris
310 return self.uaclient.find_servers(params)
314 return self.uaclient.find_servers_on_network(params)
318 send a CreateSessionRequest to server with reasonable parameters. 319 If you want to modify settings look at the code of this method 326 desc.ApplicationType = ua.ApplicationType.Client
329 nonce = utils.create_nonce(32)
330 params.ClientNonce = nonce
331 params.ClientCertificate = self.security_policy.client_certificate
332 params.ClientDescription = desc
333 params.EndpointUrl = self.server_url.geturl()
335 params.RequestedSessionTimeout = 3600000
336 params.MaxResponseMessageSize = 0
337 response = self.uaclient.create_session(params)
338 if self.security_policy.client_certificate
is None:
341 data = self.security_policy.client_certificate + nonce
342 self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
344 if not self.security_policy.server_certificate:
345 self.security_policy.server_certificate = response.ServerCertificate
346 elif self.security_policy.server_certificate != response.ServerCertificate:
347 raise ua.UaError(
"Server certificate mismatch")
349 ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
353 self.keepalive.start()
358 Find PolicyId of server's UserTokenPolicy by token_type. 359 Return default if there's no matching UserTokenPolicy. 362 if policy.TokenType == token_type:
363 return policy.PolicyId
368 Find SecurityPolicyUri of server's UserTokenPolicy by token_type. 369 If SecurityPolicyUri is empty, use default SecurityPolicyUri 373 if policy.TokenType == token_type:
374 if policy.SecurityPolicyUri:
375 return policy.SecurityPolicyUri
377 return self.security_policy.URI
378 return self.security_policy.URI
382 Activate session using either username and password or private_key 386 if self.security_policy.server_certificate
is not None:
387 challenge += self.security_policy.server_certificate
390 params.ClientSignature.Algorithm = b
"http://www.w3.org/2000/09/xmldsig#rsa-sha1" 391 params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
392 params.LocaleIds.append(
"en")
393 if not username
and not certificate:
399 return self.uaclient.activate_session(params)
403 params.UserIdentityToken.PolicyId = self.
server_policy_id(ua.UserTokenType.Anonymous, b
"anonymous")
407 params.UserIdentityToken.PolicyId = self.
server_policy_id(ua.UserTokenType.Certificate, b
"certificate_basic256")
408 params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
413 params.UserTokenSignature.Algorithm = b
"http://www.w3.org/2000/09/xmldsig#rsa-sha1" 414 params.UserTokenSignature.Signature = sig
418 params.UserIdentityToken.UserName = username
420 if not policy_uri
or policy_uri == security_policies.POLICY_NONE_URI:
424 if self.server_url.password:
425 self.logger.warning(
"Sending plain-text password")
426 params.UserIdentityToken.Password = password
427 params.UserIdentityToken.EncryptionAlgorithm =
'' 428 elif self.server_url.password:
430 params.UserIdentityToken.Password = data
431 params.UserIdentityToken.EncryptionAlgorithm = uri
432 params.UserIdentityToken.PolicyId = self.
server_policy_id(ua.UserTokenType.UserName, b
"username_basic256")
435 pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
438 passwd = password.encode(
"utf8")
441 etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
442 data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
450 self.keepalive.stop()
451 return self.uaclient.close_session(
True)
464 Get node using NodeId object or a string representing a NodeId 470 Create a subscription. 471 returns a Subscription object which allows 472 to subscribe to events or data on server 473 handler argument is a class with data_change and/or event methods. 474 period argument is either a publishing interval in seconds or a 475 CreateSubscriptionParameters instance. The second option should be used, 476 if the opcua-server has problems with the default options. 477 These methods will be called when notifications from server are received. 478 See example-client.py. 479 Do not do expensive/slow or network operations from these methods 480 since they are called directly from receiving thread. This is a design choice, 481 start another thread if you need to do such a thing. 487 params.RequestedPublishingInterval = period
488 params.RequestedLifetimeCount = 10000
489 params.RequestedMaxKeepAliveCount = 3000
490 params.MaxNotificationsPerPublish = 10000
491 params.PublishingEnabled =
True 497 return ns_node.get_value()
501 return uries.index(uri)
def activate_session(self, username=None, password=None, certificate=None)
def get_node(self, nodeid)
def find_servers_on_network(self)
def connect_and_find_servers_on_network(self)
def get_objects_node(self)
def get_namespace_array(self)
def _add_anonymous_auth(self, params)
def get_namespace_index(self, uri)
def register_server(self, server, discovery_configuration=None)
def connect_and_find_servers(self)
def open_secure_channel(self, renew=False)
def create_subscription(self, period, handler)
def load_client_certificate(self, path)
def disconnect_socket(self)
def __init__(self, url, timeout=4)
def __init__(self, client, timeout)
def set_security_string(self, string)
def server_policy_id(self, token_type, default)
def set_security(self, policy, certificate_path, private_key_path, server_certificate_path=None, mode=ua.MessageSecurityMode.SignAndEncrypt)
def _add_certificate_auth(self, params, certificate, challenge)
def get_server_node(self)
def find_servers(self, uris=None)
def __exit__(self, exc_type, exc_value, traceback)
def load_private_key(self, path)
def delete_nodes(self, nodes, recursive=False)
def server_policy_uri(self, token_type)
def connect_and_get_server_endpoints(self)
def find_endpoint(endpoints, security_mode, policy_uri)
def _add_user_auth(self, params, username, password)
def close_secure_channel(self)
def _encrypt_password(self, password, policy_uri)