00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 from __future__ import print_function
00038
00039 PKG = 'test_rospy'
00040 NAME = 'test_embed_msg'
00041
00042 import sys
00043 import time
00044 import unittest
00045
00046 import rospy
00047 import rostest
00048 from std_msgs.msg import String, Int32
00049 from test_rospy.msg import EmbedTest, Val, ArrayVal
00050
00051 PUBTOPIC = "chatter"
00052 LPNODE = 'listenerpublisher'
00053 LPTOPIC = 'listenerpublisher'
00054
00055 MSG = EmbedTest
00056
00057 TIMEOUT = 10.0
00058
00059 class TestEmbedMsg(unittest.TestCase):
00060
00061 def setUp(self):
00062 self.callback_data = None
00063
00064 def _test_embed_msg_callback(self, data):
00065 self.callback_data = data
00066
00067 def test_embed_msg(self):
00068 self.assert_(self.callback_data is None, "invalid test fixture")
00069
00070
00071 timeout_t = time.time() + 5.0
00072 while not rostest.is_subscriber(
00073 rospy.resolve_name(PUBTOPIC),
00074 rospy.resolve_name(LPNODE)) and time.time() < timeout_t:
00075 time.sleep(0.1)
00076
00077 self.assert_(rostest.is_subscriber(
00078 rospy.resolve_name(PUBTOPIC),
00079 rospy.resolve_name(LPNODE)), "%s is not up"%LPNODE)
00080
00081 print("Publishing to ", PUBTOPIC)
00082 pub = rospy.Publisher(PUBTOPIC, MSG)
00083 rospy.Subscriber(LPTOPIC, MSG, self._test_embed_msg_callback)
00084
00085
00086 import random
00087 val = random.randint(0, 109812312)
00088 msg = "hi [%s]"%val
00089 for i in range(0, 10):
00090
00091
00092
00093
00094 pub.publish(
00095 MSG(String(msg), Int32(val),
00096 [Int32(val+1), Int32(val+2), Int32(val+3)],
00097 Val(msg+msg),
00098 [Val(msg), Val("two")],
00099 [ArrayVal([Val("av1"), Val("av2")]),
00100 ArrayVal([])
00101 ]
00102 ))
00103 time.sleep(0.1)
00104
00105
00106
00107 self.assert_(self.callback_data is not None, "no callback data from listenerpublisher")
00108 print("Got ", self.callback_data.str1.data, self.callback_data.int1.data)
00109 errorstr = "callback msg field [%s] from listenerpublisher does not match"
00110 self.assertEquals(msg, self.callback_data.str1.data,
00111 errorstr%"str1.data")
00112 self.assertEquals(val, self.callback_data.int1.data,
00113 errorstr%"int1.data")
00114 for i in range(1, 4):
00115 self.assertEquals(val+i, self.callback_data.ints[i-1].data,
00116 errorstr%"ints[i-1].data")
00117 self.assertEquals(msg+msg, self.callback_data.val.val,
00118 errorstr%"val.val")
00119 self.assertEquals(msg, self.callback_data.vals[0].val,
00120 errorstr%"vals[0].val")
00121 self.assertEquals("two", self.callback_data.vals[1].val,
00122 errorstr%"vals[1].val")
00123
00124 self.assertEquals(2, len(self.callback_data.arrayval),
00125 errorstr%"len arrayval")
00126 self.assertEquals(2, len(self.callback_data.arrayval[0].vals),
00127 errorstr%"len arrayval[0].vals")
00128 self.assertEquals("av1", self.callback_data.arrayval[0].vals[0].val,
00129 errorstr%"arrayval[0].vals[0].val")
00130 self.assertEquals("av2", self.callback_data.arrayval[0].vals[1].val,
00131 errorstr%"arrayval[0].vals[1].val")
00132 self.assertEquals(0, len(self.callback_data.arrayval[1].vals),
00133 errorstr%"len arrayval[1].vals")
00134
00135
00136 if __name__ == '__main__':
00137 rospy.init_node(NAME)
00138 rostest.run(PKG, NAME, TestEmbedMsg, sys.argv)