lockable_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.logger import Logger
3 
4 from flexbe_msgs.msg import CommandFeedback
5 from std_msgs.msg import String
6 
7 from flexbe_core.core.manually_transitionable_state import ManuallyTransitionableState
8 
9 
11  """
12  A state that can be locked.
13  When locked, no transition can be done regardless of the resulting outcome.
14  However, if any outcome would be triggered, the outcome will be stored
15  and the state won't be executed anymore until it is unlocked and the stored outcome is set.
16  """
17 
18  def __init__(self, *args, **kwargs):
19  super(LockableState, self).__init__(*args, **kwargs)
20  self.__execute = self.execute
22 
23  self._locked = False
24  self._stored_outcome = None
25 
26  self._feedback_topic = 'flexbe/command_feedback'
27  self._lock_topic = 'flexbe/command/lock'
28  self._unlock_topic = 'flexbe/command/unlock'
29 
30  def _lockable_execute(self, *args, **kwargs):
31  if self._is_controlled and self._sub.has_msg(self._lock_topic):
32  msg = self._sub.get_last_msg(self._lock_topic)
33  self._sub.remove_last_msg(self._lock_topic)
34  self._execute_lock(msg.data)
35 
36  if self._is_controlled and self._sub.has_msg(self._unlock_topic):
37  msg = self._sub.get_last_msg(self._unlock_topic)
38  self._sub.remove_last_msg(self._unlock_topic)
39  self._execute_unlock(msg.data)
40 
41  # locked, so execute until we want to transition
42  if self._locked:
43  if self._stored_outcome is None or self._stored_outcome == 'None':
44  self._stored_outcome = self.__execute(*args, **kwargs)
45  return None
46 
47  # not locked, but there still is a transition we want to trigger
48  if not self._locked and self._stored_outcome is not None and not self._stored_outcome == 'None':
49  if self.parent.transition_allowed(self.name, self._stored_outcome):
50  outcome = self._stored_outcome
51  self._stored_outcome = None
52  return outcome
53  else:
54  return None
55 
56  outcome = self.__execute(*args, **kwargs)
57 
58  if outcome is None or outcome == 'None':
59  return None
60 
61  # not locked, but still, a parent could be locked so we need to ensure that we do not cause an outcome there
62  if self.parent is not None and not self.parent.transition_allowed(self.name, outcome):
63  self._stored_outcome = outcome
64  return None
65 
66  return outcome
67 
68  def _execute_lock(self, target):
69  if target == self.path or target == '':
70  target = self.path
71  found_target = True
72  self._locked = True
73  else:
74  found_target = self._parent.lock(target)
75  # provide feedback about lock
76  self._pub.publish(self._feedback_topic, CommandFeedback(command="lock",
77  args=[target, target if found_target else ""]))
78  if not found_target:
79  Logger.logwarn("Wanted to lock %s, but could not find it in current path %s." % (target, self.path))
80  else:
81  Logger.localinfo("--> Locking in state %s" % target)
82 
83  def _execute_unlock(self, target):
84  if target == self.path or (self._locked and target == ''):
85  target = self.path
86  found_target = True
87  self._locked = False
88  else:
89  found_target = self._parent.unlock(target)
90  # provide feedback about unlock
91  self._pub.publish(self._feedback_topic, CommandFeedback(command="unlock",
92  args=[target, target if found_target else ""]))
93  if not found_target:
94  Logger.logwarn("Wanted to unlock %s, but could not find it in current path %s." % (target, self.path))
95  else:
96  Logger.localinfo("--> Unlocking in state %s" % target)
97 
99  super(LockableState, self)._enable_ros_control()
100  self._pub.createPublisher(self._feedback_topic, CommandFeedback)
101  self._sub.subscribe(self._lock_topic, String)
102  self._sub.subscribe(self._unlock_topic, String)
103 
105  super(LockableState, self)._disable_ros_control()
106  self._sub.unsubscribe_topic(self._lock_topic)
107  self._sub.unsubscribe_topic(self._unlock_topic)
108 
109  def is_locked(self):
110  return self._locked
flexbe_core.core.lockable_state.LockableState.execute
execute
Definition: lockable_state.py:21
flexbe_core.core.lockable_state.LockableState._execute_unlock
def _execute_unlock(self, target)
Definition: lockable_state.py:83
flexbe_core.core.lockable_state.LockableState._feedback_topic
_feedback_topic
Definition: lockable_state.py:26
flexbe_core.core.ros_state.RosState._sub
_sub
Definition: ros_state.py:19
flexbe_core.core.lockable_state.LockableState._lockable_execute
def _lockable_execute(self, *args, **kwargs)
Definition: lockable_state.py:30
flexbe_core.core.lockable_state.LockableState._lock_topic
_lock_topic
Definition: lockable_state.py:27
flexbe_core.core.lockable_state.LockableState.is_locked
def is_locked(self)
Definition: lockable_state.py:109
flexbe_core.logger
Definition: logger.py:1
flexbe_core.core.ros_state.RosState._is_controlled
_is_controlled
Definition: ros_state.py:16
flexbe_core.core.lockable_state.LockableState
Definition: lockable_state.py:10
flexbe_core.core.lockable_state.LockableState._execute_lock
def _execute_lock(self, target)
Definition: lockable_state.py:68
flexbe_core.core.manually_transitionable_state
Definition: manually_transitionable_state.py:1
flexbe_core.core.lockable_state.LockableState._unlock_topic
_unlock_topic
Definition: lockable_state.py:28
flexbe_core.core.state.State.path
def path(self)
Definition: state.py:69
flexbe_core.core.state.State.parent
def parent(self)
Definition: state.py:59
flexbe_core.core.lockable_state.LockableState._disable_ros_control
def _disable_ros_control(self)
Definition: lockable_state.py:104
flexbe_core.core.lockable_state.LockableState._enable_ros_control
def _enable_ros_control(self)
Definition: lockable_state.py:98
flexbe_core.core.manually_transitionable_state.ManuallyTransitionableState
Definition: manually_transitionable_state.py:9
flexbe_core.core.lockable_state.LockableState.__execute
__execute
Definition: lockable_state.py:20
flexbe_core.core.lockable_state.LockableState._stored_outcome
_stored_outcome
Definition: lockable_state.py:24
flexbe_core.core.lockable_state.LockableState._locked
_locked
Definition: lockable_state.py:23
flexbe_core.core.lockable_state.LockableState.__init__
def __init__(self, *args, **kwargs)
Definition: lockable_state.py:18
flexbe_core.core.state.State._parent
_parent
Definition: state.py:22
flexbe_core.core.ros_state.RosState._pub
_pub
Definition: ros_state.py:18
flexbe_core.core.state.State.name
def name(self)
Definition: state.py:49


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:05