cbor.py
Go to the documentation of this file.
1 #!python
2 # -*- Python -*-
3 # Copyright 2014-2015 Brian Olson
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import datetime
18 import re
19 import struct
20 import sys
21 
22 _IS_PY3 = sys.version_info[0] >= 3
23 
24 if _IS_PY3:
25  from io import BytesIO as StringIO
26 else:
27  try:
28  from cStringIO import StringIO
29  except:
30  from StringIO import StringIO
31 
32 
# Masks splitting the initial byte of every CBOR item.
CBOR_TYPE_MASK = 0xE0 # top 3 bits
CBOR_INFO_BITS = 0x1F # low 5 bits


# Major types, already shifted into the top 3 bits.
CBOR_UINT = 0x00
CBOR_NEGINT = 0x20
CBOR_BYTES = 0x40
CBOR_TEXT = 0x60
CBOR_ARRAY = 0x80
CBOR_MAP = 0xA0
CBOR_TAG = 0xC0
CBOR_7 = 0xE0 # float and other types

# Values of the low 5 info bits that announce a following length/number.
CBOR_UINT8_FOLLOWS = 24 # 0x18
CBOR_UINT16_FOLLOWS = 25 # 0x19
CBOR_UINT32_FOLLOWS = 26 # 0x1a
CBOR_UINT64_FOLLOWS = 27 # 0x1b
CBOR_VAR_FOLLOWS = 31 # 0x1f

CBOR_BREAK = 0xFF

CBOR_FALSE = (CBOR_7 | 20)
CBOR_TRUE = (CBOR_7 | 21)
CBOR_NULL = (CBOR_7 | 22)
CBOR_UNDEFINED = (CBOR_7 | 23) # js 'undefined' value

CBOR_FLOAT16 = (CBOR_7 | 25)
CBOR_FLOAT32 = (CBOR_7 | 26)
CBOR_FLOAT64 = (CBOR_7 | 27)

CBOR_TAG_DATE_STRING = 0 # RFC3339
CBOR_TAG_DATE_ARRAY = 1 # any number type follows, seconds since 1970-01-01T00:00:00 UTC
CBOR_TAG_BIGNUM = 2 # big endian byte string follows
CBOR_TAG_NEGBIGNUM = 3 # big endian byte string follows
CBOR_TAG_DECIMAL = 4 # [ 10^x exponent, number ]
CBOR_TAG_BIGFLOAT = 5 # [ 2^x exponent, number ]
# Tags 21 and 22 used to be assigned to CBOR_TAG_BASE64URL / CBOR_TAG_BASE64
# and were then silently overwritten by the 33 / 34 assignments below.  They
# now carry their own names so both pairs are usable; the values seen by
# existing users of CBOR_TAG_BASE64URL / CBOR_TAG_BASE64 (33 / 34) are unchanged.
CBOR_TAG_EXPECT_BASE64URL = 21
CBOR_TAG_EXPECT_BASE64 = 22
CBOR_TAG_BASE16 = 23
CBOR_TAG_CBOR = 24 # following byte string is embedded CBOR data

CBOR_TAG_URI = 32
CBOR_TAG_BASE64URL = 33
CBOR_TAG_BASE64 = 34
CBOR_TAG_REGEX = 35
CBOR_TAG_MIME = 36 # following text is MIME message, headers, separators and all
CBOR_TAG_CBOR_FILEHEADER = 55799 # can open a file with 0xd9d9f7

