ua_binary.py
Go to the documentation of this file.
00001 """
00002 Binary protocol specific functions and constants
00003 """
00004 
00005 import sys
00006 import struct
00007 import logging
00008 from datetime import datetime, timedelta, tzinfo, MAXYEAR
00009 from calendar import timegm
00010 import uuid
00011 
00012 from opcua.ua.uaerrors import UaError
00013 
00014 
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so the
# isinstance checks in this module work on both major versions.
if sys.version_info.major > 2:
    unicode = str

# Name the logger after the module itself (the variable __name__, not the
# literal string '__name__') so records can be filtered per module.
logger = logging.getLogger(__name__)

EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # number of 100 ns ticks in one second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)  # origin of the FILETIME scale
00023 
00024 
00025 def test_bit(data, offset):
00026     mask = 1 << offset
00027     return data & mask
00028 
00029 
def set_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` switched on.
    """
    return data | (1 << offset)
00033 
00034 
def unset_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` cleared.
    """
    return data & ~(1 << offset)
00038 
00039 
class UTC(tzinfo):
    """
    tzinfo implementation with a zero offset and no daylight saving, named "UTC".
    """

    def tzname(self, dt):
        """Return the fixed zone name."""
        return "UTC"

    def utcoffset(self, dt):
        """Return a zero offset regardless of ``dt``."""
        return timedelta()

    def dst(self, dt):
        """Return a zero daylight-saving adjustment regardless of ``dt``."""
        return timedelta()
00053 
00054 
# method copied from David Buxton <david@gasmark6.com> sample code
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME integer (100 ns ticks).

    Naive datetimes, and aware ones whose tzinfo reports no offset, are
    interpreted as UTC. Aware datetimes with an offset are converted to UTC
    before encoding.

    :param dt: datetime to convert
    :return: integer tick count, EPOCH_AS_FILETIME for 1970-01-01 00:00:00 UTC
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() applies the UTC offset; timetuple() would silently treat
    # the wall-clock time of a non-UTC aware datetime as if it were UTC.
    seconds = timegm(dt.utctimetuple())
    ft = EPOCH_AS_FILETIME + (seconds * HUNDREDS_OF_NANOSECONDS)
    # one microsecond is ten 100 ns ticks
    return ft + (dt.microsecond * 10)
00061 
00062 
def win_epoch_to_datetime(epch):
    """
    Convert a FILETIME tick count (100 ns units) into a naive datetime.

    If the value cannot be represented (OverflowError), a warning is logged
    and the last microsecond of MAXYEAR is returned instead.
    """
    microseconds = epch // 10
    try:
        converted = FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=microseconds)
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        converted = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
    return converted
00070 
00071 
def build_array_format_py2(prefix, length, fmtchar):
    """
    Build a struct format string: ``prefix``, then the repeat count ``length``,
    then the format character ``fmtchar`` (already a string).
    """
    count = str(length)
    return prefix + count + fmtchar
00074 
00075 
def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format string: ``prefix``, then the repeat count ``length``,
    then the character whose code point is ``fmtchar``.
    """
    count = str(length)
    letter = chr(fmtchar)
    return prefix + count + letter
00078 
00079 
# Bind the variant that matches the running interpreter's major version.
build_array_format = (
    build_array_format_py2 if sys.version_info.major < 3 else build_array_format_py3
)
00084 
00085 
class _Primitive(object):
    """
    Base class for the binary codecs in this module.

    Subclasses provide ``pack(value)`` returning bytes and ``unpack(data)``
    reading from a stream-like object; this class adds the array variants.
    """

    def pack_array(self, array):
        """
        Serialise ``array`` as an Int32 element count followed by every
        element packed with ``self.pack``.

        :param array: sequence of values, or None
        :return: bytes; None is encoded as an element count of -1
        """
        if array is None:
            return b'\xff\xff\xff\xff'
        chunks = [Primitives.Int32.pack(len(array))]
        chunks.extend(self.pack(val) for val in array)
        # The joined buffer must be returned; without it callers receive None.
        return b"".join(chunks)

    def unpack_array(self, data):
        """
        Read an Int32 element count from ``data`` and then that many elements
        with ``self.unpack``.

        :param data: stream-like object consumed by the unpack methods
        :return: None for a count of -1, otherwise a list of unpacked values
        """
        length = Primitives.Int32.unpack(data)
        if length == -1:
            return None
        # range() of zero (or any other negative count) yields an empty list
        return [self.unpack(data) for _ in range(length)]
