test_deregister.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 ## Integration test for empty services to test serializers
00035 ## and transport
00036 
00037 from __future__ import print_function
00038 
00039 PKG = 'test_rospy'
00040 
00041 import sys
00042 import time
00043 import unittest
00044 import gc
00045 import weakref
00046 
00047 import rospy
00048 import rostest
00049 from std_msgs.msg import String
00050 from test_rospy.srv import EmptySrv
00051 
00052 PUBTOPIC = 'test_unpublish_chatter'
00053 SUBTOPIC = 'test_unsubscribe_chatter'
00054 SERVICE = 'test_unregister_service'
00055 
00056 TIMEOUT = 10.0 #seconds
00057 
00058 _last_callback = None
00059 def callback(data):
00060     global _last_callback
00061     print("message received", data.data)
00062     _last_callback = data
00063 
00064 try:
00065     from xmlrpc.client import ServerProxy
00066 except ImportError:
00067     from xmlrpclib import ServerProxy
00068 
00069 class TestDeregister(unittest.TestCase):
00070         
00071     def test_unpublish(self):
00072         node_proxy = ServerProxy(rospy.get_node_uri())
00073         
00074         _, _, pubs = node_proxy.getPublications('/foo')
00075         pubs = [p for p in pubs if p[0] != '/rosout']
00076         self.assert_(not pubs, pubs)
00077         
00078         print("Publishing ", PUBTOPIC)
00079         pub = rospy.Publisher(PUBTOPIC, String, queue_size=1)
00080         impl = weakref.ref(pub.impl)
00081         topic = rospy.resolve_name(PUBTOPIC)
00082         _, _, pubs = node_proxy.getPublications('/foo')
00083         pubs = [p for p in pubs if p[0] != '/rosout']
00084         self.assertEquals([[topic, String._type]], pubs, "Pubs were %s"%pubs)
00085 
00086         # publish about 10 messages for fun
00087         for i in range(0, 10):
00088             pub.publish(String("hi [%s]"%i))
00089             time.sleep(0.1)
00090         
00091         # begin actual test by unsubscribing
00092         pub.unregister()
00093         
00094         # make sure no new messages are received in the next 2 seconds
00095         timeout_t = time.time() + 2.0
00096         while timeout_t < time.time():
00097             time.sleep(1.0)
00098         self.assert_(_last_callback is None)
00099 
00100         # verify that close cleaned up master and node state
00101         _, _, pubs = node_proxy.getPublications('/foo')
00102         pubs = [p for p in pubs if p[0] != '/rosout']
00103         self.assert_(not pubs, "Node still has pubs: %s"%pubs)
00104         n = rospy.get_caller_id()
00105         self.assert_(not rostest.is_publisher(topic, n), "publication is still active on master")
00106 
00107         # verify that the impl was cleaned up
00108         gc.collect()
00109         self.assertIsNone(impl())
00110         
00111     def test_unsubscribe(self):
00112         global _last_callback
00113 
00114         uri = rospy.get_node_uri()
00115         node_proxy = ServerProxy(uri)
00116         _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00117         self.assert_(not subscriptions, 'subscriptions present: %s'%str(subscriptions))
00118         
00119         print("Subscribing to ", SUBTOPIC)
00120         sub = rospy.Subscriber(SUBTOPIC, String, callback)
00121         topic = rospy.resolve_name(SUBTOPIC)
00122         _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00123         self.assertEquals([[topic, String._type]], subscriptions, "Subscriptions were %s"%subscriptions)
00124         
00125         # wait for the first message to be received
00126         timeout_t = time.time() + TIMEOUT
00127         while _last_callback is None and time.time() < timeout_t:
00128             time.sleep(0.1)
00129         self.assert_(_last_callback is not None, "No messages received from talker")
00130         
00131         # begin actual test by unsubscribing
00132         sub.unregister()
00133 
00134         # clear last callback data, i.e. last message received
00135         _last_callback = None
00136         timeout_t = time.time() + 2.0
00137         
00138         # make sure no new messages are received in the next 2 seconds
00139         while timeout_t < time.time():
00140             time.sleep(1.0)
00141         self.assert_(_last_callback is None)
00142 
00143         # verify that close cleaned up master and node state
00144         _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00145 
00146         self.assert_(not subscriptions, "Node still has subscriptions: %s"%subscriptions)
00147         n = rospy.get_caller_id()
00148         self.assert_(not rostest.is_subscriber(topic, n), "subscription is still active on master")
00149 
00150     def test_unservice(self):
00151         import rosgraph
00152         master = rosgraph.Master('/test_dereg')
00153 
00154         state = master.getSystemState()
00155         _, _, srv = state
00156         # filter out rosout services
00157         #[['/rosout/set_logger_level', ['/rosout']], ['/rosout/get_loggers', ['/rosout']]]
00158         srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00159         self.failIf(srv, srv)
00160 
00161         print("Creating service ", SERVICE)
00162         service = rospy.Service(SERVICE, EmptySrv, callback)
00163         # we currently cannot interrogate a node's services, have to rely on master
00164 
00165         # verify that master has service
00166         state = master.getSystemState()
00167         _, _, srv = state
00168         srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00169         self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]])
00170         
00171         # begin actual test by unsubscribing
00172         service.shutdown()
00173 
00174         time.sleep(1.0) # give API 1 second to sync with master
00175         
00176         state = master.getSystemState()
00177         _, _, srv = state
00178         srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00179         self.failIf(srv, srv)
00180 
00181         
00182 if __name__ == '__main__':
00183     rospy.init_node('test_dereg', disable_rostime=True)
00184     rostest.run(PKG, 'rospy_deregister', TestDeregister, sys.argv)


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57