Go to the documentation of this file.00001
00002
00003 PKG = 'nodelet_topic_tools'
00004 import roslib; roslib.load_manifest(PKG)
00005
00006 import unittest
00007 import threading
00008
00009 import rospy
00010
00011 from std_msgs.msg import String
00012
00013 class TestNodeletThrottle(unittest.TestCase):
00014 def __init__(self, *args):
00015 super(TestNodeletThrottle, self).__init__(*args)
00016
00017 self._lock = threading.RLock()
00018
00019 self._msgs_rec = 0
00020
00021 self._pub = rospy.Publisher('string_in', String)
00022 self._sub = rospy.Subscriber('string_out', String, self._cb)
00023
00024 def _cb(self, msg):
00025 with self._lock:
00026 self._msgs_rec += 1
00027
00028 def test_nodelet_throttle(self):
00029 for i in range(0,10):
00030 self._pub.publish(String('hello, world'))
00031 rospy.sleep(1.0)
00032
00033 self.assert_(self._msgs_rec > 0, "No messages received from nodelet throttle on topic \"string_out\"")
00034
00035 if __name__ == '__main__':
00036 rospy.init_node('test_nodelet_throttle')
00037
00038 import rostest
00039 rostest.rosrun(PKG, 'test_nodelet_throttle', TestNodeletThrottle)
00040