security_policies.py
Go to the documentation of this file.
1 from abc import ABCMeta, abstractmethod
2 from opcua.ua import CryptographyNone, SecurityPolicy
3 from opcua.ua import MessageSecurityMode
4 from opcua.ua import UaError
5 try:
6  from opcua.crypto import uacrypto
7  CRYPTOGRAPHY_AVAILABLE = True
8 except ImportError:
9  CRYPTOGRAPHY_AVAILABLE = False
10 
11 
12 POLICY_NONE_URI = 'http://opcfoundation.org/UA/SecurityPolicy#None'
13 
14 
16  """
17  Raise exception if cryptography module is not available.
18  Call this function in constructors.
19  """
20  if not CRYPTOGRAPHY_AVAILABLE:
21  raise UaError("Can't use {0}, cryptography module is not installed".format(obj.__class__.__name__))
22 
23 
class Signer(object):
    """
    Abstract base class for cryptographic signature algorithm
    """

    # Python 2 style metaclass declaration. Python 3 ignores this attribute,
    # so the abstract methods below are only enforced under Python 2.
    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Return the size of the signature produced by signature()."""

    @abstractmethod
    def signature(self, data):
        """Return the signature computed over data."""
38 
39 
class Verifier(object):
    """
    Abstract base class for cryptographic signature verification
    """

    # Python 2 style metaclass declaration. Python 3 ignores this attribute,
    # so the abstract methods below are only enforced under Python 2.
    __metaclass__ = ABCMeta

    @abstractmethod
    def signature_size(self):
        """Return the size of the signatures this verifier checks."""

    @abstractmethod
    def verify(self, data, signature):
        """Check that signature is a valid signature of data."""
54 
55 
class Encryptor(object):
    """
    Abstract base class for encryption algorithm
    """

    # Python 2 style metaclass declaration. Python 3 ignores this attribute,
    # so the abstract methods below are only enforced under Python 2.
    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one block of plain text."""

    # NOTE(review): the def line was missing from the listing; the name and
    # signature are taken from the callers (``encrypted_block_size()``).
    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one block of encrypted text."""

    @abstractmethod
    def encrypt(self, data):
        """Return data in encrypted form."""
74 
75 
class Decryptor(object):
    """
    Abstract base class for decryption algorithm
    """

    # Python 2 style metaclass declaration. Python 3 ignores this attribute,
    # so the abstract methods below are only enforced under Python 2.
    __metaclass__ = ABCMeta

    @abstractmethod
    def plain_block_size(self):
        """Return the size of one block of plain text."""

    # NOTE(review): the def line was missing from the listing; the name and
    # signature are taken from the callers (``encrypted_block_size()``).
    @abstractmethod
    def encrypted_block_size(self):
        """Return the size of one block of encrypted text."""

    @abstractmethod
    def decrypt(self, data):
        """Return data in decrypted form."""
94 
95 
class Cryptography(CryptographyNone):
    """
    Security policy: Sign or SignAndEncrypt

    All work is delegated to the objects stored in the ``Signer``,
    ``Verifier``, ``Encryptor`` and ``Decryptor`` attributes. They are
    initialised to None and have to be assigned before the corresponding
    methods are used.
    """

    def __init__(self, mode=MessageSecurityMode.Sign):
        """
        :param mode: MessageSecurityMode.Sign or
            MessageSecurityMode.SignAndEncrypt; encryption-related methods
            are pass-through unless SignAndEncrypt is selected.
        """
        self.Signer = None
        self.Verifier = None
        self.Encryptor = None
        self.Decryptor = None
        assert mode in (MessageSecurityMode.Sign,
                        MessageSecurityMode.SignAndEncrypt)
        self.is_encrypted = (mode == MessageSecurityMode.SignAndEncrypt)

    def plain_block_size(self):
        """
        Size of plain text block for block cipher.
        """
        if self.is_encrypted:
            return self.Encryptor.plain_block_size()
        return 1

    # NOTE(review): the def line was missing from the listing; the name is
    # reconstructed from the docstring and the delegate call below.
    def encrypted_block_size(self):
        """
        Size of encrypted text block for block cipher.
        """
        if self.is_encrypted:
            return self.Encryptor.encrypted_block_size()
        return 1

    def padding(self, size):
        """
        Create padding for a block of given size.
        plain_size = size + len(padding) + signature_size()
        plain_size = N * plain_block_size()
        """
        if not self.is_encrypted:
            return b''
        block_size = self.Encryptor.plain_block_size()
        # The "+ 1" accounts for the trailing byte that stores the pad count.
        rem = (size + self.signature_size() + 1) % block_size
        if rem != 0:
            rem = block_size - rem
        # rem + 1 bytes, every one holding the value rem; remove_padding()
        # reads the last byte to find out how much to strip.
        return bytes(bytearray([rem])) * (rem + 1)

    def min_padding_size(self):
        """Return the smallest padding length: 1 when encrypting, else 0."""
        if self.is_encrypted:
            return 1
        return 0

    def signature_size(self):
        """Return the signature size reported by the Signer."""
        return self.Signer.signature_size()

    def signature(self, data):
        """Return the Signer's signature of data."""
        return self.Signer.signature(data)

    def vsignature_size(self):
        """Return the signature size reported by the Verifier."""
        return self.Verifier.signature_size()

    def verify(self, data, sig):
        """
        Hand data and sig to the Verifier.
        Nothing is returned; a failed check is presumably reported by the
        Verifier raising an exception.
        """
        self.Verifier.verify(data, sig)

    def encrypt(self, data):
        """
        Encrypt data with the Encryptor, or return it unchanged when the
        mode is not SignAndEncrypt. data must already be padded to a
        multiple of the plain block size.
        """
        if self.is_encrypted:
            assert len(data) % self.Encryptor.plain_block_size() == 0
            return self.Encryptor.encrypt(data)
        return data

    def decrypt(self, data):
        """
        Decrypt data with the Decryptor, or return it unchanged when the
        mode is not SignAndEncrypt.
        """
        if self.is_encrypted:
            return self.Decryptor.decrypt(data)
        return data

    def remove_padding(self, data):
        """
        Strip the padding created by padding() from the end of data.
        Returns data unchanged when the mode is not SignAndEncrypt.
        """
        if self.is_encrypted:
            # Last byte holds the pad value; the padding is that many bytes
            # plus the count byte itself.
            pad_size = bytearray(data[-1:])[0] + 1
            return data[:-pad_size]
        return data
