test_throttle.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 PKG = 'nodelet_topic_tools'
4 import roslib; roslib.load_manifest(PKG)
5 
6 import unittest
7 import threading
8 
9 import rospy
10 
11 from std_msgs.msg import String
12 
class TestNodeletThrottle(unittest.TestCase):
    """Check that messages sent to the nodelet throttle come out again.

    The test publishes on ``string_in`` and counts every message that
    arrives on ``string_out``.
    """

    def __init__(self, *args):
        super(TestNodeletThrottle, self).__init__(*args)

        # Guards _msgs_rec, which is incremented from the subscriber callback.
        self._lock = threading.RLock()

        # Number of messages seen so far on 'string_out'.
        self._msgs_rec = 0

        self._pub = rospy.Publisher('string_in', String)
        self._sub = rospy.Subscriber('string_out', String, self._cb)

    def _cb(self, msg):
        """Count one message received on ``string_out``; the payload is ignored."""
        with self._lock:
            self._msgs_rec += 1

    # NOTE(review): the header of this method was missing from the listing
    # (source line 28). The name is restored to match the test name handed to
    # rostest.rosrun; it must start with 'test' for unittest to discover it.
    def test_nodelet_throttle(self):
        """Publish ten messages, one per second, and expect at least one back."""
        for _ in range(10):
            self._pub.publish(String('hello, world'))
            rospy.sleep(1.0)

        # Read the counter under the same lock the callback uses to write it.
        with self._lock:
            received = self._msgs_rec

        # assertTrue replaces the deprecated assert_ alias (removed in Python 3.12).
        self.assertTrue(received > 0, "No messages received from nodelet throttle on topic \"string_out\"")
34 
if __name__ == '__main__':
    # The ROS node and the rostest test case share the same name.
    test_name = 'test_nodelet_throttle'

    # Initialise the node first, then hand the test case to rostest.
    rospy.init_node(test_name)

    import rostest
    rostest.rosrun(PKG, test_name, TestNodeletThrottle)
40 


test_nodelet_topic_tools
Author(s): Radu Bogdan Rusu, Tully Foote
autogenerated on Mon Feb 18 2019 03:26:52