00001
00002
00003
00004
00005 import unittest
00006
00007 import rospy
00008
00009 from lama_interfaces.srv import ActOnMap
00010 from lama_interfaces.srv import ActOnMapRequest
00011 from lama_msgs.msg import LamaObject
00012
00013 vertex1 = LamaObject()
00014 vertex1.id_in_world = 101
00015 vertex1.name = 'vertex1'
00016 vertex1.type = LamaObject.VERTEX
00017
00018 vertex2 = LamaObject()
00019 vertex2.id_in_world = 102
00020 vertex2.name = 'vertex2'
00021 vertex2.type = LamaObject.VERTEX
00022
00023 vertex3 = LamaObject()
00024 vertex3.id_in_world = 103
00025 vertex3.name = 'vertex3'
00026 vertex3.type = LamaObject.VERTEX
00027
00028 vertex3a = LamaObject()
00029 vertex3a.id_in_world = 104
00030 vertex3a.name = 'vertex3'
00031 vertex3a.type = LamaObject.VERTEX
00032
00033 vertex3b = LamaObject()
00034 vertex3b.id_in_world = 103
00035 vertex3b.name = 'vertex3b'
00036 vertex3b.type = LamaObject.VERTEX
00037
00038 edge1 = LamaObject()
00039 edge1.id_in_world = 101
00040 edge1.name = 'edge1'
00041 edge1.type = LamaObject.EDGE
00042
00043 edge2 = LamaObject()
00044 edge2.id_in_world = 102
00045 edge2.name = 'edge2'
00046 edge2.type = LamaObject.EDGE
00047
00048 edge3 = LamaObject()
00049 edge3.id_in_world = 103
00050 edge3.name = 'edge3'
00051 edge3.type = LamaObject.EDGE
00052
00053
00054 class TestCoreInterface(unittest.TestCase):
00055 def __init__(self, *args, **kwargs):
00056 rospy.init_node('test_lama_core', anonymous=True)
00057 self.map_agent = rospy.ServiceProxy('lama_map_agent', ActOnMap)
00058 self.map_agent.wait_for_service()
00059 super(TestCoreInterface, self).__init__(*args, **kwargs)
00060
00061 def assertNonEmpty(self, objects):
00062 self.assertGreater(len(objects), 0,
00063 msg='At least one LamaObject expected')
00064
00065 def assertObjectEqual(self, object0, object1):
00066 """Fail if LamaObjects are not equal"""
00067 self.assertIsInstance(object0, LamaObject,
00068 msg='Argument 1 is not a LamaObject')
00069 self.assertIsInstance(object1, LamaObject,
00070 msg='Argument 2 is not a LamaObject')
00071 self.assertIdEqual(object0, object1)
00072 self.assertIdInWorldEqual(object0, object1)
00073 self.assertNameEqual(object0, object1)
00074 self.assertReferencesEqual(object0, object1)
00075
00076 def assertIdEqual(self, object0, object1):
00077 """Fail if id are not equal"""
00078 self.assertIsInstance(object0, LamaObject,
00079 msg='Argument 1 is not a LamaObject')
00080 self.assertIsInstance(object1, LamaObject,
00081 msg='Argument 2 is not a LamaObject')
00082 self.assertEqual(object0.id, object1.id,
00083 msg='id differ: {} != {}'.format(
00084 object0.id, object1.id))
00085
00086 def assertIdInWorldEqual(self, object0, object1):
00087 """Fail if id_in_world are not equal"""
00088 self.assertIsInstance(object0, LamaObject,
00089 msg='Argument 1 is not a LamaObject')
00090 self.assertIsInstance(object1, LamaObject,
00091 msg='Argument 2 is not a LamaObject')
00092 self.assertEqual(object0.id_in_world, object1.id_in_world,
00093 msg='id_in_world differ: {} != {}'.format(
00094 object0.id_in_world, object1.id_in_world))
00095
00096 def assertNameEqual(self, object0, object1):
00097 """Fail if name are not equal"""
00098 self.assertIsInstance(object0, LamaObject,
00099 msg='Argument 1 is not a LamaObject')
00100 self.assertIsInstance(object1, LamaObject,
00101 msg='Argument 2 is not a LamaObject')
00102 self.assertEqual(object0.name, object1.name,
00103 msg='name differ: {} != {}'.format(
00104 object0.name, object1.name))
00105
00106 def assertReferencesEqual(self, object0, object1):
00107 """Fail if references are not equal"""
00108 self.assertIsInstance(object0, LamaObject,
00109 msg='Argument 1 is not a LamaObject')
00110 self.assertIsInstance(object1, LamaObject,
00111 msg='Argument 2 is not a LamaObject')
00112 self.assertListEqual(object0.references, object1.references)
00113
00114 def test001_push_vertex(self):
00115 """Test ActOnMap service PUSH_VERTEX"""
00116 try:
00117 response = self.map_agent(object=vertex1,
00118 action=ActOnMapRequest.PUSH_VERTEX)
00119 except rospy.ServiceException as exc:
00120 self.assertTrue(False,
00121 'service did not process request: {}'.format(exc))
00122 vertex1.id = response.objects[0].id
00123 self.assertEqual(len(response.objects), 1)
00124
00125 response.objects[0].references = list(response.objects[0].references)
00126 self.assertObjectEqual(response.objects[0], vertex1)
00127
00128 try:
00129 response = self.map_agent(object=vertex2,
00130 action=ActOnMapRequest.PUSH_VERTEX)
00131 except rospy.ServiceException as exc:
00132 self.assertTrue(
00133 False,
00134 'service did not process request vertex2: {}'.format(exc))
00135 vertex2.id = response.objects[0].id
00136 self.assertEqual(len(response.objects), 1)
00137
00138 response.objects[0].references = list(response.objects[0].references)
00139 self.assertObjectEqual(response.objects[0], vertex2)
00140
00141 try:
00142 response = self.map_agent(object=vertex3,
00143 action=ActOnMapRequest.PUSH_VERTEX)
00144 except rospy.ServiceException as exc:
00145 self.assertTrue(
00146 False,
00147 'service did not process request vertex3: {}'.format(exc))
00148 vertex3.id = response.objects[0].id
00149 self.assertEqual(len(response.objects), 1)
00150
00151 response.objects[0].references = list(response.objects[0].references)
00152 self.assertObjectEqual(response.objects[0], vertex3)
00153
00154 try:
00155 response = self.map_agent(object=vertex3a,
00156 action=ActOnMapRequest.PUSH_VERTEX)
00157 except rospy.ServiceException as exc:
00158 self.assertTrue(
00159 False,
00160 'service did not process request vertex3a: {}'.format(exc))
00161 vertex3a.id = response.objects[0].id
00162 self.assertEqual(len(response.objects), 1)
00163
00164 response.objects[0].references = list(response.objects[0].references)
00165 self.assertObjectEqual(response.objects[0], vertex3a)
00166
00167 try:
00168 response = self.map_agent(object=vertex3b,
00169 action=ActOnMapRequest.PUSH_VERTEX)
00170 except rospy.ServiceException as exc:
00171 self.assertTrue(
00172 False,
00173 'service did not process request vertex3b: {}'.format(exc))
00174 vertex3b.id = response.objects[0].id
00175 self.assertEqual(len(response.objects), 1)
00176
00177 response.objects[0].references = list(response.objects[0].references)
00178 self.assertObjectEqual(response.objects[0], vertex3b)
00179
00180 def test002_push_existing_vertex(self):
00181 """Test ActOnMap service PUSH_VERTEX with existing vertex"""
00182 vertex1.name = 'vertex1a'
00183 try:
00184 response = self.map_agent(object=vertex1,
00185 action=ActOnMapRequest.PUSH_VERTEX)
00186 except rospy.ServiceException as exc:
00187 self.assertTrue(False,
00188 'service did not process request: {}'.format(exc))
00189 self.assertEqual(len(response.objects), 1)
00190
00191 response.objects[0].references = list(response.objects[0].references)
00192 self.assertObjectEqual(response.objects[0], vertex1)
00193
00194 def test002_pull_vertex(self):
00195 """Test ActOnMap service PULL_VERTEX"""
00196 req = LamaObject()
00197 req.id = vertex1.id
00198 response = self.map_agent(object=req,
00199 action=ActOnMapRequest.PULL_VERTEX)
00200 self.assertEqual(len(response.objects), 1)
00201
00202 response.objects[0].references = list(response.objects[0].references)
00203 self.assertObjectEqual(response.objects[0], vertex1)
00204
00205 def test002_pull_vertex_id_in_world(self):
00206 """Test ActOnMap service GET_VERTEX_LIST according to id_in_world"""
00207 req = LamaObject()
00208 req.id_in_world = vertex1.id_in_world
00209 response = self.map_agent(object=req,
00210 action=ActOnMapRequest.GET_VERTEX_LIST)
00211 self.assertNonEmpty(response.objects)
00212 self.assertIn(vertex1.id, [o.id for o in response.objects])
00213 for obj in response.objects:
00214
00215 obj.references = list(obj.references)
00216 self.assertIdInWorldEqual(obj, vertex1)
00217
00218 def test002_pull_vertex_name(self):
00219 """Test ActOnMap service GET_VERTEX_LIST according to name"""
00220 req = LamaObject()
00221 req.name = vertex1.name
00222 response = self.map_agent(object=req,
00223 action=ActOnMapRequest.GET_VERTEX_LIST)
00224 self.assertNonEmpty(response.objects)
00225 self.assertIn(vertex1.id, [o.id for o in response.objects])
00226 for obj in response.objects:
00227
00228 obj.references = list(obj.references)
00229 self.assertNameEqual(obj, vertex1)
00230
00231 def test002_push_edge(self):
00232 """Test ActOnMap service push edge """
00233 edge1.references[0] = vertex1.id
00234 edge1.references[1] = vertex2.id
00235 try:
00236 response = self.map_agent(object=edge1,
00237 action=ActOnMapRequest.PUSH_EDGE)
00238 except rospy.ServiceException as exc:
00239 self.assertTrue(False,
00240 'service did not process request: {}'.format(exc))
00241 edge1.id = response.objects[0].id
00242 self.assertEqual(len(response.objects), 1)
00243
00244 response.objects[0].references = list(response.objects[0].references)
00245 self.assertObjectEqual(response.objects[0], edge1)
00246
00247 edge2.references[0] = vertex2.id
00248 edge2.references[1] = vertex3.id
00249 try:
00250 response = self.map_agent(object=edge2,
00251 action=ActOnMapRequest.PUSH_EDGE)
00252 except rospy.ServiceException as exc:
00253 self.assertTrue(False,
00254 'service did not process request: {}'.format(exc))
00255 edge2.id = response.objects[0].id
00256 self.assertEqual(len(response.objects), 1)
00257
00258 response.objects[0].references = list(response.objects[0].references)
00259 self.assertObjectEqual(response.objects[0], edge2)
00260
00261 edge3.references = [vertex2.id, 0]
00262 self.assertRaises(rospy.ServiceException,
00263 self.map_agent,
00264 object=edge3, action=ActOnMapRequest.PUSH_EDGE)
00265
00266 edge3.references = [vertex2.id, vertex2.id]
00267 try:
00268 response = self.map_agent(object=edge3,
00269 action=ActOnMapRequest.PUSH_EDGE)
00270 except rospy.ServiceException as exc:
00271 self.assertTrue(False,
00272 'service did not process request: {}'.format(exc))
00273 edge3.id = response.objects[0].id
00274 self.assertEqual(len(response.objects), 1)
00275
00276 response.objects[0].references = list(response.objects[0].references)
00277 self.assertObjectEqual(response.objects[0], edge3)
00278
00279 def test003_pull_edge(self):
00280 """Test ActOnMap service PULL_EDGE"""
00281 req = LamaObject()
00282 req.id = edge1.id
00283 response = self.map_agent(object=req, action=ActOnMapRequest.PULL_EDGE)
00284 self.assertEqual(len(response.objects), 1)
00285
00286 response.objects[0].references = list(response.objects[0].references)
00287 self.assertObjectEqual(response.objects[0], edge1)
00288
00289 def test003_pull_edge_id_in_world(self):
00290 """test ActOnMap service GET_EDGE_LIST according id in world"""
00291 req = LamaObject()
00292 req.id_in_world = edge1.id_in_world
00293 response = self.map_agent(object=req,
00294 action=ActOnMapRequest.GET_EDGE_LIST)
00295 self.assertNonEmpty(response.objects)
00296 self.assertIn(edge1.id, [o.id for o in response.objects])
00297 for obj in response.objects:
00298
00299 obj.references = list(obj.references)
00300 self.assertIdInWorldEqual(obj, vertex1)
00301
00302 def test003_pull_edge_name(self):
00303 """ test ActOnMap service GET_EDGE_LIST according name"""
00304 req = LamaObject()
00305 req.name = edge1.name
00306 response = self.map_agent(object=req,
00307 action=ActOnMapRequest.GET_EDGE_LIST)
00308 self.assertNonEmpty(response.objects)
00309 self.assertIn(edge1.id, [o.id for o in response.objects])
00310 for obj in response.objects:
00311
00312 obj.references = list(obj.references)
00313 self.assertNameEqual(obj, edge1)
00314
00315 def test003_pull_edge_vertices(self):
00316 """test ActOnMap service GET_EDGE_LIST according to both vertices"""
00317 req = LamaObject()
00318 req.references[0] = edge1.references[0]
00319 req.references[1] = edge1.references[1]
00320 response = self.map_agent(object=req,
00321 action=ActOnMapRequest.GET_EDGE_LIST)
00322 self.assertNonEmpty(response.objects)
00323 self.assertIn(edge1.id, [o.id for o in response.objects])
00324 for obj in response.objects:
00325
00326 obj.references = list(obj.references)
00327 self.assertReferencesEqual(obj, edge1)
00328
00329 def test003_pull_edge_start_vertex(self):
00330 """test ActOnMap service GET_EDGE_LIST according to start vertex"""
00331 req = LamaObject()
00332 req.type = LamaObject.EDGE
00333 req.references[0] = edge1.references[0]
00334 response = self.map_agent(object=req,
00335 action=ActOnMapRequest.GET_EDGE_LIST)
00336 self.assertNonEmpty(response.objects)
00337 self.assertIn(edge1.id, [o.id for o in response.objects])
00338 for obj in response.objects:
00339 self.assertEqual(obj.references[0], edge1.references[0],
00340 'wrong start vertex, got {}, expected {}'.format(
00341 obj.references[0], edge1.references[0]))
00342
00343 def test003_get_outgoing_edges(self):
00344 """test ActOnMap service GET_OUTGOING_EDGES"""
00345 req = LamaObject()
00346 req.id = edge1.references[0]
00347 response = self.map_agent(object=req,
00348 action=ActOnMapRequest.GET_OUTGOING_EDGES)
00349 self.assertNonEmpty(response.objects)
00350 self.assertIn(edge1.id, [o.id for o in response.objects])
00351 for obj in response.objects:
00352 self.assertEqual(obj.references[0], edge1.references[0],
00353 'wrong start vertex, got {}, expected {}'.format(
00354 obj.references[0], edge1.references[0]))
00355
00356 def test003_pull_edge_stop_vertex(self):
00357 """test ActOnMap service GET_EDGE_LIST according to stop vertex"""
00358 req = LamaObject()
00359 req.references[1] = edge1.references[1]
00360 response = self.map_agent(object=req,
00361 action=ActOnMapRequest.GET_EDGE_LIST)
00362 self.assertNonEmpty(response.objects)
00363 self.assertIn(edge1.id, [o.id for o in response.objects])
00364 for obj in response.objects:
00365 self.assertEqual(obj.references[1], edge1.references[1],
00366 'wrong start vertex, got {}, expected {}'.format(
00367 obj.references[1], edge1.references[1]))
00368
00369 if __name__ == '__main__':
00370 import rostest
00371 rostest.rosrun('lama_interfaces',
00372 'test_core_interface',
00373 TestCoreInterface)