test_proxies.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import unittest
3 import rospy
4 import actionlib
5 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached, ProxyActionClient, ProxyServiceCaller
6 
7 from std_msgs.msg import String
8 from std_srvs.srv import Trigger, TriggerRequest
9 from flexbe_msgs.msg import BehaviorExecutionAction, BehaviorExecutionGoal, BehaviorExecutionResult
10 
11 
class TestProxies(unittest.TestCase):
    """Integration tests for the FlexBE proxy classes.

    Each test creates its own topic, service or action name and talks to
    real ROS endpoints, so a ROS node must be initialized before running.
    """

    def test_publish_subscribe(self):
        """Publish on a regular and a latched topic and read both back."""
        t1 = '/pubsub_1'
        t2 = '/pubsub_2'
        pub = ProxyPublisher({t1: String})
        pub = ProxyPublisher({t2: String}, _latch=True)
        sub = ProxySubscriberCached({t1: String})
        self.assertTrue(pub.is_available(t1))

        self.assertTrue(pub.wait_for_any(t1))
        # Only t1 has a subscriber at this point; t2 gets one further below.
        self.assertFalse(pub.wait_for_any(t2))
        pub.publish(t1, String('1'))
        pub.publish(t2, String('2'))

        rospy.sleep(.5)  # make sure latched message is sent before subscriber is added
        sub = ProxySubscriberCached({t2: String})
        rospy.sleep(.5)  # make sure latched message can be received before checking

        # The cached message on t1 is available until it is removed explicitly.
        self.assertTrue(sub.has_msg(t1))
        self.assertEqual(sub.get_last_msg(t1).data, '1')
        sub.remove_last_msg(t1)
        self.assertFalse(sub.has_msg(t1))
        self.assertIsNone(sub.get_last_msg(t1))

        # The latched message on t2 reaches the subscriber that was added late.
        self.assertTrue(sub.has_msg(t2))
        self.assertEqual(sub.get_last_msg(t2).data, '2')

    def test_subscribe_buffer(self):
        """Check that a buffered subscription returns messages in order."""
        t1 = '/buffered_1'
        pub = ProxyPublisher({t1: String})
        sub = ProxySubscriberCached({t1: String})
        sub.enable_buffer(t1)
        self.assertTrue(pub.wait_for_any(t1))

        pub.publish(t1, String('1'))
        pub.publish(t1, String('2'))
        rospy.sleep(.5)  # make sure messages can be received

        self.assertTrue(sub.has_msg(t1))
        self.assertTrue(sub.has_buffered(t1))
        self.assertEqual(sub.get_from_buffer(t1).data, '1')

        pub.publish(t1, String('3'))
        rospy.sleep(.5)  # make sure messages can be received

        # Remaining messages come out oldest first; an empty buffer yields None.
        self.assertEqual(sub.get_from_buffer(t1).data, '2')
        self.assertEqual(sub.get_from_buffer(t1).data, '3')
        self.assertIsNone(sub.get_from_buffer(t1))
        self.assertFalse(sub.has_buffered(t1))

    # NOTE(review): the header of this method is not visible in the listing
    # this file was recovered from; the name is inferred from the proxy under
    # test and the naming of the other tests -- confirm against the repository.
    def test_service_caller(self):
        """Call a Trigger service through the proxy and probe missing ones."""
        t1 = '/service_1'
        # Minimal service that always answers success=True, message='ok'.
        rospy.Service(t1, Trigger, lambda r: (True, 'ok'))

        srv = ProxyServiceCaller({t1: Trigger})

        result = srv.call(t1, TriggerRequest())
        self.assertIsNotNone(result)
        self.assertTrue(result.success)
        self.assertEqual(result.message, 'ok')

        # Neither an unknown name nor a registered-but-absent service is available.
        self.assertFalse(srv.is_available('/not_there'))
        srv = ProxyServiceCaller({'/invalid': Trigger}, wait_duration=.1)
        self.assertFalse(srv.is_available('/invalid'))

    def test_action_client(self):
        """Run a goal to completion, cancel a second one, probe missing servers."""
        t1 = '/action_1'
        # Bound before execute_cb is defined so the callback can close over it;
        # the real server object is assigned right after the callback.
        server = None

        def execute_cb(goal):
            # Simulate half a second of work, then honor a pending cancel.
            rospy.sleep(.5)
            if server.is_preempt_requested():
                server.set_preempted()
            else:
                server.set_succeeded(BehaviorExecutionResult(outcome='ok'))

        server = actionlib.SimpleActionServer(t1, BehaviorExecutionAction, execute_cb, auto_start=False)
        server.start()

        client = ProxyActionClient({t1: BehaviorExecutionAction})
        self.assertFalse(client.has_result(t1))
        client.send_goal(t1, BehaviorExecutionGoal())

        # Poll at 20 Hz for 20 cycles (about one second): at every step the
        # goal must either still be running or already have produced a result.
        rate = rospy.Rate(20)
        for _ in range(20):
            self.assertTrue(client.is_active(t1) or client.has_result(t1))
            rate.sleep()
        self.assertTrue(client.has_result(t1))

        result = client.get_result(t1)
        self.assertEqual(result.outcome, 'ok')

        # Cancel a second goal while execute_cb is still inside its sleep.
        client.send_goal(t1, BehaviorExecutionGoal())
        rospy.sleep(.1)
        client.cancel(t1)
        rospy.sleep(.9)

        self.assertFalse(client.is_active(t1))

        # Neither an unknown name nor a registered-but-absent server is available.
        self.assertFalse(client.is_available('/not_there'))
        client = ProxyActionClient({'/invalid': BehaviorExecutionAction}, wait_duration=.1)
        self.assertFalse(client.is_available('/invalid'))
116 
if __name__ == '__main__':
    # Register this process as a ROS node before the test case is run.
    rospy.init_node('test_flexbe_proxies')

    # rostest is only needed when the file is executed as a script.
    import rostest

    rostest.rosrun(
        'flexbe_core',
        'test_flexbe_proxies',
        TestProxies
    )
def test_subscribe_buffer(self)
Definition: test_proxies.py:40
def test_publish_subscribe(self)
Definition: test_proxies.py:14


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39