00001 """
00002 Binary protocol specific functions and constants
00003 """
00004
00005 import sys
00006 import struct
00007 import logging
00008 from datetime import datetime, timedelta, tzinfo, MAXYEAR
00009 from calendar import timegm
00010 import uuid
00011
00012 from opcua.ua.uaerrors import UaError
00013
00014
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so the
# isinstance() check used when packing strings works on both major versions.
if sys.version_info.major > 2:
    unicode = str

# Name the logger after this module. The quoted literal '__name__' would
# register every record under a logger that is literally called "__name__".
logger = logging.getLogger(__name__)

# Number of 100 ns ticks between 1601-01-01 and 1970-01-01.
EPOCH_AS_FILETIME = 116444736000000000
# Number of 100 ns ticks in one second.
HUNDREDS_OF_NANOSECONDS = 10000000
# Zero point of the tick count above, as a naive datetime.
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
00023
00024
00025 def test_bit(data, offset):
00026 mask = 1 << offset
00027 return data & mask
00028
00029
def set_bit(data, offset):
    """Return ``data`` with the bit at position ``offset`` switched on."""
    return data | (1 << offset)
00033
00034
def unset_bit(data, offset):
    """Return ``data`` with the bit at position ``offset`` switched off."""
    return data & ~(1 << offset)
00038
00039
class UTC(tzinfo):
    """
    Time zone with a constant zero offset and no daylight saving time.
    """

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return timedelta()

    def tzname(self, dt):
        """Return the zone name, always ``"UTC"``."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight saving adjustment, which is always zero."""
        return timedelta()
