00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 PKG = 'test_rospy'
00038 NAME = 'test_embed_msg'
00039
00040 import sys
00041 import time
00042 import unittest
00043
00044 import rospy
00045 import rostest
00046 from std_msgs.msg import String, Int32
00047 from test_rospy.msg import EmbedTest, Val, ArrayVal
00048
00049 PUBTOPIC = "chatter"
00050 LPNODE = 'listenerpublisher'
00051 LPTOPIC = 'listenerpublisher'
00052
00053 MSG = EmbedTest
00054
00055 TIMEOUT = 10.0
00056
00057 class TestEmbedMsg(unittest.TestCase):
00058
00059 def setUp(self):
00060 self.callback_data = None
00061
00062 def _test_embed_msg_callback(self, data):
00063 self.callback_data = data
00064
00065 def test_embed_msg(self):
00066 self.assert_(self.callback_data is None, "invalid test fixture")
00067
00068
00069 timeout_t = time.time() + 5.0
00070 while not rostest.is_subscriber(
00071 rospy.resolve_name(PUBTOPIC),
00072 rospy.resolve_name(LPNODE)) and time.time() < timeout_t:
00073 time.sleep(0.1)
00074
00075 self.assert_(rostest.is_subscriber(
00076 rospy.resolve_name(PUBTOPIC),
00077 rospy.resolve_name(LPNODE)), "%s is not up"%LPNODE)
00078
00079 print "Publishing to ", PUBTOPIC
00080 pub = rospy.Publisher(PUBTOPIC, MSG)
00081 rospy.Subscriber(LPTOPIC, MSG, self._test_embed_msg_callback)
00082
00083
00084 import random
00085 val = random.randint(0, 109812312)
00086 msg = "hi [%s]"%val
00087 for i in xrange(0, 10):
00088
00089
00090
00091
00092 pub.publish(
00093 MSG(String(msg), Int32(val),
00094 [Int32(val+1), Int32(val+2), Int32(val+3)],
00095 Val(msg+msg),
00096 [Val(msg), Val("two")],
00097 [ArrayVal([Val("av1"), Val("av2")]),
00098 ArrayVal([])
00099 ]
00100 ))
00101 time.sleep(0.1)
00102
00103
00104
00105 self.assert_(self.callback_data is not None, "no callback data from listenerpublisher")
00106 print "Got ", self.callback_data.str1.data, self.callback_data.int1.data
00107 errorstr = "callback msg field [%s] from listenerpublisher does not match"
00108 self.assertEquals(msg, self.callback_data.str1.data,
00109 errorstr%"str1.data")
00110 self.assertEquals(val, self.callback_data.int1.data,
00111 errorstr%"int1.data")
00112 for i in xrange(1, 4):
00113 self.assertEquals(val+i, self.callback_data.ints[i-1].data,
00114 errorstr%"ints[i-1].data")
00115 self.assertEquals(msg+msg, self.callback_data.val.val,
00116 errorstr%"val.val")
00117 self.assertEquals(msg, self.callback_data.vals[0].val,
00118 errorstr%"vals[0].val")
00119 self.assertEquals("two", self.callback_data.vals[1].val,
00120 errorstr%"vals[1].val")
00121
00122 self.assertEquals(2, len(self.callback_data.arrayval),
00123 errorstr%"len arrayval")
00124 self.assertEquals(2, len(self.callback_data.arrayval[0].vals),
00125 errorstr%"len arrayval[0].vals")
00126 self.assertEquals("av1", self.callback_data.arrayval[0].vals[0].val,
00127 errorstr%"arrayval[0].vals[0].val")
00128 self.assertEquals("av2", self.callback_data.arrayval[0].vals[1].val,
00129 errorstr%"arrayval[0].vals[1].val")
00130 self.assertEquals(0, len(self.callback_data.arrayval[1].vals),
00131 errorstr%"len arrayval[1].vals")
00132
00133
00134 if __name__ == '__main__':
00135 rospy.init_node(NAME)
00136 rostest.run(PKG, NAME, TestEmbedMsg, sys.argv)