00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import roslib; roslib.load_manifest('test_nodelet')
00032 import rospy
00033 from std_msgs.msg import Float64
00034 import unittest
00035 import rostest
00036 import random
00037
00038 class PlusTester:
00039 def __init__(self, topic_in, topic_out, delta):
00040 self.pub = rospy.Publisher(topic_in, Float64)
00041 self.sub = rospy.Subscriber(topic_out, Float64, self.callback)
00042 self.expected_delta = delta
00043 self.send_value = random.random()
00044 self.recieved = False
00045 self.result = False
00046 self.delta = delta
00047
00048 def run(self, cycles = 10):
00049 for i in range(1, cycles):
00050 self.pub.publish(Float64(self.send_value))
00051 rospy.loginfo("Sent %f"%self.send_value);
00052 if self.recieved == True:
00053 break
00054 rospy.sleep(1.0)
00055 return self.result
00056
00057 def callback(self, data):
00058 rospy.loginfo(rospy.get_name()+" I heard %s which was a change of %f",data.data, data.data-self.send_value)
00059 if data.data == self.send_value + self.delta:
00060 self.result = True
00061 self.recieved = True
00062
00063 class TestPlus(unittest.TestCase):
00064 def test_param(self):
00065 pb = PlusTester("Plus/in", "Plus/out", 2.1)
00066 self.assertTrue(pb.run())
00067
00068 def test_default_param(self):
00069 pb = PlusTester("Plus2/in", "Plus2/out", 10.0)
00070 self.assertTrue(pb.run())
00071
00072 def test_standalone(self):
00073 pb = PlusTester("Plus2/out", "Plus3/out", 2.5)
00074 self.assertTrue(pb.run())
00075
00076 def test_remap(self):
00077 pb = PlusTester("plus_remap/in", "remapped_output", 2.1)
00078 self.assertTrue(pb.run())
00079
00080 def test_chain(self):
00081 pb = PlusTester("Plus2/in", "Plus3/out", 12.5)
00082 self.assertTrue(pb.run())
00083
00084 if __name__ == '__main__':
00085 rospy.init_node('plus_local')
00086 rostest.unitrun('test_nodelet', 'test_plus', TestPlus)
00087