diag_error.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 #
00004 # Software License Agreement (BSD License)
00005 #
00006 # Copyright (c) 2008, Willow Garage, Inc.
00007 # All rights reserved.
00008 #
00009 # Redistribution and use in source and binary forms, with or without
00010 # modification, are permitted provided that the following conditions
00011 # are met:
00012 #
00013 #  * Redistributions of source code must retain the above copyright
00014 #    notice, this list of conditions and the following disclaimer.
00015 #  * Redistributions in binary form must reproduce the above
00016 #    copyright notice, this list of conditions and the following
00017 #    disclaimer in the documentation and/or other materials provided
00018 #    with the distribution.
00019 #  * Neither the name of Willow Garage, Inc. nor the names of its
00020 #    contributors may be used to endorse or promote products derived
00021 #    from this software without specific prior written permission.
00022 #
00023 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00024 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00025 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00026 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00027 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00028 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00029 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00030 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00031 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00032 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00033 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00034 # POSSIBILITY OF SUCH DAMAGE.
00035 #
00036 
00037 
00038 ##\author Derek King
00039 ##\brief DiagnosticError 
00040 
00041 PKG = 'mtrace_tools'
00042 import roslib; roslib.load_manifest(PKG)
00043 import sys 
00044 import rospy
00045 from collections import deque
00046 import unittest
00047 from itertools import izip
00048 
00049 
00050 class BaseError(object):
00051     def __init__(self, name, t, desc):
00052         self.name = name     # compenent name
00053         self.t = t           # time
00054         self.desc = desc     # description of error
00055         self.sub_errors = [] # error caused by this error
00056         self.parents = []    # errors causing/contributing to this error
00057     def short_desc(self):
00058         return self.desc
00059 
00060 class NoError(BaseError):
00061     """ Represents non-error, good for flushing error queues """
00062     def __init__(self, name, t, desc):
00063         BaseError.__init__(self, name, t, desc)
00064 
00065 class GenericError(BaseError):
00066     """ Represents any type of unknown error """
00067     def __init__(self, name, t, desc):
00068         BaseError.__init__(self, name, t, desc)
00069 
00070 
00071 class ErrorDelay:
00072     """ FIFO of error messages.  Errors are delayed by <delay> seconds.
00073     If reorder is True, will sort Errors using timestamps.
00074     If reorder is not True, expects input to be perfectly ordered.
00075     """
00076     def __init__(self, delay, reorder=False):
00077         self.delay = rospy.Duration.from_sec(delay)
00078         self.queue = deque()  #[]  #heapq()
00079         self.reorder = reorder
00080         self.begin_time = None
00081 
00082     def process(self, input_list):
00083         """ Takes input list of error messages and returns output list of delayed messages """
00084         if len(input_list) == 0:
00085             return []
00086         self.queue.extend(input_list)
00087         if self.reorder:
00088             if self.begin_time != None and (input_list[0].t < self.begin_time):
00089                 raise Exception("Delay is not long enough to guarentee ordered output")
00090             self.queue = deque(sorted(self.queue, key=lambda error : error.t))
00091         queue = self.queue
00092         self.begin_time = queue[-1].t - self.delay
00093         # anything newer than begin_time can be returned
00094         output_list = []
00095         while len(queue) > 0:
00096             error = queue.popleft()
00097             if error.t < self.begin_time:
00098                 output_list.append(error)
00099             else:
00100                 queue.appendleft(error)
00101                 break
00102         return output_list
00103 
00104     def all(self):
00105         """ Return all values and in queue """
00106         return list(self.queue)
00107 
00108     def peek(self):
00109         """ Returns first error in queue """
00110         return self.queue[0]
00111 
00112 
00113 class TestErrorDelay(unittest.TestCase):
00114     # Test that ErrorDelay class delays error messages properly
00115     def setUp(self):
00116         pass
00117 
00118     def run_list(self, error_list, error_delay, delay):
00119         ros_delay = rospy.Duration.from_sec(delay)
00120         result_list = []
00121         for error in error_list:
00122             results = error_delay.process([error])
00123             len(results)
00124             result_list += results
00125             begin_time = error.t - ros_delay
00126             for error2 in result_list:
00127                 self.assertTrue(begin_time >= error2.t)
00128             self.assertTrue(begin_time <= error_delay.peek().t, "%f, %f" % (begin_time.to_sec(), error_delay.peek().t.to_sec()))
00129         result_list += error_delay.all()
00130         self.verify_order(result_list)
00131         return result_list
00132 
00133     def run_list_in_steps(self, error_list, error_delay, delay, step_sizes):
00134         ros_delay = rospy.Duration.from_sec(delay)
00135         result_list = []
00136         start = 0
00137         for step in step_sizes:
00138             end = start+step+1
00139             slice = error_list[start:end]
00140             start = end
00141             results = error_delay.process(slice)
00142             if len(results) > 0:
00143                 result_list += results
00144                 begin_time = slice[-1].t - ros_delay
00145                 for error2 in result_list:
00146                     self.assertTrue(begin_time >= error2.t)
00147                 self.assertTrue(begin_time <= error_delay.peek().t, "%f, %f" % (begin_time.to_sec(), error_delay.peek().t.to_sec()))
00148         self.assertTrue(end>=len(error_list),'end=%d,len=%d'%(end,len(error_list)))
00149         result_list += error_delay.all()
00150         self.verify_order(result_list)
00151         return result_list
00152 
00153     # verify ordering of errors in list
00154     def verify_order(self, error_list):
00155         prev_error = error_list[0]
00156         for error in error_list:
00157             self.assertTrue(prev_error.t <= error.t, "%f, %f" % (prev_error.t.to_sec(), error.t.to_sec()))
00158             prev_error = error
00159 
00160     # verify every item in both lists match. 
00161     def verify_match(self, error_list, result_list):
00162         self.assertEqual(len(result_list), len(error_list), '%d, %d'%(len(result_list), len(error_list)))
00163         for index in range(len(result_list)):
00164             result, error = (result_list[index], error_list[index])
00165             self.assertEqual(result, error, 'i=%d, result[i]=%s, error[i]=%s'%(index,str(result),str(error)) )
00166         
00167 
00168     # Simple test that only inserts one new Error at a time
00169     def test_1_simple(self):
00170         # create list of errors equaly spaced out
00171         error_list = []
00172         for i in range(20):
00173             t = rospy.Time(i+10)
00174             error_list.append(GenericError('name', t, 'desc'))
00175         delay = 2.5
00176         error_delay = ErrorDelay(delay)
00177         result_list = self.run_list(error_list, error_delay, delay)
00178         self.verify_match(error_list, result_list)
00179 
00180     # Tests reordering capability of delay list
00181     def test_2_reorder(self):
00182         # create list of errors of semi-ordered items
00183         error_list = []
00184         for i in range(10):
00185             t = rospy.Time(i+10.5)
00186             error_list.append(GenericError('name', t, 'desc'))
00187             t = rospy.Time(i+10.0)
00188             error_list.append(GenericError('name', t, 'desc'))
00189         # ErrorDelay 
00190         delay = 2.5
00191         error_delay = ErrorDelay(delay, True)
00192         self.run_list(error_list, error_delay, delay)
00193         
00194     # Makes sure reordering throws error when delay is not large enough to reorder list
00195     def test_3_reorder(self):
00196         # create list of errors of semi-ordered items
00197         error_list = []
00198         for i in range(10):
00199             t = rospy.Time(i+10.5)
00200             error_list.append(GenericError('name', t, 'desc'))
00201             t = rospy.Time(i+10.0)
00202             error_list.append(GenericError('name', t, 'desc'))
00203         t = rospy.Time(10.0)
00204         error_list.append(GenericError('name', t, 'desc'))
00205         # ErrorDelay 
00206         delay = 2.5
00207         error_delay = ErrorDelay(delay, True)
00208         self.assertRaises(Exception, self.run_list, error_list, error_delay, delay)
00209 
00210 
00211     # Makes sure reordering returns empty list (not None) when there is nothing to return
00212     def test_4_empty(self):
00213         # create list of errors of semi-ordered items
00214         error_delay = ErrorDelay(10, True)
00215         error_list = error_delay.process([])
00216         result_type = type(error_list).__name__
00217         self.assertTrue(result_type == 'list', result_type)
00218 
00219     # Run test that inserts errors in different amounts
00220     def test_5_simple_steps(self):
00221         error_list = [ GenericError('name', rospy.Time(i+10), 'desc') for i in range(20) ]
00222         delay = 2.5
00223         error_delay = ErrorDelay(delay)
00224         steps = [i for i in range(len(error_list))]
00225         result_list = self.run_list_in_steps(error_list, error_delay, delay, steps)
00226         self.verify_match(error_list, result_list)
00227 
00228         
00229 
00230 if __name__ == '__main__':
00231     unittest.main()


mtrace_tools
Author(s): Derek
autogenerated on Sat Dec 28 2013 17:58:07