Go to the documentation of this file.00001
00002
00003 import roslib; roslib.load_manifest('test_nodelet')
00004 import rospy
00005 import unittest
00006 import rostest
00007 import signal
00008 import subprocess
00009 import time
00010
00011 from nodelet.srv import *
00012
00013 class TestBondBreakOnShutdown(unittest.TestCase):
00014 def test_bond_break_on_shutdown(self):
00015 '''
00016 Test that the bond is broken cleanly when closing a nodelet loader (#50).
00017 This relies on a debug message printed by the bondcpp package in case
00018 of error.
00019 '''
00020
00021
00022 proc_manager = subprocess.Popen(["rosrun", "nodelet", "nodelet", "manager", "__name:=nodelet_manager"],
00023 stdout=subprocess.PIPE,
00024 stderr=subprocess.PIPE,
00025 bufsize=-1
00026 )
00027
00028
00029 try:
00030 rospy.wait_for_service('/nodelet_manager/load_nodelet', timeout=2)
00031 except:
00032 self.fail("Could not determine that nodelet manager has started")
00033
00034
00035 proc_nodelet = subprocess.Popen(["rosrun", "nodelet", "nodelet", "load", "test_nodelet/Plus", "nodelet_manager", "__name:=test"],
00036 stdout=subprocess.PIPE,
00037 stderr=subprocess.PIPE,
00038 bufsize=-1
00039 )
00040
00041
00042 try:
00043 rospy.wait_for_service('test/get_loggers', timeout=2)
00044 except:
00045 self.fail("Could not determine that nodelet has started")
00046 time.sleep(1)
00047
00048
00049 proc_nodelet.send_signal(signal.SIGINT)
00050 (n_out, n_err) = proc_nodelet.communicate()
00051
00052
00053 proc_manager.send_signal(signal.SIGINT)
00054 (m_out, m_err) = proc_manager.communicate()
00055
00056
00057 self.assertIn('Unloading nodelet /test from manager nodelet_manager', n_out)
00058 self.assertNotIn('Bond failed to break on destruction', m_out)
00059 self.assertNotIn('Bond failed to break on destruction', n_out)
00060
00061 if __name__ == '__main__':
00062 rospy.init_node('test_bond_break_on_shutdown')
00063 rostest.unitrun('test_bond_break_on_shutdown', 'test_bond_break_on_shutdown', TestBondBreakOnShutdown)
00064