8 from .logger
import Logger
11 class Callback(roslaunch.pmon.ProcessListener):
16 rospy.loginfo(
"Process {} exited with {}".format(process_name, exit_code))
22 Default context for a test case. 23 Use as a 'with' statement and run 'verify' to check whether the context is valid. 38 def __exit__(self, exception_type, exception_value, traceback):
50 """ Test context that runs a specified launch file configuration. """ 52 def __init__(self, launch_config, wait_cond="True"):
61 if launch_config.startswith(
'~')
or launch_config.startswith(
'/'):
62 launchpath = os.path.expanduser(launch_config)
64 elif re.match(
r'.+\.launch$', launch_config):
66 pkgpath = rp.get_path(launch_config.split(
'/')[0])
67 launchpath = os.path.join(pkgpath,
'/'.join(launch_config.split(
'/')[1:]))
70 launchcontent = launch_config
72 launchconfig = roslaunch.config.ROSLaunchConfig()
73 loader = roslaunch.xmlloader.XmlLoader()
74 if launchpath
is not None:
75 loader.load(launchpath, launchconfig, verbose=
False)
77 loader.load_string(launchcontent, launchconfig, verbose=
False)
80 def store(process_name, exit_code):
82 self._launchrunner.add_process_listener(
Callback(store))
87 self._launchrunner.launch()
88 self._launchrunner.spin_once()
89 Logger.print_positive(
'launchfile running')
95 check_running_rate = rospy.Rate(10)
99 check_running_rate.sleep()
100 Logger.print_positive(
'waiting condition satisfied')
101 except Exception
as e:
103 Logger.print_negative(
'unable to check waiting condition:\n\t%s' % str(e))
109 self._launchrunner.spin_once()
112 check_exited_rate = rospy.Rate(10)
113 rospy.loginfo(
"Waiting for all launched nodes to exit")
115 check_exited_rate.sleep()
117 def __exit__(self, exception_type, exception_value, traceback):
118 self._launchrunner.stop()
119 Logger.print_positive(
'launchfile stopped')
123 return not any(code > 0
for code
in self._exit_codes.values())
def wait_for_finishing(self)
def __init__(self, callback)
def wait_for_finishing(self)
def __init__(self, launch_config, wait_cond="True")
def __exit__(self, exception_type, exception_value, traceback)
def __exit__(self, exception_type, exception_value, traceback)
def process_died(self, process_name, exit_code)