event_launcher.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 # vim:set ts=4 sw=4 et:
3 #
4 # Copyright 2015 Vladimir Ermakov.
5 #
6 # This file is part of the mavros package and subject to the license terms
7 # in the top-level LICENSE file of the mavros repository.
8 # https://github.com/mavlink/mavros/tree/master/LICENSE.md
9 
10 from __future__ import print_function
11 
12 import os
13 import sys
14 import time
15 import shlex
16 import rospy
17 import mavros
18 import signal
19 import argparse
20 import threading
21 import subprocess
22 
23 from std_srvs.srv import Trigger, TriggerRequest, TriggerResponse
24 from mavros_msgs.msg import State
25 
26 
27 class EventHandler(object):
28  """
29  Base class for event handlers
30  """
31 
32  __slots__ = [
33  'name', 'events', 'actions', 'lock',
34  ]
35 
36  def __init__(self, name, events=[], actions=[]):
37  self.name = name
38  self.events = events
39  self.actions = actions
40  self.lock = threading.RLock()
41 
42  def __str__(self):
43  return self.name
44 
45  def __call__(self, event):
46  if event not in self.events:
47  return # not our event
48 
49  idx = self.events.index(event)
50  action = self.actions[idx]
51 
52  if hasattr(self, 'action_' + action):
53  getattr(self, 'action_' + action)()
54  else:
55  rospy.logerr("Misconfiguration of %s, there no action '%s'",
56  self, action)
57 
58  def spin_once(self):
59  raise NotImplementedError
60 
61 
63  """
64  Simple program runner
65  """
66 
67  # XXX TODO:
68  # simple readlines() from Popen.stdout and .stderr simply don't work - it blocks
69  # so need to find a way for such logging, but for now i don't process process output.
70 
71  __slots__ = [
72  'process', 'command', 'args', 'logfile',
73  ]
74 
75  def __init__(self, name, command, args=[], logfile=None, events=[], actions=[]):
76  super(ShellHandler, self).__init__(name, events, actions)
77  self.process = None
78  self.command = command
79  self.args = args
80  self.logfile = logfile
81 
82  def action_run(self):
83  with self.lock:
84  if self.process:
85  rospy.loginfo("%s: process still active, terminating before run", str(self))
86  self.action_stop()
87 
88  rospy.loginfo("%s: starting...", self)
89 
90  args = [self.command] + self.args
91  if self.logfile:
92  child_stdout = open(self.logfile, 'a')
93  child_stderr = subprocess.STDOUT
94  else:
95  child_stdout = subprocess.PIPE
96  child_stderr = subprocess.PIPE
97 
98  if hasattr(child_stdout, 'write'):
99  child_stdout.write("\n--- run cut: %s ---\n" % time.ctime())
100 
101  self.process = subprocess.Popen(args, stdout=child_stdout, stderr=child_stderr,
102  close_fds=True, preexec_fn=os.setsid)
103 
104  poll_result = self.process.poll()
105  if poll_result is None or poll_result == 0:
106  rospy.loginfo("%s: started, pid: %s", self, self.process.pid)
107  else:
108  rospy.logerr("Failed to start '%s'", self)
109  self.process = None
110 
111  def action_stop(self):
112  with self.lock:
113  if not self.process:
114  return
115 
116  rospy.loginfo("%s: stoppig...", self)
117 
118  # code from roslaunch.nodeprocess _stop_unix
119  pid = self.process.pid
120  pgid = os.getpgid(pid)
121 
122  def poll_timeout(timeout):
123  timeout_t = time.time() + timeout
124  retcode = self.process.poll()
125  while time.time() < timeout_t and retcode is None:
126  time.sleep(0.1)
127  retcode = self.process.poll()
128  return retcode
129 
130  try:
131  rospy.loginfo("%s: sending SIGINT to pid [%s] pgid [%s]", self, pid, pgid)
132  os.killpg(pgid, signal.SIGINT)
133 
134  retcode = poll_timeout(15.0)
135  if retcode is None:
136  rospy.logwarn("%s: escalating to SIGTERM", self)
137  os.killpg(pgid, signal.SIGTERM)
138 
139  retcode = poll_timeout(2.0)
140  if retcode is None:
141  rospy.logerr("%s: escalating to SIGKILL, may still be running", self)
142  try:
143  os.killpg(pgid, signal.SIGKILL)
144  except OSError as ex:
145  rospy.logerr("%s: %s", self, ex)
146 
147  if retcode is not None:
148  rospy.loginfo("%s: process (pid %s) terminated, exit code: %s",
149  self, pid, retcode)
150  finally:
151  self.process = None
152 
153  def spin_once(self):
154  with self.lock:
155  if self.process:
156  poll_result = self.process.poll()
157  if poll_result is not None:
158  rospy.loginfo("%s: process (pid %s) terminated, exit code: %s",
159  self, self.process.pid, poll_result)
160  self.process = None
161 
162 
163 # NOTE: here was RosrunHandler and RoslaunchHandler
164 # but roslaunch.scriptapi can't launch node from worker thread
165 # so i decided to leave only ShellHandler.
166 
167 
168 class Launcher(object):
169  __slots__ = [
170  'handlers',
171  'known_events',
172  'triggers',
173  'prev_armed',
174  'state_sub',
175  ]
176 
177  def __init__(self):
178  self.handlers = []
179  self.known_events = ['armed', 'disarmed']
180  self.triggers = {}
181  self.prev_armed = False
182 
183  try:
184  params = rospy.get_param('~')
185  if not isinstance(params, dict):
186  raise KeyError("bad configuration")
187  except KeyError as e:
188  rospy.logerr('Config error: %s', e)
189  return
190 
191  # load configuration
192  for k, v in params.iteritems():
193  # TODO: add checks for mutually exclusive options
194 
195  if 'service' in v:
196  self._load_trigger(k, v)
197  elif 'shell' in v:
198  self._load_shell(k, v)
199  else:
200  rospy.logwarn("Skipping unknown section: %s", k)
201 
202  # check that events are known
203  rospy.loginfo("Known events: %s", ', '.join(self.known_events))
204  for h in self.handlers:
205  for evt in h.events:
206  if evt not in self.known_events:
207  rospy.logwarn("%s: unknown event: %s", h.name, evt)
208 
209  # config loaded, we may subscribe
210  self.state_sub = rospy.Subscriber(
211  mavros.get_topic('state'),
212  State,
213  self.mavros_state_cb)
214 
215  def _load_trigger(self, name, params):
216  rospy.logdebug("Loading trigger: %s", name)
217 
218  def gen_cb(event):
219  def cb(req):
220  self(event)
221  return TriggerResponse(success=True) # try later to check success
222  return cb
223 
224  self.known_events.append(name)
225  self.triggers[name] = rospy.Service(params['service'], Trigger, gen_cb(name))
226  rospy.loginfo("Trigger: %s (%s)", name, params['service'])
227 
228  def _load_shell(self, name, params):
229  rospy.logdebug("Loading shell: %s", name)
230 
231  events, actions = self._get_evt_act(params)
232 
233  def expandpath(p):
234  return os.path.expanduser(os.path.expandvars(p))
235 
236  args = params['shell']
237  if not isinstance(args, list):
238  args = shlex.split(args)
239 
240  command = expandpath(args[0])
241  args = args[1:]
242 
243  logfile = params.get('logfile')
244  if logfile:
245  logfile = expandpath(logfile)
246 
247  rospy.loginfo("Shell: %s (%s)", name, ' '.join([command] + [repr(v) for v in args]))
248  if logfile:
249  rospy.loginfo("Log: %s -> %s", name, logfile)
250 
251  handler = ShellHandler(name, command, args, logfile, events, actions)
252  self.handlers.append(handler)
253 
254  def _get_evt_act(self, params):
255  evt = self._param_to_list(params['event'])
256  act = self._param_to_list(params['action'])
257  if len(evt) != len(act):
258  raise ValueError("event and action fileds has different size!")
259  return evt, act
260 
261  def _param_to_list(self, str_or_list):
262  if isinstance(str_or_list, list):
263  return [it.strip() for it in str_or_list]
264  else:
265  ret = []
266  for it in str_or_list.split():
267  ret.extend([v.strip() for v in it.split(',') if v])
268  return ret
269 
270  def __call__(self, event):
271  rospy.logdebug('Event: %s', event)
272  for h in self.handlers:
273  try:
274  h(event)
275  except Exception as ex:
276  import traceback
277  rospy.logerr("Event %s -> %s exception: %s", event, h, ex)
278  rospy.logerr(traceback.format_exc())
279 
280  def spin(self):
281  if not self.handlers:
282  rospy.logwarn("No event handlers defined, terminating.")
283  return
284 
285  rate = rospy.Rate(1.0)
286  while not rospy.is_shutdown():
287  for h in self.handlers:
288  h.spin_once()
289 
290  rate.sleep()
291 
292  def mavros_state_cb(self, msg):
293  if msg.armed != self.prev_armed:
294  self.prev_armed = msg.armed
295  self('armed' if msg.armed else 'disarmed')
296 
297 
298 def main():
299  parser = argparse.ArgumentParser(
300  description="This node launch/terminate processes on events.")
301  parser.add_argument('-n', '--mavros-ns', help="ROS node namespace", default=mavros.DEFAULT_NAMESPACE)
302 
303  args = parser.parse_args(rospy.myargv(argv=sys.argv)[1:])
304 
305  rospy.init_node("event_launcher")
306  mavros.set_namespace(args.mavros_ns)
307 
308  rospy.loginfo("Starting event launcher...")
309 
310  launcher = Launcher()
311  launcher.spin()
def __init__(self, name, command, args=[], logfile=None, events=[], actions=[])
def _load_trigger(self, name, params)
def get_topic(args)
Definition: __init__.py:49
def _param_to_list(self, str_or_list)
ssize_t len
def _load_shell(self, name, params)
def __init__(self, name, events=[], actions=[])
def open(path, mode)
Definition: ftp.py:156
def set_namespace(ns=DEFAULT_NAMESPACE)
Definition: __init__.py:29


mavros
Author(s): Vladimir Ermakov
autogenerated on Tue Jun 1 2021 02:36:26