task_executive.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- encoding: utf-8 -*-
3 
4 import heapq
5 import json
6 import re
7 import rospy
8 
9 from app_manager.msg import AppList
10 from app_manager.srv import StartApp
11 from app_manager.srv import StopApp
12 from std_srvs.srv import Empty
13 
14 from dialogflow_task_executive.msg import DialogResponse
15 
16 
17 def camel_to_snake(name):
18  s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
19  return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
20 
21 
22 class AppManager(object):
23  def __init__(self,
24  on_started=None,
25  on_stopped=None,
26  on_installed=None,
27  on_uninstalled=None,
28  timeout=30):
29  # init variables
30  self._latest_msg = None
31  self._last_running = None
32  self._last_available = None
33  # init callbacks
34  self._callbacks = {
35  'started': on_started,
36  'stopped': on_stopped,
37  'installed': on_installed,
38  'uninstalled': on_uninstalled,
39  }
40  # init interfaces
41  ns = rospy.get_param("robot/name", "robot")
42  self._srv_start_app = rospy.ServiceProxy(
43  ns + "/start_app", StartApp)
44  self._srv_stop_app = rospy.ServiceProxy(
45  ns + "/stop_app", StopApp)
46  self._sub_list_apps = rospy.Subscriber(
47  ns + "/app_list", AppList, self._list_apps_cb)
48  self._srv_start_app.wait_for_service(timeout=timeout)
49  self._srv_stop_app.wait_for_service(timeout=timeout)
50  r = rospy.Rate(1)
51  while not rospy.is_shutdown():
52  if self._latest_msg is not None:
53  break
54  rospy.loginfo("Waiting for apps")
55  r.sleep()
56  rospy.loginfo("AppManager initialized")
57 
58  def _list_apps_cb(self, msg):
59  self._latest_msg = msg
60  if self._last_running is not None and \
61  self._last_available is not None:
62  if self._callbacks["started"]:
63  started = set(self.running_apps) - set(self._last_running)
64  for name in started:
65  self._callbacks["started"](name)
66  if self._callbacks["stopped"]:
67  stopped = set(self._last_running) - set(self.running_apps)
68  for name in stopped:
69  self._callbacks["stopped"](name)
70  last_all = set(self._last_running) | set(self._last_available)
71  all_apps = set(self.running_apps) | set(self.available_apps)
72  if self._callbacks["installed"]:
73  installed = all_apps - last_all
74  for name in installed:
75  self._callbacks["installed"](name)
76  if self._callbacks["uninstalled"]:
77  uninstalled = last_all - all_apps
78  for name in uninstalled:
79  self._callbacks["uninstalled"](name)
80  self._last_running = self.running_apps
82 
83  @property
84  def running_apps(self):
85  return map(lambda a: a.name,
86  self._latest_msg.running_apps)
87 
88  @property
89  def available_apps(self):
90  return map(lambda a: a.name,
91  self._latest_msg.available_apps)
92 
93  def start_app(self, name):
94  if name in self.running_apps:
95  raise RuntimeError("{} is already running".format(name))
96  elif name not in self.available_apps:
97  raise RuntimeError("{} is not available".format(name))
98  res = self._srv_start_app(name=name)
99  if res.started:
100  rospy.loginfo("{} successfully started".format(name))
101  return True
102  else:
103  raise RuntimeError("{0} failed to launch: {1} ({2})".format(
104  name, res.message, res.error_code))
105 
106  def stop_app(self, name):
107  if name not in self.running_apps:
108  raise RuntimeError("{} is not running".format(name))
109  res = self._srv_stop_app(name=name)
110  if res.stopped:
111  rospy.loginfo("{} successfully stopped".format(name))
112  return True
113  else:
114  raise RuntimeError("{0} failed to stop: {1} ({2})".format(
115  name, res.message, res.error_code))
116 
117 
118 class PriorityQueue(object):
119  REMOVED = '<REMOVED>'
120 
121  def __init__(self, default_priority=5):
122  self.heap = list()
123  self.default_priority = default_priority
124  self.mark_removed = dict()
125 
126  def push(self, element, priority=None):
127  "Add a new element or update the priority of an existing element."
128  if priority is None:
129  priority = self.default_priority
130  if element in self.mark_removed:
131  self.remove(element)
132  entry = [priority, element]
133  self.mark_removed[element] = entry
134  heapq.heappush(self.heap, entry)
135 
136  def pop(self):
137  "Remove and return the highest priority element (1 is the highest)"
138  while self.heap:
139  _, element = heapq.heappop(self.heap)
140  if element is not self.REMOVED:
141  del self.mark_removed[element]
142  return element
143  raise ValueError('Empty queue')
144 
145  def remove(self, element):
146  "Remove the element"
147  entry = self.mark_removed.pop(element)
148  entry[-1] = self.REMOVED
149 
150  def __len__(self):
151  return len(self.heap)
152 
153  def __iter__(self):
154  return self
155 
156  def next(self):
157  try:
158  return self.pop()
159  except ValueError:
160  raise StopIteration()
161 
162 
163 class TaskExecutive(object):
164  def __init__(self):
166  on_started=self.app_start_cb,
167  on_stopped=self.app_stop_cb,
168  )
169  # load remappings
170  self.stop_action = rospy.get_param("~stop_action", "stop")
171  self.action_remappings = rospy.get_param("~action_remappings", {})
172  for key, app in self.action_remappings.items():
173  if app not in self.app_manager.available_apps:
174  rospy.logwarn("Action '{}' is not available".format(app))
175  del self.action_remappings[key]
176 
177  self.sub_dialog = rospy.Subscriber(
178  "dialog_response", DialogResponse,
179  self.dialog_cb)
180 
181  @property
182  def is_idle(self):
183  return len(self.app_manager.running_apps) == 0
184 
185  def dialog_cb(self, msg):
186  if not msg.action or msg.action.startswith('input.'):
187  rospy.loginfo("Action '{}' is ignored".format(msg.action))
188  return
189 
190  if not self.is_idle:
191  # check stop words
192  action = camel_to_snake(msg.action)
193  if action == self.stop_action:
194  rospy.loginfo("Stop action detected")
195  for app in self.app_manager.running_apps:
196  try:
197  self.app_manager.stop_app(app)
198  except Exception:
199  pass
200  else:
201  rospy.logerr(
202  "Action {} is already executing"
203  .format(self.app_manager.running_apps))
204 
205  return
206 
207  # check extra action remappings
208  if msg.action in self.action_remappings.values():
209  action = msg.action
210  elif msg.action in self.action_remappings:
211  action = self.action_remappings[msg.action]
212  else:
213  action = camel_to_snake(msg.action)
214  for app_name in self.app_manager.available_apps:
215  if action == app_name or action == app_name.split('/')[1]:
216  action = app_name
217  break
218  if action not in self.app_manager.available_apps:
219  rospy.logerr("Action '{}' is unknown".format(action))
220  return
221  try:
222  params = json.loads(msg.parameters)
223  rospy.set_param("/action/parameters", params)
224  except ValueError:
225  rospy.logerr(
226  "Failed to parse parameters of action '{}'".format(msg.action))
227  return
228  rospy.loginfo(
229  "Starting '{}' with parameters '{}'"
230  .format(msg.action, msg.parameters))
231  self.app_manager.start_app(action)
232 
233  def app_start_cb(self, name):
234  rospy.loginfo("{} started".format(name))
235  try:
236  stop_idle = rospy.ServiceProxy("/idle_behavior/disable", Empty)
237  stop_idle.wait_for_service(1)
238  stop_idle()
239  except rospy.ROSException:
240  pass
241 
242  def app_stop_cb(self, name):
243  rospy.loginfo("{} stopped".format(name))
244  try:
245  start_idle = rospy.ServiceProxy("/idle_behavior/enable", Empty)
246  start_idle.wait_for_service(1)
247  start_idle()
248  except rospy.ROSException:
249  pass
250  try:
251  rospy.delete_param("/action/parameters")
252  rospy.loginfo("Removed /action/parameters")
253  except KeyError:
254  pass
255 
256 
257 if __name__ == '__main__':
258  rospy.init_node("task_executive")
259  task_executive = TaskExecutive()
260  rospy.spin()
def start_app(self, name)
def camel_to_snake(name)
def push(self, element, priority=None)
def _list_apps_cb(self, msg)
def __init__(self, default_priority=5)
def __init__(self, on_started=None, on_stopped=None, on_installed=None, on_uninstalled=None, timeout=30)


dialogflow_task_executive
Author(s): Yuki Furuta
autogenerated on Tue May 11 2021 02:55:30