47 from std_msgs.msg
import String
52 super(StopAppTest, self).
__init__(*args)
53 rospy.init_node(
'stop_app_test')
56 rospy.logwarn(
"{} received".format(msg))
58 message = eval(msg.data)
59 if (
'start_plugin' in message
60 and message[
'start_plugin'][
'fuga'] == 300
61 and message[
'start_plugin'][
'hoge'] == 100):
63 if (
'stop_plugin' in message
64 and 'exit_code' in message
65 and 'stopped' in message
66 and 'timeout' in message
67 and message[
'stop_plugin'][
'fuga'] == 3000
68 and message[
'stop_plugin'][
'hoge'] == 1000):
73 if (
'param1' in message
74 and 'param2' in message
75 and message[
'param1'] ==
'hello' 76 and message[
'param2'] ==
'world'):
78 if (
'param1' in message
79 and 'param2' in message
80 and message[
'param1'] ==
'param1' 81 and message[
'param2'] ==
'param2'):
95 rospy.Subscriber(
'/test_plugin', String, self.
cb)
96 rospy.wait_for_service(
'/robot/list_apps')
97 rospy.wait_for_service(
'/robot/start_app')
98 rospy.wait_for_service(
'/robot/stop_app')
99 self.
list = rospy.ServiceProxy(
'/robot/list_apps', ListApps)
100 self.
start = rospy.ServiceProxy(
'/robot/start_app', StartApp)
101 self.
stop = rospy.ServiceProxy(
'/robot/stop_app', StopApp)
105 list_req = ListAppsRequest()
106 list_res = ListAppsResponse()
107 while not 'app_manager/test_plugin' in list(map(
lambda x: x.name, list_res.available_apps)):
108 list_res = self.
list.call(list_req)
112 start_req = StartAppRequest(name=
'app_manager/test_plugin')
113 start_res = self.
start.call(start_req)
114 rospy.logwarn(
'start app {}'.format(start_res))
115 self.assertEqual(start_res.error_code, 0)
116 while (
not rospy.is_shutdown()
118 rospy.logwarn(
'Wait for start message received..')
122 while (
not rospy.is_shutdown()
125 rospy.logwarn(
'Wait for app/plugin message received..')
129 stop_req = StopAppRequest(name=
'app_manager/test_plugin')
130 stop_res = self.
stop.call(stop_req)
131 rospy.logwarn(
'stop app {}'.format(stop_res))
132 self.assertEqual(stop_res.error_code, 0)
134 while (
not rospy.is_shutdown()
136 rospy.logwarn(
'Wait for stop message received..')
145 list_req = ListAppsRequest()
146 list_res = ListAppsResponse()
147 while not 'app_manager/test_plugin' in list(map(
lambda x: x.name, list_res.available_apps)):
148 list_res = self.
list.call(list_req)
152 start_req = StartAppRequest(name=
'app_manager/test_plugin_timeout')
153 start_res = self.
start.call(start_req)
154 rospy.logwarn(
'start app {}'.format(start_res))
155 self.assertEqual(start_res.error_code, 0)
156 while (
not rospy.is_shutdown()
158 rospy.logwarn(
'Wait for start message received..')
162 while (
not rospy.is_shutdown()
165 rospy.logwarn(
'Wait for app/plugin message received..')
168 while (
not rospy.is_shutdown()
170 rospy.logwarn(
'Wait for stop message received..')
179 list_req = ListAppsRequest()
180 list_res = ListAppsResponse()
181 while not 'app_manager/test_plugin' in list(map(
lambda x: x.name, list_res.available_apps)):
182 list_res = self.
list.call(list_req)
186 start_req = StartAppRequest(
187 name=
'app_manager/test_plugin',
188 args=[KeyValue(key=
'fail', value=
'true')])
189 start_res = self.
start.call(start_req)
190 rospy.logwarn(
'start app {}'.format(start_res))
191 self.assertEqual(start_res.error_code, 0)
192 while (
not rospy.is_shutdown()
194 rospy.logwarn(
'Wait for start message received..')
198 while (
not rospy.is_shutdown()
201 rospy.logwarn(
'Wait for app/plugin message received..')
204 while (
not rospy.is_shutdown()
206 rospy.logwarn(
'Wait for stop message received..')
215 list_req = ListAppsRequest()
216 list_res = ListAppsResponse()
217 while not 'app_manager/test_plugin' in list(map(
lambda x: x.name, list_res.available_apps)):
218 list_res = self.
list.call(list_req)
222 start_req = StartAppRequest(
223 name=
'app_manager/test_plugin',
224 args=[KeyValue(key=
'success', value=
'true')])
225 start_res = self.
start.call(start_req)
226 rospy.logwarn(
'start app {}'.format(start_res))
227 self.assertEqual(start_res.error_code, 0)
228 while (
not rospy.is_shutdown()
230 rospy.logwarn(
'Wait for start message received..')
234 while (
not rospy.is_shutdown()
237 rospy.logwarn(
'Wait for app/plugin message received..')
240 while (
not rospy.is_shutdown()
242 rospy.logwarn(
'Wait for stop message received..')
250 if __name__ ==
'__main__':
252 rostest.run(
'stop_app_test', PKG, StopAppTest, sys.argv)
253 except KeyboardInterrupt:
255 print(
"{} exiting".format(PKG))
def test_start_stop_app_timeout(self)
def test_start_stop_app(self)
def test_start_stop_app_fail(self)
def test_start_stop_app_success(self)