00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 PKG = 'mtrace_tools'
00042 import roslib; roslib.load_manifest(PKG)
00043 import sys
00044 import rospy
00045 from collections import deque
00046 import unittest
00047 from itertools import izip
00048
00049
class BaseError(object):
    """Common base for every error record in this module.

    Attributes:
        name: identifier passed in by the creator of the error.
        t: time value attached to the error; ErrorDelay compares and sorts
           errors by this attribute.
        desc: description text, returned unchanged by short_desc().
        sub_errors: list owned by this instance, created empty.
        parents: list owned by this instance, created empty.
    """

    def __init__(self, name, t, desc):
        """ Store the given fields and start with empty relation lists """
        self.name, self.t, self.desc = name, t, desc
        # Two distinct list objects, fresh for every instance.
        self.sub_errors, self.parents = [], []

    def short_desc(self):
        """ Return the short description; the base class uses desc as-is """
        return self.desc
00059
class NoError(BaseError):
    """ Marker for "nothing went wrong"; handy for flushing error queues """

    def __init__(self, name, t, desc):
        # Adds no state of its own; everything is stored by BaseError.
        super(NoError, self).__init__(name, t, desc)
00064
class GenericError(BaseError):
    """ Catch-all record for an error of unknown type """

    def __init__(self, name, t, desc):
        # Adds no state of its own; everything is stored by BaseError.
        super(GenericError, self).__init__(name, t, desc)
00069
00070
class ErrorDelay(object):
    """ FIFO of error messages.  Errors are delayed by <delay> seconds.

    An error is released by process() once its timestamp (the ``t``
    attribute) is strictly older than the newest queued timestamp minus
    <delay>.

    If reorder is True, will sort Errors using timestamps.
    If reorder is not True, expects input to be perfectly ordered.
    """

    def __init__(self, delay, reorder=False):
        """
        delay   -- hold-back time in seconds (converted to rospy.Duration)
        reorder -- when True, queued errors are sorted by timestamp
        """
        self.delay = rospy.Duration.from_sec(delay)
        self.queue = deque()
        self.reorder = reorder
        # Release cutoff computed by the most recent non-empty process()
        # call; None until the first such call.
        self.begin_time = None

    def process(self, input_list):
        """ Takes input list of error messages and returns output list of delayed messages.

        Raises Exception in reorder mode when any error of the new batch is
        older than the cutoff already used to release earlier errors, since
        the output could then no longer be ordered.
        """
        if not input_list:
            return []
        self.queue.extend(input_list)
        if self.reorder:
            if self.begin_time is not None:
                # The batch is not sorted in reorder mode, so every element
                # has to be checked against the cutoff, not just the first.
                earliest = min(error.t for error in input_list)
                if earliest < self.begin_time:
                    raise Exception("Delay is not long enough to guarentee ordered output")
            self.queue = deque(sorted(self.queue, key=lambda error: error.t))
        queue = self.queue
        # The last queue entry carries the newest timestamp (by sorting in
        # reorder mode, by the caller's ordering contract otherwise).
        self.begin_time = queue[-1].t - self.delay

        output_list = []
        # Release from the front while the head is older than the cutoff.
        while queue and queue[0].t < self.begin_time:
            output_list.append(queue.popleft())
        return output_list

    def all(self):
        """ Return all values still in the queue (as a new list; queue is kept) """
        return list(self.queue)

    def peek(self):
        """ Returns first error in queue without removing it.

        Raises IndexError when the queue is empty.
        """
        return self.queue[0]
