2 from datetime
import datetime
7 Singleton class to hold lists of received fragments in one 'global' object 10 """ Implementation of the singleton interface """ 12 """ Test method, return singleton id """ 32 """ Create singleton instance """ 33 if ReceivedFragments.__instance
is None:
37 self.__dict__[
'_ReceivedFragments__instance'] = ReceivedFragments.__instance
40 """ Delegate access to implementation """ 44 """ Delegate access to implementation """ 49 fragment_timeout = 600
51 global received_fragments
56 Capability.__init__(self, protocol)
61 if self.protocol.parameters !=
None:
67 threading.Thread.__init__(self)
86 for id
in self.received_fragments.keys() :
90 log_msg = [
"fragment list ", str(id),
" timed out.."]
92 if message[
"id"] != id:
93 log_msg.append(
" -> removing it..")
96 log_msg.extend([
" -> but we're just about to add fragment #"])
97 log_msg.extend([str(message.get(
"num")),
" of "])
99 log_msg.extend([
" ..keeping the list"])
100 self.protocol.log(
"warning",
''.join(log_msg))
102 msg_opcode = message.get(
"op")
103 msg_id = message.get(
"id")
104 msg_num = message.get(
"num")
105 msg_total = message.get(
"total")
106 msg_data = message.get(
"data")
109 if ((msg_opcode ==
None)
or (msg_id ==
None)
or 110 (msg_num ==
None)
or (msg_total ==
None)
or 112 self.protocol.log(
"error",
"received invalid fragment!")
115 log_msg =
"fragment for messageID: " + str(msg_id) +
" received." 116 self.protocol.log(
"debug", log_msg)
119 if msg_id
not in self.received_fragments.keys():
121 "is_reconstructing":
False,
122 "total": message[
"total"],
123 "timestamp_last_append": now,
126 log_msg =
"opened new fragment list for messageID " + str(msg_id)
127 self.protocol.log(
"debug", log_msg)
138 log_msg = [
"appended fragment #" + str(msg_num)]
139 log_msg.extend([
" (total: ", str(msg_total),
") to fragment list for messageID ", str(msg_id)])
140 self.protocol.log(
"debug",
''.join(log_msg))
142 log_msg =
"error while trying to append fragment " + str(msg_num)
143 self.protocol.log(
"error", log_msg)
146 received_all_fragments =
False 151 if existing_fragments == announced_total:
152 log_msg = [
"enough/all fragments for messageID " + str(msg_id) +
" received"]
153 log_msg.extend([
" [", str(existing_fragments),
"]"])
154 log_msg =
''.join(log_msg)
155 self.protocol.log(
"debug", log_msg)
157 received_all_fragments =
True 158 for i
in range(0, announced_total):
160 received_all_fragments =
False 161 log_msg =
"fragment #" +str(i)
162 log_msg +=
" for messageID " + str(msg_id) +
" is missing! " 163 self.protocol.log(
"error", log_msg)
167 if received_all_fragments:
168 log_msg =
"reconstructing original message " + str(msg_id)
169 self.protocol.log(
"debug", log_msg)
172 reconstructed_msg =
''.join(self.
received_fragments[msg_id][
"fragment_list"].values())
174 log_msg = [
"reconstructed original message:\n"]
175 log_msg.append(reconstructed_msg)
176 log_msg =
''.join(log_msg)
177 self.protocol.log(
"debug", log_msg)
179 duration = datetime.now() - now
182 self.protocol.incoming(reconstructed_msg)
183 log_msg = [
"reconstructed message (ID:" + str(msg_id) +
") from "]
184 log_msg.extend([str(msg_total),
" fragments. "])
187 log_msg.extend([
"[duration: ", str(duration.total_seconds()),
" s]"])
188 log_msg =
''.join(log_msg)
189 self.protocol.log(
"info", log_msg)
193 log_msg =
"removed fragment list for messageID " + str(msg_id)
194 self.protocol.log(
"debug", log_msg)
198 self.protocol.unregister_operation(
"fragment")
def __setattr__(self, attr, value)
def __getattr__(self, attr)
def defragment(self, message)
def __init__(self, protocol)