behavior.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.logger import Logger
3 from flexbe_core.core import PreemptableState, OperatableStateMachine, LockableStateMachine
4 
5 
class Behavior(object):
    '''
    This is the superclass for all implemented behaviors.
    '''

    def __init__(self):
        '''
        Please call this superclass constructor first when overriding it with your behavior.
        '''
        self._state_machine = None
        self.name = "unnamed behavior"
        self.id = 0

        self.contains = {}
        self._behaviors = {}

        self._autonomy_level = 3
        self._debug = False

        # Read by confirm() and assigned by prepare_for_switch(); it has to exist
        # up front so that confirm() also works when no switch was prepared.
        self.requested_state_path = None

    # Please implement those:

    def create(self):
        """
        Should create the state machine for this behavior and return it.
        It is called immediately before executing the behavior,
        so used parameters will have their final value when called.

        @return The complete state machine for this behavior.
        """
        pass

    # Use those if you need them:

    def add_parameter(self, name, default):
        """
        Adds a parameter to this behavior.
        The parameter should be declared in the behavior manifest.

        The parameter is stored as a plain attribute of this instance, which is
        where set_parameter() looks it up later.

        @type name: string
        @param name: The name of the parameter.

        @type default: object
        @param default: The default value of this parameter. Be sure to set it to the right type.
        """
        setattr(self, name, default)

    def add_behavior(self, behavior_class, behavior_id):
        """
        Adds another behavior as part of this behavior.
        This other behavior should be declared as contained in the behavior manifest.

        @type behavior_class: class
        @param behavior_class: The class implementing the other behavior.

        @type behavior_id: string
        @param behavior_id: Unique identifier for this behavior instance.
        """
        if not hasattr(self, 'contains'):
            # Only explains the failure: the access to self.contains below still
            # raises AttributeError when the superclass constructor was skipped.
            Logger.logerr('Behavior was not initialized! Please call superclass constructor.')
        instance = behavior_class()
        self.contains[behavior_id] = instance

    def use_behavior(self, behavior_class, behavior_id, default_keys=None, parameters=None):
        """
        Creates a state machine implementing the given behavior to use it in the behavior state machine.
        Behavior has to be added first.

        @type behavior_class: class
        @param behavior_class: The class implementing the other behavior.

        @type behavior_id: string
        @param behavior_id: Same identifier as used for adding.

        @type default_keys: list
        @param default_keys: List of input keys of the behavior which should be ignored
            and instead use the default values as given by the behavior.

        @type parameters: dict
        @param parameters: Optional assignment of values to behavior parameters.
            Any assigned parameter will be ignored for runtime customization,
            i.e., cannot be overwritten by a user who runs the behavior.

        @return: The state machine of the contained behavior,
            or None if no behavior was added under the given identifier.
        """
        if behavior_id not in self.contains:
            Logger.logerr('Tried to use a behavior without adding it!')
            return None

        contained = self.contains[behavior_id]

        if parameters is not None:
            # assigned directly, i.e., without the type conversion of set_parameter()
            for parameter, value in parameters.items():
                setattr(contained, parameter, value)

        state_machine = contained._get_state_machine()

        if default_keys is not None:
            # keep only the input keys which are not covered by behavior defaults
            state_machine._input_keys = list(set(state_machine._input_keys) - set(default_keys))
            for key in state_machine._input_keys:
                # NOTE(review): presumably drops the stored default of each remaining
                # input key - confirm against the state machine implementation
                state_machine._own_userdata(remove_key=key)

        return state_machine

    # Lifecycle

    def prepare_for_execution(self, input_data=None):
        """
        Prepares this behavior for execution by building its state machine.

        @type input_data: dict
        @param input_data: Optional userdata values. Only keys which are already
            present in the own userdata of the created state machine are applied.
        """
        OperatableStateMachine.autonomy_level = self._autonomy_level

        self._state_machine = self.create()

        if input_data is None:
            input_data = dict()
        own_userdata = self._state_machine._own_userdata
        for k, v in input_data.items():
            if k in own_userdata:
                own_userdata[k] = v

    def set_parameter(self, name, value):
        """
        Set the given parameter of this behavior or any sub-behavior.
        Use a path specification to refer to the corresponding sub-behavior.
        The parameter value is propagated to all sub-behaviors according to the propagation rules.
        Also, the value is casted to the type of the parameter if required.

        @type name: string
        @param name: Name of the parameter, possibly including the path to a sub-behavior.

        @type value: object
        @param value: New value of the parameter of same type as before (or can be casted to that type).

        @return: Whether the parameter existed and could be set.
        """
        name_split = name.rsplit('/', 1)
        behavior = name_split[0] if len(name_split) == 2 else ''
        key = name_split[-1]
        behaviors = self.get_contained_behaviors()
        behaviors[''] = self  # add this behavior as well
        found = False
        for path, instance in behaviors.items():
            # prefix match: the value reaches every behavior at or below the given path
            if path.startswith(behavior) and hasattr(instance, key):
                instance._set_typed_attribute(key, value)
                found = True
        return found

    def confirm(self):
        """
        Confirms that this behavior is ready for execution.
        """
        LockableStateMachine.path_for_switch = self.requested_state_path

        self._state_machine.confirm(self.name, self.id)

    def execute(self):
        """
        Called when the behavior is executed.
        Spins the state machine of this behavior and destroys it afterwards.

        @return: A string containing the execution result such as finished or failed.
        """
        result = self._state_machine.spin()
        self._state_machine.destroy()
        return result

    def prepare_for_switch(self, state):
        """
        Prepares the behavior for being executed after a behavior switch.

        @type state: object
        @param state: The locked state of the previously running behavior
            which is moved into the state machine of this behavior.

        @raise RuntimeError: If the path of the state does not lead to a state
            inside a container of the new state machine.
        """
        state._locked = True  # make sure the state cannot transition during preparations
        states = self._get_states_of_path(state.path, self._state_machine)
        # states[0] is the matching state, states[1:] are its containers (innermost first).
        # Without a container there is nothing to put the state into, so fail
        # before the old state machine is modified.
        if states is None or len(states) < 2:
            raise RuntimeError("Did not find locked state in new behavior!")
        state_container = state._parent
        state_container.remove_state(state)  # remove from old state machine
        for sm in states[1:]:
            # update userdata in new state machine, walking up both hierarchies in parallel
            sm.replace_userdata(state_container.userdata)
            state_container = state_container._parent
        states[1].replace_state(state)  # add to new state machine
        self.requested_state_path = state.path  # set start after switch

    def get_current_state(self):
        """
        @return: The deep state as reported by the state machine of this behavior.
        """
        return self._state_machine.get_deep_state()

    def get_locked_state(self):
        """
        Searches upwards from the current deep state for a locked state.

        @return: The first state reporting to be locked, or None if there is none.
        """
        state = self._state_machine.get_deep_state()
        while state is not None:
            if state.is_locked():
                return state
            state = state._parent
        return None

    def preempt(self):
        """
        Requests preemption by setting the flag on the PreemptableState class.
        """
        PreemptableState.preempt = True

    # For internal use only

    def _get_state_machine(self):
        """
        @return: The state machine of this behavior, created on first access.
        """
        if self._state_machine is None:
            self._state_machine = self.create()
        return self._state_machine

    def _collect_contained(self, obj, path):
        """
        Recursively collects all behaviors contained in the given object.

        @type obj: object
        @param obj: Behavior to inspect, objects without 'contains' attribute contribute nothing.

        @type path: string
        @param path: Path prefix of the given object.

        @return: Dict mapping the path of each contained behavior to its instance.
        """
        contain_list = {path + "/" + key: value for (key, value) in getattr(obj, 'contains', {}).items()}
        add_to_list = {}
        for b_id, b_inst in contain_list.items():
            add_to_list.update(self._collect_contained(b_inst, b_id))
        contain_list.update(add_to_list)
        return contain_list

    def get_contained_behaviors(self):
        """
        @return: Dict of all directly and indirectly contained behaviors,
            keyed by their path such as '/child' or '/child/grandchild'.
        """
        return self._collect_contained(self, '')

    def _set_typed_attribute(self, name, value):
        """
        Sets an existing attribute, converting the value to the type of the current one.

        @type name: string
        @param name: Name of the attribute, it needs to exist already.

        @type value: object
        @param value: New value, converted if it is no instance of the current type
            and the current type is int, float, bool or dict.
        """
        attr = getattr(self, name)
        # convert type if required
        if not isinstance(value, type(attr)):
            if type(attr) is int:
                value = int(value)
            elif type(attr) is float:
                value = float(value)
            elif type(attr) is bool:
                try:
                    text = value.strip().lower()
                except AttributeError:
                    # not a string, e.g., a number: use its truth value
                    value = bool(value)
                else:
                    # any string except "0" and "false" counts as True
                    value = text not in ('0', 'false')
            elif type(attr) is dict:
                import yaml
                # NOTE(review): full_load / load can construct more than plain data;
                # consider yaml.safe_load if the value may come from an untrusted source.
                value = getattr(yaml, 'full_load', yaml.load)(value)
        setattr(self, name, value)

    def set_up(self, id, autonomy_level, debug):
        """
        Stores the id, the autonomy level and the debug flag of this behavior.
        """
        self.id = id
        self._autonomy_level = autonomy_level
        self._debug = debug

    def _get_states_of_path(self, path, container):
        """
        Resolves a state path inside the given container.

        @type path: string
        @param path: Labels separated by '/', the first element is skipped.

        @type container: object
        @param container: Container supporting 'in' and item access by state label.

        @return: List starting with the addressed state, followed by its containers
            from innermost to outermost, or None if a label is not found.
        """
        path_elements = path.split('/')
        if len(path_elements) < 2:
            return [container]  # actually a state in this case
        state_label = path_elements[1]
        new_path = "/".join(path_elements[1:])
        # collect along the path and append self
        if state_label not in container:
            return None
        childlist = self._get_states_of_path(new_path, container[state_label])
        if childlist is None:
            return None
        childlist.append(container)
        return childlist
def use_behavior(self, behavior_class, behavior_id, default_keys=None, parameters=None)
Definition: behavior.py:70
def set_up(self, id, autonomy_level, debug)
Definition: behavior.py:237
def add_behavior(self, behavior_class, behavior_id)
Definition: behavior.py:54
def set_parameter(self, name, value)
Definition: behavior.py:123
def _set_typed_attribute(self, name, value)
Definition: behavior.py:222
def prepare_for_execution(self, input_data=None)
Definition: behavior.py:109
def _get_states_of_path(self, path, container)
Definition: behavior.py:242
def add_parameter(self, name, default)
Definition: behavior.py:41
def _collect_contained(self, obj, path)
Definition: behavior.py:211
def prepare_for_switch(self, state)
Definition: behavior.py:169


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39