00001
00002
00003 PKG = 'map_store'
00004 import roslib; roslib.load_manifest(PKG)
00005 import sys
00006 import rospy
00007 import unittest
00008 from nav_msgs.msg import *
00009 from nav_msgs.srv import *
00010 from map_store.srv import *
00011
00012
00013
00014
00015
00016 class TestMapStore(unittest.TestCase):
00017
00018 def compare_maps(self, map_a, map_b):
00019 return str(map_a) == str(map_b)
00020
00021 def create_map(self, d):
00022 test_map = OccupancyGrid()
00023 test_map.info.resolution = 1.0 + d
00024 test_map.info.width = 10
00025 test_map.info.height = 10
00026 test_map.info.origin.position.x = 0.0 + d
00027 test_map.info.origin.position.y = 1.0 + d
00028 test_map.info.origin.position.z = 2.0 + d
00029 test_map.info.origin.orientation.x = 3.0 + d
00030 test_map.info.origin.orientation.y = 4.0 + d
00031 test_map.info.origin.orientation.z = 5.0 + d
00032 test_map.info.origin.orientation.w = 6.0 + d
00033 test_map.data = []
00034 for i in range(0, 100):
00035 test_map.data.append(i)
00036 return test_map
00037
00038 def on_map(self, msg):
00039 if (self.wait_for_map and self.check_map != None):
00040
00041 self.wait_for_map = False
00042
00043
00044
00045 def test_map_store(self):
00046 rospy.init_node('test_map_store')
00047
00048
00049 test_map_1 = self.create_map(00.0)
00050 test_map_2 = self.create_map(10.0)
00051 test_map_2.data.reverse()
00052
00053 self.wait_for_map = False
00054 self.check_map = None
00055
00056
00057 map_pub = rospy.Publisher('/map', OccupancyGrid)
00058 test_map_sub = rospy.Subscriber('/test_map', OccupancyGrid, self.on_map)
00059 dynamic_map = rospy.Service('dynamic_map', GetMap, lambda x: GetMapResponse(map=test_map_2))
00060 print "Wait for /list_maps"
00061 rospy.wait_for_service("/list_maps")
00062 list_maps = rospy.ServiceProxy('/list_maps', ListMaps)
00063 print "Wait for /name_latest_map"
00064 rospy.wait_for_service("/save_map")
00065 name_latest_map = rospy.ServiceProxy('/save_map', SaveMap)
00066 print "Wait for /delete_map"
00067 rospy.wait_for_service("/delete_map")
00068 delete_map = rospy.ServiceProxy('/delete_map', DeleteMap)
00069 print "Wait for /rename_map"
00070 rospy.wait_for_service("/rename_map")
00071 rename_map = rospy.ServiceProxy('/rename_map', RenameMap)
00072 print "Wait for /publish_map"
00073 rospy.wait_for_service("/publish_map")
00074 publish_map = rospy.ServiceProxy('/publish_map', PublishMap)
00075 print "Wait for /test_dynamic_map"
00076 rospy.wait_for_service("/test_dynamic_map")
00077 test_dynamic_map = rospy.ServiceProxy('/test_dynamic_map', GetMap)
00078
00079
00080 print "Wait 1 second for everything to start up"
00081 rospy.sleep(1.0)
00082
00083
00084 initial_map_list = []
00085 for m in list_maps().map_list:
00086 initial_map_list.append(m.map_id)
00087 print "Initial maps:", initial_map_list
00088
00089
00090 print "Sending map 1"
00091 map_pub.publish(test_map_1)
00092 rospy.sleep(5.0)
00093
00094
00095 map_id_1 = None
00096 for i in list_maps().map_list:
00097 if not i.map_id in initial_map_list:
00098 self.assertEquals(map_id_1, None, "Two or more maps from /map topic")
00099 map_id_1 = i.map_id
00100 print "First map is:", map_id_1
00101 self.assertNotEquals(map_id_1, None, "Map was not loaded from the /map topic")
00102
00103
00104 saved_map_name = "test_map_srv"
00105 name_latest_map(map_name=saved_map_name)
00106 rospy.sleep(5.0)
00107
00108
00109 map_id_2 = None
00110 for i in list_maps().map_list:
00111 if not i.map_id in initial_map_list and i.map_id != map_id_1:
00112 self.assertEquals(map_id_2, None, "Two or more maps from dynamic_map")
00113 self.assertEquals(i.name, saved_map_name, \
00114 "Saved map has the wrong name: %s instead of %s"%(i.name, saved_map_name))
00115 map_id_2 = i.map_id
00116 print "Second map is:", map_id_2
00117 self.assertNotEquals(map_id_2, None, "Map was not loaded from the dynamic_map")
00118
00119
00120 print "Renaming first map"
00121 topic_map_name = "test_map_msg"
00122 rename_map(map_id=map_id_1, new_name=topic_map_name)
00123 rospy.sleep(1.0)
00124
00125
00126 for i in list_maps().map_list:
00127 if i.map_id == map_id_1:
00128 self.assertEquals(i.name, topic_map_name, \
00129 "Saved map has the wrong name: %s instead of %s"%(i.name, topic_map_name))
00130
00131
00132
00133 print "Displaying first map"
00134 self.wait_for_map = True
00135 self.check_map = test_map_1
00136 publish_map(map_id_1)
00137 while (self.wait_for_map):
00138 rospy.sleep(1.0)
00139 self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_1), True, "Test map 1 is bad")
00140
00141 print "Displaying second map"
00142 self.wait_for_map = True
00143 self.check_map = test_map_2
00144 publish_map(map_id_2)
00145 while (self.wait_for_map):
00146 rospy.sleep(1.0)
00147 self.assertEquals(self.compare_maps(test_dynamic_map().map, test_map_2), True, "Test map 2 is bad")
00148
00149
00150 print "Deleting both maps"
00151 delete_map(map_id_1)
00152 delete_map(map_id_2)
00153 rospy.sleep(1.0)
00154
00155
00156 print "Ensuring that the maps are gone"
00157 for i in list_maps().map_list:
00158 self.assertNotEquals(i.map_id, map_id_1, "The /map topic map could not be deleted")
00159 self.assertNotEquals(i.map_id, map_id_2, "The dynamic_map map could not be deleted")
00160
00161
00162
00163 rospy.sleep(1.0)
00164 print "Finished"
00165
00166
00167
00168
00169 if __name__ == '__main__':
00170 import rostest
00171 rostest.rosrun(PKG, 'test_map_store', TestMapStore)