ua_binary.py
Go to the documentation of this file.
1 """
2 Binary protocol specific functions and constants
3 """
4 
5 import sys
6 import struct
7 import logging
8 from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 from calendar import timegm
10 import uuid
11 
12 from opcua.ua.uaerrors import UaError
13 
14 
15 if sys.version_info.major > 2:
16  unicode = str
17 
18 logger = logging.getLogger('__name__')
19 
20 EPOCH_AS_FILETIME = 116444736000000000 # January 1, 1970 as MS file time
21 HUNDREDS_OF_NANOSECONDS = 10000000
22 FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
23 
24 
25 def test_bit(data, offset):
26  mask = 1 << offset
27  return data & mask
28 
29 
30 def set_bit(data, offset):
31  mask = 1 << offset
32  return data | mask
33 
34 
35 def unset_bit(data, offset):
36  mask = 1 << offset
37  return data & ~mask
38 
39 
40 class UTC(tzinfo):
41  """
42  UTC
43  """
44 
45  def utcoffset(self, dt):
46  return timedelta(0)
47 
48  def tzname(self, dt):
49  return "UTC"
50 
51  def dst(self, dt):
52  return timedelta(0)
53 
54 
55 # method copied from David Buxton <david@gasmark6.com> sample code
57  if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
58  dt = dt.replace(tzinfo=UTC())
59  ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
60  return ft + (dt.microsecond * 10)
61 
62 
64  try:
65  return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
66  except OverflowError:
67  # FILETIMEs after 31 Dec 9999 can't be converted to datetime
68  logger.warning("datetime overflow: %s", epch)
69  return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
70 
71 
72 def build_array_format_py2(prefix, length, fmtchar):
73  return prefix + str(length) + fmtchar
74 
75 
76 def build_array_format_py3(prefix, length, fmtchar):
77  return prefix + str(length) + chr(fmtchar)
78 
79 
80 if sys.version_info.major < 3:
81  build_array_format = build_array_format_py2
82 else:
83  build_array_format = build_array_format_py3
84 
85 
86 class _Primitive(object):
87 
88  def pack_array(self, array):
89  if array is None:
90  return b'\xff\xff\xff\xff'
91  length = len(array)
92  b = [self.pack(val) for val in array]
93  b.insert(0, Primitives.Int32.pack(length))
94 
95  def unpack_array(self, data):
96  length = Primitives.Int32.unpack(data)
97  if length == -1:
98  return None
99  elif length == 0:
100  return []
101  else:
102  return [self.unpack(data) for _ in range(length)]
103 
104 
106 
107  @staticmethod
108  def pack(dt):
109  epch = datetime_to_win_epoch(dt)
110  return Primitives.Int64.pack(epch)
111 
112  @staticmethod
113  def unpack(data):
114  epch = Primitives.Int64.unpack(data)
115  return win_epoch_to_datetime(epch)
116 
117 
119 
120  @staticmethod
121  def pack(string):
122  if string is None:
123  return Primitives.Int32.pack(-1)
124  if isinstance(string, unicode):
125  string = string.encode('utf-8')
126  length = len(string)
127  return Primitives.Int32.pack(length) + string
128 
129  @staticmethod
130  def unpack(data):
131  b = _Bytes.unpack(data)
132  if sys.version_info.major < 3:
133  return b
134  else:
135  if b is None:
136  return b
137  return b.decode("utf-8")
138 
139 
141 
142  @staticmethod
143  def pack(data):
144  return _String.pack(data)
145 
146  @staticmethod
147  def unpack(data):
148  length = Primitives.Int32.unpack(data)
149  if length == -1:
150  return None
151  return data.read(length)
152 
153 
155 
156  @staticmethod
157  def pack(data):
158  return b""
159 
160  @staticmethod
161  def unpack(data):
162  return None
163 
164 
166 
167  @staticmethod
168  def pack(guid):
169  # convert python UUID 6 field format to OPC UA 4 field format
170  f1 = Primitives.UInt32.pack(guid.time_low)
171  f2 = Primitives.UInt16.pack(guid.time_mid)
172  f3 = Primitives.UInt16.pack(guid.time_hi_version)
173  f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
174  f4b = Primitives.Byte.pack(guid.clock_seq_low)
175  f4c = struct.pack('>Q', guid.node)[2:8] # no primitive .pack available for 6 byte int
176  f4 = f4a+f4b+f4c
177  # concat byte fields
178  b = f1+f2+f3+f4
179 
180  return b
181 
182  @staticmethod
183  def unpack(data):
184  # convert OPC UA 4 field format to python UUID bytes
185  f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
186  f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
187  f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
188  f4 = data.read(8)
189  # concat byte fields
190  b = f1 + f2 + f3 + f4
191 
192  return uuid.UUID(bytes=b)
193 
194 
196  def __init__(self, fmt):
197  self.struct = struct.Struct(fmt)
198  self.size = self.struct.size
199  self.format = self.struct.format
200 
201  def pack(self, data):
202  return struct.pack(self.format, data)
203 
204  def unpack(self, data):
205  return struct.unpack(self.format, data.read(self.size))[0]
206 
207  #def pack_array(self, array):
208  #"""
209  #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
210  #"""
211  #if array is None:
212  #return b'\xff\xff\xff\xff'
213  #length = len(array)
214  #if length == 0:
215  #return b'\x00\x00\x00\x00'
216  #if length == 1:
217  #return b'\x01\x00\x00\x00' + self.pack(array[0])
218  #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
219 
220 
221 class Primitives1(object):
222  Int8 = _Primitive1("<b")
223  SByte = Int8
224  Int16 = _Primitive1("<h")
225  Int32 = _Primitive1("<i")
226  Int64 = _Primitive1("<q")
227  UInt8 = _Primitive1("<B")
228  Char = UInt8
229  Byte = UInt8
230  UInt16 = _Primitive1("<H")
231  UInt32 = _Primitive1("<I")
232  UInt64 = _Primitive1("<Q")
233  Boolean = _Primitive1("<?")
234  Double = _Primitive1("<d")
235  Float = _Primitive1("<f")
236 
237 
239  Null = _Null()
240  String = _String()
241  Bytes = _Bytes()
242  ByteString = _Bytes()
243  CharArray = _Bytes()
244  DateTime = _DateTime()
245  Guid = _Guid()
246 
247 
248 def pack_uatype_array(vtype, array):
249  if array is None:
250  return b'\xff\xff\xff\xff'
251  length = len(array)
252  b = [pack_uatype(vtype, val) for val in array]
253  b.insert(0, Primitives.Int32.pack(length))
254  return b"".join(b)
255 
256 
257 def pack_uatype(vtype, value):
258  if hasattr(Primitives, vtype.name):
259  return getattr(Primitives, vtype.name).pack(value)
260  elif vtype.value > 25:
261  return Primitives.Bytes.pack(value)
262  elif vtype.name == "ExtensionObject":
263  # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
264  # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
265  # Using local import to avoid import loop
266  from opcua.ua.uaprotocol_auto import extensionobject_to_binary
267  return extensionobject_to_binary(value)
268  else:
269  try:
270  return value.to_binary()
271  except AttributeError:
272  raise UaError("{0} could not be packed with value {1}".format(vtype, value))
273 
274 
275 def unpack_uatype(vtype, data):
276  if hasattr(Primitives, vtype.name):
277  st = getattr(Primitives, vtype.name)
278  return st.unpack(data)
279  elif vtype.value > 25:
280  return Primitives.Bytes.unpack(data)
281  elif vtype.name == "ExtensionObject":
282  # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
283  # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
284  # Using local import to avoid import loop
285  from opcua.ua.uaprotocol_auto import extensionobject_from_binary
286  return extensionobject_from_binary(data)
287  else:
288  from opcua.ua import uatypes
289  if hasattr(uatypes, vtype.name):
290  klass = getattr(uatypes, vtype.name)
291  return klass.from_binary(data)
292  else:
293  raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
294 
295 
296 def unpack_uatype_array(vtype, data):
297  if hasattr(Primitives, vtype.name):
298  st = getattr(Primitives, vtype.name)
299  return st.unpack_array(data)
300  else:
301  length = Primitives.Int32.unpack(data)
302  if length == -1:
303  return None
304  else:
305  return [unpack_uatype(vtype, data) for _ in range(length)]
306 
307 
def pack_array(self, array)
Definition: ua_binary.py:88
def build_array_format_py3(prefix, length, fmtchar)
Definition: ua_binary.py:76
def utcoffset(self, dt)
Definition: ua_binary.py:45
def unset_bit(data, offset)
Definition: ua_binary.py:35
def unpack_array(self, data)
Definition: ua_binary.py:95
def build_array_format_py2(prefix, length, fmtchar)
Definition: ua_binary.py:72
def tzname(self, dt)
Definition: ua_binary.py:48
def pack_uatype_array(vtype, array)
Definition: ua_binary.py:248
def win_epoch_to_datetime(epch)
Definition: ua_binary.py:63
def pack_uatype(vtype, value)
Definition: ua_binary.py:257
def dst(self, dt)
Definition: ua_binary.py:51
def set_bit(data, offset)
Definition: ua_binary.py:30
def unpack_uatype(vtype, data)
Definition: ua_binary.py:275
def test_bit(data, offset)
Definition: ua_binary.py:25
def unpack_uatype_array(vtype, data)
Definition: ua_binary.py:296
def datetime_to_win_epoch(dt)
Definition: ua_binary.py:56


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44