test_throttle.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 PKG = 'nodelet_topic_tools'
00004 import roslib; roslib.load_manifest(PKG)
00005 
00006 import unittest
00007 import threading
00008 
00009 import rospy
00010 
00011 from std_msgs.msg import String
00012 
00013 class TestNodeletThrottle(unittest.TestCase):
00014     def __init__(self, *args):
00015         super(TestNodeletThrottle, self).__init__(*args)
00016 
00017         self._lock = threading.RLock()
00018 
00019         self._msgs_rec = 0
00020 
00021         self._pub = rospy.Publisher('string_in', String)
00022         self._sub = rospy.Subscriber('string_out', String, self._cb)
00023 
00024     def _cb(self, msg):
00025         with self._lock:
00026             self._msgs_rec += 1
00027 
00028     def test_nodelet_throttle(self):
00029         for i in range(0,10):
00030             self._pub.publish(String('hello, world'))
00031             rospy.sleep(1.0)
00032 
00033         self.assert_(self._msgs_rec > 0, "No messages received from nodelet throttle on topic \"string_out\"")
00034 
00035 if __name__ == '__main__':
00036     rospy.init_node('test_nodelet_throttle')
00037 
00038     import rostest
00039     rostest.rosrun(PKG, 'test_nodelet_throttle', TestNodeletThrottle)
00040 


test_nodelet_topic_tools
Author(s): Radu Bogdan Rusu, Tully Foote
autogenerated on Sun Feb 17 2019 03:43:57