Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'rosjava'
00037 NAME = 'string_passthrough'
00038 import roslib; roslib.load_manifest(PKG)
00039
00040 from ros import rospy
00041 from ros import std_msgs
00042 from ros import rostest
00043
00044 import sys
00045 import time
00046 import unittest
00047
00048 from std_msgs.msg import String
00049
00050 class TestStringPassthrough(unittest.TestCase):
00051
00052 def setUp(self):
00053 rospy.init_node(NAME)
00054 self.nodes = ['/node%s'%(i) for i in xrange(10)]
00055 self.nodes_set = set(self.nodes)
00056
00057
00058 self.fixture_nodes_cb = set()
00059 self.test_nodes_cb = set()
00060
00061 rospy.Subscriber('string_in', String, self.cb_from_fixture)
00062 rospy.Subscriber('string_out', String, self.cb_from_test)
00063
00064 def cb_from_fixture(self, msg):
00065 self.fixture_nodes_cb.add(msg.data)
00066
00067 def cb_from_test(self, msg):
00068 self.test_nodes_cb.add(msg.data)
00069
00070 def test_string_passthrough(self):
00071
00072 timeout_t = time.time() + 20.
00073 print "waiting for 20 seconds for fixture to verify"
00074 while not self.fixture_nodes_cb == self.nodes_set and \
00075 not rospy.is_shutdown() and \
00076 timeout_t > time.time():
00077 time.sleep(0.2)
00078
00079 self.failIf(timeout_t < time.time(), "timeout exceeded")
00080 self.failIf(rospy.is_shutdown(), "node shutdown")
00081 self.assertEquals(self.nodes_set, self.fixture_nodes_cb, "fixture did not validate: %s vs %s"%(self.nodes_set, self.fixture_nodes_cb))
00082
00083
00084 timeout_t = time.time() + 20.
00085 print "waiting for 20 seconds for client to verify"
00086 while not self.test_nodes_cb == self.nodes_set and \
00087 not rospy.is_shutdown() and \
00088 timeout_t > time.time():
00089 time.sleep(0.2)
00090
00091 self.assertEquals(self.nodes_set, self.test_nodes_cb, "passthrough did not pass along all message")
00092
00093
00094 pub = rospy.Publisher('string_in', String)
00095 msg = 'test_publisherUpdate'
00096 timeout_t = time.time() + 20.
00097 print "waiting for 20 seconds for client to verify"
00098 while not msg in self.nodes_set and \
00099 not rospy.is_shutdown() and \
00100 timeout_t > time.time():
00101 pub.publish(data=msg)
00102 time.sleep(0.2)
00103
00104
00105
00106 if __name__ == '__main__':
00107 import rostest
00108 rostest.run(PKG, NAME, TestStringPassthrough, sys.argv)