interfaces_test.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import sys
4 import rospy
5 import rostest
6 import unittest
7 from actionlib import SimpleActionClient
8 import actionlib
9 
10 from stdr_msgs.msg import RobotMsg, RobotIndexedVectorMsg
11 from stdr_msgs.msg import (SpawnRobotAction, SpawnRobotGoal,
12  DeleteRobotAction, DeleteRobotGoal)
13 from stdr_msgs.srv import LoadMap, MoveRobot, MoveRobotRequest
14 from stdr_msgs.srv import (AddCO2Source, AddRfidTag, AddSoundSource,
15  AddThermalSource, DeleteCO2Source, DeleteRfidTag,
16  DeleteSoundSource, DeleteThermalSource)
17 from stdr_msgs.msg import (CO2Source, CO2SourceVector, ThermalSource,
18  ThermalSourceVector)
19 from nav_msgs.msg import OccupancyGrid
20 
21 from visualization_msgs.msg import MarkerArray
22 from geometry_msgs.msg import Pose2D
23 
24 from subprocess import call
25 from collections import defaultdict
26 import threading
27 
28 class InterfaceTest(unittest.TestCase):
29 
30  @classmethod
31  def setUp(cls):
32  print 'Starting...'
33  cls.messages = defaultdict(list)
34 
35  @classmethod
36  def mock_cb(cls, msg, topic):
37  rospy.loginfo('got message from %s' % topic)
38  cls.messages[topic].append(msg)
39  cls.block.set()
40 
41  @classmethod
42  def load_map(cls, map_filename):
43  command = 'rosrun stdr_server load_map %s' % map_filename
44  call(command, shell=True)
45 
46  @classmethod
47  def connect(cls):
49  SimpleActionClient('stdr_server/spawn_robot',
50  SpawnRobotAction)
51  cls.spawn_robot_client.wait_for_server(rospy.Duration(5.0))
52 
54  SimpleActionClient('stdr_server/delete_robot', DeleteRobotAction)
55  cls.delete_robot_client.wait_for_server(rospy.Duration(5.0))
56 
57  cls.robots_topic = 'stdr_server/active_robots'
58  cls.active_robots_sub = rospy.Subscriber(cls.robots_topic,
59  RobotIndexedVectorMsg, cls.mock_cb, cls.robots_topic)
60 
61  # sources
62  cls.add_rfid = rospy.ServiceProxy('stdr_server/add_rfid_tag',
63  AddRfidTag)
64  cls.delete_rfid = rospy.ServiceProxy('stdr_server/delete_rfid_tag',
65  DeleteRfidTag)
66  cls.add_co2 = rospy.ServiceProxy('stdr_server/add_co2_source',
67  AddCO2Source)
68  cls.delete_co2 = rospy.ServiceProxy('stdr_server/delete_co2_source',
69  DeleteCO2Source)
70  cls.add_thermal = rospy.ServiceProxy('stdr_server/add_thermal_source',
71  AddThermalSource)
72  cls.delete_thermal = rospy.ServiceProxy('stdr_server/delete_thermal_source',
73  DeleteThermalSource)
74  cls.add_sound = rospy.ServiceProxy('stdr_server/add_sound_source',
75  AddSoundSource)
76  cls.delete_sound = rospy.ServiceProxy('stdr_server/delete_sound_source',
77  DeleteSoundSource)
78 
79  cls.block = threading.Event()
80 
81  def _add_robot(self):
82  self.block.clear()
83  spawn_robot_goal = SpawnRobotGoal()
84  robot_msg = RobotMsg()
85  robot_msg.footprint.radius = 0.2
86  robot_msg.initialPose.x = 1
87  robot_msg.initialPose.y = 2
88  spawn_robot_goal.description = robot_msg
89 
90  self.spawn_robot_client.send_goal(spawn_robot_goal)
91 
92  self.assertTrue(self.spawn_robot_client.wait_for_result(rospy.Duration(10.0)))
93  self.assertFalse(
94  self.spawn_robot_client.get_state() == actionlib.GoalStatus.ABORTED)
95 
96  robot_name = \
97  self.spawn_robot_client.get_result().indexedDescription.name
98 
99  # wait until we get a message
100  self.block.wait()
101 
102  self.assertIn(self.robots_topic, self.messages.keys())
103  self.assertEqual(len(self.messages[self.robots_topic]), 1)
104  self.assertEqual(len(self.messages[self.robots_topic][-1].robots), 1)
105  self.assertEqual(self.messages[self.robots_topic][-1].robots[0].name, robot_name)
106  self.assertAlmostEqual(
107  self.messages[self.robots_topic][-1].robots[0].robot.footprint.radius,
108  robot_msg.footprint.radius)
109  self.assertAlmostEqual(
110  self.messages[self.robots_topic][-1].robots[0].robot.initialPose.x,
111  robot_msg.initialPose.x)
112  self.assertAlmostEqual(
113  self.messages[self.robots_topic][-1].robots[0].robot.initialPose.y,
114  robot_msg.initialPose.y)
115 
116  return robot_name
117 
118  def _delete_robot(self, name):
119  self.block.clear()
120  delete_robot_goal = DeleteRobotGoal()
121  delete_robot_goal.name = name
122  self.delete_robot_client.send_goal(delete_robot_goal)
123  self.assertTrue(self.delete_robot_client.wait_for_result(rospy.Duration(10.0)))
124 
125  # wait until we get a message
126  self.block.wait()
127  self.assertEqual(
128  len(self.messages[self.robots_topic][-1].robots), 0)
129 
130  def test_map_loaded(self):
131  self.block.clear()
132  self.map_topic = 'map'
133  self.map_sub = rospy.Subscriber(self.map_topic, OccupancyGrid, self.mock_cb,
134  self.map_topic)
135 
136  # wait until we get a message
137  self.block.wait()
138  self.assertIn(self.map_topic, self.messages.keys())
139  self.assertEqual(len(self.messages[self.map_topic]), 1)
140 
141  # cleanup
142  self.map_sub.unregister()
143 
145  name = self._add_robot()
146  self._delete_robot(name)
147 
148  def test_move_robot(self):
149  # first we need to add a new robot
150  name = self._add_robot()
151 
152  rospy.wait_for_service(name + '/replace')
153  move_service = rospy.ServiceProxy(name + '/replace', MoveRobot)
154  pose = Pose2D(x=2, y=4)
155  req = MoveRobotRequest(newPose=pose)
156  self.assertTrue(move_service(req))
157 
158  # cleanup by deleting the robot
159  self._delete_robot(name)
160 
162  self.block.clear()
163 
164  # markers
165  self.markers_topic = 'stdr_server/sources_visualization_markers'
166  self.markers_sub = rospy.Subscriber(self.markers_topic, MarkerArray,
167  self.mock_cb, self.markers_topic)
168  self.co2_list_topic = 'stdr_server/co2_sources_list'
169  self.co2_sub = rospy.Subscriber(self.co2_list_topic, CO2SourceVector,
170  self.mock_cb, self.co2_list_topic)
171 
172  self.thermal_list_topic = 'stdr_server/thermal_sources_list'
173  self.thermal_sub = rospy.Subscriber(self.thermal_list_topic,
174  ThermalSourceVector,
175  self.mock_cb, self.thermal_list_topic)
176 
177  # wait for connections to be established
178  rospy.sleep(2.0)
179 
180  ### test co2 addition
181  new_co2_source = CO2Source(id='1', ppm=1000, pose=Pose2D(x=1, y=1))
182  self.add_co2(newSource=new_co2_source)
183 
184  rospy.sleep(2.0) # wait for multiple msgs
185 
186  self.assertIn(self.co2_list_topic, self.messages.keys())
187  self.assertEqual(len(self.messages[self.co2_list_topic]), 1)
188  # expect 2 identical messages due to sources republication
189  self.assertEqual(len(self.messages[self.markers_topic]), 2)
190 
191  co2_msg = self.messages[self.co2_list_topic][-1]
192  self.assertEqual(co2_msg.co2_sources[0].id, '1')
193  self.assertEqual(co2_msg.co2_sources[0].pose, Pose2D(x=1, y=1))
194 
195  self.assertIn(self.markers_topic, self.messages.keys())
196  co2_marker_msg = self.messages[self.markers_topic][-1].markers[-1]
197  self.assertEqual(co2_marker_msg.ns, 'co2')
198  self.assertEqual(co2_marker_msg.type, 2)
199 
200 
201  ### test thermal addition
202  new_thermal_source = \
203  ThermalSource(id='1', degrees=180, pose=Pose2D(x=2, y=2))
204  self.add_thermal(newSource=new_thermal_source)
205 
206  rospy.sleep(2.0)
207 
208  self.assertIn(self.thermal_list_topic, self.messages.keys())
209  thermal_msg = self.messages[self.thermal_list_topic][-1]
210  self.assertEqual(thermal_msg.thermal_sources[0].id, '1')
211  self.assertEqual(thermal_msg.thermal_sources[0].pose, Pose2D(x=2, y=2))
212 
213  thermal_marker_msg = self.messages[self.markers_topic][-1].markers[-1]
214  self.assertEqual(co2_marker_msg.ns, 'co2')
215  self.assertTrue(any(
216  (True for mrk in self.messages[self.markers_topic][-1].markers
217  if mrk.ns == 'thermal')))
218 
219 
220  ### delete the thermal source, we should still get a marker for co2
221  self.delete_thermal(name='1')
222 
223  rospy.sleep(2.0)
224 
225  thermal_msg = self.messages[self.thermal_list_topic][-1]
226  co2_msg = self.messages[self.co2_list_topic][-1]
227  self.assertFalse(thermal_msg.thermal_sources)
228  self.assertTrue(co2_msg.co2_sources)
229 
230  # last marker msg should contain one co2 source
231  marker_msg = self.messages[self.markers_topic][-1].markers
232  self.assertEqual(marker_msg[-1].ns, 'co2')
233 
234 
235  ### test invalid source id addition
236  new_co2_source = CO2Source(id='1', ppm=1000, pose=Pose2D(x=3, y=3))
237  self.assertRaises(rospy.ServiceException, self.add_co2, new_co2_source)
238 
239  # we should get no changes in co2 list msg
240  co2_msg = self.messages[self.co2_list_topic][-1]
241  self.assertEqual(len(co2_msg.co2_sources), 1)
242  self.assertEqual(co2_msg.co2_sources[-1].id, '1')
243  self.assertEqual(co2_msg.co2_sources[-1].pose, Pose2D(x=1, y=1))
244 
245 
246 if __name__ == '__main__':
247  rospy.init_node('interface_test', anonymous=True, log_level=rospy.INFO)
248  InterfaceTest.connect()
249  rospy.sleep(2.0)
250  rostest.rosrun('stdr_server', 'interface_test', InterfaceTest, sys.argv)
def load_map(cls, map_filename)
def mock_cb(cls, msg, topic)


stdr_server
Author(s): Chris Zalidis
autogenerated on Mon Jun 10 2019 15:15:07