no_heartbeat_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2010, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 ##\author Kevin Watts
00036 ##\brief Tests that test monitor latches error state
00037 
00038 """
00039 This tests that not publishing a 'heartbeat' topic will cause the test monitor
00040 to halt everything.
00041 """
00042 
00043 from __future__ import with_statement
00044 
00045 DURATION = 120
00046 SHORT = 10
00047 
00048 PKG = 'pr2_hardware_test_monitor'
00049 import roslib; roslib.load_manifest(PKG)
00050 
00051 import unittest
00052 import rospy, rostest
00053 from time import sleep
00054 import sys
00055 
00056 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00057 from pr2_self_test_msgs.msg import TestStatus
00058 from std_msgs.msg import Bool
00059 import std_msgs.msg
00060 from std_srvs.srv import *
00061 import threading
00062 
00063 
00064 def _camera_diag(level = 0):
00065     array = DiagnosticArray()
00066     stat = DiagnosticStatus()
00067     stat.name = 'wge100: Driver Status'
00068     stat.level = level
00069     stat.message = 'OK'
00070 
00071     motor_stat = DiagnosticStatus()
00072     motor_stat.name = 'EtherCAT Master'
00073     motor_stat.level = 0
00074     motor_stat.values = [
00075         KeyValue(key='Dropped Packets', value='0'),
00076         KeyValue(key='RX Late Packet', value='0')]
00077 
00078     mcb_stat = DiagnosticStatus()
00079     mcb_stat.name = 'EtherCAT Device (my_motor)'
00080     mcb_stat.level = 0
00081     mcb_stat.values.append(KeyValue('Num encoder_errors', '0'))
00082 
00083     array.header.stamp = rospy.get_rostime()
00084     array.status.append(stat)
00085     array.status.append(motor_stat)
00086     array.status.append(mcb_stat)
00087     
00088     return array
00089 
00090 class TestMonitorHeartbeat(unittest.TestCase):
00091     def __init__(self, *args):
00092         super(TestMonitorHeartbeat, self).__init__(*args)
00093 
00094         self._mutex = threading.Lock()
00095         rospy.init_node('test_monitor_no_beat_test')
00096         self._ignore_time = 5
00097         self._start_time = rospy.get_time()
00098         self._ok = True
00099         self._level = 0
00100 
00101         self._start = rospy.get_time()
00102         self._last_msg = None
00103 
00104         self._halted = False
00105 
00106         self._heartbeat_pub = rospy.Publisher('/heartbeat', std_msgs.msg.Empty)
00107 
00108         # Publish that we're calibrated
00109         self._cal_pub = rospy.Publisher('calibrated', Bool, latch=True)
00110         self._cal_pub.publish(True)
00111 
00112         # Snapshot trigger 
00113         self._snapped = False
00114         self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00115 
00116         # Publish camera/motors data
00117         self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00118         self._pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00119 
00120         self._hlt = rospy.Service('pr2_etherCAT/halt_motors', Empty, self._hlt_cb)
00121         self._rst = rospy.Service('pr2_etherCAT/reset_motors', Empty, self._rst_cb)
00122 
00123         self._reset_test = rospy.ServiceProxy('reset_test', Empty)
00124 
00125         rospy.Subscriber('test_status', TestStatus, self._cb)
00126 
00127     def _snap_cb(self, msg):
00128         self._snapped = True
00129 
00130     def _hlt_cb(self, srv):
00131         self._halted = True
00132         return EmptyResponse()
00133 
00134     def _rst_cb(self, srv):
00135         self._halted = False
00136         return EmptyResponse()
00137     
00138     def _cb(self, msg):
00139         with self._mutex:
00140             if not self._last_msg:
00141                 self._start = rospy.get_time()
00142 
00143             self._last_msg = msg
00144 
00145     def test_monitor(self):
00146         # Publish good data for a bit
00147         while not rospy.is_shutdown():
00148             self._pub.publish(False)
00149             self._diag_pub.publish(_camera_diag())
00150             sleep(1.0)
00151             if rospy.get_time() - self._start > SHORT:
00152                 break
00153 
00154         # Check that our data is good for now
00155         with self._mutex:
00156             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00157             self.assert_(self._last_msg is not None, "No data from test monitor")
00158             self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Should have good data, since we're publishing OK")
00159             self.assert_(not self._halted, "Motors halted, but we should be OK")
00160 
00161         # Keep publishing good data until we trip the heartbeat
00162         while not rospy.is_shutdown() and self._last_msg.test_ok == TestStatus.RUNNING:
00163             self._pub.publish(False)
00164             self._diag_pub.publish(_camera_diag())
00165             sleep(1.0)
00166             if rospy.get_time() - self._start > DURATION:
00167                 break
00168 
00169         with self._mutex:
00170             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00171             self.assert_(self._last_msg is not None, "No data from test monitor")
00172 
00173             # Check that we're halted for heartbeat
00174             self.assert_(self._last_msg.test_ok == TestStatus.STALE, "Didn't record stale state from monitor. Should have stale state because no heartbeat")
00175 
00176             # Check that it called halt_motors correctly
00177             self.assert_(self._halted, "Halt motors wasn't called after no heartbeat")
00178 
00179             self.assert_(self._snapped, "Snapshot trigger wasn't called for halt")
00180 
00181 
00182         # Send the heartbeat to reset the monitor automatically, #4878
00183         for i in range(5):
00184             if rospy.is_shutdown():
00185                 break
00186 
00187             self._pub.publish(False)
00188             self._diag_pub.publish(_camera_diag())
00189             self._heartbeat_pub.publish()
00190             sleep(1.0)
00191 
00192         with self._mutex:
00193             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00194             self.assert_(self._last_msg is not None, "No data from test monitor")
00195 
00196             # Check that we're halted for heartbeat
00197             self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Test monitor should have reset automatically after heartbeat sent")
00198 
00199 
00200 
00201 if __name__ == '__main__':
00202     if len(sys.argv) > 1 and sys.argv[1] == '-v':
00203         suite = unittest.TestSuite()
00204         suite.addTest(TestMonitorHeartbeat('test_monitor'))
00205 
00206         unittest.TextTestRunner(verbosity=2).run(suite)
00207     else:
00208         rostest.run(PKG, sys.argv[0], TestMonitorHeartbeat, sys.argv)


pr2_hardware_test_monitor
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:54:19