test_sensor_managers.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 
00035 import roslib; roslib.load_manifest('calibration_launch')
00036 
00037 import sys
00038 import unittest
00039 import rospy
00040 import time
00041 
00042 from capture_executive.sensor_managers import CamManager
00043 from capture_executive.sensor_managers import ChainManager
00044 from capture_executive.sensor_managers import LaserManager
00045 from calibration_msgs.msg import *
00046 from sensor_msgs.msg import *
00047 
00048 def build_msg(type, secs):
00049     msg = type()
00050     msg.header.stamp = rospy.Time(secs, 0)
00051     return msg
00052 
00053 class TestChainManager(unittest.TestCase):
00054     def test_easy(self):
00055         self._cb_chain_id = None
00056         self._cb_msg = None
00057         self._cb_called = False
00058         manager = ChainManager("chain1", self.callback)
00059         manager.enable()
00060 
00061         chain_state_pub = rospy.Publisher("chain1/chain_state", sensor_msgs.msg.JointState)
00062 
00063         time.sleep(1.0)
00064 
00065         # Send a full set
00066         manager.enable()
00067         self._cb_called = False
00068         chain_state_pub.publish(build_msg(JointState, 1))
00069         time.sleep(1.0)
00070         self.assertTrue(self._cb_called)
00071 
00072         # disable chain manager, and then send data
00073         manager.disable()
00074         self._cb_called = False
00075         chain_state_pub.publish(build_msg(JointState, 2))
00076         time.sleep(1.0)
00077         self.assertFalse(self._cb_called)
00078 
00079     def callback(self, chain_id, msg):
00080         print "Calling user callback"
00081         self._cb_called = True
00082         self._cb_chain_id = chain_id
00083         self._cb_msg = msg
00084 
00085 class TestCamManager(unittest.TestCase):
00086     def test_easy(self):
00087         self._cb_cam_id = None
00088         self._cb_msg = None
00089         self._cb_called = False
00090         manager = CamManager("cam1", self.callback)
00091         manager.enable(verbose=False)
00092 
00093         cam_info_pub = rospy.Publisher("cam1/camera_info", sensor_msgs.msg.CameraInfo)
00094         features_pub = rospy.Publisher("cam1/features", calibration_msgs.msg.CalibrationPattern)
00095         image_pub    = rospy.Publisher("cam1/image",    sensor_msgs.msg.Image)
00096 
00097         time.sleep(1.0)
00098 
00099         # Send a full set
00100         manager.enable(verbose=True)
00101         self._cb_called = False
00102         cam_info_pub.publish(build_msg(CameraInfo, 1))
00103         features_pub.publish(build_msg(CalibrationPattern, 1))
00104         image_pub.publish(build_msg(Image, 1))
00105         time.sleep(1.0)
00106         self.assertTrue(self._cb_called)
00107 
00108         # Only send a few of the messages. The verbose synchronizer shouldn't find a match
00109         manager.enable(verbose=True)
00110         self._cb_called = False
00111         cam_info_pub.publish(build_msg(CameraInfo, 2))
00112         features_pub.publish(build_msg(CalibrationPattern, 2))
00113         time.sleep(1.0)
00114         self.assertFalse(self._cb_called)
00115 
00116         # Only send a few of the messages again. The concise synchronizer should find a match
00117         manager.enable(verbose=False)
00118         self._cb_called = False
00119         cam_info_pub.publish(build_msg(CameraInfo, 3))
00120         features_pub.publish(build_msg(CalibrationPattern, 3))
00121         time.sleep(1.0)
00122         self.assertTrue(self._cb_called)
00123 
00124 
00125     def callback(self, cam_id, msg):
00126         print "Calling user callback"
00127         self._cb_called = True
00128         self._cb_cam_id = cam_id
00129         self._cb_msg = msg
00130 
00131 class TestLaserManager(unittest.TestCase):
00132     def test_easy(self):
00133         self._cb_laser_id = None
00134         self._cb_msg = None
00135         self._cb_called = False
00136         manager = LaserManager("laser1", self.callback)
00137         manager.enable(verbose=False)
00138 
00139         snapshot_pub       = rospy.Publisher("laser1/snapshot",       calibration_msgs.msg.DenseLaserSnapshot)
00140         laser_image_pub    = rospy.Publisher("laser1/laser_image",    sensor_msgs.msg.Image)
00141         image_features_pub = rospy.Publisher("laser1/image_features", calibration_msgs.msg.CalibrationPattern)
00142         joint_features_pub = rospy.Publisher("laser1/joint_features", calibration_msgs.msg.JointStateCalibrationPattern)
00143         duration_pub       = rospy.Publisher("laser1/laser_interval", calibration_msgs.msg.IntervalStamped)
00144 
00145         time.sleep(1.0)
00146 
00147         # Send a full set
00148         manager.enable(verbose=True)
00149         self._cb_called = False
00150         snapshot_pub.publish(build_msg(DenseLaserSnapshot, 1))
00151         laser_image_pub.publish(build_msg(Image, 1))
00152         image_features_pub.publish(build_msg(CalibrationPattern, 1))
00153         joint_features_pub.publish(build_msg(JointStateCalibrationPattern, 1))
00154         duration_pub.publish(build_msg(IntervalStamped, 1))
00155         time.sleep(1.0)
00156         self.assertTrue(self._cb_called)
00157 
00158         # Only send a few of the messages. The verbose synchronizer shouldn't find a match
00159         manager.enable(verbose=True)
00160         self._cb_called = False
00161         snapshot_pub.publish(build_msg(DenseLaserSnapshot, 2))
00162         laser_image_pub.publish(build_msg(Image, 2))
00163         #image_features_pub.publish(build_msg(CalibrationPattern, 1))
00164         joint_features_pub.publish(build_msg(JointStateCalibrationPattern, 2))
00165         duration_pub.publish(build_msg(IntervalStamped, 2))
00166         time.sleep(1.0)
00167         self.assertFalse(self._cb_called)
00168 
00169         # Only send a few of the messages again. The concise synchronizer should find a match
00170         manager.enable(verbose=False)
00171         self._cb_called = False
00172         snapshot_pub.publish(build_msg(DenseLaserSnapshot, 3))
00173         laser_image_pub.publish(build_msg(Image, 3))
00174         #image_features_pub.publish(build_msg(CalibrationPattern, 1))
00175         joint_features_pub.publish(build_msg(JointStateCalibrationPattern, 3))
00176         duration_pub.publish(build_msg(IntervalStamped, 3))
00177         time.sleep(1.0)
00178         self.assertTrue(self._cb_called)
00179 
00180     def callback(self, cam_id, msg, interval_start, interval_end):
00181         print "Calling user callback"
00182         self._cb_called = True
00183         self._cb_cam_id = cam_id
00184         self._cb_msg = msg
00185 
00186 
00187 if __name__ == '__main__':
00188     import rostest
00189     rospy.init_node("test_node")
00190     rostest.unitrun('pr2_calibration_executive', 'test_cam_manager',   TestCamManager)
00191     rostest.unitrun('pr2_calibration_executive', 'test_chain_manager', TestChainManager)
00192     rostest.unitrun('pr2_calibration_executive', 'test_laser_manager', TestLaserManager)


calibration_launch
Author(s): Michael Ferguson
autogenerated on Tue Sep 27 2016 04:06:51