Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 from __future__ import with_statement
00039 PKG = 'pr2_mechanism_diagnostics'
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 import rospy, rostest, unittest
00043 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
00044
00045 import sys
00046 import threading
00047 from time import sleep
00048
00049 WAIT_TIME = 5
00050
00051 class TestMechDiag(unittest.TestCase):
00052 def __init__(self, *args):
00053 super(TestMechDiag, self).__init__(*args)
00054
00055 self._mutex = threading.Lock()
00056 self._joints = {}
00057 self._controllers = {}
00058
00059 self._start_time = rospy.get_time()
00060
00061
00062 self._cal = rospy.get_param("mech_diag/cal", True)
00063 self._nan = rospy.get_param("mech_diag/nan", False)
00064
00065 self._running = rospy.get_param("mech_diag/running", True)
00066 self._overrun = rospy.get_param("mech_diag/overrun", False)
00067
00068 sub_diag = rospy.Subscriber('/diagnostics', DiagnosticArray, self._diag_cb)
00069
00070 def _diag_cb(self, msg):
00071 with self._mutex:
00072 for stat in msg.status:
00073 if stat.name.startswith('Joint'):
00074 self._joints[stat.name] = stat
00075 if stat.name.startswith('Controller'):
00076
00077 if stat.name.find('No controllers') > 0:
00078 continue
00079
00080 self._controllers[stat.name] = stat
00081
00082 def test_mech_diag(self):
00083 while not rospy.is_shutdown() and (rospy.get_time() - self._start_time) < WAIT_TIME:
00084 sleep(0.5)
00085
00086 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00087
00088 with self._mutex:
00089 if len(self._joints.items()) == 0:
00090 self.assert_(False, "No joint data in diagnostics")
00091 for key, val in self._joints.iteritems():
00092 if self._nan:
00093 self.assert_(val.level == 2, "Joint %s was not ERROR, but was NaN. Level %d" % (val.name, val.level))
00094 elif self._cal:
00095 self.assert_(val.level == 0, "Joint %s was not OK. Level %d" % (val.name, val.level))
00096 else:
00097 self.assert_(val.level == 1, "Joint %s was not warning, but was uncalibrated. Level %d" % (val.name, val.level))
00098
00099 if len(self._controllers.items()) == 0:
00100 self.assert_(False, "No controller data in diagnostics")
00101
00102 for key, val in self._controllers.iteritems():
00103 if self._overrun:
00104 self.assert_(val.level == 1, "Controller %s was not WARN, but was overrun. Level %d" % (val.name, val.level))
00105 else:
00106 self.assert_(val.level == 0, "Controller %s was not OK. Level %d" % (val.name, val.level))
00107
00108 if __name__ == '__main__':
00109 rospy.init_node('test_mech_diag_nominal')
00110 if True:
00111 rostest.run(PKG, sys.argv[0], TestMechDiag, sys.argv)
00112 else:
00113 suite = unittest.TestSuite()
00114 suite.addTest(TestMechDiag('test_mech_diag'))
00115
00116 unittest.TextTestRunner(verbosity = 2).run(suite)