# Pre-packed initial byte for a positive bignum tag.
_CBOR_TAG_BIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_BIGNUM)
82 
83 
def dumps_int(val):
    """Return the CBOR encoding of the integer val as bytes.

    Non-negative values up to 64 bits use the shortest unsigned form,
    larger ones are emitted as a tagged bignum byte string.  Negative
    values are handed to _encode_type_num as the number (-1 - val).
    """
    if val < 0:
        return _encode_type_num(CBOR_NEGINT, -1 - val)

    # CBOR_UINT is 0, so the initial byte is just the info bits.
    if val <= 23:
        packed = struct.pack('B', val)
    elif val <= 0x0ff:
        packed = struct.pack('BB', CBOR_UINT8_FOLLOWS, val)
    elif val <= 0x0ffff:
        packed = struct.pack('!BH', CBOR_UINT16_FOLLOWS, val)
    elif val <= 0x0ffffffff:
        packed = struct.pack('!BI', CBOR_UINT32_FOLLOWS, val)
    elif val <= (1 << 64) - 1:
        packed = struct.pack('!BQ', CBOR_UINT64_FOLLOWS, val)
    else:
        # Too wide for 64 bits: bignum tag + byte string of the magnitude.
        magnitude = _dumps_bignum_to_bytearray(val)
        length_head = _encode_type_num(CBOR_BYTES, len(magnitude))
        packed = _CBOR_TAG_BIGNUM_BYTES + length_head + magnitude
    return packed
102 
103 
def _dumps_bignum_to_bytearray(val):
    """Return the big-endian bytes of the integer val, without leading zeros.

    A val of 0 (or any non-positive value) yields an empty byte string.
    A single bytearray-based implementation serves both Python 2 and
    Python 3, and appending then reversing avoids the quadratic cost of
    repeatedly inserting at the front of a list.
    """
    out = bytearray()
    while val > 0:
        out.append(val & 0x0ff)  # least significant byte first
        val = val >> 8
    out.reverse()  # switch to big-endian order
    return bytes(out)
118 
119 
def dumps_float(val):
    """Return val encoded as a CBOR 64-bit float (initial byte + 8 bytes, big endian)."""
    initial_byte = struct.pack('B', CBOR_FLOAT64)
    payload = struct.pack('!d', val)
    return initial_byte + payload
122 
123 
# Pre-packed initial byte for a negative bignum tag.
_CBOR_TAG_NEGBIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)


