test_context.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import os
3 import re
4 import rospy
5 import rospkg
6 import roslaunch
7 
8 from .logger import Logger
9 
10 
class Callback(roslaunch.pmon.ProcessListener):
    """
    Process listener that forwards reported process exits to a plain callable.
    """

    def __init__(self, callback):
        """
        Store the callable to notify.

        callback is invoked as callback(process_name, exit_code) every time
        process_died is called on this listener.
        """
        self._callback = callback

    def process_died(self, process_name, exit_code):
        """
        Log the exit of a process and hand both arguments to the stored callback.
        """
        rospy.loginfo("Process {} exited with {}".format(process_name, exit_code))
        self._callback(process_name, exit_code)
18 
19 
class TestContext(object):
    """
    Default context for a test case.
    Use as a 'with' statement and run 'verify' to check whether the context is valid.

    Every hook of this default implementation is a no-op; it always reports
    itself as valid and successful.
    """

    def __init__(self):
        pass

    def __enter__(self):
        """ Enter the context; nothing is set up. Returns the context itself. """
        # Returning self lets "with TestContext() as ctx:" bind a usable object
        # instead of None. Callers that ignore the bound value are unaffected.
        return self

    def verify(self):
        """ Report whether the context is valid; always True here. """
        return True

    def spin_once(self):
        """ Perform one processing step; nothing to do here. """
        pass

    def __exit__(self, exception_type, exception_value, traceback):
        """ Leave the context; returns None, so exceptions are not suppressed. """
        pass

    def wait_for_finishing(self):
        """ Wait until the context has finished; returns immediately here. """
        pass

    @property
    def success(self):
        """ Whether the context finished successfully; always True here. """
        return True
47 
48 
50  """ Test context that runs a specified launch file configuration. """
51 
def __init__(self, launch_config, wait_cond="True"):
    """
    Prepare a launch runner for the given launch configuration.

    launch_config is interpreted in one of three ways:
      - starts with '~' or '/': path of a launch file on the file system
        (a leading '~' is expanded),
      - otherwise ends in '.launch': '<package>/<path inside the package>',
        with the package directory looked up through rospkg,
      - anything else: the launch XML content itself.

    wait_cond is a string holding a Python expression. It is only stored
    here and is evaluated later by __enter__.

    Nothing is launched yet; the configuration is loaded and a process
    listener is registered that records each exit code by process name.
    """
    self._run_id = rospy.get_param('/run_id')

    # Defined up front so the attribute exists even if the exit-waiting loop
    # runs before __enter__ has recorded the launched process names.
    self._launched_proc_names = []
    self._exit_codes = {}

    launchpath = None
    launchcontent = None

    # load from system path
    if launch_config.startswith(('~', '/')):
        launchpath = os.path.expanduser(launch_config)
    # load from package path
    elif re.match(r'.+\.launch$', launch_config):
        # Everything before the first '/' names the package, the rest is the
        # path of the launch file relative to the package directory.
        package, _, relative_path = launch_config.partition('/')
        pkgpath = rospkg.RosPack().get_path(package)
        launchpath = os.path.join(pkgpath, relative_path)
    # load from config definition
    else:
        launchcontent = launch_config

    launchconfig = roslaunch.config.ROSLaunchConfig()
    loader = roslaunch.xmlloader.XmlLoader()
    if launchpath is not None:
        loader.load(launchpath, launchconfig, verbose=False)
    else:
        loader.load_string(launchcontent, launchconfig, verbose=False)
    self._launchrunner = roslaunch.launch.ROSLaunchRunner(self._run_id, launchconfig)

    def store(process_name, exit_code):
        # Remember the exit code under the name of the process that died.
        self._exit_codes[process_name] = exit_code
    self._launchrunner.add_process_listener(Callback(store))
    self._wait_cond = wait_cond
    self._valid = True
85 
def __enter__(self):
    """
    Start the launch configuration and wait for the waiting condition.

    Launches through the launch runner, records the names of the launched
    processes, then evaluates the stored waiting condition at 10 Hz until it
    is truthy. If anything raises while waiting, the context is marked
    invalid (see 'verify') and the error is printed instead of propagated.

    Returns the context itself so it can be bound with 'with ... as ctx'.
    """
    self._launchrunner.launch()
    self._launchrunner.spin_once()
    Logger.print_positive('launchfile running')
    self._valid = True

    self._launched_proc_names = [p.name for p in self._launchrunner.pm.procs]

    try:
        check_running_rate = rospy.Rate(10)
        is_running = False
        while not is_running:
            # NOTE(review): eval runs the wait condition string as arbitrary
            # Python code with access to this method's locals (e.g. self) and
            # the module globals. Only use with trusted test configurations.
            is_running = eval(self._wait_cond)
            check_running_rate.sleep()
        Logger.print_positive('waiting condition satisfied')
    except Exception as e:
        # Deliberately broad: the evaluated expression may raise anything.
        self._valid = False
        Logger.print_negative('unable to check waiting condition:\n\t%s' % str(e))

    return self
104 
def verify(self):
    """
    Report whether the context is valid.

    Returns the stored validity flag, which is cleared when checking the
    waiting condition raised an exception.
    """
    return self._valid
107 
def spin_once(self):
    """ Run a single spin cycle of the underlying launch runner. """
    self._launchrunner.spin_once()
110 
112  check_exited_rate = rospy.Rate(10)
113  rospy.loginfo("Waiting for all launched nodes to exit")
114  while not all(name in self._exit_codes for name in self._launched_proc_names):
115  check_exited_rate.sleep()
116 
def __exit__(self, exception_type, exception_value, traceback):
    """
    Stop the launch runner when the 'with' block is left.

    The exception arguments are ignored and None is returned, so an
    exception raised inside the 'with' block is not suppressed.
    """
    self._launchrunner.stop()
    Logger.print_positive('launchfile stopped')
120 
@property
def success(self):
    """
    Whether none of the recorded processes exited with a positive exit code.

    Exit codes of zero or below do not count as failures. An exit code of
    None is not counted as a failure either; comparing it with '>' would
    raise a TypeError on Python 3.
    NOTE(review): roslaunch presumably reports None when it cannot determine
    an exit code -- confirm against the ProcessListener documentation.
    """
    return not any(code is not None and code > 0 for code in self._exit_codes.values())
def __init__(self, launch_config, wait_cond="True")
Definition: test_context.py:52
def __exit__(self, exception_type, exception_value, traceback)
Definition: test_context.py:117
def __exit__(self, exception_type, exception_value, traceback)
Definition: test_context.py:38
def process_died(self, process_name, exit_code)
Definition: test_context.py:15


flexbe_testing
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:44