12 super(CloseWaitCheck, self).
__init__(*args)
13 rospy.init_node(
"ClosWaitCheck")
18 @unittest.expectedFailure
20 rospy.loginfo(
"check close_wait...")
23 num = int(subprocess.check_output(
"lsof -nl | grep rosmaster | grep CLOSE_WAIT | wc -l", shell=
True))
24 rospy.loginfo(
"number of close_wait is ... %d (%d)"%(num, i))
25 self.assert_(num>1,
"some socket is not closed, found CLOSE_WAIT")
26 subprocess.call(
"rosnode kill sample_topic_buffer_server", shell=
True)
27 rospy.sleep(rospy.Duration(1.0))
32 if __name__ ==
'__main__':
34 rostest.run(
'rostest',
"CloseWaitCheck", CloseWaitCheck, sys.argv)
35 except KeyboardInterrupt, e:
def test_close_wait_check(self)