00111
00112
class TestErrorDelay(unittest.TestCase):
    """ Unit tests for ErrorDelay: release cutoff, output ordering and reordering """

    def setUp(self):
        pass

    def _check_window(self, begin_time, result_list, error_delay):
        """ Assert released errors are not newer than begin_time and the queue head is not older """
        for released in result_list:
            self.assertTrue(begin_time >= released.t)
        self.assertTrue(begin_time <= error_delay.peek().t, "%f, %f" % (begin_time.to_sec(), error_delay.peek().t.to_sec()))

    def run_list(self, error_list, error_delay, delay):
        """ Feed errors one at a time, checking the delay window after each.

        Returns everything released plus whatever is still queued, after
        verifying that the combined list is ordered by timestamp.
        """
        ros_delay = rospy.Duration.from_sec(delay)
        result_list = []
        for error in error_list:
            result_list += error_delay.process([error])
            self._check_window(error.t - ros_delay, result_list, error_delay)
        result_list += error_delay.all()
        self.verify_order(result_list)
        return result_list

    def run_list_in_steps(self, error_list, error_delay, delay, step_sizes):
        """ Feed errors in consecutive chunks of size step+1 for each step in step_sizes.

        The delay window is checked only after chunks that released
        something.  Fails if step_sizes does not cover the whole input.
        Returns released plus still-queued errors, verified to be ordered.
        """
        ros_delay = rospy.Duration.from_sec(delay)
        result_list = []
        start = 0
        for step in step_sizes:
            end = start + step + 1
            chunk = error_list[start:end]
            start = end
            results = error_delay.process(chunk)
            if results:
                result_list += results
                self._check_window(chunk[-1].t - ros_delay, result_list, error_delay)
        # start equals the end of the last chunk (0 when step_sizes is empty).
        self.assertTrue(start >= len(error_list), 'end=%d,len=%d' % (start, len(error_list)))
        result_list += error_delay.all()
        self.verify_order(result_list)
        return result_list

    def verify_order(self, error_list):
        """ Assert timestamps never decrease; an empty list is trivially ordered """
        prev_error = None
        for error in error_list:
            if prev_error is not None:
                self.assertTrue(prev_error.t <= error.t, "%f, %f" % (prev_error.t.to_sec(), error.t.to_sec()))
            prev_error = error

    def verify_match(self, error_list, result_list):
        """ Assert both lists have equal length and equal elements position by position """
        self.assertEqual(len(result_list), len(error_list), '%d, %d' % (len(result_list), len(error_list)))
        for index, (result, error) in enumerate(zip(result_list, error_list)):
            self.assertEqual(result, error, 'i=%d, result[i]=%s, error[i]=%s' % (index, str(result), str(error)))

    def test_1_simple(self):
        # Ordered input without reordering must come out unchanged.
        error_list = [GenericError('name', rospy.Time(i + 10), 'desc') for i in range(20)]
        delay = 2.5
        error_delay = ErrorDelay(delay)
        result_list = self.run_list(error_list, error_delay, delay)
        self.verify_match(error_list, result_list)

    def test_2_reorder(self):
        # Input jitters by half a second, well inside the 2.5 s delay.
        error_list = []
        for i in range(10):
            error_list.append(GenericError('name', rospy.Time(i + 10.5), 'desc'))
            error_list.append(GenericError('name', rospy.Time(i + 10.0), 'desc'))

        delay = 2.5
        error_delay = ErrorDelay(delay, True)
        self.run_list(error_list, error_delay, delay)

    def test_3_reorder(self):
        # Same as test_2 plus one final error far older than the delay
        # window, which ErrorDelay must reject.
        error_list = []
        for i in range(10):
            error_list.append(GenericError('name', rospy.Time(i + 10.5), 'desc'))
            error_list.append(GenericError('name', rospy.Time(i + 10.0), 'desc'))
        error_list.append(GenericError('name', rospy.Time(10.0), 'desc'))

        delay = 2.5
        error_delay = ErrorDelay(delay, True)
        try:
            self.run_list(error_list, error_delay, delay)
        except AssertionError:
            # A failed check inside run_list is a genuine test failure and
            # must not be mistaken for the exception expected here.
            raise
        except Exception:
            pass
        else:
            self.fail("ErrorDelay accepted an error older than its delay window")

    def test_4_empty(self):
        # Empty input must yield a list, even in reorder mode.
        error_delay = ErrorDelay(10, True)
        error_list = error_delay.process([])
        self.assertTrue(isinstance(error_list, list), type(error_list).__name__)

    def test_5_simple_steps(self):
        # Ordered input fed in growing chunks must come out unchanged.
        error_list = [GenericError('name', rospy.Time(i + 10), 'desc') for i in range(20)]
        delay = 2.5
        error_delay = ErrorDelay(delay)
        steps = list(range(len(error_list)))
        result_list = self.run_list_in_steps(error_list, error_delay, delay, steps)
        self.verify_match(error_list, result_list)
00227
00228
00229
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner.
    unittest.main()