def _encode_type_num(cbor_type, val):
    """Return the CBOR head for major type cbor_type with unsigned number val.

    The shortest of the 0/1/2/4/8 byte forms is chosen.  For CBOR_NEGINT the
    8-byte form is only used up to 2**63 - 1; beyond that a negative bignum
    (tag + byte string) is emitted.  Any other type raises when val does not
    fit in 64 bits.
    """
    assert val >= 0
    if val <= 23:
        # The number fits directly in the info bits.
        return struct.pack('B', cbor_type | val)

    sized_forms = (
        (0x0ff, 'BB', CBOR_UINT8_FOLLOWS),
        (0x0ffff, '!BH', CBOR_UINT16_FOLLOWS),
        (0x0ffffffff, '!BI', CBOR_UINT32_FOLLOWS),
    )
    for ceiling, fmt, info in sized_forms:
        if val <= ceiling:
            return struct.pack(fmt, cbor_type | info, val)

    is_negint = (cbor_type == CBOR_NEGINT)
    if is_negint:
        ceiling64 = (1 << 63) - 1
    else:
        ceiling64 = (1 << 64) - 1
    if val <= ceiling64:
        return struct.pack('!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)

    if not is_negint:
        raise Exception("value too big for CBOR unsigned number: {0!r}".format(val))
    magnitude = _dumps_bignum_to_bytearray(val)
    length_head = _encode_type_num(CBOR_BYTES, len(magnitude))
    return _CBOR_TAG_NEGBIGNUM_BYTES + length_head + magnitude
145 
146 
# The text type is `unicode` on Python 2 and `str` on Python 3; pick the
# matching predicate once, at import time.
if not _IS_PY3:
    def _is_unicode(val):
        """Return True when val is a Python 2 unicode string."""
        return isinstance(val, unicode)
else:
    def _is_unicode(val):
        """Return True when val is a Python 3 str."""
        return isinstance(val, str)
153 
154 
def dumps_string(val, is_text=None, is_bytes=None):
    """Return val encoded as a CBOR text or byte string.

    Unicode input is UTF-8 encoded and always written as text.  Other input
    is written as text only when is_text is True and is_bytes is falsy;
    otherwise it is written as a byte string.
    """
    if _is_unicode(val):
        val = val.encode('utf8')
        is_text, is_bytes = True, False
    as_text = (not is_bytes) and (is_text == True)
    major_type = CBOR_TEXT if as_text else CBOR_BYTES
    return _encode_type_num(major_type, len(val)) + val
163 
164 
def dumps_array(arr, sort_keys=False):
    """Return arr encoded as a definite-length CBOR array.

    sort_keys is forwarded to dumps() for every element.
    """
    chunks = [_encode_type_num(CBOR_ARRAY, len(arr))]
    for item in arr:
        chunks.append(dumps(item, sort_keys=sort_keys))
    return b''.join(chunks)
169 
170 
def dumps_dict(d, sort_keys=False):
    """Return d encoded as a definite-length CBOR map.

    With sort_keys the entries are written in sorted key order, otherwise in
    the dict's own iteration order.  sort_keys is forwarded to dumps() for
    every key and value.
    """
    chunks = [_encode_type_num(CBOR_MAP, len(d))]
    if sort_keys:
        ordered_keys = sorted(d.keys() if _IS_PY3 else d.iterkeys())
        pairs = ((key, d[key]) for key in ordered_keys)
    else:
        pairs = d.items() if _IS_PY3 else d.iteritems()
    for key, value in pairs:
        chunks.append(dumps(key, sort_keys=sort_keys))
        chunks.append(dumps(value, sort_keys=sort_keys))
    return b''.join(chunks)
199 
200 
def dumps_bool(b):
    """Return the one-byte CBOR encoding of the truth value of b."""
    simple_value = CBOR_TRUE if b else CBOR_FALSE
    return struct.pack('B', simple_value)
205 
206 
def dumps_tag(t, sort_keys=False):
    """Return the CBOR encoding of t: the tag head for t.tag, then t.value."""
    head = _encode_type_num(CBOR_TAG, t.tag)
    payload = dumps(t.value, sort_keys=sort_keys)
    return head + payload
209 
210 
# Version-specific predicates used by dumps() to classify values.
if _IS_PY3:
    def _is_stringish(x):
        """Return True when x is a str or bytes object."""
        return isinstance(x, (str, bytes))
    def _is_intish(x):
        """Return True when x is an int."""
        return isinstance(x, int)
else:
    def _is_stringish(x):
        """Return True when x is any Python 2 string type."""
        return isinstance(x, (str, basestring, bytes, unicode))
    def _is_intish(x):
        """Return True when x is an int or a long."""
        return isinstance(x, (int, long))
221 
222 
def dumps(ob, sort_keys=False):
    """Serialize the Python object ob to CBOR and return the bytes.

    Handles None, bool, strings, list/tuple, dict, float, integers and Tag.
    sort_keys is forwarded to the container and tag encoders.

    Raises:
        Exception: when ob is of a type this encoder does not handle.
    """
    if ob is None:
        return struct.pack('B', CBOR_NULL)
    # bool must be tested before the integer check: bool is a subclass of int.
    if isinstance(ob, bool):
        return dumps_bool(ob)
    if _is_stringish(ob):
        return dumps_string(ob)
    if isinstance(ob, (list, tuple)):
        return dumps_array(ob, sort_keys=sort_keys)
    # TODO: accept other enumerables and emit a variable length array
    if isinstance(ob, dict):
        return dumps_dict(ob, sort_keys=sort_keys)
    if isinstance(ob, float):
        return dumps_float(ob)
    if _is_intish(ob):
        return dumps_int(ob)
    if isinstance(ob, Tag):
        return dumps_tag(ob, sort_keys=sort_keys)
    # Interpolate the type into the message; it used to be passed as a second
    # exception argument, leaving a literal "%s" in the text.
    raise Exception("don't know how to cbor serialize object of type %s" % (type(ob),))
242 
243 
# same basic signature as json.dump, but with no options (yet)
def dump(obj, fp, sort_keys=False):
    """
    Serialize obj to CBOR and write the result to fp.

    obj: Python object to serialize
    fp: file-like object capable of .write(bytes)
    """
    # The whole encoding is built in memory and written in one call.
    # TODO: .write() to fp as we go as each inner object is serialized
    fp.write(dumps(obj, sort_keys=sort_keys))
254 
255 
class Tag(object):
    """A CBOR tagged item: a tag number together with the tagged value."""

    def __init__(self, tag=None, value=None):
        self.tag = tag
        self.value = value

    def __repr__(self):
        return "Tag({0!r}, {1!r})".format(self.tag, self.value)

    def __eq__(self, other):
        """Two Tags are equal when both tag and value are equal."""
        if not isinstance(other, Tag):
            return False
        return (self.tag == other.tag) and (self.value == other.value)

    def __ne__(self, other):
        """Inverse of __eq__.

        Python 2 does not derive != from __eq__, so without this method two
        equal Tags would still compare as different with !=.
        """
        return not self.__eq__(other)
268 
269 
def loads(data):
    """
    Decode the first CBOR item found in the bytes data and return it as a
    Python object.  Raises ValueError when data is None.
    """
    if data is not None:
        decoded = _loads(StringIO(data))
        return decoded[0]
    raise ValueError("got None for buffer to decode in loads")
278 
279 
def load(fp):
    """
    Decode one CBOR item from fp, a file-like object supporting .read(n),
    and return it as a Python object.
    """
    decoded = _loads(fp)
    return decoded[0]


# Nesting depth beyond which _loads() refuses to continue.
_MAX_DEPTH = 100
288 
289 
def _tag_aux(fp, tb):
    """Split the initial byte tb and read the number that follows it.

    Returns (tag, tag_aux, aux, bytes_read):
      tag        -- major type bits of tb (tb & CBOR_TYPE_MASK)
      tag_aux    -- low 5 info bits of tb
      aux        -- the decoded unsigned number, or None when the info bits
                    are CBOR_VAR_FOLLOWS
      bytes_read -- 1 for tb itself plus the number of bytes read from fp

    Raises:
        ValueError: when the info bits hold an unsupported value (28-30).
            This used to be an assert, which `python -O` strips, after which
            such bytes were silently treated like CBOR_VAR_FOLLOWS.
    """
    bytes_read = 1
    tag = tb & CBOR_TYPE_MASK
    tag_aux = tb & CBOR_INFO_BITS
    if tag_aux <= 23:
        # The number is stored directly in the info bits.
        aux = tag_aux
    elif tag_aux == CBOR_UINT8_FOLLOWS:
        data = fp.read(1)
        aux = struct.unpack_from("!B", data, 0)[0]
        bytes_read += 1
    elif tag_aux == CBOR_UINT16_FOLLOWS:
        data = fp.read(2)
        aux = struct.unpack_from("!H", data, 0)[0]
        bytes_read += 2
    elif tag_aux == CBOR_UINT32_FOLLOWS:
        data = fp.read(4)
        aux = struct.unpack_from("!I", data, 0)[0]
        bytes_read += 4
    elif tag_aux == CBOR_UINT64_FOLLOWS:
        data = fp.read(8)
        aux = struct.unpack_from("!Q", data, 0)[0]
        bytes_read += 8
    elif tag_aux == CBOR_VAR_FOLLOWS:
        aux = None
    else:
        raise ValueError("bogus tag {0:02x}".format(tb))

    return tag, tag_aux, aux, bytes_read
317 
318 
def _read_byte(fp):
    """Read one byte from fp and return it as an int; raise EOFError at end of input."""
    first = fp.read(1)
    if len(first) > 0:
        return ord(first)
    # An empty read means end of input; I guess not all file-like objects
    # raise for that themselves.
    raise EOFError()
325 
326 
def _loads_var_array(fp, limit, depth, returntags, bytes_read):
    """Decode the elements of an indefinite-length array up to the break byte.

    bytes_read is the count consumed so far for this array's head.
    Returns (list, total bytes read including the break byte).

    Raises:
        Exception: when depth exceeds _MAX_DEPTH.  Elements are decoded via
            _loads_tb, which performs no depth check of its own, so the
            check is made here and every element is decoded one level deeper.
    """
    if depth > _MAX_DEPTH:
        raise Exception("hit CBOR loads recursion depth limit")
    ob = []
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        (subob, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        bytes_read += 1 + sub_len  # 1 for the initial byte read above
        ob.append(subob)
        tb = _read_byte(fp)
    return (ob, bytes_read + 1)
336 
337 
def _loads_var_map(fp, limit, depth, returntags, bytes_read):
    """Decode the entries of an indefinite-length map up to the break byte.

    bytes_read is the count consumed so far for this map's head.
    Returns (dict, total bytes read including the break byte).

    Raises:
        Exception: when depth exceeds _MAX_DEPTH.  Keys are decoded via
            _loads_tb, which performs no depth check of its own, so the
            check is made here and keys and values are decoded one level
            deeper.
    """
    if depth > _MAX_DEPTH:
        raise Exception("hit CBOR loads recursion depth limit")
    ob = {}
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        (subk, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        bytes_read += 1 + sub_len  # 1 for the key's initial byte read above
        (subv, sub_len) = _loads(fp, limit, depth + 1, returntags)
        bytes_read += sub_len
        ob[subk] = subv
        tb = _read_byte(fp)
    return (ob, bytes_read + 1)
349 
350 
# Lazy integer range on both Python versions: range on 3, xrange on 2.
_lazy_range = range if _IS_PY3 else xrange


def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
    """Decode a definite-length array of aux elements from fp.

    bytes_read is the count consumed so far for this array's head.
    Returns (list, total bytes read).

    Elements are decoded with the caller's limit and returntags and one
    level deeper; previously these were dropped, so nested tags were always
    interpreted and the depth counter restarted at 0 in every array.
    """
    ob = []
    for _ in _lazy_range(aux):
        subob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob.append(subob)
    return ob, bytes_read


def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
    """Decode a definite-length map of aux key/value pairs from fp.

    bytes_read is the count consumed so far for this map's head.
    Returns (dict, total bytes read).

    Keys and values are decoded with the caller's limit and returntags and
    one level deeper; previously these were dropped, so nested tags were
    always interpreted and the depth counter restarted at 0 in every map.
    """
    ob = {}
    for _ in _lazy_range(aux):
        subk, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        subv, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob[subk] = subv
    return ob, bytes_read
385 
386 
def _loads(fp, limit=None, depth=0, returntags=False):
    """Decode one CBOR item from fp and return (object, bytes read).

    Raises Exception once depth has gone past _MAX_DEPTH.
    """
    if depth <= _MAX_DEPTH:
        initial_byte = _read_byte(fp)
        return _loads_tb(fp, initial_byte, limit, depth, returntags)
    raise Exception("hit CBOR loads recursion depth limit")
395 
def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
    """Decode the CBOR item whose initial byte tb has already been read.

    Returns (object, bytes read), where the count includes tb itself.
    With returntags, tagged items come back as Tag(number, value);
    otherwise tagify() interprets them.

    Raises:
        ValueError: for a major type 7 byte that is not handled here.
    """
    # Some special cases of CBOR_7 best handled by special struct.unpack logic here
    if tb == CBOR_FLOAT16:
        data = fp.read(2)
        hibyte, lowbyte = struct.unpack_from("BB", data, 0)
        # Half precision layout: 1 sign bit, 5 exponent bits, 10 mantissa bits.
        exp = (hibyte >> 2) & 0x1F
        mant = ((hibyte & 0x03) << 8) | lowbyte
        if exp == 0:
            val = mant * (2.0 ** -24)
        elif exp == 31:
            if mant == 0:
                val = float('Inf')
            else:
                val = float('NaN')
        else:
            val = (mant + 1024.0) * (2 ** (exp - 25))
        if hibyte & 0x80:
            val = -1.0 * val
        return (val, 3)
    elif tb == CBOR_FLOAT32:
        data = fp.read(4)
        pf = struct.unpack_from("!f", data, 0)
        return (pf[0], 5)
    elif tb == CBOR_FLOAT64:
        data = fp.read(8)
        pf = struct.unpack_from("!d", data, 0)
        return (pf[0], 9)

    tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)

    if tag == CBOR_UINT:
        return (aux, bytes_read)
    elif tag == CBOR_NEGINT:
        return (-1 - aux, bytes_read)
    elif tag == CBOR_BYTES:
        ob, subpos = loads_bytes(fp, aux)
        return (ob, bytes_read + subpos)
    elif tag == CBOR_TEXT:
        raw, subpos = loads_bytes(fp, aux, btag=CBOR_TEXT)
        ob = raw.decode('utf8')
        return (ob, bytes_read + subpos)
    elif tag == CBOR_ARRAY:
        if aux is None:
            return _loads_var_array(fp, limit, depth, returntags, bytes_read)
        return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_MAP:
        if aux is None:
            return _loads_var_map(fp, limit, depth, returntags, bytes_read)
        return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_TAG:
        # Decode the tagged item with the caller's settings and one level
        # deeper.  These arguments used to be dropped, so tags nested inside
        # a tag ignored returntags and the depth counter restarted at 0.
        ob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        if returntags:
            # Don't interpret the tag, return it and the tagged object.
            ob = Tag(aux, ob)
        else:
            # attempt to interpret the tag and the value into a Python object.
            ob = tagify(ob, aux)
        return ob, bytes_read
    elif tag == CBOR_7:
        if tb == CBOR_TRUE:
            return (True, bytes_read)
        if tb == CBOR_FALSE:
            return (False, bytes_read)
        if tb == CBOR_NULL:
            return (None, bytes_read)
        if tb == CBOR_UNDEFINED:
            return (None, bytes_read)
        raise ValueError("unknown cbor tag 7 byte: {0:02x}".format(tb))
465 
466 
def loads_bytes(fp, aux, btag=CBOR_BYTES):
    """Read the payload of a byte or text string from fp.

    aux is the length from the string's head, or None for an
    indefinite-length string made of chunks terminated by a break byte.
    btag is the major type every chunk must have.

    Returns (bytes, number of bytes read from fp).

    Raises:
        EOFError: when input ends before the break byte of a chunked string.
        ValueError: when a chunk has the wrong major type or is itself of
            indefinite length.
    """
    # TODO: limit to some maximum number of chunks and some maximum total bytes
    if aux is not None:
        # simple case
        ob = fp.read(aux)
        return (ob, aux)
    # read chunks of bytes
    chunklist = []
    total_bytes_read = 0
    while True:
        # _read_byte reports end of input as EOFError instead of the
        # IndexError that indexing an empty read used to produce.
        tb = _read_byte(fp)
        if tb == CBOR_BREAK:
            total_bytes_read += 1
            break
        tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)
        # Explicit raise instead of assert: the check must survive python -O.
        if tag != btag:
            raise ValueError('variable length value contains unexpected component')
        if aux is None:
            # Without this check fp.read(None) would consume the rest of fp.
            raise ValueError('variable length value contains a nested variable length chunk')
        ob = fp.read(aux)
        chunklist.append(ob)
        total_bytes_read += bytes_read + aux
    return (b''.join(chunklist), total_bytes_read)
489 
490 
def _bytes_to_biguint(bs):
    """Return the unsigned integer stored big-endian in the byte string bs.

    An empty bs yields 0.  Iterating a bytearray gives ints on both
    Python 2 and Python 3, so one implementation serves both.
    """
    out = 0
    for ch in bytearray(bs):
        out = (out << 8) | ch
    return out
505 
506 
def tagify(ob, aux):
    """Turn the item ob tagged with number aux into a Python object.

    Epoch timestamps become naive UTC datetimes, bignum byte strings become
    ints and regex strings become compiled patterns.  Every other tag,
    including the RFC3339 date string tag, comes back as Tag(aux, ob).
    """
    # TODO: make this extensible?
    # cbor.register_tag_handler(tagnumber, tag_handler)
    # where tag_handler takes (tagnumber, tagged_object)
    # TODO: parse RFC3339 date string (CBOR_TAG_DATE_STRING); for now it
    # falls through to the generic Tag below.
    if aux == CBOR_TAG_DATE_ARRAY:
        result = datetime.datetime.utcfromtimestamp(ob)
    elif aux == CBOR_TAG_BIGNUM:
        result = _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_NEGBIGNUM:
        result = -1 - _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_REGEX:
        # Is this actually a good idea? Should we just return the tag and the raw value to the user somehow?
        result = re.compile(ob)
    else:
        result = Tag(aux, ob)
    return result
rosbridge_library.util.cbor.load
def load(fp)
Definition: cbor.py:280
rosbridge_library.util.cbor._loads_tb
def _loads_tb(fp, tb, limit=None, depth=0, returntags=False)
Definition: cbor.py:396
rosbridge_library.util.cbor.Tag
Definition: cbor.py:256
rosbridge_library.util.cbor._loads
def _loads(fp, limit=None, depth=0, returntags=False)
Definition: cbor.py:387
rosbridge_library.util.cbor.dumps_float
def dumps_float(val)
Definition: cbor.py:120
rosbridge_library.util.cbor.tagify
def tagify(ob, aux)
Definition: cbor.py:507
rosbridge_library.util.cbor.loads
def loads(data)
Definition: cbor.py:270
rosbridge_library.util.cbor._tag_aux
def _tag_aux(fp, tb)
Definition: cbor.py:290
rosbridge_library.util.cbor._is_stringish
def _is_stringish(x)
Definition: cbor.py:212
rosbridge_library.util.cbor._encode_type_num
def _encode_type_num(cbor_type, val)
Definition: cbor.py:127
rosbridge_library.util.cbor.dumps_dict
def dumps_dict(d, sort_keys=False)
Definition: cbor.py:172
rosbridge_library.util.cbor.Tag.__eq__
def __eq__(self, other)
Definition: cbor.py:264
rosbridge_library.util.cbor.dumps
def dumps(ob, sort_keys=False)
Definition: cbor.py:223
rosbridge_library.util.cbor._read_byte
def _read_byte(fp)
Definition: cbor.py:319
rosbridge_library.util.cbor.Tag.__init__
def __init__(self, tag=None, value=None)
Definition: cbor.py:257
rosbridge_library.util.cbor.dumps_bool
def dumps_bool(b)
Definition: cbor.py:201
rosbridge_library.util.cbor.dumps_string
def dumps_string(val, is_text=None, is_bytes=None)
Definition: cbor.py:155
rosbridge_library.util.cbor.dumps_array
def dumps_array(arr, sort_keys=False)
Definition: cbor.py:165
rosbridge_library.util.cbor.Tag.value
value
Definition: cbor.py:259
rosbridge_library.util.cbor.dump
def dump(obj, fp, sort_keys=False)
Definition: cbor.py:245
rosbridge_library.util.cbor._dumps_bignum_to_bytearray
def _dumps_bignum_to_bytearray(val)
Definition: cbor.py:105
rosbridge_library.util.cbor._is_unicode
def _is_unicode(val)
Definition: cbor.py:148
rosbridge_library.util.cbor._bytes_to_biguint
def _bytes_to_biguint(bs)
Definition: cbor.py:492
rosbridge_library.util.cbor.loads_bytes
def loads_bytes(fp, aux, btag=CBOR_BYTES)
Definition: cbor.py:467
rosbridge_library.util.cbor._loads_array
def _loads_array(fp, limit, depth, returntags, aux, bytes_read)
Definition: cbor.py:352
rosbridge_library.util.cbor._loads_var_array
def _loads_var_array(fp, limit, depth, returntags, bytes_read)
Definition: cbor.py:327
rosbridge_library.util.cbor._loads_map
def _loads_map(fp, limit, depth, returntags, aux, bytes_read)
Definition: cbor.py:359
rosbridge_library.util.cbor.Tag.tag
tag
Definition: cbor.py:258
rosbridge_library.util.cbor._loads_var_map
def _loads_var_map(fp, limit, depth, returntags, bytes_read)
Definition: cbor.py:338
rosbridge_library.util.cbor.Tag.__repr__
def __repr__(self)
Definition: cbor.py:261
rosbridge_library.util.cbor._is_intish
def _is_intish(x)
Definition: cbor.py:214
rosbridge_library.util.cbor.dumps_tag
def dumps_tag(t, sort_keys=False)
Definition: cbor.py:207
rosbridge_library.util.cbor.dumps_int
def dumps_int(val)
Definition: cbor.py:84


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45