test_world_canvas_server.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import sys
00004 import rospy
00005 import unittest
00006 from nav_msgs.msg import *
00007 from nav_msgs.srv import *
00008 from map_store.srv import *
00009  
00010 ## Tests map store my reading a map from the map topic,
00011 ## reading a map from the dynamic_map service, renaming
00012 ## both maps, republishing both maps, and deleting both
00013 ## maps.
00014 class TestMapStore(unittest.TestCase):
00015     ## test 1 == 1
00016     def compare_maps(self, map_a, map_b):
00017         return str(map_a) == str(map_b)
00018 
00019     def create_map(self, d):
00020         test_map = OccupancyGrid()
00021         test_map.info.resolution = 1.0 + d
00022         test_map.info.width = 10
00023         test_map.info.height = 10
00024         test_map.info.origin.position.x = 0.0 + d
00025         test_map.info.origin.position.y = 1.0 + d
00026         test_map.info.origin.position.z = 2.0 + d
00027         test_map.info.origin.orientation.x = 3.0 + d
00028         test_map.info.origin.orientation.y = 4.0 + d
00029         test_map.info.origin.orientation.z = 5.0 + d
00030         test_map.info.origin.orientation.w = 6.0 + d
00031         test_map.data = []
00032         for i in range(0, 100):
00033             test_map.data.append(i)
00034         return test_map
00035 
00036     def on_map(self, msg):
00037         if (self.wait_for_map and self.check_map != None):
00038             #self.assertEquals(self.compare_maps(self.check_map, msg), True, "Invalid map return")
00039             self.wait_for_map = False
00040         
00041         
00042 
00043     def test_annotations_store(self):
00044         rospy.init_node('test_annotations_store')
00045 
00046         #Create the two test maps
00047         test_map_1 = self.create_map(00.0)
00048         test_map_2 = self.create_map(10.0)
00049         test_map_2.data.reverse()
00050 
00051         self.wait_for_map = False
00052         self.check_map = None
00053         
00054         #Create services, publishers, and subscribers
00055         map_pub = rospy.Publisher('/map', OccupancyGrid, queue_size=5)
00056         test_map_sub = rospy.Subscriber('/test_map', OccupancyGrid, self.on_map)
00057         dynamic_map = rospy.Service('dynamic_map', GetMap, lambda x: GetMapResponse(map=test_map_2))
00058         print "Wait for /list_maps"
00059         rospy.wait_for_service("/list_maps")
00060         list_maps = rospy.ServiceProxy('/list_maps', ListMaps)
00061         print "Wait for /name_latest_map"
00062         rospy.wait_for_service("/save_map")
00063         name_latest_map = rospy.ServiceProxy('/save_map', SaveMap)
00064         print "Wait for /delete_map"
00065         rospy.wait_for_service("/delete_map")
00066         delete_map = rospy.ServiceProxy('/delete_map', DeleteMap)
00067         print "Wait for /rename_map"
00068         rospy.wait_for_service("/rename_map")
00069         rename_map = rospy.ServiceProxy('/rename_map', RenameMap)
00070         print "Wait for /publish_map"
00071         rospy.wait_for_service("/publish_map")
00072         publish_map = rospy.ServiceProxy('/publish_map', PublishMap)
00073         print "Wait for /test_dynamic_map"
00074         rospy.wait_for_service("/test_dynamic_map")
00075         test_dynamic_map = rospy.ServiceProxy('/test_dynamic_map', GetMap)
00076 
00077 
00078         print "Wait 1 second for everything to start up"
00079         rospy.sleep(1.0)
00080 
00081         #Get the initial list of all maps
00082         initial_map_list = []
00083         for m in list_maps().map_list:
00084             initial_map_list.append(m.map_id)
00085         print "Initial maps:", initial_map_list
00086 
00087         #Write out map 1
00088         print "Sending map 1"
00089         map_pub.publish(test_map_1)
00090         rospy.sleep(5.0)
00091 
00092         #Get the first map
00093         map_id_1 = None
00094         for i in list_maps().map_list:
00095             if not i.map_id in initial_map_list:
00096                 self.assertEquals(map_id_1, None, "Two or more maps from /map topic")
00097                 map_id_1 = i.map_id
00098         print "First map is:", map_id_1
00099         self.assertNotEquals(map_id_1, None, "Map was not loaded from the /map topic")
00100         
00101         #Save the second map
00102         saved_map_name = "test_map_srv"
00103         name_latest_map(map_name=saved_map_name)
00104         rospy.sleep(5.0)
00105 
00106         #Get the second map
00107         map_id_2 = None
00108         for i in list_maps().map_list:
00109             if not i.map_id in initial_map_list and i.map_id != map_id_1:
00110                 self.assertEquals(map_id_2, None, "Two or more maps from dynamic_map")
00111                 self.assertEquals(i.name, saved_map_name, \
00112                                       "Saved map has the wrong name: %s instead of %s"%(i.name, saved_map_name))
00113                 map_id_2 = i.map_id
00114         print "Second map is:", map_id_2
00115         self.assertNotEquals(map_id_2, None, "Map was not loaded from the dynamic_map")
00116         
00117         #Re-name the first map
00118         print "Renaming first map"
00119         topic_map_name = "test_map_msg"
00120         rename_map(map_id=map_id_1, new_name=topic_map_name)
00121         rospy.sleep(1.0)
00122 
00123         #Ensure the renaming happened
00124         for i in list_maps().map_list:
00125             if i.map_id == map_id_1:
00126                 self.assertEquals(i.name, topic_map_name, \
00127                                       "Saved map has the wrong name: %s instead of %s"%(i.name, topic_map_name))
00128         
00129         
00130         #Display both maps
00131         print "Displaying first map"
00132         self.wait_for_map = True
00133         self.check_map = test_map_1
00134         publish_map(map_id_1)
00135         while (self.wait_for_map):
00136             rospy.sleep(1.0)
00137         self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_1), True, "Test map 1 is bad")
00138         
00139         print "Displaying second map"
00140         self.wait_for_map = True
00141         self.check_map = test_map_2
00142         publish_map(map_id_2)
00143         while (self.wait_for_map):
00144             rospy.sleep(1.0)
00145         self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_2), True, "Test map 2 is bad")
00146         
00147         #Delete both maps
00148         print "Deleting both maps"
00149         delete_map(map_id_1)
00150         delete_map(map_id_2)
00151         rospy.sleep(1.0)
00152         
00153         #Check that they are gone
00154         print "Ensuring that the maps are gone"
00155         for i in list_maps().map_list:
00156             self.assertNotEquals(i.map_id, map_id_1, "The /map topic map could not be deleted")
00157             self.assertNotEquals(i.map_id, map_id_2, "The dynamic_map map could not be deleted")
00158 
00159         
00160         #rospy.spin()
00161         rospy.sleep(1.0)
00162         print "Finished"
00163 
00164 
00165 
00166 
00167 if __name__ == '__main__':
00168     import rostest
00169     rostest.rosrun(PKG, 'test_annotations_store', TestMapStore)


world_canvas_server
Author(s): Jorge Santos Simón
autogenerated on Thu Jun 6 2019 21:25:07