00053
00054
00055
def datetime_to_win_epoch(dt):
    """Convert a datetime into a count of 100 ns ticks since 1601-01-01 UTC.

    A naive datetime (or one whose tzinfo reports no offset) is taken to be
    in UTC already. An aware datetime is converted to UTC before encoding.
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # timegm() interprets the tuple as UTC, so the offset has to be applied
    # first. timetuple() would drop the offset of an aware datetime and
    # encode its local wall-clock time instead.
    seconds = timegm(dt.utctimetuple())
    ticks = EPOCH_AS_FILETIME + (seconds * HUNDREDS_OF_NANOSECONDS)
    # The time tuple only carries whole seconds; add the sub-second part
    # separately (1 microsecond == 10 ticks).
    return ticks + (dt.microsecond * 10)
00061
00062
def win_epoch_to_datetime(epch):
    """Convert a count of 100 ns ticks since 1601-01-01 into a naive datetime.

    Ticks are floored to whole microseconds. If the value cannot be
    represented, a warning is logged and the largest representable datetime
    is returned instead of raising.
    """
    try:
        elapsed = timedelta(microseconds=epch // 10)
        return FILETIME_EPOCH_AS_DATETIME + elapsed
    except OverflowError:
        # Out-of-range tick counts are clamped rather than propagated.
        logger.warning("datetime overflow: %s", epch)
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
00070
00071
def build_array_format_py2(prefix, length, fmtchar):
    """Build a ``struct`` array format: ``prefix``, the count, the type code.

    ``fmtchar`` is appended as-is, so it must already be a string.
    """
    count = str(length)
    return prefix + count + fmtchar
00074
00075
def build_array_format_py3(prefix, length, fmtchar):
    """Build a ``struct`` array format: ``prefix``, the count, the type code.

    ``fmtchar`` is an integer code point and is converted with ``chr()``
    before being appended.
    """
    head = prefix + str(length)
    return head + chr(fmtchar)
00078
00079
# Bind the public name to the variant matching the running interpreter.
build_array_format = (
    build_array_format_py2
    if sys.version_info.major < 3
    else build_array_format_py3
)
00084
00085
class _Primitive(object):
    """Base class adding array (de)serialization on top of a scalar codec.

    Subclasses provide ``pack(value)`` returning bytes and ``unpack(data)``
    consuming one value from ``data``, an object with a ``read`` method.
    """

    def pack_array(self, array):
        """Serialize ``array`` as an Int32 count followed by every element.

        ``None`` is encoded as a count of -1 with no elements. Returns a
        single bytes object.
        """
        if array is None:
            return b'\xff\xff\xff\xff'
        length = len(array)
        chunks = [self.pack(val) for val in array]
        chunks.insert(0, Primitives.Int32.pack(length))
        # Join and return the pieces; without this the method would return
        # None for every non-None array.
        return b"".join(chunks)

    def unpack_array(self, data):
        """Read an Int32 count from ``data`` and then that many elements.

        Returns ``None`` for a count of -1, otherwise a list (empty when the
        count is zero).
        """
        length = Primitives.Int32.unpack(data)
        if length == -1:
            return None
        # range() of zero yields nothing, so an empty array needs no branch.
        return [self.unpack(data) for _ in range(length)]
00103
00104
class _DateTime(_Primitive):
    """Codec storing a datetime as an Int64 count of 100 ns ticks."""

    @staticmethod
    def pack(dt):
        """Return the Int64 encoding of ``dt`` converted to a tick count."""
        ticks = datetime_to_win_epoch(dt)
        encoded = Primitives.Int64.pack(ticks)
        return encoded

    @staticmethod
    def unpack(data):
        """Read an Int64 tick count from ``data`` and return a datetime."""
        ticks = Primitives.Int64.unpack(data)
        moment = win_epoch_to_datetime(ticks)
        return moment
00116
00117
class _String(_Primitive):
    """Codec for strings: an Int32 byte length followed by the payload."""

    @staticmethod
    def pack(string):
        """Encode ``string`` with its length prefix.

        ``None`` becomes a bare length of -1. Text is encoded as UTF-8
        first; anything else is written as given.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        payload = string.encode('utf-8') if isinstance(string, unicode) else string
        size = len(payload)
        return Primitives.Int32.pack(size) + payload

    @staticmethod
    def unpack(data):
        """Read a length-prefixed string from ``data``.

        On Python 2 the raw bytes are returned unchanged. On Python 3 they
        are decoded as UTF-8, and ``None`` is passed through for a null
        string.
        """
        raw = _Bytes.unpack(data)
        if sys.version_info.major < 3 or raw is None:
            return raw
        return raw.decode("utf-8")
