uatypes.py
Go to the documentation of this file.
1 """
2 implement ua datatypes
3 """
4 import struct
5 from enum import Enum, IntEnum, EnumMeta
6 from datetime import datetime
7 import sys
8 import os
9 import uuid
10 import re
11 import itertools
12 
13 from opcua.ua import ua_binary as uabin
14 from opcua.ua import status_codes
15 from opcua.ua import ObjectIds
16 from opcua.ua.uaerrors import UaError
17 from opcua.ua.uaerrors import UaStatusCodeError
18 from opcua.ua.uaerrors import UaStringParsingError
19 
20 
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so the rest of
# the module can use the name ``unicode`` on either major version.
if sys.version_info[0] >= 3:
    unicode = str
24  return uabin.win_epoch_to_datetime(0)
25 
26 
class _FrozenClass(object):
    """
    Base class that refuses new attributes once ``_freeze`` is set.

    Subclasses assign all their members in ``__init__`` and then set
    ``self._freeze = True``; any later assignment to a name the instance
    does not already have raises ``TypeError``. Not pythonic, but it catches
    typos in protocol structure member names.
    """
    _freeze = False

    def __setattr__(self, key, value):
        """
        Assign ``value`` to ``key``, rejecting unknown names on a frozen instance.

        :raises TypeError: if the instance is frozen and has no attribute ``key``
        """
        frozen = self._freeze
        if frozen and not hasattr(self, key):
            message = "Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
                key, self.__class__.__name__, self.__dict__.keys())
            raise TypeError(message)
        object.__setattr__(self, key, value)
41 
42 
# The typo check costs CPU on every attribute assignment but makes debugging
# easier. When it is not needed (in production), set the environment variable
# PYOPCUA_NO_TYPO_CHECK: all ua type classes then inherit from plain ``object``
# instead of ``_FrozenClass`` and the check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
51 
52 
class ValueRank(IntEnum):
    """
    Number of dimensions of a variable's value.

    Only the most common ranks are named here; a ValueRank may be any n > 0.
    Because this is an IntEnum, a plain int can be used wherever a member
    is expected.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # The names below are not in the spec, but are common enough to be listed.
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
68 
69 
class _MaskEnum(IntEnum):
    """
    Base class for enums whose member values are bit positions.

    A member's ``value`` is a bit index and its ``mask`` is ``1 << value``.
    The class methods convert between a collection of members and the
    combined integer bit field.
    """

    @classmethod
    def parse_bitfield(cls, the_int):
        """
        Take an integer and interpret it as a set of enum values.

        :param the_int: non-negative integer bit field
        :return: set with one member of ``cls`` per bit set in ``the_int``
        :raises TypeError: if ``the_int`` is not an int
        :raises ValueError: if ``the_int`` is negative or a set bit has no
            matching member in ``cls``
        """
        if not isinstance(the_int, int):
            raise TypeError("bit field must be an int, got {0!r}".format(the_int))
        return {cls(b) for b in cls._bits(the_int)}

    @classmethod
    def to_bitfield(cls, collection):
        """
        Take some enum values and create an integer bit field from them.

        :param collection: iterable (may be a one-shot iterator) of members of ``cls``
        :return: integer with the ``mask`` bit of every given member set
        :raises TypeError: if an element is not a member of ``cls``
        """
        # Materialize once so a one-shot iterator can be both validated and combined.
        members = list(collection)
        for member in members:
            if not isinstance(member, cls):
                raise TypeError("{0!r} is not a member of {1}".format(member, cls.__name__))
        bitfield = 0
        for member in members:
            # OR instead of addition: a member given twice must not carry into
            # the next bit.
            bitfield |= member.mask
        return bitfield

    @property
    def mask(self):
        """Integer with only this member's bit set."""
        return 1 << self.value

    @staticmethod
    def _bits(n):
        """ Iterate over the positions of the bits set in n.

        e.g. bits(44) yields 2, 3, 5

        :raises ValueError: if ``n`` is negative (the loop below would never end)
        """
        if n < 0:
            raise ValueError("bit field must not be negative, got {0}".format(n))
        pos = 0
        while n:
            if n & 0x1:
                yield pos
            n >>= 1
            pos += 1
107 
108 
110  """
111  Bit index to indicate what the access level is.
112 
113  Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
114  """
115  CurrentRead = 0
116  CurrentWrite = 1
117  HistoryRead = 2
118  HistoryWrite = 3
119  SemanticChange = 4
120  StatusWrite = 5
121  TimestampWrite = 6
122 
123 
125  """
126  Bit index to indicate which attribute of a node is writable
127 
128  Spec Part 3, Paragraph 5.2.7 WriteMask
129  """
130  AccessLevel = 0
131  ArrayDimensions = 1
132  BrowseName = 2
133  ContainsNoLoops = 3
134  DataType = 4
135  Description = 5
136  DisplayName = 6
137  EventNotifier = 7
138  Executable = 8
139  Historizing = 9
140  InverseName = 10
141  IsAbstract = 11
142  MinimumSamplingInterval = 12
143  NodeClass = 13
144  NodeId = 14
145  Symmetric = 15
146  UserAccessLevel = 16
147  UserExecutable = 17
148  UserWriteMask = 18
149  ValueRank = 19
150  WriteMask = 20
151  ValueForVariableType = 21
152 
153 
155  """
156  Bit index to indicate how a node can be used for events.
157 
158  Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
159  """
160  SubscribeToEvents = 0
161  # Reserved = 1
162  HistoryRead = 2
163  HistoryWrite = 3
164 
165 
167  """
168  :ivar value:
169  :vartype value: int
170  :ivar name:
171  :vartype name: string
172  :ivar doc:
173  :vartype doc: string
174  """
175 
def __init__(self, value=0):
    """
    Build a status code from its numeric value or from its symbolic name.

    :param value: the integer code, or the name of an attribute of
        ``status_codes.StatusCodes`` (resolved with ``getattr``)
    """
    if isinstance(value, str):
        self.name = value
        self.value = getattr(status_codes.StatusCodes, value)
        # ``doc`` must be set on this path too: the instance is frozen below,
        # so it could not be added later. The name given by the caller is kept.
        _, self.doc = status_codes.get_name_and_doc(self.value)
    else:
        self.value = value
        self.name, self.doc = status_codes.get_name_and_doc(value)
    self._freeze = True
