7 from actionlib
import SimpleActionClient
10 from stdr_msgs.msg
import RobotMsg, RobotIndexedVectorMsg
11 from stdr_msgs.msg
import (SpawnRobotAction, SpawnRobotGoal,
12 DeleteRobotAction, DeleteRobotGoal)
13 from stdr_msgs.srv
import LoadMap, MoveRobot, MoveRobotRequest
14 from stdr_msgs.srv
import (AddCO2Source, AddRfidTag, AddSoundSource,
15 AddThermalSource, DeleteCO2Source, DeleteRfidTag,
16 DeleteSoundSource, DeleteThermalSource)
17 from stdr_msgs.msg
import (CO2Source, CO2SourceVector, ThermalSource,
19 from nav_msgs.msg
import OccupancyGrid
21 from visualization_msgs.msg
import MarkerArray
22 from geometry_msgs.msg
import Pose2D
24 from subprocess
import call
25 from collections
import defaultdict
37 rospy.loginfo(
'got message from %s' % topic)
43 command =
'rosrun stdr_server load_map %s' % map_filename
44 call(command, shell=
True)
51 cls.spawn_robot_client.wait_for_server(rospy.Duration(5.0))
55 cls.delete_robot_client.wait_for_server(rospy.Duration(5.0))
62 cls.
add_rfid = rospy.ServiceProxy(
'stdr_server/add_rfid_tag',
64 cls.
delete_rfid = rospy.ServiceProxy(
'stdr_server/delete_rfid_tag',
66 cls.
add_co2 = rospy.ServiceProxy(
'stdr_server/add_co2_source',
68 cls.
delete_co2 = rospy.ServiceProxy(
'stdr_server/delete_co2_source',
70 cls.
add_thermal = rospy.ServiceProxy(
'stdr_server/add_thermal_source',
74 cls.
add_sound = rospy.ServiceProxy(
'stdr_server/add_sound_source',
76 cls.
delete_sound = rospy.ServiceProxy(
'stdr_server/delete_sound_source',
83 spawn_robot_goal = SpawnRobotGoal()
84 robot_msg = RobotMsg()
85 robot_msg.footprint.radius = 0.2
86 robot_msg.initialPose.x = 1
87 robot_msg.initialPose.y = 2
88 spawn_robot_goal.description = robot_msg
90 self.spawn_robot_client.send_goal(spawn_robot_goal)
92 self.assertTrue(self.spawn_robot_client.wait_for_result(rospy.Duration(10.0)))
94 self.spawn_robot_client.get_state() == actionlib.GoalStatus.ABORTED)
97 self.spawn_robot_client.get_result().indexedDescription.name
106 self.assertAlmostEqual(
108 robot_msg.footprint.radius)
109 self.assertAlmostEqual(
111 robot_msg.initialPose.x)
112 self.assertAlmostEqual(
114 robot_msg.initialPose.y)
120 delete_robot_goal = DeleteRobotGoal()
121 delete_robot_goal.name = name
122 self.delete_robot_client.send_goal(delete_robot_goal)
123 self.assertTrue(self.delete_robot_client.wait_for_result(rospy.Duration(10.0)))
138 self.assertIn(self.
map_topic, self.messages.keys())
142 self.map_sub.unregister()
152 rospy.wait_for_service(name +
'/replace')
153 move_service = rospy.ServiceProxy(name +
'/replace', MoveRobot)
154 pose = Pose2D(x=2, y=4)
155 req = MoveRobotRequest(newPose=pose)
156 self.assertTrue(move_service(req))
181 new_co2_source = CO2Source(id=
'1', ppm=1000, pose=Pose2D(x=1, y=1))
182 self.
add_co2(newSource=new_co2_source)
192 self.assertEqual(co2_msg.co2_sources[0].id,
'1')
193 self.assertEqual(co2_msg.co2_sources[0].pose, Pose2D(x=1, y=1))
197 self.assertEqual(co2_marker_msg.ns,
'co2')
198 self.assertEqual(co2_marker_msg.type, 2)
202 new_thermal_source = \
203 ThermalSource(id=
'1', degrees=180, pose=Pose2D(x=2, y=2))
210 self.assertEqual(thermal_msg.thermal_sources[0].id,
'1')
211 self.assertEqual(thermal_msg.thermal_sources[0].pose, Pose2D(x=2, y=2))
214 self.assertEqual(co2_marker_msg.ns,
'co2')
217 if mrk.ns ==
'thermal')))
227 self.assertFalse(thermal_msg.thermal_sources)
228 self.assertTrue(co2_msg.co2_sources)
232 self.assertEqual(marker_msg[-1].ns,
'co2')
236 new_co2_source = CO2Source(id=
'1', ppm=1000, pose=Pose2D(x=3, y=3))
237 self.assertRaises(rospy.ServiceException, self.
add_co2, new_co2_source)
241 self.assertEqual(len(co2_msg.co2_sources), 1)
242 self.assertEqual(co2_msg.co2_sources[-1].id,
'1')
243 self.assertEqual(co2_msg.co2_sources[-1].pose, Pose2D(x=1, y=1))
246 if __name__ ==
'__main__':
247 rospy.init_node(
'interface_test', anonymous=
True, log_level=rospy.INFO)
248 InterfaceTest.connect()
250 rostest.rosrun(
'stdr_server',
'interface_test', InterfaceTest, sys.argv)
def load_map(cls, map_filename)
def test_add_delete_sources(self)
def test_map_loaded(self)
def test_move_robot(self)
def test_add_delete_robot(self)
def _delete_robot(self, name)
def mock_cb(cls, msg, topic)