37 from __future__
import print_function
52 PUBTOPIC =
'test_unpublish_chatter' 53 SUBTOPIC =
'test_unsubscribe_chatter' 54 SERVICE =
'test_unregister_service' 61 print(
"message received", data.data)
65 from xmlrpc.client
import ServerProxy
67 from xmlrpclib
import ServerProxy
72 node_proxy = ServerProxy(rospy.get_node_uri())
74 _, _, pubs = node_proxy.getPublications(
'/foo')
75 pubs = [p
for p
in pubs
if p[0] !=
'/rosout']
76 self.assert_(
not pubs, pubs)
78 print(
"Publishing ", PUBTOPIC)
79 pub = rospy.Publisher(PUBTOPIC, String, queue_size=1)
80 impl = weakref.ref(pub.impl)
81 topic = rospy.resolve_name(PUBTOPIC)
82 _, _, pubs = node_proxy.getPublications(
'/foo')
83 pubs = [p
for p
in pubs
if p[0] !=
'/rosout']
84 self.assertEquals([[topic, String._type]], pubs,
"Pubs were %s"%pubs)
87 for i
in range(0, 10):
88 pub.publish(String(
"hi [%s]"%i))
95 timeout_t = time.time() + 2.0
96 while timeout_t < time.time():
98 self.assert_(_last_callback
is None)
101 _, _, pubs = node_proxy.getPublications(
'/foo')
102 pubs = [p
for p
in pubs
if p[0] !=
'/rosout']
103 self.assert_(
not pubs,
"Node still has pubs: %s"%pubs)
104 n = rospy.get_caller_id()
105 self.assert_(
not rostest.is_publisher(topic, n),
"publication is still active on master")
109 self.assertIsNone(impl())
112 global _last_callback
114 uri = rospy.get_node_uri()
115 node_proxy = ServerProxy(uri)
116 _, _, subscriptions = node_proxy.getSubscriptions(
'/foo')
117 self.assert_(
not subscriptions,
'subscriptions present: %s'%str(subscriptions))
119 print(
"Subscribing to ", SUBTOPIC)
120 sub = rospy.Subscriber(SUBTOPIC, String, callback)
121 topic = rospy.resolve_name(SUBTOPIC)
122 _, _, subscriptions = node_proxy.getSubscriptions(
'/foo')
123 self.assertEquals([[topic, String._type]], subscriptions,
"Subscriptions were %s"%subscriptions)
126 timeout_t = time.time() + TIMEOUT
127 while _last_callback
is None and time.time() < timeout_t:
129 self.assert_(_last_callback
is not None,
"No messages received from talker")
135 _last_callback =
None 136 timeout_t = time.time() + 2.0
139 while timeout_t < time.time():
141 self.assert_(_last_callback
is None)
144 _, _, subscriptions = node_proxy.getSubscriptions(
'/foo')
146 self.assert_(
not subscriptions,
"Node still has subscriptions: %s"%subscriptions)
147 n = rospy.get_caller_id()
148 self.assert_(
not rostest.is_subscriber(topic, n),
"subscription is still active on master")
152 master = rosgraph.Master(
'/test_dereg')
154 state = master.getSystemState()
158 srv = [s
for s
in srv
if not s[0].startswith(
'/rosout/')
and not s[0].endswith(
'/get_loggers')
and not s[0].endswith(
'/set_logger_level')]
159 self.failIf(srv, srv)
161 print(
"Creating service ", SERVICE)
162 service = rospy.Service(SERVICE, EmptySrv, callback)
166 state = master.getSystemState()
168 srv = [s
for s
in srv
if not s[0].startswith(
'/rosout/')
and not s[0].endswith(
'/get_loggers')
and not s[0].endswith(
'/set_logger_level')]
169 self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]])
176 state = master.getSystemState()
178 srv = [s
for s
in srv
if not s[0].startswith(
'/rosout/')
and not s[0].endswith(
'/get_loggers')
and not s[0].endswith(
'/set_logger_level')]
179 self.failIf(srv, srv)
182 if __name__ ==
'__main__':
183 rospy.init_node(
'test_dereg', disable_rostime=
True)
184 rostest.run(PKG,
'rospy_deregister', TestDeregister, sys.argv)
def test_unsubscribe(self)