184 
def to_binary(self):
    """Return ``self.value`` packed with ``uabin.Primitives.UInt32``."""
    packer = uabin.Primitives.UInt32
    return packer.pack(self.value)
187 
@staticmethod
def from_binary(data):
    """Unpack a UInt32 from ``data`` and wrap it in a ``StatusCode``."""
    return StatusCode(uabin.Primitives.UInt32.unpack(data))
193 
def check(self):
    """
    Raise ``UaStatusCodeError`` unless ``is_good()`` returns True.

    Call ``is_good()`` directly if you do not want an exception.
    """
    if self.is_good():
        return
    raise UaStatusCodeError(self.value)
202 
def is_good(self):
    """
    Return True if neither of the two top bits (30 and 31) of the value is set.
    """
    severity_bits = (3 << 30) & self.value
    return not severity_bits
212 
def __str__(self):
    """Return ``StatusCode(<name>)`` built from the stored name."""
    label = self.name
    return 'StatusCode({0})'.format(label)
__repr__ = __str__
216 
def __eq__(self, other):
    """
    Two status codes are equal when their numeric values are equal.

    Comparing with anything that is not a ``StatusCode`` returns False
    instead of raising ``AttributeError``.
    """
    return isinstance(other, StatusCode) and self.value == other.value
219 
def __ne__(self, other):
    """Negation of ``__eq__``."""
    equal = self.__eq__(other)
    return not equal
