Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 """
00039 This tests that many warning/error codes from the wge100 camera are reported as an error.
00040 """
00041
00042 from __future__ import with_statement
00043
00044 PKG = 'pr2_hardware_test_monitor'
00045 import roslib; roslib.load_manifest(PKG)
00046
00047 import unittest
00048 import rospy, rostest
00049 from time import sleep
00050 import sys
00051
00052 import random
00053 random.seed()
00054
00055 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00056 from pr2_self_test_msgs.msg import TestStatus
00057 from std_msgs.msg import Bool
00058 import std_msgs.msg
00059 from std_srvs.srv import *
00060 import threading
00061
00062 GRACE_TIME = 35
00063
00064 def _camera_diag(level = 0):
00065 array = DiagnosticArray()
00066 stat = DiagnosticStatus()
00067 stat.name = 'wge100: Driver Status'
00068 stat.level = level
00069 stat.message = 'OK'
00070
00071 array.header.stamp = rospy.get_rostime()
00072 array.status.append(stat)
00073
00074 return array
00075
00076 class TestCameraWarn(unittest.TestCase):
00077 def __init__(self, *args):
00078 super(TestCameraWarn, self).__init__(*args)
00079
00080 self._mutex = threading.Lock()
00081 rospy.init_node('test_monitor_latch_test')
00082 self._ignore_time = 5
00083 self._start_time = rospy.get_time()
00084 self._ok = True
00085 self._level = 0
00086
00087 self._start = rospy.get_time()
00088 self._last_msg = None
00089 self._max_lvl = -1
00090
00091 self._halted = False
00092
00093 self._snapped = False
00094 self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00095
00096 self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00097
00098 rospy.Subscriber('test_status', TestStatus, self._cb)
00099
00100 def _snap_cb(self, msg):
00101 self._snapped = True
00102
00103 def _cb(self, msg):
00104 with self._mutex:
00105 if not self._last_msg:
00106 self._start = rospy.get_time()
00107
00108 self._last_msg = msg
00109
00110 if rospy.get_time() - self._start_time > self._ignore_time:
00111 self._max_lvl = max(msg.test_ok, self._max_lvl)
00112
00113
00114 def test_many_cam_warn(self):
00115 while not rospy.is_shutdown():
00116 self._diag_pub.publish(_camera_diag())
00117 sleep(1.0)
00118 if rospy.get_time() - self._start > GRACE_TIME:
00119 break
00120
00121
00122 for i in range(0, 15):
00123 self._diag_pub.publish(_camera_diag(level = random.randint(1, 2)))
00124 sleep(1.0)
00125
00126
00127 for i in range(0, 10):
00128 self._diag_pub.publish(_camera_diag())
00129 sleep(1.0)
00130
00131 with self._mutex:
00132 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00133
00134 self.assert_(self._last_msg is not None, "No data from test monitor")
00135
00136
00137 self.assert_(self._last_msg.test_ok == TestStatus.ERROR, "Test monitor should be in error state. Level: %d" % self._last_msg.test_ok)
00138
00139 self.assert_(self._snapped, "Snapshot trigger wasn't called")
00140
00141 if __name__ == '__main__':
00142 if len(sys.argv) > 1 and sys.argv[1] == '-v':
00143 suite = unittest.TestSuite()
00144 suite.addTest(TestCameraWarn('test_many_cam_warn'))
00145
00146 unittest.TextTestRunner(verbosity=2).run(suite)
00147 else:
00148 rostest.run(PKG, sys.argv[0], TestCameraWarn, sys.argv)