Go to the documentation of this file.00001
00002
00003
00004 import sys
00005 import rospy
00006 import subprocess
00007 import unittest
00008 import rostest
00009
00010 class CloseWaitCheck(unittest.TestCase):
00011 def __init__(self, *args):
00012 super(CloseWaitCheck, self).__init__(*args)
00013 rospy.init_node("ClosWaitCheck")
00014 def setUp(self):
00015 True
00016
00017
00018 @unittest.expectedFailure
00019 def test_close_wait_check(self):
00020 rospy.loginfo("check close_wait...")
00021 num = 0
00022 for i in range(5):
00023 num = int(subprocess.check_output("lsof -nl | grep rosmaster | grep CLOSE_WAIT | wc -l", shell=True))
00024 rospy.loginfo("number of close_wait is ... %d (%d)"%(num, i))
00025 self.assert_(num>1, "some socket is not closed, found CLOSE_WAIT")
00026 subprocess.call("rosnode kill sample_topic_buffer_server", shell=True)
00027 rospy.sleep(rospy.Duration(1.0))
00028 return True
00029
00030
00031
00032 if __name__ == '__main__':
00033 try:
00034 rostest.run('rostest', "CloseWaitCheck", CloseWaitCheck, sys.argv)
00035 except KeyboardInterrupt, e:
00036 pass
00037 print "exiting"
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049