$search
00001 #!/usr/bin/env python 00002 00003 PKG = 'nodelet_topic_tools' 00004 import roslib; roslib.load_manifest(PKG) 00005 00006 import unittest 00007 import threading 00008 00009 import rospy 00010 00011 from std_msgs.msg import String 00012 00013 class TestNodeletThrottle(unittest.TestCase): 00014 def __init__(self, *args): 00015 super(TestNodeletThrottle, self).__init__(*args) 00016 00017 self._lock = threading.RLock() 00018 00019 self._msgs_rec = 0 00020 00021 self._pub = rospy.Publisher('string_in', String) 00022 self._sub = rospy.Subscriber('string_out', String, self._cb) 00023 00024 def _cb(self, msg): 00025 with self._lock: 00026 self._msgs_rec += 1 00027 00028 def test_nodelet_throttle(self): 00029 for i in range(0,10): 00030 self._pub.publish(String('hello, world')) 00031 rospy.sleep(1.0) 00032 00033 self.assert_(self._msgs_rec > 0, "No messages received from nodelet throttle on topic \"string_out\"") 00034 00035 if __name__ == '__main__': 00036 rospy.init_node('test_nodelet_throttle') 00037 00038 import rostest 00039 rostest.rosrun(PKG, 'test_nodelet_throttle', TestNodeletThrottle) 00040