$search
00001 #!/usr/bin/env python 00002 00003 PKG = 'map_store' 00004 import roslib; roslib.load_manifest(PKG) 00005 import sys 00006 import rospy 00007 import unittest 00008 from nav_msgs.msg import * 00009 from nav_msgs.srv import * 00010 from map_store.srv import * 00011 00012 ## Tests map store my reading a map from the map topic, 00013 ## reading a map from the dynamic_map service, renaming 00014 ## both maps, republishing both maps, and deleting both 00015 ## maps. 00016 class TestMapStore(unittest.TestCase): 00017 ## test 1 == 1 00018 def compare_maps(self, map_a, map_b): 00019 return str(map_a) == str(map_b) 00020 00021 def create_map(self, d): 00022 test_map = OccupancyGrid() 00023 test_map.info.resolution = 1.0 + d 00024 test_map.info.width = 10 00025 test_map.info.height = 10 00026 test_map.info.origin.position.x = 0.0 + d 00027 test_map.info.origin.position.y = 1.0 + d 00028 test_map.info.origin.position.z = 2.0 + d 00029 test_map.info.origin.orientation.x = 3.0 + d 00030 test_map.info.origin.orientation.y = 4.0 + d 00031 test_map.info.origin.orientation.z = 5.0 + d 00032 test_map.info.origin.orientation.w = 6.0 + d 00033 test_map.data = [] 00034 for i in range(0, 100): 00035 test_map.data.append(i) 00036 return test_map 00037 00038 def on_map(self, msg): 00039 if (self.wait_for_map and self.check_map != None): 00040 #self.assertEquals(self.compare_maps(self.check_map, msg), True, "Invalid map return") 00041 self.wait_for_map = False 00042 00043 00044 00045 def test_map_store(self): 00046 rospy.init_node('test_map_store') 00047 00048 #Create the two test maps 00049 test_map_1 = self.create_map(00.0) 00050 test_map_2 = self.create_map(10.0) 00051 test_map_2.data.reverse() 00052 00053 self.wait_for_map = False 00054 self.check_map = None 00055 00056 #Create services, publishers, and subscribers 00057 map_pub = rospy.Publisher('/map', OccupancyGrid) 00058 test_map_sub = rospy.Subscriber('/test_map', OccupancyGrid, self.on_map) 00059 dynamic_map = rospy.Service('dynamic_map', GetMap, lambda x: GetMapResponse(map=test_map_2)) 00060 print "Wait for /list_maps" 00061 rospy.wait_for_service("/list_maps") 00062 list_maps = rospy.ServiceProxy('/list_maps', ListMaps) 00063 print "Wait for /name_latest_map" 00064 rospy.wait_for_service("/save_map") 00065 name_latest_map = rospy.ServiceProxy('/save_map', SaveMap) 00066 print "Wait for /delete_map" 00067 rospy.wait_for_service("/delete_map") 00068 delete_map = rospy.ServiceProxy('/delete_map', DeleteMap) 00069 print "Wait for /rename_map" 00070 rospy.wait_for_service("/rename_map") 00071 rename_map = rospy.ServiceProxy('/rename_map', RenameMap) 00072 print "Wait for /publish_map" 00073 rospy.wait_for_service("/publish_map") 00074 publish_map = rospy.ServiceProxy('/publish_map', PublishMap) 00075 print "Wait for /test_dynamic_map" 00076 rospy.wait_for_service("/test_dynamic_map") 00077 test_dynamic_map = rospy.ServiceProxy('/test_dynamic_map', GetMap) 00078 00079 00080 print "Wait 1 second for everything to start up" 00081 rospy.sleep(1.0) 00082 00083 #Get the initial list of all maps 00084 initial_map_list = [] 00085 for m in list_maps().map_list: 00086 initial_map_list.append(m.map_id) 00087 print "Initial maps:", initial_map_list 00088 00089 #Write out map 1 00090 print "Sending map 1" 00091 map_pub.publish(test_map_1) 00092 rospy.sleep(5.0) 00093 00094 #Get the first map 00095 map_id_1 = None 00096 for i in list_maps().map_list: 00097 if not i.map_id in initial_map_list: 00098 self.assertEquals(map_id_1, None, "Two or more maps from /map topic") 00099 map_id_1 = i.map_id 00100 print "First map is:", map_id_1 00101 self.assertNotEquals(map_id_1, None, "Map was not loaded from the /map topic") 00102 00103 #Save the second map 00104 saved_map_name = "test_map_srv" 00105 name_latest_map(map_name=saved_map_name) 00106 rospy.sleep(5.0) 00107 00108 #Get the second map 00109 map_id_2 = None 00110 for i in list_maps().map_list: 00111 if not i.map_id in initial_map_list and i.map_id != map_id_1: 00112 self.assertEquals(map_id_2, None, "Two or more maps from dynamic_map") 00113 self.assertEquals(i.name, saved_map_name, \ 00114 "Saved map has the wrong name: %s instead of %s"%(i.name, saved_map_name)) 00115 map_id_2 = i.map_id 00116 print "Second map is:", map_id_2 00117 self.assertNotEquals(map_id_2, None, "Map was not loaded from the dynamic_map") 00118 00119 #Re-name the first map 00120 print "Renaming first map" 00121 topic_map_name = "test_map_msg" 00122 rename_map(map_id=map_id_1, new_name=topic_map_name) 00123 rospy.sleep(1.0) 00124 00125 #Ensure the renaming happened 00126 for i in list_maps().map_list: 00127 if i.map_id == map_id_1: 00128 self.assertEquals(i.name, topic_map_name, \ 00129 "Saved map has the wrong name: %s instead of %s"%(i.name, topic_map_name)) 00130 00131 00132 #Display both maps 00133 print "Displaying first map" 00134 self.wait_for_map = True 00135 self.check_map = test_map_1 00136 publish_map(map_id_1) 00137 while (self.wait_for_map): 00138 rospy.sleep(1.0) 00139 self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_1), True, "Test map 1 is bad") 00140 00141 print "Displaying second map" 00142 self.wait_for_map = True 00143 self.check_map = test_map_2 00144 publish_map(map_id_2) 00145 while (self.wait_for_map): 00146 rospy.sleep(1.0) 00147 self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_2), True, "Test map 2 is bad") 00148 00149 #Delete both maps 00150 print "Deleting both maps" 00151 delete_map(map_id_1) 00152 delete_map(map_id_2) 00153 rospy.sleep(1.0) 00154 00155 #Check that they are gone 00156 print "Ensuring that the maps are gone" 00157 for i in list_maps().map_list: 00158 self.assertNotEquals(i.map_id, map_id_1, "The /map topic map could not be deleted") 00159 self.assertNotEquals(i.map_id, map_id_2, "The dynamic_map map could not be deleted") 00160 00161 00162 #rospy.spin() 00163 rospy.sleep(1.0) 00164 print "Finished" 00165 00166 00167 00168 00169 if __name__ == '__main__': 00170 import rostest 00171 rostest.rosrun(PKG, 'test_map_store', TestMapStore)