Go to the documentation of this file.00001 from rosbridge_library.capability import Capability
00002 from datetime import datetime
00003 import threading
00004
00005 class ReceivedFragments():
00006 """
00007 Singleton class to hold lists of received fragments in one 'global' object
00008 """
00009 class __impl:
00010 """ Implementation of the singleton interface """
00011 def spam(self):
00012 """ Test method, return singleton id """
00013 return id(self)
00014
00015 __instance = None
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 lists = {}
00030
00031 def __init__(self):
00032 """ Create singleton instance """
00033 if ReceivedFragments.__instance is None:
00034 ReceivedFragments.__instance = ReceivedFragments.__impl()
00035 self.lists = {}
00036
00037 self.__dict__['_ReceivedFragments__instance'] = ReceivedFragments.__instance
00038
00039 def __getattr__(self, attr):
00040 """ Delegate access to implementation """
00041 return getattr(self.__instance, attr)
00042
00043 def __setattr__(self, attr, value):
00044 """ Delegate access to implementation """
00045 return setattr(self.__instance, attr, value)
00046
00047 class Defragment(Capability, threading.Thread):
00048
00049 fragment_timeout = 600
00050 opcode = "fragment"
00051 global received_fragments
00052
00053 protocol = None
00054
00055 def __init__(self, protocol):
00056 Capability.__init__(self, protocol)
00057
00058 self.protocol = protocol
00059
00060
00061 if self.protocol.parameters != None:
00062 self.fragment_timeout = self.protocol.parameters["fragment_timeout"]
00063
00064 protocol.register_operation(self.opcode, self.defragment)
00065
00066 self.received_fragments = ReceivedFragments().lists
00067 threading.Thread.__init__(self)
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082 def defragment(self, message):
00083 now = datetime.now()
00084
00085 if self.received_fragments != None:
00086 for id in self.received_fragments.keys() :
00087 time_diff = now - self.received_fragments[id]["timestamp_last_append"]
00088 if (time_diff.total_seconds() > self.fragment_timeout and
00089 not self.received_fragments[id]["is_reconstructing"]):
00090 log_msg = "fragment list " + str(id) + " timed out.."
00091
00092 if message["id"] != id:
00093 log_msg += " -> removing it.."
00094 del self.received_fragments[id]
00095 else:
00096 log_msg += " -> but we're just about to add fragment #"
00097 log_msg += str(message.get("num")) + " of "
00098 log_msg += str(self.received_fragments[message.get("id")]["total"])
00099 log_msg += " ..keeping the list"
00100 self.protocol.log("warning", log_msg)
00101
00102 msg_opcode = message.get("op")
00103 msg_id = message.get("id")
00104 msg_num = message.get("num")
00105 msg_total = message.get("total")
00106 msg_data = message.get("data")
00107
00108
00109 if ((msg_opcode == None) or (msg_id == None) or
00110 (msg_num == None) or (msg_total == None) or
00111 (msg_data == None)):
00112 self.protocol.log("error", "received invalid fragment!")
00113 return
00114
00115 log_msg = "fragment for messageID: " + str(msg_id) + " received."
00116 self.protocol.log("debug", log_msg)
00117
00118
00119 if msg_id not in self.received_fragments.keys():
00120 self.received_fragments[msg_id] = {
00121 "is_reconstructing": False,
00122 "total": message["total"],
00123 "timestamp_last_append": now,
00124 "fragment_list": {}
00125 }
00126 log_msg = "opened new fragment list for messageID " + str(msg_id)
00127 self.protocol.log("debug", log_msg)
00128
00129
00130
00131
00132 if ((msg_num not in self.received_fragments[msg_id]["fragment_list"].keys() ) and
00133 msg_num <= self.received_fragments[msg_id]["total"] and
00134 msg_total == self.received_fragments[msg_id]["total"]
00135 ):
00136 self.received_fragments[msg_id]["fragment_list"][msg_num] = msg_data
00137 self.received_fragments[msg_id]["timestamp_last_append"] = now
00138 log_msg = "appended fragment #" + str(msg_num)
00139 log_msg += " (total: " + str(msg_total)+ ") to fragment list for messageID " + str(msg_id)
00140 self.protocol.log("debug", log_msg)
00141 else:
00142 log_msg = "error while trying to append fragment " + str(msg_num)
00143 self.protocol.log("error", log_msg)
00144 return
00145
00146 received_all_fragments = False
00147 existing_fragments = len(self.received_fragments[msg_id]["fragment_list"])
00148 announced_total = self.received_fragments[msg_id]["total"]
00149
00150
00151 if existing_fragments == announced_total:
00152 log_msg = "enough/all fragments for messageID " + str(msg_id) + " received"
00153 log_msg += " [" + str(existing_fragments) + "]"
00154 self.protocol.log("debug", log_msg)
00155
00156 received_all_fragments = True
00157 for i in range(0, announced_total):
00158 if i not in self.received_fragments[msg_id]["fragment_list"]:
00159 received_all_fragments = False
00160 log_msg = "fragment #" +str(i)
00161 log_msg += " for messageID " + str(msg_id) + " is missing! "
00162 self.protocol.log("error", log_msg)
00163
00164 self.received_fragments[msg_id]["is_reconstructing"] = received_all_fragments
00165
00166 if received_all_fragments:
00167 log_msg = "reconstructing original message " + str(msg_id)
00168 self.protocol.log("debug", log_msg)
00169
00170
00171 reconstructed_msg = ""
00172 for i in range(0,message["total"]):
00173 reconstructed_msg += self.received_fragments[msg_id]["fragment_list"][i]
00174
00175 log_msg = "reconstructed original message:\n"
00176 log_msg += reconstructed_msg
00177 self.protocol.log("debug", log_msg)
00178
00179 duration = datetime.now() - now
00180
00181
00182 self.protocol.incoming(reconstructed_msg)
00183 log_msg = "reconstructed message (ID:" + str(msg_id) + ") from "
00184 log_msg += str(msg_total) + " fragments. "
00185
00186
00187 log_msg += "[duration: " + str(duration.total_seconds()) + " s]"
00188 self.protocol.log("info", log_msg)
00189
00190
00191 del self.received_fragments[msg_id]
00192 log_msg = "removed fragment list for messageID " + str(msg_id)
00193 self.protocol.log("debug", log_msg)
00194
00195 def finish(self):
00196 self.received_fragments = None
00197 self.protocol.unregister_operation("fragment")