Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import roslib; roslib.load_manifest('test_nodelet')
00032 import rospy
00033 import unittest
00034 import rostest
00035 import random
00036 import subprocess
00037
00038 from nodelet.srv import *
00039
00040 class TestLoader(unittest.TestCase):
00041 def test_loader(self):
00042 load = rospy.ServiceProxy('/nodelet_manager/load_nodelet', NodeletLoad)
00043 unload = rospy.ServiceProxy('/nodelet_manager/unload_nodelet', NodeletUnload)
00044 list = rospy.ServiceProxy('/nodelet_manager/list', NodeletList)
00045
00046 load.wait_for_service()
00047 unload.wait_for_service()
00048 list.wait_for_service()
00049
00050 req = NodeletLoadRequest()
00051 req.name = "/my_nodelet"
00052 req.type = "test_nodelet/Plus"
00053
00054 res = load.call(req)
00055 self.assertTrue(res.success)
00056
00057 req = NodeletListRequest()
00058 res = list.call(req)
00059 self.assertTrue("/my_nodelet" in res.nodelets)
00060
00061 req = NodeletUnloadRequest()
00062 req.name = "/my_nodelet"
00063 res = unload.call(req)
00064 self.assertTrue(res.success)
00065
00066 def test_loader_failed_initialization(self):
00067 load = rospy.ServiceProxy('/nodelet_manager/load_nodelet', NodeletLoad)
00068 unload = rospy.ServiceProxy('/nodelet_manager/unload_nodelet', NodeletUnload)
00069 list = rospy.ServiceProxy('/nodelet_manager/list', NodeletList)
00070
00071 load.wait_for_service()
00072 unload.wait_for_service()
00073 list.wait_for_service()
00074
00075 req = NodeletLoadRequest()
00076 req.name = "/my_nodelet"
00077 req.type = "test_nodelet/FailingNodelet"
00078 self.assertRaises(rospy.ServiceException, load.call, req)
00079
00080
00081
00082 req = NodeletListRequest()
00083 res = list.call(req)
00084 self.assertFalse("/my_nodelet" in res.nodelets)
00085
00086 req = NodeletUnloadRequest()
00087 req.name = "/my_nodelet"
00088 self.assertRaises(rospy.ServiceException, unload.call, req)
00089
00090 def test_loader_error_message(self):
00091 proc = subprocess.Popen(["rosrun", "test_nodelet", "create_instance_cb_error"],
00092 stderr=subprocess.PIPE
00093 )
00094
00095
00096 self.assertEqual(proc.wait(), 1)
00097
00098
00099 out = proc.stderr.read()
00100 self.assertIn('NODELET_TEST_FAILURE', out)
00101
00102 if __name__ == '__main__':
00103 rospy.init_node('test_loader')
00104 rostest.unitrun('test_loader', 'test_loader', TestLoader)
00105