client.py
Go to the documentation of this file.
1 from __future__ import division # support for python2
2 from threading import Thread, Condition
3 import logging
4 try:
5  from urllib.parse import urlparse
6 except ImportError: # support for python2
7  from urlparse import urlparse
8 
9 from opcua import ua
10 from opcua.client.ua_client import UaClient
11 from opcua.common.node import Node
12 from opcua.common.manage_nodes import delete_nodes
13 from opcua.common.subscription import Subscription
14 from opcua.common import utils
15 from opcua.crypto import security_policies
16 from opcua.common.shortcuts import Shortcuts
17 use_crypto = True
18 try:
19  from opcua.crypto import uacrypto
20 except ImportError:
21  print("cryptography is not installed, use of crypto disabled")
22  use_crypto = False
23 
24 
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: object whose ``get_node()`` and
            ``open_secure_channel(renew=True)`` are called by ``run()``.
        :param timeout: period between two channel renewals,
            in milliseconds. A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state once per
        period, until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the lock, so a
                # stop() issued just before we start waiting cannot be
                # missed (which would block for the whole period).
                if not self._dostop:
                    # timeout is in milliseconds, wait() expects seconds
                    self._cond.wait(self.timeout / 1000.0)
                if self._dostop:
                    break
            # network calls are made without holding the lock so that
            # stop() never blocks behind a slow request
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        Does not join the thread.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # flag is set under the same lock run() uses to test it
            self._dostop = True
            self._cond.notify_all()
68 
69 
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        # NOTE(review): this assignment was missing from the extracted
        # listing (original line 99); security_policy is read by
        # open_secure_channel() before anything else sets it.
        # Confirm the constructor against upstream.
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        # set by create_session(); initialised here so activate_session()
        # can test it against None even when no session was created
        self._server_nonce = None
        # NOTE(review): this assignment was missing from the extracted
        # listing (original line 107); create_session() reads it to
        # build the session name. Confirm the initial value upstream.
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI

        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are
        considered. Raises ua.UaError when nothing matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {0}, {1}".format(
            security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
        Mode is Sign or SignAndEncrypt
        certificate, private_key and server_certificate are
        paths to .pem or .der files
        Call this before connect()

        An empty string is ignored. Raises ua.UaError when fewer than
        4 comma-separated values are given.
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # the optional fifth field is the server certificate path
        server_certificate_path = parts[4] if len(parts) >= 5 else None
        return self.set_security(policy_class, parts[2], parts[3],
                                 server_certificate_path, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()

        When server_certificate_path is None the server certificate is
        taken from the endpoint list returned by the server, which
        requires a temporary connection.
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return endpoints

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect(self):
        """
        High level method
        Connect, create and activate session

        Username and password are taken from the server url.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            # do not leak the socket when the handshake fails
            self.disconnect_socket()
            raise
        self.activate_session(username=self.server_url.username,
                              password=self.server_url.password,
                              certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # the socket is closed even if the server did not answer
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        Close the socket opened by connect_socket()
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME check ack (the value returned by send_hello is ignored)
        self.uaclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel

        Stores the lifetime revised by the server in
        self.secure_channel_timeout.
        """
        # NOTE(review): constructor line was missing from the extracted
        # listing (original line 258); confirm the type against upstream.
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask the server for its endpoints, using the url given at construction
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if not discovery_configuration:
            return self.uaclient.register_server(serv)
        # NOTE(review): constructor line was missing from the extracted
        # listing (original line 292); confirm the type against upstream.
        params = ua.RegisterServer2Parameters()
        params.Server = serv
        params.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(params)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        Ask the server for the servers it knows on the network
        """
        # NOTE(review): the def line and the constructor line were missing
        # from the extracted listing (original lines 312-313); confirm the
        # parameter type against upstream.
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        Also verifies the server signature, remembers the server nonce
        and user token policies for activate_session(), and starts the
        keepalive thread. Raises ua.UaError when the certificate sent
        by the server differs from the one already configured.
        """
        # NOTE(review): constructor line was missing from the extracted
        # listing (original line 322); confirm the type against upstream.
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of
        # private key (specs part 4, 5.6.2.2)
        nonce = utils.create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        params.RequestedSessionTimeout = 3600000
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)

        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce

        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")

        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        # renew before the shorter of the two timeouts expires
        period = min(self.session_timeout, self.secure_channel_timeout) * 0.7  # 0.7 is from spec
        self.keepalive = KeepAlive(self, period)
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                # empty URI means "use this endpoint's policy URI"
                return policy.SecurityPolicyUri or self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key

        Anonymous authentication is used when neither username nor
        certificate is given; a certificate takes precedence over a
        username.
        """
        # NOTE(review): constructor line was missing from the extracted
        # listing (original line 384); confirm the type against upstream.
        params = ua.ActivateSessionParameters()
        # data to sign: server certificate followed by last server nonce
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            self._add_anonymous_auth(params)
        elif certificate:
            self._add_certificate_auth(params, certificate, challenge)
        else:
            self._add_user_auth(params, username, password)
        return self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        """Fill params with an anonymous identity token."""
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Fill params with an X509 identity token and sign challenge
        with self.user_private_key.
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username, password):
        """
        Fill params with a username identity token.

        The password is sent as given when the token policy is empty or
        POLICY_NONE_URI, otherwise it is encrypted for the server.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The decision is based on the password argument: the caller may
        # pass credentials explicitly instead of embedding them in the url.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")

    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Returns the pair produced by security_policies.encrypt_asymmetric.
        """
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        passwd = password.encode("utf8")
        if self._server_nonce is not None:
            passwd += self._server_nonce
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
        return data, uri

    def close_session(self):
        """
        Close session

        The keepalive thread, if any, is asked to stop first.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """Return the Node for ua.ObjectIds.RootFolder."""
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """Return the Node for ua.ObjectIds.ObjectsFolder."""
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """Return the Node for ua.ObjectIds.Server."""
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        handler argument is a class with data_change and/or event methods.
        period argument is either a publishing interval or a
        CreateSubscriptionParameters instance. A plain interval is passed
        unchanged as RequestedPublishingInterval, which OPC UA defines in
        milliseconds. The second option should be used,
        if the opcua-server has problems with the default options.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """

        if isinstance(period, ua.CreateSubscriptionParameters):
            return Subscription(self.uaclient, period, handler)
        # NOTE(review): constructor line was missing from the extracted
        # listing (original line 486); the type matches the isinstance
        # check above.
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 10000
        params.RequestedMaxKeepAliveCount = 3000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """Return the value of the Server_NamespaceArray node."""
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array.
        Raises ValueError if the returned list does not contain uri.
        """
        namespaces = self.get_namespace_array()
        return namespaces.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """Delete nodes on the server; see opcua.common.manage_nodes.delete_nodes."""
        return delete_nodes(self.uaclient, nodes, recursive)
