2 ******************************************************************* 3 Copyright (c) 2013, 2020 IBM Corp. 5 All rights reserved. This program and the accompanying materials 6 are made available under the terms of the Eclipse Public License v2.0 7 and Eclipse Distribution License v1.0 which accompany this distribution. 9 The Eclipse Public License is available at 10 https://www.eclipse.org/legal/epl-2.0/ 11 and the Eclipse Distribution License is available at 12 http://www.eclipse.org/org/documents/edl-v10.php. 15 Ian Craggs - initial implementation and/or documentation 16 Ian Craggs - add MQTTV5 support 17 ******************************************************************* 19 from __future__
import print_function
36 import SocketServer
as socketserver
37 import MQTTV3112
as MQTTV311
53 self.socket.shutdown(socket.SHUT_RDWR)
61 header1 = ord(self.socket.recv(1))
62 header2 = ord(self.socket.recv(1))
66 opcode = (header1 & 0x0f)
67 maskbit = (header2 & 0x80) == 0x80
68 length = (header2 & 0x7f)
70 lb1 = ord(self.socket.recv(1))
71 lb2 = ord(self.socket.recv(1))
72 length = lb1*256 + lb2
76 length += ord(self.socket.recv(1)) * 2**((7 - i)*8)
77 assert maskbit ==
True 79 mask = self.socket.recv(4)
80 mpayload = bytearray()
81 while len(mpayload) < length:
82 mpayload += self.socket.recv(length -
len(mpayload))
87 buffer.append(i ^ mask[mi])
97 out = self.
buffer[:bufsize]
101 out = self.
buffer[:bufsize]
105 self.socket.recv(bufsize -
len(self.
buffer))
110 return getattr(self.
socket, name)
120 """ If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are 123 header += bytearray([126, l // 256, l % 256])
125 """ If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the 126 most significant bit MUST be 0) are the payload length. 129 for i
in range(0, 7):
130 divisor = 2**((7 - i)*8)
131 mybytes.append(l // divisor)
134 header += bytearray(mybytes)
135 totaldata = header + data
137 sent = self.socket.send(totaldata)
138 while sent <
len(totaldata):
139 sent += self.socket.send(totaldata[sent:])
144 now = datetime.datetime.now()
145 return now.strftime(
'%Y%m%d %H%M%S')+str(float(
"."+str(now.microsecond)))[1:]
154 "return headers: keys are converted to upper case so that checks are case insensitive" 156 lines = data.splitlines()
157 for curline
in lines[1:]:
158 if curline.find(
":") != -1:
159 key, value = curline.split(
": ", 1)
160 headers[key.upper()] = value
164 GUID =
"258EAFA5-E914-47DA-95CA-C5AB0DC85B11" 165 data = client.recv(1024).
decode(
'utf-8')
167 digest = base64.b64encode(hashlib.sha1(
168 (headers[
'SEC-WEBSOCKET-KEY'] + GUID).
encode(
"utf-8")).digest())
169 resp = b
"HTTP/1.1 101 Switching Protocols\r\n" +\
170 b
"Upgrade: websocket\r\n" +\
171 b
"Connection: Upgrade\r\n" +\
172 b
"Sec-WebSocket-Protocol: mqtt\r\n" +\
173 b
"Sec-WebSocket-Accept: " + digest + b
"\r\n\r\n" 174 return client.send(resp)
178 if not hasattr(self,
"ids"):
180 if not hasattr(self,
"versions"):
187 sock_no = clients.fileno()
188 brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
189 brokers.connect((brokerhost, brokerport))
191 while inbuf !=
None and not terminated:
192 (i, o, e) = select.select([clients, brokers], [], [])
196 if s == clients
and s
not in suspended:
198 char = clients.recv(1)
199 clients.rebuffer(char)
202 clients.websockets =
True 203 print(
"Switching to websockets for socket %d" % sock_no)
204 inbuf = MQTT.getPacket(clients)
209 if inbuf[0] >> 4 == 1:
210 protocol_string = b
'MQTT' 211 pos = inbuf.find(protocol_string)
213 version = inbuf[pos +
214 len(protocol_string)]
219 packet = MQTT.unpackPacket(inbuf)
220 if hasattr(packet.fh,
"MessageType"):
221 packet_type = packet.fh.MessageType
222 publish_type = MQTT.PUBLISH
223 connect_type = MQTT.CONNECT
225 packet_type = packet.fh.PacketType
226 publish_type = MQTT.PacketTypes.PUBLISH
227 connect_type = MQTT.PacketTypes.CONNECT
228 if packet_type == publish_type
and \
229 packet.topicName ==
"MQTTSAS topic" and \
230 packet.data == b
"TERMINATE":
231 print(
"Terminating client", self.
ids[id(clients)])
236 elif packet_type == publish_type
and \
237 packet.topicName ==
"MQTTSAS topic" and \
238 packet.data == b
"TERMINATE_SERVER":
239 print(
"Suspending client ", self.
ids[id(clients)])
240 suspended.append(clients)
241 elif packet_type == connect_type:
243 ] = packet.ClientIdentifier
246 self.
ids[id(clients)], str(packet))
250 traceback.print_exc()
253 inbuf = MQTT.getPacket(brokers)
257 print(
timestamp(),
"S to C", self.
ids[id(clients)], str(MQTT.unpackPacket(inbuf)))
259 traceback.print_exc()
264 print(repr((i, o, e)), repr(inbuf))
265 traceback.print_exc()
266 if id(clients)
in self.ids.keys():
267 del self.
ids[id(clients)]
268 elif id(clients)
in self.versions.keys():
277 global brokerhost, brokerport
279 if len(sys.argv) > 1:
280 brokerhost = sys.argv[1]
282 brokerhost =
'127.0.0.1' 284 if len(sys.argv) > 2:
285 brokerport = int(sys.argv[2])
289 if len(sys.argv) > 3:
290 myport = int(sys.argv[3])
292 if brokerhost == myhost:
293 myport = brokerport + 1
297 print(
"Listening on port", str(myport)+
", broker on port", brokerport)
298 s = ThreadingTCPServer((
"127.0.0.1", myport), MyHandler)
302 if __name__ ==
"__main__":
def __init__(self, socket)
def handshake(self, client)
def getheaders(self, data)
void print(std::FILE *f, const S &format_str, Args &&...args)
def __getattr__(self, name)