00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 PKG = 'diagnostic_analysis'
00038
00039 import roslib; roslib.load_manifest(PKG)
00040 import rostest
00041 import unittest
00042
00043 import rosbag
00044 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00045
00046 import random
00047 import tempfile
00048 import time, os
00049 import csv
00050
00051 from diagnostic_analysis.exporter import LogExporter
00052 from diagnostic_analysis.sparse import *
00053
00054 row_count = 100
00055
00056
def make_status_msg(count):
    """Build a DiagnosticArray holding one status whose values all carry *count*.

    The single status has level 0, message 'OK', name 'Unit Test' and
    hardware id 'HW ID'. Its 'Value A', 'Value B' and 'Value C' entries each
    hold ``str(count)``.
    """
    text = str(count)

    status = DiagnosticStatus()
    status.level = 0
    status.message = 'OK'
    status.name = 'Unit Test'
    status.hardware_id = 'HW ID'
    status.values = [KeyValue(key, text)
                     for key in ('Value A', 'Value B', 'Value C')]

    message = DiagnosticArray()
    message.status = [status]
    return message
00070
00071
class TestBagToCSV(unittest.TestCase):
    """Export a generated diagnostics bag to CSV and check the resulting files.

    ``setUp`` writes ``row_count`` messages to a temporary bag, runs
    ``LogExporter`` over it and derives two sparse CSVs from the export.
    The tests then inspect the exported CSV and the line counts of the
    sparse files.
    """

    # Test methods are described with comments rather than docstrings so that
    # unittest keeps reporting them by method name.

    @staticmethod
    def _open_csv(path):
        """Open *path* for reading in the mode the ``csv`` module expects."""
        import sys  # local import: only needed to pick the open() mode

        if sys.version_info[0] >= 3:
            # Python 3's csv reader needs text input with newline translation off.
            return open(path, 'r', newline='')
        # Python 2's csv reader needs the file opened in binary mode.
        return open(path, 'rb')

    @staticmethod
    def _count_lines(path):
        """Return the number of '\\n'-separated pieces in the file at *path*.

        A file ending in a newline therefore counts one extra (empty) piece.
        """
        with open(path) as sparse_file:
            return len(sparse_file.read().split('\n'))

    def setUp(self):
        """Write the test bag, export it to CSV and create both sparse files."""
        # Only the temporary file's name is handed to rosbag and the exporter.
        self.bag = tempfile.NamedTemporaryFile()

        rebagger = rosbag.Bag(self.bag.name, 'w')
        try:
            for i in range(row_count):
                rebagger.write("/diagnostics", make_status_msg(i))
        finally:
            # Close the writer even when a write fails.
            rebagger.close()

        # Export the bag and look up the CSV produced for the 'Unit Test' status.
        self.exp = LogExporter(None, self.bag.name)
        self.exp.process_log()
        self.exp.finish_logfile()
        self.filename = self.exp.get_filename('Unit Test')

        # Paths of the two sparse CSVs derived from the export.
        self.skip_10 = make_sparse_skip(self.filename, 10)
        self.length_10 = make_sparse_length(self.filename, 10)

    # The exporter must report a CSV path, and that path must be a file.
    def test_file_exists(self):
        self.assertTrue(self.filename is not None, "CSV file is None")
        self.assertTrue(os.path.isfile(self.filename), "CSV file doesn't exist")

    # The CSV must start with the expected header names, followed by one row
    # per written message whose 'Value A' column equals the message index.
    def test_export(self):
        count = -1  # -1 until the header row has been consumed
        with self._open_csv(self.filename) as csv_file:
            for row in csv.reader(csv_file):
                if count == -1:
                    self.assertEqual(row[2].strip(), 'Message')
                    self.assertEqual(row[3].strip(), 'Hardware ID')
                    self.assertEqual(row[4].strip(), 'Value A')
                    count = 0
                    continue

                self.assertEqual(row[2].strip(), 'OK')
                self.assertEqual(row[3].strip(), 'HW ID')
                self.assertEqual(row[4].strip(), str(count))
                count += 1

        self.assertEqual(count, row_count, "Row count doesn't match")

    # The skip-sparse file may hold at most row_count // 10 + 2 lines.
    def test_sparse_skip(self):
        self.assertTrue(
            self._count_lines(self.skip_10) <= row_count // 10 + 2,
            "Length of sparse CSV (skipped) incorrect")

    # The length-sparse file must split into exactly 12 lines
    # (presumably header + 10 rows + the empty piece after the final newline;
    # confirm against make_sparse_length).
    def test_sparse_length(self):
        self.assertEqual(
            self._count_lines(self.length_10), 12,
            "Length of sparse CSV incorrect")

    def tearDown(self):
        """Close the bag and delete every file the test created.

        The steps are chained with ``finally`` so a failure in one of them
        does not leave the remaining files behind.
        """
        try:
            self.bag.close()
        finally:
            try:
                os.remove(self.skip_10)
            finally:
                try:
                    os.remove(self.length_10)
                finally:
                    self.exp.remove_files()
00131
00132
if __name__ == '__main__':
    # Set to False to bypass rostest and run the cases directly with a
    # verbose text runner.
    use_rostest = True

    if not use_rostest:
        # Manual suite: add each case explicitly, in a fixed order.
        suite = unittest.TestSuite()
        for case_name in ('test_file_exists',
                          'test_export',
                          'test_sparse_skip',
                          'test_sparse_length'):
            suite.addTest(TestBagToCSV(case_name))

        unittest.TextTestRunner(verbosity=2).run(suite)
    else:
        rostest.unitrun(PKG, 'bag_csv_test', TestBagToCSV)