bag_csv_test.py
Go to the documentation of this file.
00001 #!/usr/bin/python
00002 #
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2008, Willow Garage, Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions
00010 # are met:
00011 #
00012 #  * Redistributions of source code must retain the above copyright
00013 #    notice, this list of conditions and the following disclaimer.
00014 #  * Redistributions in binary form must reproduce the above
00015 #    copyright notice, this list of conditions and the following
00016 #    disclaimer in the documentation and/or other materials provided
00017 #    with the distribution.
00018 #  * Neither the name of the Willow Garage nor the names of its
00019 #    contributors may be used to endorse or promote products derived
00020 #    from this software without specific prior written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 
00035 # Author: Kevin Watts
00036 
00037 PKG = 'diagnostic_analysis'
00038 
00039 import roslib; roslib.load_manifest(PKG)
00040 import rostest
00041 import unittest
00042 
00043 import rosbag
00044 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00045 
00046 import random
00047 import tempfile
00048 import time, os
00049 import csv
00050 
00051 from diagnostic_analysis.exporter import LogExporter
00052 from diagnostic_analysis.sparse import *
00053 
# Number of messages setUp writes to the test bag; test_export expects the
# exported CSV to contain this many data rows (header excluded).
row_count = 100
00055 
##\brief Build a DiagnosticArray holding one OK status for testing
#
# The single status is named 'Unit Test' with hardware ID 'HW ID', and each
# of its three key/value entries carries str(count) as its value.
##\param count Value stored (as a string) in 'Value A', 'Value B' and 'Value C'
##\return DiagnosticArray whose status list contains exactly one DiagnosticStatus
def make_status_msg(count):
    text = str(count)

    status = DiagnosticStatus()
    status.name = 'Unit Test'
    status.hardware_id = 'HW ID'
    status.level = 0
    status.message = 'OK'
    status.values = [KeyValue(key, text)
                     for key in ('Value A', 'Value B', 'Value C')]

    msg = DiagnosticArray()
    msg.status = [status]
    return msg
00070 
##\brief Tests converting a bag logfile to CSV and making sparse copies of it
#
# Each test gets a fresh bag with row_count '/diagnostics' messages, the CSV
# exported from it by LogExporter, and two sparse CSVs derived from that CSV.
class TestBagToCSV(unittest.TestCase):
    ##\brief Write the test bag, export it to CSV and build the sparse CSVs
    def setUp(self):
        # Make logfile with bogus messages. The temporary file supplies the
        # path; rosbag opens that path by name for writing.
        self.bag = tempfile.NamedTemporaryFile()

        rebagger = rosbag.Bag(self.bag.name, 'w')
        try:
            for i in range(0, row_count):
                rebagger.write("/diagnostics", make_status_msg(i))
        finally:
            # Close the bag even if a write fails so the handle is not leaked.
            rebagger.close()

        # Make CSV
        self.exp = LogExporter(None, self.bag.name)
        self.exp.process_log()
        self.exp.finish_logfile()
        # 'Unit Test' is the status name used by make_status_msg.
        self.filename = self.exp.get_filename('Unit Test')

        ## Make sparse CSV's
        self.skip_10 = make_sparse_skip(self.filename, 10)
        self.length_10 = make_sparse_length(self.filename, 10)

    ##\brief Count the newline-separated segments of a text file
    #
    # A file ending in a newline yields one extra, empty trailing segment;
    # the expected values in the sparse tests account for this.
    ##\param path Path of the file to read
    ##\return Number of segments produced by splitting the contents on '\n'
    def _count_lines(self, path):
        with open(path) as sparse_file:
            return len(sparse_file.read().split('\n'))

    ##\brief Tests that exported file exists and is not None
    def test_file_exists(self):
        self.assertTrue(self.filename is not None, "CSV file is None")
        self.assertTrue(os.path.isfile(self.filename), "CSV file doesn't exist")

    ##\brief Test that CSV file has correct data, number of lines
    def test_export(self):
        # count is -1 until the header row has been consumed; afterwards it
        # is the number of data rows seen, which must equal each row's value.
        count = -1
        # NOTE(review): 'rb' matches the Python 2 csv module's expectations;
        # confirm before running under Python 3.
        with open(self.filename, 'rb') as csv_file:
            for row in csv.reader(csv_file):
                if count == -1:
                    self.assertEqual(row[2].strip(), 'Message')
                    self.assertEqual(row[3].strip(), 'Hardware ID')
                    self.assertEqual(row[4].strip(), 'Value A')
                    count += 1
                    continue

                self.assertEqual(row[2].strip(), 'OK')
                self.assertEqual(row[3].strip(), 'HW ID')
                self.assertEqual(row[4].strip(), str(count))
                count += 1

        self.assertEqual(count, row_count, "Row count doesn't match")

    ##\brief Tests that sparse CSV made with 'skip' option has correct number of lines
    def test_sparse_skip(self):
        line_count = self._count_lines(self.skip_10)
        self.assertTrue(line_count <= int(row_count / 10) + 2,
                        "Length of sparse CSV (skipped) incorrect")

    ##\brief Tests that sparse CSV made with 'length' option has correct number of lines
    def test_sparse_length(self):
        # NOTE(review): 12 is presumably header + 10 rows + the empty segment
        # after the final newline; confirm against make_sparse_length.
        self.assertEqual(self._count_lines(self.length_10), 12,
                         "Length of sparse CSV incorrect")

    ##\brief Remove the bag, the sparse CSVs and the exporter's files
    def tearDown(self):
        try:
            self.bag.close()
            os.remove(self.skip_10)
            os.remove(self.length_10)
        finally:
            # Always clean up the exporter's output, even if a removal above fails.
            self.exp.remove_files()
00131 
00132         
if __name__ == '__main__':
    # Hard-wired switch: rostest is used for accurate results. Set to False
    # to run the manual test suite below instead.
    run_with_rostest = True

    if not run_with_rostest:
        # Manual test suite
        manual_suite = unittest.TestSuite()
        for test_name in ('test_file_exists',
                          'test_export',
                          'test_sparse_skip',
                          'test_sparse_length'):
            manual_suite.addTest(TestBagToCSV(test_name))

        unittest.TextTestRunner(verbosity=2).run(manual_suite)
    else:
        rostest.unitrun(PKG, 'bag_csv_test', TestBagToCSV)


diagnostic_analysis
Author(s): Eric Berger, Kevin Watts
autogenerated on Fri Jan 3 2014 11:19:15