Go to the documentation of this file.00001 from rosbridge_library.capability import Capability
00002 from datetime import datetime
00003 import threading
00004
00005 class ReceivedFragments():
00006 """
00007 Singleton class to hold lists of received fragments in one 'global' object
00008 """
00009 class __impl:
00010 """ Implementation of the singleton interface """
00011 def spam(self):
00012 """ Test method, return singleton id """
00013 return id(self)
00014
00015 __instance = None
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 lists = {}
00030
00031 def __init__(self):
00032 """ Create singleton instance """
00033 if ReceivedFragments.__instance is None:
00034 ReceivedFragments.__instance = ReceivedFragments.__impl()
00035 self.lists = {}
00036
00037 self.__dict__['_ReceivedFragments__instance'] = ReceivedFragments.__instance
00038
00039 def __getattr__(self, attr):
00040 """ Delegate access to implementation """
00041 return getattr(self.__instance, attr)
00042
00043 def __setattr__(self, attr, value):
00044 """ Delegate access to implementation """
00045 return setattr(self.__instance, attr, value)
00046
00047 class Defragment(Capability, threading.Thread):
00048
00049 fragment_timeout = 600
00050 opcode = "fragment"
00051 global received_fragments
00052
00053 protocol = None
00054
00055 def __init__(self, protocol):
00056 Capability.__init__(self, protocol)
00057
00058 self.protocol = protocol
00059
00060
00061 if self.protocol.parameters != None:
00062 self.fragment_timeout = self.protocol.parameters["fragment_timeout"]
00063
00064 protocol.register_operation(self.opcode, self.defragment)
00065
00066 self.received_fragments = ReceivedFragments().lists
00067 threading.Thread.__init__(self)
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082 def defragment(self, message):
00083 now = datetime.now()
00084
00085 if self.received_fragments != None:
00086 for id in self.received_fragments.keys() :
00087 time_diff = now - self.received_fragments[id]["timestamp_last_append"]
00088 if (time_diff.total_seconds() > self.fragment_timeout and
00089 not self.received_fragments[id]["is_reconstructing"]):
00090 log_msg = ["fragment list ", str(id), " timed out.."]
00091
00092 if message["id"] != id:
00093 log_msg.append(" -> removing it..")
00094 del self.received_fragments[id]
00095 else:
00096 log_msg.extend([" -> but we're just about to add fragment #"])
00097 log_msg.extend([str(message.get("num")), " of "])
00098 log_msg.extend([str(self.received_fragments[message.get("id")]["total"])])
00099 log_msg.extend([" ..keeping the list"])
00100 self.protocol.log("warning", ''.join(log_msg))
00101
00102 msg_opcode = message.get("op")
00103 msg_id = message.get("id")
00104 msg_num = message.get("num")
00105 msg_total = message.get("total")
00106 msg_data = message.get("data")
00107
00108
00109 if ((msg_opcode == None) or (msg_id == None) or
00110 (msg_num == None) or (msg_total == None) or
00111 (msg_data == None)):
00112 self.protocol.log("error", "received invalid fragment!")
00113 return
00114
00115 log_msg = "fragment for messageID: " + str(msg_id) + " received."
00116 self.protocol.log("debug", log_msg)
00117
00118
00119 if msg_id not in self.received_fragments.keys():
00120 self.received_fragments[msg_id] = {
00121 "is_reconstructing": False,
00122 "total": message["total"],
00123 "timestamp_last_append": now,
00124 "fragment_list": {}
00125 }
00126 log_msg = "opened new fragment list for messageID " + str(msg_id)
00127 self.protocol.log("debug", log_msg)
00128
00129
00130
00131
00132 if ((msg_num not in self.received_fragments[msg_id]["fragment_list"].keys() ) and
00133 msg_num <= self.received_fragments[msg_id]["total"] and
00134 msg_total == self.received_fragments[msg_id]["total"]
00135 ):
00136 self.received_fragments[msg_id]["fragment_list"][msg_num] = msg_data
00137 self.received_fragments[msg_id]["timestamp_last_append"] = now
00138 log_msg = ["appended fragment #" + str(msg_num)]
00139 log_msg.extend([" (total: ", str(msg_total), ") to fragment list for messageID ", str(msg_id)])
00140 self.protocol.log("debug", ''.join(log_msg))
00141 else:
00142 log_msg = "error while trying to append fragment " + str(msg_num)
00143 self.protocol.log("error", log_msg)
00144 return
00145
00146 received_all_fragments = False
00147 existing_fragments = len(self.received_fragments[msg_id]["fragment_list"])
00148 announced_total = self.received_fragments[msg_id]["total"]
00149
00150
00151 if existing_fragments == announced_total:
00152 log_msg = ["enough/all fragments for messageID " + str(msg_id) + " received"]
00153 log_msg.extend([" [", str(existing_fragments), "]"])
00154 log_msg = ''.join(log_msg)
00155 self.protocol.log("debug", log_msg)
00156
00157 received_all_fragments = True
00158 for i in range(0, announced_total):
00159 if i not in self.received_fragments[msg_id]["fragment_list"]:
00160 received_all_fragments = False
00161 log_msg = "fragment #" +str(i)
00162 log_msg += " for messageID " + str(msg_id) + " is missing! "
00163 self.protocol.log("error", log_msg)
00164
00165 self.received_fragments[msg_id]["is_reconstructing"] = received_all_fragments
00166
00167 if received_all_fragments:
00168 log_msg = "reconstructing original message " + str(msg_id)
00169 self.protocol.log("debug", log_msg)
00170
00171
00172 reconstructed_msg = ''.join(self.received_fragments[msg_id]["fragment_list"][0:message["total"]])
00173 log_msg = ["reconstructed original message:\n"]
00174 log_msg.append(reconstructed_msg)
00175 log_msg = ''.join(log_msg)
00176 self.protocol.log("debug", log_msg)
00177
00178 duration = datetime.now() - now
00179
00180
00181 self.protocol.incoming(reconstructed_msg)
00182 log_msg = ["reconstructed message (ID:" + str(msg_id) + ") from "]
00183 log_msg.extend([str(msg_total), " fragments. "])
00184
00185
00186 log_msg.extend(["[duration: ", str(duration.total_seconds()), " s]"])
00187 log_msg = ''.join(log_msg)
00188 self.protocol.log("info", log_msg)
00189
00190
00191 del self.received_fragments[msg_id]
00192 log_msg = "removed fragment list for messageID " + str(msg_id)
00193 self.protocol.log("debug", log_msg)
00194
00195 def finish(self):
00196 self.received_fragments = None
00197 self.protocol.unregister_operation("fragment")