Go to the documentation of this file.00001
00002
00003 import unittest
00004 import rospy
00005 import time
00006 import Queue as queue
00007 from sensor_msgs.msg import LaserScan
00008
00009 PKG = "stage_ros"
00010
00011
00012 class RangerIntensityTests(unittest.TestCase):
00013 def setUp(self):
00014 """Called before every test. Set up a LaserScan subscriber
00015 """
00016 rospy.init_node('ranger_intensity_test', anonymous=True)
00017 self._scan_q = queue.Queue()
00018 self._scan_topic = rospy.get_param("scan_topic", "base_scan")
00019 self._subscriber = rospy.Subscriber(self._scan_topic,
00020 LaserScan, self._scan_callback)
00021
00022 def tearDown(self):
00023 """Called after every test. Cancel the scan subscription
00024 """
00025 self._subscriber.unregister()
00026 self._scan_q = None
00027 self._subscriber = None
00028
00029 def _scan_callback(self, scan_msg):
00030 """Called every time a scan is received
00031 """
00032 self._scan_q.put(scan_msg)
00033
00034 def _wait_for_scan(self, timeout=1.0):
00035 """Wait for a laser scan to be received
00036 """
00037
00038 end_time = time.time() + timeout
00039 while time.time() < end_time:
00040 try:
00041 return self._scan_q.get(True, 0.1)
00042 except queue.Empty:
00043 pass
00044 return None
00045
00046 def test_intensity_greater_than_256(self):
00047 """Make sure stage_ros returns intensity values higher than 256
00048 """
00049 scan = self._wait_for_scan()
00050 self.assertIsNotNone(scan)
00051 self.assertGreater(max(scan.intensities), 256.9)
00052
00053
00054 if __name__ == '__main__':
00055 import rosunit
00056 rosunit.unitrun(PKG, 'test_intensity', RangerIntensityTests)