48 from std_msgs.msg
import String
53 super(StartFailTest, self).
__init__(*args)
54 rospy.init_node(
'start_fail_test')
57 rospy.logwarn(
"{} received".format(msg))
62 rospy.wait_for_service(
'/robot/list_apps')
63 rospy.wait_for_service(
'/robot/stop_app')
64 rospy.wait_for_service(
'/robot/start_app')
65 self.
list = rospy.ServiceProxy(
'/robot/list_apps', ListApps)
66 self.
stop = rospy.ServiceProxy(
'/robot/stop_app', StopApp)
67 self.
start = rospy.ServiceProxy(
'/robot/start_app', StartApp)
70 rospy.Subscriber(
'/chatter', String, self.
cb)
74 rospy.logwarn(
"Wait for application")
75 list_req = ListAppsRequest()
76 list_res = ListAppsResponse()
77 while not 'app_manager/appA' in list(map(
lambda x: x.name, list_res.available_apps)):
78 list_res = self.
list.call(list_req)
83 rospy.logwarn(
"Start application with wrong arguments")
84 start_req = StartAppRequest(name=
'app_manager/appA', args=[KeyValue(
'launch_prefix',
'no_command')])
85 start_res = self.
start.call(start_req)
86 rospy.logwarn(
"start 'started_app' {}".format(start_res))
87 self.assertEqual(start_res.started,
False)
90 rospy.logwarn(
"Check running application")
91 list_req = ListAppsRequest()
92 list_res = ListAppsResponse(running_apps=[App(name=
'app_manager/appA')])
93 while 'app_manager/appA' in list(map(
lambda x: x.name, list_res.running_apps)):
94 list_res = self.
list.call(list_req)
100 rospy.logwarn(
"Start application")
101 start_req = StartAppRequest(name=
'app_manager/appA', args=[KeyValue(
'launch_prefix',
'python{}'.format(os.environ[
'ROS_PYTHON_VERSION']))])
102 start_res = self.
start.call(start_req)
103 rospy.logwarn(
"received 'started_app' {}".format(start_res))
104 self.assertEqual(start_res.started,
True)
107 while (
not rospy.is_shutdown())
and self.
msg ==
None:
108 rospy.logwarn(
'Wait for /chatter message received..')
112 stop_req = StopAppRequest(name=
'app_manager/appA')
113 stop_res = self.
stop.call(stop_req)
114 rospy.logwarn(
'stop app {}'.format(stop_res))
115 self.assertEqual(stop_res.stopped,
True)
117 if __name__ ==
'__main__':
119 rostest.run(
'start_fail_test', PKG, StartFailTest, sys.argv)
120 except KeyboardInterrupt:
122 print(
"{} exiting".format(PKG))
def test_start_fail_app(self)