$search
00001 #!/usr/bin/env python 00002 # 00003 # Software License Agreement (BSD License) 00004 # 00005 # Copyright (c) 2010, Willow Garage, Inc. 00006 # All rights reserved. 00007 # 00008 # Redistribution and use in source and binary forms, with or without 00009 # modification, are permitted provided that the following conditions 00010 # are met: 00011 # 00012 # * Redistributions of source code must retain the above copyright 00013 # notice, this list of conditions and the following disclaimer. 00014 # * Redistributions in binary form must reproduce the above 00015 # copyright notice, this list of conditions and the following 00016 # disclaimer in the documentation and/or other materials provided 00017 # with the distribution. 00018 # * Neither the name of the Willow Garage nor the names of its 00019 # contributors may be used to endorse or promote products derived 00020 # from this software without specific prior written permission. 00021 # 00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00033 # POSSIBILITY OF SUCH DAMAGE. 00034 # 00035 00036 ##\author Kevin Watts 00037 00038 from __future__ import with_statement 00039 PKG = 'pr2_mechanism_diagnostics' 00040 import roslib; roslib.load_manifest(PKG) 00041 00042 import rospy, rostest, unittest 00043 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus 00044 00045 import sys 00046 import threading 00047 from time import sleep 00048 00049 WAIT_TIME = 5 00050 00051 class TestMechDiag(unittest.TestCase): 00052 def __init__(self, *args): 00053 super(TestMechDiag, self).__init__(*args) 00054 00055 self._mutex = threading.Lock() 00056 self._joints = {} 00057 self._controllers = {} 00058 00059 self._start_time = rospy.get_time() 00060 00061 # Calibrated, Nan for joints 00062 self._cal = rospy.get_param("mech_diag/cal", True) 00063 self._nan = rospy.get_param("mech_diag/nan", False) 00064 00065 self._running = rospy.get_param("mech_diag/running", True) 00066 self._overrun = rospy.get_param("mech_diag/overrun", False) 00067 00068 sub_diag = rospy.Subscriber('/diagnostics', DiagnosticArray, self._diag_cb) 00069 00070 def _diag_cb(self, msg): 00071 with self._mutex: 00072 for stat in msg.status: 00073 if stat.name.startswith('Joint'): 00074 self._joints[stat.name] = stat 00075 if stat.name.startswith('Controller'): 00076 # Ignore special "No controllers" status 00077 if stat.name.find('No controllers') > 0: 00078 continue 00079 00080 self._controllers[stat.name] = stat 00081 00082 def test_mech_diag(self): 00083 while not rospy.is_shutdown() and (rospy.get_time() - self._start_time) < WAIT_TIME: 00084 sleep(0.5) 00085 00086 self.assert_(not rospy.is_shutdown(), "Rospy shutdown") 00087 00088 with self._mutex: 00089 if len(self._joints.items()) == 0: 00090 self.assert_(False, "No joint data in diagnostics") 00091 for key, val in self._joints.iteritems(): 00092 if self._nan: 00093 self.assert_(val.level == 2, "Joint %s was not ERROR, but was NaN. Level %d" % (val.name, val.level)) 00094 elif self._cal: 00095 self.assert_(val.level == 0, "Joint %s was not OK. Level %d" % (val.name, val.level)) 00096 else: 00097 self.assert_(val.level == 1, "Joint %s was not warning, but was uncalibrated. Level %d" % (val.name, val.level)) 00098 00099 if len(self._controllers.items()) == 0: 00100 self.assert_(False, "No controller data in diagnostics") 00101 00102 for key, val in self._controllers.iteritems(): 00103 if self._overrun: 00104 self.assert_(val.level == 1, "Controller %s was not WARN, but was overrun. Level %d" % (val.name, val.level)) 00105 else: 00106 self.assert_(val.level == 0, "Controller %s was not OK. Level %d" % (val.name, val.level)) 00107 00108 if __name__ == '__main__': 00109 rospy.init_node('test_mech_diag_nominal') 00110 if True: 00111 rostest.run(PKG, sys.argv[0], TestMechDiag, sys.argv) 00112 else: 00113 suite = unittest.TestSuite() 00114 suite.addTest(TestMechDiag('test_mech_diag')) 00115 00116 unittest.TextTestRunner(verbosity = 2).run(suite)