$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 ## Integration test for empty services to test serializers 00037 ## and transport 00038 00039 PKG = 'test_rospy' 00040 import roslib; roslib.load_manifest(PKG) 00041 00042 import sys 00043 import time 00044 import unittest 00045 00046 import rospy 00047 import rostest 00048 from std_msgs.msg import String 00049 from test_rospy.srv import EmptySrv 00050 00051 PUBTOPIC = 'test_unpublish_chatter' 00052 SUBTOPIC = 'test_unsubscribe_chatter' 00053 SERVICE = 'test_unregister_service' 00054 00055 TIMEOUT = 10.0 #seconds 00056 00057 _last_callback = None 00058 def callback(data): 00059 global _last_callback 00060 print "message received", data.data 00061 _last_callback = data 00062 00063 import xmlrpclib 00064 00065 class TestDeregister(unittest.TestCase): 00066 00067 def test_unpublish(self): 00068 node_proxy = xmlrpclib.ServerProxy(rospy.get_node_uri()) 00069 00070 _, _, pubs = node_proxy.getPublications('/foo') 00071 pubs = [p for p in pubs if p[0] != '/rosout'] 00072 self.assert_(not pubs, pubs) 00073 00074 print "Publishing ", PUBTOPIC 00075 pub = rospy.Publisher(PUBTOPIC, String) 00076 topic = rospy.resolve_name(PUBTOPIC) 00077 _, _, pubs = node_proxy.getPublications('/foo') 00078 pubs = [p for p in pubs if p[0] != '/rosout'] 00079 self.assertEquals([[topic, String._type]], pubs, "Pubs were %s"%pubs) 00080 00081 # publish about 10 messages for fun 00082 for i in xrange(0, 10): 00083 pub.publish(String("hi [%s]"%i)) 00084 time.sleep(0.1) 00085 00086 # begin actual test by unsubscribing 00087 pub.unregister() 00088 00089 # make sure no new messages are received in the next 2 seconds 00090 timeout_t = time.time() + 2.0 00091 while timeout_t < time.time(): 00092 time.sleep(1.0) 00093 self.assert_(_last_callback is None) 00094 00095 # verify that close cleaned up master and node state 00096 _, _, pubs = node_proxy.getPublications('/foo') 00097 pubs = [p for p in pubs if p[0] != '/rosout'] 00098 self.assert_(not pubs, "Node still has pubs: %s"%pubs) 00099 n = rospy.get_caller_id() 00100 self.assert_(not rostest.is_publisher(topic, n), "publication is still active on master") 00101 00102 00103 def test_unsubscribe(self): 00104 global _last_callback 00105 00106 uri = rospy.get_node_uri() 00107 node_proxy = xmlrpclib.ServerProxy(uri) 00108 _, _, subscriptions = node_proxy.getSubscriptions('/foo') 00109 self.assert_(not subscriptions, 'subscriptions present: %s'%str(subscriptions)) 00110 00111 print "Subscribing to ", SUBTOPIC 00112 sub = rospy.Subscriber(SUBTOPIC, String, callback) 00113 topic = rospy.resolve_name(SUBTOPIC) 00114 _, _, subscriptions = node_proxy.getSubscriptions('/foo') 00115 self.assertEquals([[topic, String._type]], subscriptions, "Subscriptions were %s"%subscriptions) 00116 00117 # wait for the first message to be received 00118 timeout_t = time.time() + TIMEOUT 00119 while _last_callback is None and time.time() < timeout_t: 00120 time.sleep(0.1) 00121 self.assert_(_last_callback is not None, "No messages received from talker") 00122 00123 # begin actual test by unsubscribing 00124 sub.unregister() 00125 00126 # clear last callback data, i.e. last message received 00127 _last_callback = None 00128 timeout_t = time.time() + 2.0 00129 00130 # make sure no new messages are received in the next 2 seconds 00131 while timeout_t < time.time(): 00132 time.sleep(1.0) 00133 self.assert_(_last_callback is None) 00134 00135 # verify that close cleaned up master and node state 00136 _, _, subscriptions = node_proxy.getSubscriptions('/foo') 00137 00138 self.assert_(not subscriptions, "Node still has subscriptions: %s"%subscriptions) 00139 n = rospy.get_caller_id() 00140 self.assert_(not rostest.is_subscriber(topic, n), "subscription is still active on master") 00141 00142 def test_unservice(self): 00143 import roslib.scriptutil 00144 master = roslib.scriptutil.get_master() 00145 00146 code, msg, state = master.getSystemState('/test_dereg') 00147 self.assertEquals(1, code, msg) 00148 _, _, srv = state 00149 # filter out rosout services 00150 #[['/rosout/set_logger_level', ['/rosout']], ['/rosout/get_loggers', ['/rosout']]] 00151 srv = [s for s in srv if not s[0].startswith('/rosout/')] 00152 self.failIf(srv, srv) 00153 00154 print "Creating service ", SERVICE 00155 service = rospy.Service(SERVICE, EmptySrv, callback) 00156 # we currently cannot interrogate a node's services, have to rely on master 00157 00158 # verify that master has service 00159 code, msg, state = master.getSystemState('/test_dereg') 00160 self.assertEquals(1, code, msg) 00161 _, _, srv = state 00162 srv = [s for s in srv if not s[0].startswith('/rosout/')] 00163 self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]]) 00164 00165 # begin actual test by unsubscribing 00166 service.shutdown() 00167 00168 time.sleep(1.0) # give API 1 second to sync with master 00169 00170 code, msg, state = master.getSystemState('/test_dereg') 00171 self.assertEquals(1, code, msg) 00172 _, _, srv = state 00173 srv = [s for s in srv if not s[0].startswith('/rosout/')] 00174 self.failIf(srv, srv) 00175 00176 00177 if __name__ == '__main__': 00178 rospy.init_node('test_dereg', disable_rostime=True) 00179 rostest.run(PKG, 'rospy_deregister', TestDeregister, sys.argv)