222 
223 
class NodeIdType(IntEnum):
    """Kinds of NodeId; the value is the type code written as the first byte of a binary NodeId."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
231 
232 
234  """
235  NodeId Object
236 
237  Args:
238  identifier: The identifier might be an int, a string, bytes or a Guid
239  namespaceidx(int): The index of the namespace
240  nodeidtype(NodeIdType): The type of the nodeid, if it cannot be guessed or if you want something special like a two-byte or four-byte nodeid
241 
242 
243  :ivar Identifier:
244  :vartype Identifier: NodeId
245  :ivar NamespaceIndex:
246  :vartype NamespaceIndex: Int
247  :ivar NamespaceUri:
248  :vartype NamespaceUri: String
249  :ivar ServerIndex:
250  :vartype ServerIndex: Int
251  """
252 
def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
    """
    Store the identifier and namespace, guessing the NodeIdType when it is not given.

    :raises UaError: if ``namespaceidx`` is not an int, or the type cannot
        be guessed from the identifier's Python type
    """
    self.Identifier = identifier
    self.NamespaceIndex = namespaceidx
    self.NodeIdType = nodeidtype
    self.NamespaceUri = ""
    self.ServerIndex = 0
    self._freeze = True

    if not isinstance(self.NamespaceIndex, int):
        raise UaError("NamespaceIndex must be an int")

    # No identifier at all: fall back to identifier 0 as a two-byte node id.
    if self.Identifier is None:
        self.Identifier = 0
        self.NodeIdType = NodeIdType.TwoByte
        return

    if self.NodeIdType is not None:
        return

    # Checked in this order; the first matching Python type wins.
    guesses = (
        (int, NodeIdType.Numeric),
        (str, NodeIdType.String),
        (bytes, NodeIdType.ByteString),
        (uuid.UUID, NodeIdType.Guid),
    )
    for python_type, guessed in guesses:
        if isinstance(self.Identifier, python_type):
            self.NodeIdType = guessed
            return
    raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
278 
def _key(self):
    """
    Return the tuple used for equality, hashing and ordering.

    TwoByte, FourByte and Numeric ids may represent the same node, so all
    three are reported as Numeric.
    """
    numeric_like = (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric)
    kind = NodeIdType.Numeric if self.NodeIdType in numeric_like else self.NodeIdType
    return (kind, self.NamespaceIndex, self.Identifier)
284 
def __eq__(self, node):
    """Equal only to another NodeId with the same ``_key()``."""
    if not isinstance(node, NodeId):
        return False
    return self._key() == node._key()
287 
def __ne__(self, other):
    """Negation of ``__eq__``."""
    equal = self.__eq__(other)
    return not equal
290 
def __hash__(self):
    """Hash of ``_key()``, consistent with ``__eq__``."""
    key = self._key()
    return hash(key)
293 
def __lt__(self, other):
    """
    Order NodeIds by their ``_key()`` tuples.

    :raises AttributeError: if ``other`` is not a NodeId
    """
    if isinstance(other, NodeId):
        return self._key() < other._key()
    raise AttributeError("Can only compare to NodeId")
298 
def is_null(self):
    """Return True when the namespace index is 0 and the identifier is null."""
    return False if self.NamespaceIndex != 0 else self.has_null_identifier()
303 
def has_null_identifier(self):
    """
    Return True if the identifier is empty or zero for its type.

    Any falsy identifier (0, "", b"", None) is null. A Guid identifier is
    also null when it is the all-zero UUID, whether held as ``uuid.UUID``
    or as its textual form.
    """
    if not self.Identifier:
        return True
    if self.NodeIdType == NodeIdType.Guid:
        guid = self.Identifier
        if isinstance(guid, uuid.UUID):
            # UUID objects are always truthy, so test the 128-bit value itself.
            return guid.int == 0
        if isinstance(guid, bytes):
            # Bytes identifiers keep the pre-existing pattern check.
            return re.match(b'0.', guid) is not None
        try:
            return uuid.UUID(guid).int == 0
        except (AttributeError, TypeError, ValueError):
            # Not parseable as a UUID: treat as a non-null identifier.
            return False
    return False
310 
@staticmethod
def from_string(string):
    """
    Parse ``string`` into a NodeId via ``NodeId._from_string``.

    :raises UaStringParsingError: wrapping any ``ValueError`` raised while parsing
    """
    try:
        nodeid = NodeId._from_string(string)
    except ValueError as ex:
        raise UaStringParsingError("Error parsing string {0}".format(string), ex)
    return nodeid
317 
@staticmethod
def _from_string(string):
    """
    Parse a ``;``-separated list of ``key=value`` elements into a NodeId.

    Recognized keys: ``ns`` (namespace index), ``i`` (numeric id), ``s``
    (string id), ``g`` (guid id), ``b`` (bytestring id), ``srv`` (server
    index) and ``nsu`` (namespace uri). Unknown keys are ignored.

    :raises ValueError: if an element has no ``=``, or a numeric or guid
        value cannot be converted
    :raises UaStringParsingError: if no identifier element is present
    """
    identifier = None
    namespace = 0
    ntype = None
    srv = None
    nsu = None
    for el in string.split(";"):
        if not el:
            continue
        k, v = el.split("=", 1)
        k = k.strip()
        v = v.strip()
        if k == "ns":
            namespace = int(v)
        elif k == "i":
            ntype = NodeIdType.Numeric
            identifier = int(v)
        elif k == "s":
            ntype = NodeIdType.String
            identifier = v
        elif k == "g":
            ntype = NodeIdType.Guid
            # Guid identifiers are uuid.UUID objects elsewhere in this class
            # (type guessing in __init__), so convert the text here.
            identifier = uuid.UUID(v)
        elif k == "b":
            ntype = NodeIdType.ByteString
            # NOTE(review): kept as text, as before; binary packing presumably
            # expects bytes -- confirm before converting.
            identifier = v
        elif k == "srv":
            srv = int(v)
        elif k == "nsu":
            nsu = v
    if identifier is None:
        raise UaStringParsingError("Could not find identifier in string: " + string)
    nodeid = NodeId(identifier, namespace, ntype)
    # Only override the constructor defaults ("" and 0) when the string
    # actually carried these elements.
    if nsu is not None:
        nodeid.NamespaceUri = nsu
    if srv is not None:
        nodeid.ServerIndex = srv
    return nodeid
356 
def to_string(self):
    """
    Return the ``key=value`` text form of this NodeId.

    Elements are joined with ``;`` so the result can be parsed back by
    ``_from_string``: optional ``srv=``, optional ``ns=`` (omitted when the
    namespace index is 0), the identifier, then optional ``nsu=``.
    """
    parts = []
    if self.ServerIndex:
        parts.append("srv=" + str(self.ServerIndex))
    if self.NamespaceIndex != 0:
        parts.append("ns={0}".format(self.NamespaceIndex))

    ntype = None
    if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
        ntype = "i"
    elif self.NodeIdType == NodeIdType.String:
        ntype = "s"
    elif self.NodeIdType == NodeIdType.Guid:
        ntype = "g"
    elif self.NodeIdType == NodeIdType.ByteString:
        ntype = "b"
    parts.append("{0}={1}".format(ntype, self.Identifier))

    if self.NamespaceUri:
        parts.append("nsu={0}".format(self.NamespaceUri))
    return ";".join(parts)
380 
def __str__(self):
    """Return ``<TypeName>NodeId(<to_string()>)``."""
    kind_name = self.NodeIdType.name
    text = self.to_string()
    return "{0}NodeId({1})".format(kind_name, text)
__repr__ = __str__
384 
def to_binary(self):
    """
    Serialize the node id: a type byte, then a layout that depends on the type.

    TwoByte writes only a one-byte identifier; FourByte a one-byte namespace
    and two-byte identifier; Numeric a two-byte namespace and four-byte
    identifier. All other types write a two-byte namespace followed by the
    packed identifier.
    """
    kind = self.NodeIdType
    if kind == NodeIdType.TwoByte:
        return struct.pack("<BB", kind.value, self.Identifier)
    if kind == NodeIdType.FourByte:
        return struct.pack("<BBH", kind.value, self.NamespaceIndex, self.Identifier)
    if kind == NodeIdType.Numeric:
        return struct.pack("<BHI", kind.value, self.NamespaceIndex, self.Identifier)

    header = struct.pack("<BH", kind.value, self.NamespaceIndex)
    if kind == NodeIdType.String:
        return header + uabin.Primitives.String.pack(self.Identifier)
    if kind == NodeIdType.ByteString:
        return header + uabin.Primitives.Bytes.pack(self.Identifier)
    if kind == NodeIdType.Guid:
        return header + uabin.Primitives.Guid.pack(self.Identifier)
    return header + self.Identifier.to_binary()
    # FIXME: Missing NamespaceUri and ServerIndex
405 
@staticmethod
def from_binary(data):
    """
    Read a NodeId from the stream ``data``.

    The low six bits of the first byte select the NodeIdType; bit 7 marks a
    trailing namespace uri and bit 6 a trailing server index.

    :raises UaError: if the type is not one of the handled encodings
    """
    nid = NodeId()
    encoding = ord(data.read(1))
    kind = NodeIdType(encoding & 0b00111111)
    nid.NodeIdType = kind

    if kind == NodeIdType.TwoByte:
        nid.Identifier = ord(data.read(1))
    elif kind == NodeIdType.FourByte:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
    elif kind == NodeIdType.Numeric:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
    elif kind in (NodeIdType.String, NodeIdType.ByteString, NodeIdType.Guid):
        # These three share a UInt16 namespace followed by the identifier.
        nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
        if kind == NodeIdType.String:
            nid.Identifier = uabin.Primitives.String.unpack(data)
        elif kind == NodeIdType.ByteString:
            nid.Identifier = uabin.Primitives.Bytes.unpack(data)
        else:
            nid.Identifier = uabin.Primitives.Guid.unpack(data)
    else:
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

    if uabin.test_bit(encoding, 7):
        nid.NamespaceUri = uabin.Primitives.String.unpack(data)
    if uabin.test_bit(encoding, 6):
        nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)

    return nid
436 
437 
439 
def __init__(self, identifier):
    """Create a NodeId in namespace 0 with the type forced to TwoByte."""
    NodeId.__init__(self, identifier=identifier, namespaceidx=0, nodeidtype=NodeIdType.TwoByte)
442 
443 
445 
def __init__(self, identifier, namespace=0):
    """Create a NodeId with the type forced to FourByte."""
    NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.FourByte)
448 
449 
451 
def __init__(self, identifier, namespace=0):
    """Create a NodeId with the type forced to Numeric."""
    NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Numeric)
454 
455 
457 
def __init__(self, identifier, namespace=0):
    """Create a NodeId with the type forced to ByteString."""
    NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.ByteString)
460 
461 
463 
def __init__(self, identifier, namespace=0):
    """Create a NodeId with the type forced to Guid."""
    NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Guid)
466 
467 
469 
def __init__(self, identifier, namespace=0):
    """Create a NodeId with the type forced to String."""
    NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.String)
472 
473 
474 ExpandedNodeId = NodeId
475 
476 
478  """
479  A string qualified with a namespace index.
480  """
481 
def __init__(self, name=None, namespaceidx=0):
    """
    Store the name and its namespace index.

    :raises UaError: if ``namespaceidx`` is not an int
    """
    index_is_int = isinstance(namespaceidx, int)
    if not index_is_int:
        raise UaError("namespaceidx must be an int")
    self.NamespaceIndex = namespaceidx
    self.Name = name
    self._freeze = True
488 
def to_string(self):
    """Return ``<NamespaceIndex>:<Name>``."""
    template = "{0}:{1}"
    return template.format(self.NamespaceIndex, self.Name)
491 
@staticmethod
def from_string(string):
    """
    Parse ``"<index>:<name>"`` or a bare ``"<name>"`` (namespace index 0).

    Only the first ``:`` separates the index from the name.

    :raises UaStringParsingError: if the part before the ``:`` is not an integer
    """
    if ":" not in string:
        return QualifiedName(string, 0)
    try:
        idx, name = string.split(":", 1)
        idx = int(idx)
    except (TypeError, ValueError) as ex:
        raise UaStringParsingError("Error parsing string {0}".format(string), ex)
    return QualifiedName(name, idx)
504 
def to_binary(self):
    """Return the UInt16-packed namespace index followed by the String-packed name."""
    index_bytes = uabin.Primitives.UInt16.pack(self.NamespaceIndex)
    name_bytes = uabin.Primitives.String.pack(self.Name)
    return b''.join((index_bytes, name_bytes))
510 
@staticmethod
def from_binary(data):
    """Read a UInt16 namespace index and then a String name from ``data``."""
    qname = QualifiedName()
    qname.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
    qname.Name = uabin.Primitives.String.unpack(data)
    return qname
517 
def __eq__(self, bname):
    """Equal only to another QualifiedName with the same name and namespace index."""
    if not isinstance(bname, QualifiedName):
        return False
    same_name = self.Name == bname.Name
    return same_name and self.NamespaceIndex == bname.NamespaceIndex
520 
def __ne__(self, other):
    """Negation of ``__eq__``."""
    equal = self.__eq__(other)
    return not equal
523 
def __lt__(self, other):
    """
    Order by namespace index first, then by name.

    :raises TypeError: if ``other`` is not a QualifiedName
    """
    if not isinstance(other, QualifiedName):
        raise TypeError("Cannot compare QualifiedName and {0}".format(other))
    if self.NamespaceIndex != other.NamespaceIndex:
        return self.NamespaceIndex < other.NamespaceIndex
    return self.Name < other.Name
531 
def __str__(self):
    """Return ``QualifiedName(<NamespaceIndex>:<Name>)``."""
    template = 'QualifiedName({0}:{1})'
    return template.format(self.NamespaceIndex, self.Name)

__repr__ = __str__
536 
537 
539  """
540  A text with an optional locale.
541  """
542 
543  ua_types = {
544  "Text": "ByteString",
545  "Locale": "ByteString"
546  }
547 
def __init__(self, text=None):
    """
    Store ``text`` as UTF-8 bytes, with no locale.

    A unicode ``text`` is encoded to UTF-8. Bit 1 of ``Encoding`` is set
    when the resulting text is non-empty.
    """
    self.Encoding = 0
    if isinstance(text, unicode):
        self.Text = text.encode('utf-8')
    else:
        self.Text = text
    if self.Text:
        self.Encoding |= (1 << 1)
    self.Locale = None
    self._freeze = True
557 
def to_binary(self):
    """
    Serialize as an encoding byte followed by the fields it announces.

    Bit 0 announces ``Locale`` and bit 1 announces ``Text``; each is written
    with ``uabin.Primitives.Bytes`` only when non-empty. ``self.Encoding`` is
    recomputed from the current fields, so a bit left over from an earlier
    state cannot announce a field that is not written.
    """
    encoding = 0
    packet = []
    if self.Locale:
        encoding |= (1 << 0)
        packet.append(uabin.Primitives.Bytes.pack(self.Locale))
    if self.Text:
        encoding |= (1 << 1)
        packet.append(uabin.Primitives.Bytes.pack(self.Text))
    self.Encoding = encoding
    return uabin.Primitives.UInt8.pack(encoding) + b''.join(packet)
570 
@staticmethod
def from_binary(data):
    """
    Read a LocalizedText from ``data``.

    The first byte is the encoding mask; the locale is read when bit 0 is
    set and the text when bit 1 is set, in that order.
    """
    obj = LocalizedText()
    mask = ord(data.read(1))
    obj.Encoding = mask
    has_locale = mask & (1 << 0)
    has_text = mask & (1 << 1)
    if has_locale:
        obj.Locale = uabin.Primitives.Bytes.unpack(data)
    if has_text:
        obj.Text = uabin.Primitives.Bytes.unpack(data)
    return obj
580 
def to_string(self):
    """Return the text decoded from UTF-8, or an empty string when there is no text."""
    # FIXME: use locale
    raw = self.Text
    return "" if raw is None else raw.decode('utf-8')
586 
def __str__(self):
    """Return a debug string showing Encoding, Locale and Text."""
    pieces = (
        'LocalizedText(',
        'Encoding:', str(self.Encoding), ', ',
        'Locale:', str(self.Locale), ', ',
        'Text:', str(self.Text), ')',
    )
    return ''.join(pieces)
__repr__ = __str__
592 
def __eq__(self, other):
    """Equal only to another LocalizedText with the same locale and text."""
    if not isinstance(other, LocalizedText):
        return False
    if self.Locale != other.Locale:
        return False
    return bool(self.Text == other.Text)
597 
def __ne__(self, other):
    """Negation of ``__eq__``."""
    equal = self.__eq__(other)
    return not equal
600 
601 
603  """
604  Any UA object packed as an ExtensionObject
605 
606  :ivar TypeId:
607  :vartype TypeId: NodeId
608  :ivar Body:
609  :vartype Body: bytes
610  """
611 
def __init__(self):
    """Create an empty extension object: default NodeId, encoding 0, empty body."""
    self.Body = b''
    self.Encoding = 0
    self.TypeId = NodeId()
    self._freeze = True
617 
def to_binary(self):
    """
    Serialize as TypeId, encoding byte and, when non-empty, the body.

    Bit 0 of ``Encoding`` is set or cleared to match whether a body is
    actually written, so the byte never announces a body that is absent.
    Other bits of ``Encoding`` are left as they are.
    """
    if self.Body:
        self.Encoding |= (1 << 0)
    else:
        self.Encoding &= ~(1 << 0)
    packet = [
        self.TypeId.to_binary(),
        uabin.Primitives.UInt8.pack(self.Encoding),
    ]
    if self.Body:
        packet.append(uabin.Primitives.ByteString.pack(self.Body))
    return b''.join(packet)
627 
@staticmethod
def from_binary(data):
    """Read TypeId and encoding byte from ``data``, then the body when bit 0 is set."""
    ext = ExtensionObject()
    ext.TypeId = NodeId.from_binary(data)
    ext.Encoding = uabin.Primitives.UInt8.unpack(data)
    body_present = ext.Encoding & (1 << 0)
    if body_present:
        ext.Body = uabin.Primitives.ByteString.unpack(data)
    return ext
636 
@staticmethod
def from_object(obj):
    """
    Wrap ``obj`` in an ExtensionObject.

    The TypeId is the ``ObjectIds`` attribute named
    ``<ClassName>_Encoding_DefaultBinary``; the body is ``obj.to_binary()``.
    """
    ext = ExtensionObject()
    type_name = obj.__class__.__name__
    oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(type_name))
    ext.TypeId = FourByteNodeId(oid)
    ext.Body = obj.to_binary()
    return ext
644 
def __str__(self):
    """Return a debug string with the TypeId, the encoding and the body length."""
    pieces = (
        'ExtensionObject(',
        'TypeId:', str(self.TypeId), ', ',
        'Encoding:', str(self.Encoding), ', ',
        str(len(self.Body)), ' bytes)',
    )
    return ''.join(pieces)

__repr__ = __str__
650 
651 
class VariantType(Enum):
    """
    The possible types of a variant.

    Each member's value is the integer type id stored in the low six bits
    of a variant's encoding byte.
    """

    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
710 
711 
class VariantTypeCustom(object):
    """
    Looks like sometimes we get variants with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType
    """

    def __init__(self, val):
        """
        :param val: integer type id; must fit in six bits
        :raises UaError: if ``val`` is greater than 0b00111111
        """
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{0}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        """
        Compare by ``value`` with anything that has one (including
        ``VariantType`` members); other objects are simply not equal.
        """
        if not hasattr(other, "value"):
            return NotImplemented
        return self.value == other.value

    def __ne__(self, other):
        """Negation of ``__eq__`` (needed explicitly on Python 2)."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable on Python 3;
        # hash by the same field equality uses.
        return hash(self.value)
