encoder_error_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2010, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 ##\author Kevin Watts
00036 ##\brief Tests that test monitor detects dropped packets
00037 
00038 
00039 """
00040 This tests that encoder errors are reported as failures. 
00041 This simulates diagnostics data from "EtherCAT Master" 
00042 and checks the output from the test monitor.
00043 
00044 From #4814: Fail for any encoder errors
00045 """
00046 
00047 from __future__ import with_statement
00048 
00049 PKG = 'pr2_hardware_test_monitor'
00050 import roslib; roslib.load_manifest(PKG)
00051 
00052 import unittest
00053 import rospy, rostest
00054 from time import sleep
00055 import sys
00056 
00057 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00058 from pr2_self_test_msgs.msg import TestStatus
00059 from std_msgs.msg import Bool
00060 import std_msgs.msg
00061 from std_srvs.srv import *
00062 import threading
00063 
00064 GRACE_TIME = 35 # 30 + 5 seconds for HW monitor to be ready
00065 IGNORE_TIME = 5
00066 
00067 # Generate diagnostic status message
00068 def _ecat_diag(enc_errors = 0):
00069     array = DiagnosticArray()
00070     stat = DiagnosticStatus()
00071     stat.name = 'EtherCAT Master'
00072     stat.level = 0
00073     stat.message = 'OK'
00074     stat.values.append(KeyValue('Dropped Packets', '0'))
00075     stat.values.append(KeyValue('RX Late Packet', '0'))
00076 
00077     mcb_stat = DiagnosticStatus()
00078     mcb_stat.name = 'EtherCAT Device (my_motor)'
00079     mcb_stat.level = 0
00080     mcb_stat.values.append(KeyValue('Num encoder_errors', str(enc_errors)))
00081 
00082     array.header.stamp = rospy.get_rostime()
00083     array.status.append(stat)
00084     array.status.append(mcb_stat)
00085     
00086     return array
00087 
00088 class TestEncoderError(unittest.TestCase):
00089     def __init__(self, *args):
00090         super(TestEncoderError, self).__init__(*args)
00091 
00092         self._mutex = threading.Lock()
00093         rospy.init_node('test_monitor_drop_pkt')
00094         self._ok = True
00095         self._level = 0
00096 
00097         self._start = rospy.get_time()
00098         self._last_msg = None
00099         self._max_lvl = -1
00100 
00101         self._halted = False
00102 
00103         self._num_errors = rospy.get_param('~num_errors', 1)
00104 
00105         self._snapped = False
00106         self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00107 
00108         self._halt_srv = rospy.Service('pr2_etherCAT/halt_motors', Empty, self.on_halt)
00109         self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00110 
00111         self._reset_test = rospy.ServiceProxy('reset_test', Empty)
00112 
00113         rospy.Subscriber('test_status', TestStatus, self._cb)
00114 
00115         self._motors_pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00116         self.cal_pub = rospy.Publisher('calibrated', Bool, latch=True)
00117 
00118     def on_halt(self, srv):
00119         return EmptyResponse()
00120 
00121     def _snap_cb(self, msg):
00122         self._snapped = True
00123 
00124     def _cb(self, msg):
00125         with self._mutex:
00126             if not self._last_msg:
00127                 self._start = rospy.get_time()
00128 
00129             self._last_msg = msg
00130 
00131             if rospy.get_time() - self._start > IGNORE_TIME:
00132                 self._max_lvl = max(msg.test_ok, self._max_lvl)
00133 
00134     def test_drop_pkt(self):
00135         self.cal_pub.publish(True)
00136         while not rospy.is_shutdown():
00137             self._diag_pub.publish(_ecat_diag())
00138             self._motors_pub.publish(False)
00139             sleep(1.0)
00140             if rospy.get_time() - self._start > GRACE_TIME:
00141                 break
00142 
00143         # Publish data with encoder errors
00144         self._diag_pub.publish(_ecat_diag(self._num_errors))
00145         self._motors_pub.publish(False)
00146         sleep(1.0)        
00147 
00148         with self._mutex:
00149             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00150 
00151             self.assert_(self._last_msg is not None, "No data from test monitor")
00152             
00153             # Check that we're in error state if we should be
00154             if self._num_errors > 0:
00155                 self.assert_(self._last_msg.test_ok == TestStatus.ERROR, "Test monitor reports that we're not in error state. Level: %d. Message: %s" % (self._last_msg.test_ok, self._last_msg.message))
00156                 # Check that snapshot trigger was called
00157                 self.assert_(self._snapped, "Snapshot trigger wasn't called, but we did halt")
00158             else:
00159                 self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Test monitor reports that we're not running. Level: %d. Message: %s" % (self._last_msg.test_ok, self._last_msg.message))
00160                 # Check that snapshot trigger wasn't called
00161                 self.assert_(not self._snapped, "Snapshot trigger wasn't called, but we did halt")
00162 
00163 if __name__ == '__main__':
00164     if len(sys.argv) > 1 and sys.argv[1] == '-v':
00165         suite = unittest.TestSuite()
00166         suite.addTest(TestEncoderError('test_dropped_pkt'))
00167 
00168         unittest.TextTestRunner(verbosity=2).run(suite)
00169     else:
00170         rostest.run(PKG, sys.argv[0], TestEncoderError, sys.argv)


pr2_hardware_test_monitor
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:54:19