Go to the documentation of this file.00001
00002 import sys
00003 import socket
00004 import binascii
00005 from contextlib import closing
00006
class PatliteState(object):
    """Six-slot state vector for a network patlite.

    ``state`` is a byte string holding one single-byte code per target, in
    the order given by :class:`Target` (red, yellow, green, blue, white,
    buzzer).  All codes are written as ``b'...'`` literals: on Python 2 that
    is the ordinary ``str`` type, and on Python 3 it is the ``bytes`` type
    that sockets require.
    """

    class Target(object):
        """Index of each light / the buzzer inside ``PatliteState.state``."""
        LIGHT_RED = 0
        LIGHT_YELLOW = 1
        LIGHT_GREEN = 2
        LIGHT_BLUE = 3
        LIGHT_WHITE = 4
        BUZZER = 5
        __slots__ = []

    class LightState(object):
        """Single-byte codes used for the five light slots."""
        OFF = b'\x00'
        ON = b'\x01'
        FLASH_1 = b'\x02'
        FLASH_2 = b'\x03'
        # Value that clear() puts in every slot.
        # NOTE(review): presumably "leave unchanged" on the device - confirm.
        REMAIN = b'\x09'
        __slots__ = []

    class BuzzerState(object):
        """Single-byte codes used for the buzzer slot."""
        OFF = b'\x00'
        ON_1 = b'\x01'
        ON_2 = b'\x02'
        ON_3 = b'\x03'
        ON_4 = b'\x04'
        # Value that clear() puts in the buzzer slot (see LightState.REMAIN).
        REMAIN = b'\x09'
        __slots__ = []

    # Lookup tables behind int2code() / code2int().
    _INT_TO_CODE = {
        0: b'\x00',
        1: b'\x01',
        2: b'\x02',
        3: b'\x03',
        4: b'\x04',
    }
    _CODE_TO_INT = {
        b'\x00': 0,
        b'\x01': 1,
        b'\x02': 2,
        b'\x03': 3,
        b'\x04': 4,
    }

    # Slot indices and the codes declared for each kind of slot; is_valid()
    # checks against these.
    _LIGHT_TARGETS = (
        Target.LIGHT_RED,
        Target.LIGHT_YELLOW,
        Target.LIGHT_GREEN,
        Target.LIGHT_BLUE,
        Target.LIGHT_WHITE,
    )
    _LIGHT_CODES = (
        LightState.OFF,
        LightState.ON,
        LightState.FLASH_1,
        LightState.FLASH_2,
        LightState.REMAIN,
    )
    _BUZZER_CODES = (
        BuzzerState.OFF,
        BuzzerState.ON_1,
        BuzzerState.ON_2,
        BuzzerState.ON_3,
        BuzzerState.ON_4,
        BuzzerState.REMAIN,
    )

    def int2code(self, i):
        """Return the single-byte code for integer *i*.

        0..4 map to ``b'\\x00'``..``b'\\x04'``; every other value maps to
        ``b'\\x09'`` (the REMAIN code).
        """
        return self._INT_TO_CODE.get(i, b'\x09')

    def code2int(self, c):
        """Return the integer for single-byte code *c*.

        ``b'\\x00'``..``b'\\x04'`` map to 0..4; every other value maps to 9.
        """
        return self._CODE_TO_INT.get(c, 9)

    def clear(self):
        """Reset every slot (five lights and the buzzer) to its REMAIN code."""
        self.state = self.LightState.REMAIN * 5 + self.BuzzerState.REMAIN

    def __init__(self, state=None):
        """Create a state vector.

        :param state: byte string to adopt as-is (it is not validated); when
            ``None`` the vector is initialised with :meth:`clear`.
        """
        if state is None:
            self.clear()
        else:
            self.state = state

    def _replace(self, index, value):
        """Splice *value* over slot *index* and return the new state."""
        # Slices (not state[index]) keep this working on Python 3, where
        # indexing a bytes object yields an int instead of a 1-byte string.
        self.state = self.state[:index] + value + self.state[index + 1:]
        return self.state

    def red(self, value):
        """Set the red slot to *value* and return the updated state."""
        return self._replace(self.Target.LIGHT_RED, value)

    def yellow(self, value):
        """Set the yellow slot to *value* and return the updated state."""
        return self._replace(self.Target.LIGHT_YELLOW, value)

    def green(self, value):
        """Set the green slot to *value* and return the updated state."""
        return self._replace(self.Target.LIGHT_GREEN, value)

    def blue(self, value):
        """Set the blue slot to *value* and return the updated state."""
        return self._replace(self.Target.LIGHT_BLUE, value)

    def white(self, value):
        """Set the white slot to *value* and return the updated state."""
        return self._replace(self.Target.LIGHT_WHITE, value)

    def buzzer(self, value):
        """Set the buzzer slot to *value* and return the updated state.

        Anything after the buzzer slot is dropped, so the result always ends
        with *value*.
        """
        self.state = self.state[:self.Target.BUZZER] + value
        return self.state

    def set_from_int(self, target, state):
        """Set slot *target* from the integer *state*.

        :param target: slot index, one of the :class:`Target` constants.
        :param state: integer converted with :meth:`int2code` (values outside
            0..4 become the REMAIN code).
        :raises ValueError: if :meth:`is_valid` rejects the combination.
        """
        code = self.int2code(state)
        if not self.is_valid(target, code):
            # A bare ``raise`` here would itself fail (there is no active
            # exception to re-raise), so signal the bad input explicitly.
            raise ValueError(
                "invalid patlite target/state: target=%r, state=%r"
                % (target, state))
        self.state = self.state[:target] + code + self.state[target + 1:]

    def is_valid(self, target, state):
        """Return True if code *state* may be stored in slot *target*.

        *target* must be one of the six :class:`Target` indices - anything
        else (including negative indices) would make the slicing in
        :meth:`set_from_int` change the length of ``state``.  *state* must be
        one of the codes declared in :class:`LightState` for a light slot or
        in :class:`BuzzerState` for the buzzer slot.
        """
        if target in self._LIGHT_TARGETS:
            return state in self._LIGHT_CODES
        if target == self.Target.BUZZER:
            return state in self._BUZZER_CODES
        return False

    def __repr__(self):
        """Show every slot as lowercase hex, each followed by a space."""
        # bytearray() yields ints on both Python 2 and Python 3.
        slots = ''.join('%x ' % byte for byte in bytearray(self.state))
        return "<PatliteState '%s'>" % slots
