00001 """
00002 implement ua datatypes
00003 """
00004 import struct
00005 from enum import Enum, IntEnum, EnumMeta
00006 from datetime import datetime
00007 import sys
00008 import os
00009 import uuid
00010 import re
00011 import itertools
00012
00013 from opcua.ua import ua_binary as uabin
00014 from opcua.ua import status_codes
00015 from opcua.ua import ObjectIds
00016 from opcua.ua.uaerrors import UaError
00017 from opcua.ua.uaerrors import UaStatusCodeError
00018 from opcua.ua.uaerrors import UaStringParsingError
00019
00020
00021 if sys.version_info.major > 2:
00022 unicode = str
00023 def get_win_epoch():
00024 return uabin.win_epoch_to_datetime(0)
00025
00026
00027 class _FrozenClass(object):
00028
00029 """
00030 Make it impossible to add members to a class.
00031 Not pythonic at all but we found out it prevents many many
00032 bugs in use of protocol structures
00033 """
00034 _freeze = False
00035
00036 def __setattr__(self, key, value):
00037 if self._freeze and not hasattr(self, key):
00038 raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
00039 key, self.__class__.__name__, self.__dict__.keys()))
00040 object.__setattr__(self, key, value)
00041
00042
00043 if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
00044
00045
00046
00047
00048 FrozenClass = object
00049 else:
00050 FrozenClass = _FrozenClass
00051
00052
00053 class ValueRank(IntEnum):
00054 """
00055 Defines dimensions of a variable.
00056 This enum does not support all cases since ValueRank support any n>0
00057 but since it is an IntEnum it can be replace by a normal int
00058 """
00059 ScalarOrOneDimension = -3
00060 Any = -2
00061 Scalar = -1
00062 OneOrMoreDimensions = 0
00063 OneDimension = 1
00064
00065 TwoDimensions = 2
00066 ThreeDimensions = 3
00067 FourDimensions = 4
00068
00069
00070 class _MaskEnum(IntEnum):
00071
00072 @classmethod
00073 def parse_bitfield(cls, the_int):
00074 """ Take an integer and interpret it as a set of enum values. """
00075 assert isinstance(the_int, int)
00076
00077 return {cls(b) for b in cls._bits(the_int)}
00078
00079 @classmethod
00080 def to_bitfield(cls, collection):
00081 """ Takes some enum values and creates an integer from them. """
00082
00083
00084 iter1, iter2 = itertools.tee(iter(collection))
00085 assert all(isinstance(x, cls) for x in iter1)
00086
00087 return sum(x.mask for x in iter2)
00088
00089 @property
00090 def mask(self):
00091 return 1 << self.value
00092
00093 @staticmethod
00094 def _bits(n):
00095 """ Iterate over the bits in n.
00096
00097 e.g. bits(44) yields at 2, 3, 5
00098 """
00099 assert n >= 0
00100
00101 pos = 0
00102 while n:
00103 if n & 0x1:
00104 yield pos
00105 n = n // 2
00106 pos += 1
00107
00108
00109 class AccessLevel(_MaskEnum):
00110 """
00111 Bit index to indicate what the access level is.
00112
00113 Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
00114 """
00115 CurrentRead = 0
00116 CurrentWrite = 1
00117 HistoryRead = 2
00118 HistoryWrite = 3
00119 SemanticChange = 4
00120 StatusWrite = 5
00121 TimestampWrite = 6
00122
00123
00124 class WriteMask(_MaskEnum):
00125 """
00126 Bit index to indicate which attribute of a node is writable
00127
00128 Spec Part 3, Paragraph 5.2.7 WriteMask
00129 """
00130 AccessLevel = 0
00131 ArrayDimensions = 1
00132 BrowseName = 2
00133 ContainsNoLoops = 3
00134 DataType = 4
00135 Description = 5
00136 DisplayName = 6
00137 EventNotifier = 7
00138 Executable = 8
00139 Historizing = 9
00140 InverseName = 10
00141 IsAbstract = 11
00142 MinimumSamplingInterval = 12
00143 NodeClass = 13
00144 NodeId = 14
00145 Symmetric = 15
00146 UserAccessLevel = 16
00147 UserExecutable = 17
00148 UserWriteMask = 18
00149 ValueRank = 19
00150 WriteMask = 20
00151 ValueForVariableType = 21
00152
00153
00154 class EventNotifier(_MaskEnum):
00155 """
00156 Bit index to indicate how a node can be used for events.
00157
00158 Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
00159 """
00160 SubscribeToEvents = 0
00161
00162 HistoryRead = 2
00163 HistoryWrite = 3
00164
00165
00166 class StatusCode(FrozenClass):
00167 """
00168 :ivar value:
00169 :vartype value: int
00170 :ivar name:
00171 :vartype name: string
00172 :ivar doc:
00173 :vartype doc: string
00174 """
00175
00176 def __init__(self, value=0):
00177 if isinstance(value, str):
00178 self.name = value
00179 self.value = getattr(status_codes.StatusCodes, value)
00180 else:
00181 self.value = value
00182 self.name, self.doc = status_codes.get_name_and_doc(value)
00183 self._freeze = True
00184
00185 def to_binary(self):
00186 return uabin.Primitives.UInt32.pack(self.value)
00187
00188 @staticmethod
00189 def from_binary(data):
00190 val = uabin.Primitives.UInt32.unpack(data)
00191 sc = StatusCode(val)
00192 return sc
00193
00194 def check(self):
00195 """
00196 Raises an exception if the status code is anything else than 0 (good).
00197
00198 Use the is_good() method if you do not want an exception.
00199 """
00200 if not self.is_good():
00201 raise UaStatusCodeError(self.value)
00202
00203 def is_good(self):
00204 """
00205 return True if status is Good.
00206 """
00207 mask = 3 << 30
00208 if mask & self.value:
00209 return False
00210 else:
00211 return True
00212
00213 def __str__(self):
00214 return 'StatusCode({0})'.format(self.name)
00215 __repr__ = __str__
00216
00217 def __eq__(self, other):
00218 return self.value == other.value
00219
00220 def __ne__(self, other):
00221 return not self.__eq__(other)
00222
00223
00224 class NodeIdType(IntEnum):
00225 TwoByte = 0
00226 FourByte = 1
00227 Numeric = 2
00228 String = 3
00229 Guid = 4
00230 ByteString = 5
00231
00232
00233 class NodeId(FrozenClass):
00234 """
00235 NodeId Object
00236
00237 Args:
00238 identifier: The identifier might be an int, a string, bytes or a Guid
00239 namespaceidx(int): The index of the namespace
00240 nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
00241
00242
00243 :ivar Identifier:
00244 :vartype Identifier: NodeId
00245 :ivar NamespaceIndex:
00246 :vartype NamespaceIndex: Int
00247 :ivar NamespaceUri:
00248 :vartype NamespaceUri: String
00249 :ivar ServerIndex:
00250 :vartype ServerIndex: Int
00251 """
00252
00253 def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
00254
00255 self.Identifier = identifier
00256 self.NamespaceIndex = namespaceidx
00257 self.NodeIdType = nodeidtype
00258 self.NamespaceUri = ""
00259 self.ServerIndex = 0
00260 self._freeze = True
00261 if not isinstance(self.NamespaceIndex, int):
00262 raise UaError("NamespaceIndex must be an int")
00263 if self.Identifier is None:
00264 self.Identifier = 0
00265 self.NodeIdType = NodeIdType.TwoByte
00266 return
00267 if self.NodeIdType is None:
00268 if isinstance(self.Identifier, int):
00269 self.NodeIdType = NodeIdType.Numeric
00270 elif isinstance(self.Identifier, str):
00271 self.NodeIdType = NodeIdType.String
00272 elif isinstance(self.Identifier, bytes):
00273 self.NodeIdType = NodeIdType.ByteString
00274 elif isinstance(self.Identifier, uuid.UUID):
00275 self.NodeIdType = NodeIdType.Guid
00276 else:
00277 raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
00278
00279 def _key(self):
00280 if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
00281
00282 return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
00283 return (self.NodeIdType, self.NamespaceIndex, self.Identifier)
00284
00285 def __eq__(self, node):
00286 return isinstance(node, NodeId) and self._key() == node._key()
00287
00288 def __ne__(self, other):
00289 return not self.__eq__(other)
00290
00291 def __hash__(self):
00292 return hash(self._key())
00293
00294 def __lt__(self, other):
00295 if not isinstance(other, NodeId):
00296 raise AttributeError("Can only compare to NodeId")
00297 return self._key() < other._key()
00298
00299 def is_null(self):
00300 if self.NamespaceIndex != 0:
00301 return False
00302 return self.has_null_identifier()
00303
00304 def has_null_identifier(self):
00305 if not self.Identifier:
00306 return True
00307 if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
00308 return True
00309 return False
00310
00311 @staticmethod
00312 def from_string(string):
00313 try:
00314 return NodeId._from_string(string)
00315 except ValueError as ex:
00316 raise UaStringParsingError("Error parsing string {0}".format(string), ex)
00317
00318 @staticmethod
00319 def _from_string(string):
00320 l = string.split(";")
00321 identifier = None
00322 namespace = 0
00323 ntype = None
00324 srv = None
00325 nsu = None
00326 for el in l:
00327 if not el:
00328 continue
00329 k, v = el.split("=", 1)
00330 k = k.strip()
00331 v = v.strip()
00332 if k == "ns":
00333 namespace = int(v)
00334 elif k == "i":
00335 ntype = NodeIdType.Numeric
00336 identifier = int(v)
00337 elif k == "s":
00338 ntype = NodeIdType.String
00339 identifier = v
00340 elif k == "g":
00341 ntype = NodeIdType.Guid
00342 identifier = v
00343 elif k == "b":
00344 ntype = NodeIdType.ByteString
00345 identifier = v
00346 elif k == "srv":
00347 srv = v
00348 elif k == "nsu":
00349 nsu = v
00350 if identifier is None:
00351 raise UaStringParsingError("Could not find identifier in string: " + string)
00352 nodeid = NodeId(identifier, namespace, ntype)
00353 nodeid.NamespaceUri = nsu
00354 nodeid.ServerIndex = srv
00355 return nodeid
00356
00357 def to_string(self):
00358 string = ""
00359 if self.NamespaceIndex != 0:
00360 string += "ns={0};".format(self.NamespaceIndex)
00361 ntype = None
00362 if self.NodeIdType == NodeIdType.Numeric:
00363 ntype = "i"
00364 elif self.NodeIdType == NodeIdType.String:
00365 ntype = "s"
00366 elif self.NodeIdType == NodeIdType.TwoByte:
00367 ntype = "i"
00368 elif self.NodeIdType == NodeIdType.FourByte:
00369 ntype = "i"
00370 elif self.NodeIdType == NodeIdType.Guid:
00371 ntype = "g"
00372 elif self.NodeIdType == NodeIdType.ByteString:
00373 ntype = "b"
00374 string += "{0}={1}".format(ntype, self.Identifier)
00375 if self.ServerIndex:
00376 string = "srv=" + str(self.ServerIndex) + string
00377 if self.NamespaceUri:
00378 string += "nsu={0}".format(self.NamespaceUri)
00379 return string
00380
00381 def __str__(self):
00382 return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
00383 __repr__ = __str__
00384
00385 def to_binary(self):
00386 if self.NodeIdType == NodeIdType.TwoByte:
00387 return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
00388 elif self.NodeIdType == NodeIdType.FourByte:
00389 return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
00390 elif self.NodeIdType == NodeIdType.Numeric:
00391 return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
00392 elif self.NodeIdType == NodeIdType.String:
00393 return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00394 uabin.Primitives.String.pack(self.Identifier)
00395 elif self.NodeIdType == NodeIdType.ByteString:
00396 return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00397 uabin.Primitives.Bytes.pack(self.Identifier)
00398 elif self.NodeIdType == NodeIdType.Guid:
00399 return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00400 uabin.Primitives.Guid.pack(self.Identifier)
00401 else:
00402 return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00403 self.Identifier.to_binary()
00404
00405
00406 @staticmethod
00407 def from_binary(data):
00408 nid = NodeId()
00409 encoding = ord(data.read(1))
00410 nid.NodeIdType = NodeIdType(encoding & 0b00111111)
00411
00412 if nid.NodeIdType == NodeIdType.TwoByte:
00413 nid.Identifier = ord(data.read(1))
00414 elif nid.NodeIdType == NodeIdType.FourByte:
00415 nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
00416 elif nid.NodeIdType == NodeIdType.Numeric:
00417 nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
00418 elif nid.NodeIdType == NodeIdType.String:
00419 nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00420 nid.Identifier = uabin.Primitives.String.unpack(data)
00421 elif nid.NodeIdType == NodeIdType.ByteString:
00422 nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00423 nid.Identifier = uabin.Primitives.Bytes.unpack(data)
00424 elif nid.NodeIdType == NodeIdType.Guid:
00425 nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00426 nid.Identifier = uabin.Primitives.Guid.unpack(data)
00427 else:
00428 raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
00429
00430 if uabin.test_bit(encoding, 7):
00431 nid.NamespaceUri = uabin.Primitives.String.unpack(data)
00432 if uabin.test_bit(encoding, 6):
00433 nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)
00434
00435 return nid
00436
00437
00438 class TwoByteNodeId(NodeId):
00439
00440 def __init__(self, identifier):
00441 NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
00442
00443
00444 class FourByteNodeId(NodeId):
00445
00446 def __init__(self, identifier, namespace=0):
00447 NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
00448
00449
00450 class NumericNodeId(NodeId):
00451
00452 def __init__(self, identifier, namespace=0):
00453 NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
00454
00455
00456 class ByteStringNodeId(NodeId):
00457
00458 def __init__(self, identifier, namespace=0):
00459 NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
00460
00461
00462 class GuidNodeId(NodeId):
00463
00464 def __init__(self, identifier, namespace=0):
00465 NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
00466
00467
00468 class StringNodeId(NodeId):
00469
00470 def __init__(self, identifier, namespace=0):
00471 NodeId.__init__(self, identifier, namespace, NodeIdType.String)
00472
00473
00474 ExpandedNodeId = NodeId
00475
00476
00477 class QualifiedName(FrozenClass):
00478 """
00479 A string qualified with a namespace index.
00480 """
00481
00482 def __init__(self, name=None, namespaceidx=0):
00483 if not isinstance(namespaceidx, int):
00484 raise UaError("namespaceidx must be an int")
00485 self.NamespaceIndex = namespaceidx
00486 self.Name = name
00487 self._freeze = True
00488
00489 def to_string(self):
00490 return "{0}:{1}".format(self.NamespaceIndex, self.Name)
00491
00492 @staticmethod
00493 def from_string(string):
00494 if ":" in string:
00495 try:
00496 idx, name = string.split(":", 1)
00497 idx = int(idx)
00498 except (TypeError, ValueError) as ex:
00499 raise UaStringParsingError("Error parsing string {0}".format(string), ex)
00500 else:
00501 idx = 0
00502 name = string
00503 return QualifiedName(name, idx)
00504
00505 def to_binary(self):
00506 packet = []
00507 packet.append(uabin.Primitives.UInt16.pack(self.NamespaceIndex))
00508 packet.append(uabin.Primitives.String.pack(self.Name))
00509 return b''.join(packet)
00510
00511 @staticmethod
00512 def from_binary(data):
00513 obj = QualifiedName()
00514 obj.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00515 obj.Name = uabin.Primitives.String.unpack(data)
00516 return obj
00517
00518 def __eq__(self, bname):
00519 return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
00520
00521 def __ne__(self, other):
00522 return not self.__eq__(other)
00523
00524 def __lt__(self, other):
00525 if not isinstance(other, QualifiedName):
00526 raise TypeError("Cannot compare QualifiedName and {0}".format(other))
00527 if self.NamespaceIndex == other.NamespaceIndex:
00528 return self.Name < other.Name
00529 else:
00530 return self.NamespaceIndex < other.NamespaceIndex
00531
00532 def __str__(self):
00533 return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)
00534
00535 __repr__ = __str__
00536
00537
00538 class LocalizedText(FrozenClass):
00539 """
00540 A string qualified with a namespace index.
00541 """
00542
00543 ua_types = {
00544 "Text": "ByteString",
00545 "Locale": "ByteString"
00546 }
00547
00548 def __init__(self, text=None):
00549 self.Encoding = 0
00550 self.Text = text
00551 if isinstance(self.Text, unicode):
00552 self.Text = self.Text.encode('utf-8')
00553 if self.Text:
00554 self.Encoding |= (1 << 1)
00555 self.Locale = None
00556 self._freeze = True
00557
00558 def to_binary(self):
00559 packet = []
00560 if self.Locale:
00561 self.Encoding |= (1 << 0)
00562 if self.Text:
00563 self.Encoding |= (1 << 1)
00564 packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
00565 if self.Locale:
00566 packet.append(uabin.Primitives.Bytes.pack(self.Locale))
00567 if self.Text:
00568 packet.append(uabin.Primitives.Bytes.pack(self.Text))
00569 return b''.join(packet)
00570
00571 @staticmethod
00572 def from_binary(data):
00573 obj = LocalizedText()
00574 obj.Encoding = ord(data.read(1))
00575 if obj.Encoding & (1 << 0):
00576 obj.Locale = uabin.Primitives.Bytes.unpack(data)
00577 if obj.Encoding & (1 << 1):
00578 obj.Text = uabin.Primitives.Bytes.unpack(data)
00579 return obj
00580
00581 def to_string(self):
00582
00583 if self.Text is None:
00584 return ""
00585 return self.Text.decode('utf-8')
00586
00587 def __str__(self):
00588 return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
00589 'Locale:' + str(self.Locale) + ', ' + \
00590 'Text:' + str(self.Text) + ')'
00591 __repr__ = __str__
00592
00593 def __eq__(self, other):
00594 if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
00595 return True
00596 return False
00597
00598 def __ne__(self, other):
00599 return not self.__eq__(other)
00600
00601
00602 class ExtensionObject(FrozenClass):
00603 """
00604 Any UA object packed as an ExtensionObject
00605
00606 :ivar TypeId:
00607 :vartype TypeId: NodeId
00608 :ivar Body:
00609 :vartype Body: bytes
00610 """
00611
00612 def __init__(self):
00613 self.TypeId = NodeId()
00614 self.Encoding = 0
00615 self.Body = b''
00616 self._freeze = True
00617
00618 def to_binary(self):
00619 packet = []
00620 if self.Body:
00621 self.Encoding |= (1 << 0)
00622 packet.append(self.TypeId.to_binary())
00623 packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
00624 if self.Body:
00625 packet.append(uabin.Primitives.ByteString.pack(self.Body))
00626 return b''.join(packet)
00627
00628 @staticmethod
00629 def from_binary(data):
00630 obj = ExtensionObject()
00631 obj.TypeId = NodeId.from_binary(data)
00632 obj.Encoding = uabin.Primitives.UInt8.unpack(data)
00633 if obj.Encoding & (1 << 0):
00634 obj.Body = uabin.Primitives.ByteString.unpack(data)
00635 return obj
00636
00637 @staticmethod
00638 def from_object(obj):
00639 ext = ExtensionObject()
00640 oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__))
00641 ext.TypeId = FourByteNodeId(oid)
00642 ext.Body = obj.to_binary()
00643 return ext
00644
00645 def __str__(self):
00646 return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
00647 'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
00648
00649 __repr__ = __str__
00650
00651
00652 class VariantType(Enum):
00653 """
00654 The possible types of a variant.
00655
00656 :ivar Null:
00657 :ivar Boolean:
00658 :ivar SByte:
00659 :ivar Byte:
00660 :ivar Int16:
00661 :ivar UInt16:
00662 :ivar Int32:
00663 :ivar UInt32:
00664 :ivar Int64:
00665 :ivar UInt64:
00666 :ivar Float:
00667 :ivar Double:
00668 :ivar String:
00669 :ivar DateTime:
00670 :ivar Guid:
00671 :ivar ByteString:
00672 :ivar XmlElement:
00673 :ivar NodeId:
00674 :ivar ExpandedNodeId:
00675 :ivar StatusCode:
00676 :ivar QualifiedName:
00677 :ivar LocalizedText:
00678 :ivar ExtensionObject:
00679 :ivar DataValue:
00680 :ivar Variant:
00681 :ivar DiagnosticInfo:
00682 """
00683
00684 Null = 0
00685 Boolean = 1
00686 SByte = 2
00687 Byte = 3
00688 Int16 = 4
00689 UInt16 = 5
00690 Int32 = 6
00691 UInt32 = 7
00692 Int64 = 8
00693 UInt64 = 9
00694 Float = 10
00695 Double = 11
00696 String = 12
00697 DateTime = 13
00698 Guid = 14
00699 ByteString = 15
00700 XmlElement = 16
00701 NodeId = 17
00702 ExpandedNodeId = 18
00703 StatusCode = 19
00704 QualifiedName = 20
00705 LocalizedText = 21
00706 ExtensionObject = 22
00707 DataValue = 23
00708 Variant = 24
00709 DiagnosticInfo = 25
00710
00711
00712 class VariantTypeCustom(object):
00713 """
00714 Looks like sometime we get variant with other values than those
00715 defined in VariantType.
00716 FIXME: We should not need this class, as far as I iunderstand the spec
00717 variants can only be of VariantType
00718 """
00719
00720 def __init__(self, val):
00721 self.name = "Custom"
00722 self.value = val
00723 if self.value > 0b00111111:
00724 raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))
00725
00726 def __str__(self):
00727 return "VariantType.Custom:{0}".format(self.value)
00728 __repr__ = __str__
00729
00730 def __eq__(self, other):
00731 return self.value == other.value
00732
00733
00734 class Variant(FrozenClass):
00735 """
00736 Create an OPC-UA Variant object.
00737 if no argument a Null Variant is created.
00738 if not variant type is given, attemps to guess type from python type
00739 if a variant is given as value, the new objects becomes a copy of the argument
00740
00741 :ivar Value:
00742 :vartype Value: Any supported type
00743 :ivar VariantType:
00744 :vartype VariantType: VariantType
00745 """
00746
00747 def __init__(self, value=None, varianttype=None, dimensions=None):
00748 self.Value = value
00749 self.VariantType = varianttype
00750 self.Dimensions = dimensions
00751 self._freeze = True
00752 if isinstance(value, Variant):
00753 self.Value = value.Value
00754 self.VariantType = value.VariantType
00755 if self.VariantType is None:
00756 self.VariantType = self._guess_type(self.Value)
00757 if self.Value is None and self.VariantType not in (
00758 VariantType.Null,
00759 VariantType.String,
00760 VariantType.DateTime):
00761 raise UaError("Variant of type {0} cannot have value None".format(self.VariantType))
00762 if self.Dimensions is None and type(self.Value) in (list, tuple):
00763 dims = get_shape(self.Value)
00764 if len(dims) > 1:
00765 self.Dimensions = dims
00766
00767 def __eq__(self, other):
00768 if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
00769 return True
00770 return False
00771
00772 def __ne__(self, other):
00773 return not self.__eq__(other)
00774
00775 def _guess_type(self, val):
00776 if isinstance(val, (list, tuple)):
00777 error_val = val
00778 while isinstance(val, (list, tuple)):
00779 if len(val) == 0:
00780 raise UaError("could not guess UA type of variable {0}".format(error_val))
00781 val = val[0]
00782 if val is None:
00783 return VariantType.Null
00784 elif isinstance(val, bool):
00785 return VariantType.Boolean
00786 elif isinstance(val, float):
00787 return VariantType.Double
00788 elif isinstance(val, int):
00789 return VariantType.Int64
00790 elif type(val) in (str, unicode):
00791 return VariantType.String
00792 elif isinstance(val, bytes):
00793 return VariantType.ByteString
00794 elif isinstance(val, datetime):
00795 return VariantType.DateTime
00796 elif isinstance(val, uuid.UUID):
00797 return VariantType.Guid
00798 else:
00799 if isinstance(val, object):
00800 try:
00801 return getattr(VariantType, val.__class__.__name__)
00802 except AttributeError:
00803 return VariantType.ExtensionObject
00804 else:
00805 raise UaError("Could not guess UA type of {0} with type {1}, specify UA type".format(val, type(val)))
00806
00807 def __str__(self):
00808 return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
00809 __repr__ = __str__
00810
00811 def to_binary(self):
00812 b = []
00813 encoding = self.VariantType.value & 0b111111
00814 if type(self.Value) in (list, tuple):
00815 if self.Dimensions is not None:
00816 encoding = uabin.set_bit(encoding, 6)
00817 encoding = uabin.set_bit(encoding, 7)
00818 b.append(uabin.Primitives.UInt8.pack(encoding))
00819 b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
00820 if self.Dimensions is not None:
00821 b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
00822 else:
00823 b.append(uabin.Primitives.UInt8.pack(encoding))
00824 b.append(uabin.pack_uatype(self.VariantType, self.Value))
00825
00826 return b"".join(b)
00827
00828 @staticmethod
00829 def from_binary(data):
00830 dimensions = None
00831 encoding = ord(data.read(1))
00832 int_type = encoding & 0b00111111
00833 vtype = datatype_to_varianttype(int_type)
00834 if vtype == VariantType.Null:
00835 return Variant(None, vtype, encoding)
00836 if uabin.test_bit(encoding, 7):
00837 value = uabin.unpack_uatype_array(vtype, data)
00838 else:
00839 value = uabin.unpack_uatype(vtype, data)
00840 if uabin.test_bit(encoding, 6):
00841 dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
00842 value = reshape(value, dimensions)
00843
00844 return Variant(value, vtype, dimensions)
00845
00846
00847 def reshape(flat, dims):
00848 subdims = dims[1:]
00849 subsize = 1
00850 for i in subdims:
00851 if i == 0:
00852 i = 1
00853 subsize *= i
00854 while dims[0] * subsize > len(flat):
00855 flat.append([])
00856 if not subdims or subdims == [0]:
00857 return flat
00858 return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
00859
00860
00861 def _split_list(l, n):
00862 n = max(1, n)
00863 return [l[i:i + n] for i in range(0, len(l), n)]
00864
00865
00866 def flatten_and_get_shape(mylist):
00867 dims = []
00868 dims.append(len(mylist))
00869 while isinstance(mylist[0], (list, tuple)):
00870 dims.append(len(mylist[0]))
00871 mylist = [item for sublist in mylist for item in sublist]
00872 if len(mylist) == 0:
00873 break
00874 return mylist, dims
00875
00876
00877 def flatten(mylist):
00878 if len(mylist) == 0:
00879 return mylist
00880 while isinstance(mylist[0], (list, tuple)):
00881 mylist = [item for sublist in mylist for item in sublist]
00882 if len(mylist) == 0:
00883 break
00884 return mylist
00885
00886
00887 def get_shape(mylist):
00888 dims = []
00889 while isinstance(mylist, (list, tuple)):
00890 dims.append(len(mylist))
00891 if len(mylist) == 0:
00892 break
00893 mylist = mylist[0]
00894 return dims
00895
00896
00897 class XmlElement(FrozenClass):
00898 """
00899 An XML element encoded as an UTF-8 string.
00900 """
00901
00902 def __init__(self, binary=None):
00903 if binary is not None:
00904 self._binary_init(binary)
00905 self._freeze = True
00906 return
00907 self.Value = []
00908 self._freeze = True
00909
00910 def to_binary(self):
00911 return uabin.Primitives.String.pack(self.Value)
00912
00913 @staticmethod
00914 def from_binary(data):
00915 return XmlElement(data)
00916
00917 def _binary_init(self, data):
00918 self.Value = uabin.Primitives.String.unpack(data)
00919
00920 def __str__(self):
00921 return 'XmlElement(Value:' + str(self.Value) + ')'
00922
00923 __repr__ = __str__
00924
00925
00926 class DataValue(FrozenClass):
00927 """
00928 A value with an associated timestamp, and quality.
00929 Automatically generated from xml , copied and modified here to fix errors in xml spec
00930
00931 :ivar Value:
00932 :vartype Value: Variant
00933 :ivar StatusCode:
00934 :vartype StatusCode: StatusCode
00935 :ivar SourceTimestamp:
00936 :vartype SourceTimestamp: datetime
00937 :ivar SourcePicoSeconds:
00938 :vartype SourcePicoSeconds: int
00939 :ivar ServerTimestamp:
00940 :vartype ServerTimestamp: datetime
00941 :ivar ServerPicoseconds:
00942 :vartype ServerPicoseconds: int
00943 """
00944
00945 def __init__(self, variant=None, status=None):
00946 self.Encoding = 0
00947 if not isinstance(variant, Variant):
00948 variant = Variant(variant)
00949 self.Value = variant
00950 if status is None:
00951 self.StatusCode = StatusCode()
00952 else:
00953 self.StatusCode = status
00954 self.SourceTimestamp = None
00955 self.SourcePicoseconds = None
00956 self.ServerTimestamp = None
00957 self.ServerPicoseconds = None
00958 self._freeze = True
00959
00960 def to_binary(self):
00961 packet = []
00962 if self.Value:
00963 self.Encoding |= (1 << 0)
00964 if self.StatusCode:
00965 self.Encoding |= (1 << 1)
00966 if self.SourceTimestamp:
00967 self.Encoding |= (1 << 2)
00968 if self.ServerTimestamp:
00969 self.Encoding |= (1 << 3)
00970 if self.SourcePicoseconds:
00971 self.Encoding |= (1 << 4)
00972 if self.ServerPicoseconds:
00973 self.Encoding |= (1 << 5)
00974 packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
00975 if self.Value:
00976 packet.append(self.Value.to_binary())
00977 if self.StatusCode:
00978 packet.append(self.StatusCode.to_binary())
00979 if self.SourceTimestamp:
00980 packet.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))
00981 if self.ServerTimestamp:
00982 packet.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))
00983 if self.SourcePicoseconds:
00984 packet.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
00985 if self.ServerPicoseconds:
00986 packet.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
00987 return b''.join(packet)
00988
00989 @staticmethod
00990 def from_binary(data):
00991 encoding = ord(data.read(1))
00992 if encoding & (1 << 0):
00993 value = Variant.from_binary(data)
00994 else:
00995 value = None
00996 if encoding & (1 << 1):
00997 status = StatusCode.from_binary(data)
00998 else:
00999 status = None
01000 obj = DataValue(value, status)
01001 obj.Encoding = encoding
01002 if obj.Encoding & (1 << 2):
01003 obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)
01004 if obj.Encoding & (1 << 3):
01005 obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)
01006 if obj.Encoding & (1 << 4):
01007 obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
01008 if obj.Encoding & (1 << 5):
01009 obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
01010 return obj
01011
01012 def __str__(self):
01013 s = 'DataValue(Value:{0}'.format(self.Value)
01014 if self.StatusCode is not None:
01015 s += ', StatusCode:{0}'.format(self.StatusCode)
01016 if self.SourceTimestamp is not None:
01017 s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
01018 if self.ServerTimestamp is not None:
01019 s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
01020 if self.SourcePicoseconds is not None:
01021 s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
01022 if self.ServerPicoseconds is not None:
01023 s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
01024 s += ')'
01025 return s
01026
01027 __repr__ = __str__
01028
01029
01030 def datatype_to_varianttype(int_type):
01031 """
01032 Takes a NodeId or int and return a VariantType
01033 This is only supported if int_type < 63 due to VariantType encoding
01034 At low level we do not have access to address space thus decoding is limited
01035 a better version of this method can be find in ua_utils.py
01036 """
01037 if isinstance(int_type, NodeId):
01038 int_type = int_type.Identifier
01039
01040 if int_type <= 25:
01041 return VariantType(int_type)
01042 else:
01043 return VariantTypeCustom(int_type)
01044
01045
01046 def get_default_value(vtype):
01047 """
01048 Given a variant type return default value for this type
01049 """
01050 if vtype == VariantType.Null:
01051 return None
01052 elif vtype == VariantType.Boolean:
01053 return False
01054 elif vtype in (VariantType.SByte, VariantType.Byte, VariantType.ByteString):
01055 return b""
01056 elif 4 <= vtype.value <= 9:
01057 return 0
01058 elif vtype in (VariantType.Float, VariantType.Double):
01059 return 0.0
01060 elif vtype == VariantType.String:
01061 return None
01062 elif vtype == VariantType.DateTime:
01063 return datetime.now()
01064 elif vtype == VariantType.Guid:
01065 return uuid.uuid4()
01066 elif vtype == VariantType.XmlElement:
01067 return None
01068 elif vtype == VariantType.NodeId:
01069 return NodeId()
01070 elif vtype == VariantType.ExpandedNodeId:
01071 return NodeId()
01072 elif vtype == VariantType.StatusCode:
01073 return StatusCode()
01074 elif vtype == VariantType.QualifiedName:
01075 return QualifiedName()
01076 elif vtype == VariantType.LocalizedText:
01077 return LocalizedText()
01078 elif vtype == VariantType.ExtensionObject:
01079 return ExtensionObject()
01080 elif vtype == VariantType.DataValue:
01081 return DataValue()
01082 elif vtype == VariantType.Variant:
01083 return Variant()
01084 else:
01085 raise RuntimeError("function take a uatype as argument, got:", vtype)
01086
01087