test_sensor_managers.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 
35 import roslib; roslib.load_manifest('calibration_launch')
36 
37 import sys
38 import unittest
39 import rospy
40 import time
41 
42 from capture_executive.sensor_managers import CamManager
43 from capture_executive.sensor_managers import ChainManager
44 from capture_executive.sensor_managers import LaserManager
45 from calibration_msgs.msg import *
46 from sensor_msgs.msg import *
47 
48 def build_msg(type, secs):
49  msg = type()
50  msg.header.stamp = rospy.Time(secs, 0)
51  return msg
52 
53 class TestChainManager(unittest.TestCase):
54  def test_easy(self):
55  self._cb_chain_id = None
56  self._cb_msg = None
57  self._cb_called = False
58  manager = ChainManager("chain1", self.callback)
59  manager.enable()
60 
61  chain_state_pub = rospy.Publisher("chain1/chain_state", sensor_msgs.msg.JointState)
62 
63  time.sleep(1.0)
64 
65  # Send a full set
66  manager.enable()
67  self._cb_called = False
68  chain_state_pub.publish(build_msg(JointState, 1))
69  time.sleep(1.0)
70  self.assertTrue(self._cb_called)
71 
72  # disable chain manager, and then send data
73  manager.disable()
74  self._cb_called = False
75  chain_state_pub.publish(build_msg(JointState, 2))
76  time.sleep(1.0)
77  self.assertFalse(self._cb_called)
78 
79  def callback(self, chain_id, msg):
80  print "Calling user callback"
81  self._cb_called = True
82  self._cb_chain_id = chain_id
83  self._cb_msg = msg
84 
85 class TestCamManager(unittest.TestCase):
86  def test_easy(self):
87  self._cb_cam_id = None
88  self._cb_msg = None
89  self._cb_called = False
90  manager = CamManager("cam1", self.callback)
91  manager.enable(verbose=False)
92 
93  cam_info_pub = rospy.Publisher("cam1/camera_info", sensor_msgs.msg.CameraInfo)
94  features_pub = rospy.Publisher("cam1/features", calibration_msgs.msg.CalibrationPattern)
95  image_pub = rospy.Publisher("cam1/image", sensor_msgs.msg.Image)
96 
97  time.sleep(1.0)
98 
99  # Send a full set
100  manager.enable(verbose=True)
101  self._cb_called = False
102  cam_info_pub.publish(build_msg(CameraInfo, 1))
103  features_pub.publish(build_msg(CalibrationPattern, 1))
104  image_pub.publish(build_msg(Image, 1))
105  time.sleep(1.0)
106  self.assertTrue(self._cb_called)
107 
108  # Only send a few of the messages. The verbose synchronizer shouldn't find a match
109  manager.enable(verbose=True)
110  self._cb_called = False
111  cam_info_pub.publish(build_msg(CameraInfo, 2))
112  features_pub.publish(build_msg(CalibrationPattern, 2))
113  time.sleep(1.0)
114  self.assertFalse(self._cb_called)
115 
116  # Only send a few of the messages again. The concise synchronizer should find a match
117  manager.enable(verbose=False)
118  self._cb_called = False
119  cam_info_pub.publish(build_msg(CameraInfo, 3))
120  features_pub.publish(build_msg(CalibrationPattern, 3))
121  time.sleep(1.0)
122  self.assertTrue(self._cb_called)
123 
124 
125  def callback(self, cam_id, msg):
126  print "Calling user callback"
127  self._cb_called = True
128  self._cb_cam_id = cam_id
129  self._cb_msg = msg
130 
131 class TestLaserManager(unittest.TestCase):
132  def test_easy(self):
133  self._cb_laser_id = None
134  self._cb_msg = None
135  self._cb_called = False
136  manager = LaserManager("laser1", self.callback)
137  manager.enable(verbose=False)
138 
139  snapshot_pub = rospy.Publisher("laser1/snapshot", calibration_msgs.msg.DenseLaserSnapshot)
140  laser_image_pub = rospy.Publisher("laser1/laser_image", sensor_msgs.msg.Image)
141  image_features_pub = rospy.Publisher("laser1/image_features", calibration_msgs.msg.CalibrationPattern)
142  joint_features_pub = rospy.Publisher("laser1/joint_features", calibration_msgs.msg.JointStateCalibrationPattern)
143  duration_pub = rospy.Publisher("laser1/laser_interval", calibration_msgs.msg.IntervalStamped)
144 
145  time.sleep(1.0)
146 
147  # Send a full set
148  manager.enable(verbose=True)
149  self._cb_called = False
150  snapshot_pub.publish(build_msg(DenseLaserSnapshot, 1))
151  laser_image_pub.publish(build_msg(Image, 1))
152  image_features_pub.publish(build_msg(CalibrationPattern, 1))
153  joint_features_pub.publish(build_msg(JointStateCalibrationPattern, 1))
154  duration_pub.publish(build_msg(IntervalStamped, 1))
155  time.sleep(1.0)
156  self.assertTrue(self._cb_called)
157 
158  # Only send a few of the messages. The verbose synchronizer shouldn't find a match
159  manager.enable(verbose=True)
160  self._cb_called = False
161  snapshot_pub.publish(build_msg(DenseLaserSnapshot, 2))
162  laser_image_pub.publish(build_msg(Image, 2))
163  #image_features_pub.publish(build_msg(CalibrationPattern, 1))
164  joint_features_pub.publish(build_msg(JointStateCalibrationPattern, 2))
165  duration_pub.publish(build_msg(IntervalStamped, 2))
166  time.sleep(1.0)
167  self.assertFalse(self._cb_called)
168 
169  # Only send a few of the messages again. The concise synchronizer should find a match
170  manager.enable(verbose=False)
171  self._cb_called = False
172  snapshot_pub.publish(build_msg(DenseLaserSnapshot, 3))
173  laser_image_pub.publish(build_msg(Image, 3))
174  #image_features_pub.publish(build_msg(CalibrationPattern, 1))
175  joint_features_pub.publish(build_msg(JointStateCalibrationPattern, 3))
176  duration_pub.publish(build_msg(IntervalStamped, 3))
177  time.sleep(1.0)
178  self.assertTrue(self._cb_called)
179 
180  def callback(self, cam_id, msg, interval_start, interval_end):
181  print "Calling user callback"
182  self._cb_called = True
183  self._cb_cam_id = cam_id
184  self._cb_msg = msg
185 
186 
187 if __name__ == '__main__':
188  import rostest
189  rospy.init_node("test_node")
190  rostest.unitrun('pr2_calibration_executive', 'test_cam_manager', TestCamManager)
191  rostest.unitrun('pr2_calibration_executive', 'test_chain_manager', TestChainManager)
192  rostest.unitrun('pr2_calibration_executive', 'test_laser_manager', TestLaserManager)
def callback(self, cam_id, msg, interval_start, interval_end)


calibration_launch
Author(s): Michael Ferguson
autogenerated on Fri Apr 2 2021 02:13:07