camera_stale_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2010, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 ##\author Kevin Watts
00036 ##\brief Tests that test monitor latches error state
00037 
00038 """
00039 This tests that a CameraListener will report stale after 15 seconds of not receiving any data.
00040 """
00041 
00042 from __future__ import with_statement
00043 
00044 PKG = 'pr2_hardware_test_monitor'
00045 import roslib; roslib.load_manifest(PKG)
00046 
00047 import unittest
00048 import rospy, rostest
00049 from time import sleep
00050 import sys
00051 
00052 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00053 from pr2_self_test_msgs.msg import TestStatus
00054 from std_msgs.msg import Bool
00055 import std_msgs.msg
00056 from std_srvs.srv import *
00057 import threading
00058 
00059 GRACE_TIME = 35 # 30 + 5 seconds for HW monitor to be ready
00060 
00061 def _camera_diag(level = 0):
00062     array = DiagnosticArray()
00063     stat = DiagnosticStatus()
00064     stat.name = 'wge100: Driver Status'
00065     stat.level = level
00066     stat.message = 'OK'
00067 
00068     array.header.stamp = rospy.get_rostime()
00069     array.status.append(stat)
00070     
00071     return array
00072 
00073 class TestCameraWarn(unittest.TestCase):
00074     def __init__(self, *args):
00075         super(TestCameraWarn, self).__init__(*args)
00076 
00077         self._mutex = threading.Lock()
00078         rospy.init_node('test_monitor_latch_test')
00079         self._ignore_time = 5
00080         self._start_time = rospy.get_time()
00081         self._ok = True
00082         self._level = 0
00083 
00084         self._start = rospy.get_time()
00085         self._last_msg = None
00086         self._max_lvl = -1
00087 
00088         self._halted = False
00089 
00090         self._snapped = False
00091         self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00092 
00093         self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00094 
00095         rospy.Subscriber('test_status', TestStatus, self._cb)
00096 
00097     def _snap_cb(self, msg):
00098         self._snapped = True
00099 
00100     def _cb(self, msg):
00101         with self._mutex:
00102             if not self._last_msg:
00103                 self._start = rospy.get_time()
00104 
00105             self._last_msg = msg
00106 
00107             if rospy.get_time() - self._start_time > self._ignore_time:
00108                 self._max_lvl = max(msg.test_ok, self._max_lvl)
00109 
00110 
00111     def test_cam_stale(self):
00112         while not rospy.is_shutdown():
00113             self._diag_pub.publish(_camera_diag())
00114             sleep(1.0)
00115             if rospy.get_time() - self._start > GRACE_TIME:
00116                 break
00117 
00118         # Stop publishing, see if we're stale
00119         for i in range(0, 15):
00120             sleep(1.0)
00121 
00122         with self._mutex:
00123             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00124 
00125             self.assert_(self._last_msg is not None, "No data from test monitor")
00126             
00127             # Check that message level is error state
00128             self.assert_(self._last_msg.test_ok != TestStatus.RUNNING, "Test monitor reports that we're running. Level: %d" % self._last_msg.test_ok)
00129 
00130             # Check that we went into warning state
00131             self.assert_(self._max_lvl == TestStatus.STALE, "We didn't get stale message from the camera listener. Max level: %d" % self._max_lvl)
00132 
00133             # Check that snapshot trigger was called
00134             self.assert_(self._snapped, "Snapshot trigger was called, but we didn't halt")
00135 
00136 if __name__ == '__main__':
00137     if len(sys.argv) > 1 and sys.argv[1] == '-v':
00138         suite = unittest.TestSuite()
00139         suite.addTest(TestCameraWarn('test_cam_warn'))
00140 
00141         unittest.TextTestRunner(verbosity=2).run(suite)
00142     else:
00143         rostest.run(PKG, sys.argv[0], TestCameraWarn, sys.argv)


pr2_hardware_test_monitor
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:54:19