00001
00002
00003 import sys
00004 import rospy
00005 import unittest
00006 from nav_msgs.msg import *
00007 from nav_msgs.srv import *
00008 from map_store.srv import *
00009
00010
00011
00012
00013
00014 class TestMapStore(unittest.TestCase):
00015
00016 def compare_maps(self, map_a, map_b):
00017 return str(map_a) == str(map_b)
00018
00019 def create_map(self, d):
00020 test_map = OccupancyGrid()
00021 test_map.info.resolution = 1.0 + d
00022 test_map.info.width = 10
00023 test_map.info.height = 10
00024 test_map.info.origin.position.x = 0.0 + d
00025 test_map.info.origin.position.y = 1.0 + d
00026 test_map.info.origin.position.z = 2.0 + d
00027 test_map.info.origin.orientation.x = 3.0 + d
00028 test_map.info.origin.orientation.y = 4.0 + d
00029 test_map.info.origin.orientation.z = 5.0 + d
00030 test_map.info.origin.orientation.w = 6.0 + d
00031 test_map.data = []
00032 for i in range(0, 100):
00033 test_map.data.append(i)
00034 return test_map
00035
00036 def on_map(self, msg):
00037 if (self.wait_for_map and self.check_map != None):
00038
00039 self.wait_for_map = False
00040
00041
00042
00043 def test_annotations_store(self):
00044 rospy.init_node('test_annotations_store')
00045
00046
00047 test_map_1 = self.create_map(00.0)
00048 test_map_2 = self.create_map(10.0)
00049 test_map_2.data.reverse()
00050
00051 self.wait_for_map = False
00052 self.check_map = None
00053
00054
00055 map_pub = rospy.Publisher('/map', OccupancyGrid, queue_size=5)
00056 test_map_sub = rospy.Subscriber('/test_map', OccupancyGrid, self.on_map)
00057 dynamic_map = rospy.Service('dynamic_map', GetMap, lambda x: GetMapResponse(map=test_map_2))
00058 print "Wait for /list_maps"
00059 rospy.wait_for_service("/list_maps")
00060 list_maps = rospy.ServiceProxy('/list_maps', ListMaps)
00061 print "Wait for /name_latest_map"
00062 rospy.wait_for_service("/save_map")
00063 name_latest_map = rospy.ServiceProxy('/save_map', SaveMap)
00064 print "Wait for /delete_map"
00065 rospy.wait_for_service("/delete_map")
00066 delete_map = rospy.ServiceProxy('/delete_map', DeleteMap)
00067 print "Wait for /rename_map"
00068 rospy.wait_for_service("/rename_map")
00069 rename_map = rospy.ServiceProxy('/rename_map', RenameMap)
00070 print "Wait for /publish_map"
00071 rospy.wait_for_service("/publish_map")
00072 publish_map = rospy.ServiceProxy('/publish_map', PublishMap)
00073 print "Wait for /test_dynamic_map"
00074 rospy.wait_for_service("/test_dynamic_map")
00075 test_dynamic_map = rospy.ServiceProxy('/test_dynamic_map', GetMap)
00076
00077
00078 print "Wait 1 second for everything to start up"
00079 rospy.sleep(1.0)
00080
00081
00082 initial_map_list = []
00083 for m in list_maps().map_list:
00084 initial_map_list.append(m.map_id)
00085 print "Initial maps:", initial_map_list
00086
00087
00088 print "Sending map 1"
00089 map_pub.publish(test_map_1)
00090 rospy.sleep(5.0)
00091
00092
00093 map_id_1 = None
00094 for i in list_maps().map_list:
00095 if not i.map_id in initial_map_list:
00096 self.assertEquals(map_id_1, None, "Two or more maps from /map topic")
00097 map_id_1 = i.map_id
00098 print "First map is:", map_id_1
00099 self.assertNotEquals(map_id_1, None, "Map was not loaded from the /map topic")
00100
00101
00102 saved_map_name = "test_map_srv"
00103 name_latest_map(map_name=saved_map_name)
00104 rospy.sleep(5.0)
00105
00106
00107 map_id_2 = None
00108 for i in list_maps().map_list:
00109 if not i.map_id in initial_map_list and i.map_id != map_id_1:
00110 self.assertEquals(map_id_2, None, "Two or more maps from dynamic_map")
00111 self.assertEquals(i.name, saved_map_name, \
00112 "Saved map has the wrong name: %s instead of %s"%(i.name, saved_map_name))
00113 map_id_2 = i.map_id
00114 print "Second map is:", map_id_2
00115 self.assertNotEquals(map_id_2, None, "Map was not loaded from the dynamic_map")
00116
00117
00118 print "Renaming first map"
00119 topic_map_name = "test_map_msg"
00120 rename_map(map_id=map_id_1, new_name=topic_map_name)
00121 rospy.sleep(1.0)
00122
00123
00124 for i in list_maps().map_list:
00125 if i.map_id == map_id_1:
00126 self.assertEquals(i.name, topic_map_name, \
00127 "Saved map has the wrong name: %s instead of %s"%(i.name, topic_map_name))
00128
00129
00130
00131 print "Displaying first map"
00132 self.wait_for_map = True
00133 self.check_map = test_map_1
00134 publish_map(map_id_1)
00135 while (self.wait_for_map):
00136 rospy.sleep(1.0)
00137 self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_1), True, "Test map 1 is bad")
00138
00139 print "Displaying second map"
00140 self.wait_for_map = True
00141 self.check_map = test_map_2
00142 publish_map(map_id_2)
00143 while (self.wait_for_map):
00144 rospy.sleep(1.0)
00145 self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_2), True, "Test map 2 is bad")
00146
00147
00148 print "Deleting both maps"
00149 delete_map(map_id_1)
00150 delete_map(map_id_2)
00151 rospy.sleep(1.0)
00152
00153
00154 print "Ensuring that the maps are gone"
00155 for i in list_maps().map_list:
00156 self.assertNotEquals(i.map_id, map_id_1, "The /map topic map could not be deleted")
00157 self.assertNotEquals(i.map_id, map_id_2, "The dynamic_map map could not be deleted")
00158
00159
00160
00161 rospy.sleep(1.0)
00162 print "Finished"
00163
00164
00165
00166
00167 if __name__ == '__main__':
00168 import rostest
00169 rostest.rosrun(PKG, 'test_annotations_store', TestMapStore)