Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 """
00039 This tests that not publishing a 'heartbeat' topic will cause the test monitor
00040 to halt everything.
00041 """
00042 
00043 from __future__ import with_statement
00044 
00045 DURATION = 120
00046 SHORT = 10
00047 
00048 PKG = 'pr2_hardware_test_monitor'
00049 import roslib; roslib.load_manifest(PKG)
00050 
00051 import unittest
00052 import rospy, rostest
00053 from time import sleep
00054 import sys
00055 
00056 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00057 from pr2_self_test_msgs.msg import TestStatus
00058 from std_msgs.msg import Bool
00059 import std_msgs.msg
00060 from std_srvs.srv import *
00061 import threading
00062 
00063 
00064 def _camera_diag(level = 0):
00065     array = DiagnosticArray()
00066     stat = DiagnosticStatus()
00067     stat.name = 'wge100: Driver Status'
00068     stat.level = level
00069     stat.message = 'OK'
00070 
00071     motor_stat = DiagnosticStatus()
00072     motor_stat.name = 'EtherCAT Master'
00073     motor_stat.level = 0
00074     motor_stat.values = [
00075         KeyValue(key='Dropped Packets', value='0'),
00076         KeyValue(key='RX Late Packet', value='0')]
00077 
00078     mcb_stat = DiagnosticStatus()
00079     mcb_stat.name = 'EtherCAT Device (my_motor)'
00080     mcb_stat.level = 0
00081     mcb_stat.values.append(KeyValue('Num encoder_errors', '0'))
00082 
00083     array.header.stamp = rospy.get_rostime()
00084     array.status.append(stat)
00085     array.status.append(motor_stat)
00086     array.status.append(mcb_stat)
00087     
00088     return array
00089 
00090 class TestMonitorHeartbeat(unittest.TestCase):
00091     def __init__(self, *args):
00092         super(TestMonitorHeartbeat, self).__init__(*args)
00093 
00094         self._mutex = threading.Lock()
00095         rospy.init_node('test_monitor_no_beat_test')
00096         self._ignore_time = 5
00097         self._start_time = rospy.get_time()
00098         self._ok = True
00099         self._level = 0
00100 
00101         self._start = rospy.get_time()
00102         self._last_msg = None
00103 
00104         self._halted = False
00105 
00106         self._heartbeat_pub = rospy.Publisher('/heartbeat', std_msgs.msg.Empty)
00107 
00108         
00109         self._cal_pub = rospy.Publisher('calibrated', Bool, latch=True)
00110         self._cal_pub.publish(True)
00111 
00112         
00113         self._snapped = False
00114         self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00115 
00116         
00117         self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00118         self._pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00119 
00120         self._hlt = rospy.Service('pr2_etherCAT/halt_motors', Empty, self._hlt_cb)
00121         self._rst = rospy.Service('pr2_etherCAT/reset_motors', Empty, self._rst_cb)
00122 
00123         self._reset_test = rospy.ServiceProxy('reset_test', Empty)
00124 
00125         rospy.Subscriber('test_status', TestStatus, self._cb)
00126 
00127     def _snap_cb(self, msg):
00128         self._snapped = True
00129 
00130     def _hlt_cb(self, srv):
00131         self._halted = True
00132         return EmptyResponse()
00133 
00134     def _rst_cb(self, srv):
00135         self._halted = False
00136         return EmptyResponse()
00137     
00138     def _cb(self, msg):
00139         with self._mutex:
00140             if not self._last_msg:
00141                 self._start = rospy.get_time()
00142 
00143             self._last_msg = msg
00144 
00145     def test_monitor(self):
00146         
00147         while not rospy.is_shutdown():
00148             self._pub.publish(False)
00149             self._diag_pub.publish(_camera_diag())
00150             sleep(1.0)
00151             if rospy.get_time() - self._start > SHORT:
00152                 break
00153 
00154         
00155         with self._mutex:
00156             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00157             self.assert_(self._last_msg is not None, "No data from test monitor")
00158             self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Should have good data, since we're publishing OK")
00159             self.assert_(not self._halted, "Motors halted, but we should be OK")
00160 
00161         
00162         while not rospy.is_shutdown() and self._last_msg.test_ok == TestStatus.RUNNING:
00163             self._pub.publish(False)
00164             self._diag_pub.publish(_camera_diag())
00165             sleep(1.0)
00166             if rospy.get_time() - self._start > DURATION:
00167                 break
00168 
00169         with self._mutex:
00170             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00171             self.assert_(self._last_msg is not None, "No data from test monitor")
00172 
00173             
00174             self.assert_(self._last_msg.test_ok == TestStatus.STALE, "Didn't record stale state from monitor. Should have stale state because no heartbeat")
00175 
00176             
00177             self.assert_(self._halted, "Halt motors wasn't called after no heartbeat")
00178 
00179             self.assert_(self._snapped, "Snapshot trigger wasn't called for halt")
00180 
00181 
00182         
00183         for i in range(5):
00184             if rospy.is_shutdown():
00185                 break
00186 
00187             self._pub.publish(False)
00188             self._diag_pub.publish(_camera_diag())
00189             self._heartbeat_pub.publish()
00190             sleep(1.0)
00191 
00192         with self._mutex:
00193             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00194             self.assert_(self._last_msg is not None, "No data from test monitor")
00195 
00196             
00197             self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Test monitor should have reset automatically after heartbeat sent")
00198 
00199 
00200 
00201 if __name__ == '__main__':
00202     if len(sys.argv) > 1 and sys.argv[1] == '-v':
00203         suite = unittest.TestSuite()
00204         suite.addTest(TestMonitorHeartbeat('test_monitor'))
00205 
00206         unittest.TextTestRunner(verbosity=2).run(suite)
00207     else:
00208         rostest.run(PKG, sys.argv[0], TestMonitorHeartbeat, sys.argv)