Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 """
00039 This tests that a CameraListener will report stale after 15 seconds of not receiving any data.
00040 """
00041 
00042 from __future__ import with_statement
00043 
00044 PKG = 'pr2_hardware_test_monitor'
00045 import roslib; roslib.load_manifest(PKG)
00046 
00047 import unittest
00048 import rospy, rostest
00049 from time import sleep
00050 import sys
00051 
00052 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00053 from pr2_self_test_msgs.msg import TestStatus
00054 from std_msgs.msg import Bool
00055 import std_msgs.msg
00056 from std_srvs.srv import *
00057 import threading
00058 
00059 GRACE_TIME = 35 
00060 
00061 def _camera_diag(level = 0):
00062     array = DiagnosticArray()
00063     stat = DiagnosticStatus()
00064     stat.name = 'wge100: Driver Status'
00065     stat.level = level
00066     stat.message = 'OK'
00067 
00068     array.header.stamp = rospy.get_rostime()
00069     array.status.append(stat)
00070     
00071     return array
00072 
00073 class TestCameraWarn(unittest.TestCase):
00074     def __init__(self, *args):
00075         super(TestCameraWarn, self).__init__(*args)
00076 
00077         self._mutex = threading.Lock()
00078         rospy.init_node('test_monitor_latch_test')
00079         self._ignore_time = 5
00080         self._start_time = rospy.get_time()
00081         self._ok = True
00082         self._level = 0
00083 
00084         self._start = rospy.get_time()
00085         self._last_msg = None
00086         self._max_lvl = -1
00087 
00088         self._halted = False
00089 
00090         self._snapped = False
00091         self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00092 
00093         self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00094 
00095         rospy.Subscriber('test_status', TestStatus, self._cb)
00096 
00097     def _snap_cb(self, msg):
00098         self._snapped = True
00099 
00100     def _cb(self, msg):
00101         with self._mutex:
00102             if not self._last_msg:
00103                 self._start = rospy.get_time()
00104 
00105             self._last_msg = msg
00106 
00107             if rospy.get_time() - self._start_time > self._ignore_time:
00108                 self._max_lvl = max(msg.test_ok, self._max_lvl)
00109 
00110 
00111     def test_cam_stale(self):
00112         while not rospy.is_shutdown():
00113             self._diag_pub.publish(_camera_diag())
00114             sleep(1.0)
00115             if rospy.get_time() - self._start > GRACE_TIME:
00116                 break
00117 
00118         
00119         for i in range(0, 15):
00120             sleep(1.0)
00121 
00122         with self._mutex:
00123             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00124 
00125             self.assert_(self._last_msg is not None, "No data from test monitor")
00126             
00127             
00128             self.assert_(self._last_msg.test_ok != TestStatus.RUNNING, "Test monitor reports that we're running. Level: %d" % self._last_msg.test_ok)
00129 
00130             
00131             self.assert_(self._max_lvl == TestStatus.STALE, "We didn't get stale message from the camera listener. Max level: %d" % self._max_lvl)
00132 
00133             
00134             self.assert_(self._snapped, "Snapshot trigger was called, but we didn't halt")
00135 
00136 if __name__ == '__main__':
00137     if len(sys.argv) > 1 and sys.argv[1] == '-v':
00138         suite = unittest.TestSuite()
00139         suite.addTest(TestCameraWarn('test_cam_warn'))
00140 
00141         unittest.TextTestRunner(verbosity=2).run(suite)
00142     else:
00143         rostest.run(PKG, sys.argv[0], TestCameraWarn, sys.argv)