launchman.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 # Revision $Id$
35 
36 PKG = 'launchman' # this package name
37 NAME = 'launchman'
38 
39 import roslib; roslib.load_manifest(PKG)
40 
41 import time
42 import string
43 from optparse import OptionParser
44 
45 from launchman.srv import *
46 from launchman.msg import *
47 import rospy
48 
49 import roslib.names
50 import roslib.network
51 
52 from roslaunch.config import ROSLaunchConfig
53 #from roslaunch.launch import ROSLaunchRunner
54 import roslaunch.parent
55 from roslaunch.pmon import pmon_shutdown as _pmon_shutdown
56 from roslaunch.xmlloader import *
57 
58 from launchman import app
59 
60 class AppGroup:
61  def __init__(self, manager, apprunner):
62  self.apprunner = None
63  self.manager = manager
64 
65 class AppRunner:
66  def __init__(self, taskid, manager):
67  self.taskid = taskid
68  self.task = AppUpdate(None, None, None, None, None)
69  self.app = None
70  self.runner = None
71  self.childGroups = []
72  self.manager = manager
73 
74  def __del__(self):
75  if self.manager: self.manager = None
76  if self.childGroups: self.childGroups = None
77 
78  def launch(self):
79  package, launch_file = string.split(self.app.launch_file, "/", 1)
80  path = app.getPackagePath(package)
81  print "pkgpath", path
82  print "launchfile", launch_file
83 
84  fn = os.path.join(path, launch_file)
85  if not os.path.exists(fn):
86  return False
87 
88  self.runner = roslaunch.parent.ROSLaunchParent(rospy.get_param("/run_id"), [fn], is_core=False, process_listeners=(self, ))
89  self.runner.start()
90 
91  return True
92 
93  def stop(self):
94  if not self.runner:
95  print "no runner", self
96  return
97  self.runner.shutdown()
98  self.runner = None
99 
100  def process_died(self, process_name, exit_code):
101  rospy.logerr( "process_died, but we're ignoring this: %s, %s" % ( process_name, exit_code))
102  return
103 
104  rospy.logdebug( "process_died: %s, %s" % ( process_name, exit_code))
105  if self.runner: rospy.logdebug(self.runner.pm.procs)
106 
107  if not self.runner: return
108 
109  if len(self.runner.pm.procs) == 1:
110  #make sure to clean up the runner so that we don't strand roslaunch processes
111  try:
112  self.runner.shutdown()
113  except Exception as e:
114  pass
115 
116  self.runner = None
117  print "ALL DONE!"
118  self.manager._stopTask(self)
119  else:
120  if self.runner.pm.procs:
121  self.task.status = "error"
122  self.manager._send_status()
123 
124  print "too many processes left:", len(self.runner.pm.procs)
125  for proc in self.runner.pm.procs:
126  proc.stop()
127 
128  #make sure to clean up the runner so that we don't strand roslaunch processes
129  try:
130  self.runner.shutdown()
131  except Exception as e:
132  pass
133 
134  self.runner = None
135  self.manager._stopTask(self)
136 
137 
138 
139  def __repr__(self):
140  return "<AppRunner %s: %s %s %s>" % (self.taskid, self.app.provides, self.app.taskid, len(self.childGroups))
141 
142 
143 
145  def __init__(self, kill_on_conflict=False):
146  self.app_update = rospy.Publisher("app_update", AppUpdate, self)
147  self.app_status = rospy.Publisher("app_status", AppStatus, self)
148  self._taskGroups = {}
149  self._apps = {}
150  self.kill_on_conflict = kill_on_conflict
151 
152  def _send_status(self):
153  apps = []
154  for taskid, runner in self._apps.items():
155  apps.append(Application(taskid, runner.app.name, runner.task.status, runner.app.icon, str(runner.app.provides), str(runner.app.depends)))
156  self.app_status.publish(apps)
157 
158  # Look for running tasks that provide any of the capabilities included
159  # in deps. Returns a tuple: (uncovered, pgroup). uncovered is the subset
160  # of deps not provided by any running task. pgroup
161  # is a list of AppRunner objects associated with all tasks that provide
162  # any of deps.
163  def _find_providers(self, deps):
164  if not deps:
165  return (), []
166  d = set(deps)
167  pgroup = []
168  for k in self._taskGroups:
169  isect = set(k) & d
170  if isect:
171  pgroup.append(self._taskGroups[k])
172  d = d - isect
173  return tuple(d), pgroup
174 
175  def list_tasks(self, req):
176  s = []
177  for a in self._taskGroups:
178  s.append(self._taskGroups[a].app.name)
179  return ListTasksResponse(s)
180 
181  def start_task(self, req):
182  a = app.App(req.taskid)
183  a.load()
184 
185  # Check for conflicts between this task's provides and those of
186  # running tasks.
187  pgroup = None
188  uncovered, pgroup = self._find_providers(a.provides)
189  for runner in pgroup:
190  if runner.task.taskid == req.taskid:
191  print "already running"
192  self._send_status()
193  return StartTaskResponse("This app is already running.")
194 
195  if self.kill_on_conflict:
196  self._stopTask(runner)
197  else:
198  return StartTaskResponse("The %s app conflicts with the %s app; please stop the %s app"%(a.name, runner.app.name, runner.app.name))
199 
200  # Check for satisfaction of this task's dependencies.
201  if a.depends:
202  uncovered, pgroup = self._find_providers(a.depends)
203  if uncovered:
204  print "no parent task group %s running" % str(uncovered)
205  self._send_status()
206  return StartTaskResponse("No app that provides %s is running." % str(uncovered))
207 
208  runner = AppRunner(req.taskid, self)
209  runner.app = a
210  self._apps[req.taskid] = runner
211 
212  if a.provides:
213  self._taskGroups[a.provides] = runner
214 
215  if pgroup:
216  for p in pgroup:
217  p.childGroups.append(runner)
218 
219  #print "startTask [%s, %s, %s]" % (req.taskid, a.name, req.username)
220  runner.task.taskid = req.taskid
221  runner.task.username = req.username
222  runner.task.started = rospy.get_rostime()
223  runner.task.status = "starting"
224 
225 # self.app_update.publish(runner.task)
226  self._send_status()
227 
228  try:
229  runner.launch()
230  except Exception as e:
231  import traceback
232  traceback.print_exc()
233  runner.task.status = "error"
234  self._send_status()
235 
236  self.runner = None
237  self._stopTask(runner)
238  return StartTaskResponse("Exception while trying to launch the %s app: \"%s\""%(req.taskid, e))
239 
240  runner.task.status = "running"
241 # self.app_update.publish(runner.task)
242  self._send_status()
243 
244  return StartTaskResponse("done")
245 
246  def showstate(self):
247  print "_" * 40
248  for provides, runner in self._taskGroups.items():
249  print provides, runner.childGroups
250 
251  def _stopTask(self, runner):
252  runner.task.status = "stopping"
253 # self.app_update.publish(runner.task)
254  self._send_status()
255 
256  for cgroup in runner.childGroups[:]:
257  self._stopTask(cgroup)
258 
259  runner.stop()
260 
261  runner.task.status = "stopped"
262  self._send_status()
263 
264  if runner.app.depends:
265  # Check for tasks that we need to shut down as a consequence
266  # of stopping this task.
267  uncovered, pgroup = self._find_providers(runner.app.depends)
268  for p in pgroup:
269  if runner in p.childGroups:
270  p.childGroups.remove(runner)
271 
272  if runner.app.provides:
273  print "removing", runner.app.provides
274  del self._taskGroups[runner.app.provides]
275 
276  if runner.taskid in self._apps:
277  del self._apps[runner.taskid]
278  print "removing", runner.taskid
279 
280 
281  def stop_task(self, req):
282  if req.taskid not in self._apps:
283  return StopTaskResponse("no such task: %s" % req.taskid)
284 
285  runner = self._apps[req.taskid]
286 
287  self._stopTask(runner)
288 
289  self._send_status()
290 
291  return StopTaskResponse("done")
292 
293  def status_update(self, req):
294  self._send_status()
295 # for provides, runner in self._taskGroups.items():
296 # self.app_update.publish(runner.task)
297  return StatusUpdateResponse("done")
298 
299 
300  def peer_subscribe(self, topic_name, topic_publish, peer_publish):
301  pass
302 
303  def peer_unsubscribe(self, topic_name, numPeers):
304  pass
305 
306 
307 def server(kill_on_conflict):
308  rospy.init_node(NAME)
309 
310  tm = TaskManager(kill_on_conflict)
311  s1 = rospy.Service('start_task', StartTask, tm.start_task)
312  s2 = rospy.Service('stop_task', StopTask, tm.stop_task)
313  s3 = rospy.Service('status_update', StatusUpdate, tm.status_update)
314  s4 = rospy.Service('list_tasks', ListTasks, tm.list_tasks)
315 
316  # spin() keeps Python from exiting until node is shutdown
317  rospy.spin()
318 
319 if __name__ == "__main__":
320  parser = OptionParser(usage="usage: %prog [options]", prog='launchman.py')
321  parser.add_option("-k", "--kill-on-conflict", dest="kill",
322  default=False, action="store_true",
323  help="automatically kill conflicting tasks when starting a new task")
324  options, args = parser.parse_args()
325  #initialize roslaunch in main thread
326  roslaunch.pmon._init_signal_handlers()
327  server(options.kill)
def start_task(self, req)
Definition: launchman.py:181
def process_died(self, process_name, exit_code)
Definition: launchman.py:100
def __del__(self)
Definition: launchman.py:74
def _send_status(self)
Definition: launchman.py:152
def peer_unsubscribe(self, topic_name, numPeers)
Definition: launchman.py:303
def peer_subscribe(self, topic_name, topic_publish, peer_publish)
Definition: launchman.py:300
def status_update(self, req)
Definition: launchman.py:293
def server(kill_on_conflict)
Definition: launchman.py:307
def __init__(self, manager, apprunner)
Definition: launchman.py:61
def _stopTask(self, runner)
Definition: launchman.py:251
def stop_task(self, req)
Definition: launchman.py:281
def __init__(self, taskid, manager)
Definition: launchman.py:66
def stop(self)
Definition: launchman.py:93
def list_tasks(self, req)
Definition: launchman.py:175
def __init__(self, kill_on_conflict=False)
Definition: launchman.py:145
def launch(self)
Definition: launchman.py:78
def _find_providers(self, deps)
Definition: launchman.py:163
def __repr__(self)
Definition: launchman.py:139


launchman
Author(s): Scott Noob Hassan
autogenerated on Mon Jun 10 2019 15:51:09