$search
00001 #!/usr/bin/env python 00002 # Copyright (c) 2009, Willow Garage, Inc. 00003 # All rights reserved. 00004 # 00005 # Redistribution and use in source and binary forms, with or without 00006 # modification, are permitted provided that the following conditions are met: 00007 # 00008 # * Redistributions of source code must retain the above copyright 00009 # notice, this list of conditions and the following disclaimer. 00010 # * Redistributions in binary form must reproduce the above copyright 00011 # notice, this list of conditions and the following disclaimer in the 00012 # documentation and/or other materials provided with the distribution. 00013 # * Neither the name of the Willow Garage, Inc. nor the names of its 00014 # contributors may be used to endorse or promote products derived from 00015 # this software without specific prior written permission. 00016 # 00017 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 # POSSIBILITY OF SUCH DAMAGE. 00028 00029 # Author: Alexander Sorokin. 00030 00031 PKG='actionlib' 00032 import roslib; roslib.load_manifest(PKG) 00033 import rospy 00034 00035 import sys 00036 import unittest 00037 import threading 00038 00039 from actionlib import goal_id_generator,status_tracker 00040 import actionlib_msgs.msg 00041 00042 ## A sample python unit test 00043 class TestGoalIDGenerator(unittest.TestCase): 00044 00045 def test_generator(self): 00046 gen1 = goal_id_generator.GoalIDGenerator(); 00047 00048 id1 = gen1.generate_ID() 00049 id2 = gen1.generate_ID() 00050 id3 = gen1.generate_ID() 00051 nn1,s1,ts1 = id1.id.split('-'); 00052 nn2,s2,ts2 = id2.id.split('-'); 00053 nn3,s3,ts3 = id3.id.split('-'); 00054 00055 self.assertEquals(nn1, "/test_goal_id_generator","node names are different") 00056 self.assertEquals(nn1, nn2, "node names are different") 00057 self.assertEquals(nn1, nn3, "node names are different") 00058 00059 self.assertEquals(s1, "1", "Sequence numbers mismatch") 00060 self.assertEquals(s2, "2", "Sequence numbers mismatch") 00061 self.assertEquals(s3, "3", "Sequence numbers mismatch") 00062 00063 00064 def test_threaded_generation(self): 00065 """ 00066 This test checks that all the ids are generated unique. This test should fail if the synchronization is set incorrectly. 00067 @note this test is can succeed when the errors are present. 00068 """ 00069 global ids_lists 00070 ids_lists={}; 00071 def gen_ids(tID=1,num_ids=1000): 00072 gen = goal_id_generator.GoalIDGenerator(); 00073 for i in range(0,num_ids): 00074 id=gen.generate_ID(); 00075 ids_lists[tID].append(id); 00076 00077 num_ids=1000; 00078 num_threads=200; 00079 threads=[]; 00080 for tID in range(0,num_threads): 00081 ids_lists[tID]=[]; 00082 t=threading.Thread(None,gen_ids,None,(),{'tID':tID,'num_ids':num_ids}); 00083 threads.append(t); 00084 t.start(); 00085 00086 for t in threads: 00087 t.join(); 00088 00089 all_ids={}; 00090 for tID in range(0,num_threads): 00091 self.assertEquals(len(ids_lists[tID]),num_ids) 00092 for id in ids_lists[tID]: 00093 nn,s,ts = id.id.split('-'); 00094 self.assertFalse(s in all_ids,"Duplicate ID found"); 00095 all_ids[s]=1; 00096 00097 def test_status_tracker(self): 00098 tracker=status_tracker.StatusTracker("test-id",actionlib_msgs.msg.GoalStatus.PENDING); 00099 self.assertEquals(tracker.status.goal_id,"test-id"); 00100 self.assertEquals(tracker.status.status,actionlib_msgs.msg.GoalStatus.PENDING); 00101 00102 00103 00104 if __name__ == '__main__': 00105 import rostest 00106 rospy.init_node("test_goal_id_generator") 00107 rostest.rosrun(PKG, 'test_goal_id_generator', TestGoalIDGenerator) 00108