00103 
00104 
class _DateTime(_Primitive):
    """
    Codec storing a datetime as an Int64 FILETIME tick count.
    """

    @staticmethod
    def pack(dt):
        """Convert ``dt`` to FILETIME ticks and pack them as Int64."""
        return Primitives.Int64.pack(datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """Read an Int64 tick count from ``data`` and convert it to a datetime."""
        return win_epoch_to_datetime(Primitives.Int64.unpack(data))
00116 
00117 
class _String(_Primitive):
    """
    Codec for length-prefixed strings (Int32 byte count followed by the bytes).
    """

    @staticmethod
    def pack(string):
        """
        Pack ``string``; None becomes a length of -1 and text is UTF-8 encoded
        before the length is measured. Byte strings are written unchanged.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        payload = string.encode('utf-8') if isinstance(string, unicode) else string
        return Primitives.Int32.pack(len(payload)) + payload

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed byte string from ``data``.

        On Python 2 the raw bytes are returned; on Python 3 they are decoded
        as UTF-8. A null string (length -1) yields None.
        """
        raw = _Bytes.unpack(data)
        if raw is None or sys.version_info.major < 3:
            return raw
        return raw.decode("utf-8")
00138 
00139 
class _Bytes(_Primitive):
    """
    Codec for length-prefixed byte strings.
    """

    @staticmethod
    def pack(data):
        """Pack ``data`` using the same wire layout as ``_String.pack``."""
        packed = _String.pack(data)
        return packed

    @staticmethod
    def unpack(data):
        """
        Read an Int32 length from ``data`` and then that many bytes.

        A length of -1 denotes a null value and yields None.
        """
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
00152 
00153 
class _Null(_Primitive):
    """
    Codec for the null type: occupies no bytes and always decodes to None.
    """

    @staticmethod
    def unpack(data):
        """Consume nothing from ``data`` and return None."""
        return None

    @staticmethod
    def pack(data):
        """Ignore ``data`` and return an empty byte string."""
        return b""
00163 
00164 
class _Guid(_Primitive):
    """
    Codec translating between ``uuid.UUID`` objects and the 16-byte OPC UA Guid layout.
    """

    @staticmethod
    def pack(guid):
        """
        Pack ``guid`` as UInt32, UInt16, UInt16 followed by eight raw bytes.
        """
        # convert python UUID 6 field format to OPC UA 4 field format
        # no primitive .pack available for 6 byte int: pack the node as a
        # big-endian 8 byte integer and keep its low 6 bytes
        node_bytes = struct.pack('>Q', guid.node)[2:8]
        parts = (
            Primitives.UInt32.pack(guid.time_low),
            Primitives.UInt16.pack(guid.time_mid),
            Primitives.UInt16.pack(guid.time_hi_version),
            Primitives.Byte.pack(guid.clock_seq_hi_variant),
            Primitives.Byte.pack(guid.clock_seq_low),
            node_bytes,
        )
        # concat byte fields
        return b"".join(parts)

    @staticmethod
    def unpack(data):
        """
        Read 16 bytes from ``data`` and rebuild the ``uuid.UUID``.
        """
        # convert OPC UA 4 field format to python UUID bytes
        # the three leading fields are read through the primitives and then
        # re-packed big-endian, which is the order uuid.UUID(bytes=...) expects
        data1 = Primitives.UInt32.unpack(data)
        data2 = Primitives.UInt16.unpack(data)
        data3 = Primitives.UInt16.unpack(data)
        data4 = data.read(8)
        return uuid.UUID(bytes=struct.pack('>IHH', data1, data2, data3) + data4)
00193 
00194 
class _Primitive1(_Primitive):
    """
    Fixed-size primitive described by a single ``struct`` format string.

    Attributes set by ``__init__``: ``struct`` (compiled ``struct.Struct``),
    ``size`` (encoded size in bytes) and ``format`` (the format string).
    """

    def __init__(self, fmt):
        # Compile the format once; pack/unpack reuse the compiled object
        # instead of re-parsing the format string on every call.
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` encoded according to the format."""
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and return the decoded value."""
        return self.struct.unpack(data.read(self.size))[0]

    # Disabled alternative implementation, kept for reference.
    #def pack_array(self, array):
        #"""
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
        #"""
        #if array is None:
            #return b'\xff\xff\xff\xff'
        #length = len(array)
        #if length == 0:
            #return b'\x00\x00\x00\x00'
        #if length == 1:
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
00217             #return b'\x01\x00\x00\x00' + self.pack(array[0])
00218         #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
00219 
00220 
class Primitives1(object):
    """
    Namespace of the fixed-size little-endian primitive codecs.
    """

    # signed integers
    Int8 = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")

    # unsigned integers
    UInt8 = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")

    # boolean and floating point
    Boolean = _Primitive1("<?")
    Float = _Primitive1("<f")
    Double = _Primitive1("<d")

    # aliases sharing the codec objects defined above
    SByte = Int8
    Byte = UInt8
    Char = UInt8
00236 
00237 
class Primitives(Primitives1):
    """
    Complete codec namespace: the fixed-size primitives plus the
    variable-length and composite ones.
    """

    Null = _Null()

    # length-prefixed text and byte strings (each name has its own instance)
    String = _String()
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _Bytes()

    # composite values
    DateTime = _DateTime()
    Guid = _Guid()
00246 
00247 
def pack_uatype_array(vtype, array):
    """
    Serialise ``array`` as an Int32 element count followed by each element
    packed with ``pack_uatype``.

    None is encoded as an element count of -1.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    chunks = [pack_uatype(vtype, item) for item in array]
    return Primitives.Int32.pack(count) + b"".join(chunks)
00255 
00256 
def pack_uatype(vtype, value):
    """
    Serialise ``value`` according to ``vtype``.

    Dispatch order: a codec of the same name on ``Primitives``; raw bytes for
    ``vtype.value`` above 25; the ExtensionObject encoder; finally the value's
    own ``to_binary`` method. Raises UaError when ``to_binary`` is unavailable.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).pack(value)
    # NOTE(review): presumably ids above 25 lie outside the built-in type
    # range and are carried as opaque byte strings -- confirm.
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{0} could not be packed with value {1}".format(vtype, value))
00273 
00274 
def unpack_uatype(vtype, data):
    """
    Deserialise one value of type ``vtype`` from the stream ``data``.

    Dispatch order: a codec of the same name on ``Primitives``; raw bytes for
    ``vtype.value`` above 25; the ExtensionObject decoder; finally a class of
    the same name in ``opcua.ua.uatypes``. Raises UaError for unknown types.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        codec = getattr(Primitives, type_name)
        return codec.unpack(data)
    # NOTE(review): presumably ids above 25 lie outside the built-in type
    # range and are carried as opaque byte strings -- confirm.
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    from opcua.ua import uatypes
    if not hasattr(uatypes, type_name):
        raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
    klass = getattr(uatypes, type_name)
    return klass.from_binary(data)
00294 
00295 
def unpack_uatype_array(vtype, data):
    """
    Deserialise an array of ``vtype`` values from the stream ``data``.

    Types with a codec on ``Primitives`` use that codec's ``unpack_array``;
    all others read an Int32 element count (-1 meaning None) and then unpack
    each element with ``unpack_uatype``.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).unpack_array(data)
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
00306 
00307 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23