505 
def activate_session(self, username=None, password=None, certificate=None)
Definition: client.py:380
def get_node(self, nodeid)
Definition: client.py:462
def find_servers_on_network(self)
Definition: client.py:312
def connect_and_find_servers_on_network(self)
Definition: client.py:206
def get_objects_node(self)
Definition: client.py:456
def get_namespace_array(self)
Definition: client.py:495
def _add_anonymous_auth(self, params)
Definition: client.py:401
def get_namespace_index(self, uri)
Definition: client.py:499
def register_server(self, server, discovery_configuration=None)
Definition: client.py:279
def connect_and_find_servers(self)
Definition: client.py:194
def open_secure_channel(self, renew=False)
Definition: client.py:254
def connect_socket(self)
Definition: client.py:238
def create_subscription(self, period, handler)
Definition: client.py:468
def load_client_certificate(self, path)
Definition: client.py:170
def disconnect_socket(self)
Definition: client.py:244
def __init__(self, url, timeout=4)
Definition: client.py:82
def __init__(self, client, timeout)
Definition: client.py:32
def create_session(self)
Definition: client.py:316
def set_security_string(self, string)
Definition: client.py:131
def server_policy_id(self, token_type, default)
Definition: client.py:356
def set_security(self, policy, certificate_path, private_key_path, server_certificate_path=None, mode=ua.MessageSecurityMode.SignAndEncrypt)
Definition: client.py:153
def _add_certificate_auth(self, params, certificate, challenge)
Definition: client.py:405
def get_server_node(self)
Definition: client.py:459
def find_servers(self, uris=None)
Definition: client.py:299
def __exit__(self, exc_type, exc_value, traceback)
Definition: client.py:115
def load_private_key(self, path)
Definition: client.py:176
def delete_nodes(self, nodes, recursive=False)
Definition: client.py:503
def server_policy_uri(self, token_type)
Definition: client.py:366
def connect_and_get_server_endpoints(self)
Definition: client.py:182
def find_endpoint(endpoints, security_mode, policy_uri)
Definition: client.py:119
def _add_user_auth(self, params, username, password)
Definition: client.py:416
def close_secure_channel(self)
Definition: client.py:271
def _encrypt_password(self, password, policy_uri)
Definition: client.py:434


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:43