lockable_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 
4 from flexbe_core.core.manually_transitionable_state import ManuallyTransitionableState
5 
6 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
7 from flexbe_msgs.msg import CommandFeedback
8 from std_msgs.msg import String
9 
10 
12  """
13  A state that can be locked.
14  When locked, no transition can be done regardless of the resulting outcome.
15  However, if any outcome would be triggered, the outcome will be stored
16  and the state won't be executed anymore until it is unlocked and the stored outcome is set.
17  """
18 
19  def __init__(self, *args, **kwargs):
20  super(LockableState, self).__init__(*args, **kwargs)
21 
22  self._locked = False
23  self._stored_outcome = None
24 
25  self.__execute = self.execute
27 
28  self._feedback_topic = 'flexbe/command_feedback'
29  self._lock_topic = 'flexbe/command/lock'
30  self._unlock_topic = 'flexbe/command/unlock'
31 
32  self._pub = ProxyPublisher()
33  self._sub = ProxySubscriberCached()
34 
35 
36  def _lockable_execute(self, *args, **kwargs):
37  if self._is_controlled and self._sub.has_msg(self._lock_topic):
38  msg = self._sub.get_last_msg(self._lock_topic)
39  self._sub.remove_last_msg(self._lock_topic)
40  self._execute_lock(msg.data)
41 
42  if self._is_controlled and self._sub.has_msg(self._unlock_topic):
43  msg = self._sub.get_last_msg(self._unlock_topic)
44  self._sub.remove_last_msg(self._unlock_topic)
45  self._execute_unlock(msg.data)
46 
47  if self._locked:
48  if self._stored_outcome is None or self._stored_outcome == 'None':
49  self._stored_outcome = self.__execute(*args, **kwargs)
50  return None
51 
52  if not self._locked and not self._stored_outcome is None and not self._stored_outcome == 'None':
53  if self._parent.transition_allowed(self.name, self._stored_outcome):
54  outcome = self._stored_outcome
55  self._stored_outcome = None
56  return outcome
57  else:
58  return None
59 
60  outcome = self.__execute(*args, **kwargs)
61 
62  if outcome is None or outcome == 'None':
63  return None
64 
65  if not self._parent is None and not self._parent.transition_allowed(self.name, outcome):
66  self._stored_outcome = outcome
67  return None
68 
69  return outcome
70 
71 
72  def _execute_lock(self, target):
73  if target == self._get_path() or target == '':
74  target = self._get_path()
75  found_target = True
76  self._locked = True
77  else:
78  found_target = self._parent.lock(target)
79 
80  self._pub.publish(self._feedback_topic, CommandFeedback(command="lock", args=[target, target if found_target else ""]))
81  if not found_target:
82  rospy.logwarn("--> Wanted to lock %s, but could not find it in current path %s.", target, self._get_path())
83  else:
84  rospy.loginfo("--> Locking in state %s", target)
85 
86 
87  def _execute_unlock(self, target):
88  if target == self._get_path() or (self._locked and target == ''):
89  target = self._get_path()
90  found_target = True
91  self._locked = False
92  else:
93  found_target = self._parent.unlock(target)
94 
95  self._pub.publish(self._feedback_topic, CommandFeedback(command="unlock", args=[target, target if found_target else ""]))
96  if not found_target:
97  rospy.logwarn("--> Wanted to unlock %s, but could not find it in current path %s.", target, self._get_path())
98  else:
99  rospy.loginfo("--> Unlocking in state %s", target)
100 
101 
103  super(LockableState, self)._enable_ros_control()
104  self._pub.createPublisher(self._feedback_topic, CommandFeedback)
105  self._sub.subscribe(self._lock_topic, String)
106  self._sub.subscribe(self._unlock_topic, String)
107 
109  super(LockableState, self)._disable_ros_control()
110  self._sub.unsubscribe_topic(self._lock_topic)
111  self._sub.unsubscribe_topic(self._unlock_topic)
112 
113 
114  def is_locked(self):
115  return self._locked


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Wed Jun 5 2019 21:51:59