732 
733 
735  """
736  Create an OPC-UA Variant object.
737  If no argument is given, a Null Variant is created.
738  If no variant type is given, attempts to guess the type from the Python type.
739  If a variant is given as value, the new object becomes a copy of the argument.
740 
741  :ivar Value:
742  :vartype Value: Any supported type
743  :ivar VariantType:
744  :vartype VariantType: VariantType
745  """
746 
def __init__(self, value=None, varianttype=None, dimensions=None):
    """
    :param value: the Python value, or another Variant to copy
    :param varianttype: the VariantType; guessed from ``value`` when None
    :param dimensions: array dimensions; for list/tuple values nested more
        than one level deep they are derived from the value when None
    :raises UaError: if the value is None for a type other than Null,
        String or DateTime
    """
    self.Value = value
    self.VariantType = varianttype
    self.Dimensions = dimensions
    self._freeze = True
    if isinstance(value, Variant):
        self.Value = value.Value
        self.VariantType = value.VariantType
        # A copy must keep the source's dimensions too, unless the caller
        # passed explicit ones.
        if self.Dimensions is None:
            self.Dimensions = value.Dimensions
    if self.VariantType is None:
        self.VariantType = self._guess_type(self.Value)
    if self.Value is None and self.VariantType not in (
            VariantType.Null,
            VariantType.String,
            VariantType.DateTime):
        raise UaError("Variant of type {0} cannot have value None".format(self.VariantType))
    if self.Dimensions is None and type(self.Value) in (list, tuple):
        dims = get_shape(self.Value)
        # A single dimension is implied by the array itself; only record more.
        if len(dims) > 1:
            self.Dimensions = dims
