test_context.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import os
3 import re
4 import rospy
5 import rospkg
6 import roslaunch
7 
8 from .logger import Logger
9 
10 
11 class Callback(roslaunch.pmon.ProcessListener):
12  def __init__(self, callback):
13  self._callback = callback
14 
15  def process_died(self, process_name, exit_code):
16  rospy.loginfo("Process {} exited with {}".format(process_name, exit_code))
17  self._callback(process_name, exit_code)
18 
19 
20 class TestContext(object):
21  """
22  Default context for a test case.
23  Use as a 'with' statement and run 'verify' to check whether the context is valid.
24  """
25 
26  def __init__(self):
27  pass
28 
29  def __enter__(self):
30  pass
31 
32  def verify(self):
33  return True
34 
35  def spin_once(self):
36  pass
37 
38  def __exit__(self, exception_type, exception_value, traceback):
39  pass
40 
41  def wait_for_finishing(self):
42  pass
43 
44  @property
45  def success(self):
46  return True
47 
48 
50  """ Test context that runs a specified launch file configuration. """
51 
52  def __init__(self, launch_config, wait_cond="True"):
53  self._run_id = rospy.get_param('/run_id')
54  launchpath = None
55  launchcontent = None
56 
58  self._exit_codes = {}
59 
60  # load from system path
61  if launch_config.startswith('~') or launch_config.startswith('/'):
62  launchpath = os.path.expanduser(launch_config)
63  # load from package path
64  elif re.match(r'.+\.launch$', launch_config):
65  rp = rospkg.RosPack()
66  pkgpath = rp.get_path(launch_config.split('/')[0])
67  launchpath = os.path.join(pkgpath, '/'.join(launch_config.split('/')[1:]))
68  # load from config definition
69  else:
70  launchcontent = launch_config
71 
72  launchconfig = roslaunch.config.ROSLaunchConfig()
73  loader = roslaunch.xmlloader.XmlLoader()
74  if launchpath is not None:
75  loader.load(launchpath, launchconfig, verbose=False)
76  else:
77  loader.load_string(launchcontent, launchconfig, verbose=False)
78  self._launchrunner = roslaunch.launch.ROSLaunchRunner(self._run_id, launchconfig)
79 
80  def store(process_name, exit_code):
81  self._exit_codes[process_name] = exit_code
82  self._launchrunner.add_process_listener(Callback(store))
83  self._wait_cond = wait_cond
84  self._valid = True
85 
86  def __enter__(self):
87  self._launchrunner.launch()
89  Logger.print_positive('launchfile running')
90  self._valid = True
91 
92  self._launched_proc_names = [p.name for p in self._launchrunner.pm.procs]
93 
94  try:
95  check_running_rate = rospy.Rate(10)
96  is_running = False
97  while not is_running:
98  is_running = eval(self._wait_cond)
99  check_running_rate.sleep()
100  Logger.print_positive('waiting condition satisfied')
101  except Exception as e:
102  self._valid = False
103  Logger.print_negative('unable to check waiting condition:\n\t%s' % str(e))
104 
105  def verify(self):
106  return self._valid
107 
108  def spin_once(self):
109  self._launchrunner.spin_once()
110 
112  check_exited_rate = rospy.Rate(10)
113  rospy.loginfo("Waiting for all launched nodes to exit")
114  while not all(name in self._exit_codes for name in self._launched_proc_names):
115  check_exited_rate.sleep()
116 
117  def __exit__(self, exception_type, exception_value, traceback):
118  self._launchrunner.stop()
119  Logger.print_positive('launchfile stopped')
120 
121  @property
122  def success(self):
123  return not any(code > 0 for code in self._exit_codes.values())
flexbe_testing.test_context.TestContext.success
def success(self)
Definition: test_context.py:45
flexbe_testing.test_context.LaunchContext._launched_proc_names
_launched_proc_names
Definition: test_context.py:57
flexbe_testing.test_context.Callback._callback
_callback
Definition: test_context.py:13
flexbe_testing.test_context.LaunchContext
Definition: test_context.py:49
flexbe_testing.test_context.LaunchContext.__init__
def __init__(self, launch_config, wait_cond="True")
Definition: test_context.py:52
flexbe_testing.test_context.LaunchContext._exit_codes
_exit_codes
Definition: test_context.py:58
flexbe_testing.test_context.LaunchContext._launchrunner
_launchrunner
Definition: test_context.py:78
flexbe_testing.test_context.LaunchContext.wait_for_finishing
def wait_for_finishing(self)
Definition: test_context.py:111
flexbe_testing.test_context.LaunchContext.verify
def verify(self)
Definition: test_context.py:105
flexbe_testing.test_context.LaunchContext.spin_once
def spin_once(self)
Definition: test_context.py:108
flexbe_testing.test_context.LaunchContext.__enter__
def __enter__(self)
Definition: test_context.py:86
flexbe_testing.test_context.LaunchContext._wait_cond
_wait_cond
Definition: test_context.py:83
flexbe_testing.test_context.Callback
Definition: test_context.py:11
flexbe_testing.test_context.Callback.__init__
def __init__(self, callback)
Definition: test_context.py:12
flexbe_testing.test_context.TestContext.__enter__
def __enter__(self)
Definition: test_context.py:29
flexbe_testing.test_context.LaunchContext._run_id
_run_id
Definition: test_context.py:53
flexbe_testing.test_context.TestContext.spin_once
def spin_once(self)
Definition: test_context.py:35
flexbe_testing.test_context.Callback.process_died
def process_died(self, process_name, exit_code)
Definition: test_context.py:15
flexbe_testing.test_context.TestContext
Definition: test_context.py:20
flexbe_testing.test_context.LaunchContext.__exit__
def __exit__(self, exception_type, exception_value, traceback)
Definition: test_context.py:117
flexbe_testing.test_context.TestContext.__exit__
def __exit__(self, exception_type, exception_value, traceback)
Definition: test_context.py:38
flexbe_testing.test_context.TestContext.wait_for_finishing
def wait_for_finishing(self)
Definition: test_context.py:41
flexbe_testing.test_context.LaunchContext._valid
_valid
Definition: test_context.py:84
flexbe_testing.test_context.TestContext.verify
def verify(self)
Definition: test_context.py:32
flexbe_testing.test_context.LaunchContext.success
def success(self)
Definition: test_context.py:122
flexbe_testing.test_context.TestContext.__init__
def __init__(self)
Definition: test_context.py:26


flexbe_testing
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:14