5 from flexbe_core
import EventState, OperatableStateMachine, ConcurrencyContainer
10 from std_msgs.msg
import Bool, Empty, UInt8, String
11 from flexbe_msgs.msg
import CommandFeedback, OutcomeRequest
17 super(TestSubjectState, self).
__init__(outcomes=[
'done',
'error'])
49 state._enable_ros_control()
50 with OperatableStateMachine(outcomes=[
'done',
'error']):
51 OperatableStateMachine.add(
'subject', state,
52 transitions={
'done':
'done',
'error':
'error'},
53 autonomy={
'done': 1,
'error': 2})
57 state.last_events = []
58 return state.parent.execute(
None)
61 rate = rospy.Rate(100)
62 for i
in range(int(timeout * 100)):
63 if sub.has_msg(topic):
64 received = sub.get_last_msg(topic)
65 sub.remove_last_msg(topic)
69 raise AssertionError(
'Did not receive message on topic %s, expected:\n%s'
71 for slot
in msg.__slots__:
72 expected = getattr(msg, slot)
73 actual = getattr(received, slot)
74 error =
"Mismatch for %s, is %s but expected %s" % (slot, actual, expected)
75 if isinstance(expected, list):
76 self.assertListEqual(expected, actual, error)
78 self.assertEqual(expected, actual, error)
81 rate = rospy.Rate(100)
82 for i
in range(int(timeout * 100)):
83 if sub.has_msg(topic):
84 received = sub.get_last_msg(topic)
85 sub.remove_last_msg(topic)
86 raise AssertionError(
'Should not receive message on topic %s, but got:\n%s'
87 % (topic, str(received)))
94 fb_topic =
'flexbe/command_feedback'
100 self.assertListEqual([
'on_enter'], state.last_events)
102 self.assertListEqual([], state.last_events)
105 state._sub._callback(Bool(
True),
'flexbe/command/pause')
107 self.assertListEqual([
'on_pause'], state.last_events)
108 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
"pause"))
109 state.result =
'error'
112 self.assertIsNone(outcome)
113 state._sub._callback(Bool(
False),
'flexbe/command/pause')
115 self.assertListEqual([
'on_resume'], state.last_events)
116 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
"resume"))
119 state._sub._callback(Empty(),
'flexbe/command/repeat')
121 self.assertListEqual([
'on_exit'], state.last_events)
122 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
"repeat"))
124 self.assertListEqual([
'on_enter'], state.last_events)
126 self.assertListEqual([], state.last_events)
129 state.result =
'done'
131 self.assertListEqual([
'on_exit'], state.last_events)
132 self.assertEqual(
'done', outcome)
136 out_topic =
'flexbe/mirror/outcome'
137 req_topic =
'flexbe/outcome_request'
142 state.result =
'error'
148 OperatableStateMachine.autonomy_level = 2
151 self.
assertMessage(sub, req_topic, OutcomeRequest(outcome=1, target=
'/subject'))
154 self.
assertMessage(sub, req_topic, OutcomeRequest(outcome=255, target=
'/subject'))
157 state.result =
'done'
163 OperatableStateMachine.autonomy_level = 0
166 self.
assertMessage(sub, req_topic, OutcomeRequest(outcome=0, target=
'/subject'))
167 OperatableStateMachine.autonomy_level = 3
173 fb_topic =
'flexbe/command_feedback'
178 PreemptableState.preempt =
True
180 self.assertEqual(outcome, PreemptableState._preempted_name)
181 self.assertRaises(StateMachineError,
lambda: state.parent.current_state)
182 PreemptableState.preempt =
False
184 self.assertIsNone(outcome)
187 state._sub._callback(Empty(),
'flexbe/command/preempt')
189 self.assertEqual(outcome, PreemptableState._preempted_name)
190 self.assertRaises(StateMachineError,
lambda: state.parent.current_state)
191 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'preempt'))
192 PreemptableState.preempt =
False
196 fb_topic =
'flexbe/command_feedback'
201 state._sub._callback(String(
'/subject'),
'flexbe/command/lock')
202 state.result =
'done'
204 self.assertIsNone(outcome)
205 self.assertTrue(state._locked)
206 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'lock', args=[
'/subject',
'/subject']))
208 state._sub._callback(String(
'/subject'),
'flexbe/command/unlock')
210 self.assertEqual(outcome,
'done')
211 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'unlock', args=[
'/subject',
'/subject']))
214 state._sub._callback(String(
''),
'flexbe/command/lock')
215 state.result =
'done'
217 self.assertIsNone(outcome)
218 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'lock', args=[
'/subject',
'/subject']))
219 state._sub._callback(String(
''),
'flexbe/command/unlock')
221 self.assertEqual(outcome,
'done')
222 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'unlock', args=[
'/subject',
'/subject']))
225 state._sub._callback(String(
'/invalid'),
'flexbe/command/lock')
227 self.assertEqual(outcome,
'done')
228 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'lock', args=[
'/invalid',
'']))
231 state._sub._callback(String(
''),
'flexbe/command/unlock')
233 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'unlock', args=[
'',
'']))
236 state.parent._locked =
True
238 self.assertIsNone(outcome)
239 state.parent._locked =
False
242 self.assertEqual(outcome,
'done')
246 fb_topic =
'flexbe/command_feedback'
251 state._sub._callback(OutcomeRequest(target=
'subject', outcome=1),
'flexbe/command/transition')
253 self.assertEqual(outcome,
'error')
254 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'transition', args=[
'subject',
'subject']))
257 state._sub._callback(OutcomeRequest(target=
'invalid', outcome=1),
'flexbe/command/transition')
259 self.assertIsNone(outcome)
260 self.
assertMessage(sub, fb_topic, CommandFeedback(command=
'transition', args=[
'invalid',
'subject']))
266 start = rospy.get_time()
269 duration = rospy.get_time() - start
271 self.assertAlmostEqual(duration, 1., places=2)
272 except AssertionError:
273 rospy.logwarn(f
"elapsed time duration {duration} is not 1. - likely due to OS sleep")
275 self.assertAlmostEqual(state.sleep_duration, .1, places=2)
276 except AssertionError:
277 rospy.logwarn(f
"Sleep duration {state.sleep_duration} is not .1 - likely due to OS sleep")
281 start = rospy.get_time()
283 duration = rospy.get_time() - start
285 self.assertAlmostEqual(duration, 1., places=2)
286 except AssertionError:
287 rospy.logwarn(f
"elapsed time duration {duration} is not 1. - likely due to OS sleep")
289 self.assertAlmostEqual(state.sleep_duration, 1., places=2)
290 except AssertionError:
291 rospy.logwarn(f
"Sleep duration {state.sleep_duration} is not 1. - likely due to OS sleep")
297 OperatableStateMachine.autonomy_level = 0
298 state.result =
'error'
300 self.assertIsNone(outcome)
301 state._sub._callback(OutcomeRequest(target=
'subject', outcome=0),
'flexbe/command/transition')
303 self.assertEqual(outcome,
'done')
304 OperatableStateMachine.autonomy_level = 3
308 state._sub._callback(String(
'/subject'),
'flexbe/command/lock')
310 self.assertIsNone(outcome)
311 state._sub._callback(OutcomeRequest(target=
'subject', outcome=1),
'flexbe/command/transition')
313 self.assertIsNone(outcome)
314 state._sub._callback(String(
'/subject'),
'flexbe/command/unlock')
316 self.assertEqual(outcome,
'error')
319 OperatableStateMachine.autonomy_level = 0
320 state.result =
'error'
322 self.assertIsNone(outcome)
323 state._sub._callback(Empty(),
'flexbe/command/preempt')
325 self.assertEqual(outcome, PreemptableState._preempted_name)
326 PreemptableState.preempt =
False
327 OperatableStateMachine.autonomy_level = 3
331 state._sub._callback(String(
'/subject'),
'flexbe/command/lock')
333 self.assertIsNone(outcome)
334 state._sub._callback(Empty(),
'flexbe/command/preempt')
336 self.assertEqual(outcome, PreemptableState._preempted_name)
337 PreemptableState.preempt =
False
338 state._sub._callback(String(
'/subject'),
'flexbe/command/unlock')
340 self.assertIsNone(outcome)
343 cc = ConcurrencyContainer(outcomes=[
'done',
'error'],
345 (
'error', [(
'main',
'error')]),
346 (
'error', [(
'side',
'error')]),
347 (
'done', [(
'main',
'done'), (
'side',
'done')])
351 transitions={
'done':
'done',
'error':
'error'},
352 autonomy={
'done': 1,
'error': 2})
354 transitions={
'done':
'done',
'error':
'error'},
355 autonomy={
'done': 1,
'error': 2})
356 with OperatableStateMachine(outcomes=[
'done',
'error']):
357 OperatableStateMachine.add(
'cc', cc,
358 transitions={
'done':
'done',
'error':
'error'},
359 autonomy={
'done': 1,
'error': 2})
361 class FakeRate(object):
364 return rospy.Duration(0)
374 self.assertAlmostEqual(cc.sleep_duration, .1, places=2)
375 except AssertionError:
376 rospy.logwarn(f
"Sleep duration {cc.sleep_duration} is not .1 - likely due to OS sleep")
379 cc[
'main'].set_rate(15)
380 cc[
'side'].set_rate(10)
383 start = rospy.get_time()
385 while rospy.get_time() - start <= 1.:
388 self.assertLessEqual(cc.sleep_duration, .1)
390 self.assertIn(cc[
'main'].count, [14, 15, 16])
391 self.assertIn(cc[
'side'].count, [9, 10, 11])
392 self.assertLessEqual(cc_count, 27)
395 cc._enable_ros_control()
396 self.assertTrue(cc[
'main']._is_controlled)
397 self.assertFalse(cc[
'side']._is_controlled)
398 cc[
'main']._rate = FakeRate()
399 cc[
'side']._rate = FakeRate()
402 outcome = cc.execute(
None)
403 self.assertIsNone(outcome)
404 cc[
'main'].result =
'error'
405 outcome = cc.execute(
None)
406 self.assertEqual(outcome,
'error')
407 cc[
'main'].result =
None
408 cc[
'side'].result =
'error'
409 outcome = cc.execute(
None)
410 self.assertEqual(outcome,
'error')
411 cc[
'side'].result =
'done'
412 outcome = cc.execute(
None)
413 self.assertIsNone(outcome)
414 cc[
'main'].result =
'done'
415 outcome = cc.execute(
None)
416 self.assertEqual(outcome,
'done')
417 cc[
'main'].result =
None
418 cc[
'side'].result =
None
421 outcome = cc.execute(
None)
422 self.assertIsNone(outcome)
423 cc[
'main'].last_events = []
424 cc[
'side'].last_events = []
425 cc[
'main'].result =
'error'
426 outcome = cc.execute(
None)
427 self.assertEqual(outcome,
'error')
428 self.assertListEqual(cc[
'main'].last_events, [
'on_exit'])
429 self.assertListEqual(cc[
'side'].last_events, [
'on_exit'])
432 class TestUserdata(EventState):
434 def __init__(self, out_content='test_data'):
435 super(TestUserdata, self).__init__(outcomes=[
'done'], input_keys=[
'data_in'], output_keys=[
'data_out'])
439 def execute(self, userdata):
440 rospy.logwarn(
'\033[0m%s\n%s' % (self.path, str(userdata)))
441 self.
data = userdata.data_in
445 inner_sm = OperatableStateMachine(outcomes=[
'done'], input_keys=[
'sm_in'], output_keys=[
'sm_out'])
446 inner_sm.userdata.own =
'own_data'
448 OperatableStateMachine.add(
'own_state', TestUserdata(
'inner_data'), transitions={
'done':
'outside_state'},
449 remapping={
'data_in':
'own',
'data_out':
'sm_out'})
450 OperatableStateMachine.add(
'outside_state', TestUserdata(), transitions={
'done':
'internal_state'},
451 remapping={
'data_in':
'sm_in',
'data_out':
'data_in'})
452 OperatableStateMachine.add(
'internal_state', TestUserdata(), transitions={
'done':
'done'},
455 sm = OperatableStateMachine(outcomes=[
'done'])
456 sm.userdata.outside =
'outside_data'
458 OperatableStateMachine.add(
'before_state', TestUserdata(), transitions={
'done':
'inner_sm'},
459 remapping={
'data_in':
'outside'})
460 OperatableStateMachine.add(
'inner_sm', inner_sm, transitions={
'done':
'after_state'},
461 remapping={
'sm_in':
'outside'})
462 OperatableStateMachine.add(
'after_state', TestUserdata(
'last_data'), transitions={
'done':
'modify_state'},
463 remapping={
'data_in':
'sm_out'})
464 OperatableStateMachine.add(
'modify_state', TestUserdata(), transitions={
'done':
'final_state'},
465 remapping={
'data_out':
'outside',
'data_in':
'outside'})
466 OperatableStateMachine.add(
'final_state', TestUserdata(), transitions={
'done':
'done'},
467 remapping={
'data_in':
'data_out'})
471 self.assertEqual(sm[
'before_state'].data,
'outside_data')
472 self.assertEqual(sm._userdata.data_out,
'test_data')
476 self.assertEqual(sm[
'inner_sm'][
'own_state'].data,
'own_data')
477 self.assertNotIn(
'own', sm._userdata)
481 self.assertEqual(sm[
'inner_sm'][
'outside_state'].data,
'outside_data')
484 self.assertIn(
'data_in', sm[
'inner_sm']._userdata)
486 self.assertEqual(sm[
'inner_sm'][
'internal_state'].data,
'test_data')
487 self.assertNotIn(
'data_in', sm._userdata)
490 self.assertEqual(sm._userdata.sm_out,
'inner_data')
494 self.assertEqual(sm[
'after_state'].data,
'inner_data')
498 self.assertEqual(sm[
'modify_state'].data,
'outside_data')
499 self.assertEqual(sm._userdata.outside,
'test_data')
502 outcome = sm.execute(
None)
503 self.assertEqual(sm[
'final_state'].data,
'last_data')
504 self.assertEqual(outcome,
'done')
507 if __name__ ==
'__main__':
508 rospy.init_node(
'test_flexbe_core')
510 rostest.rosrun(
'flexbe_core',
'test_flexbe_core', TestCore)