766 
def __eq__(self, other):
    """Equal only to another Variant with the same VariantType and Value."""
    if not isinstance(other, Variant):
        return False
    if self.VariantType == other.VariantType and self.Value == other.Value:
        return True
    return False
771 
def __ne__(self, other):
    """Negation of ``__eq__``."""
    equal = self.__eq__(other)
    return not equal
774 
def _guess_type(self, val):
    """
    Guess the VariantType for a Python value.

    For (nested) lists and tuples the type of the first innermost element
    decides. Objects of unrecognized classes map to the VariantType member
    named like their class, or to ExtensionObject when there is none.

    :raises UaError: if an empty list/tuple is met while descending
    """
    if isinstance(val, (list, tuple)):
        error_val = val
    while isinstance(val, (list, tuple)):
        if len(val) == 0:
            raise UaError("could not guess UA type of variable {0}".format(error_val))
        val = val[0]
    if val is None:
        return VariantType.Null
    elif isinstance(val, bool):
        # bool must be tested before int: bool is a subclass of int.
        return VariantType.Boolean
    elif isinstance(val, float):
        return VariantType.Double
    elif isinstance(val, int):
        return VariantType.Int64
    elif type(val) in (str, unicode):
        return VariantType.String
    elif isinstance(val, bytes):
        return VariantType.ByteString
    elif isinstance(val, datetime):
        return VariantType.DateTime
    elif isinstance(val, uuid.UUID):
        return VariantType.Guid
    # Everything is an instance of ``object``, so no further type test is
    # needed here: look the class name up and fall back to ExtensionObject.
    try:
        return getattr(VariantType, val.__class__.__name__)
    except AttributeError:
        return VariantType.ExtensionObject