00105
00106
00107
class Patlite(object):
    """Controller for a network patlite reached over a TCP socket.

    Each command is one packet sent with ``sendall`` followed by a single
    ``recv`` of at most ``bufsize`` bytes.  The instance can be used as a
    context manager: the socket is opened on entry and closed on exit.
    """

    # Prefix of a write packet; the six state bytes are appended to it.
    # NOTE(review): the trailing 0x06 looks like the payload length - confirm.
    WRITE_HEADER = b'\x58\x58\x53\x00\x00\x06'

    # Reply that write() and clear() treat as success.
    ACK = b'\x06'
    # Defined for completeness; no method of this class compares against it.
    NAK = b'\x15'

    # Packet sent by read().
    READ = b'\x58\x58\x47\x00\x00\x00'

    # Packet sent by clear().
    CLEAR = b'\x58\x58\x43\x00\x00\x00'

    def __init__(self, host, port=10000, timeout=1):
        """Store the connection parameters; no socket is opened yet.

        :param host: address passed to ``socket.connect``.
        :param port: TCP port passed to ``socket.connect``.
        :param timeout: socket timeout handed to ``settimeout``.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.bufsize = 512
        self.sock = None
        self.state = PatliteState()

    def __enter__(self):
        """Open the connection and return this controller.

        Returning ``self`` makes ``with Patlite(host) as p:`` bind the
        controller instead of ``None``.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the body are not suppressed."""
        self.close()
        return False

    def open(self):
        """Connect a new TCP socket to ``(host, port)``.

        A socket left over from an earlier ``open()`` is closed first.  If
        connecting fails, the new socket is closed before the error
        propagates and ``self.sock`` stays ``None``.

        :returns: the result of ``socket.connect`` (``None``).
        """
        self.close()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            result = sock.connect((self.host, self.port))
        except Exception:
            # Do not leak the descriptor when the device is unreachable.
            sock.close()
            raise
        self.sock = sock
        return result

    def close(self):
        """Close the socket if one is open; calling it again is a no-op."""
        sock, self.sock = self.sock, None
        if sock is None:
            return None
        return sock.close()

    def _transact(self, packet):
        """Send *packet* and return the reply, or ``None`` on a timeout."""
        try:
            self.sock.sendall(packet)
            return self.sock.recv(self.bufsize)
        except socket.timeout:
            # Written with parentheses so it parses on Python 3 too; on
            # Python 2 it prints the same text as the print statement.
            print("timeout")
            return None

    def write(self, state=None):
        """Send a state vector to the device.

        :param state: object whose ``state`` attribute holds the bytes to
            send; defaults to ``self.state``.
        :returns: ``True`` if the reply equals ``ACK``, ``False`` otherwise
            (including on a timeout).
        """
        if state is None:
            state = self.state
        return self._transact(self.WRITE_HEADER + state.state) == self.ACK

    def read(self):
        """Send the READ packet and wrap the reply in a ``PatliteState``.

        :returns: the new ``PatliteState``, or ``None`` on a timeout.
        """
        reply = self._transact(self.READ)
        if reply is None:
            return None
        return PatliteState(reply)

    def clear(self):
        """Send the CLEAR packet.

        :returns: ``True`` if the reply equals ``ACK``, ``False`` otherwise
            (including on a timeout).
        """
        return self._transact(self.CLEAR) == self.ACK

    def _write_single(self, setter_name, value):
        """Reset ``self.state``, set one slot through its setter, write it."""
        # clear() puts REMAIN in every slot, so only the slot set below
        # carries a new value in the packet that is sent.
        self.state.clear()
        getattr(self.state, setter_name)(value)
        return self.write()

    def red(self, value):
        """Write a state whose only non-REMAIN slot is red; see write()."""
        return self._write_single('red', value)

    def yellow(self, value):
        """Write a state whose only non-REMAIN slot is yellow; see write()."""
        return self._write_single('yellow', value)

    def green(self, value):
        """Write a state whose only non-REMAIN slot is green; see write()."""
        return self._write_single('green', value)

    def blue(self, value):
        """Write a state whose only non-REMAIN slot is blue; see write()."""
        return self._write_single('blue', value)

    def white(self, value):
        """Write a state whose only non-REMAIN slot is white; see write()."""
        return self._write_single('white', value)

    def buzzer(self, value):
        """Write a state whose only non-REMAIN slot is the buzzer; see write()."""
        return self._write_single('buzzer', value)
00209
00210
def test(host="10.68.0.10"):
    """Manual smoke test: switch the red, yellow and green lights on.

    Connects to a real device, so it only succeeds when a patlite is
    reachable at *host*.  Each call below sends its own write packet.

    :param host: address of the patlite; defaults to the address that was
        previously hard-coded here.
    """
    patlite = Patlite(host)
    with patlite:
        on = patlite.state.LightState.ON
        patlite.red(on)
        patlite.yellow(on)
        patlite.green(on)


if __name__ == '__main__':
    test()
00220