Go to the documentation of this file.00001
00002 PKG = 'test_tf2_web_republisher'
00003
00004 import actionlib
00005 import rospy
00006 from tf2_web_republisher.msg import TFSubscriptionAction, TFSubscriptionGoal, TFArray
00007 from tf2_web_republisher.srv import RepublishTFs, RepublishTFsRequest
00008
00009 import sys
00010 import unittest
00011
00012
00013 class TestTfRepublisher(unittest.TestCase):
00014 def setUp(self):
00015 self.msgs_received = 0
00016
00017 def transform_cb(self, _):
00018 self.msgs_received += 1
00019
00020 """
00021 Test the action interface for tf2_web_republisher.
00022 """
00023 def test_action(self):
00024 client = actionlib.SimpleActionClient('tf2_web_republisher',
00025 TFSubscriptionAction)
00026 client.wait_for_server(timeout=rospy.Duration(2))
00027 goal = TFSubscriptionGoal(
00028 source_frames=['foo', 'bar'],
00029 target_frame='world',
00030 angular_thres=0.1,
00031 trans_thres=0.05,
00032 rate=2.0)
00033
00034 client.send_goal(goal, done_cb=None, active_cb=None,
00035 feedback_cb=self.transform_cb)
00036 rospy.sleep(1.3)
00037 client.cancel_goal()
00038
00039 self.assertEquals(2, self.msgs_received)
00040 rospy.sleep(1.0)
00041
00042 self.assertEquals(2, self.msgs_received)
00043
00044 def test_service(self):
00045 self.assertEquals(0, self.msgs_received)
00046 rospy.wait_for_service('republish_tfs', 2.0)
00047 proxy = rospy.ServiceProxy('republish_tfs', RepublishTFs)
00048 result = proxy(RepublishTFsRequest(source_frames=['foo', 'bar'],
00049 target_frame='world',
00050 angular_thres=0.1,
00051 trans_thres=0.05,
00052 rate=2,
00053 timeout=rospy.Duration(1.0)))
00054 sub = rospy.Subscriber(result.topic_name,
00055 TFArray,
00056 self.transform_cb)
00057 rospy.sleep(1.3)
00058 self.assertTrue(any([topic_tuple[0] == result.topic_name
00059 for topic_tuple in rospy.get_published_topics()]),
00060 msg=str(rospy.get_published_topics()))
00061 sub.unregister()
00062 self.assertEquals(2, self.msgs_received)
00063 rospy.sleep(2.0)
00064 self.assertFalse(any([topic_tuple[0] == result.topic_name
00065 for topic_tuple in rospy.get_published_topics()]))
00066
00067
00068 if __name__ == '__main__':
00069 import rostest
00070 rospy.init_node('test_tf_web_republisher')
00071 rostest.rosrun(PKG, 'test_tf2_web_republisher', TestTfRepublisher)
00072