00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 PKG = 'test_rospy'
00038
00039 import sys
00040 import time
00041 import unittest
00042
00043 import rospy
00044 import rostest
00045 from std_msgs.msg import String
00046 from test_rospy.srv import EmptySrv
00047
00048 PUBTOPIC = 'test_unpublish_chatter'
00049 SUBTOPIC = 'test_unsubscribe_chatter'
00050 SERVICE = 'test_unregister_service'
00051
00052 TIMEOUT = 10.0
00053
00054 _last_callback = None
00055 def callback(data):
00056 global _last_callback
00057 print "message received", data.data
00058 _last_callback = data
00059
00060 import xmlrpclib
00061
00062 class TestDeregister(unittest.TestCase):
00063
00064 def test_unpublish(self):
00065 node_proxy = xmlrpclib.ServerProxy(rospy.get_node_uri())
00066
00067 _, _, pubs = node_proxy.getPublications('/foo')
00068 pubs = [p for p in pubs if p[0] != '/rosout']
00069 self.assert_(not pubs, pubs)
00070
00071 print "Publishing ", PUBTOPIC
00072 pub = rospy.Publisher(PUBTOPIC, String)
00073 topic = rospy.resolve_name(PUBTOPIC)
00074 _, _, pubs = node_proxy.getPublications('/foo')
00075 pubs = [p for p in pubs if p[0] != '/rosout']
00076 self.assertEquals([[topic, String._type]], pubs, "Pubs were %s"%pubs)
00077
00078
00079 for i in xrange(0, 10):
00080 pub.publish(String("hi [%s]"%i))
00081 time.sleep(0.1)
00082
00083
00084 pub.unregister()
00085
00086
00087 timeout_t = time.time() + 2.0
00088 while timeout_t < time.time():
00089 time.sleep(1.0)
00090 self.assert_(_last_callback is None)
00091
00092
00093 _, _, pubs = node_proxy.getPublications('/foo')
00094 pubs = [p for p in pubs if p[0] != '/rosout']
00095 self.assert_(not pubs, "Node still has pubs: %s"%pubs)
00096 n = rospy.get_caller_id()
00097 self.assert_(not rostest.is_publisher(topic, n), "publication is still active on master")
00098
00099
00100 def test_unsubscribe(self):
00101 global _last_callback
00102
00103 uri = rospy.get_node_uri()
00104 node_proxy = xmlrpclib.ServerProxy(uri)
00105 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00106 self.assert_(not subscriptions, 'subscriptions present: %s'%str(subscriptions))
00107
00108 print "Subscribing to ", SUBTOPIC
00109 sub = rospy.Subscriber(SUBTOPIC, String, callback)
00110 topic = rospy.resolve_name(SUBTOPIC)
00111 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00112 self.assertEquals([[topic, String._type]], subscriptions, "Subscriptions were %s"%subscriptions)
00113
00114
00115 timeout_t = time.time() + TIMEOUT
00116 while _last_callback is None and time.time() < timeout_t:
00117 time.sleep(0.1)
00118 self.assert_(_last_callback is not None, "No messages received from talker")
00119
00120
00121 sub.unregister()
00122
00123
00124 _last_callback = None
00125 timeout_t = time.time() + 2.0
00126
00127
00128 while timeout_t < time.time():
00129 time.sleep(1.0)
00130 self.assert_(_last_callback is None)
00131
00132
00133 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00134
00135 self.assert_(not subscriptions, "Node still has subscriptions: %s"%subscriptions)
00136 n = rospy.get_caller_id()
00137 self.assert_(not rostest.is_subscriber(topic, n), "subscription is still active on master")
00138
00139 def test_unservice(self):
00140 import rosgraph
00141 master = rosgraph.Master('/test_dereg')
00142
00143 state = master.getSystemState()
00144 _, _, srv = state
00145
00146
00147 srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00148 self.failIf(srv, srv)
00149
00150 print "Creating service ", SERVICE
00151 service = rospy.Service(SERVICE, EmptySrv, callback)
00152
00153
00154
00155 state = master.getSystemState()
00156 _, _, srv = state
00157 srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00158 self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]])
00159
00160
00161 service.shutdown()
00162
00163 time.sleep(1.0)
00164
00165 state = master.getSystemState()
00166 _, _, srv = state
00167 srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00168 self.failIf(srv, srv)
00169
00170
00171 if __name__ == '__main__':
00172 rospy.init_node('test_dereg', disable_rostime=True)
00173 rostest.run(PKG, 'rospy_deregister', TestDeregister, sys.argv)