806 
def __str__(self):
    """Return ``Variant(val:<Value>,type:<VariantType>)``."""
    template = "Variant(val:{0!s},type:{1})"
    return template.format(self.Value, self.VariantType)
__repr__ = __str__
810 
def to_binary(self):
    """
    Serialize the variant as an encoding byte followed by the value.

    The low six bits of the encoding byte hold the type id. For list/tuple
    values bit 7 is set and the flattened array is written; when
    ``Dimensions`` is set, bit 6 is set too and the dimensions follow as an
    Int32 array.
    """
    encoding = self.VariantType.value & 0b111111
    if type(self.Value) not in (list, tuple):
        scalar_chunks = [
            uabin.Primitives.UInt8.pack(encoding),
            uabin.pack_uatype(self.VariantType, self.Value),
        ]
        return b"".join(scalar_chunks)

    has_dimensions = self.Dimensions is not None
    if has_dimensions:
        encoding = uabin.set_bit(encoding, 6)
    encoding = uabin.set_bit(encoding, 7)
    chunks = [
        uabin.Primitives.UInt8.pack(encoding),
        uabin.pack_uatype_array(self.VariantType, flatten(self.Value)),
    ]
    if has_dimensions:
        chunks.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
    return b"".join(chunks)
827 
@staticmethod
def from_binary(data):
    """
    Read a Variant from the stream ``data``.

    The low six bits of the first byte select the type; bit 7 marks an
    array value and bit 6 marks trailing dimensions, which are used to
    reshape the flat array.
    """
    dimensions = None
    encoding = ord(data.read(1))
    int_type = encoding & 0b00111111
    vtype = datatype_to_varianttype(int_type)
    if vtype == VariantType.Null:
        # A Null variant has no value and no dimensions; the encoding byte
        # must not be passed on as ``dimensions``.
        return Variant(None, vtype)
    if uabin.test_bit(encoding, 7):
        value = uabin.unpack_uatype_array(vtype, data)
    else:
        value = uabin.unpack_uatype(vtype, data)
    if uabin.test_bit(encoding, 6):
        dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
        value = reshape(value, dimensions)

    return Variant(value, vtype, dimensions)
845 
846 
def reshape(flat, dims):
    """
    Rebuild a nested list from a flat sequence and a list of dimensions.

    A dimension of 0 after the first counts as 1 when computing the chunk
    size. If ``flat`` is shorter than the dimensions require, it is padded
    with empty lists; the padding is done on a copy, so the caller's
    sequence is never modified.

    :param flat: flat list (or tuple) of values
    :param dims: list of dimension sizes, outermost first
    :return: nested lists shaped by ``dims``; ``flat`` itself when ``dims``
        is empty or describes a single dimension and no padding is needed
    """
    if not dims:
        return flat
    subdims = dims[1:]
    subsize = 1
    for dim in subdims:
        subsize *= dim if dim != 0 else 1
    missing = dims[0] * subsize - len(flat)
    if missing > 0:
        # One distinct empty list per missing element.
        flat = list(flat) + [[] for _ in range(missing)]
    if not subdims or subdims == [0]:
        return flat
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
859 
860 
def _split_list(l, n):
    """Split ``l`` into consecutive slices of length ``n`` (at least 1); the last may be shorter."""
    step = max(1, n)
    chunks = []
    for start in range(0, len(l), step):
        chunks.append(l[start:start + step])
    return chunks
