2 from datetime
import datetime
7 Singleton class to hold lists of received fragments in one 'global' object
10 """ Implementation of the singleton interface """
12 """ Test method, return singleton id """
32 """ Create singleton instance """
33 if ReceivedFragments.__instance
is None:
37 self.__dict__[
'_ReceivedFragments__instance'] = ReceivedFragments.__instance
40 """ Delegate access to implementation """
44 """ Delegate access to implementation """
49 fragment_timeout = 600
51 global received_fragments
56 Capability.__init__(self, protocol)
67 threading.Thread.__init__(self)
90 log_msg = [
"fragment list ", str(id),
" timed out.."]
92 if message[
"id"] != id:
93 log_msg.append(
" -> removing it..")
96 log_msg.extend([
" -> but we're just about to add fragment #"])
97 log_msg.extend([str(message.get(
"num")),
" of "])
99 log_msg.extend([
" ..keeping the list"])
100 self.
protocol.log(
"warning",
''.join(log_msg))
102 msg_opcode = message.get(
"op")
103 msg_id = message.get(
"id")
104 msg_num = message.get(
"num")
105 msg_total = message.get(
"total")
106 msg_data = message.get(
"data")
109 if ((msg_opcode ==
None)
or (msg_id ==
None)
or
110 (msg_num ==
None)
or (msg_total ==
None)
or
112 self.
protocol.log(
"error",
"received invalid fragment!")
115 log_msg =
"fragment for messageID: " + str(msg_id) +
" received."
121 "is_reconstructing":
False,
122 "total": message[
"total"],
123 "timestamp_last_append": now,
126 log_msg =
"opened new fragment list for messageID " + str(msg_id)
138 log_msg = [
"appended fragment #" + str(msg_num)]
139 log_msg.extend([
" (total: ", str(msg_total),
") to fragment list for messageID ", str(msg_id)])
140 self.
protocol.log(
"debug",
''.join(log_msg))
142 log_msg =
"error while trying to append fragment " + str(msg_num)
146 received_all_fragments =
False
151 if existing_fragments == announced_total:
152 log_msg = [
"enough/all fragments for messageID " + str(msg_id) +
" received"]
153 log_msg.extend([
" [", str(existing_fragments),
"]"])
154 log_msg =
''.join(log_msg)
157 received_all_fragments =
True
158 for i
in range(0, announced_total):
160 received_all_fragments =
False
161 log_msg =
"fragment #" +str(i)
162 log_msg +=
" for messageID " + str(msg_id) +
" is missing! "
167 if received_all_fragments:
168 log_msg =
"reconstructing original message " + str(msg_id)
172 reconstructed_msg =
''.join(self.
received_fragments[msg_id][
"fragment_list"].values())
174 log_msg = [
"reconstructed original message:\n"]
175 log_msg.append(reconstructed_msg)
176 log_msg =
''.join(log_msg)
179 duration = datetime.now() - now
183 log_msg = [
"reconstructed message (ID:" + str(msg_id) +
") from "]
184 log_msg.extend([str(msg_total),
" fragments. "])
187 log_msg.extend([
"[duration: ", str(duration.total_seconds()),
" s]"])
188 log_msg =
''.join(log_msg)
193 log_msg =
"removed fragment list for messageID " + str(msg_id)
198 self.
protocol.unregister_operation(
"fragment")