5 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached, ProxyActionClient, ProxyServiceCaller
7 from std_msgs.msg
import String
8 from std_srvs.srv
import Trigger, TriggerRequest
9 from flexbe_msgs.msg
import BehaviorExecutionAction, BehaviorExecutionGoal, BehaviorExecutionResult
17 pub = ProxyPublisher({t1: String})
18 pub = ProxyPublisher({t2: String}, _latch=
True)
19 sub = ProxySubscriberCached({t1: String})
20 self.assertTrue(pub.is_available(t1))
22 self.assertTrue(pub.wait_for_any(t1))
23 self.assertFalse(pub.wait_for_any(t2))
24 pub.publish(t1, String(
'1'))
25 pub.publish(t2, String(
'2'))
28 sub = ProxySubscriberCached({t2: String})
31 self.assertTrue(sub.has_msg(t1))
32 self.assertEqual(sub.get_last_msg(t1).data,
'1')
33 sub.remove_last_msg(t1)
34 self.assertFalse(sub.has_msg(t1))
35 self.assertIsNone(sub.get_last_msg(t1))
37 self.assertTrue(sub.has_msg(t2))
38 self.assertEqual(sub.get_last_msg(t2).data,
'2')
42 pub = ProxyPublisher({t1: String})
43 sub = ProxySubscriberCached({t1: String})
45 self.assertTrue(pub.wait_for_any(t1))
47 pub.publish(t1, String(
'1'))
48 pub.publish(t1, String(
'2'))
51 self.assertTrue(sub.has_msg(t1))
52 self.assertTrue(sub.has_buffered(t1))
53 self.assertEqual(sub.get_from_buffer(t1).data,
'1')
55 pub.publish(t1, String(
'3'))
58 self.assertEqual(sub.get_from_buffer(t1).data,
'2')
59 self.assertEqual(sub.get_from_buffer(t1).data,
'3')
60 self.assertIsNone(sub.get_from_buffer(t1))
61 self.assertFalse(sub.has_buffered(t1))
65 rospy.Service(t1, Trigger,
lambda r: (
True,
'ok'))
67 srv = ProxyServiceCaller({t1: Trigger})
69 result = srv.call(t1, TriggerRequest())
70 self.assertIsNotNone(result)
71 self.assertTrue(result.success)
72 self.assertEqual(result.message,
'ok')
74 self.assertFalse(srv.is_available(
'/not_there'))
75 srv = ProxyServiceCaller({
'/invalid': Trigger}, wait_duration=.1)
76 self.assertFalse(srv.is_available(
'/invalid'))
84 if server.is_preempt_requested():
85 server.set_preempted()
87 server.set_succeeded(BehaviorExecutionResult(outcome=
'ok'))
92 client = ProxyActionClient({t1: BehaviorExecutionAction})
93 self.assertFalse(client.has_result(t1))
94 client.send_goal(t1, BehaviorExecutionGoal())
98 self.assertTrue(client.is_active(t1)
or client.has_result(t1))
100 self.assertTrue(client.has_result(t1))
102 result = client.get_result(t1)
103 self.assertEqual(result.outcome,
'ok')
105 client.send_goal(t1, BehaviorExecutionGoal())
110 self.assertFalse(client.is_active(t1))
112 self.assertFalse(client.is_available(
'/not_there'))
113 client = ProxyActionClient({
'/invalid': BehaviorExecutionAction}, wait_duration=.1)
114 self.assertFalse(client.is_available(
'/invalid'))
117 if __name__ ==
'__main__':
118 rospy.init_node(
'test_flexbe_proxies')
120 rostest.rosrun(
'flexbe_core',
'test_flexbe_proxies', TestProxies)
def test_subscribe_buffer(self)
def test_service_caller(self)
def test_action_client(self)
def test_publish_subscribe(self)