Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 PKG = 'diagnostic_analysis'
00038 
00039 import roslib; roslib.load_manifest(PKG)
00040 import rostest
00041 import unittest
00042 
00043 import rosbag
00044 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00045 
00046 import random
00047 import tempfile
00048 import time, os
00049 import csv
00050 
00051 from diagnostic_analysis.exporter import LogExporter
00052 from diagnostic_analysis.sparse import *
00053 
00054 row_count = 100
00055 
00056 
00057 def make_status_msg(count):
00058     array = DiagnosticArray()
00059     stat = DiagnosticStatus()
00060     stat.level = 0
00061     stat.message = 'OK'
00062     stat.name = 'Unit Test'
00063     stat.hardware_id = 'HW ID'
00064     stat.values = [ 
00065         KeyValue('Value A', str(count)),
00066         KeyValue('Value B', str(count)),
00067         KeyValue('Value C', str(count))]
00068     array.status = [ stat ]
00069     return array
00070 
00071 
00072 class TestBagToCSV(unittest.TestCase):
00073     def setUp(self):
00074         
00075         self.bag = tempfile.NamedTemporaryFile()
00076 
00077         rebagger = rosbag.Bag(self.bag.name, 'w')
00078         for i in range(0, row_count):
00079             rebagger.write("/diagnostics", make_status_msg(i))
00080         rebagger.close()
00081 
00082         
00083         self.exp = LogExporter(None, self.bag.name)
00084         self.exp.process_log()
00085         self.exp.finish_logfile()
00086         self.filename = self.exp.get_filename('Unit Test')
00087 
00088         
00089         self.skip_10 = make_sparse_skip(self.filename, 10)
00090         self.length_10 = make_sparse_length(self.filename, 10)
00091 
00092     
00093     def test_file_exists(self):
00094         self.assert_(self.filename is not None, "CSV file is None")
00095         self.assert_(os.path.isfile(self.filename), "CSV file doesn't exist")
00096 
00097     
00098     def test_export(self):
00099         
00100         input_reader = csv.reader(open(self.filename, 'rb'))
00101         count = -1
00102         for row in input_reader:
00103             if count == -1:
00104                 self.assert_(row[2].strip() == 'Message')
00105                 self.assert_(row[3].strip() == 'Hardware ID')
00106                 self.assert_(row[4].strip() == 'Value A')
00107                 count += 1
00108                 continue
00109 
00110             self.assert_(row[2].strip() == 'OK')
00111             self.assert_(row[3].strip() == 'HW ID')
00112             self.assert_(row[4].strip() == str(count))
00113             count += 1
00114       
00115         self.assert_(count == row_count, "Row count doesn't match")
00116 
00117     
00118     def test_sparse_skip(self):
00119         self.assert_(len(open(self.skip_10).read().split('\n')) <= int(row_count / 10) + 2, "Length of sparse CSV (skipped) incorrect")
00120 
00121     
00122     def test_sparse_length(self):
00123         self.assert_(len(open(self.length_10).read().split('\n')) == 12, "Length of sparse CSV incorrect")
00124 
00125     def tearDown(self):
00126         self.bag.close()
00127         os.remove(self.skip_10)
00128         os.remove(self.length_10)
00129 
00130         self.exp.remove_files()
00131 
00132         
00133 if __name__ == '__main__':
00134     if True: 
00135         rostest.unitrun(PKG, 'bag_csv_test', TestBagToCSV)
00136     else:
00137         
00138         suite = unittest.TestSuite()
00139         suite.addTest(TestBagToCSV('test_file_exists'))
00140         suite.addTest(TestBagToCSV('test_export'))
00141         suite.addTest(TestBagToCSV('test_sparse_skip'))
00142         suite.addTest(TestBagToCSV('test_sparse_length'))
00143         
00144         unittest.TextTestRunner(verbosity = 2).run(suite)