test_server_components.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Copyright (c) 2009, Willow Garage, Inc.
00003 # All rights reserved.
00004 # 
00005 # Redistribution and use in source and binary forms, with or without
00006 # modification, are permitted provided that the following conditions are met:
00007 # 
00008 #     * Redistributions of source code must retain the above copyright
00009 #       notice, this list of conditions and the following disclaimer.
00010 #     * Redistributions in binary form must reproduce the above copyright
00011 #       notice, this list of conditions and the following disclaimer in the
00012 #       documentation and/or other materials provided with the distribution.
00013 #     * Neither the name of the Willow Garage, Inc. nor the names of its
00014 #       contributors may be used to endorse or promote products derived from
00015 #       this software without specific prior written permission.
00016 # 
00017 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027 # POSSIBILITY OF SUCH DAMAGE.
00028 
00029 # Author: Alexander Sorokin. 
00030 
00031 PKG='actionlib'
00032 import rospy
00033 
00034 import sys
00035 import unittest
00036 import threading
00037 
00038 from actionlib import goal_id_generator,status_tracker
00039 import actionlib_msgs.msg 
00040 
00041 ## A sample python unit test
00042 class TestGoalIDGenerator(unittest.TestCase):
00043 
00044     def test_generator(self):
00045         gen1 = goal_id_generator.GoalIDGenerator();
00046         
00047         id1 = gen1.generate_ID()
00048         id2 = gen1.generate_ID()
00049         id3 = gen1.generate_ID()
00050         nn1,s1,ts1 = id1.id.split('-');
00051         nn2,s2,ts2 = id2.id.split('-');
00052         nn3,s3,ts3 = id3.id.split('-');
00053 
00054         self.assertEquals(nn1, "/test_goal_id_generator","node names are different")
00055         self.assertEquals(nn1, nn2, "node names are different")
00056         self.assertEquals(nn1, nn3, "node names are different")
00057 
00058         self.assertEquals(s1, "1", "Sequence numbers mismatch")
00059         self.assertEquals(s2, "2", "Sequence numbers mismatch")
00060         self.assertEquals(s3, "3", "Sequence numbers mismatch")
00061 
00062 
00063     def test_threaded_generation(self):
00064         """
00065         This test checks that all the ids are generated unique. This test should fail if the synchronization is set incorrectly. 
00066         @note this test is can succeed when the errors are present.
00067         """
00068         global ids_lists
00069         ids_lists={};
00070         def gen_ids(tID=1,num_ids=1000):
00071             gen = goal_id_generator.GoalIDGenerator();
00072             for i in range(0,num_ids):
00073                 id=gen.generate_ID();
00074                 ids_lists[tID].append(id);
00075 
00076         num_ids=1000;
00077         num_threads=200;
00078         threads=[];
00079         for tID in range(0,num_threads):
00080             ids_lists[tID]=[];
00081             t=threading.Thread(None,gen_ids,None,(),{'tID':tID,'num_ids':num_ids});
00082             threads.append(t);
00083             t.start();
00084         
00085         for t in threads:
00086             t.join();
00087 
00088         all_ids={};
00089         for tID in range(0,num_threads):
00090             self.assertEquals(len(ids_lists[tID]),num_ids)
00091             for id in ids_lists[tID]:
00092                 nn,s,ts = id.id.split('-');
00093                 self.assertFalse(s in all_ids,"Duplicate ID found");
00094                 all_ids[s]=1;
00095 
00096     def test_status_tracker(self):
00097         tracker=status_tracker.StatusTracker("test-id",actionlib_msgs.msg.GoalStatus.PENDING);
00098         self.assertEquals(tracker.status.goal_id,"test-id");
00099         self.assertEquals(tracker.status.status,actionlib_msgs.msg.GoalStatus.PENDING);
00100 
00101         
00102 
00103 if __name__ == '__main__':
00104     import rostest
00105     rospy.init_node("test_goal_id_generator")
00106     rostest.rosrun(PKG, 'test_goal_id_generator', TestGoalIDGenerator)
00107 


actionlib
Author(s): Eitan Marder-Eppstein, Vijay Pradeep
autogenerated on Thu Jan 2 2014 11:03:49