22 _IS_PY3 = sys.version_info[0] >= 3
25 from io
import BytesIO
as StringIO
28 from cStringIO
import StringIO
30 from StringIO
import StringIO
46 CBOR_UINT8_FOLLOWS = 24
47 CBOR_UINT16_FOLLOWS = 25
48 CBOR_UINT32_FOLLOWS = 26
49 CBOR_UINT64_FOLLOWS = 27
54 CBOR_FALSE = (CBOR_7 | 20)
55 CBOR_TRUE = (CBOR_7 | 21)
56 CBOR_NULL = (CBOR_7 | 22)
57 CBOR_UNDEFINED = (CBOR_7 | 23)
59 CBOR_FLOAT16 = (CBOR_7 | 25)
60 CBOR_FLOAT32 = (CBOR_7 | 26)
61 CBOR_FLOAT64 = (CBOR_7 | 27)
63 CBOR_TAG_DATE_STRING = 0
64 CBOR_TAG_DATE_ARRAY = 1
66 CBOR_TAG_NEGBIGNUM = 3
69 CBOR_TAG_BASE64URL = 21
75 CBOR_TAG_BASE64URL = 33
79 CBOR_TAG_CBOR_FILEHEADER = 55799
81 _CBOR_TAG_BIGNUM_BYTES = struct.pack(
'B', CBOR_TAG | CBOR_TAG_BIGNUM)
85 "return bytes representing int val in CBOR" 89 return struct.pack(
'B', val)
91 return struct.pack(
'BB', CBOR_UINT8_FOLLOWS, val)
93 return struct.pack(
'!BH', CBOR_UINT16_FOLLOWS, val)
94 if val <= 0x0ffffffff:
95 return struct.pack(
'!BI', CBOR_UINT32_FOLLOWS, val)
96 if val <= 0x0ffffffffffffffff:
97 return struct.pack(
'!BQ', CBOR_UINT64_FOLLOWS, val)
99 return _CBOR_TAG_BIGNUM_BYTES +
_encode_type_num(CBOR_BYTES, len(outb)) + outb
108 out.insert(0, val & 0x0ff)
115 out.insert(0, chr(val & 0x0ff))
121 return struct.pack(
"!Bd", CBOR_FLOAT64, val)
124 _CBOR_TAG_NEGBIGNUM_BYTES = struct.pack(
'B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)
128 """For some CBOR primary type [0..7] and an auxiliary unsigned number, return CBOR encoded bytes""" 131 return struct.pack(
'B', cbor_type | val)
133 return struct.pack(
'BB', cbor_type | CBOR_UINT8_FOLLOWS, val)
135 return struct.pack(
'!BH', cbor_type | CBOR_UINT16_FOLLOWS, val)
136 if val <= 0x0ffffffff:
137 return struct.pack(
'!BI', cbor_type | CBOR_UINT32_FOLLOWS, val)
138 if (((cbor_type == CBOR_NEGINT)
and (val <= 0x07fffffffffffffff))
or 139 ((cbor_type != CBOR_NEGINT)
and (val <= 0x0ffffffffffffffff))):
140 return struct.pack(
'!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)
141 if cbor_type != CBOR_NEGINT:
142 raise Exception(
"value too big for CBOR unsigned number: {0!r}".format(val))
144 return _CBOR_TAG_NEGBIGNUM_BYTES +
_encode_type_num(CBOR_BYTES, len(outb)) + outb
149 return isinstance(val, str)
152 return isinstance(val, unicode)
157 val = val.encode(
'utf8')
160 if (is_bytes)
or not (is_text ==
True):
167 parts = [
dumps(x, sort_keys=sort_keys)
for x
in arr]
168 return head + b
''.join(parts)
176 for k
in sorted(d.keys()):
178 parts.append(
dumps(k, sort_keys=sort_keys))
179 parts.append(
dumps(v, sort_keys=sort_keys))
181 for k,v
in d.items():
182 parts.append(
dumps(k, sort_keys=sort_keys))
183 parts.append(
dumps(v, sort_keys=sort_keys))
184 return b
''.join(parts)
190 for k
in sorted(d.iterkeys()):
192 parts.append(
dumps(k, sort_keys=sort_keys))
193 parts.append(
dumps(v, sort_keys=sort_keys))
195 for k,v
in d.iteritems():
196 parts.append(
dumps(k, sort_keys=sort_keys))
197 parts.append(
dumps(v, sort_keys=sort_keys))
198 return b
''.join(parts)
203 return struct.pack(
'B', CBOR_TRUE)
204 return struct.pack(
'B', CBOR_FALSE)
213 return isinstance(x, (str, bytes))
215 return isinstance(x, int)
218 return isinstance(x, (str, basestring, bytes, unicode))
220 return isinstance(x, (int, long))
225 return struct.pack(
'B', CBOR_NULL)
226 if isinstance(ob, bool):
230 if isinstance(ob, (list, tuple)):
233 if isinstance(ob, dict):
235 if isinstance(ob, float):
239 if isinstance(ob, Tag):
240 return dumps_tag(ob, sort_keys=sort_keys)
241 raise Exception(
"don't know how to cbor serialize object of type %s", type(ob))
245 def dump(obj, fp, sort_keys=False):
247 obj: Python object to serialize 248 fp: file-like object capable of .write(bytes) 252 blob =
dumps(obj, sort_keys=sort_keys)
262 return "Tag({0!r}, {1!r})".format(self.
tag, self.
value)
265 if not isinstance(other, Tag):
267 return (self.
tag == other.tag)
and (self.
value == other.value)
272 Parse CBOR bytes and return Python objects. 275 raise ValueError(
"got None for buffer to decode in loads")
282 Parse and return object from fp, a file-like object supporting .read(n) 292 tag = tb & CBOR_TYPE_MASK
293 tag_aux = tb & CBOR_INFO_BITS
296 elif tag_aux == CBOR_UINT8_FOLLOWS:
298 aux = struct.unpack_from(
"!B", data, 0)[0]
300 elif tag_aux == CBOR_UINT16_FOLLOWS:
302 aux = struct.unpack_from(
"!H", data, 0)[0]
304 elif tag_aux == CBOR_UINT32_FOLLOWS:
306 aux = struct.unpack_from(
"!I", data, 0)[0]
308 elif tag_aux == CBOR_UINT64_FOLLOWS:
310 aux = struct.unpack_from(
"!Q", data, 0)[0]
313 assert tag_aux == CBOR_VAR_FOLLOWS,
"bogus tag {0:02x}".format(tb)
316 return tag, tag_aux, aux, bytes_read
330 while tb != CBOR_BREAK:
331 (subob, sub_len) =
_loads_tb(fp, tb, limit, depth, returntags)
332 bytes_read += 1 + sub_len
335 return (ob, bytes_read + 1)
341 while tb != CBOR_BREAK:
342 (subk, sub_len) =
_loads_tb(fp, tb, limit, depth, returntags)
343 bytes_read += 1 + sub_len
344 (subv, sub_len) =
_loads(fp, limit, depth, returntags)
345 bytes_read += sub_len
348 return (ob, bytes_read + 1)
355 subob, subpos =
_loads(fp)
358 return ob, bytes_read
359 def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
367 return ob, bytes_read
369 def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
371 for i
in xrange(aux):
372 subob, subpos =
_loads(fp)
375 return ob, bytes_read
376 def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
378 for i
in xrange(aux):
384 return ob, bytes_read
387 def _loads(fp, limit=None, depth=0, returntags=False):
388 "return (object, bytes read)" 389 if depth > _MAX_DEPTH:
390 raise Exception(
"hit CBOR loads recursion depth limit")
394 return _loads_tb(fp, tb, limit, depth, returntags)
396 def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
398 if tb == CBOR_FLOAT16:
400 hibyte, lowbyte = struct.unpack_from(
"BB", data, 0)
401 exp = (hibyte >> 2) & 0x1F
402 mant = ((hibyte & 0x03) << 8) | lowbyte
404 val = mant * (2.0 ** -24)
411 val = (mant + 1024.0) * (2 ** (exp - 25))
415 elif tb == CBOR_FLOAT32:
417 pf = struct.unpack_from(
"!f", data, 0)
419 elif tb == CBOR_FLOAT64:
421 pf = struct.unpack_from(
"!d", data, 0)
424 tag, tag_aux, aux, bytes_read =
_tag_aux(fp, tb)
427 return (aux, bytes_read)
428 elif tag == CBOR_NEGINT:
429 return (-1 - aux, bytes_read)
430 elif tag == CBOR_BYTES:
432 return (ob, bytes_read + subpos)
433 elif tag == CBOR_TEXT:
435 ob = raw.decode(
'utf8')
436 return (ob, bytes_read + subpos)
437 elif tag == CBOR_ARRAY:
440 return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
441 elif tag == CBOR_MAP:
444 return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
445 elif tag == CBOR_TAG:
454 return ob, bytes_read
457 return (
True, bytes_read)
459 return (
False, bytes_read)
461 return (
None, bytes_read)
462 if tb == CBOR_UNDEFINED:
463 return (
None, bytes_read)
464 raise ValueError(
"unknown cbor tag 7 byte: {:02x}".format(tb))
481 total_bytes_read += 1
483 tag, tag_aux, aux, bytes_read =
_tag_aux(fp, tb)
484 assert tag == btag,
'variable length value contains unexpected component' 487 total_bytes_read += bytes_read + aux
488 return (b
''.join(chunklist), total_bytes_read)
511 if aux == CBOR_TAG_DATE_STRING:
514 if aux == CBOR_TAG_DATE_ARRAY:
515 return datetime.datetime.utcfromtimestamp(ob)
516 if aux == CBOR_TAG_BIGNUM:
518 if aux == CBOR_TAG_NEGBIGNUM:
520 if aux == CBOR_TAG_REGEX:
522 return re.compile(ob)
def _loads_map(fp, limit, depth, returntags, aux, bytes_read)
def _bytes_to_biguint(bs)
def _loads_tb(fp, tb, limit=None, depth=0, returntags=False)
def dumps_array(arr, sort_keys=False)
def _loads_var_array(fp, limit, depth, returntags, bytes_read)
def dumps(ob, sort_keys=False)
def _encode_type_num(cbor_type, val)
def loads_bytes(fp, aux, btag=CBOR_BYTES)
def _loads(fp, limit=None, depth=0, returntags=False)
def dumps_tag(t, sort_keys=False)
def __init__(self, tag=None, value=None)
def _loads_var_map(fp, limit, depth, returntags, bytes_read)
def dumps_string(val, is_text=None, is_bytes=None)
def dump(obj, fp, sort_keys=False)
def _loads_array(fp, limit, depth, returntags, aux, bytes_read)
def _dumps_bignum_to_bytearray(val)
def dumps_dict(d, sort_keys=False)