00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 """
00039 This tests that a transient error status, such as a camera error, produces
00040 the correct behavior. We expect that an error, after a "grace period",
00041 will cause the "halt" method of all listeners to be called, and that the error
00042 state will "latch" until reset is called.
00043 """
00044
00045 from __future__ import with_statement
00046
00047 PKG = 'pr2_hardware_test_monitor'
00048 import roslib; roslib.load_manifest(PKG)
00049
00050 import unittest
00051 import rospy, rostest
00052 from time import sleep
00053 import sys
00054
00055 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00056 from pr2_self_test_msgs.msg import TestStatus
00057 from std_msgs.msg import Bool
00058 import std_msgs.msg
00059 from std_srvs.srv import *
00060 import threading
00061
00062 GRACE_TIME = 35
00063
00064 def _camera_diag(level = 0):
00065 array = DiagnosticArray()
00066 stat = DiagnosticStatus()
00067 stat.name = 'wge100: Driver Status'
00068 stat.level = level
00069 stat.message = 'OK'
00070
00071 motor_stat = DiagnosticStatus()
00072 motor_stat.name = 'EtherCAT Master'
00073 motor_stat.level = 0
00074 motor_stat.values = [
00075 KeyValue(key='Dropped Packets', value='0'),
00076 KeyValue(key='RX Late Packet', value='0')]
00077
00078 mcb_stat = DiagnosticStatus()
00079 mcb_stat.name = 'EtherCAT Device (my_motor)'
00080 mcb_stat.level = 0
00081 mcb_stat.values.append(KeyValue('Num encoder_errors', '0'))
00082
00083 array.header.stamp = rospy.get_rostime()
00084 array.status.append(stat)
00085 array.status.append(motor_stat)
00086 array.status.append(mcb_stat)
00087
00088 return array
00089
00090 class TestMonitorLatch(unittest.TestCase):
00091 def __init__(self, *args):
00092 super(TestMonitorLatch, self).__init__(*args)
00093
00094 self._mutex = threading.Lock()
00095 rospy.init_node('test_monitor_latch_test')
00096 self._ignore_time = 5
00097 self._start_time = rospy.get_time()
00098 self._ok = True
00099 self._level = 0
00100
00101 self._start = rospy.get_time()
00102 self._last_msg = None
00103
00104 self._halted = False
00105
00106
00107 self._cal_pub = rospy.Publisher('calibrated', Bool, latch=True)
00108 self._cal_pub.publish(True)
00109
00110 self._snapped = False
00111 self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00112
00113 self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00114 self._pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00115 self._hlt = rospy.Service('pr2_etherCAT/halt_motors', Empty, self._hlt_cb)
00116 self._rst = rospy.Service('pr2_etherCAT/reset_motors', Empty, self._rst_cb)
00117
00118 self._reset_test = rospy.ServiceProxy('reset_test', Empty)
00119
00120 rospy.Subscriber('test_status', TestStatus, self._cb)
00121
00122 def _snap_cb(self, msg):
00123 self._snapped = True
00124
00125 def _hlt_cb(self, srv):
00126 self._halted = True
00127 return EmptyResponse()
00128
00129 def _rst_cb(self, srv):
00130 self._halted = False
00131 return EmptyResponse()
00132
00133 def _cb(self, msg):
00134 with self._mutex:
00135 if not self._last_msg:
00136 self._start = rospy.get_time()
00137
00138 self._last_msg = msg
00139
00140
00141 def test_monitor(self):
00142 while not rospy.is_shutdown():
00143 self._pub.publish(False)
00144 self._diag_pub.publish(_camera_diag())
00145 sleep(1.0)
00146 if rospy.get_time() - self._start > GRACE_TIME:
00147 break
00148
00149
00150 for i in range(0, 10):
00151 self._pub.publish(False)
00152 self._diag_pub.publish(_camera_diag(level = 2))
00153 sleep(1.0)
00154
00155
00156 for i in range(0, 5):
00157 self._pub.publish(False)
00158 self._diag_pub.publish(_camera_diag())
00159 sleep(1.0)
00160
00161 with self._mutex:
00162 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00163
00164 self.assert_(self._last_msg is not None, "No data from test monitor")
00165
00166
00167 self.assert_(self._last_msg.test_ok == TestStatus.ERROR, "Didn't record error state from Test Monitor. Should have error state because motors halted. Level: %d" % self._last_msg.test_ok)
00168
00169
00170 self.assert_(self._last_msg.message != 'OK', "Got OK message from test monitor, even with error level")
00171 self.assert_(self._last_msg.message.find("Camera Error") > -1, "Didn't get camera error message")
00172
00173
00174 self.assert_(self._halted, "Halt motors wasn't called after failure")
00175
00176
00177 self.assert_(self._snapped, "Snapshot trigger wasn't called after halt")
00178
00179
00180 self._reset_test()
00181
00182
00183 for i in range(0, 5):
00184 self._pub.publish(False)
00185 self._diag_pub.publish(_camera_diag())
00186 sleep(1.0)
00187
00188 with self._mutex:
00189 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00190 self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Test didn't reset properly after error condition")
00191
00192 self.assert_(not self._halted, "Reset motors wasn't called after reset")
00193
00194 if __name__ == '__main__':
00195 if len(sys.argv) > 1 and sys.argv[1] == '-v':
00196 suite = unittest.TestSuite()
00197 suite.addTest(TestMonitorLatch('test_monitor'))
00198
00199 unittest.TextTestRunner(verbosity=2).run(suite)
00200 else:
00201 rostest.run(PKG, sys.argv[0], TestMonitorLatch, sys.argv)