test_plugin.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2011, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 PKG = 'app_manager'
35 
36 import os
37 import sys
38 import time
39 import unittest
40 
41 import rospy
42 import rostest
43 import rosunit
44 
45 from app_manager.msg import KeyValue
46 from app_manager.srv import *
47 from std_msgs.msg import String
48 
49 class StopAppTest(unittest.TestCase):
50 
51  def __init__(self, *args):
52  super(StopAppTest, self).__init__(*args)
53  rospy.init_node('stop_app_test')
54 
55  def cb(self, msg):
56  rospy.logwarn("{} received".format(msg))
57  self.msg = msg
58  message = eval(msg.data)
59  if ('start_plugin' in message
60  and message['start_plugin']['fuga'] == 300
61  and message['start_plugin']['hoge'] == 100):
62  self.msg_started = True
63  if ('stop_plugin' in message
64  and 'exit_code' in message
65  and 'stopped' in message
66  and 'timeout' in message
67  and message['stop_plugin']['fuga'] == 3000
68  and message['stop_plugin']['hoge'] == 1000):
69  self.msg_ctx_exit_code = message['exit_code']
70  self.msg_ctx_stopped = message['stopped']
71  self.msg_ctx_timeout = message['timeout']
72  self.msg_stopped = True
73  if ('param1' in message
74  and 'param2' in message
75  and message['param1'] == 'hello'
76  and message['param2'] == 'world'):
77  self.msg_plugin_started = True
78  if ('param1' in message
79  and 'param2' in message
80  and message['param1'] == 'param1'
81  and message['param2'] == 'param2'):
82  self.msg_app_started = True
83  self.msg_received = self.msg_received + 1
84 
85  def setUp(self):
86  self.msg = None
87  self.msg_received = 0
88  self.msg_started = False
89  self.msg_stopped = False
90  self.msg_plugin_started = False
91  self.msg_app_started = False
92  self.msg_ctx_exit_code = None
93  self.msg_ctx_stopped = None
94  self.msg_ctx_timeout = None
95  rospy.Subscriber('/test_plugin', String, self.cb)
96  rospy.wait_for_service('/robot/list_apps')
97  rospy.wait_for_service('/robot/start_app')
98  rospy.wait_for_service('/robot/stop_app')
99  self.list = rospy.ServiceProxy('/robot/list_apps', ListApps)
100  self.start = rospy.ServiceProxy('/robot/start_app', StartApp)
101  self.stop = rospy.ServiceProxy('/robot/stop_app', StopApp)
102 
104  # wait for plugins
105  list_req = ListAppsRequest()
106  list_res = ListAppsResponse()
107  while not 'app_manager/test_plugin' in list(map(lambda x: x.name, list_res.available_apps)):
108  list_res = self.list.call(list_req)
109  # rospy.logwarn("received 'list_apps' {}".format(list_res))
110  time.sleep(1)
111  # start plugin
112  start_req = StartAppRequest(name='app_manager/test_plugin')
113  start_res = self.start.call(start_req)
114  rospy.logwarn('start app {}'.format(start_res))
115  self.assertEqual(start_res.error_code, 0)
116  while (not rospy.is_shutdown()
117  and not self.msg_started):
118  rospy.logwarn('Wait for start message received..')
119  rospy.sleep(1)
120 
121  # check app and plugin both started
122  while (not rospy.is_shutdown()
123  and not self.msg_app_started
124  and not self.msg_plugin_started):
125  rospy.logwarn('Wait for app/plugin message received..')
126  rospy.sleep(1)
127 
128  # stop plugin
129  stop_req = StopAppRequest(name='app_manager/test_plugin')
130  stop_res = self.stop.call(stop_req)
131  rospy.logwarn('stop app {}'.format(stop_res))
132  self.assertEqual(stop_res.error_code, 0)
133 
134  while (not rospy.is_shutdown()
135  and not self.msg_stopped):
136  rospy.logwarn('Wait for stop message received..')
137  rospy.sleep(1)
138 
139  self.assertEqual(self.msg_ctx_exit_code, None)
140  self.assertEqual(self.msg_ctx_stopped, True)
141  self.assertEqual(self.msg_ctx_timeout, None)
142 
144  # wait for plugins
145  list_req = ListAppsRequest()
146  list_res = ListAppsResponse()
147  while not 'app_manager/test_plugin' in list(map(lambda x: x.name, list_res.available_apps)):
148  list_res = self.list.call(list_req)
149  # rospy.logwarn("received 'list_apps' {}".format(list_res))
150  time.sleep(1)
151  # start plugin
152  start_req = StartAppRequest(name='app_manager/test_plugin_timeout')
153  start_res = self.start.call(start_req)
154  rospy.logwarn('start app {}'.format(start_res))
155  self.assertEqual(start_res.error_code, 0)
156  while (not rospy.is_shutdown()
157  and not self.msg_started):
158  rospy.logwarn('Wait for start message received..')
159  rospy.sleep(1)
160 
161  # check app and plugin both started
162  while (not rospy.is_shutdown()
163  and not self.msg_app_started
164  and not self.msg_plugin_started):
165  rospy.logwarn('Wait for app/plugin message received..')
166  rospy.sleep(1)
167 
168  while (not rospy.is_shutdown()
169  and not self.msg_stopped):
170  rospy.logwarn('Wait for stop message received..')
171  rospy.sleep(1)
172 
173  self.assertEqual(self.msg_ctx_exit_code, None)
174  self.assertEqual(self.msg_ctx_stopped, True)
175  self.assertEqual(self.msg_ctx_timeout, True)
176 
178  # wait for plugins
179  list_req = ListAppsRequest()
180  list_res = ListAppsResponse()
181  while not 'app_manager/test_plugin' in list(map(lambda x: x.name, list_res.available_apps)):
182  list_res = self.list.call(list_req)
183  # rospy.logwarn("received 'list_apps' {}".format(list_res))
184  time.sleep(1)
185  # start plugin
186  start_req = StartAppRequest(
187  name='app_manager/test_plugin',
188  args=[KeyValue(key='fail', value='true')])
189  start_res = self.start.call(start_req)
190  rospy.logwarn('start app {}'.format(start_res))
191  self.assertEqual(start_res.error_code, 0)
192  while (not rospy.is_shutdown()
193  and not self.msg_started):
194  rospy.logwarn('Wait for start message received..')
195  rospy.sleep(1)
196 
197  # check app and plugin both started
198  while (not rospy.is_shutdown()
199  and not self.msg_app_started
200  and not self.msg_plugin_started):
201  rospy.logwarn('Wait for app/plugin message received..')
202  rospy.sleep(1)
203 
204  while (not rospy.is_shutdown()
205  and not self.msg_stopped):
206  rospy.logwarn('Wait for stop message received..')
207  rospy.sleep(1)
208 
209  self.assertEqual(self.msg_ctx_exit_code, 1)
210  self.assertEqual(self.msg_ctx_stopped, None)
211  self.assertEqual(self.msg_ctx_timeout, None)
212 
214  # wait for plugins
215  list_req = ListAppsRequest()
216  list_res = ListAppsResponse()
217  while not 'app_manager/test_plugin' in list(map(lambda x: x.name, list_res.available_apps)):
218  list_res = self.list.call(list_req)
219  # rospy.logwarn("received 'list_apps' {}".format(list_res))
220  time.sleep(1)
221  # start plugin
222  start_req = StartAppRequest(
223  name='app_manager/test_plugin',
224  args=[KeyValue(key='success', value='true')])
225  start_res = self.start.call(start_req)
226  rospy.logwarn('start app {}'.format(start_res))
227  self.assertEqual(start_res.error_code, 0)
228  while (not rospy.is_shutdown()
229  and not self.msg_started):
230  rospy.logwarn('Wait for start message received..')
231  rospy.sleep(1)
232 
233  # check app and plugin both started
234  while (not rospy.is_shutdown()
235  and not self.msg_app_started
236  and not self.msg_plugin_started):
237  rospy.logwarn('Wait for app/plugin message received..')
238  rospy.sleep(1)
239 
240  while (not rospy.is_shutdown()
241  and not self.msg_stopped):
242  rospy.logwarn('Wait for stop message received..')
243  rospy.sleep(1)
244 
245  self.assertEqual(self.msg_ctx_exit_code, 0)
246  self.assertEqual(self.msg_ctx_stopped, None)
247  self.assertEqual(self.msg_ctx_timeout, None)
248 
249 
250 if __name__ == '__main__':
251  try:
252  rostest.run('stop_app_test', PKG, StopAppTest, sys.argv)
253  except KeyboardInterrupt:
254  pass
255  print("{} exiting".format(PKG))
def test_start_stop_app_timeout(self)
Definition: test_plugin.py:143
def test_start_stop_app(self)
Definition: test_plugin.py:103
def test_start_stop_app_fail(self)
Definition: test_plugin.py:177
def cb(self, msg)
Definition: test_plugin.py:55
def test_start_stop_app_success(self)
Definition: test_plugin.py:213
def __init__(self, args)
Definition: test_plugin.py:51


app_manager
Author(s): Jeremy Leibs, Ken Conley, Yuki Furuta
autogenerated on Thu Oct 13 2022 02:59:17