37 from __future__
import print_function
40 NAME =
'test_embed_msg' 49 from test_rospy.msg
import EmbedTest, Val, ArrayVal
52 LPNODE =
'listenerpublisher' 53 LPTOPIC =
'listenerpublisher' 68 self.assert_(self.
callback_data is None,
"invalid test fixture")
71 timeout_t = time.time() + 5.0
72 while not rostest.is_subscriber(
73 rospy.resolve_name(PUBTOPIC),
74 rospy.resolve_name(LPNODE))
and time.time() < timeout_t:
77 self.assert_(rostest.is_subscriber(
78 rospy.resolve_name(PUBTOPIC),
79 rospy.resolve_name(LPNODE)),
"%s is not up"%LPNODE)
81 print(
"Publishing to ", PUBTOPIC)
82 pub = rospy.Publisher(PUBTOPIC, MSG)
87 val = random.randint(0, 109812312)
89 for i
in range(0, 10):
95 MSG(String(msg), Int32(val),
96 [Int32(val+1), Int32(val+2), Int32(val+3)],
98 [Val(msg), Val(
"two")],
99 [ArrayVal([Val(
"av1"), Val(
"av2")]),
107 self.assert_(self.
callback_data is not None,
"no callback data from listenerpublisher")
108 print(
"Got ", self.callback_data.str1.data, self.callback_data.int1.data)
109 errorstr =
"callback msg field [%s] from listenerpublisher does not match" 110 self.assertEquals(msg, self.callback_data.str1.data,
111 errorstr%
"str1.data")
112 self.assertEquals(val, self.callback_data.int1.data,
113 errorstr%
"int1.data")
114 for i
in range(1, 4):
115 self.assertEquals(val+i, self.callback_data.ints[i-1].data,
116 errorstr%
"ints[i-1].data")
117 self.assertEquals(msg+msg, self.callback_data.val.val,
119 self.assertEquals(msg, self.callback_data.vals[0].val,
120 errorstr%
"vals[0].val")
121 self.assertEquals(
"two", self.callback_data.vals[1].val,
122 errorstr%
"vals[1].val")
124 self.assertEquals(2, len(self.callback_data.arrayval),
125 errorstr%
"len arrayval")
126 self.assertEquals(2, len(self.callback_data.arrayval[0].vals),
127 errorstr%
"len arrayval[0].vals")
128 self.assertEquals(
"av1", self.callback_data.arrayval[0].vals[0].val,
129 errorstr%
"arrayval[0].vals[0].val")
130 self.assertEquals(
"av2", self.callback_data.arrayval[0].vals[1].val,
131 errorstr%
"arrayval[0].vals[1].val")
132 self.assertEquals(0, len(self.callback_data.arrayval[1].vals),
133 errorstr%
"len arrayval[1].vals")
# Script entry point: runs only when this file is executed directly, not
# when it is imported as a module.
if __name__ == '__main__':
    # Register this process as a ROS node under the test's NAME before the
    # test case is handed to the runner.
    rospy.init_node(NAME)
    # Hand the TestEmbedMsg case to rostest, forwarding the command-line
    # arguments unchanged.
    rostest.run(PKG, NAME, TestEmbedMsg, sys.argv)
def _test_embed_msg_callback(self, data)