37 from __future__
import print_function
40 NAME =
'test_embed_msg'
49 from test_rospy.msg
import EmbedTest, Val, ArrayVal
52 LPNODE =
'listenerpublisher'
53 LPTOPIC =
'listenerpublisher'
68 self.assertTrue(self.
callback_data is None,
"invalid test fixture")
71 timeout_t = time.time() + 5.0
72 while not rostest.is_subscriber(
73 rospy.resolve_name(PUBTOPIC),
74 rospy.resolve_name(LPNODE))
and time.time() < timeout_t:
77 self.assertTrue(rostest.is_subscriber(
78 rospy.resolve_name(PUBTOPIC),
79 rospy.resolve_name(LPNODE)),
"%s is not up"%LPNODE)
81 print(
"Publishing to ", PUBTOPIC)
82 pub = rospy.Publisher(PUBTOPIC, MSG, queue_size=0)
87 val = random.randint(0, 109812312)
89 for i
in range(0, 10):
95 MSG(String(msg), Int32(val),
96 [Int32(val+1), Int32(val+2), Int32(val+3)],
98 [Val(msg), Val(
"two")],
99 [ArrayVal([Val(
"av1"), Val(
"av2")]),
107 self.assertTrue(self.
callback_data is not None,
"no callback data from listenerpublisher")
109 errorstr =
"callback msg field [%s] from listenerpublisher does not match"
111 errorstr%
"str1.data")
113 errorstr%
"int1.data")
114 for i
in range(1, 4):
116 errorstr%
"ints[i-1].data")
120 errorstr%
"vals[0].val")
122 errorstr%
"vals[1].val")
125 errorstr%
"len arrayval")
126 self.assertEqual(2, len(self.
callback_data.arrayval[0].vals),
127 errorstr%
"len arrayval[0].vals")
128 self.assertEqual(
"av1", self.
callback_data.arrayval[0].vals[0].val,
129 errorstr%
"arrayval[0].vals[0].val")
130 self.assertEqual(
"av2", self.
callback_data.arrayval[0].vals[1].val,
131 errorstr%
"arrayval[0].vals[1].val")
132 self.assertEqual(0, len(self.
callback_data.arrayval[1].vals),
133 errorstr%
"len arrayval[1].vals")
136 if __name__ ==
'__main__':
137 rospy.init_node(NAME)
138 rostest.run(PKG, NAME, TestEmbedMsg, sys.argv)