defragmentation.py
Go to the documentation of this file.
1 from rosbridge_library.capability import Capability
2 from datetime import datetime
3 import threading
4 
class ReceivedFragments():
    """
    Singleton-style holder for the fragment lists of partially received messages.

    Every instance is a thin wrapper around one shared implementation object;
    attribute access that is not satisfied by the wrapper or its class is
    forwarded to that shared object.
    """

    class __impl:
        """ Shared object that backs every ReceivedFragments wrapper """

        def spam(self):
            """ Test method: return the id() of the shared object """
            singleton_id = id(self)
            return singleton_id

    # The one shared implementation object; created by the first constructor call
    __instance = None

    # Fragment lists of all messages that are currently being collected
    # Format:
    # {
    #   <<message1_ID>> : {
    #     "timestamp_last_append" : <<datetime-object>>,
    #     "total" : <<total_fragments>>,
    #     "fragment_list" : {
    #       <<fragment1ID>>: <<fragment1_data>>,
    #       <<fragment2ID>>: <<fragment2_data>>,
    #       ...
    #     }
    #   },
    #   ...
    # }
    #
    # NOTE: because this is a class attribute, reading `.lists` on a wrapper is
    # resolved here by normal attribute lookup and never reaches __getattr__;
    # all wrappers therefore see this one dictionary.
    lists = {}

    def __init__(self):
        """ Bind this wrapper to the shared object, creating it on first use """
        shared = ReceivedFragments.__instance
        if shared is None:
            shared = ReceivedFragments.__impl()
            ReceivedFragments.__instance = shared
            # Goes through __setattr__, so the attribute is set on the shared object
            self.lists = {}

        # Write into the instance dict directly to bypass the delegating __setattr__
        vars(self)['_ReceivedFragments__instance'] = shared

    def __getattr__(self, attr):
        """ Forward attribute lookups that failed on the wrapper to the shared object """
        backing = self.__instance
        return getattr(backing, attr)

    def __setattr__(self, attr, value):
        """ Forward every attribute assignment to the shared object """
        backing = self.__instance
        return setattr(backing, attr, value)
