37 PKG =
'diagnostic_analysis'
39 import roslib; roslib.load_manifest(PKG)
44 from diagnostic_msgs.msg
import DiagnosticArray, DiagnosticStatus, KeyValue
58 array = DiagnosticArray()
59 stat = DiagnosticStatus()
62 stat.name =
'Unit Test'
63 stat.hardware_id =
'HW ID'
65 KeyValue(
'Value A', str(count)),
66 KeyValue(
'Value B', str(count)),
67 KeyValue(
'Value C', str(count))]
68 array.status = [ stat ]
75 self.
bag = tempfile.NamedTemporaryFile()
78 for i
in range(0, row_count):
84 self.
exp.process_log()
85 self.
exp.finish_logfile()
94 self.assert_(self.
filename is not None,
"CSV file is None")
95 self.assert_(os.path.isfile(self.
filename),
"CSV file doesn't exist")
100 input_reader = csv.reader(open(self.
filename, newline=
''), delimiter=
',')
102 for row
in input_reader:
104 self.assert_(row[2].strip() ==
'Message')
105 self.assert_(row[3].strip() ==
'Hardware ID')
106 self.assert_(row[4].strip() ==
'Value A')
110 self.assert_(row[2].strip() ==
'OK')
111 self.assert_(row[3].strip() ==
'HW ID')
112 self.assert_(row[4].strip() == str(count))
115 self.assert_(count == row_count,
"Row count doesn't match")
119 self.assert_(len(open(self.
skip_10).read().split(
'\n')) <= int(row_count / 10) + 2,
"Length of sparse CSV (skipped) incorrect")
123 self.assert_(len(open(self.
length_10).read().split(
'\n')) == 12,
"Length of sparse CSV incorrect")
130 self.
exp.remove_files()
133 if __name__ ==
'__main__':
135 rostest.unitrun(PKG,
'bag_csv_test', TestBagToCSV)
138 suite = unittest.TestSuite()
144 unittest.TextTestRunner(verbosity = 2).run(suite)