test_tf_web_republisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 PKG = 'test_tf2_web_republisher'
3 
4 import actionlib
5 import rospy
6 from tf2_web_republisher.msg import TFSubscriptionAction, TFSubscriptionGoal, TFArray
7 from tf2_web_republisher.srv import RepublishTFs, RepublishTFsRequest
8 
9 import sys
10 import unittest
11 
12 
13 class TestTfRepublisher(unittest.TestCase):
14  def setUp(self):
15  self.msgs_received = 0
16 
17  def transform_cb(self, _):
18  self.msgs_received += 1
19 
20  """
21  Test the action interface for tf2_web_republisher.
22  """
23  def test_action(self):
24  client = actionlib.SimpleActionClient('tf2_web_republisher',
25  TFSubscriptionAction)
26  client.wait_for_server(timeout=rospy.Duration(2))
27  goal = TFSubscriptionGoal(
28  source_frames=['foo', 'bar'],
29  target_frame='world',
30  angular_thres=0.1,
31  trans_thres=0.05,
32  rate=2.0)
33 
34  client.send_goal(goal, done_cb=None, active_cb=None,
35  feedback_cb=self.transform_cb)
36  rospy.sleep(1.3)
37  client.cancel_goal()
38  # We should have gotten two feedback messages by now
39  self.assertEquals(2, self.msgs_received)
40  rospy.sleep(1.0)
41  # We cancelled, so we expect no further feedback
42  self.assertEquals(2, self.msgs_received)
43 
44  def test_service(self):
45  self.assertEquals(0, self.msgs_received)
46  rospy.wait_for_service('republish_tfs', 2.0)
47  proxy = rospy.ServiceProxy('republish_tfs', RepublishTFs)
48  result = proxy(RepublishTFsRequest(source_frames=['foo', 'bar'],
49  target_frame='world',
50  angular_thres=0.1,
51  trans_thres=0.05,
52  rate=2,
53  timeout=rospy.Duration(1.0)))
54  sub = rospy.Subscriber(result.topic_name,
55  TFArray,
56  self.transform_cb)
57  rospy.sleep(1.3)
58  self.assertTrue(any([topic_tuple[0] == result.topic_name
59  for topic_tuple in rospy.get_published_topics()]),
60  msg=str(rospy.get_published_topics()))
61  sub.unregister()
62  self.assertEquals(2, self.msgs_received)
63  rospy.sleep(2.0)
64  self.assertFalse(any([topic_tuple[0] == result.topic_name
65  for topic_tuple in rospy.get_published_topics()]))
66 
67 
68 if __name__ == '__main__':
69  import rostest
70  rospy.init_node('test_tf_web_republisher')
71  rostest.rosrun(PKG, 'test_tf2_web_republisher', TestTfRepublisher)
72 


tf2_web_republisher
Author(s): Julius Kammer
autogenerated on Mon Apr 5 2021 03:02:43