Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 """
00039 This tests that a CameraListener will report stale after 15 seconds of not receiving any data.
00040 """
00041
00042 from __future__ import with_statement
00043
00044 PKG = 'pr2_hardware_test_monitor'
00045 import roslib; roslib.load_manifest(PKG)
00046
00047 import unittest
00048 import rospy, rostest
00049 from time import sleep
00050 import sys
00051
00052 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00053 from pr2_self_test_msgs.msg import TestStatus
00054 from std_msgs.msg import Bool
00055 import std_msgs.msg
00056 from std_srvs.srv import *
00057 import threading
00058
00059 GRACE_TIME = 35
00060
00061 def _camera_diag(level = 0):
00062 array = DiagnosticArray()
00063 stat = DiagnosticStatus()
00064 stat.name = 'wge100: Driver Status'
00065 stat.level = level
00066 stat.message = 'OK'
00067
00068 array.header.stamp = rospy.get_rostime()
00069 array.status.append(stat)
00070
00071 return array
00072
00073 class TestCameraWarn(unittest.TestCase):
00074 def __init__(self, *args):
00075 super(TestCameraWarn, self).__init__(*args)
00076
00077 self._mutex = threading.Lock()
00078 rospy.init_node('test_monitor_latch_test')
00079 self._ignore_time = 5
00080 self._start_time = rospy.get_time()
00081 self._ok = True
00082 self._level = 0
00083
00084 self._start = rospy.get_time()
00085 self._last_msg = None
00086 self._max_lvl = -1
00087
00088 self._halted = False
00089
00090 self._snapped = False
00091 self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00092
00093 self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00094
00095 rospy.Subscriber('test_status', TestStatus, self._cb)
00096
00097 def _snap_cb(self, msg):
00098 self._snapped = True
00099
00100 def _cb(self, msg):
00101 with self._mutex:
00102 if not self._last_msg:
00103 self._start = rospy.get_time()
00104
00105 self._last_msg = msg
00106
00107 if rospy.get_time() - self._start_time > self._ignore_time:
00108 self._max_lvl = max(msg.test_ok, self._max_lvl)
00109
00110
00111 def test_cam_stale(self):
00112 while not rospy.is_shutdown():
00113 self._diag_pub.publish(_camera_diag())
00114 sleep(1.0)
00115 if rospy.get_time() - self._start > GRACE_TIME:
00116 break
00117
00118
00119 for i in range(0, 15):
00120 sleep(1.0)
00121
00122 with self._mutex:
00123 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00124
00125 self.assert_(self._last_msg is not None, "No data from test monitor")
00126
00127
00128 self.assert_(self._last_msg.test_ok != TestStatus.RUNNING, "Test monitor reports that we're running. Level: %d" % self._last_msg.test_ok)
00129
00130
00131 self.assert_(self._max_lvl == TestStatus.STALE, "We didn't get stale message from the camera listener. Max level: %d" % self._max_lvl)
00132
00133
00134 self.assert_(self._snapped, "Snapshot trigger was called, but we didn't halt")
00135
00136 if __name__ == '__main__':
00137 if len(sys.argv) > 1 and sys.argv[1] == '-v':
00138 suite = unittest.TestSuite()
00139 suite.addTest(TestCameraWarn('test_cam_warn'))
00140
00141 unittest.TextTestRunner(verbosity=2).run(suite)
00142 else:
00143 rostest.run(PKG, sys.argv[0], TestCameraWarn, sys.argv)