defragmentation.py
Go to the documentation of this file.
00001 from rosbridge_library.capability import Capability
00002 from datetime import datetime
00003 import threading
00004 
class ReceivedFragments():
    """
    Shared holder for the fragment lists of partially received messages.

    Every instance is bound to one common private implementation object.
    Attribute reads that normal lookup cannot resolve, and all attribute
    writes, are forwarded to that object, so whatever is stored through one
    instance can be read back through any other.
    """
    class __impl:
        """ The object that all ReceivedFragments instances delegate to """
        def spam(self):
            """ Test helper: return the id() of the shared object """
            return id(self)

    # The one shared implementation object; created by the first __init__ call
    __instance = None

    # Fragment lists, keyed by message id. Each value is a dict shaped like:
    #
    #   {
    #     "timestamp_last_append" : <<datetime-object>>,
    #     "total" : <<total_fragments>>,
    #     "fragment_list" : {
    #       <<fragment1ID>>: <<fragment1_data>>,
    #       <<fragment2ID>>: <<fragment2_data>>,
    #       ...
    #     }
    #   }
    #
    # NOTE: this is a class attribute, so reading `.lists` on an instance is
    # resolved here by normal attribute lookup and never reaches __getattr__.
    lists = {}

    def __init__(self):
        """ Bind this instance to the shared object, creating it on first use """
        shared = ReceivedFragments.__instance
        if shared is None:
            shared = ReceivedFragments.__impl()
            ReceivedFragments.__instance = shared
            # Goes through __setattr__, i.e. it is stored on the shared object
            self.lists = {}

        # Write to __dict__ directly: a plain assignment would be forwarded
        # to the shared object by __setattr__ instead of landing on `self`
        self.__dict__['_ReceivedFragments__instance'] = shared

    def __getattr__(self, attr):
        """ Forward reads of unresolved attributes to the shared object """
        target = self.__instance
        return getattr(target, attr)

    def __setattr__(self, attr, value):
        """ Forward every attribute write to the shared object """
        target = self.__instance
        return setattr(target, attr, value)
