2 Binary protocol specific functions and constants 8 from datetime
import datetime, timedelta, tzinfo, MAXYEAR
9 from calendar
import timegm
15 if sys.version_info.major > 2:
# Module-level logger. It is named after this module through the ``__name__``
# variable; the previous code passed the string literal '__name__', which
# created a logger literally called "__name__" instead of one named after
# the module.
logger = logging.getLogger(__name__)
20 EPOCH_AS_FILETIME = 116444736000000000
21 HUNDREDS_OF_NANOSECONDS = 10000000
22 FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
57 if (dt.tzinfo
is None)
or (dt.tzinfo.utcoffset(dt)
is None):
58 dt = dt.replace(tzinfo=
UTC())
59 ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
60 return ft + (dt.microsecond * 10)
65 return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
68 logger.warning(
"datetime overflow: %s", epch)
69 return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
73 return prefix + str(length) + fmtchar
77 return prefix + str(length) + chr(fmtchar)
80 if sys.version_info.major < 3:
81 build_array_format = build_array_format_py2
83 build_array_format = build_array_format_py3
90 return b
'\xff\xff\xff\xff' 92 b = [self.pack(val)
for val
in array]
93 b.insert(0, Primitives.Int32.pack(length))
96 length = Primitives.Int32.unpack(data)
102 return [self.unpack(data)
for _
in range(length)]
110 return Primitives.Int64.pack(epch)
114 epch = Primitives.Int64.unpack(data)
123 return Primitives.Int32.pack(-1)
124 if isinstance(string, unicode):
125 string = string.encode(
'utf-8')
127 return Primitives.Int32.pack(length) + string
131 b = _Bytes.unpack(data)
132 if sys.version_info.major < 3:
137 return b.decode(
"utf-8")
144 return _String.pack(data)
148 length = Primitives.Int32.unpack(data)
151 return data.read(length)
170 f1 = Primitives.UInt32.pack(guid.time_low)
171 f2 = Primitives.UInt16.pack(guid.time_mid)
172 f3 = Primitives.UInt16.pack(guid.time_hi_version)
173 f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
174 f4b = Primitives.Byte.pack(guid.clock_seq_low)
175 f4c = struct.pack(
'>Q', guid.node)[2:8]
185 f1 = struct.pack(
'>I', Primitives.UInt32.unpack(data))
186 f2 = struct.pack(
'>H', Primitives.UInt16.unpack(data))
187 f3 = struct.pack(
'>H', Primitives.UInt16.unpack(data))
190 b = f1 + f2 + f3 + f4
192 return uuid.UUID(bytes=b)
202 return struct.pack(self.
format, data)
205 return struct.unpack(self.
format, data.read(self.
size))[0]
250 return b
'\xff\xff\xff\xff' 253 b.insert(0, Primitives.Int32.pack(length))
258 if hasattr(Primitives, vtype.name):
259 return getattr(Primitives, vtype.name).pack(value)
260 elif vtype.value > 25:
261 return Primitives.Bytes.pack(value)
262 elif vtype.name ==
"ExtensionObject":
270 return value.to_binary()
271 except AttributeError:
272 raise UaError(
"{0} could not be packed with value {1}".format(vtype, value))
276 if hasattr(Primitives, vtype.name):
277 st = getattr(Primitives, vtype.name)
278 return st.unpack(data)
279 elif vtype.value > 25:
280 return Primitives.Bytes.unpack(data)
281 elif vtype.name ==
"ExtensionObject":
289 if hasattr(uatypes, vtype.name):
290 klass = getattr(uatypes, vtype.name)
291 return klass.from_binary(data)
293 raise UaError(
"can not unpack unknown vtype {0!s}".format(vtype))
297 if hasattr(Primitives, vtype.name):
298 st = getattr(Primitives, vtype.name)
299 return st.unpack_array(data)
301 length = Primitives.Int32.unpack(data)
def pack_array(self, array)
def build_array_format_py3(prefix, length, fmtchar)
def unset_bit(data, offset)
def unpack_array(self, data)
def build_array_format_py2(prefix, length, fmtchar)
def pack_uatype_array(vtype, array)
def win_epoch_to_datetime(epch)
def pack_uatype(vtype, value)
def extensionobject_from_binary(data)
def set_bit(data, offset)
def unpack_uatype(vtype, data)
def extensionobject_to_binary(obj)
def test_bit(data, offset)
def unpack_uatype_array(vtype, data)
def datetime_to_win_epoch(dt)