173 
174 
176 
177  def __init__(self, client_pk):
179  self.client_pk = client_pk
180  self.key_size = self.client_pk.key_size // 8
181 
182  def signature_size(self):
183  return self.key_size
184 
185  def signature(self, data):
186  return uacrypto.sign_sha1(self.client_pk, data)
187 
188 
190 
191  def __init__(self, server_cert):
193  self.server_cert = server_cert
194  self.key_size = self.server_cert.public_key().key_size // 8
195 
196  def signature_size(self):
197  return self.key_size
198 
199  def verify(self, data, signature):
200  uacrypto.verify_sha1(self.server_cert, data, signature)
201 
202 
204 
205  def __init__(self, server_cert, enc_fn, padding_size):
207  self.server_cert = server_cert
208  self.key_size = self.server_cert.public_key().key_size // 8
209  self.encryptor = enc_fn
210  self.padding_size = padding_size
211 
212  def plain_block_size(self):
213  return self.key_size - self.padding_size
214 
216  return self.key_size
217 
218  def encrypt(self, data):
219  encrypted = b''
220  block_size = self.plain_block_size()
221  for i in range(0, len(data), block_size):
222  encrypted += self.encryptor(self.server_cert.public_key(),
223  data[i: i + block_size])
224  return encrypted
225 
226 
228 
229  def __init__(self, client_pk, dec_fn, padding_size):
231  self.client_pk = client_pk
232  self.key_size = self.client_pk.key_size // 8
233  self.decryptor = dec_fn
234  self.padding_size = padding_size
235 
236  def plain_block_size(self):
237  return self.key_size - self.padding_size
238 
240  return self.key_size
241 
242  def decrypt(self, data):
243  decrypted = b''
244  block_size = self.encrypted_block_size()
245  for i in range(0, len(data), block_size):
246  decrypted += self.decryptor(self.client_pk,
247  data[i: i + block_size])
248  return decrypted
249 
250 
252 
253  def __init__(self, key):
255  self.key = key
256 
257  def signature_size(self):
258  return uacrypto.sha1_size()
259 
260  def signature(self, data):
261  return uacrypto.hmac_sha1(self.key, data)
262 
263 
265 
266  def __init__(self, key):
268  self.key = key
269 
270  def signature_size(self):
271  return uacrypto.sha1_size()
272 
273  def verify(self, data, signature):
274  expected = uacrypto.hmac_sha1(self.key, data)
275  if signature != expected:
276  raise uacrypto.InvalidSignature
277 
278 
280 
281  def __init__(self, key, init_vec):
283  self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)
284 
285  def plain_block_size(self):
286  return self.cipher.algorithm.key_size // 8
287 
289  return self.cipher.algorithm.key_size // 8
290 
291  def encrypt(self, data):
292  return uacrypto.cipher_encrypt(self.cipher, data)
293 
294 
296 
297  def __init__(self, key, init_vec):
299  self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)
300 
301  def plain_block_size(self):
302  return self.cipher.algorithm.key_size // 8
303 
305  return self.cipher.algorithm.key_size // 8
306 
307  def decrypt(self, data):
308  return uacrypto.cipher_decrypt(self.cipher, data)
309 
310 
311 class SecurityPolicyBasic128Rsa15(SecurityPolicy):
312  """
313  Security Basic 128Rsa15
314  A suite of algorithms that uses RSA15 as Key-Wrap-algorithm
315  and 128-Bit (16 bytes) for encryption algorithms.
316  - SymmetricSignatureAlgorithm - HmacSha1
317  (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
318  - SymmetricEncryptionAlgorithm - Aes128
319  (http://www.w3.org/2001/04/xmlenc#aes128-cbc)
320  - AsymmetricSignatureAlgorithm - RsaSha1
321  (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
322  - AsymmetricKeyWrapAlgorithm - KwRsa15
323  (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
324  - AsymmetricEncryptionAlgorithm - Rsa15
325  (http://www.w3.org/2001/04/xmlenc#rsa-1_5)
326  - KeyDerivationAlgorithm - PSha1
327  (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
328  - DerivedSignatureKeyLength - 128 (16 bytes)
329  - MinAsymmetricKeyLength - 1024 (128 bytes)
330  - MaxAsymmetricKeyLength - 2048 (256 bytes)
331  - CertificateSignatureAlgorithm - Sha1
332 
333  If a certificate or any certificate in the chain is not signed with
334  a hash that is Sha1 or stronger then the certificate shall be rejected.
335  """
336 
337  URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15"
338  signature_key_size = 16
339  symmetric_key_size = 16
340  AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-1_5"
341 
342  @staticmethod
343  def encrypt_asymmetric(pubkey, data):
344  return uacrypto.encrypt_rsa15(pubkey, data)
345 
346  def __init__(self, server_cert, client_cert, client_pk, mode):
348  if isinstance(server_cert, bytes):
349  server_cert = uacrypto.x509_from_der(server_cert)
350  # even in Sign mode we need to asymmetrically encrypt secrets
351  # transmitted in OpenSecureChannel. So SignAndEncrypt here
353  MessageSecurityMode.SignAndEncrypt)
354  self.asymmetric_cryptography.Signer = SignerRsa(client_pk)
355  self.asymmetric_cryptography.Verifier = VerifierRsa(server_cert)
356  self.asymmetric_cryptography.Encryptor = EncryptorRsa(
357  server_cert, uacrypto.encrypt_rsa15, 11)
358  self.asymmetric_cryptography.Decryptor = DecryptorRsa(
359  client_pk, uacrypto.decrypt_rsa15, 11)
361  self.Mode = mode
362  self.server_certificate = uacrypto.der_from_x509(server_cert)
363  self.client_certificate = uacrypto.der_from_x509(client_cert)
364 
365  def make_symmetric_key(self, nonce1, nonce2):
366  key_sizes = (self.signature_key_size, self.symmetric_key_size, 16)
367 
368  (sigkey, key, init_vec) = uacrypto.p_sha1(nonce2, nonce1, key_sizes)
369  self.symmetric_cryptography.Signer = SignerAesCbc(sigkey)
370  self.symmetric_cryptography.Encryptor = EncryptorAesCbc(key, init_vec)
371 
372  (sigkey, key, init_vec) = uacrypto.p_sha1(nonce1, nonce2, key_sizes)
373  self.symmetric_cryptography.Verifier = VerifierAesCbc(sigkey)
374  self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
375 
376 
377 class SecurityPolicyBasic256(SecurityPolicy):
378  """
379  Security Basic 256
380  A suite of algorithms that are for 256-Bit (32 bytes) encryption,
381  algorithms include:
382  - SymmetricSignatureAlgorithm - HmacSha1
383  (http://www.w3.org/2000/09/xmldsig#hmac-sha1)
384  - SymmetricEncryptionAlgorithm - Aes256
385  (http://www.w3.org/2001/04/xmlenc#aes256-cbc)
386  - AsymmetricSignatureAlgorithm - RsaSha1
387  (http://www.w3.org/2000/09/xmldsig#rsa-sha1)
388  - AsymmetricKeyWrapAlgorithm - KwRsaOaep
389  (http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p)
390  - AsymmetricEncryptionAlgorithm - RsaOaep
391  (http://www.w3.org/2001/04/xmlenc#rsa-oaep)
392  - KeyDerivationAlgorithm - PSha1
393  (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1)
394  - DerivedSignatureKeyLength - 192 (24 bytes)
395  - MinAsymmetricKeyLength - 1024 (128 bytes)
396  - MaxAsymmetricKeyLength - 2048 (256 bytes)
397  - CertificateSignatureAlgorithm - Sha1
398 
399  If a certificate or any certificate in the chain is not signed with
400  a hash that is Sha1 or stronger then the certificate shall be rejected.
401  """
402 
403  URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256"
404  signature_key_size = 24
405  symmetric_key_size = 32
406  AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"
407 
408  @staticmethod
409  def encrypt_asymmetric(pubkey, data):
410  return uacrypto.encrypt_rsa_oaep(pubkey, data)
411 
412  def __init__(self, server_cert, client_cert, client_pk, mode):
414  if isinstance(server_cert, bytes):
415  server_cert = uacrypto.x509_from_der(server_cert)
416  # even in Sign mode we need to asymmetrically encrypt secrets
417  # transmitted in OpenSecureChannel. So SignAndEncrypt here
419  MessageSecurityMode.SignAndEncrypt)
420  self.asymmetric_cryptography.Signer = SignerRsa(client_pk)
421  self.asymmetric_cryptography.Verifier = VerifierRsa(server_cert)
422  self.asymmetric_cryptography.Encryptor = EncryptorRsa(
423  server_cert, uacrypto.encrypt_rsa_oaep, 42)
424  self.asymmetric_cryptography.Decryptor = DecryptorRsa(
425  client_pk, uacrypto.decrypt_rsa_oaep, 42)
427  self.Mode = mode
428  self.server_certificate = uacrypto.der_from_x509(server_cert)
429  self.client_certificate = uacrypto.der_from_x509(client_cert)
430 
431  def make_symmetric_key(self, nonce1, nonce2):
432  # specs part 6, 6.7.5
433  key_sizes = (self.signature_key_size, self.symmetric_key_size, 16)
434 
435  (sigkey, key, init_vec) = uacrypto.p_sha1(nonce2, nonce1, key_sizes)
436  self.symmetric_cryptography.Signer = SignerAesCbc(sigkey)
437  self.symmetric_cryptography.Encryptor = EncryptorAesCbc(key, init_vec)
438 
439  (sigkey, key, init_vec) = uacrypto.p_sha1(nonce1, nonce2, key_sizes)
440  self.symmetric_cryptography.Verifier = VerifierAesCbc(sigkey)
441  self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
442 
443 
def encrypt_asymmetric(pubkey, data, policy_uri):
    """
    Encrypt data with pubkey using an asymmetric algorithm.
    The algorithm is selected by policy_uri.
    Returns a tuple (encrypted_data, algorithm_uri)

    An empty/None policy_uri or the "None" policy URI returns the data
    unencrypted together with an empty algorithm URI.
    Raises UaError for any other unrecognised policy_uri.
    """
    # Pass-through case first: the supported policy URIs are non-empty and
    # differ from the None URI, so checking this before them changes nothing.
    if not policy_uri or policy_uri == POLICY_NONE_URI:
        return (data, '')
    for cls in (SecurityPolicyBasic256, SecurityPolicyBasic128Rsa15):
        if policy_uri == cls.URI:
            return (cls.encrypt_asymmetric(pubkey, data),
                    cls.AsymmetricEncryptionURI)
    raise UaError("Unsupported security policy `{0}`".format(policy_uri))
def __init__(self, mode=MessageSecurityMode.Sign)
def __init__(self, client_pk, dec_fn, padding_size)
def __init__(self, server_cert, client_cert, client_pk, mode)
def __init__(self, server_cert, client_cert, client_pk, mode)
def __init__(self, server_cert, enc_fn, padding_size)
def encrypt_asymmetric(pubkey, data, policy_uri)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44