#!/usr/bin/python
#
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of the Willow Garage nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# Author: Kevin Watts

PKG = 'diagnostic_analysis'

import roslib; roslib.load_manifest(PKG)
import rostest
import unittest

import rosbag
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue

import random
import tempfile
import time, os
import csv

from diagnostic_analysis.exporter import LogExporter
from diagnostic_analysis.sparse import *

# Number of DiagnosticArray messages written to the test bag; the exported
# CSV is expected to contain exactly this many data rows.
row_count = 100


##\brief Make DiagnosticArray message for testing
##
## The array holds a single OK-level status named 'Unit Test' whose three
## key/value pairs all carry str(count).
##\param count Value stored (as a string) in 'Value A', 'Value B', 'Value C'
##\return DiagnosticArray containing the one status
def make_status_msg(count):
    array = DiagnosticArray()
    stat = DiagnosticStatus()
    stat.level = 0
    stat.message = 'OK'
    stat.name = 'Unit Test'
    stat.hardware_id = 'HW ID'
    stat.values = [
        KeyValue('Value A', str(count)),
        KeyValue('Value B', str(count)),
        KeyValue('Value C', str(count))]
    array.status = [stat]
    return array


##\brief Count the '\n'-separated fields in a text file
##
## A file that ends with a newline yields one trailing empty field, so the
## result is (number of lines + 1) in that case.
##\param path Path of the file to read
##\return len(contents.split('\n'))
def _count_newline_fields(path):
    # 'with' guarantees the handle is closed even if read() raises.
    with open(path) as handle:
        return len(handle.read().split('\n'))


##\brief Tests convert logfile to CSV and making sparse
class TestBagToCSV(unittest.TestCase):
    ##\brief Write a bag of test messages, export it to CSV, make sparse CSVs
    def setUp(self):
        # Make logfile with bogus messages
        self.bag = tempfile.NamedTemporaryFile()

        rebagger = rosbag.Bag(self.bag.name, 'w')
        try:
            for i in range(0, row_count):
                rebagger.write("/diagnostics", make_status_msg(i))
        finally:
            # Close the writer even when a write fails, so the bag handle
            # is not leaked.
            rebagger.close()

        # Make CSV
        self.exp = LogExporter(None, self.bag.name)
        self.exp.process_log()
        self.exp.finish_logfile()
        # The tests below treat this as the path of the CSV exported for
        # the 'Unit Test' status.
        self.filename = self.exp.get_filename('Unit Test')

        ## Make sparse CSV's
        self.skip_10 = make_sparse_skip(self.filename, 10)
        self.length_10 = make_sparse_length(self.filename, 10)

    ##\brief Tests that exported file exists and is not None
    def test_file_exists(self):
        self.assertTrue(self.filename is not None, "CSV file is None")
        self.assertTrue(os.path.isfile(self.filename), "CSV file doesn't exist")

    ##\brief Test that CSV file has correct data, number of lines
    def test_export(self):
        # Read CSV, count rows. 'rb' is the mode the Python 2 csv module
        # expects; the 'with' block closes the file when the check is done.
        data_rows = 0
        header_seen = False
        with open(self.filename, 'rb') as csv_file:
            for row in csv.reader(csv_file):
                if not header_seen:
                    # First row is the column header.
                    self.assertEqual(row[2].strip(), 'Message')
                    self.assertEqual(row[3].strip(), 'Hardware ID')
                    self.assertEqual(row[4].strip(), 'Value A')
                    header_seen = True
                    continue

                # Message i was written with 'Value A' == str(i), so the
                # data rows must appear in the order they were written.
                self.assertEqual(row[2].strip(), 'OK')
                self.assertEqual(row[3].strip(), 'HW ID')
                self.assertEqual(row[4].strip(), str(data_rows))
                data_rows += 1

        self.assertTrue(header_seen, "CSV file has no header row")
        self.assertEqual(data_rows, row_count, "Row count doesn't match")

    ##\brief Tests that sparse CSV made with 'skip' option has correct number of lines
    def test_sparse_skip(self):
        # Upper bound only: one tenth of the rows, plus 2 extra fields
        # (presumably the header row and the empty field after the final
        # newline -- confirm against make_sparse_skip).
        self.assertTrue(
            _count_newline_fields(self.skip_10) <= row_count // 10 + 2,
            "Length of sparse CSV (skipped) incorrect")

    ##\brief Tests that sparse CSV made with 'length' option has correct number of lines
    def test_sparse_length(self):
        # 12 fields expected for a requested length of 10 (presumably
        # header + 10 rows + trailing empty field -- confirm against
        # make_sparse_length).
        self.assertEqual(
            _count_newline_fields(self.length_10), 12,
            "Length of sparse CSV incorrect")

    ##\brief Remove the temporary bag, the sparse CSVs and the exporter's files
    def tearDown(self):
        # Nested try/finally: every cleanup step runs even if an earlier
        # one raises, so one failure does not strand the other files.
        try:
            self.bag.close()
        finally:
            try:
                os.remove(self.skip_10)
            finally:
                try:
                    os.remove(self.length_10)
                finally:
                    self.exp.remove_files()


if __name__ == '__main__':
    if True:  # Use rostest for accurate results
        rostest.unitrun(PKG, 'bag_csv_test', TestBagToCSV)
    else:
        # Manual test suite; flip the condition above to run it directly
        suite = unittest.TestSuite()
        suite.addTest(TestBagToCSV('test_file_exists'))
        suite.addTest(TestBagToCSV('test_export'))
        suite.addTest(TestBagToCSV('test_sparse_skip'))
        suite.addTest(TestBagToCSV('test_sparse_length'))

        unittest.TextTestRunner(verbosity=2).run(suite)