test_deregister.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 ## Integration test for empty services to test serializers
35 ## and transport
36 
37 from __future__ import print_function
38 
39 PKG = 'test_rospy'
40 
41 import sys
42 import time
43 import unittest
44 import gc
45 import weakref
46 
47 import rospy
48 import rostest
49 from std_msgs.msg import String
50 from test_rospy.srv import EmptySrv
51 
52 PUBTOPIC = 'test_unpublish_chatter'
53 SUBTOPIC = 'test_unsubscribe_chatter'
54 SERVICE = 'test_unregister_service'
55 
56 TIMEOUT = 10.0 #seconds
57 
58 _last_callback = None
59 def callback(data):
60  global _last_callback
61  print("message received", data.data)
62  _last_callback = data
63 
64 try:
65  from xmlrpc.client import ServerProxy
66 except ImportError:
67  from xmlrpclib import ServerProxy
68 
69 class TestDeregister(unittest.TestCase):
70 
71  def test_unpublish(self):
72  node_proxy = ServerProxy(rospy.get_node_uri())
73 
74  _, _, pubs = node_proxy.getPublications('/foo')
75  pubs = [p for p in pubs if p[0] != '/rosout']
76  self.assert_(not pubs, pubs)
77 
78  print("Publishing ", PUBTOPIC)
79  pub = rospy.Publisher(PUBTOPIC, String, queue_size=1)
80  impl = weakref.ref(pub.impl)
81  topic = rospy.resolve_name(PUBTOPIC)
82  _, _, pubs = node_proxy.getPublications('/foo')
83  pubs = [p for p in pubs if p[0] != '/rosout']
84  self.assertEquals([[topic, String._type]], pubs, "Pubs were %s"%pubs)
85 
86  # publish about 10 messages for fun
87  for i in range(0, 10):
88  pub.publish(String("hi [%s]"%i))
89  time.sleep(0.1)
90 
91  # begin actual test by unsubscribing
92  pub.unregister()
93 
94  # make sure no new messages are received in the next 2 seconds
95  timeout_t = time.time() + 2.0
96  while timeout_t < time.time():
97  time.sleep(1.0)
98  self.assert_(_last_callback is None)
99 
100  # verify that close cleaned up master and node state
101  _, _, pubs = node_proxy.getPublications('/foo')
102  pubs = [p for p in pubs if p[0] != '/rosout']
103  self.assert_(not pubs, "Node still has pubs: %s"%pubs)
104  n = rospy.get_caller_id()
105  self.assert_(not rostest.is_publisher(topic, n), "publication is still active on master")
106 
107  # verify that the impl was cleaned up
108  gc.collect()
109  self.assertIsNone(impl())
110 
111  def test_unsubscribe(self):
112  global _last_callback
113 
114  uri = rospy.get_node_uri()
115  node_proxy = ServerProxy(uri)
116  _, _, subscriptions = node_proxy.getSubscriptions('/foo')
117  self.assert_(not subscriptions, 'subscriptions present: %s'%str(subscriptions))
118 
119  print("Subscribing to ", SUBTOPIC)
120  sub = rospy.Subscriber(SUBTOPIC, String, callback)
121  topic = rospy.resolve_name(SUBTOPIC)
122  _, _, subscriptions = node_proxy.getSubscriptions('/foo')
123  self.assertEquals([[topic, String._type]], subscriptions, "Subscriptions were %s"%subscriptions)
124 
125  # wait for the first message to be received
126  timeout_t = time.time() + TIMEOUT
127  while _last_callback is None and time.time() < timeout_t:
128  time.sleep(0.1)
129  self.assert_(_last_callback is not None, "No messages received from talker")
130 
131  # begin actual test by unsubscribing
132  sub.unregister()
133 
134  # clear last callback data, i.e. last message received
135  _last_callback = None
136  timeout_t = time.time() + 2.0
137 
138  # make sure no new messages are received in the next 2 seconds
139  while timeout_t < time.time():
140  time.sleep(1.0)
141  self.assert_(_last_callback is None)
142 
143  # verify that close cleaned up master and node state
144  _, _, subscriptions = node_proxy.getSubscriptions('/foo')
145 
146  self.assert_(not subscriptions, "Node still has subscriptions: %s"%subscriptions)
147  n = rospy.get_caller_id()
148  self.assert_(not rostest.is_subscriber(topic, n), "subscription is still active on master")
149 
150  def test_unservice(self):
151  import rosgraph
152  master = rosgraph.Master('/test_dereg')
153 
154  state = master.getSystemState()
155  _, _, srv = state
156  # filter out rosout services
157  #[['/rosout/set_logger_level', ['/rosout']], ['/rosout/get_loggers', ['/rosout']]]
158  srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
159  self.failIf(srv, srv)
160 
161  print("Creating service ", SERVICE)
162  service = rospy.Service(SERVICE, EmptySrv, callback)
163  # we currently cannot interrogate a node's services, have to rely on master
164 
165  # verify that master has service
166  state = master.getSystemState()
167  _, _, srv = state
168  srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
169  self.assertEquals(srv, [[rospy.resolve_name(SERVICE), [rospy.get_caller_id()]]])
170 
171  # begin actual test by unsubscribing
172  service.shutdown()
173 
174  time.sleep(1.0) # give API 1 second to sync with master
175 
176  state = master.getSystemState()
177  _, _, srv = state
178  srv = [s for s in srv if not s[0].startswith('/rosout/') and not s[0].endswith('/get_loggers') and not s[0].endswith('/set_logger_level')]
179  self.failIf(srv, srv)
180 
181 
182 if __name__ == '__main__':
183  rospy.init_node('test_dereg', disable_rostime=True)
184  rostest.run(PKG, 'rospy_deregister', TestDeregister, sys.argv)
def callback(data)


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56