5 from flexbe_core
import EventState, OperatableStateMachine, ConcurrencyContainer
10 from std_msgs.msg
import Bool, Empty, UInt8, String
11 from flexbe_msgs.msg
import CommandFeedback, OutcomeRequest
17 super(TestSubjectState, self).
__init__(outcomes=[
'done',
'error'])
27 self.last_events.append(
'on_enter')
30 self.last_events.append(
'on_exit')
33 self.last_events.append(
'on_start')
36 self.last_events.append(
'on_stop')
39 self.last_events.append(
'on_pause')
42 self.last_events.append(
'on_resume')
49 state._enable_ros_control()
50 with OperatableStateMachine(outcomes=[
'done',
'error']):
51 OperatableStateMachine.add(
'subject', state,
52 transitions={
'done':
'done',
'error':
'error'},
53 autonomy={
'done': 1,
'error': 2})
57 state.last_events = []
58 return state.parent.execute(
None)
61 rate = rospy.Rate(100)
62 for i
in range(int(timeout * 100)):
63 if sub.has_msg(topic):
64 received = sub.get_last_msg(topic)
65 sub.remove_last_msg(topic)
69 raise AssertionError(
'Did not receive message on topic %s, expected:\n%s' 71 for slot
in msg.__slots__:
72 expected = getattr(msg, slot)
73 actual = getattr(received, slot)
74 error =
"Mismatch for %s, is %s but expected %s" % (slot, actual, expected)
75 if isinstance(expected, list):
76 self.assertListEqual(expected, actual, error)
78 self.assertEqual(expected, actual, error)
81 rate = rospy.Rate(100)
82 for i
in range(int(timeout * 100)):
83 if sub.has_msg(topic):
84 received = sub.get_last_msg(topic)
85 sub.remove_last_msg(topic)
86 raise AssertionError(
'Should not receive message on topic %s, but got:\n%s' 87 % (topic, str(received)))
94 fb_topic =
'flexbe/command_feedback' 95 sub = ProxySubscriberCached({fb_topic: CommandFeedback})
100 self.assertListEqual([
'on_enter'], state.last_events)
102 self.assertListEqual([], state.last_events)
105 state._sub._callback(Bool(
True),
'flexbe/command/pause')
107 self.assertListEqual([
'on_pause'], state.last_events)
108 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
"pause"))
109 state.result =
'error' 112 self.assertIsNone(outcome)
113 state._sub._callback(Bool(
False),
'flexbe/command/pause')
115 self.assertListEqual([
'on_resume'], state.last_events)
116 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
"resume"))
119 state._sub._callback(Empty(),
'flexbe/command/repeat')
121 self.assertListEqual([
'on_exit'], state.last_events)
122 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
"repeat"))
124 self.assertListEqual([
'on_enter'], state.last_events)
126 self.assertListEqual([], state.last_events)
129 state.result =
'done' 131 self.assertListEqual([
'on_exit'], state.last_events)
132 self.assertEqual(
'done', outcome)
136 out_topic =
'flexbe/mirror/outcome' 137 req_topic =
'flexbe/outcome_request' 138 sub = ProxySubscriberCached({out_topic: UInt8, req_topic: OutcomeRequest})
142 state.result =
'error' 148 OperatableStateMachine.autonomy_level = 2
151 self.
assertMessage(sub, req_topic, OutcomeRequest(outcome=1, target=
'/subject'))
154 self.
assertMessage(sub, req_topic, OutcomeRequest(outcome=255, target=
'/subject'))
157 state.result =
'done' 163 OperatableStateMachine.autonomy_level = 0
166 self.
assertMessage(sub, req_topic, OutcomeRequest(outcome=0, target=
'/subject'))
167 OperatableStateMachine.autonomy_level = 3
173 fb_topic =
'flexbe/command_feedback' 174 sub = ProxySubscriberCached({fb_topic: CommandFeedback})
178 PreemptableState.preempt =
True 180 self.assertEqual(outcome, PreemptableState._preempted_name)
181 self.assertRaises(StateMachineError,
lambda: state.parent.current_state)
182 PreemptableState.preempt =
False 184 self.assertIsNone(outcome)
187 state._sub._callback(Empty(),
'flexbe/command/preempt')
189 self.assertEqual(outcome, PreemptableState._preempted_name)
190 self.assertRaises(StateMachineError,
lambda: state.parent.current_state)
191 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'preempt'))
192 PreemptableState.preempt =
False 196 fb_topic =
'flexbe/command_feedback' 197 sub = ProxySubscriberCached({fb_topic: CommandFeedback})
201 state._sub._callback(String(
'/subject'),
'flexbe/command/lock')
202 state.result =
'done' 204 self.assertIsNone(outcome)
205 self.assertTrue(state._locked)
206 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'lock', args=[
'/subject',
'/subject']))
208 state._sub._callback(String(
'/subject'),
'flexbe/command/unlock')
210 self.assertEqual(outcome,
'done')
211 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'unlock', args=[
'/subject',
'/subject']))
214 state._sub._callback(String(
''),
'flexbe/command/lock')
215 state.result =
'done' 217 self.assertIsNone(outcome)
218 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'lock', args=[
'/subject',
'/subject']))
219 state._sub._callback(String(
''),
'flexbe/command/unlock')
221 self.assertEqual(outcome,
'done')
222 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'unlock', args=[
'/subject',
'/subject']))
225 state._sub._callback(String(
'/invalid'),
'flexbe/command/lock')
227 self.assertEqual(outcome,
'done')
228 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'lock', args=[
'/invalid',
'']))
231 state._sub._callback(String(
''),
'flexbe/command/unlock')
233 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'unlock', args=[
'',
'']))
236 state.parent._locked =
True 238 self.assertIsNone(outcome)
239 state.parent._locked =
False 242 self.assertEqual(outcome,
'done')
246 fb_topic =
'flexbe/command_feedback' 247 sub = ProxySubscriberCached({fb_topic: CommandFeedback})
251 state._sub._callback(OutcomeRequest(target=
'subject', outcome=1),
'flexbe/command/transition')
253 self.assertEqual(outcome,
'error')
254 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'transition', args=[
'subject',
'subject']))
257 state._sub._callback(OutcomeRequest(target=
'invalid', outcome=1),
'flexbe/command/transition')
259 self.assertIsNone(outcome)
260 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'transition', args=[
'invalid',
'subject']))
266 start = rospy.get_time()
269 duration = rospy.get_time() - start
270 self.assertAlmostEqual(duration, 1., places=2)
271 self.assertAlmostEqual(state.sleep_duration, .1, places=2)
275 start = rospy.get_time()
277 duration = rospy.get_time() - start
278 self.assertAlmostEqual(duration, 1., places=2)
279 self.assertAlmostEqual(state.sleep_duration, 1., places=2)
285 OperatableStateMachine.autonomy_level = 0
286 state.result =
'error' 288 self.assertIsNone(outcome)
289 state._sub._callback(OutcomeRequest(target=
'subject', outcome=0),
'flexbe/command/transition')
291 self.assertEqual(outcome,
'done')
292 OperatableStateMachine.autonomy_level = 3
296 state._sub._callback(String(
'/subject'),
'flexbe/command/lock')
298 self.assertIsNone(outcome)
299 state._sub._callback(OutcomeRequest(target=
'subject', outcome=1),
'flexbe/command/transition')
301 self.assertIsNone(outcome)
302 state._sub._callback(String(
'/subject'),
'flexbe/command/unlock')
304 self.assertEqual(outcome,
'error')
307 OperatableStateMachine.autonomy_level = 0
308 state.result =
'error' 310 self.assertIsNone(outcome)
311 state._sub._callback(Empty(),
'flexbe/command/preempt')
313 self.assertEqual(outcome, PreemptableState._preempted_name)
314 PreemptableState.preempt =
False 315 OperatableStateMachine.autonomy_level = 3
319 state._sub._callback(String(
'/subject'),
'flexbe/command/lock')
321 self.assertIsNone(outcome)
322 state._sub._callback(Empty(),
'flexbe/command/preempt')
324 self.assertEqual(outcome, PreemptableState._preempted_name)
325 PreemptableState.preempt =
False 326 state._sub._callback(String(
'/subject'),
'flexbe/command/unlock')
328 self.assertIsNone(outcome)
331 cc = ConcurrencyContainer(outcomes=[
'done',
'error'],
333 (
'error', [(
'main',
'error')]),
334 (
'error', [(
'side',
'error')]),
335 (
'done', [(
'main',
'done'), (
'side',
'done')])
339 transitions={
'done':
'done',
'error':
'error'},
340 autonomy={
'done': 1,
'error': 2})
342 transitions={
'done':
'done',
'error':
'error'},
343 autonomy={
'done': 1,
'error': 2})
344 with OperatableStateMachine(outcomes=[
'done',
'error']):
345 OperatableStateMachine.add(
'cc', cc,
346 transitions={
'done':
'done',
'error':
'error'},
347 autonomy={
'done': 1,
'error': 2})
349 class FakeRate(object):
352 return rospy.Duration(0)
361 self.assertAlmostEqual(cc.sleep_duration, .1, places=2)
363 cc[
'main'].set_rate(15)
364 cc[
'side'].set_rate(10)
367 start = rospy.get_time()
369 while rospy.get_time() - start <= 1.:
372 self.assertLessEqual(cc.sleep_duration, .1)
374 self.assertIn(cc[
'main'].count, [14, 15, 16])
375 self.assertIn(cc[
'side'].count, [9, 10, 11])
376 self.assertLessEqual(cc_count, 27)
379 cc._enable_ros_control()
380 self.assertTrue(cc[
'main']._is_controlled)
381 self.assertFalse(cc[
'side']._is_controlled)
382 cc[
'main']._rate = FakeRate()
383 cc[
'side']._rate = FakeRate()
386 outcome = cc.execute(
None)
387 self.assertIsNone(outcome)
388 cc[
'main'].result =
'error' 389 outcome = cc.execute(
None)
390 self.assertEqual(outcome,
'error')
391 cc[
'main'].result =
None 392 cc[
'side'].result =
'error' 393 outcome = cc.execute(
None)
394 self.assertEqual(outcome,
'error')
395 cc[
'side'].result =
'done' 396 outcome = cc.execute(
None)
397 self.assertIsNone(outcome)
398 cc[
'main'].result =
'done' 399 outcome = cc.execute(
None)
400 self.assertEqual(outcome,
'done')
401 cc[
'main'].result =
None 402 cc[
'side'].result =
None 405 outcome = cc.execute(
None)
406 self.assertIsNone(outcome)
407 cc[
'main'].last_events = []
408 cc[
'side'].last_events = []
409 cc[
'main'].result =
'error' 410 outcome = cc.execute(
None)
411 self.assertEqual(outcome,
'error')
412 self.assertListEqual(cc[
'main'].last_events, [
'on_exit'])
413 self.assertListEqual(cc[
'side'].last_events, [
'on_exit'])
416 class TestUserdata(EventState):
418 def __init__(self, out_content='test_data'):
419 super(TestUserdata, self).__init__(outcomes=[
'done'], input_keys=[
'data_in'], output_keys=[
'data_out'])
423 def execute(self, userdata):
424 rospy.logwarn(
'\033[0m%s\n%s' % (self.path, str(userdata)))
425 self.
data = userdata.data_in
429 inner_sm = OperatableStateMachine(outcomes=[
'done'], input_keys=[
'sm_in'], output_keys=[
'sm_out'])
430 inner_sm.userdata.own =
'own_data' 432 OperatableStateMachine.add(
'own_state', TestUserdata(
'inner_data'), transitions={
'done':
'outside_state'},
433 remapping={
'data_in':
'own',
'data_out':
'sm_out'})
434 OperatableStateMachine.add(
'outside_state', TestUserdata(), transitions={
'done':
'internal_state'},
435 remapping={
'data_in':
'sm_in',
'data_out':
'data_in'})
436 OperatableStateMachine.add(
'internal_state', TestUserdata(), transitions={
'done':
'done'},
439 sm = OperatableStateMachine(outcomes=[
'done'])
440 sm.userdata.outside =
'outside_data' 442 OperatableStateMachine.add(
'before_state', TestUserdata(), transitions={
'done':
'inner_sm'},
443 remapping={
'data_in':
'outside'})
444 OperatableStateMachine.add(
'inner_sm', inner_sm, transitions={
'done':
'after_state'},
445 remapping={
'sm_in':
'outside'})
446 OperatableStateMachine.add(
'after_state', TestUserdata(
'last_data'), transitions={
'done':
'modify_state'},
447 remapping={
'data_in':
'sm_out'})
448 OperatableStateMachine.add(
'modify_state', TestUserdata(), transitions={
'done':
'final_state'},
449 remapping={
'data_out':
'outside',
'data_in':
'outside'})
450 OperatableStateMachine.add(
'final_state', TestUserdata(), transitions={
'done':
'done'},
451 remapping={
'data_in':
'data_out'})
455 self.assertEqual(sm[
'before_state'].data,
'outside_data')
456 self.assertEqual(sm._userdata.data_out,
'test_data')
460 self.assertEqual(sm[
'inner_sm'][
'own_state'].data,
'own_data')
461 self.assertNotIn(
'own', sm._userdata)
465 self.assertEqual(sm[
'inner_sm'][
'outside_state'].data,
'outside_data')
468 self.assertIn(
'data_in', sm[
'inner_sm']._userdata)
470 self.assertEqual(sm[
'inner_sm'][
'internal_state'].data,
'test_data')
471 self.assertNotIn(
'data_in', sm._userdata)
474 self.assertEqual(sm._userdata.sm_out,
'inner_data')
478 self.assertEqual(sm[
'after_state'].data,
'inner_data')
482 self.assertEqual(sm[
'modify_state'].data,
'outside_data')
483 self.assertEqual(sm._userdata.outside,
'test_data')
486 outcome = sm.execute(
None)
487 self.assertEqual(sm[
'final_state'].data,
'last_data')
488 self.assertEqual(outcome,
'done')
491 if __name__ ==
'__main__':
492 rospy.init_node(
'test_flexbe_core')
494 rostest.rosrun(
'flexbe_core',
'test_flexbe_core', TestCore)
def test_preemptable_state(self)
def test_concurrency_container(self)
def assertMessage(self, sub, topic, msg, timeout=1)
def on_resume(self, userdata)
def test_manually_transitionable_state(self)
def test_event_state(self)
def test_cross_combinations(self)
def test_lockable_state(self)
def test_operatable_state(self)
def _execute(self, state)
def execute(self, userdata)
def on_enter(self, userdata)
def on_exit(self, userdata)
def assertNoMessage(self, sub, topic, timeout=1)