00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 PKG = 'test_rospy'
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 import sys
00043 import time
00044 import unittest
00045
00046 import rospy
00047 import rostest
00048 from std_msgs.msg import String
00049 from test_rospy.srv import EmptySrv
00050
00051 PUBTOPIC = 'test_unpublish_chatter'
00052 SUBTOPIC = 'test_unsubscribe_chatter'
00053 SERVICE = 'test_unregister_service'
00054
00055 TIMEOUT = 10.0
00056
00057 _last_callback = None
00058 def callback(data):
00059 global _last_callback
00060 print "message received", data.data
00061 _last_callback = data
00062
00063 import xmlrpclib
00064
00065 class TestDeregister(unittest.TestCase):
00066
00067 def test_unpublish(self):
00068 node_proxy = xmlrpclib.ServerProxy(rospy.get_node_uri())
00069
00070 _, _, pubs = node_proxy.getPublications('/foo')
00071 pubs = [p for p in pubs if p[0] != '/rosout']
00072 self.assert_(not pubs, pubs)
00073
00074 print "Publishing ", PUBTOPIC
00075 pub = rospy.Publisher(PUBTOPIC, String)
00076 topic = rospy.resolve_name(PUBTOPIC)
00077 _, _, pubs = node_proxy.getPublications('/foo')
00078 pubs = [p for p in pubs if p[0] != '/rosout']
00079 self.assertEquals([[topic, String._type]], pubs, "Pubs were %s"%pubs)
00080
00081
00082 for i in xrange(0, 10):
00083 pub.publish(String("hi [%s]"%i))
00084 time.sleep(0.1)
00085
00086
00087 pub.unregister()
00088
00089
00090 timeout_t = time.time() + 2.0
00091 while timeout_t < time.time():
00092 time.sleep(1.0)
00093 self.assert_(_last_callback is None)
00094
00095
00096 _, _, pubs = node_proxy.getPublications('/foo')
00097 pubs = [p for p in pubs if p[0] != '/rosout']
00098 self.assert_(not pubs, "Node still has pubs: %s"%pubs)
00099 n = rospy.get_caller_id()
00100 self.assert_(not rostest.is_publisher(topic, n), "publication is still active on master")
00101
00102
00103 def test_unsubscribe(self):
00104 global _last_callback
00105
00106 uri = rospy.get_node_uri()
00107 node_proxy = xmlrpclib.ServerProxy(uri)
00108 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00109 self.assert_(not subscriptions, 'subscriptions present: %s'%str(subscriptions))
00110
00111 print "Subscribing to ", SUBTOPIC
00112 sub = rospy.Subscriber(SUBTOPIC, String, callback)
00113 topic = rospy.resolve_name(SUBTOPIC)
00114 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00115 self.assertEquals([[topic, String._type]], subscriptions, "Subscriptions were %s"%subscriptions)
00116
00117
00118 timeout_t = time.time() + TIMEOUT
00119 while _last_callback is None and time.time() < timeout_t:
00120 time.sleep(0.1)
00121 self.assert_(_last_callback is not None, "No messages received from talker")
00122
00123
00124 sub.unregister()
00125
00126
00127 _last_callback = None
00128 timeout_t = time.time() + 2.0
00129
00130
00131 while timeout_t < time.time():
00132 time.sleep(1.0)
00133 self.assert_(_last_callback is None)
00134
00135
00136 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00137
00138 self.assert_(not subscriptions, "Node still has subscriptions: %s"%subscriptions)
00139 n = rospy.get_caller_id()
00140 self.assert_(not rostest.is_subscriber(topic, n), "subscription is still active on master")
00141
00142 def test_unservice(self):
00143 import roslib.scriptutil
00144 master = roslib.scriptutil.get_master()
00145
00146 code, msg, state = master.getSystemState('/test_dereg')
00147 self.assertEquals(1, code, msg)
00148 _, _, srv = state
00149
00150
00151 srv = [s for s in srv if not s[0].startswith('/rosout/')]
00152 self.failIf(srv, srv)
00153
00154 print "Creating service ", SERVICE
00155 service = rospy.Service(SERVICE, EmptySrv, callback)
00156
00157
00158
00159 code, msg, state = master.getSystemState('/test_dereg')
00160 self.assertEquals(1, code, msg)
00161 _, _, srv = state
00162 srv = [s for s in srv if not s[0].startswith('/rosout/')]
00163 self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]])
00164
00165
00166 service.shutdown()
00167
00168 time.sleep(1.0)
00169
00170 code, msg, state = master.getSystemState('/test_dereg')
00171 self.assertEquals(1, code, msg)
00172 _, _, srv = state
00173 srv = [s for s in srv if not s[0].startswith('/rosout/')]
00174 self.failIf(srv, srv)
00175
00176
00177 if __name__ == '__main__':
00178 rospy.init_node('test_dereg', disable_rostime=True)
00179 rostest.run(PKG, 'rospy_deregister', TestDeregister, sys.argv)