864 
865 
def flatten_and_get_shape(mylist):
    """
    Flatten a nested list and report its dimensions.

    The size of each nesting level is taken from the first element at that
    level. An empty list yields ``([], [0])`` instead of raising.

    :param mylist: (nested) list or tuple
    :return: tuple ``(flat_list, dims)``
    """
    dims = [len(mylist)]
    # The emptiness test guards the ``mylist[0]`` lookup on every pass.
    while len(mylist) != 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
875 
876 
def flatten(mylist):
    """
    Flatten a nested list one level at a time.

    Flattening continues for as long as the first element is itself a list
    or tuple; an empty list is returned as is.
    """
    flat = mylist
    while len(flat) != 0 and isinstance(flat[0], (list, tuple)):
        flat = list(itertools.chain.from_iterable(flat))
    return flat
885 
886 
def get_shape(mylist):
    """
    Return the dimensions of a nested list, following the first element at each level.

    A value that is not a list or tuple gives ``[]``; descent stops at the
    first empty level, whose size 0 is still recorded.
    """
    shape = []
    level = mylist
    while True:
        if not isinstance(level, (list, tuple)):
            return shape
        size = len(level)
        shape.append(size)
        if size == 0:
            return shape
        level = level[0]
895 
896 
898  """
899  An XML element encoded as an UTF-8 string.
900  """
901 
def __init__(self, binary=None):
    """
    Create an element from a binary stream, or an empty one.

    :param binary: stream handed to ``_binary_init``; when None, ``Value``
        starts as an empty list
    """
    # NOTE(review): the empty default is a list although to_binary packs
    # Value as a String -- confirm whether "" or None was intended.
    if binary is None:
        self.Value = []
    else:
        self._binary_init(binary)
    self._freeze = True
909 
def to_binary(self):
    """Return ``self.Value`` packed with ``uabin.Primitives.String``."""
    packer = uabin.Primitives.String
    return packer.pack(self.Value)
912 
@staticmethod
def from_binary(data):
    """Build an XmlElement by passing the stream ``data`` to the constructor."""
    element = XmlElement(data)
    return element
916 
def _binary_init(self, data):
    """Set ``Value`` to the String unpacked from ``data``."""
    unpacked = uabin.Primitives.String.unpack(data)
    self.Value = unpacked
919 
def __str__(self):
    """Return ``XmlElement(Value:<Value>)``."""
    return ''.join(('XmlElement(Value:', str(self.Value), ')'))

__repr__ = __str__
924 
925 
927  """
928  A value with an associated timestamp, and quality.
929  Automatically generated from xml , copied and modified here to fix errors in xml spec
930 
931  :ivar Value:
932  :vartype Value: Variant
933  :ivar StatusCode:
934  :vartype StatusCode: StatusCode
935  :ivar SourceTimestamp:
936  :vartype SourceTimestamp: datetime
937  :ivar SourcePicoSeconds:
938  :vartype SourcePicoSeconds: int
939  :ivar ServerTimestamp:
940  :vartype ServerTimestamp: datetime
941  :ivar ServerPicoseconds:
942  :vartype ServerPicoseconds: int
943  """
944 
945  def __init__(self, variant=None, status=None):
946  self.Encoding = 0
947  if not isinstance(variant, Variant):
948  variant = Variant(variant)
949  self.Value = variant
950  if status is None:
952  else:
953  self.StatusCode = status
954  self.SourceTimestamp = None # DateTime()
955  self.SourcePicoseconds = None
956  self.ServerTimestamp = None # DateTime()
957  self.ServerPicoseconds = None
958  self._freeze = True
959 
def to_binary(self):
    """
    Serialize as an encoding byte followed by the fields it announces.

    Bits 0 to 5 announce, in this order, Value, StatusCode, SourceTimestamp,
    ServerTimestamp, SourcePicoseconds and ServerPicoseconds; a field is
    written only when it is truthy. ``self.Encoding`` is recomputed from the
    current fields, so a bit left over from an earlier state (for example
    one read by ``from_binary``) cannot announce a field that is not written.
    """
    encoding = 0
    packet = []
    if self.Value:
        encoding |= (1 << 0)
        packet.append(self.Value.to_binary())
    if self.StatusCode:
        encoding |= (1 << 1)
        packet.append(self.StatusCode.to_binary())
    if self.SourceTimestamp:
        encoding |= (1 << 2)
        packet.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))
    if self.ServerTimestamp:
        encoding |= (1 << 3)
        packet.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))
    if self.SourcePicoseconds:
        encoding |= (1 << 4)
        packet.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
    if self.ServerPicoseconds:
        encoding |= (1 << 5)
        packet.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
    self.Encoding = encoding
    return uabin.Primitives.UInt8.pack(encoding) + b''.join(packet)
988 
@staticmethod
def from_binary(data):
    """
    Read a DataValue from the stream ``data``.

    The first byte is the encoding mask. Its bits 0 to 5 announce Value,
    StatusCode, SourceTimestamp, ServerTimestamp, SourcePicoseconds and
    ServerPicoseconds, which are read in that order when present.
    """
    encoding = ord(data.read(1))
    value = Variant.from_binary(data) if encoding & (1 << 0) else None
    status = StatusCode.from_binary(data) if encoding & (1 << 1) else None
    obj = DataValue(value, status)
    obj.Encoding = encoding

    date_time = uabin.Primitives.DateTime
    uint16 = uabin.Primitives.UInt16
    optional_fields = (
        (2, "SourceTimestamp", date_time),
        (3, "ServerTimestamp", date_time),
        (4, "SourcePicoseconds", uint16),
        (5, "ServerPicoseconds", uint16),
    )
    for bit, attribute, primitive in optional_fields:
        if encoding & (1 << bit):
            setattr(obj, attribute, primitive.unpack(data))
    return obj