00046 
class Defragment(Capability, threading.Thread):
    """
    Capability that reassembles messages received as "fragment" operations.

    Each incoming fragment carries the fields "op", "id", "num", "total" and
    "data". Fragments with the same "id" are collected in a fragment list
    taken from ReceivedFragments().lists; once every index 0..total-1 is
    present, the "data" values are concatenated in index order and the
    result is handed to protocol.incoming().
    """

    # Seconds without a newly appended fragment after which a fragment list
    # is dropped; replaced by protocol.parameters["fragment_timeout"] if set
    fragment_timeout = 600
    opcode = "fragment"

    protocol = None

    def __init__(self, protocol):
        """
        Register the "fragment" operation on the given protocol.

        protocol -- object providing `parameters` (a mapping or None),
                    `register_operation`, `unregister_operation`, `log`
                    and `incoming`
        """
        Capability.__init__(self, protocol)

        self.protocol = protocol

        # populate parameters
        parameters = self.protocol.parameters
        if parameters is not None:
            try:
                self.fragment_timeout = parameters["fragment_timeout"]
            except KeyError:
                # parameter not supplied -> keep the class-level default
                pass

        protocol.register_operation(self.opcode, self.defragment)

        self.received_fragments = ReceivedFragments().lists
        threading.Thread.__init__(self)

    def defragment(self, message):
        """
        Handle one incoming message with op-code "fragment".

        Steps:
          1) drop fragment lists that timed out (checking on every received
             fragment keeps the growth of the lists under control); the list
             the new fragment belongs to is never dropped here
          2) abort if any of the fragment fields is missing
          3) open a new fragment list if the message id is unknown
          4) append the fragment and refresh the list's time stamp
          5) if the list is complete: concatenate the fragments, pass the
             resulting string to protocol.incoming() (which checks the
             message fields by itself) and remove the list to free memory

        message -- dict-like object holding the fragment fields
        """
        now = datetime.now()

        fragment_lists = self.received_fragments
        if fragment_lists is None:
            # finish() has already released the lists; nothing to append to
            self.protocol.log("warning", "received fragment after finish(), ignoring it")
            return

        msg_opcode = message.get("op")
        msg_id = message.get("id")
        msg_num = message.get("num")
        msg_total = message.get("total")
        msg_data = message.get("data")

        self._remove_timed_out_lists(fragment_lists, now, msg_id, msg_num)

        # Abort if any message field is missing
        fields = (msg_opcode, msg_id, msg_num, msg_total, msg_data)
        if any(field is None for field in fields):
            self.protocol.log("error", "received invalid fragment!")
            return

        log_msg = "fragment for messageID: " + str(msg_id) + " received."
        self.protocol.log("debug", log_msg)

        # Create fragment container if none exists yet
        if msg_id not in fragment_lists:
            fragment_lists[msg_id] = {
                "is_reconstructing": False,
                "total": msg_total,
                "timestamp_last_append": now,
                "fragment_list": {}
            }
            log_msg = "opened new fragment list for messageID " + str(msg_id)
            self.protocol.log("debug", log_msg)

        entry = fragment_lists[msg_id]
        fragments = entry["fragment_list"]
        announced_total = entry["total"]

        # Add fragment to the container's list if it is not already in it.
        # Valid indices are 0..total-1 (the ones read back on reconstruction);
        # accepting any other index would inflate the fragment count and keep
        # the message from ever being completed.
        if (msg_num not in fragments and
                0 <= msg_num < announced_total and
                msg_total == announced_total):
            fragments[msg_num] = msg_data
            entry["timestamp_last_append"] = now
            log_msg = "appended fragment #" + str(msg_num)
            log_msg += " (total: " + str(msg_total) + ") to fragment list for messageID " + str(msg_id)
            self.protocol.log("debug", log_msg)
        else:
            log_msg = "error while trying to append fragment " + str(msg_num)
            self.protocol.log("error", log_msg)
            return

        received_all_fragments = False
        existing_fragments = len(fragments)

        # Make sure total number of fragments received
        if existing_fragments == announced_total:
            log_msg = "enough/all fragments for messageID " + str(msg_id) + " received"
            log_msg += " [" + str(existing_fragments) + "]"
            self.protocol.log("debug", log_msg)
            # Check each fragment matches up
            received_all_fragments = True
            for index in range(announced_total):
                if index not in fragments:
                    received_all_fragments = False
                    log_msg = "fragment #" + str(index)
                    log_msg += " for messageID " + str(msg_id) + " is missing! "
                    self.protocol.log("error", log_msg)

        # While set, the timeout sweep leaves this list alone
        entry["is_reconstructing"] = received_all_fragments

        if not received_all_fragments:
            return

        log_msg = "reconstructing original message " + str(msg_id)
        self.protocol.log("debug", log_msg)

        # Reconstruct the message
        reconstructed_msg = "".join(
            fragments[index] for index in range(announced_total))

        log_msg = "reconstructed original message:\n"
        log_msg += reconstructed_msg
        self.protocol.log("debug", log_msg)

        duration = datetime.now() - now

        try:
            # Pass the reconstructed message to rosbridge
            self.protocol.incoming(reconstructed_msg)
            log_msg = "reconstructed message (ID:" + str(msg_id) + ") from "
            log_msg += str(msg_total) + " fragments. "
            log_msg += "[duration: " + str(duration.total_seconds()) + " s]"
            self.protocol.log("info", log_msg)
        finally:
            # Remove fragmentation container even if incoming() raised:
            # a list flagged "is_reconstructing" is skipped by the timeout
            # sweep and would otherwise never be freed
            fragment_lists.pop(msg_id, None)
            log_msg = "removed fragment list for messageID " + str(msg_id)
            self.protocol.log("debug", log_msg)

    def _remove_timed_out_lists(self, fragment_lists, now, msg_id, msg_num):
        """ Drop timed-out fragment lists, except the one msg_id belongs to """
        # Iterate over a snapshot: entries are deleted inside the loop, which
        # is not allowed while iterating over the dict itself
        for list_id, entry in list(fragment_lists.items()):
            time_diff = now - entry["timestamp_last_append"]
            timed_out = time_diff.total_seconds() > self.fragment_timeout
            if not timed_out or entry["is_reconstructing"]:
                continue

            log_msg = "fragment list " + str(list_id) + " timed out.."
            if msg_id != list_id:
                log_msg += " -> removing it.."
                del fragment_lists[list_id]
            else:
                log_msg += " -> but we're just about to add fragment #"
                log_msg += str(msg_num) + " of "
                log_msg += str(entry["total"])
                log_msg += " ..keeping the list"
            self.protocol.log("warning", log_msg)

    def finish(self):
        """ Release the fragment lists and unregister the operation """
        self.received_fragments = None
        self.protocol.unregister_operation(self.opcode)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Mon Oct 6 2014 06:58:09