uatypes.py
Go to the documentation of this file.
00001 """
00002 implement ua datatypes
00003 """
00004 import struct
00005 from enum import Enum, IntEnum, EnumMeta
00006 from datetime import datetime
00007 import sys
00008 import os
00009 import uuid
00010 import re
00011 import itertools
00012 
00013 from opcua.ua import ua_binary as uabin
00014 from opcua.ua import status_codes
00015 from opcua.ua import ObjectIds
00016 from opcua.ua.uaerrors import UaError
00017 from opcua.ua.uaerrors import UaStatusCodeError
00018 from opcua.ua.uaerrors import UaStringParsingError
00019 
00020 
00021 if sys.version_info.major > 2:
00022     unicode = str
00023 def get_win_epoch():
00024     return uabin.win_epoch_to_datetime(0)
00025 
00026 
00027 class _FrozenClass(object):
00028 
00029     """
00030     Make it impossible to add members to a class.
00031     Not pythonic at all but we found out it prevents many many
00032     bugs in use of protocol structures
00033     """
00034     _freeze = False
00035 
00036     def __setattr__(self, key, value):
00037         if self._freeze and not hasattr(self, key):
00038             raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
00039                 key, self.__class__.__name__, self.__dict__.keys()))
00040         object.__setattr__(self, key, value)
00041 
00042 
00043 if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
00044     # typo check is cpu consuming, but it will make debug easy.
00045     # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
00046     # this will make all uatype class inherit from object intead of _FrozenClass
00047     # and skip the typo check.
00048     FrozenClass = object
00049 else:
00050     FrozenClass = _FrozenClass
00051 
00052 
00053 class ValueRank(IntEnum):
00054     """
00055     Defines dimensions of a variable.
00056     This enum does not support all cases since ValueRank support any n>0
00057     but since it is an IntEnum it can be replace by a normal int
00058     """
00059     ScalarOrOneDimension = -3
00060     Any = -2
00061     Scalar = -1
00062     OneOrMoreDimensions = 0
00063     OneDimension = 1
00064     # the next names are not in spec but so common we express them here
00065     TwoDimensions = 2
00066     ThreeDimensions = 3
00067     FourDimensions = 4
00068 
00069 
00070 class _MaskEnum(IntEnum):
00071 
00072     @classmethod
00073     def parse_bitfield(cls, the_int):
00074         """ Take an integer and interpret it as a set of enum values. """
00075         assert isinstance(the_int, int)
00076 
00077         return {cls(b) for b in cls._bits(the_int)}
00078 
00079     @classmethod
00080     def to_bitfield(cls, collection):
00081         """ Takes some enum values and creates an integer from them. """
00082         # make sure all elements are of the correct type (use itertools.tee in case we get passed an
00083         # iterator)
00084         iter1, iter2 = itertools.tee(iter(collection))
00085         assert all(isinstance(x, cls) for x in iter1)
00086 
00087         return sum(x.mask for x in iter2)
00088 
00089     @property
00090     def mask(self):
00091         return 1 << self.value
00092 
00093     @staticmethod
00094     def _bits(n):
00095         """ Iterate over the bits in n.
00096 
00097             e.g. bits(44) yields at 2, 3, 5
00098         """
00099         assert n >= 0  # avoid infinite recursion
00100 
00101         pos = 0
00102         while n:
00103             if n & 0x1:
00104                 yield pos
00105             n = n // 2
00106             pos += 1
00107 
00108 
00109 class AccessLevel(_MaskEnum):
00110     """
00111     Bit index to indicate what the access level is.
00112 
00113     Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
00114     """
00115     CurrentRead = 0
00116     CurrentWrite = 1
00117     HistoryRead = 2
00118     HistoryWrite = 3
00119     SemanticChange = 4
00120     StatusWrite = 5
00121     TimestampWrite = 6
00122 
00123 
00124 class WriteMask(_MaskEnum):
00125     """
00126     Bit index to indicate which attribute of a node is writable
00127 
00128     Spec Part 3, Paragraph 5.2.7 WriteMask
00129     """
00130     AccessLevel = 0
00131     ArrayDimensions = 1
00132     BrowseName = 2
00133     ContainsNoLoops = 3
00134     DataType = 4
00135     Description = 5
00136     DisplayName = 6
00137     EventNotifier = 7
00138     Executable = 8
00139     Historizing = 9
00140     InverseName = 10
00141     IsAbstract = 11
00142     MinimumSamplingInterval = 12
00143     NodeClass = 13
00144     NodeId = 14
00145     Symmetric = 15
00146     UserAccessLevel = 16
00147     UserExecutable = 17
00148     UserWriteMask = 18
00149     ValueRank = 19
00150     WriteMask = 20
00151     ValueForVariableType = 21
00152 
00153 
00154 class EventNotifier(_MaskEnum):
00155     """
00156     Bit index to indicate how a node can be used for events.
00157 
00158     Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
00159     """
00160     SubscribeToEvents = 0
00161     # Reserved        = 1
00162     HistoryRead = 2
00163     HistoryWrite = 3
00164 
00165 
00166 class StatusCode(FrozenClass):
00167     """
00168     :ivar value:
00169     :vartype value: int
00170     :ivar name:
00171     :vartype name: string
00172     :ivar doc:
00173     :vartype doc: string
00174     """
00175 
00176     def __init__(self, value=0):
00177         if isinstance(value, str):
00178             self.name = value
00179             self.value = getattr(status_codes.StatusCodes, value)
00180         else:
00181             self.value = value
00182             self.name, self.doc = status_codes.get_name_and_doc(value)
00183         self._freeze = True
00184 
00185     def to_binary(self):
00186         return uabin.Primitives.UInt32.pack(self.value)
00187 
00188     @staticmethod
00189     def from_binary(data):
00190         val = uabin.Primitives.UInt32.unpack(data)
00191         sc = StatusCode(val)
00192         return sc
00193 
00194     def check(self):
00195         """
00196         Raises an exception if the status code is anything else than 0 (good).
00197 
00198         Use the is_good() method if you do not want an exception.
00199         """
00200         if not self.is_good():
00201             raise UaStatusCodeError(self.value)
00202 
00203     def is_good(self):
00204         """
00205         return True if status is Good.
00206         """
00207         mask = 3 << 30
00208         if mask & self.value:
00209             return False
00210         else:
00211             return True
00212 
00213     def __str__(self):
00214         return 'StatusCode({0})'.format(self.name)
00215     __repr__ = __str__
00216 
00217     def __eq__(self, other):
00218         return self.value == other.value
00219 
00220     def __ne__(self, other):
00221         return not self.__eq__(other)
00222 
00223 
00224 class NodeIdType(IntEnum):
00225     TwoByte = 0
00226     FourByte = 1
00227     Numeric = 2
00228     String = 3
00229     Guid = 4
00230     ByteString = 5
00231 
00232 
00233 class NodeId(FrozenClass):
00234     """
00235     NodeId Object
00236 
00237     Args:
00238         identifier: The identifier might be an int, a string, bytes or a Guid
00239         namespaceidx(int): The index of the namespace
00240         nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
00241 
00242 
00243     :ivar Identifier:
00244     :vartype Identifier: NodeId
00245     :ivar NamespaceIndex:
00246     :vartype NamespaceIndex: Int
00247     :ivar NamespaceUri:
00248     :vartype NamespaceUri: String
00249     :ivar ServerIndex:
00250     :vartype ServerIndex: Int
00251     """
00252     
00253     def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
00254 
00255         self.Identifier = identifier
00256         self.NamespaceIndex = namespaceidx
00257         self.NodeIdType = nodeidtype
00258         self.NamespaceUri = ""
00259         self.ServerIndex = 0
00260         self._freeze = True
00261         if not isinstance(self.NamespaceIndex, int):
00262             raise UaError("NamespaceIndex must be an int")
00263         if self.Identifier is None:
00264             self.Identifier = 0
00265             self.NodeIdType = NodeIdType.TwoByte
00266             return
00267         if self.NodeIdType is None:
00268             if isinstance(self.Identifier, int):
00269                 self.NodeIdType = NodeIdType.Numeric
00270             elif isinstance(self.Identifier, str):
00271                 self.NodeIdType = NodeIdType.String
00272             elif isinstance(self.Identifier, bytes):
00273                 self.NodeIdType = NodeIdType.ByteString
00274             elif isinstance(self.Identifier, uuid.UUID):
00275                 self.NodeIdType = NodeIdType.Guid
00276             else:
00277                 raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
00278 
00279     def _key(self):
00280         if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric): 
00281             # twobyte, fourbyte and numeric may represent the same node
00282             return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
00283         return (self.NodeIdType, self.NamespaceIndex, self.Identifier)
00284 
00285     def __eq__(self, node):
00286         return isinstance(node, NodeId) and self._key() == node._key()
00287 
00288     def __ne__(self, other):
00289         return not self.__eq__(other)
00290 
00291     def __hash__(self):
00292         return hash(self._key())
00293 
00294     def __lt__(self, other):
00295         if not isinstance(other, NodeId):
00296             raise AttributeError("Can only compare to NodeId")
00297         return self._key() < other._key()
00298 
00299     def is_null(self):
00300         if self.NamespaceIndex != 0:
00301             return False
00302         return self.has_null_identifier()
00303 
00304     def has_null_identifier(self):
00305         if not self.Identifier:
00306             return True
00307         if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
00308             return True
00309         return False
00310 
00311     @staticmethod
00312     def from_string(string):
00313         try:
00314             return NodeId._from_string(string)
00315         except ValueError as ex:
00316             raise UaStringParsingError("Error parsing string {0}".format(string), ex)
00317 
00318     @staticmethod
00319     def _from_string(string):
00320         l = string.split(";")
00321         identifier = None
00322         namespace = 0
00323         ntype = None
00324         srv = None
00325         nsu = None
00326         for el in l:
00327             if not el:
00328                 continue
00329             k, v = el.split("=", 1)
00330             k = k.strip()
00331             v = v.strip()
00332             if k == "ns":
00333                 namespace = int(v)
00334             elif k == "i":
00335                 ntype = NodeIdType.Numeric
00336                 identifier = int(v)
00337             elif k == "s":
00338                 ntype = NodeIdType.String
00339                 identifier = v
00340             elif k == "g":
00341                 ntype = NodeIdType.Guid
00342                 identifier = v
00343             elif k == "b":
00344                 ntype = NodeIdType.ByteString
00345                 identifier = v
00346             elif k == "srv":
00347                 srv = v
00348             elif k == "nsu":
00349                 nsu = v
00350         if identifier is None:
00351             raise UaStringParsingError("Could not find identifier in string: " + string)
00352         nodeid = NodeId(identifier, namespace, ntype)
00353         nodeid.NamespaceUri = nsu
00354         nodeid.ServerIndex = srv
00355         return nodeid
00356 
00357     def to_string(self):
00358         string = ""
00359         if self.NamespaceIndex != 0:
00360             string += "ns={0};".format(self.NamespaceIndex)
00361         ntype = None
00362         if self.NodeIdType == NodeIdType.Numeric:
00363             ntype = "i"
00364         elif self.NodeIdType == NodeIdType.String:
00365             ntype = "s"
00366         elif self.NodeIdType == NodeIdType.TwoByte:
00367             ntype = "i"
00368         elif self.NodeIdType == NodeIdType.FourByte:
00369             ntype = "i"
00370         elif self.NodeIdType == NodeIdType.Guid:
00371             ntype = "g"
00372         elif self.NodeIdType == NodeIdType.ByteString:
00373             ntype = "b"
00374         string += "{0}={1}".format(ntype, self.Identifier)
00375         if self.ServerIndex:
00376             string = "srv=" + str(self.ServerIndex) + string
00377         if self.NamespaceUri:
00378             string += "nsu={0}".format(self.NamespaceUri)
00379         return string
00380 
00381     def __str__(self):
00382         return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
00383     __repr__ = __str__
00384 
00385     def to_binary(self):
00386         if self.NodeIdType == NodeIdType.TwoByte:
00387             return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
00388         elif self.NodeIdType == NodeIdType.FourByte:
00389             return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
00390         elif self.NodeIdType == NodeIdType.Numeric:
00391             return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
00392         elif self.NodeIdType == NodeIdType.String:
00393             return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00394                 uabin.Primitives.String.pack(self.Identifier)
00395         elif self.NodeIdType == NodeIdType.ByteString:
00396             return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00397                 uabin.Primitives.Bytes.pack(self.Identifier)
00398         elif self.NodeIdType == NodeIdType.Guid:
00399             return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00400                    uabin.Primitives.Guid.pack(self.Identifier)
00401         else:
00402             return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
00403                 self.Identifier.to_binary()
00404         # FIXME: Missing NNamespaceURI and ServerIndex
00405 
00406     @staticmethod
00407     def from_binary(data):
00408         nid = NodeId()
00409         encoding = ord(data.read(1))
00410         nid.NodeIdType = NodeIdType(encoding & 0b00111111)
00411 
00412         if nid.NodeIdType == NodeIdType.TwoByte:
00413             nid.Identifier = ord(data.read(1))
00414         elif nid.NodeIdType == NodeIdType.FourByte:
00415             nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
00416         elif nid.NodeIdType == NodeIdType.Numeric:
00417             nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
00418         elif nid.NodeIdType == NodeIdType.String:
00419             nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00420             nid.Identifier = uabin.Primitives.String.unpack(data)
00421         elif nid.NodeIdType == NodeIdType.ByteString:
00422             nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00423             nid.Identifier = uabin.Primitives.Bytes.unpack(data)
00424         elif nid.NodeIdType == NodeIdType.Guid:
00425             nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00426             nid.Identifier = uabin.Primitives.Guid.unpack(data)
00427         else:
00428             raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
00429 
00430         if uabin.test_bit(encoding, 7):
00431             nid.NamespaceUri = uabin.Primitives.String.unpack(data)
00432         if uabin.test_bit(encoding, 6):
00433             nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)
00434 
00435         return nid
00436 
00437 
00438 class TwoByteNodeId(NodeId):
00439 
00440     def __init__(self, identifier):
00441         NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
00442 
00443 
00444 class FourByteNodeId(NodeId):
00445 
00446     def __init__(self, identifier, namespace=0):
00447         NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
00448 
00449 
00450 class NumericNodeId(NodeId):
00451 
00452     def __init__(self, identifier, namespace=0):
00453         NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
00454 
00455 
00456 class ByteStringNodeId(NodeId):
00457 
00458     def __init__(self, identifier, namespace=0):
00459         NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
00460 
00461 
00462 class GuidNodeId(NodeId):
00463 
00464     def __init__(self, identifier, namespace=0):
00465         NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
00466 
00467 
00468 class StringNodeId(NodeId):
00469 
00470     def __init__(self, identifier, namespace=0):
00471         NodeId.__init__(self, identifier, namespace, NodeIdType.String)
00472 
00473 
00474 ExpandedNodeId = NodeId
00475 
00476 
00477 class QualifiedName(FrozenClass):
00478     """
00479     A string qualified with a namespace index.
00480     """
00481 
00482     def __init__(self, name=None, namespaceidx=0):
00483         if not isinstance(namespaceidx, int):
00484             raise UaError("namespaceidx must be an int")
00485         self.NamespaceIndex = namespaceidx
00486         self.Name = name
00487         self._freeze = True
00488 
00489     def to_string(self):
00490         return "{0}:{1}".format(self.NamespaceIndex, self.Name)
00491 
00492     @staticmethod
00493     def from_string(string):
00494         if ":" in string:
00495             try:
00496                 idx, name = string.split(":", 1)
00497                 idx = int(idx)
00498             except (TypeError, ValueError) as ex:
00499                 raise UaStringParsingError("Error parsing string {0}".format(string), ex)
00500         else:
00501             idx = 0
00502             name = string
00503         return QualifiedName(name, idx)
00504 
00505     def to_binary(self):
00506         packet = []
00507         packet.append(uabin.Primitives.UInt16.pack(self.NamespaceIndex))
00508         packet.append(uabin.Primitives.String.pack(self.Name))
00509         return b''.join(packet)
00510 
00511     @staticmethod
00512     def from_binary(data):
00513         obj = QualifiedName()
00514         obj.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
00515         obj.Name = uabin.Primitives.String.unpack(data)
00516         return obj
00517 
00518     def __eq__(self, bname):
00519         return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
00520 
00521     def __ne__(self, other):
00522         return not self.__eq__(other)
00523 
00524     def __lt__(self, other):
00525         if not isinstance(other, QualifiedName):
00526             raise TypeError("Cannot compare QualifiedName and {0}".format(other))
00527         if self.NamespaceIndex == other.NamespaceIndex:
00528             return self.Name < other.Name
00529         else:
00530             return self.NamespaceIndex < other.NamespaceIndex
00531 
00532     def __str__(self):
00533         return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)
00534 
00535     __repr__ = __str__
00536 
00537 
00538 class LocalizedText(FrozenClass):
00539     """
00540     A string qualified with a namespace index.
00541     """
00542 
00543     ua_types = {
00544         "Text": "ByteString",
00545         "Locale": "ByteString"
00546     }
00547 
00548     def __init__(self, text=None):
00549         self.Encoding = 0
00550         self.Text = text
00551         if isinstance(self.Text, unicode):
00552             self.Text = self.Text.encode('utf-8')
00553         if self.Text:
00554             self.Encoding |= (1 << 1)
00555         self.Locale = None
00556         self._freeze = True
00557 
00558     def to_binary(self):
00559         packet = []
00560         if self.Locale:
00561             self.Encoding |= (1 << 0)
00562         if self.Text:
00563             self.Encoding |= (1 << 1)
00564         packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
00565         if self.Locale:
00566             packet.append(uabin.Primitives.Bytes.pack(self.Locale))
00567         if self.Text:
00568             packet.append(uabin.Primitives.Bytes.pack(self.Text))
00569         return b''.join(packet)
00570 
00571     @staticmethod
00572     def from_binary(data):
00573         obj = LocalizedText()
00574         obj.Encoding = ord(data.read(1))
00575         if obj.Encoding & (1 << 0):
00576             obj.Locale = uabin.Primitives.Bytes.unpack(data)
00577         if obj.Encoding & (1 << 1):
00578             obj.Text = uabin.Primitives.Bytes.unpack(data)
00579         return obj
00580 
00581     def to_string(self):
00582         # FIXME: use local
00583         if self.Text is None:
00584             return ""
00585         return self.Text.decode('utf-8')
00586 
00587     def __str__(self):
00588         return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
00589             'Locale:' + str(self.Locale) + ', ' + \
00590             'Text:' + str(self.Text) + ')'
00591     __repr__ = __str__
00592 
00593     def __eq__(self, other):
00594         if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
00595             return True
00596         return False
00597 
00598     def __ne__(self, other):
00599         return not self.__eq__(other)
00600 
00601 
00602 class ExtensionObject(FrozenClass):
00603     """
00604     Any UA object packed as an ExtensionObject
00605 
00606     :ivar TypeId:
00607     :vartype TypeId: NodeId
00608     :ivar Body:
00609     :vartype Body: bytes
00610     """
00611 
00612     def __init__(self):
00613         self.TypeId = NodeId()
00614         self.Encoding = 0
00615         self.Body = b''
00616         self._freeze = True
00617 
00618     def to_binary(self):
00619         packet = []
00620         if self.Body:
00621             self.Encoding |= (1 << 0)
00622         packet.append(self.TypeId.to_binary())
00623         packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
00624         if self.Body:
00625             packet.append(uabin.Primitives.ByteString.pack(self.Body))
00626         return b''.join(packet)
00627 
00628     @staticmethod
00629     def from_binary(data):
00630         obj = ExtensionObject()
00631         obj.TypeId = NodeId.from_binary(data)
00632         obj.Encoding = uabin.Primitives.UInt8.unpack(data)
00633         if obj.Encoding & (1 << 0):
00634             obj.Body = uabin.Primitives.ByteString.unpack(data)
00635         return obj
00636 
00637     @staticmethod
00638     def from_object(obj):
00639         ext = ExtensionObject()
00640         oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__))
00641         ext.TypeId = FourByteNodeId(oid)
00642         ext.Body = obj.to_binary()
00643         return ext
00644 
00645     def __str__(self):
00646         return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
00647             'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
00648 
00649     __repr__ = __str__
00650 
00651 
00652 class VariantType(Enum):
00653     """
00654     The possible types of a variant.
00655 
00656     :ivar Null:
00657     :ivar Boolean:
00658     :ivar SByte:
00659     :ivar Byte:
00660     :ivar Int16:
00661     :ivar UInt16:
00662     :ivar Int32:
00663     :ivar UInt32:
00664     :ivar Int64:
00665     :ivar UInt64:
00666     :ivar Float:
00667     :ivar Double:
00668     :ivar String:
00669     :ivar DateTime:
00670     :ivar Guid:
00671     :ivar ByteString:
00672     :ivar XmlElement:
00673     :ivar NodeId:
00674     :ivar ExpandedNodeId:
00675     :ivar StatusCode:
00676     :ivar QualifiedName:
00677     :ivar LocalizedText:
00678     :ivar ExtensionObject:
00679     :ivar DataValue:
00680     :ivar Variant:
00681     :ivar DiagnosticInfo:
00682     """
00683 
00684     Null = 0
00685     Boolean = 1
00686     SByte = 2
00687     Byte = 3
00688     Int16 = 4
00689     UInt16 = 5
00690     Int32 = 6
00691     UInt32 = 7
00692     Int64 = 8
00693     UInt64 = 9
00694     Float = 10
00695     Double = 11
00696     String = 12
00697     DateTime = 13
00698     Guid = 14
00699     ByteString = 15
00700     XmlElement = 16
00701     NodeId = 17
00702     ExpandedNodeId = 18
00703     StatusCode = 19
00704     QualifiedName = 20
00705     LocalizedText = 21
00706     ExtensionObject = 22
00707     DataValue = 23
00708     Variant = 24
00709     DiagnosticInfo = 25
00710 
00711 
00712 class VariantTypeCustom(object):
00713     """
00714     Looks like sometime we get variant with other values than those
00715     defined in VariantType.
00716     FIXME: We should not need this class, as far as I iunderstand the spec
00717     variants can only be of VariantType
00718     """
00719 
00720     def __init__(self, val):
00721         self.name = "Custom"
00722         self.value = val
00723         if self.value > 0b00111111:
00724             raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))
00725 
00726     def __str__(self):
00727         return "VariantType.Custom:{0}".format(self.value)
00728     __repr__ = __str__
00729 
00730     def __eq__(self, other):
00731         return self.value == other.value
00732 
00733 
00734 class Variant(FrozenClass):
00735     """
00736     Create an OPC-UA Variant object.
00737     if no argument a Null Variant is created.
00738     if not variant type is given, attemps to guess type from python type
00739     if a variant is given as value, the new objects becomes a copy of the argument
00740 
00741     :ivar Value:
00742     :vartype Value: Any supported type
00743     :ivar VariantType:
00744     :vartype VariantType: VariantType
00745     """
00746 
00747     def __init__(self, value=None, varianttype=None, dimensions=None):
00748         self.Value = value
00749         self.VariantType = varianttype
00750         self.Dimensions = dimensions
00751         self._freeze = True
00752         if isinstance(value, Variant):
00753             self.Value = value.Value
00754             self.VariantType = value.VariantType
00755         if self.VariantType is None:
00756             self.VariantType = self._guess_type(self.Value)
00757         if self.Value is None and self.VariantType not in (
00758                 VariantType.Null,
00759                 VariantType.String,
00760                 VariantType.DateTime):
00761             raise UaError("Variant of type {0} cannot have value None".format(self.VariantType))
00762         if self.Dimensions is None and type(self.Value) in (list, tuple):
00763             dims = get_shape(self.Value)
00764             if len(dims) > 1:
00765                 self.Dimensions = dims
00766 
00767     def __eq__(self, other):
00768         if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
00769             return True
00770         return False
00771 
00772     def __ne__(self, other):
00773         return not self.__eq__(other)
00774 
00775     def _guess_type(self, val):
00776         if isinstance(val, (list, tuple)):
00777             error_val = val
00778         while isinstance(val, (list, tuple)):
00779             if len(val) == 0:
00780                 raise UaError("could not guess UA type of variable {0}".format(error_val))
00781             val = val[0]
00782         if val is None:
00783             return VariantType.Null
00784         elif isinstance(val, bool):
00785             return VariantType.Boolean
00786         elif isinstance(val, float):
00787             return VariantType.Double
00788         elif isinstance(val, int):
00789             return VariantType.Int64
00790         elif type(val) in (str, unicode):
00791             return VariantType.String
00792         elif isinstance(val, bytes):
00793             return VariantType.ByteString
00794         elif isinstance(val, datetime):
00795             return VariantType.DateTime
00796         elif isinstance(val, uuid.UUID):
00797             return VariantType.Guid
00798         else:
00799             if isinstance(val, object):
00800                 try:
00801                     return getattr(VariantType, val.__class__.__name__)
00802                 except AttributeError:
00803                     return VariantType.ExtensionObject
00804             else:
00805                 raise UaError("Could not guess UA type of {0} with type {1}, specify UA type".format(val, type(val)))
00806 
00807     def __str__(self):
00808         return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
00809     __repr__ = __str__
00810 
00811     def to_binary(self):
00812         b = []
00813         encoding = self.VariantType.value & 0b111111
00814         if type(self.Value) in (list, tuple):
00815             if self.Dimensions is not None:
00816                 encoding = uabin.set_bit(encoding, 6)
00817             encoding = uabin.set_bit(encoding, 7)
00818             b.append(uabin.Primitives.UInt8.pack(encoding))
00819             b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
00820             if self.Dimensions is not None:
00821                 b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
00822         else:
00823             b.append(uabin.Primitives.UInt8.pack(encoding))
00824             b.append(uabin.pack_uatype(self.VariantType, self.Value))
00825 
00826         return b"".join(b)
00827 
00828     @staticmethod
00829     def from_binary(data):
00830         dimensions = None
00831         encoding = ord(data.read(1))
00832         int_type = encoding & 0b00111111
00833         vtype = datatype_to_varianttype(int_type)
00834         if vtype == VariantType.Null:
00835             return Variant(None, vtype, encoding)
00836         if uabin.test_bit(encoding, 7):
00837             value = uabin.unpack_uatype_array(vtype, data)
00838         else:
00839             value = uabin.unpack_uatype(vtype, data)
00840         if uabin.test_bit(encoding, 6):
00841             dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
00842             value = reshape(value, dimensions)
00843 
00844         return Variant(value, vtype, dimensions)
00845 
00846 
00847 def reshape(flat, dims):
00848     subdims = dims[1:]
00849     subsize = 1
00850     for i in subdims:
00851         if i == 0:
00852             i = 1
00853         subsize *= i
00854     while dims[0] * subsize > len(flat):
00855         flat.append([])
00856     if not subdims or subdims == [0]:
00857         return flat
00858     return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
00859 
00860 
00861 def _split_list(l, n):
00862     n = max(1, n)
00863     return [l[i:i + n] for i in range(0, len(l), n)]
00864 
00865 
00866 def flatten_and_get_shape(mylist):
00867     dims = []
00868     dims.append(len(mylist))
00869     while isinstance(mylist[0], (list, tuple)):
00870         dims.append(len(mylist[0]))
00871         mylist = [item for sublist in mylist for item in sublist]
00872         if len(mylist) == 0:
00873             break
00874     return mylist, dims
00875 
00876 
00877 def flatten(mylist):
00878     if len(mylist) == 0:
00879         return mylist
00880     while isinstance(mylist[0], (list, tuple)):
00881         mylist = [item for sublist in mylist for item in sublist]
00882         if len(mylist) == 0:
00883             break
00884     return mylist
00885 
00886 
00887 def get_shape(mylist):
00888     dims = []
00889     while isinstance(mylist, (list, tuple)):
00890         dims.append(len(mylist))
00891         if len(mylist) == 0:
00892             break
00893         mylist = mylist[0]
00894     return dims
00895 
00896 
00897 class XmlElement(FrozenClass):
00898     """
00899     An XML element encoded as an UTF-8 string.
00900     """
00901 
00902     def __init__(self, binary=None):
00903         if binary is not None:
00904             self._binary_init(binary)
00905             self._freeze = True
00906             return
00907         self.Value = []
00908         self._freeze = True
00909 
00910     def to_binary(self):
00911         return uabin.Primitives.String.pack(self.Value)
00912 
00913     @staticmethod
00914     def from_binary(data):
00915         return XmlElement(data)
00916 
00917     def _binary_init(self, data):
00918         self.Value = uabin.Primitives.String.unpack(data)
00919 
00920     def __str__(self):
00921         return 'XmlElement(Value:' + str(self.Value) + ')'
00922 
00923     __repr__ = __str__
00924 
00925 
00926 class DataValue(FrozenClass):
00927     """
00928     A value with an associated timestamp, and quality.
00929     Automatically generated from xml , copied and modified here to fix errors in xml spec
00930 
00931     :ivar Value:
00932     :vartype Value: Variant
00933     :ivar StatusCode:
00934     :vartype StatusCode: StatusCode
00935     :ivar SourceTimestamp:
00936     :vartype SourceTimestamp: datetime
00937     :ivar SourcePicoSeconds:
00938     :vartype SourcePicoSeconds: int
00939     :ivar ServerTimestamp:
00940     :vartype ServerTimestamp: datetime
00941     :ivar ServerPicoseconds:
00942     :vartype ServerPicoseconds: int
00943     """
00944 
00945     def __init__(self, variant=None, status=None):
00946         self.Encoding = 0
00947         if not isinstance(variant, Variant):
00948             variant = Variant(variant)
00949         self.Value = variant
00950         if status is None:
00951             self.StatusCode = StatusCode()
00952         else:
00953             self.StatusCode = status
00954         self.SourceTimestamp = None  # DateTime()
00955         self.SourcePicoseconds = None
00956         self.ServerTimestamp = None  # DateTime()
00957         self.ServerPicoseconds = None
00958         self._freeze = True
00959 
00960     def to_binary(self):
00961         packet = []
00962         if self.Value:
00963             self.Encoding |= (1 << 0)
00964         if self.StatusCode:
00965             self.Encoding |= (1 << 1)
00966         if self.SourceTimestamp:
00967             self.Encoding |= (1 << 2)
00968         if self.ServerTimestamp:
00969             self.Encoding |= (1 << 3)
00970         if self.SourcePicoseconds:
00971             self.Encoding |= (1 << 4)
00972         if self.ServerPicoseconds:
00973             self.Encoding |= (1 << 5)
00974         packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
00975         if self.Value:
00976             packet.append(self.Value.to_binary())
00977         if self.StatusCode:
00978             packet.append(self.StatusCode.to_binary())
00979         if self.SourceTimestamp:
00980             packet.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
00981         if self.ServerTimestamp:
00982             packet.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
00983         if self.SourcePicoseconds:
00984             packet.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
00985         if self.ServerPicoseconds:
00986             packet.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
00987         return b''.join(packet)
00988 
00989     @staticmethod
00990     def from_binary(data):
00991         encoding = ord(data.read(1))
00992         if encoding & (1 << 0):
00993             value = Variant.from_binary(data)
00994         else:
00995             value = None
00996         if encoding & (1 << 1):
00997             status = StatusCode.from_binary(data)
00998         else:
00999             status = None
01000         obj = DataValue(value, status)
01001         obj.Encoding = encoding
01002         if obj.Encoding & (1 << 2):
01003             obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
01004         if obj.Encoding & (1 << 3):
01005             obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
01006         if obj.Encoding & (1 << 4):
01007             obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
01008         if obj.Encoding & (1 << 5):
01009             obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
01010         return obj
01011 
01012     def __str__(self):
01013         s = 'DataValue(Value:{0}'.format(self.Value)
01014         if self.StatusCode is not None:
01015             s += ', StatusCode:{0}'.format(self.StatusCode)
01016         if self.SourceTimestamp is not None:
01017             s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
01018         if self.ServerTimestamp is not None:
01019             s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
01020         if self.SourcePicoseconds is not None:
01021             s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
01022         if self.ServerPicoseconds is not None:
01023             s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
01024         s += ')'
01025         return s
01026 
01027     __repr__ = __str__
01028 
01029 
01030 def datatype_to_varianttype(int_type):
01031     """
01032     Takes a NodeId or int and return a VariantType
01033     This is only supported if int_type < 63 due to VariantType encoding
01034     At low level we do not have access to address space thus decoding is limited
01035     a better version of this method can be find in ua_utils.py
01036     """
01037     if isinstance(int_type, NodeId):
01038         int_type = int_type.Identifier
01039 
01040     if int_type <= 25:
01041         return VariantType(int_type)
01042     else:
01043         return VariantTypeCustom(int_type)
01044 
01045 
01046 def get_default_value(vtype):
01047     """
01048     Given a variant type return default value for this type
01049     """
01050     if vtype == VariantType.Null:
01051         return None
01052     elif vtype == VariantType.Boolean:
01053         return False
01054     elif vtype in (VariantType.SByte, VariantType.Byte, VariantType.ByteString):
01055         return b""
01056     elif 4 <= vtype.value <= 9:
01057         return 0
01058     elif vtype in (VariantType.Float, VariantType.Double):
01059         return 0.0
01060     elif vtype == VariantType.String:
01061         return None  # a string can be null
01062     elif vtype == VariantType.DateTime:
01063         return datetime.now()
01064     elif vtype == VariantType.Guid:
01065         return uuid.uuid4()
01066     elif vtype == VariantType.XmlElement:
01067         return None  #Not sure this is correct
01068     elif vtype == VariantType.NodeId:
01069         return NodeId()
01070     elif vtype == VariantType.ExpandedNodeId:
01071         return NodeId()
01072     elif vtype == VariantType.StatusCode:
01073         return StatusCode()
01074     elif vtype == VariantType.QualifiedName:
01075         return QualifiedName()
01076     elif vtype == VariantType.LocalizedText:
01077         return LocalizedText()
01078     elif vtype == VariantType.ExtensionObject:
01079         return ExtensionObject()
01080     elif vtype == VariantType.DataValue:
01081         return DataValue()
01082     elif vtype == VariantType.Variant:
01083         return Variant()
01084     else:
01085         raise RuntimeError("function take a uatype as argument, got:", vtype)
01086 
01087 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:24