interfaces_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007 from actionlib import SimpleActionClient
00008 import actionlib
00009 
00010 from stdr_msgs.msg import RobotMsg, RobotIndexedVectorMsg
00011 from stdr_msgs.msg import (SpawnRobotAction, SpawnRobotGoal,
00012                             DeleteRobotAction, DeleteRobotGoal)
00013 from stdr_msgs.srv import LoadMap, MoveRobot, MoveRobotRequest
00014 from stdr_msgs.srv import (AddCO2Source, AddRfidTag, AddSoundSource,
00015                             AddThermalSource, DeleteCO2Source, DeleteRfidTag,
00016                             DeleteSoundSource, DeleteThermalSource)
00017 from stdr_msgs.msg import (CO2Source, CO2SourceVector, ThermalSource,
00018                             ThermalSourceVector)
00019 from nav_msgs.msg import OccupancyGrid
00020 
00021 from visualization_msgs.msg import MarkerArray
00022 from geometry_msgs.msg import Pose2D
00023 
00024 from subprocess import call
00025 from collections import defaultdict
00026 import threading
00027 
00028 class InterfaceTest(unittest.TestCase):
00029 
00030     @classmethod
00031     def setUp(cls):
00032         print 'Starting...'
00033         cls.messages = defaultdict(list)
00034 
00035     @classmethod
00036     def mock_cb(cls, msg, topic):
00037         rospy.loginfo('got message from %s' % topic)
00038         cls.messages[topic].append(msg)
00039         cls.block.set()
00040 
00041     @classmethod
00042     def load_map(cls, map_filename):
00043         command = 'rosrun stdr_server load_map %s' % map_filename
00044         call(command, shell=True)
00045 
00046     @classmethod
00047     def connect(cls):
00048         cls.spawn_robot_client = \
00049             SimpleActionClient('stdr_server/spawn_robot',
00050                                             SpawnRobotAction)
00051         cls.spawn_robot_client.wait_for_server(rospy.Duration(5.0))
00052 
00053         cls.delete_robot_client = \
00054             SimpleActionClient('stdr_server/delete_robot', DeleteRobotAction)
00055         cls.delete_robot_client.wait_for_server(rospy.Duration(5.0))
00056 
00057         cls.robots_topic = 'stdr_server/active_robots'
00058         cls.active_robots_sub = rospy.Subscriber(cls.robots_topic,
00059                     RobotIndexedVectorMsg, cls.mock_cb, cls.robots_topic)
00060 
00061         # sources
00062         cls.add_rfid = rospy.ServiceProxy('stdr_server/add_rfid_tag',
00063                                             AddRfidTag)
00064         cls.delete_rfid = rospy.ServiceProxy('stdr_server/delete_rfid_tag',
00065                                                 DeleteRfidTag)
00066         cls.add_co2 = rospy.ServiceProxy('stdr_server/add_co2_source',
00067                                             AddCO2Source)
00068         cls.delete_co2 = rospy.ServiceProxy('stdr_server/delete_co2_source',
00069                                             DeleteCO2Source)
00070         cls.add_thermal = rospy.ServiceProxy('stdr_server/add_thermal_source',
00071                                                 AddThermalSource)
00072         cls.delete_thermal = rospy.ServiceProxy('stdr_server/delete_thermal_source',
00073                                             DeleteThermalSource)
00074         cls.add_sound = rospy.ServiceProxy('stdr_server/add_sound_source',
00075                                             AddSoundSource)
00076         cls.delete_sound = rospy.ServiceProxy('stdr_server/delete_sound_source',
00077                                                 DeleteSoundSource)
00078 
00079         cls.block = threading.Event()
00080 
00081     def _add_robot(self):
00082         self.block.clear()
00083         spawn_robot_goal = SpawnRobotGoal()
00084         robot_msg = RobotMsg()
00085         robot_msg.footprint.radius = 0.2
00086         robot_msg.initialPose.x = 1
00087         robot_msg.initialPose.y = 2
00088         spawn_robot_goal.description = robot_msg
00089 
00090         self.spawn_robot_client.send_goal(spawn_robot_goal)
00091 
00092         self.assertTrue(self.spawn_robot_client.wait_for_result(rospy.Duration(10.0)))
00093         self.assertFalse(
00094                 self.spawn_robot_client.get_state() == actionlib.GoalStatus.ABORTED)
00095 
00096         robot_name = \
00097             self.spawn_robot_client.get_result().indexedDescription.name
00098 
00099         # wait until we get a message
00100         self.block.wait()
00101 
00102         self.assertIn(self.robots_topic, self.messages.keys())
00103         self.assertEqual(len(self.messages[self.robots_topic]), 1)
00104         self.assertEqual(len(self.messages[self.robots_topic][-1].robots), 1)
00105         self.assertEqual(self.messages[self.robots_topic][-1].robots[0].name, robot_name)
00106         self.assertAlmostEqual(
00107                 self.messages[self.robots_topic][-1].robots[0].robot.footprint.radius,
00108                                 robot_msg.footprint.radius)
00109         self.assertAlmostEqual(
00110                 self.messages[self.robots_topic][-1].robots[0].robot.initialPose.x,
00111                                 robot_msg.initialPose.x)
00112         self.assertAlmostEqual(
00113                 self.messages[self.robots_topic][-1].robots[0].robot.initialPose.y,
00114                                 robot_msg.initialPose.y)
00115 
00116         return robot_name
00117 
00118     def _delete_robot(self, name):
00119         self.block.clear()
00120         delete_robot_goal = DeleteRobotGoal()
00121         delete_robot_goal.name = name
00122         self.delete_robot_client.send_goal(delete_robot_goal)
00123         self.assertTrue(self.delete_robot_client.wait_for_result(rospy.Duration(10.0)))
00124 
00125         # wait until we get a message
00126         self.block.wait()
00127         self.assertEqual(
00128                 len(self.messages[self.robots_topic][-1].robots), 0)
00129 
00130     def test_map_loaded(self):
00131         self.block.clear()
00132         self.map_topic = 'map'
00133         self.map_sub = rospy.Subscriber(self.map_topic, OccupancyGrid, self.mock_cb,
00134                 self.map_topic)
00135 
00136         # wait until we get a message
00137         self.block.wait()
00138         self.assertIn(self.map_topic, self.messages.keys())
00139         self.assertEqual(len(self.messages[self.map_topic]), 1)
00140 
00141         # cleanup
00142         self.map_sub.unregister()
00143 
00144     def test_add_delete_robot(self):
00145         name = self._add_robot()
00146         self._delete_robot(name)
00147 
00148     def test_move_robot(self):
00149         # first we need to add a new robot
00150         name = self._add_robot()
00151 
00152         rospy.wait_for_service(name + '/replace')
00153         move_service = rospy.ServiceProxy(name + '/replace', MoveRobot)
00154         pose = Pose2D(x=2, y=4)
00155         req = MoveRobotRequest(newPose=pose)
00156         self.assertTrue(move_service(req))
00157 
00158         # cleanup by deleting the robot
00159         self._delete_robot(name)
00160 
00161     def test_add_delete_sources(self):
00162         self.block.clear()
00163 
00164         # markers
00165         self.markers_topic = 'stdr_server/sources_visualization_markers'
00166         self.markers_sub = rospy.Subscriber(self.markers_topic, MarkerArray,
00167                                             self.mock_cb, self.markers_topic)
00168         self.co2_list_topic = 'stdr_server/co2_sources_list'
00169         self.co2_sub = rospy.Subscriber(self.co2_list_topic, CO2SourceVector,
00170                                         self.mock_cb, self.co2_list_topic)
00171 
00172         self.thermal_list_topic = 'stdr_server/thermal_sources_list'
00173         self.thermal_sub = rospy.Subscriber(self.thermal_list_topic,
00174                                             ThermalSourceVector,
00175                                             self.mock_cb, self.thermal_list_topic)
00176 
00177         # wait for connections to be established
00178         rospy.sleep(2.0)
00179 
00180         ### test co2 addition
00181         new_co2_source = CO2Source(id='1', ppm=1000, pose=Pose2D(x=1, y=1))
00182         self.add_co2(newSource=new_co2_source)
00183 
00184         rospy.sleep(2.0) # wait for multiple msgs
00185 
00186         self.assertIn(self.co2_list_topic, self.messages.keys())
00187         self.assertEqual(len(self.messages[self.co2_list_topic]), 1)
00188         # expect 2 identical messages due to sources republication
00189         self.assertEqual(len(self.messages[self.markers_topic]), 2)
00190 
00191         co2_msg = self.messages[self.co2_list_topic][-1]
00192         self.assertEqual(co2_msg.co2_sources[0].id, '1')
00193         self.assertEqual(co2_msg.co2_sources[0].pose, Pose2D(x=1, y=1))
00194 
00195         self.assertIn(self.markers_topic, self.messages.keys())
00196         co2_marker_msg = self.messages[self.markers_topic][-1].markers[-1]
00197         self.assertEqual(co2_marker_msg.ns, 'co2')
00198         self.assertEqual(co2_marker_msg.type, 2)
00199 
00200 
00201         ### test thermal addition
00202         new_thermal_source = \
00203             ThermalSource(id='1', degrees=180, pose=Pose2D(x=2, y=2))
00204         self.add_thermal(newSource=new_thermal_source)
00205 
00206         rospy.sleep(2.0)
00207 
00208         self.assertIn(self.thermal_list_topic, self.messages.keys())
00209         thermal_msg = self.messages[self.thermal_list_topic][-1]
00210         self.assertEqual(thermal_msg.thermal_sources[0].id, '1')
00211         self.assertEqual(thermal_msg.thermal_sources[0].pose, Pose2D(x=2, y=2))
00212 
00213         thermal_marker_msg = self.messages[self.markers_topic][-1].markers[-1]
00214         self.assertEqual(co2_marker_msg.ns, 'co2')
00215         self.assertTrue(any(
00216             (True for mrk in self.messages[self.markers_topic][-1].markers
00217                 if mrk.ns == 'thermal')))
00218 
00219 
00220         ### delete the thermal source, we should still get a marker for co2
00221         self.delete_thermal(name='1')
00222 
00223         rospy.sleep(2.0)
00224 
00225         thermal_msg = self.messages[self.thermal_list_topic][-1]
00226         co2_msg = self.messages[self.co2_list_topic][-1]
00227         self.assertFalse(thermal_msg.thermal_sources)
00228         self.assertTrue(co2_msg.co2_sources)
00229 
00230         # last marker msg should contain one co2 source
00231         marker_msg = self.messages[self.markers_topic][-1].markers
00232         self.assertEqual(marker_msg[-1].ns, 'co2')
00233 
00234 
00235         ### test invalid source id addition
00236         new_co2_source = CO2Source(id='1', ppm=1000, pose=Pose2D(x=3, y=3))
00237         self.assertRaises(rospy.ServiceException, self.add_co2, new_co2_source)
00238 
00239         # we should get no changes in co2 list msg
00240         co2_msg = self.messages[self.co2_list_topic][-1]
00241         self.assertEqual(len(co2_msg.co2_sources), 1)
00242         self.assertEqual(co2_msg.co2_sources[-1].id, '1')
00243         self.assertEqual(co2_msg.co2_sources[-1].pose, Pose2D(x=1, y=1))
00244 
00245 
00246 if __name__ == '__main__':
00247     rospy.init_node('interface_test', anonymous=True, log_level=rospy.INFO)
00248     InterfaceTest.connect()
00249     rospy.sleep(2.0)
00250     rostest.rosrun('stdr_server', 'interface_test', InterfaceTest, sys.argv)


stdr_server
Author(s): Chris Zalidis
autogenerated on Thu Jun 6 2019 18:57:23