37 from actionlib
import goal_id_generator, status_tracker
38 import actionlib_msgs.msg
45 gen1 = goal_id_generator.GoalIDGenerator()
47 id1 = gen1.generate_ID()
48 id2 = gen1.generate_ID()
49 id3 = gen1.generate_ID()
50 nn1, s1, ts1 = id1.id.split(
'-')
51 nn2, s2, ts2 = id2.id.split(
'-')
52 nn3, s3, ts3 = id3.id.split(
'-')
54 self.assertEquals(nn1,
"/test_goal_id_generator",
"node names are different")
55 self.assertEquals(nn1, nn2,
"node names are different")
56 self.assertEquals(nn1, nn3,
"node names are different")
58 self.assertEquals(s1,
"1",
"Sequence numbers mismatch")
59 self.assertEquals(s2,
"2",
"Sequence numbers mismatch")
60 self.assertEquals(s3,
"3",
"Sequence numbers mismatch")
64 This test checks that all generated ids are unique. This test should fail if the synchronization is set incorrectly. 65 @note this test can succeed even when the errors are present. 70 def gen_ids(tID=1, num_ids=1000):
71 gen = goal_id_generator.GoalIDGenerator()
72 for i
in range(0, num_ids):
73 id = gen.generate_ID()
74 ids_lists[tID].append(id)
79 for tID
in range(0, num_threads):
81 t = threading.Thread(
None, gen_ids,
None, (), {
'tID': tID,
'num_ids': num_ids})
89 for tID
in range(0, num_threads):
90 self.assertEquals(len(ids_lists[tID]), num_ids)
91 for id
in ids_lists[tID]:
92 nn, s, ts = id.id.split(
'-')
93 self.assertFalse(s
in all_ids,
"Duplicate ID found")
97 tracker = status_tracker.StatusTracker(
"test-id", actionlib_msgs.msg.GoalStatus.PENDING)
98 self.assertEquals(tracker.status.goal_id,
"test-id")
99 self.assertEquals(tracker.status.status, actionlib_msgs.msg.GoalStatus.PENDING)
102 if __name__ ==
'__main__':
104 rospy.init_node(
"test_goal_id_generator")
105 rostest.rosrun(PKG,
'test_goal_id_generator', TestGoalIDGenerator)
def test_status_tracker(self)
def test_threaded_generation(self)