00138
00139
class _Bytes(_Primitive):
    """Codec for byte strings: an Int32 length followed by the raw bytes."""

    @staticmethod
    def pack(data):
        """Encode ``data`` using the same wire layout as a string."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """Read the Int32 length, then that many bytes from ``data``.

        A length of -1 marks a null value and yields ``None``.
        """
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
00152
00153
class _Null(_Primitive):
    """Codec for the null value, which occupies no bytes."""

    @staticmethod
    def pack(data):
        """Ignore ``data`` and return an empty byte string."""
        nothing = b""
        return nothing

    @staticmethod
    def unpack(data):
        """Return ``None`` without reading anything from ``data``."""
        return None
00163
00164
class _Guid(_Primitive):
    """Codec for GUIDs, written as 16 bytes in four fields."""

    @staticmethod
    def pack(guid):
        """Return the 16-byte encoding of ``guid``.

        The first three fields go through the integer codecs. The last
        eight bytes are the two clock-sequence bytes followed by the node.
        """
        pieces = [
            Primitives.UInt32.pack(guid.time_low),
            Primitives.UInt16.pack(guid.time_mid),
            Primitives.UInt16.pack(guid.time_hi_version),
            Primitives.Byte.pack(guid.clock_seq_hi_variant),
            Primitives.Byte.pack(guid.clock_seq_low),
            # node is 48 bits wide: pack it as a big-endian 64-bit integer
            # and drop the two leading bytes.
            struct.pack('>Q', guid.node)[2:8],
        ]
        return b"".join(pieces)

    @staticmethod
    def unpack(data):
        """Read 16 bytes from ``data`` and return a ``uuid.UUID``."""
        # The three integer fields are read with the integer codecs and
        # re-packed big-endian to build the ``bytes`` argument for UUID.
        raw = struct.pack('>I', Primitives.UInt32.unpack(data))
        raw += struct.pack('>H', Primitives.UInt16.unpack(data))
        raw += struct.pack('>H', Primitives.UInt16.unpack(data))
        # The remaining eight bytes are used verbatim.
        raw += data.read(8)
        return uuid.UUID(bytes=raw)
00193
00194
class _Primitive1(_Primitive):
    """Codec for a single fixed-size value described by a ``struct`` format."""

    def __init__(self, fmt):
        """Compile ``fmt`` once and expose its size and format string."""
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` encoded with this codec's format."""
        # Use the Struct compiled in __init__ instead of handing the format
        # back to the module-level function on every call.
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and decode one value."""
        return self.struct.unpack(data.read(self.size))[0]
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
class Primitives1(object):
    """Fixed-size numeric and boolean codecs, all little-endian ("<")."""

    # Signed integers; SByte is another name for the same Int8 codec.
    Int8 = SByte = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")

    # Unsigned integers; Char and Byte share the UInt8 codec.
    UInt8 = Char = Byte = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")

    # Boolean and floating point.
    Boolean = _Primitive1("<?")
    Double = _Primitive1("<d")
    Float = _Primitive1("<f")
00236
00237
class Primitives(Primitives1):
    """All codecs by type name: the fixed-size ones plus variable-size ones.

    The pack/unpack helpers in this module look codecs up here by the name
    of the variant type.
    """

    Null = _Null()

    # Length-prefixed payloads. The three byte-oriented names each get
    # their own _Bytes instance.
    String = _String()
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _Bytes()

    DateTime = _DateTime()
    Guid = _Guid()
00246
00247
def pack_uatype_array(vtype, array):
    """Serialize ``array`` of ``vtype`` values as an Int32 count plus items.

    ``None`` is encoded as a count of -1 with no items.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    items = [pack_uatype(vtype, item) for item in array]
    header = Primitives.Int32.pack(count)
    return header + b"".join(items)
00255
00256
def pack_uatype(vtype, value):
    """Serialize ``value`` according to the variant type ``vtype``.

    Lookup order: a codec on ``Primitives`` named after the type, then raw
    bytes for type values above 25, then the extension-object encoder, and
    finally the value's own ``to_binary()``.

    Raises ``UaError`` when the value has no ``to_binary`` attribute.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if type_name == "ExtensionObject":
        # NOTE(review): imported locally rather than at module level,
        # presumably to avoid a circular import -- confirm before moving.
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{0} could not be packed with value {1}".format(vtype, value))
00273
00274
def unpack_uatype(vtype, data):
    """Read one value of variant type ``vtype`` from ``data``.

    Lookup order: a codec on ``Primitives`` named after the type, then raw
    bytes for type values above 25, then the extension-object decoder, and
    finally a class of the same name in ``opcua.ua.uatypes``.

    Raises ``UaError`` when no decoder can be found for the type.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        codec = getattr(Primitives, type_name)
        return codec.unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if type_name == "ExtensionObject":
        # NOTE(review): imported locally rather than at module level,
        # presumably to avoid a circular import -- confirm before moving.
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    from opcua.ua import uatypes
    if not hasattr(uatypes, type_name):
        raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
    klass = getattr(uatypes, type_name)
    return klass.from_binary(data)
00294
00295
def unpack_uatype_array(vtype, data):
    """Read an array of ``vtype`` values from ``data``.

    Types with a codec on ``Primitives`` use that codec's ``unpack_array``.
    Other types read an Int32 count and then that many single values.
    Returns ``None`` when the count is -1.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        codec = getattr(Primitives, type_name)
        return codec.unpack_array(data)
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
00306
00307