test_core.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import unittest
3 import rospy
4 
5 from flexbe_core import EventState, OperatableStateMachine, ConcurrencyContainer
6 from flexbe_core.core import PreemptableState
7 from flexbe_core.proxy import ProxySubscriberCached
8 from flexbe_core.core.exceptions import StateMachineError
9 
10 from std_msgs.msg import Bool, Empty, UInt8, String
11 from flexbe_msgs.msg import CommandFeedback, OutcomeRequest
12 
13 
14 class TestSubjectState(EventState):
15 
16  def __init__(self):
17  super(TestSubjectState, self).__init__(outcomes=['done', 'error'])
18  self.result = None
19  self.last_events = []
20  self.count = 0
21 
22  def execute(self, userdata):
23  self.count += 1
24  return self.result
25 
26  def on_enter(self, userdata):
27  self.last_events.append('on_enter')
28 
29  def on_exit(self, userdata):
30  self.last_events.append('on_exit')
31 
32  def on_start(self):
33  self.last_events.append('on_start')
34 
35  def on_stop(self):
36  self.last_events.append('on_stop')
37 
38  def on_pause(self):
39  self.last_events.append('on_pause')
40 
41  def on_resume(self, userdata):
42  self.last_events.append('on_resume')
43 
44 
45 class TestCore(unittest.TestCase):
46 
47  def _create(self):
48  state = TestSubjectState()
49  state._enable_ros_control()
50  with OperatableStateMachine(outcomes=['done', 'error']):
51  OperatableStateMachine.add('subject', state,
52  transitions={'done': 'done', 'error': 'error'},
53  autonomy={'done': 1, 'error': 2})
54  return state
55 
56  def _execute(self, state):
57  state.last_events = []
58  return state.parent.execute(None)
59 
60  def assertMessage(self, sub, topic, msg, timeout=1):
61  rate = rospy.Rate(100)
62  for i in range(int(timeout * 100)):
63  if sub.has_msg(topic):
64  received = sub.get_last_msg(topic)
65  sub.remove_last_msg(topic)
66  break
67  rate.sleep()
68  else:
69  raise AssertionError('Did not receive message on topic %s, expected:\n%s'
70  % (topic, str(msg)))
71  for slot in msg.__slots__:
72  expected = getattr(msg, slot)
73  actual = getattr(received, slot)
74  error = "Mismatch for %s, is %s but expected %s" % (slot, actual, expected)
75  if isinstance(expected, list):
76  self.assertListEqual(expected, actual, error)
77  else:
78  self.assertEqual(expected, actual, error)
79 
80  def assertNoMessage(self, sub, topic, timeout=1):
81  rate = rospy.Rate(100)
82  for i in range(int(timeout * 100)):
83  if sub.has_msg(topic):
84  received = sub.get_last_msg(topic)
85  sub.remove_last_msg(topic)
86  raise AssertionError('Should not receive message on topic %s, but got:\n%s'
87  % (topic, str(received)))
88  rate.sleep()
89 
90  # Test Cases
91 
92  def test_event_state(self):
93  state = self._create()
94  fb_topic = 'flexbe/command_feedback'
95  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
96  rospy.sleep(0.2) # wait for pub/sub
97 
98  # enter during first execute
99  self._execute(state)
100  self.assertListEqual(['on_enter'], state.last_events)
101  self._execute(state)
102  self.assertListEqual([], state.last_events)
103 
104  # pause and resume as commanded
105  state._sub._callback(Bool(True), 'flexbe/command/pause')
106  self._execute(state)
107  self.assertListEqual(['on_pause'], state.last_events)
108  self.assertMessage(sub, fb_topic, CommandFeedback(command="pause"))
109  state.result = 'error'
110  outcome = self._execute(state)
111  state.result = None
112  self.assertIsNone(outcome)
113  state._sub._callback(Bool(False), 'flexbe/command/pause')
114  self._execute(state)
115  self.assertListEqual(['on_resume'], state.last_events)
116  self.assertMessage(sub, fb_topic, CommandFeedback(command="resume"))
117 
118  # repeat triggers exit and enter again
119  state._sub._callback(Empty(), 'flexbe/command/repeat')
120  self._execute(state)
121  self.assertListEqual(['on_exit'], state.last_events)
122  self.assertMessage(sub, fb_topic, CommandFeedback(command="repeat"))
123  self._execute(state)
124  self.assertListEqual(['on_enter'], state.last_events)
125  self._execute(state)
126  self.assertListEqual([], state.last_events)
127 
128  # exit during last execute when returning an outcome
129  state.result = 'done'
130  outcome = self._execute(state)
131  self.assertListEqual(['on_exit'], state.last_events)
132  self.assertEqual('done', outcome)
133 
135  state = self._create()
136  out_topic = 'flexbe/mirror/outcome'
137  req_topic = 'flexbe/outcome_request'
138  sub = ProxySubscriberCached({out_topic: UInt8, req_topic: OutcomeRequest})
139  rospy.sleep(0.2) # wait for pub/sub
140 
141  # return outcome in full autonomy, no request
142  state.result = 'error'
143  self._execute(state)
144  self.assertNoMessage(sub, req_topic)
145  self.assertMessage(sub, out_topic, UInt8(1))
146 
147  # request outcome on same autnomy and clear request on loopback
148  OperatableStateMachine.autonomy_level = 2
149  self._execute(state)
150  self.assertNoMessage(sub, out_topic)
151  self.assertMessage(sub, req_topic, OutcomeRequest(outcome=1, target='/subject'))
152  state.result = None
153  self._execute(state)
154  self.assertMessage(sub, req_topic, OutcomeRequest(outcome=255, target='/subject'))
155 
156  # still return other outcomes
157  state.result = 'done'
158  self._execute(state)
159  self.assertNoMessage(sub, req_topic)
160  self.assertMessage(sub, out_topic, UInt8(0))
161 
162  # request outcome on lower autonomy, return outcome after level increase
163  OperatableStateMachine.autonomy_level = 0
164  self._execute(state)
165  self.assertNoMessage(sub, out_topic)
166  self.assertMessage(sub, req_topic, OutcomeRequest(outcome=0, target='/subject'))
167  OperatableStateMachine.autonomy_level = 3
168  self._execute(state)
169  self.assertMessage(sub, out_topic, UInt8(0))
170 
172  state = self._create()
173  fb_topic = 'flexbe/command_feedback'
174  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
175  rospy.sleep(0.2) # wait for pub/sub
176 
177  # preempt when trigger variable is set
178  PreemptableState.preempt = True
179  outcome = self._execute(state)
180  self.assertEqual(outcome, PreemptableState._preempted_name)
181  self.assertRaises(StateMachineError, lambda: state.parent.current_state)
182  PreemptableState.preempt = False
183  outcome = self._execute(state)
184  self.assertIsNone(outcome)
185 
186  # preempt when command is received
187  state._sub._callback(Empty(), 'flexbe/command/preempt')
188  outcome = self._execute(state)
189  self.assertEqual(outcome, PreemptableState._preempted_name)
190  self.assertRaises(StateMachineError, lambda: state.parent.current_state)
191  self.assertMessage(sub, fb_topic, CommandFeedback(command='preempt'))
192  PreemptableState.preempt = False
193 
195  state = self._create()
196  fb_topic = 'flexbe/command_feedback'
197  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
198  rospy.sleep(0.2) # wait for pub/sub
199 
200  # lock and unlock as commanded, return outcome after unlock
201  state._sub._callback(String('/subject'), 'flexbe/command/lock')
202  state.result = 'done'
203  outcome = self._execute(state)
204  self.assertIsNone(outcome)
205  self.assertTrue(state._locked)
206  self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/subject', '/subject']))
207  state.result = None
208  state._sub._callback(String('/subject'), 'flexbe/command/unlock')
209  outcome = self._execute(state)
210  self.assertEqual(outcome, 'done')
211  self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['/subject', '/subject']))
212 
213  # lock and unlock without target
214  state._sub._callback(String(''), 'flexbe/command/lock')
215  state.result = 'done'
216  outcome = self._execute(state)
217  self.assertIsNone(outcome)
218  self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/subject', '/subject']))
219  state._sub._callback(String(''), 'flexbe/command/unlock')
220  outcome = self._execute(state)
221  self.assertEqual(outcome, 'done')
222  self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['/subject', '/subject']))
223 
224  # reject invalid lock command
225  state._sub._callback(String('/invalid'), 'flexbe/command/lock')
226  outcome = self._execute(state)
227  self.assertEqual(outcome, 'done')
228  self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/invalid', '']))
229 
230  # reject generic unlock command when not locked
231  state._sub._callback(String(''), 'flexbe/command/unlock')
232  self._execute(state)
233  self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['', '']))
234 
235  # do not transition out of locked container
236  state.parent._locked = True
237  outcome = self._execute(state)
238  self.assertIsNone(outcome)
239  state.parent._locked = False
240  state.result = None
241  outcome = self._execute(state)
242  self.assertEqual(outcome, 'done')
243 
245  state = self._create()
246  fb_topic = 'flexbe/command_feedback'
247  sub = ProxySubscriberCached({fb_topic: CommandFeedback})
248  rospy.sleep(0.2) # wait for pub/sub
249 
250  # return requested outcome
251  state._sub._callback(OutcomeRequest(target='subject', outcome=1), 'flexbe/command/transition')
252  outcome = self._execute(state)
253  self.assertEqual(outcome, 'error')
254  self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', args=['subject', 'subject']))
255 
256  # reject outcome request for different target
257  state._sub._callback(OutcomeRequest(target='invalid', outcome=1), 'flexbe/command/transition')
258  outcome = self._execute(state)
259  self.assertIsNone(outcome)
260  self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', args=['invalid', 'subject']))
261 
262  def test_ros_state(self):
263  state = self._create()
264 
265  # default rate is 10Hz
266  start = rospy.get_time()
267  for i in range(10):
268  state.sleep()
269  duration = rospy.get_time() - start
270  try:
271  self.assertAlmostEqual(duration, 1., places=2)
272  except AssertionError:
273  rospy.logwarn(f"elapsed time duration {duration} is not 1. - likely due to OS sleep")
274  try:
275  self.assertAlmostEqual(state.sleep_duration, .1, places=2)
276  except AssertionError:
277  rospy.logwarn(f"Sleep duration {state.sleep_duration} is not .1 - likely due to OS sleep")
278 
279  # change of rate works as expected
280  state.set_rate(1)
281  start = rospy.get_time()
282  state.sleep()
283  duration = rospy.get_time() - start
284  try:
285  self.assertAlmostEqual(duration, 1., places=2)
286  except AssertionError:
287  rospy.logwarn(f"elapsed time duration {duration} is not 1. - likely due to OS sleep")
288  try:
289  self.assertAlmostEqual(state.sleep_duration, 1., places=2)
290  except AssertionError:
291  rospy.logwarn(f"Sleep duration {state.sleep_duration} is not 1. - likely due to OS sleep")
292 
294  state = self._create()
295 
296  # manual transition works on low autonomy
297  OperatableStateMachine.autonomy_level = 0
298  state.result = 'error'
299  outcome = self._execute(state)
300  self.assertIsNone(outcome)
301  state._sub._callback(OutcomeRequest(target='subject', outcome=0), 'flexbe/command/transition')
302  outcome = self._execute(state)
303  self.assertEqual(outcome, 'done')
304  OperatableStateMachine.autonomy_level = 3
305  state.result = None
306 
307  # manual transition blocked by lock
308  state._sub._callback(String('/subject'), 'flexbe/command/lock')
309  outcome = self._execute(state)
310  self.assertIsNone(outcome)
311  state._sub._callback(OutcomeRequest(target='subject', outcome=1), 'flexbe/command/transition')
312  outcome = self._execute(state)
313  self.assertIsNone(outcome)
314  state._sub._callback(String('/subject'), 'flexbe/command/unlock')
315  outcome = self._execute(state)
316  self.assertEqual(outcome, 'error')
317 
318  # preempt works on low autonomy
319  OperatableStateMachine.autonomy_level = 0
320  state.result = 'error'
321  outcome = self._execute(state)
322  self.assertIsNone(outcome)
323  state._sub._callback(Empty(), 'flexbe/command/preempt')
324  outcome = self._execute(state)
325  self.assertEqual(outcome, PreemptableState._preempted_name)
326  PreemptableState.preempt = False
327  OperatableStateMachine.autonomy_level = 3
328  state.result = None
329 
330  # preempt also works when locked
331  state._sub._callback(String('/subject'), 'flexbe/command/lock')
332  outcome = self._execute(state)
333  self.assertIsNone(outcome)
334  state._sub._callback(Empty(), 'flexbe/command/preempt')
335  outcome = self._execute(state)
336  self.assertEqual(outcome, PreemptableState._preempted_name)
337  PreemptableState.preempt = False
338  state._sub._callback(String('/subject'), 'flexbe/command/unlock')
339  outcome = self._execute(state)
340  self.assertIsNone(outcome)
341 
343  cc = ConcurrencyContainer(outcomes=['done', 'error'],
344  conditions=[
345  ('error', [('main', 'error')]),
346  ('error', [('side', 'error')]),
347  ('done', [('main', 'done'), ('side', 'done')])
348  ])
349  with cc:
350  OperatableStateMachine.add('main', TestSubjectState(),
351  transitions={'done': 'done', 'error': 'error'},
352  autonomy={'done': 1, 'error': 2})
353  OperatableStateMachine.add('side', TestSubjectState(),
354  transitions={'done': 'done', 'error': 'error'},
355  autonomy={'done': 1, 'error': 2})
356  with OperatableStateMachine(outcomes=['done', 'error']):
357  OperatableStateMachine.add('cc', cc,
358  transitions={'done': 'done', 'error': 'error'},
359  autonomy={'done': 1, 'error': 2})
360 
361  class FakeRate(object):
362 
363  def remaining(self):
364  return rospy.Duration(0)
365 
366  def sleep(self):
367  pass
368 
369  # all states are called with their correct rate
370  cc.execute(None)
371  cc.sleep()
372  cc.execute(None)
373  try:
374  self.assertAlmostEqual(cc.sleep_duration, .1, places=2)
375  except AssertionError:
376  rospy.logwarn(f"Sleep duration {cc.sleep_duration} is not .1 - likely due to OS sleep")
377 
378  cc.sleep()
379  cc['main'].set_rate(15)
380  cc['side'].set_rate(10)
381  cc['main'].count = 0
382  cc['side'].count = 0
383  start = rospy.get_time()
384  cc_count = 0
385  while rospy.get_time() - start <= 1.:
386  cc_count += 1
387  cc.execute(None)
388  self.assertLessEqual(cc.sleep_duration, .1)
389  cc.sleep()
390  self.assertIn(cc['main'].count, [14, 15, 16])
391  self.assertIn(cc['side'].count, [9, 10, 11])
392  self.assertLessEqual(cc_count, 27)
393 
394  # verify ROS properties and disable sleep
395  cc._enable_ros_control()
396  self.assertTrue(cc['main']._is_controlled)
397  self.assertFalse(cc['side']._is_controlled)
398  cc['main']._rate = FakeRate()
399  cc['side']._rate = FakeRate()
400 
401  # return outcome when all return done or any returns error
402  outcome = cc.execute(None)
403  self.assertIsNone(outcome)
404  cc['main'].result = 'error'
405  outcome = cc.execute(None)
406  self.assertEqual(outcome, 'error')
407  cc['main'].result = None
408  cc['side'].result = 'error'
409  outcome = cc.execute(None)
410  self.assertEqual(outcome, 'error')
411  cc['side'].result = 'done'
412  outcome = cc.execute(None)
413  self.assertIsNone(outcome)
414  cc['main'].result = 'done'
415  outcome = cc.execute(None)
416  self.assertEqual(outcome, 'done')
417  cc['main'].result = None
418  cc['side'].result = None
419 
420  # always call on_exit exactly once when returning an outcome
421  outcome = cc.execute(None)
422  self.assertIsNone(outcome)
423  cc['main'].last_events = []
424  cc['side'].last_events = []
425  cc['main'].result = 'error'
426  outcome = cc.execute(None)
427  self.assertEqual(outcome, 'error')
428  self.assertListEqual(cc['main'].last_events, ['on_exit'])
429  self.assertListEqual(cc['side'].last_events, ['on_exit'])
430 
431  def test_user_data(self):
432  class TestUserdata(EventState):
433 
434  def __init__(self, out_content='test_data'):
435  super(TestUserdata, self).__init__(outcomes=['done'], input_keys=['data_in'], output_keys=['data_out'])
436  self.data = None
437  self._out_content = out_content
438 
439  def execute(self, userdata):
440  rospy.logwarn('\033[0m%s\n%s' % (self.path, str(userdata))) # log for manual inspection
441  self.data = userdata.data_in
442  userdata.data_out = self._out_content
443  return 'done'
444 
445  inner_sm = OperatableStateMachine(outcomes=['done'], input_keys=['sm_in'], output_keys=['sm_out'])
446  inner_sm.userdata.own = 'own_data'
447  with inner_sm:
448  OperatableStateMachine.add('own_state', TestUserdata('inner_data'), transitions={'done': 'outside_state'},
449  remapping={'data_in': 'own', 'data_out': 'sm_out'})
450  OperatableStateMachine.add('outside_state', TestUserdata(), transitions={'done': 'internal_state'},
451  remapping={'data_in': 'sm_in', 'data_out': 'data_in'})
452  OperatableStateMachine.add('internal_state', TestUserdata(), transitions={'done': 'done'},
453  remapping={})
454 
455  sm = OperatableStateMachine(outcomes=['done'])
456  sm.userdata.outside = 'outside_data'
457  with sm:
458  OperatableStateMachine.add('before_state', TestUserdata(), transitions={'done': 'inner_sm'},
459  remapping={'data_in': 'outside'})
460  OperatableStateMachine.add('inner_sm', inner_sm, transitions={'done': 'after_state'},
461  remapping={'sm_in': 'outside'})
462  OperatableStateMachine.add('after_state', TestUserdata('last_data'), transitions={'done': 'modify_state'},
463  remapping={'data_in': 'sm_out'})
464  OperatableStateMachine.add('modify_state', TestUserdata(), transitions={'done': 'final_state'},
465  remapping={'data_out': 'outside', 'data_in': 'outside'})
466  OperatableStateMachine.add('final_state', TestUserdata(), transitions={'done': 'done'},
467  remapping={'data_in': 'data_out'})
468 
469  # can pass userdata to state and get it from state
470  sm.execute(None)
471  self.assertEqual(sm['before_state'].data, 'outside_data')
472  self.assertEqual(sm._userdata.data_out, 'test_data')
473 
474  # sub-state machine can set its own local userdata
475  sm.execute(None)
476  self.assertEqual(sm['inner_sm']['own_state'].data, 'own_data')
477  self.assertNotIn('own', sm._userdata) # transparent to outer sm
478 
479  # sub-state machine can read data from parent state machine
480  sm.execute(None)
481  self.assertEqual(sm['inner_sm']['outside_state'].data, 'outside_data')
482 
483  # sub-state machine can pass along its local userdata
484  self.assertIn('data_in', sm['inner_sm']._userdata)
485  sm.execute(None)
486  self.assertEqual(sm['inner_sm']['internal_state'].data, 'test_data')
487  self.assertNotIn('data_in', sm._userdata) # transparent to outer sm
488 
489  # sub-state machine userdata is wrote back to the parent
490  self.assertEqual(sm._userdata.sm_out, 'inner_data')
491 
492  # outer state machine can read data set by inner state machine
493  sm.execute(None)
494  self.assertEqual(sm['after_state'].data, 'inner_data')
495 
496  # can remap different keys to achieve read-write access
497  sm.execute(None)
498  self.assertEqual(sm['modify_state'].data, 'outside_data')
499  self.assertEqual(sm._userdata.outside, 'test_data')
500 
501  # one state can read data set by another one
502  outcome = sm.execute(None)
503  self.assertEqual(sm['final_state'].data, 'last_data')
504  self.assertEqual(outcome, 'done')
505 
506 
507 if __name__ == '__main__':
508  rospy.init_node('test_flexbe_core')
509  import rostest
510  rostest.rosrun('flexbe_core', 'test_flexbe_core', TestCore)
test_core.TestSubjectState.__init__
def __init__(self)
Definition: test_core.py:16
test_core.TestCore.test_lockable_state
def test_lockable_state(self)
Definition: test_core.py:194
test_core.TestSubjectState.on_stop
def on_stop(self)
Definition: test_core.py:35
test_core.TestCore._out_content
_out_content
Definition: test_core.py:437
flexbe_core.core.exceptions
Definition: exceptions.py:1
test_core.TestSubjectState.on_pause
def on_pause(self)
Definition: test_core.py:38
test_core.TestCore
Definition: test_core.py:45
flexbe_core.proxy.proxy_subscriber_cached.ProxySubscriberCached
Definition: proxy_subscriber_cached.py:7
test_core.TestCore.assertNoMessage
def assertNoMessage(self, sub, topic, timeout=1)
Definition: test_core.py:80
test_core.TestSubjectState.execute
def execute(self, userdata)
Definition: test_core.py:22
test_core.TestCore.test_preemptable_state
def test_preemptable_state(self)
Definition: test_core.py:171
test_core.TestCore.data
data
Definition: test_core.py:436
test_core.TestCore.test_ros_state
def test_ros_state(self)
Definition: test_core.py:262
test_core.TestSubjectState.on_resume
def on_resume(self, userdata)
Definition: test_core.py:41
test_core.TestSubjectState.on_enter
def on_enter(self, userdata)
Definition: test_core.py:26
test_core.TestSubjectState
Definition: test_core.py:14
test_core.TestCore._create
def _create(self)
Definition: test_core.py:47
flexbe_core.proxy
Definition: proxy/__init__.py:1
test_core.TestCore.test_event_state
def test_event_state(self)
Definition: test_core.py:92
test_core.TestCore.test_concurrency_container
def test_concurrency_container(self)
Definition: test_core.py:342
test_core.TestSubjectState.count
count
Definition: test_core.py:20
test_core.TestCore.assertMessage
def assertMessage(self, sub, topic, msg, timeout=1)
Definition: test_core.py:60
test_core.TestCore.test_manually_transitionable_state
def test_manually_transitionable_state(self)
Definition: test_core.py:244
test_core.TestSubjectState.last_events
last_events
Definition: test_core.py:19
test_core.TestSubjectState.on_start
def on_start(self)
Definition: test_core.py:32
test_core.TestCore.test_cross_combinations
def test_cross_combinations(self)
Definition: test_core.py:293
test_core.TestCore._execute
def _execute(self, state)
Definition: test_core.py:56
test_core.TestSubjectState.on_exit
def on_exit(self, userdata)
Definition: test_core.py:29
test_core.TestSubjectState.result
result
Definition: test_core.py:18
flexbe_core.core
Definition: core/__init__.py:1
test_core.TestCore.test_user_data
def test_user_data(self)
Definition: test_core.py:431
test_core.TestCore.test_operatable_state
def test_operatable_state(self)
Definition: test_core.py:134


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:05