monitor_latch_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2010, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 ##\author Kevin Watts
00036 ##\brief Tests that test monitor latches error state
00037 
00038 """
00039 This tests that a transient error status, such as a camera error, produces 
00040 the correct behavior. We expect that an error, after a "grace period",
00041 will cause the "halt" method of all listeners to be called, and that the error 
00042 state will "latch" until reset is called.
00043 """
00044 
00045 from __future__ import with_statement
00046 
00047 PKG = 'pr2_hardware_test_monitor'
00048 import roslib; roslib.load_manifest(PKG)
00049 
00050 import unittest
00051 import rospy, rostest
00052 from time import sleep
00053 import sys
00054 
00055 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00056 from pr2_self_test_msgs.msg import TestStatus
00057 from std_msgs.msg import Bool
00058 import std_msgs.msg
00059 from std_srvs.srv import *
00060 import threading
00061 
00062 GRACE_TIME = 35 # 30 + 5 seconds for HW monitor to be ready
00063 
00064 def _camera_diag(level = 0):
00065     array = DiagnosticArray()
00066     stat = DiagnosticStatus()
00067     stat.name = 'wge100: Driver Status'
00068     stat.level = level
00069     stat.message = 'OK'
00070 
00071     motor_stat = DiagnosticStatus()
00072     motor_stat.name = 'EtherCAT Master'
00073     motor_stat.level = 0
00074     motor_stat.values = [
00075         KeyValue(key='Dropped Packets', value='0'),
00076         KeyValue(key='RX Late Packet', value='0')]
00077 
00078     mcb_stat = DiagnosticStatus()
00079     mcb_stat.name = 'EtherCAT Device (my_motor)'
00080     mcb_stat.level = 0
00081     mcb_stat.values.append(KeyValue('Num encoder_errors', '0'))
00082 
00083     array.header.stamp = rospy.get_rostime()
00084     array.status.append(stat)
00085     array.status.append(motor_stat)
00086     array.status.append(mcb_stat)
00087         
00088     return array
00089 
00090 class TestMonitorLatch(unittest.TestCase):
00091     def __init__(self, *args):
00092         super(TestMonitorLatch, self).__init__(*args)
00093 
00094         self._mutex = threading.Lock()
00095         rospy.init_node('test_monitor_latch_test')
00096         self._ignore_time = 5
00097         self._start_time = rospy.get_time()
00098         self._ok = True
00099         self._level = 0
00100 
00101         self._start = rospy.get_time()
00102         self._last_msg = None
00103 
00104         self._halted = False
00105 
00106         # Publish that we're calibrated
00107         self._cal_pub = rospy.Publisher('calibrated', Bool, latch=True)
00108         self._cal_pub.publish(True)
00109 
00110         self._snapped = False
00111         self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00112 
00113         self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00114         self._pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00115         self._hlt = rospy.Service('pr2_etherCAT/halt_motors', Empty, self._hlt_cb)
00116         self._rst = rospy.Service('pr2_etherCAT/reset_motors', Empty, self._rst_cb)
00117 
00118         self._reset_test = rospy.ServiceProxy('reset_test', Empty)
00119 
00120         rospy.Subscriber('test_status', TestStatus, self._cb)
00121 
00122     def _snap_cb(self, msg):
00123         self._snapped = True
00124 
00125     def _hlt_cb(self, srv):
00126         self._halted = True
00127         return EmptyResponse()
00128 
00129     def _rst_cb(self, srv):
00130         self._halted = False
00131         return EmptyResponse()
00132     
00133     def _cb(self, msg):
00134         with self._mutex:
00135             if not self._last_msg:
00136                 self._start = rospy.get_time()
00137 
00138             self._last_msg = msg
00139 
00140 
00141     def test_monitor(self):
00142         while not rospy.is_shutdown():
00143             self._pub.publish(False)
00144             self._diag_pub.publish(_camera_diag())
00145             sleep(1.0)
00146             if rospy.get_time() - self._start > GRACE_TIME:
00147                 break
00148 
00149         # Publish camera error for 10 seconds
00150         for i in range(0, 10):
00151             self._pub.publish(False)
00152             self._diag_pub.publish(_camera_diag(level = 2))
00153             sleep(1.0)
00154 
00155         # Publish cameras OK 5x
00156         for i in range(0, 5):
00157             self._pub.publish(False)
00158             self._diag_pub.publish(_camera_diag())
00159             sleep(1.0)
00160 
00161         with self._mutex:
00162             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00163 
00164             self.assert_(self._last_msg is not None, "No data from test monitor")
00165             
00166             # Check that message level is error state
00167             self.assert_(self._last_msg.test_ok == TestStatus.ERROR, "Didn't record error state from Test Monitor. Should have error state because motors halted. Level: %d" % self._last_msg.test_ok)
00168 
00169             # Check that message message has motors data
00170             self.assert_(self._last_msg.message != 'OK', "Got OK message from test monitor, even with error level")
00171             self.assert_(self._last_msg.message.find("Camera Error") > -1, "Didn't get camera error message")
00172 
00173             # Check that it called halt_motors correctly
00174             self.assert_(self._halted, "Halt motors wasn't called after failure")
00175 
00176             # Check that snapshot trigger was called
00177             self.assert_(self._snapped, "Snapshot trigger wasn't called after halt")
00178 
00179         # Reset the test and make sure we're OK
00180         self._reset_test()
00181 
00182         # Publish good data for 5s
00183         for i in range(0, 5):
00184             self._pub.publish(False)
00185             self._diag_pub.publish(_camera_diag())
00186             sleep(1.0)
00187 
00188         with self._mutex:
00189             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00190             self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Test didn't reset properly after error condition")
00191 
00192             self.assert_(not self._halted, "Reset motors wasn't called after reset")
00193 
00194 if __name__ == '__main__':
00195     if len(sys.argv) > 1 and sys.argv[1] == '-v':
00196         suite = unittest.TestSuite()
00197         suite.addTest(TestMonitorLatch('test_monitor'))
00198 
00199         unittest.TextTestRunner(verbosity=2).run(suite)
00200     else:
00201         rostest.run(PKG, sys.argv[0], TestMonitorLatch, sys.argv)


pr2_hardware_test_monitor
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:54:19