Package nxt :: Module telegram

Source Code for Module nxt.telegram

  1  # nxt.telegram module -- LEGO Mindstorms NXT telegram formatting and parsing 
  2  # Copyright (C) 2006  Douglas P Lau 
  3  # Copyright (C) 2009  Marcus Wanner 
  4  # 
  5  # This program is free software: you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation, either version 3 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # This program is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14   
 15  'Used by nxt.system for sending telegrams to the NXT' 
 16   
 17  from cStringIO import StringIO 
 18  from struct import pack, unpack 
 19  import nxt.error 
 20   
class InvalidReplyError(Exception):
    """Raised when a received packet's type byte does not mark it as a reply."""
23
class InvalidOpcodeError(Exception):
    """Raised when a reply's opcode differs from the one that was expected.

    The unexpected opcode is passed as the exception's only argument.
    """
26
class Telegram(object):
    """One NXT telegram: a type byte, an opcode byte, then payload fields.

    The constructor works in two modes, selected by ``pkt``:

    * ``pkt`` given (non-empty): the raw bytes are wrapped for reading, the
      two header bytes are consumed and validated, and the ``parse_*``
      methods then read the payload sequentially.
    * ``pkt`` omitted: an empty buffer is created, the two header bytes are
      written, and the ``add_*`` methods then append payload fields.

    All multi-byte integers are packed and unpacked little-endian.
    """

    TYPE = 0    # type byte offset
    CODE = 1    # code byte offset
    DATA = 2    # data byte offset

    TYPE_NOT_DIRECT = 0x01          # system vs. direct type
    TYPE_REPLY = 0x02               # reply type (from NXT brick)
    TYPE_REPLY_NOT_REQUIRED = 0x80  # reply not required flag

    def __init__(self, direct=False, opcode=0, reply_req=True, pkt=None):
        """Parse a received reply, or start building an outgoing telegram.

        direct    -- outgoing only: when false, TYPE_NOT_DIRECT is set in
                     the type byte.
        opcode    -- outgoing: opcode byte to write; incoming: opcode the
                     reply is required to carry.
        reply_req -- outgoing only: when false, TYPE_REPLY_NOT_REQUIRED is
                     set in the type byte.
        pkt       -- raw bytes of a received packet, or None to build one.

        Raises InvalidReplyError if the received type byte is not
        TYPE_REPLY, and InvalidOpcodeError (carrying the received opcode)
        if the received opcode differs from ``opcode``.
        """
        # NOTE: this is a truthiness test, so an empty ``pkt`` selects the
        # outgoing branch exactly as None does.
        if pkt:
            self.pkt = StringIO(pkt)
            self.typ = self.parse_u8()
            self.opcode = self.parse_u8()
            if not self.is_reply():
                raise InvalidReplyError
            if self.opcode != opcode:
                # Call form instead of ``raise E, arg``: same behaviour on
                # Python 2, and not a syntax error on Python 3.
                raise InvalidOpcodeError(self.opcode)
        else:
            self.pkt = StringIO()
            typ = 0
            if not direct:
                typ |= Telegram.TYPE_NOT_DIRECT
            if not reply_req:
                typ |= Telegram.TYPE_REPLY_NOT_REQUIRED
            # Record the header fields so is_reply() and ``opcode`` are
            # usable on outgoing telegrams instead of raising
            # AttributeError.
            self.typ = typ
            self.opcode = opcode
            self.add_u8(typ)
            self.add_u8(opcode)

    def __str__(self):
        """Return the full contents of the underlying packet buffer."""
        return self.pkt.getvalue()

    def is_reply(self):
        """Return True when the type byte equals TYPE_REPLY exactly."""
        return self.typ == Telegram.TYPE_REPLY

    def add_string(self, n_bytes, v):
        """Append ``v`` as a fixed-width field of ``n_bytes`` bytes.

        Uses the struct ``s`` format, which pads short values with NUL
        bytes and truncates long ones to fit.
        """
        self.pkt.write(pack('%ds' % n_bytes, v))

    def add_filename(self, fname):
        """Append ``fname`` as a fixed 20-byte field."""
        self.add_string(20, fname)

    def add_s8(self, v):
        """Append ``v`` as a signed 8-bit integer."""
        self.pkt.write(pack('<b', v))

    def add_u8(self, v):
        """Append ``v`` as an unsigned 8-bit integer."""
        self.pkt.write(pack('<B', v))

    def add_s16(self, v):
        """Append ``v`` as a signed little-endian 16-bit integer."""
        self.pkt.write(pack('<h', v))

    def add_u16(self, v):
        """Append ``v`` as an unsigned little-endian 16-bit integer."""
        self.pkt.write(pack('<H', v))

    def add_s32(self, v):
        """Append ``v`` as a signed little-endian 32-bit integer."""
        self.pkt.write(pack('<i', v))

    def add_u32(self, v):
        """Append ``v`` as an unsigned little-endian 32-bit integer."""
        self.pkt.write(pack('<I', v))

    def parse_string(self, n_bytes=0):
        """Read a string field from the current buffer position.

        With a non-zero ``n_bytes`` exactly that many bytes are read and
        returned (struct.error is raised if fewer remain).  With the
        default of 0 everything left in the buffer is returned.
        """
        if n_bytes:
            return unpack('%ss' % n_bytes, self.pkt.read(n_bytes))[0]
        return self.pkt.read()

    def parse_s8(self):
        """Read and return a signed 8-bit integer."""
        return unpack('<b', self.pkt.read(1))[0]

    def parse_u8(self):
        """Read and return an unsigned 8-bit integer."""
        return unpack('<B', self.pkt.read(1))[0]

    def parse_s16(self):
        """Read and return a signed little-endian 16-bit integer."""
        return unpack('<h', self.pkt.read(2))[0]

    def parse_u16(self):
        """Read and return an unsigned little-endian 16-bit integer."""
        return unpack('<H', self.pkt.read(2))[0]

    def parse_s32(self):
        """Read and return a signed little-endian 32-bit integer."""
        return unpack('<i', self.pkt.read(4))[0]

    def parse_u32(self):
        """Read and return an unsigned little-endian 32-bit integer."""
        return unpack('<I', self.pkt.read(4))[0]
110
111 - def check_status(self):
113 114 import nxt.direct 115 import nxt.system 116 117 OPCODES = dict(nxt.system.OPCODES) 118 OPCODES.update(nxt.direct.OPCODES) 119