1011 
def __str__(self):
    """Return a debug string with the value and every optional field that is not None."""
    parts = ['DataValue(Value:{0}'.format(self.Value)]
    optional_fields = (
        ('StatusCode', self.StatusCode),
        ('SourceTimestamp', self.SourceTimestamp),
        ('ServerTimestamp', self.ServerTimestamp),
        ('SourcePicoseconds', self.SourcePicoseconds),
        ('ServerPicoseconds', self.ServerPicoseconds),
    )
    for label, field in optional_fields:
        if field is not None:
            parts.append(', {0}:{1}'.format(label, field))
    parts.append(')')
    return ''.join(parts)

__repr__ = __str__
1028 
1029 
1031  """
1032  Takes a NodeId or int and return a VariantType
1033  This is only supported if int_type < 63 due to VariantType encoding
1034  At low level we do not have access to address space thus decoding is limited
1035  a better version of this method can be find in ua_utils.py
1036  """
1037  if isinstance(int_type, NodeId):
1038  int_type = int_type.Identifier
1039 
1040  if int_type <= 25:
1041  return VariantType(int_type)
1042  else:
1043  return VariantTypeCustom(int_type)
1044 
1045 
1047  """
1048  Given a variant type return default value for this type
1049  """
1050  if vtype == VariantType.Null:
1051  return None
1052  elif vtype == VariantType.Boolean:
1053  return False
1054  elif vtype in (VariantType.SByte, VariantType.Byte, VariantType.ByteString):
1055  return b""
1056  elif 4 <= vtype.value <= 9:
1057  return 0
1058  elif vtype in (VariantType.Float, VariantType.Double):
1059  return 0.0
1060  elif vtype == VariantType.String:
1061  return None # a string can be null
1062  elif vtype == VariantType.DateTime:
1063  return datetime.now()
1064  elif vtype == VariantType.Guid:
1065  return uuid.uuid4()
1066  elif vtype == VariantType.XmlElement:
1067  return None #Not sure this is correct
1068  elif vtype == VariantType.NodeId:
1069  return NodeId()
1070  elif vtype == VariantType.ExpandedNodeId:
1071  return NodeId()
1072  elif vtype == VariantType.StatusCode:
1073  return StatusCode()
1074  elif vtype == VariantType.QualifiedName:
1075  return QualifiedName()
1076  elif vtype == VariantType.LocalizedText:
1077  return LocalizedText()
1078  elif vtype == VariantType.ExtensionObject:
1079  return ExtensionObject()
1080  elif vtype == VariantType.DataValue:
1081  return DataValue()
1082  elif vtype == VariantType.Variant:
1083  return Variant()
1084  else:
1085  raise RuntimeError("function take a uatype as argument, got:", vtype)
1086 
1087 
def from_binary(data)
Definition: uatypes.py:407
def __eq__(self, other)
Definition: uatypes.py:217
def _guess_type(self, val)
Definition: uatypes.py:775
def __init__(self, binary=None)
Definition: uatypes.py:902
def __ne__(self, other)
Definition: uatypes.py:288
def has_null_identifier(self)
Definition: uatypes.py:304
def parse_bitfield(cls, the_int)
Definition: uatypes.py:73
def __eq__(self, other)
Definition: uatypes.py:767
def __eq__(self, other)
Definition: uatypes.py:593
def __ne__(self, other)
Definition: uatypes.py:521
def flatten(mylist)
Definition: uatypes.py:877
def __init__(self, identifier, namespace=0)
Definition: uatypes.py:464
def __lt__(self, other)
Definition: uatypes.py:524
def __setattr__(self, key, value)
Definition: uatypes.py:36
def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None)
Definition: uatypes.py:253
def __init__(self, variant=None, status=None)
Definition: uatypes.py:945
def flatten_and_get_shape(mylist)
Definition: uatypes.py:866
def __eq__(self, node)
Definition: uatypes.py:285
def _split_list(l, n)
Definition: uatypes.py:861
def __init__(self, identifier, namespace=0)
Definition: uatypes.py:458
def __eq__(self, bname)
Definition: uatypes.py:518
def __init__(self, identifier)
Definition: uatypes.py:440
def get_default_value(vtype)
Definition: uatypes.py:1046
def from_string(string)
Definition: uatypes.py:312
def reshape(flat, dims)
Definition: uatypes.py:847
def get_shape(mylist)
Definition: uatypes.py:887
def __init__(self, value=None, varianttype=None, dimensions=None)
Definition: uatypes.py:747
def __init__(self, name=None, namespaceidx=0)
Definition: uatypes.py:482
def __ne__(self, other)
Definition: uatypes.py:772
def __init__(self, text=None)
Definition: uatypes.py:548
def __lt__(self, other)
Definition: uatypes.py:294
def __ne__(self, other)
Definition: uatypes.py:220
def datatype_to_varianttype(int_type)
Definition: uatypes.py:1030
def __init__(self, identifier, namespace=0)
Definition: uatypes.py:446
def _from_string(string)
Definition: uatypes.py:319
def _binary_init(self, data)
Definition: uatypes.py:917
def __init__(self, identifier, namespace=0)
Definition: uatypes.py:452
def get_win_epoch()
Definition: uatypes.py:23
def __init__(self, identifier, namespace=0)
Definition: uatypes.py:470
def __init__(self, value=0)
Definition: uatypes.py:176
def to_bitfield(cls, collection)
Definition: uatypes.py:80
def __ne__(self, other)
Definition: uatypes.py:598


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44