46 
class Defragment(Capability, threading.Thread):
    """
    Capability that reassembles messages received as "fragment" operations.

    Fragments are collected per message id. Once every fragment of a message
    has arrived, the fragments are concatenated in fragment-number order and
    the resulting string is passed to protocol.incoming().
    """

    # Seconds without a newly appended fragment after which an incomplete
    # fragment list is considered timed out
    fragment_timeout = 600
    opcode = "fragment"

    protocol = None

    def __init__(self, protocol):
        """
        Register the fragment handler on the given protocol.

        protocol -- object providing `parameters`, `register_operation`,
                    `unregister_operation`, `log` and `incoming`
        """
        Capability.__init__(self, protocol)

        self.protocol = protocol

        # populate parameters
        if self.protocol.parameters is not None:
            self.fragment_timeout = self.protocol.parameters["fragment_timeout"]

        protocol.register_operation(self.opcode, self.defragment)

        # NOTE(review): the listing this class was reviewed from omitted the
        # original assignment of received_fragments (source line 66). It is
        # reconstructed here from the way the attribute is used below --
        # confirm against the upstream file.
        self.received_fragments = ReceivedFragments().lists
        threading.Thread.__init__(self)

    def defragment(self, message):
        """
        Handle one incoming message with op-code "fragment".

        1) remove fragment lists that timed out (the list of the incoming
           message and lists that are being reconstructed are kept)
        2) check the message fields ("op", "id", "num", "total", "data")
        3) open a fragment list for a new message id and append the fragment,
           updating the list's time stamp
        4) if the list is complete:
           a) reconstruct the original message by concatenating the fragments
           b) pass the reconstructed string to protocol.incoming()
              (protocol.incoming checks the message fields by itself)
           c) remove the fragment list to free up memory

        message -- mapping holding the fields of the fragment
        """
        now = datetime.now()

        fragment_lists = self.received_fragments
        if fragment_lists is None:
            # finish() already released the store; nothing can be collected
            self.protocol.log("error", "received fragment after defragmentation was finished!")
            return

        self._remove_timed_out_lists(now, message)

        msg_opcode = message.get("op")
        msg_id = message.get("id")
        msg_num = message.get("num")
        msg_total = message.get("total")
        msg_data = message.get("data")

        # Abort if any message field is missing
        required_fields = (msg_opcode, msg_id, msg_num, msg_total, msg_data)
        if any(field is None for field in required_fields):
            self.protocol.log("error", "received invalid fragment!")
            return

        log_msg = "fragment for messageID: " + str(msg_id) + " received."
        self.protocol.log("debug", log_msg)

        # Create fragment container if none exists yet
        if msg_id not in fragment_lists:
            fragment_lists[msg_id] = {
                "is_reconstructing": False,
                "total": msg_total,
                "timestamp_last_append": now,
                "fragment_list": {},
            }
            log_msg = "opened new fragment list for messageID " + str(msg_id)
            self.protocol.log("debug", log_msg)

        entry = fragment_lists[msg_id]
        fragments = entry["fragment_list"]
        announced_total = entry["total"]

        # Add fragment to fragment container's list if not already in list.
        # Valid fragment numbers are 0 .. total-1; accepting any other number
        # would inflate the fragment count and make the list impossible to
        # complete.
        if (self._is_valid_fragment_num(msg_num, announced_total) and
                msg_total == announced_total and
                msg_num not in fragments):
            fragments[msg_num] = msg_data
            entry["timestamp_last_append"] = now
            log_msg = ["appended fragment #" + str(msg_num)]
            log_msg.extend([" (total: ", str(msg_total), ") to fragment list for messageID ", str(msg_id)])
            self.protocol.log("debug", ''.join(log_msg))
        else:
            log_msg = "error while trying to append fragment " + str(msg_num)
            self.protocol.log("error", log_msg)
            return

        received_all_fragments = False
        existing_fragments = len(fragments)

        # Make sure total number of fragments received
        if existing_fragments == announced_total:
            log_msg = ["enough/all fragments for messageID " + str(msg_id) + " received"]
            log_msg.extend([" [", str(existing_fragments), "]"])
            self.protocol.log("debug", ''.join(log_msg))

            # Check each fragment matches up
            missing = [i for i in range(announced_total) if i not in fragments]
            for i in missing:
                log_msg = "fragment #" + str(i)
                log_msg += " for messageID " + str(msg_id) + " is missing! "
                self.protocol.log("error", log_msg)
            received_all_fragments = not missing

        entry["is_reconstructing"] = received_all_fragments

        if not received_all_fragments:
            return

        log_msg = "reconstructing original message " + str(msg_id)
        self.protocol.log("debug", log_msg)

        # Reconstruct the message in fragment-number order. The dictionary
        # itself is ordered by arrival, which differs from the fragment order
        # whenever fragments arrive out of sequence.
        reconstructed_msg = ''.join(fragments[i] for i in range(announced_total))

        log_msg = ["reconstructed original message:\n"]
        log_msg.append(reconstructed_msg)
        self.protocol.log("debug", ''.join(log_msg))

        duration = datetime.now() - now

        try:
            # Pass the reconstructed message to rosbridge
            self.protocol.incoming(reconstructed_msg)
            log_msg = ["reconstructed message (ID:" + str(msg_id) + ") from "]
            log_msg.extend([str(msg_total), " fragments. "])
            log_msg.extend(["[duration: ", str(duration.total_seconds()), " s]"])
            self.protocol.log("info", ''.join(log_msg))
        finally:
            # Remove fragmentation container even if handling the message
            # failed: a list flagged "is_reconstructing" is never timed out
            fragment_lists.pop(msg_id, None)

        log_msg = "removed fragment list for messageID " + str(msg_id)
        self.protocol.log("debug", log_msg)

    def finish(self):
        """ Release the fragment store reference and unregister the fragment handler """
        self.received_fragments = None
        # Unregister the same op-code that __init__ registered
        self.protocol.unregister_operation(self.opcode)

    def _remove_timed_out_lists(self, now, message):
        """
        Drop fragment lists that received no fragment within fragment_timeout.

        A timed out list is kept (and only reported) if the incoming message
        belongs to it; lists flagged as being reconstructed are skipped.
        """
        current_id = message.get("id")
        # Iterate over a snapshot of the ids: entries are deleted inside the
        # loop, and changing a dict's size while iterating over it raises
        # RuntimeError
        for list_id in list(self.received_fragments):
            entry = self.received_fragments[list_id]
            idle = now - entry["timestamp_last_append"]
            if idle.total_seconds() <= self.fragment_timeout or entry["is_reconstructing"]:
                continue

            log_msg = ["fragment list ", str(list_id), " timed out.."]
            if current_id != list_id:
                log_msg.append(" -> removing it..")
                del self.received_fragments[list_id]
            else:
                log_msg.extend([" -> but we're just about to add fragment #"])
                log_msg.extend([str(message.get("num")), " of "])
                log_msg.extend([str(entry["total"])])
                log_msg.extend([" ..keeping the list"])
            self.protocol.log("warning", ''.join(log_msg))

    @staticmethod
    def _is_valid_fragment_num(num, total):
        """ Return True if num lies in 0 .. total-1; values that cannot be compared count as invalid """
        try:
            return 0 <= num < total
        except TypeError:
            return False
