test_core.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import unittest
3 import rospy
4 
5 from flexbe_core import EventState, OperatableStateMachine, ConcurrencyContainer
6 from flexbe_core.core import PreemptableState
7 from flexbe_core.proxy import ProxySubscriberCached
8 from flexbe_core.core.exceptions import StateMachineError
9 
10 from std_msgs.msg import Bool, Empty, UInt8, String
11 from flexbe_msgs.msg import CommandFeedback, OutcomeRequest
12 
13 
14 class TestSubjectState(EventState):
15 
16  def __init__(self):
17  super(TestSubjectState, self).__init__(outcomes=['done', 'error'])
18  self.result = None
19  self.last_events = []
20  self.count = 0
21 
22  def execute(self, userdata):
23  self.count += 1
24  return self.result
25 
26  def on_enter(self, userdata):
27  self.last_events.append('on_enter')
28 
29  def on_exit(self, userdata):
30  self.last_events.append('on_exit')
31 
32  def on_start(self):
33  self.last_events.append('on_start')
34 
35  def on_stop(self):
36  self.last_events.append('on_stop')
37 
38  def on_pause(self):
39  self.last_events.append('on_pause')
40 
41  def on_resume(self, userdata):
42  self.last_events.append('on_resume')
43 
44 
45 class TestCore(unittest.TestCase):
46 
47  def _create(self):
48  state = TestSubjectState()
49  state._enable_ros_control()
50  with OperatableStateMachine(outcomes=['done', 'error']):
51  OperatableStateMachine.add('subject', state,
52  transitions={'done': 'done', 'error': 'error'},
53  autonomy={'done': 1, 'error': 2})
54  return state
55 
56  def _execute(self, state):
57  state.last_events = []
58  return state.parent.execute(None)
59 
60  def assertMessage(self, sub, topic, msg, timeout=1):
61  rate = rospy.Rate(100)
62  for i in range(int(timeout * 100)):
63  if sub.has_msg(topic):
64  received = sub.get_last_msg(topic)
65  sub.remove_last_msg(topic)
66  break
67  rate.sleep()
68  else:
69  raise AssertionError('Did not receive message on topic %s, expected:\n%s'
70  % (topic, str(msg)))
71  for slot in msg.__slots__:
72  expected = getattr(msg, slot)
73  actual = getattr(received, slot)
74  error = "Mismatch for %s, is %s but expected %s" % (slot, actual, expected)
75  if isinstance(expected, list):
76  self.assertListEqual(expected, actual, error)
77  else:
78  self.assertEqual(expected, actual, error)
79 
80  def assertNoMessage(self, sub, topic, timeout=1):
81  rate = rospy.Rate(100)
82  for i in range(int(timeout * 100)):
83  if sub.has_msg(topic):
84  received = sub.get_last_msg(topic)
85  sub.remove_last_msg(topic)
86  raise AssertionError('Should not receive message on topic %s, but got:\n%s'
87  % (topic, str(received)))
88  rate.sleep()
89 
90  # Test Cases
91 
92  def test_event_state(self):
93  state = self._create()
94  fb_topic = 'flexbe/command_feedback'
95  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
96  rospy.sleep(0.2) # wait for pub/sub
97 
98  # enter during first execute
99  self._execute(state)
100  self.assertListEqual(['on_enter'], state.last_events)
101  self._execute(state)
102  self.assertListEqual([], state.last_events)
103 
104  # pause and resume as commanded
105  state._sub._callback(Bool(True), 'flexbe/command/pause')
106  self._execute(state)
107  self.assertListEqual(['on_pause'], state.last_events)
108  self.assertMessage(sub, fb_topic, CommandFeedback(command="pause"))
109  state.result = 'error'
110  outcome = self._execute(state)
111  state.result = None
112  self.assertIsNone(outcome)
113  state._sub._callback(Bool(False), 'flexbe/command/pause')
114  self._execute(state)
115  self.assertListEqual(['on_resume'], state.last_events)
116  self.assertMessage(sub, fb_topic, CommandFeedback(command="resume"))
117 
118  # repeat triggers exit and enter again
119  state._sub._callback(Empty(), 'flexbe/command/repeat')
120  self._execute(state)
121  self.assertListEqual(['on_exit'], state.last_events)
122  self.assertMessage(sub, fb_topic, CommandFeedback(command="repeat"))
123  self._execute(state)
124  self.assertListEqual(['on_enter'], state.last_events)
125  self._execute(state)
126  self.assertListEqual([], state.last_events)
127 
128  # exit during last execute when returning an outcome
129  state.result = 'done'
130  outcome = self._execute(state)
131  self.assertListEqual(['on_exit'], state.last_events)
132  self.assertEqual('done', outcome)
133 
135  state = self._create()
136  out_topic = 'flexbe/mirror/outcome'
137  req_topic = 'flexbe/outcome_request'
138  sub = ProxySubscriberCached({out_topic: UInt8, req_topic: OutcomeRequest})
139  rospy.sleep(0.2) # wait for pub/sub
140 
141  # return outcome in full autonomy, no request
142  state.result = 'error'
143  self._execute(state)
144  self.assertNoMessage(sub, req_topic)
145  self.assertMessage(sub, out_topic, UInt8(1))
146 
147  # request outcome on same autnomy and clear request on loopback
148  OperatableStateMachine.autonomy_level = 2
149  self._execute(state)
150  self.assertNoMessage(sub, out_topic)
151  self.assertMessage(sub, req_topic, OutcomeRequest(outcome=1, target='/subject'))
152  state.result = None
153  self._execute(state)
154  self.assertMessage(sub, req_topic, OutcomeRequest(outcome=255, target='/subject'))
155 
156  # still return other outcomes
157  state.result = 'done'
158  self._execute(state)
159  self.assertNoMessage(sub, req_topic)
160  self.assertMessage(sub, out_topic, UInt8(0))
161 
162  # request outcome on lower autonomy, return outcome after level increase
163  OperatableStateMachine.autonomy_level = 0
164  self._execute(state)
165  self.assertNoMessage(sub, out_topic)
166  self.assertMessage(sub, req_topic, OutcomeRequest(outcome=0, target='/subject'))
167  OperatableStateMachine.autonomy_level = 3
168  self._execute(state)
169  self.assertMessage(sub, out_topic, UInt8(0))
170 
172  state = self._create()
173  fb_topic = 'flexbe/command_feedback'
174  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
175  rospy.sleep(0.2) # wait for pub/sub
176 
177  # preempt when trigger variable is set
178  PreemptableState.preempt = True
179  outcome = self._execute(state)
180  self.assertEqual(outcome, PreemptableState._preempted_name)
181  self.assertRaises(StateMachineError, lambda: state.parent.current_state)
182  PreemptableState.preempt = False
183  outcome = self._execute(state)
184  self.assertIsNone(outcome)
185 
186  # preempt when command is received
187  state._sub._callback(Empty(), 'flexbe/command/preempt')
188  outcome = self._execute(state)
189  self.assertEqual(outcome, PreemptableState._preempted_name)
190  self.assertRaises(StateMachineError, lambda: state.parent.current_state)
191  self.assertMessage(sub, fb_topic, CommandFeedback(command='preempt'))
192  PreemptableState.preempt = False
193 
195  state = self._create()
196  fb_topic = 'flexbe/command_feedback'
197  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
198  rospy.sleep(0.2) # wait for pub/sub
199 
200  # lock and unlock as commanded, return outcome after unlock
201  state._sub._callback(String('/subject'), 'flexbe/command/lock')
202  state.result = 'done'
203  outcome = self._execute(state)
204  self.assertIsNone(outcome)
205  self.assertTrue(state._locked)
206  self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/subject', '/subject']))
207  state.result = None
208  state._sub._callback(String('/subject'), 'flexbe/command/unlock')
209  outcome = self._execute(state)
210  self.assertEqual(outcome, 'done')
211  self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['/subject', '/subject']))
212 
213  # lock and unlock without target
214  state._sub._callback(String(''), 'flexbe/command/lock')
215  state.result = 'done'
216  outcome = self._execute(state)
217  self.assertIsNone(outcome)
218  self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/subject', '/subject']))
219  state._sub._callback(String(''), 'flexbe/command/unlock')
220  outcome = self._execute(state)
221  self.assertEqual(outcome, 'done')
222  self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['/subject', '/subject']))
223 
224  # reject invalid lock command
225  state._sub._callback(String('/invalid'), 'flexbe/command/lock')
226  outcome = self._execute(state)
227  self.assertEqual(outcome, 'done')
228  self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/invalid', '']))
229 
230  # reject generic unlock command when not locked
231  state._sub._callback(String(''), 'flexbe/command/unlock')
232  self._execute(state)
233  self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['', '']))
234 
235  # do not transition out of locked container
236  state.parent._locked = True
237  outcome = self._execute(state)
238  self.assertIsNone(outcome)
239  state.parent._locked = False
240  state.result = None
241  outcome = self._execute(state)
242  self.assertEqual(outcome, 'done')
243 
245  state = self._create()
246  fb_topic = 'flexbe/command_feedback'
247  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
248  rospy.sleep(0.2) # wait for pub/sub
249 
250  # return requested outcome
251  state._sub._callback(OutcomeRequest(target='subject', outcome=1), 'flexbe/command/transition')
252  outcome = self._execute(state)
253  self.assertEqual(outcome, 'error')
254  self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', args=['subject', 'subject']))
255 
256  # reject outcome request for different target
257  state._sub._callback(OutcomeRequest(target='invalid', outcome=1), 'flexbe/command/transition')
258  outcome = self._execute(state)
259  self.assertIsNone(outcome)
260  self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', args=['invalid', 'subject']))
261 
262  def test_ros_state(self):
263  state = self._create()
264 
265  # default rate is 10Hz
266  start = rospy.get_time()
267  for i in range(10):
268  state.sleep()
269  duration = rospy.get_time() - start
270  self.assertAlmostEqual(duration, 1., places=2)
271  self.assertAlmostEqual(state.sleep_duration, .1, places=2)
272 
273  # change of rate works as expected
274  state.set_rate(1)
275  start = rospy.get_time()
276  state.sleep()
277  duration = rospy.get_time() - start
278  self.assertAlmostEqual(duration, 1., places=2)
279  self.assertAlmostEqual(state.sleep_duration, 1., places=2)
280 
282  state = self._create()
283 
284  # manual transition works on low autonomy
285  OperatableStateMachine.autonomy_level = 0
286  state.result = 'error'
287  outcome = self._execute(state)
288  self.assertIsNone(outcome)
289  state._sub._callback(OutcomeRequest(target='subject', outcome=0), 'flexbe/command/transition')
290  outcome = self._execute(state)
291  self.assertEqual(outcome, 'done')
292  OperatableStateMachine.autonomy_level = 3
293  state.result = None
294 
295  # manual transition blocked by lock
296  state._sub._callback(String('/subject'), 'flexbe/command/lock')
297  outcome = self._execute(state)
298  self.assertIsNone(outcome)
299  state._sub._callback(OutcomeRequest(target='subject', outcome=1), 'flexbe/command/transition')
300  outcome = self._execute(state)
301  self.assertIsNone(outcome)
302  state._sub._callback(String('/subject'), 'flexbe/command/unlock')
303  outcome = self._execute(state)
304  self.assertEqual(outcome, 'error')
305 
306  # preempt works on low autonomy
307  OperatableStateMachine.autonomy_level = 0
308  state.result = 'error'
309  outcome = self._execute(state)
310  self.assertIsNone(outcome)
311  state._sub._callback(Empty(), 'flexbe/command/preempt')
312  outcome = self._execute(state)
313  self.assertEqual(outcome, PreemptableState._preempted_name)
314  PreemptableState.preempt = False
315  OperatableStateMachine.autonomy_level = 3
316  state.result = None
317 
318  # preempt also works when locked
319  state._sub._callback(String('/subject'), 'flexbe/command/lock')
320  outcome = self._execute(state)
321  self.assertIsNone(outcome)
322  state._sub._callback(Empty(), 'flexbe/command/preempt')
323  outcome = self._execute(state)
324  self.assertEqual(outcome, PreemptableState._preempted_name)
325  PreemptableState.preempt = False
326  state._sub._callback(String('/subject'), 'flexbe/command/unlock')
327  outcome = self._execute(state)
328  self.assertIsNone(outcome)
329 
331  cc = ConcurrencyContainer(outcomes=['done', 'error'],
332  conditions=[
333  ('error', [('main', 'error')]),
334  ('error', [('side', 'error')]),
335  ('done', [('main', 'done'), ('side', 'done')])
336  ])
337  with cc:
338  OperatableStateMachine.add('main', TestSubjectState(),
339  transitions={'done': 'done', 'error': 'error'},
340  autonomy={'done': 1, 'error': 2})
341  OperatableStateMachine.add('side', TestSubjectState(),
342  transitions={'done': 'done', 'error': 'error'},
343  autonomy={'done': 1, 'error': 2})
344  with OperatableStateMachine(outcomes=['done', 'error']):
345  OperatableStateMachine.add('cc', cc,
346  transitions={'done': 'done', 'error': 'error'},
347  autonomy={'done': 1, 'error': 2})
348 
349  class FakeRate(object):
350 
351  def remaining(self):
352  return rospy.Duration(0)
353 
354  def sleep(self):
355  pass
356 
357  # all states are called with their correct rate
358  cc.execute(None)
359  cc.sleep()
360  cc.execute(None)
361  self.assertAlmostEqual(cc.sleep_duration, .1, places=2)
362  cc.sleep()
363  cc['main'].set_rate(15)
364  cc['side'].set_rate(10)
365  cc['main'].count = 0
366  cc['side'].count = 0
367  start = rospy.get_time()
368  cc_count = 0
369  while rospy.get_time() - start <= 1.:
370  cc_count += 1
371  cc.execute(None)
372  self.assertLessEqual(cc.sleep_duration, .1)
373  cc.sleep()
374  self.assertIn(cc['main'].count, [14, 15, 16])
375  self.assertIn(cc['side'].count, [9, 10, 11])
376  self.assertLessEqual(cc_count, 27)
377 
378  # verify ROS properties and disable sleep
379  cc._enable_ros_control()
380  self.assertTrue(cc['main']._is_controlled)
381  self.assertFalse(cc['side']._is_controlled)
382  cc['main']._rate = FakeRate()
383  cc['side']._rate = FakeRate()
384 
385  # return outcome when all return done or any returns error
386  outcome = cc.execute(None)
387  self.assertIsNone(outcome)
388  cc['main'].result = 'error'
389  outcome = cc.execute(None)
390  self.assertEqual(outcome, 'error')
391  cc['main'].result = None
392  cc['side'].result = 'error'
393  outcome = cc.execute(None)
394  self.assertEqual(outcome, 'error')
395  cc['side'].result = 'done'
396  outcome = cc.execute(None)
397  self.assertIsNone(outcome)
398  cc['main'].result = 'done'
399  outcome = cc.execute(None)
400  self.assertEqual(outcome, 'done')
401  cc['main'].result = None
402  cc['side'].result = None
403 
404  # always call on_exit exactly once when returning an outcome
405  outcome = cc.execute(None)
406  self.assertIsNone(outcome)
407  cc['main'].last_events = []
408  cc['side'].last_events = []
409  cc['main'].result = 'error'
410  outcome = cc.execute(None)
411  self.assertEqual(outcome, 'error')
412  self.assertListEqual(cc['main'].last_events, ['on_exit'])
413  self.assertListEqual(cc['side'].last_events, ['on_exit'])
414 
415  def test_user_data(self):
416  class TestUserdata(EventState):
417 
418  def __init__(self, out_content='test_data'):
419  super(TestUserdata, self).__init__(outcomes=['done'], input_keys=['data_in'], output_keys=['data_out'])
420  self.data = None
421  self._out_content = out_content
422 
423  def execute(self, userdata):
424  rospy.logwarn('\033[0m%s\n%s' % (self.path, str(userdata))) # log for manual inspection
425  self.data = userdata.data_in
426  userdata.data_out = self._out_content
427  return 'done'
428 
429  inner_sm = OperatableStateMachine(outcomes=['done'], input_keys=['sm_in'], output_keys=['sm_out'])
430  inner_sm.userdata.own = 'own_data'
431  with inner_sm:
432  OperatableStateMachine.add('own_state', TestUserdata('inner_data'), transitions={'done': 'outside_state'},
433  remapping={'data_in': 'own', 'data_out': 'sm_out'})
434  OperatableStateMachine.add('outside_state', TestUserdata(), transitions={'done': 'internal_state'},
435  remapping={'data_in': 'sm_in', 'data_out': 'data_in'})
436  OperatableStateMachine.add('internal_state', TestUserdata(), transitions={'done': 'done'},
437  remapping={})
438 
439  sm = OperatableStateMachine(outcomes=['done'])
440  sm.userdata.outside = 'outside_data'
441  with sm:
442  OperatableStateMachine.add('before_state', TestUserdata(), transitions={'done': 'inner_sm'},
443  remapping={'data_in': 'outside'})
444  OperatableStateMachine.add('inner_sm', inner_sm, transitions={'done': 'after_state'},
445  remapping={'sm_in': 'outside'})
446  OperatableStateMachine.add('after_state', TestUserdata('last_data'), transitions={'done': 'modify_state'},
447  remapping={'data_in': 'sm_out'})
448  OperatableStateMachine.add('modify_state', TestUserdata(), transitions={'done': 'final_state'},
449  remapping={'data_out': 'outside', 'data_in': 'outside'})
450  OperatableStateMachine.add('final_state', TestUserdata(), transitions={'done': 'done'},
451  remapping={'data_in': 'data_out'})
452 
453  # can pass userdata to state and get it from state
454  sm.execute(None)
455  self.assertEqual(sm['before_state'].data, 'outside_data')
456  self.assertEqual(sm._userdata.data_out, 'test_data')
457 
458  # sub-state machine can set its own local userdata
459  sm.execute(None)
460  self.assertEqual(sm['inner_sm']['own_state'].data, 'own_data')
461  self.assertNotIn('own', sm._userdata) # transparent to outer sm
462 
463  # sub-state machine can read data from parent state machine
464  sm.execute(None)
465  self.assertEqual(sm['inner_sm']['outside_state'].data, 'outside_data')
466 
467  # sub-state machine can pass along its local userdata
468  self.assertIn('data_in', sm['inner_sm']._userdata)
469  sm.execute(None)
470  self.assertEqual(sm['inner_sm']['internal_state'].data, 'test_data')
471  self.assertNotIn('data_in', sm._userdata) # transparent to outer sm
472 
473  # sub-state machine userdata is wrote back to the parent
474  self.assertEqual(sm._userdata.sm_out, 'inner_data')
475 
476  # outer state machine can read data set by inner state machine
477  sm.execute(None)
478  self.assertEqual(sm['after_state'].data, 'inner_data')
479 
480  # can remap different keys to achieve read-write access
481  sm.execute(None)
482  self.assertEqual(sm['modify_state'].data, 'outside_data')
483  self.assertEqual(sm._userdata.outside, 'test_data')
484 
485  # one state can read data set by another one
486  outcome = sm.execute(None)
487  self.assertEqual(sm['final_state'].data, 'last_data')
488  self.assertEqual(outcome, 'done')
489 
490 
491 if __name__ == '__main__':
492  rospy.init_node('test_flexbe_core')
493  import rostest
494  rostest.rosrun('flexbe_core', 'test_flexbe_core', TestCore)
def test_preemptable_state(self)
Definition: test_core.py:171
def test_concurrency_container(self)
Definition: test_core.py:330
def assertMessage(self, sub, topic, msg, timeout=1)
Definition: test_core.py:60
def on_resume(self, userdata)
Definition: test_core.py:41
def test_manually_transitionable_state(self)
Definition: test_core.py:244
def test_event_state(self)
Definition: test_core.py:92
def test_user_data(self)
Definition: test_core.py:415
def test_cross_combinations(self)
Definition: test_core.py:281
def _create(self)
Definition: test_core.py:47
def test_lockable_state(self)
Definition: test_core.py:194
def test_operatable_state(self)
Definition: test_core.py:134
def _execute(self, state)
Definition: test_core.py:56
def execute(self, userdata)
Definition: test_core.py:22
def on_enter(self, userdata)
Definition: test_core.py:26
def test_ros_state(self)
Definition: test_core.py:262
def on_exit(self, userdata)
Definition: test_core.py:29
def assertNoMessage(self, sub, topic, timeout=1)
Definition: test_core.py:80


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39