test_deregister.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 ## Integration test for empty services to test serializers
00035 ## and transport
00036 
00037 PKG = 'test_rospy'
00038 
00039 import sys
00040 import time
00041 import unittest
00042 
00043 import rospy
00044 import rostest
00045 from std_msgs.msg import String
00046 from test_rospy.srv import EmptySrv
00047 
00048 PUBTOPIC = 'test_unpublish_chatter'
00049 SUBTOPIC = 'test_unsubscribe_chatter'
00050 SERVICE = 'test_unregister_service'
00051 
00052 TIMEOUT = 10.0 #seconds
00053 
00054 _last_callback = None
00055 def callback(data):
00056     global _last_callback
00057     print "message received", data.data
00058     _last_callback = data
00059 
00060 import xmlrpclib
00061 
00062 class TestDeregister(unittest.TestCase):
00063         
00064     def test_unpublish(self):
00065         node_proxy = xmlrpclib.ServerProxy(rospy.get_node_uri())
00066         
00067         _, _, pubs = node_proxy.getPublications('/foo')
00068         pubs = [p for p in pubs if p[0] != '/rosout']
00069         self.assert_(not pubs, pubs)
00070         
00071         print "Publishing ", PUBTOPIC
00072         pub = rospy.Publisher(PUBTOPIC, String)
00073         topic = rospy.resolve_name(PUBTOPIC)
00074         _, _, pubs = node_proxy.getPublications('/foo')
00075         pubs = [p for p in pubs if p[0] != '/rosout']
00076         self.assertEquals([[topic, String._type]], pubs, "Pubs were %s"%pubs)
00077 
00078         # publish about 10 messages for fun
00079         for i in xrange(0, 10):
00080             pub.publish(String("hi [%s]"%i))
00081             time.sleep(0.1)
00082         
00083         # begin actual test by unsubscribing
00084         pub.unregister()
00085         
00086         # make sure no new messages are received in the next 2 seconds
00087         timeout_t = time.time() + 2.0
00088         while timeout_t < time.time():
00089             time.sleep(1.0)
00090         self.assert_(_last_callback is None)
00091 
00092         # verify that close cleaned up master and node state
00093         _, _, pubs = node_proxy.getPublications('/foo')
00094         pubs = [p for p in pubs if p[0] != '/rosout']
00095         self.assert_(not pubs, "Node still has pubs: %s"%pubs)
00096         n = rospy.get_caller_id()
00097         self.assert_(not rostest.is_publisher(topic, n), "publication is still active on master")
00098 
00099         
00100     def test_unsubscribe(self):
00101         global _last_callback
00102 
00103         uri = rospy.get_node_uri()
00104         node_proxy = xmlrpclib.ServerProxy(uri)
00105         _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00106         self.assert_(not subscriptions, 'subscriptions present: %s'%str(subscriptions))
00107         
00108         print "Subscribing to ", SUBTOPIC
00109         sub = rospy.Subscriber(SUBTOPIC, String, callback)
00110         topic = rospy.resolve_name(SUBTOPIC)
00111         _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00112         self.assertEquals([[topic, String._type]], subscriptions, "Subscriptions were %s"%subscriptions)
00113         
00114         # wait for the first message to be received
00115         timeout_t = time.time() + TIMEOUT
00116         while _last_callback is None and time.time() < timeout_t:
00117             time.sleep(0.1)
00118         self.assert_(_last_callback is not None, "No messages received from talker")
00119         
00120         # begin actual test by unsubscribing
00121         sub.unregister()
00122 
00123         # clear last callback data, i.e. last message received
00124         _last_callback = None
00125         timeout_t = time.time() + 2.0
00126         
00127         # make sure no new messages are received in the next 2 seconds
00128         while timeout_t < time.time():
00129             time.sleep(1.0)
00130         self.assert_(_last_callback is None)
00131 
00132         # verify that close cleaned up master and node state
00133         _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00134 
00135         self.assert_(not subscriptions, "Node still has subscriptions: %s"%subscriptions)
00136         n = rospy.get_caller_id()
00137         self.assert_(not rostest.is_subscriber(topic, n), "subscription is still active on master")
00138 
00139     def test_unservice(self):
00140         import rosgraph
00141         master = rosgraph.Master('/test_dereg')
00142 
00143         state = master.getSystemState()
00144         _, _, srv = state
00145         # filter out rosout services
00146         #[['/rosout/set_logger_level', ['/rosout']], ['/rosout/get_loggers', ['/rosout']]]
00147         srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00148         self.failIf(srv, srv)
00149 
00150         print "Creating service ", SERVICE
00151         service = rospy.Service(SERVICE, EmptySrv, callback)
00152         # we currently cannot interrogate a node's services, have to rely on master
00153 
00154         # verify that master has service
00155         state = master.getSystemState()
00156         _, _, srv = state
00157         srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00158         self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]])
00159         
00160         # begin actual test by unsubscribing
00161         service.shutdown()
00162 
00163         time.sleep(1.0) # give API 1 second to sync with master
00164         
00165         state = master.getSystemState()
00166         _, _, srv = state
00167         srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00168         self.failIf(srv, srv)
00169 
00170         
00171 if __name__ == '__main__':
00172     rospy.init_node('test_dereg', disable_rostime=True)
00173     rostest.run(PKG, 'rospy_deregister', TestDeregister, sys.argv)


test_rospy
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:47:19