rosbridge_library.capabilities.defragmentation.ReceivedFragments.__instance
__instance
Definition: defragmentation.py:15
rosbridge_library.capabilities.defragmentation.ReceivedFragments.lists
dictionary lists
Definition: defragmentation.py:29
rosbridge_library.capabilities.defragmentation.Defragment.defragment
def defragment(self, message)
Definition: defragmentation.py:82
rosbridge_library.capabilities.defragmentation.Defragment.protocol
protocol
Definition: defragmentation.py:53
rosbridge_library.capabilities.defragmentation.Defragment.__init__
def __init__(self, protocol)
Definition: defragmentation.py:55
rosbridge_library.capability
Definition: capability.py:1
rosbridge_library.capabilities.defragmentation.ReceivedFragments
Definition: defragmentation.py:5
rosbridge_library.capability.Capability
Definition: capability.py:37
rosbridge_library.capabilities.defragmentation.ReceivedFragments.__setattr__
def __setattr__(self, attr, value)
Definition: defragmentation.py:43
rosbridge_library.capabilities.defragmentation.ReceivedFragments.__init__
def __init__(self)
Definition: defragmentation.py:31
rosbridge_library.capabilities.defragmentation.ReceivedFragments.__getattr__
def __getattr__(self, attr)
Definition: defragmentation.py:39
rosbridge_library.capabilities.defragmentation.ReceivedFragments.__impl.spam
def spam(self)
Definition: defragmentation.py:11
rosbridge_library.capabilities.defragmentation.Defragment
Definition: defragmentation.py:47
rosbridge_library.capabilities.defragmentation.Defragment.opcode
string opcode
Definition: defragmentation.py:50
test_non-ros_service_client_complex-srv.incoming
incoming
Definition: test_non-ros_service_client_complex-srv.py:53
rosbridge_library.capabilities.defragmentation.Defragment.fragment_timeout
int fragment_timeout
Definition: defragmentation.py:49
rosbridge_library.capabilities.defragmentation.Defragment.received_fragments
received_fragments
Definition: defragmentation.py:66
rosbridge_library.capabilities.defragmentation.Defragment.finish
def finish(self)
Definition: defragmentation.py:196
rosbridge_library.capabilities.defragmentation.ReceivedFragments.__impl
Definition: defragmentation.py:9


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45