00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 """
00039 This tests that not publishing a 'heartbeat' topic will cause the test monitor
00040 to halt everything.
00041 """
00042
00043 from __future__ import with_statement
00044
00045 DURATION = 120
00046 SHORT = 10
00047
00048 PKG = 'pr2_hardware_test_monitor'
00049 import roslib; roslib.load_manifest(PKG)
00050
00051 import unittest
00052 import rospy, rostest
00053 from time import sleep
00054 import sys
00055
00056 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00057 from pr2_self_test_msgs.msg import TestStatus
00058 from std_msgs.msg import Bool
00059 import std_msgs.msg
00060 from std_srvs.srv import *
00061 import threading
00062
00063
00064 def _camera_diag(level = 0):
00065 array = DiagnosticArray()
00066 stat = DiagnosticStatus()
00067 stat.name = 'wge100: Driver Status'
00068 stat.level = level
00069 stat.message = 'OK'
00070
00071 motor_stat = DiagnosticStatus()
00072 motor_stat.name = 'EtherCAT Master'
00073 motor_stat.level = 0
00074 motor_stat.values = [
00075 KeyValue(key='Dropped Packets', value='0'),
00076 KeyValue(key='RX Late Packet', value='0')]
00077
00078 mcb_stat = DiagnosticStatus()
00079 mcb_stat.name = 'EtherCAT Device (my_motor)'
00080 mcb_stat.level = 0
00081 mcb_stat.values.append(KeyValue('Num encoder_errors', '0'))
00082
00083 array.header.stamp = rospy.get_rostime()
00084 array.status.append(stat)
00085 array.status.append(motor_stat)
00086 array.status.append(mcb_stat)
00087
00088 return array
00089
00090 class TestMonitorHeartbeat(unittest.TestCase):
00091 def __init__(self, *args):
00092 super(TestMonitorHeartbeat, self).__init__(*args)
00093
00094 self._mutex = threading.Lock()
00095 rospy.init_node('test_monitor_no_beat_test')
00096 self._ignore_time = 5
00097 self._start_time = rospy.get_time()
00098 self._ok = True
00099 self._level = 0
00100
00101 self._start = rospy.get_time()
00102 self._last_msg = None
00103
00104 self._halted = False
00105
00106 self._heartbeat_pub = rospy.Publisher('/heartbeat', std_msgs.msg.Empty)
00107
00108
00109 self._cal_pub = rospy.Publisher('calibrated', Bool, latch=True)
00110 self._cal_pub.publish(True)
00111
00112
00113 self._snapped = False
00114 self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00115
00116
00117 self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00118 self._pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00119
00120 self._hlt = rospy.Service('pr2_etherCAT/halt_motors', Empty, self._hlt_cb)
00121 self._rst = rospy.Service('pr2_etherCAT/reset_motors', Empty, self._rst_cb)
00122
00123 self._reset_test = rospy.ServiceProxy('reset_test', Empty)
00124
00125 rospy.Subscriber('test_status', TestStatus, self._cb)
00126
00127 def _snap_cb(self, msg):
00128 self._snapped = True
00129
00130 def _hlt_cb(self, srv):
00131 self._halted = True
00132 return EmptyResponse()
00133
00134 def _rst_cb(self, srv):
00135 self._halted = False
00136 return EmptyResponse()
00137
00138 def _cb(self, msg):
00139 with self._mutex:
00140 if not self._last_msg:
00141 self._start = rospy.get_time()
00142
00143 self._last_msg = msg
00144
00145 def test_monitor(self):
00146
00147 while not rospy.is_shutdown():
00148 self._pub.publish(False)
00149 self._diag_pub.publish(_camera_diag())
00150 sleep(1.0)
00151 if rospy.get_time() - self._start > SHORT:
00152 break
00153
00154
00155 with self._mutex:
00156 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00157 self.assert_(self._last_msg is not None, "No data from test monitor")
00158 self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Should have good data, since we're publishing OK")
00159 self.assert_(not self._halted, "Motors halted, but we should be OK")
00160
00161
00162 while not rospy.is_shutdown() and self._last_msg.test_ok == TestStatus.RUNNING:
00163 self._pub.publish(False)
00164 self._diag_pub.publish(_camera_diag())
00165 sleep(1.0)
00166 if rospy.get_time() - self._start > DURATION:
00167 break
00168
00169 with self._mutex:
00170 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00171 self.assert_(self._last_msg is not None, "No data from test monitor")
00172
00173
00174 self.assert_(self._last_msg.test_ok == TestStatus.STALE, "Didn't record stale state from monitor. Should have stale state because no heartbeat")
00175
00176
00177 self.assert_(self._halted, "Halt motors wasn't called after no heartbeat")
00178
00179 self.assert_(self._snapped, "Snapshot trigger wasn't called for halt")
00180
00181
00182
00183 for i in range(5):
00184 if rospy.is_shutdown():
00185 break
00186
00187 self._pub.publish(False)
00188 self._diag_pub.publish(_camera_diag())
00189 self._heartbeat_pub.publish()
00190 sleep(1.0)
00191
00192 with self._mutex:
00193 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00194 self.assert_(self._last_msg is not None, "No data from test monitor")
00195
00196
00197 self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Test monitor should have reset automatically after heartbeat sent")
00198
00199
00200
00201 if __name__ == '__main__':
00202 if len(sys.argv) > 1 and sys.argv[1] == '-v':
00203 suite = unittest.TestSuite()
00204 suite.addTest(TestMonitorHeartbeat('test_monitor'))
00205
00206 unittest.TextTestRunner(verbosity=2).run(suite)
00207 else:
00208 rostest.run(PKG, sys.argv[0], TestMonitorHeartbeat, sys.argv)