Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 PKG = 'actionlib'
00032 import rospy
00033
00034 import unittest
00035 import threading
00036
00037 from actionlib import goal_id_generator, status_tracker
00038 import actionlib_msgs.msg
00039
00040
00041
00042 class TestGoalIDGenerator(unittest.TestCase):
00043
00044 def test_generator(self):
00045 gen1 = goal_id_generator.GoalIDGenerator()
00046
00047 id1 = gen1.generate_ID()
00048 id2 = gen1.generate_ID()
00049 id3 = gen1.generate_ID()
00050 nn1, s1, ts1 = id1.id.split('-')
00051 nn2, s2, ts2 = id2.id.split('-')
00052 nn3, s3, ts3 = id3.id.split('-')
00053
00054 self.assertEquals(nn1, "/test_goal_id_generator", "node names are different")
00055 self.assertEquals(nn1, nn2, "node names are different")
00056 self.assertEquals(nn1, nn3, "node names are different")
00057
00058 self.assertEquals(s1, "1", "Sequence numbers mismatch")
00059 self.assertEquals(s2, "2", "Sequence numbers mismatch")
00060 self.assertEquals(s3, "3", "Sequence numbers mismatch")
00061
00062 def test_threaded_generation(self):
00063 """
00064 This test checks that all the ids are generated unique. This test should fail if the synchronization is set incorrectly.
00065 @note this test is can succeed when the errors are present.
00066 """
00067 global ids_lists
00068 ids_lists = {}
00069
00070 def gen_ids(tID=1, num_ids=1000):
00071 gen = goal_id_generator.GoalIDGenerator()
00072 for i in range(0, num_ids):
00073 id = gen.generate_ID()
00074 ids_lists[tID].append(id)
00075
00076 num_ids = 1000
00077 num_threads = 200
00078 threads = []
00079 for tID in range(0, num_threads):
00080 ids_lists[tID] = []
00081 t = threading.Thread(None, gen_ids, None, (), {'tID': tID, 'num_ids': num_ids})
00082 threads.append(t)
00083 t.start()
00084
00085 for t in threads:
00086 t.join()
00087
00088 all_ids = {}
00089 for tID in range(0, num_threads):
00090 self.assertEquals(len(ids_lists[tID]), num_ids)
00091 for id in ids_lists[tID]:
00092 nn, s, ts = id.id.split('-')
00093 self.assertFalse(s in all_ids, "Duplicate ID found")
00094 all_ids[s] = 1
00095
00096 def test_status_tracker(self):
00097 tracker = status_tracker.StatusTracker("test-id", actionlib_msgs.msg.GoalStatus.PENDING)
00098 self.assertEquals(tracker.status.goal_id, "test-id")
00099 self.assertEquals(tracker.status.status, actionlib_msgs.msg.GoalStatus.PENDING)
00100
00101
00102 if __name__ == '__main__':
00103 import rostest
00104 rospy.init_node("test_goal_id_generator")
00105 rostest.rosrun(PKG, 'test_goal_id_generator', TestGoalIDGenerator)