48 cmd = [
'rosbag',
'info',
49 '/tmp/test_rosbag_record_two_publishers.bag',
51 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
52 out,err = p.communicate()
53 self.assertEqual(p.returncode, 0,
'Failed to check bag\ncmd=%s\nstdout=%s\nstderr=%s'%(cmd,out,err))
56 for l
in out.decode().split(
'\n'):
57 f = l.strip().split(
': ')
58 if len(f) == 2
and f[0] ==
'connections':
62 self.assertEqual(conns, 2)
if __name__ == '__main__':
    # Hand the ConnectionCount test case to rosunit when this file is run as
    # a script, forwarding the command-line arguments unchanged.
    rosunit.unitrun(
        'test_rosbag', 'connection_count', ConnectionCount, sys.argv)
def test_connection_count(self)