test_tf_web_republisher.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'test_tf2_web_republisher'
00003 
00004 import actionlib
00005 import rospy
00006 from tf2_web_republisher.msg import TFSubscriptionAction, TFSubscriptionGoal, TFArray
00007 from tf2_web_republisher.srv import RepublishTFs, RepublishTFsRequest
00008 
00009 import sys
00010 import unittest
00011 
00012 
00013 class TestTfRepublisher(unittest.TestCase):
00014     def setUp(self):
00015         self.msgs_received = 0
00016 
00017     def transform_cb(self, _):
00018         self.msgs_received += 1
00019 
00020     """
00021     Test the action interface for tf2_web_republisher.
00022     """
00023     def test_action(self):
00024         client = actionlib.SimpleActionClient('tf2_web_republisher',
00025                                               TFSubscriptionAction)
00026         client.wait_for_server(timeout=rospy.Duration(2))
00027         goal = TFSubscriptionGoal(
00028             source_frames=['foo', 'bar'],
00029             target_frame='world',
00030             angular_thres=0.1,
00031             trans_thres=0.05,
00032             rate=2.0)
00033 
00034         client.send_goal(goal, done_cb=None, active_cb=None,
00035                          feedback_cb=self.transform_cb)
00036         rospy.sleep(1.3)
00037         client.cancel_goal()
00038         # We should have gotten two feedback messages by now
00039         self.assertEquals(2, self.msgs_received)
00040         rospy.sleep(1.0)
00041         # We cancelled, so we expect no further feedback
00042         self.assertEquals(2, self.msgs_received)
00043 
00044     def test_service(self):
00045         self.assertEquals(0, self.msgs_received)
00046         rospy.wait_for_service('republish_tfs', 2.0)
00047         proxy = rospy.ServiceProxy('republish_tfs', RepublishTFs)
00048         result = proxy(RepublishTFsRequest(source_frames=['foo', 'bar'],
00049                                            target_frame='world',
00050                                            angular_thres=0.1,
00051                                            trans_thres=0.05,
00052                                            rate=2,
00053                                            timeout=rospy.Duration(1.0)))
00054         sub = rospy.Subscriber(result.topic_name,
00055                                TFArray,
00056                                self.transform_cb)
00057         rospy.sleep(1.3)
00058         self.assertTrue(any([topic_tuple[0] == result.topic_name
00059                              for topic_tuple in rospy.get_published_topics()]),
00060                             msg=str(rospy.get_published_topics()))
00061         sub.unregister()
00062         self.assertEquals(2, self.msgs_received)
00063         rospy.sleep(2.0)
00064         self.assertFalse(any([topic_tuple[0] == result.topic_name
00065                               for topic_tuple in rospy.get_published_topics()]))
00066 
00067 
00068 if __name__ == '__main__':
00069     import rostest
00070     rospy.init_node('test_tf_web_republisher')
00071     rostest.rosrun(PKG, 'test_tf2_web_republisher', TestTfRepublisher)
00072 


tf2_web_republisher
Author(s): Julius Kammer
autogenerated on Thu Jun 6 2019 20:34:26