00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 PKG='actionlib'
00032 import roslib; roslib.load_manifest(PKG)
00033 import rospy
00034
00035 import sys
00036 import unittest
00037 import threading
00038
00039 from actionlib import goal_id_generator,status_tracker
00040 import actionlib_msgs.msg
00041
00042
00043 class TestGoalIDGenerator(unittest.TestCase):
00044
00045 def test_generator(self):
00046 gen1 = goal_id_generator.GoalIDGenerator();
00047
00048 id1 = gen1.generate_ID()
00049 id2 = gen1.generate_ID()
00050 id3 = gen1.generate_ID()
00051 nn1,s1,ts1 = id1.id.split('-');
00052 nn2,s2,ts2 = id2.id.split('-');
00053 nn3,s3,ts3 = id3.id.split('-');
00054
00055 self.assertEquals(nn1, "/test_goal_id_generator","node names are different")
00056 self.assertEquals(nn1, nn2, "node names are different")
00057 self.assertEquals(nn1, nn3, "node names are different")
00058
00059 self.assertEquals(s1, "1", "Sequence numbers mismatch")
00060 self.assertEquals(s2, "2", "Sequence numbers mismatch")
00061 self.assertEquals(s3, "3", "Sequence numbers mismatch")
00062
00063
00064 def test_threaded_generation(self):
00065 """
00066 This test checks that all the ids are generated unique. This test should fail if the synchronization is set incorrectly.
00067 @note this test is can succeed when the errors are present.
00068 """
00069 global ids_lists
00070 ids_lists={};
00071 def gen_ids(tID=1,num_ids=1000):
00072 gen = goal_id_generator.GoalIDGenerator();
00073 for i in range(0,num_ids):
00074 id=gen.generate_ID();
00075 ids_lists[tID].append(id);
00076
00077 num_ids=1000;
00078 num_threads=200;
00079 threads=[];
00080 for tID in range(0,num_threads):
00081 ids_lists[tID]=[];
00082 t=threading.Thread(None,gen_ids,None,(),{'tID':tID,'num_ids':num_ids});
00083 threads.append(t);
00084 t.start();
00085
00086 for t in threads:
00087 t.join();
00088
00089 all_ids={};
00090 for tID in range(0,num_threads):
00091 self.assertEquals(len(ids_lists[tID]),num_ids)
00092 for id in ids_lists[tID]:
00093 nn,s,ts = id.id.split('-');
00094 self.assertFalse(s in all_ids,"Duplicate ID found");
00095 all_ids[s]=1;
00096
00097 def test_status_tracker(self):
00098 tracker=status_tracker.StatusTracker("test-id",actionlib_msgs.msg.GoalStatus.PENDING);
00099 self.assertEquals(tracker.status.goal_id,"test-id");
00100 self.assertEquals(tracker.status.status,actionlib_msgs.msg.GoalStatus.PENDING);
00101
00102
00103
00104 if __name__ == '__main__':
00105 import rostest
00106 rospy.init_node("test_goal_id_generator")
00107 rostest.rosrun(PKG, 'test_goal_id_generator', TestGoalIDGenerator)
00108