00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 from __future__ import print_function
00038
00039 PKG = 'test_rospy'
00040
00041 import sys
00042 import time
00043 import unittest
00044 import gc
00045 import weakref
00046
00047 import rospy
00048 import rostest
00049 from std_msgs.msg import String
00050 from test_rospy.srv import EmptySrv
00051
00052 PUBTOPIC = 'test_unpublish_chatter'
00053 SUBTOPIC = 'test_unsubscribe_chatter'
00054 SERVICE = 'test_unregister_service'
00055
00056 TIMEOUT = 10.0
00057
00058 _last_callback = None
00059 def callback(data):
00060 global _last_callback
00061 print("message received", data.data)
00062 _last_callback = data
00063
00064 try:
00065 from xmlrpc.client import ServerProxy
00066 except ImportError:
00067 from xmlrpclib import ServerProxy
00068
00069 class TestDeregister(unittest.TestCase):
00070
00071 def test_unpublish(self):
00072 node_proxy = ServerProxy(rospy.get_node_uri())
00073
00074 _, _, pubs = node_proxy.getPublications('/foo')
00075 pubs = [p for p in pubs if p[0] != '/rosout']
00076 self.assert_(not pubs, pubs)
00077
00078 print("Publishing ", PUBTOPIC)
00079 pub = rospy.Publisher(PUBTOPIC, String, queue_size=1)
00080 impl = weakref.ref(pub.impl)
00081 topic = rospy.resolve_name(PUBTOPIC)
00082 _, _, pubs = node_proxy.getPublications('/foo')
00083 pubs = [p for p in pubs if p[0] != '/rosout']
00084 self.assertEquals([[topic, String._type]], pubs, "Pubs were %s"%pubs)
00085
00086
00087 for i in range(0, 10):
00088 pub.publish(String("hi [%s]"%i))
00089 time.sleep(0.1)
00090
00091
00092 pub.unregister()
00093
00094
00095 timeout_t = time.time() + 2.0
00096 while timeout_t < time.time():
00097 time.sleep(1.0)
00098 self.assert_(_last_callback is None)
00099
00100
00101 _, _, pubs = node_proxy.getPublications('/foo')
00102 pubs = [p for p in pubs if p[0] != '/rosout']
00103 self.assert_(not pubs, "Node still has pubs: %s"%pubs)
00104 n = rospy.get_caller_id()
00105 self.assert_(not rostest.is_publisher(topic, n), "publication is still active on master")
00106
00107
00108 gc.collect()
00109 self.assertIsNone(impl())
00110
00111 def test_unsubscribe(self):
00112 global _last_callback
00113
00114 uri = rospy.get_node_uri()
00115 node_proxy = ServerProxy(uri)
00116 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00117 self.assert_(not subscriptions, 'subscriptions present: %s'%str(subscriptions))
00118
00119 print("Subscribing to ", SUBTOPIC)
00120 sub = rospy.Subscriber(SUBTOPIC, String, callback)
00121 topic = rospy.resolve_name(SUBTOPIC)
00122 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00123 self.assertEquals([[topic, String._type]], subscriptions, "Subscriptions were %s"%subscriptions)
00124
00125
00126 timeout_t = time.time() + TIMEOUT
00127 while _last_callback is None and time.time() < timeout_t:
00128 time.sleep(0.1)
00129 self.assert_(_last_callback is not None, "No messages received from talker")
00130
00131
00132 sub.unregister()
00133
00134
00135 _last_callback = None
00136 timeout_t = time.time() + 2.0
00137
00138
00139 while timeout_t < time.time():
00140 time.sleep(1.0)
00141 self.assert_(_last_callback is None)
00142
00143
00144 _, _, subscriptions = node_proxy.getSubscriptions('/foo')
00145
00146 self.assert_(not subscriptions, "Node still has subscriptions: %s"%subscriptions)
00147 n = rospy.get_caller_id()
00148 self.assert_(not rostest.is_subscriber(topic, n), "subscription is still active on master")
00149
00150 def test_unservice(self):
00151 import rosgraph
00152 master = rosgraph.Master('/test_dereg')
00153
00154 state = master.getSystemState()
00155 _, _, srv = state
00156
00157
00158 srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00159 self.failIf(srv, srv)
00160
00161 print("Creating service ", SERVICE)
00162 service = rospy.Service(SERVICE, EmptySrv, callback)
00163
00164
00165
00166 state = master.getSystemState()
00167 _, _, srv = state
00168 srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00169 self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]])
00170
00171
00172 service.shutdown()
00173
00174 time.sleep(1.0)
00175
00176 state = master.getSystemState()
00177 _, _, srv = state
00178 srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
00179 self.failIf(srv, srv)
00180
00181
00182 if __name__ == '__main__':
00183 rospy.init_node('test_dereg', disable_rostime=True)
00184 rostest.run(PKG, 'rospy_deregister', TestDeregister, sys.argv)