defragmentation.py
Go to the documentation of this file.
00001 from rosbridge_library.capability import Capability
00002 from datetime import datetime
00003 import threading
00004 
00005 class ReceivedFragments():
00006     """
00007     Singleton class to hold lists of received fragments in one 'global' object
00008     """
00009     class __impl:
00010         """ Implementation of the singleton interface """
00011         def spam(self):
00012             """ Test method, return singleton id """
00013             return id(self)
00014 
00015     __instance = None
00016     # List of defragmentation instances
00017     # Format:
00018     # {
00019     #   <<message1_ID>> : {
00020     #     "timestamp_last_append" : <<datetime-object>>,
00021     #     "total" : <<total_fragments>>,
00022     #     "fragment_list" : {
00023     #       <<fragment1ID>>: <<fragment1_data>>,
00024     #       <<fragment2ID>>: <<fragment2_data>>,
00025     #       ...
00026     #     }
00027     # },
00028     # ...
00029     lists = {}
00030 
00031     def __init__(self):
00032         """ Create singleton instance """
00033         if ReceivedFragments.__instance is None:
00034             ReceivedFragments.__instance = ReceivedFragments.__impl()
00035             self.lists = {}
00036 
00037         self.__dict__['_ReceivedFragments__instance'] = ReceivedFragments.__instance
00038 
00039     def __getattr__(self, attr):
00040         """ Delegate access to implementation """
00041         return getattr(self.__instance, attr)
00042 
00043     def __setattr__(self, attr, value):
00044         """ Delegate access to implementation """
00045         return setattr(self.__instance, attr, value)
00046 
00047 class Defragment(Capability, threading.Thread):
00048 
00049     fragment_timeout = 600
00050     opcode = "fragment"
00051     global received_fragments
00052 
00053     protocol = None
00054 
00055     def __init__(self, protocol):
00056         Capability.__init__(self, protocol)
00057 
00058         self.protocol = protocol
00059 
00060         # populate parameters
00061         if self.protocol.parameters != None:
00062             self.fragment_timeout = self.protocol.parameters["fragment_timeout"]
00063 
00064         protocol.register_operation(self.opcode, self.defragment)
00065 
00066         self.received_fragments = ReceivedFragments().lists
00067         threading.Thread.__init__(self)
00068 
00069 
00070     # defragment() does:
00071     #   1) take any incoming message with op-code "fragment"
00072     #   2) check all existing fragment lists for time out                       # could be done by a thread but should be okay this way:
00073     #   2.a) remove timed out lists (only if new fragment is not for this list) #   - checking whenever a new fragment is received should suffice
00074     #   3) create a new fragment list for new message ids                       #     to have control over growth of fragment lists
00075     #   3.a) check message fields
00076     #   3.b) append the new fragment to 'the' list
00077     #   3.c) add time stamp (last_fragment_appended) to 'this' list
00078     #   4) check if the list of current fragment (message id) is complete
00079     #   4.a) reconstruct the original message by concatenating the fragments
00080     #   4.b) pass the reconstructed message string to protocol.incoming()       # protocol.incoming is checking message fields by itself, so no need to do this before passing the reconstructed message to protocol
00081     #   4.c) remove the fragment list to free up memory                        
00082     def defragment(self, message):
00083         now = datetime.now()
00084 
00085         if self.received_fragments != None:
00086             for id in self.received_fragments.keys() :
00087                 time_diff = now - self.received_fragments[id]["timestamp_last_append"]
00088                 if (time_diff.total_seconds() > self.fragment_timeout and
00089                     not self.received_fragments[id]["is_reconstructing"]):
00090                     log_msg = ["fragment list ", str(id), " timed out.."]
00091 
00092                     if message["id"] != id:
00093                         log_msg.append(" -> removing it..")
00094                         del self.received_fragments[id]
00095                     else:
00096                         log_msg.extend([" -> but we're just about to add fragment #"])
00097                         log_msg.extend([str(message.get("num")), " of "])
00098                         log_msg.extend([str(self.received_fragments[message.get("id")]["total"])])
00099                         log_msg.extend([" ..keeping the list"])
00100                     self.protocol.log("warning", ''.join(log_msg))
00101 
00102         msg_opcode = message.get("op")
00103         msg_id = message.get("id")
00104         msg_num = message.get("num")
00105         msg_total = message.get("total")
00106         msg_data = message.get("data")
00107 
00108         # Abort if any message field is missing
00109         if ((msg_opcode == None) or (msg_id == None) or
00110             (msg_num == None) or (msg_total == None) or
00111             (msg_data == None)):
00112             self.protocol.log("error", "received invalid fragment!")
00113             return
00114 
00115         log_msg = "fragment for messageID: " + str(msg_id) + " received."
00116         self.protocol.log("debug", log_msg)
00117 
00118         # Create fragment container if none exists yet
00119         if msg_id not in self.received_fragments.keys():
00120             self.received_fragments[msg_id] = {
00121                 "is_reconstructing": False,
00122                 "total": message["total"],
00123                 "timestamp_last_append": now,
00124                 "fragment_list": {}
00125             }
00126             log_msg = "opened new fragment list for messageID " + str(msg_id)
00127             self.protocol.log("debug", log_msg)
00128 
00129         #print "received fragments:", len(self.received_fragments[msg_id]["fragment_list"].keys())
00130 
00131         # Add fragment to fragment container's list if not already in list
00132         if ((msg_num not in self.received_fragments[msg_id]["fragment_list"].keys() ) and
00133              msg_num <= self.received_fragments[msg_id]["total"] and
00134              msg_total == self.received_fragments[msg_id]["total"]
00135             ):
00136             self.received_fragments[msg_id]["fragment_list"][msg_num] = msg_data
00137             self.received_fragments[msg_id]["timestamp_last_append"] = now
00138             log_msg = ["appended fragment #" + str(msg_num)]
00139             log_msg.extend([" (total: ", str(msg_total), ") to fragment list for messageID ", str(msg_id)])
00140             self.protocol.log("debug", ''.join(log_msg))
00141         else:
00142             log_msg = "error while trying to append fragment " + str(msg_num)
00143             self.protocol.log("error", log_msg)
00144             return
00145 
00146         received_all_fragments = False
00147         existing_fragments = len(self.received_fragments[msg_id]["fragment_list"])
00148         announced_total = self.received_fragments[msg_id]["total"]
00149 
00150         # Make sure total number of fragments received
00151         if existing_fragments == announced_total:
00152             log_msg = ["enough/all fragments for messageID " + str(msg_id) + " received"]
00153             log_msg.extend([" [", str(existing_fragments), "]"])
00154             log_msg = ''.join(log_msg)
00155             self.protocol.log("debug", log_msg)
00156             # Check each fragment matches up
00157             received_all_fragments = True
00158             for i in range(0, announced_total):
00159                 if i not in self.received_fragments[msg_id]["fragment_list"]:
00160                     received_all_fragments = False
00161                     log_msg = "fragment #" +str(i)
00162                     log_msg += " for messageID " + str(msg_id) + " is missing! "
00163                     self.protocol.log("error", log_msg)
00164 
00165         self.received_fragments[msg_id]["is_reconstructing"] = received_all_fragments
00166 
00167         if received_all_fragments:
00168             log_msg = "reconstructing original message " + str(msg_id)
00169             self.protocol.log("debug", log_msg)
00170 
00171             # Reconstruct the message
00172             reconstructed_msg = ''.join(self.received_fragments[msg_id]["fragment_list"][0:message["total"]])
00173             log_msg = ["reconstructed original message:\n"]
00174             log_msg.append(reconstructed_msg)
00175             log_msg = ''.join(log_msg)
00176             self.protocol.log("debug", log_msg)
00177 
00178             duration = datetime.now() - now
00179 
00180             # Pass the reconstructed message to rosbridge
00181             self.protocol.incoming(reconstructed_msg)
00182             log_msg = ["reconstructed message (ID:" + str(msg_id) + ") from "]
00183             log_msg.extend([str(msg_total), " fragments. "])
00184             # cannot access msg.data if message is a service_response or else!
00185             #log_msg += "[message length: " + str(len(str(json.loads(reconstructed_msg)["msg"]["data"]))) +"]"
00186             log_msg.extend(["[duration: ", str(duration.total_seconds()),  " s]"])
00187             log_msg = ''.join(log_msg)
00188             self.protocol.log("info", log_msg)
00189 
00190             # Remove fragmentation container
00191             del self.received_fragments[msg_id]
00192             log_msg = "removed fragment list for messageID " + str(msg_id)
00193             self.protocol.log("debug", log_msg)
00194 
00195     def finish(self):
00196         self.received_fragments = None
00197         self.protocol.unregister_operation("fragment")


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Sep 13 2017 03:18:07