00001
00002
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007 from actionlib import SimpleActionClient
00008 import actionlib
00009
00010 from stdr_msgs.msg import RobotMsg, RobotIndexedVectorMsg
00011 from stdr_msgs.msg import (SpawnRobotAction, SpawnRobotGoal,
00012 DeleteRobotAction, DeleteRobotGoal)
00013 from stdr_msgs.srv import LoadMap, MoveRobot, MoveRobotRequest
00014 from stdr_msgs.srv import (AddCO2Source, AddRfidTag, AddSoundSource,
00015 AddThermalSource, DeleteCO2Source, DeleteRfidTag,
00016 DeleteSoundSource, DeleteThermalSource)
00017 from stdr_msgs.msg import (CO2Source, CO2SourceVector, ThermalSource,
00018 ThermalSourceVector)
00019 from nav_msgs.msg import OccupancyGrid
00020
00021 from visualization_msgs.msg import MarkerArray
00022 from geometry_msgs.msg import Pose2D
00023
00024 from subprocess import call
00025 from collections import defaultdict
00026 import threading
00027
00028 class InterfaceTest(unittest.TestCase):
00029
00030 @classmethod
00031 def setUp(cls):
00032 print 'Starting...'
00033 cls.messages = defaultdict(list)
00034
00035 @classmethod
00036 def mock_cb(cls, msg, topic):
00037 rospy.loginfo('got message from %s' % topic)
00038 cls.messages[topic].append(msg)
00039 cls.block.set()
00040
00041 @classmethod
00042 def load_map(cls, map_filename):
00043 command = 'rosrun stdr_server load_map %s' % map_filename
00044 call(command, shell=True)
00045
00046 @classmethod
00047 def connect(cls):
00048 cls.spawn_robot_client = \
00049 SimpleActionClient('stdr_server/spawn_robot',
00050 SpawnRobotAction)
00051 cls.spawn_robot_client.wait_for_server(rospy.Duration(5.0))
00052
00053 cls.delete_robot_client = \
00054 SimpleActionClient('stdr_server/delete_robot', DeleteRobotAction)
00055 cls.delete_robot_client.wait_for_server(rospy.Duration(5.0))
00056
00057 cls.robots_topic = 'stdr_server/active_robots'
00058 cls.active_robots_sub = rospy.Subscriber(cls.robots_topic,
00059 RobotIndexedVectorMsg, cls.mock_cb, cls.robots_topic)
00060
00061
00062 cls.add_rfid = rospy.ServiceProxy('stdr_server/add_rfid_tag',
00063 AddRfidTag)
00064 cls.delete_rfid = rospy.ServiceProxy('stdr_server/delete_rfid_tag',
00065 DeleteRfidTag)
00066 cls.add_co2 = rospy.ServiceProxy('stdr_server/add_co2_source',
00067 AddCO2Source)
00068 cls.delete_co2 = rospy.ServiceProxy('stdr_server/delete_co2_source',
00069 DeleteCO2Source)
00070 cls.add_thermal = rospy.ServiceProxy('stdr_server/add_thermal_source',
00071 AddThermalSource)
00072 cls.delete_thermal = rospy.ServiceProxy('stdr_server/delete_thermal_source',
00073 DeleteThermalSource)
00074 cls.add_sound = rospy.ServiceProxy('stdr_server/add_sound_source',
00075 AddSoundSource)
00076 cls.delete_sound = rospy.ServiceProxy('stdr_server/delete_sound_source',
00077 DeleteSoundSource)
00078
00079 cls.block = threading.Event()
00080
00081 def _add_robot(self):
00082 self.block.clear()
00083 spawn_robot_goal = SpawnRobotGoal()
00084 robot_msg = RobotMsg()
00085 robot_msg.footprint.radius = 0.2
00086 robot_msg.initialPose.x = 1
00087 robot_msg.initialPose.y = 2
00088 spawn_robot_goal.description = robot_msg
00089
00090 self.spawn_robot_client.send_goal(spawn_robot_goal)
00091
00092 self.assertTrue(self.spawn_robot_client.wait_for_result(rospy.Duration(10.0)))
00093 self.assertFalse(
00094 self.spawn_robot_client.get_state() == actionlib.GoalStatus.ABORTED)
00095
00096 robot_name = \
00097 self.spawn_robot_client.get_result().indexedDescription.name
00098
00099
00100 self.block.wait()
00101
00102 self.assertIn(self.robots_topic, self.messages.keys())
00103 self.assertEqual(len(self.messages[self.robots_topic]), 1)
00104 self.assertEqual(len(self.messages[self.robots_topic][-1].robots), 1)
00105 self.assertEqual(self.messages[self.robots_topic][-1].robots[0].name, robot_name)
00106 self.assertAlmostEqual(
00107 self.messages[self.robots_topic][-1].robots[0].robot.footprint.radius,
00108 robot_msg.footprint.radius)
00109 self.assertAlmostEqual(
00110 self.messages[self.robots_topic][-1].robots[0].robot.initialPose.x,
00111 robot_msg.initialPose.x)
00112 self.assertAlmostEqual(
00113 self.messages[self.robots_topic][-1].robots[0].robot.initialPose.y,
00114 robot_msg.initialPose.y)
00115
00116 return robot_name
00117
00118 def _delete_robot(self, name):
00119 self.block.clear()
00120 delete_robot_goal = DeleteRobotGoal()
00121 delete_robot_goal.name = name
00122 self.delete_robot_client.send_goal(delete_robot_goal)
00123 self.assertTrue(self.delete_robot_client.wait_for_result(rospy.Duration(10.0)))
00124
00125
00126 self.block.wait()
00127 self.assertEqual(
00128 len(self.messages[self.robots_topic][-1].robots), 0)
00129
00130 def test_map_loaded(self):
00131 self.block.clear()
00132 self.map_topic = 'map'
00133 self.map_sub = rospy.Subscriber(self.map_topic, OccupancyGrid, self.mock_cb,
00134 self.map_topic)
00135
00136
00137 self.block.wait()
00138 self.assertIn(self.map_topic, self.messages.keys())
00139 self.assertEqual(len(self.messages[self.map_topic]), 1)
00140
00141
00142 self.map_sub.unregister()
00143
00144 def test_add_delete_robot(self):
00145 name = self._add_robot()
00146 self._delete_robot(name)
00147
00148 def test_move_robot(self):
00149
00150 name = self._add_robot()
00151
00152 rospy.wait_for_service(name + '/replace')
00153 move_service = rospy.ServiceProxy(name + '/replace', MoveRobot)
00154 pose = Pose2D(x=2, y=4)
00155 req = MoveRobotRequest(newPose=pose)
00156 self.assertTrue(move_service(req))
00157
00158
00159 self._delete_robot(name)
00160
00161 def test_add_delete_sources(self):
00162 self.block.clear()
00163
00164
00165 self.markers_topic = 'stdr_server/sources_visualization_markers'
00166 self.markers_sub = rospy.Subscriber(self.markers_topic, MarkerArray,
00167 self.mock_cb, self.markers_topic)
00168 self.co2_list_topic = 'stdr_server/co2_sources_list'
00169 self.co2_sub = rospy.Subscriber(self.co2_list_topic, CO2SourceVector,
00170 self.mock_cb, self.co2_list_topic)
00171
00172 self.thermal_list_topic = 'stdr_server/thermal_sources_list'
00173 self.thermal_sub = rospy.Subscriber(self.thermal_list_topic,
00174 ThermalSourceVector,
00175 self.mock_cb, self.thermal_list_topic)
00176
00177
00178 rospy.sleep(2.0)
00179
00180
00181 new_co2_source = CO2Source(id='1', ppm=1000, pose=Pose2D(x=1, y=1))
00182 self.add_co2(newSource=new_co2_source)
00183
00184 rospy.sleep(2.0)
00185
00186 self.assertIn(self.co2_list_topic, self.messages.keys())
00187 self.assertEqual(len(self.messages[self.co2_list_topic]), 1)
00188
00189 self.assertEqual(len(self.messages[self.markers_topic]), 2)
00190
00191 co2_msg = self.messages[self.co2_list_topic][-1]
00192 self.assertEqual(co2_msg.co2_sources[0].id, '1')
00193 self.assertEqual(co2_msg.co2_sources[0].pose, Pose2D(x=1, y=1))
00194
00195 self.assertIn(self.markers_topic, self.messages.keys())
00196 co2_marker_msg = self.messages[self.markers_topic][-1].markers[-1]
00197 self.assertEqual(co2_marker_msg.ns, 'co2')
00198 self.assertEqual(co2_marker_msg.type, 2)
00199
00200
00201
00202 new_thermal_source = \
00203 ThermalSource(id='1', degrees=180, pose=Pose2D(x=2, y=2))
00204 self.add_thermal(newSource=new_thermal_source)
00205
00206 rospy.sleep(2.0)
00207
00208 self.assertIn(self.thermal_list_topic, self.messages.keys())
00209 thermal_msg = self.messages[self.thermal_list_topic][-1]
00210 self.assertEqual(thermal_msg.thermal_sources[0].id, '1')
00211 self.assertEqual(thermal_msg.thermal_sources[0].pose, Pose2D(x=2, y=2))
00212
00213 thermal_marker_msg = self.messages[self.markers_topic][-1].markers[-1]
00214 self.assertEqual(co2_marker_msg.ns, 'co2')
00215 self.assertTrue(any(
00216 (True for mrk in self.messages[self.markers_topic][-1].markers
00217 if mrk.ns == 'thermal')))
00218
00219
00220
00221 self.delete_thermal(name='1')
00222
00223 rospy.sleep(2.0)
00224
00225 thermal_msg = self.messages[self.thermal_list_topic][-1]
00226 co2_msg = self.messages[self.co2_list_topic][-1]
00227 self.assertFalse(thermal_msg.thermal_sources)
00228 self.assertTrue(co2_msg.co2_sources)
00229
00230
00231 marker_msg = self.messages[self.markers_topic][-1].markers
00232 self.assertEqual(marker_msg[-1].ns, 'co2')
00233
00234
00235
00236 new_co2_source = CO2Source(id='1', ppm=1000, pose=Pose2D(x=3, y=3))
00237 self.assertRaises(rospy.ServiceException, self.add_co2, new_co2_source)
00238
00239
00240 co2_msg = self.messages[self.co2_list_topic][-1]
00241 self.assertEqual(len(co2_msg.co2_sources), 1)
00242 self.assertEqual(co2_msg.co2_sources[-1].id, '1')
00243 self.assertEqual(co2_msg.co2_sources[-1].pose, Pose2D(x=1, y=1))
00244
00245
00246 if __name__ == '__main__':
00247 rospy.init_node('interface_test', anonymous=True, log_level=rospy.INFO)
00248 InterfaceTest.connect()
00249 rospy.sleep(2.0)
00250 rostest.rosrun('stdr_server', 'interface_test', InterfaceTest, sys.argv)