trans_error_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2009, Willow Garage, Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions
00010 # are met:
00011 #
00012 #  * Redistributions of source code must retain the above copyright
00013 #    notice, this list of conditions and the following disclaimer.
00014 #  * Redistributions in binary form must reproduce the above
00015 #    copyright notice, this list of conditions and the following
00016 #    disclaimer in the documentation and/or other materials provided
00017 #    with the distribution.
00018 #  * Neither the name of the Willow Garage nor the names of its
00019 #    contributors may be used to endorse or promote products derived
00020 #    from this software without specific prior written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 #
00035 
00036 ##\author Kevin Watts
00037 
00038 ##\brief Tests receipt of test monitor messages from life tests
00039 
00040 from __future__ import with_statement
00041 
00042 DURATION = 10
00043 
00044 PKG = 'pr2_transmission_check'
00045 import roslib; roslib.load_manifest(PKG)
00046 
00047 import unittest
00048 import rospy, rostest
00049 from time import sleep
00050 import sys
00051 from optparse import OptionParser
00052 import math
00053 
00054 from std_msgs.msg import Bool
00055 from std_srvs.srv import Empty, EmptyResponse
00056 from pr2_mechanism_msgs.msg import MechanismStatistics, JointStatistics, ActuatorStatistics
00057 from diagnostic_msgs.msg import DiagnosticArray
00058 
00059 import threading
00060 
00061 FLEX_JOINT = 'r_elbow_flex_joint'
00062 ROLL_JOINT = 'r_forearm_roll_joint'
00063 FLEX_MOTOR = 'r_elbow_flex_motor'
00064 ROLL_MOTOR = 'r_forearm_roll_motor'
00065 
00066 WRIST_FLEX_JOINT = 'r_wrist_flex_joint'
00067 WRIST_ROLL_JOINT = 'r_wrist_roll_joint'
00068 WRIST_FLEX_MOTOR = 'r_wrist_l_motor'
00069 WRIST_ROLL_MOTOR = 'r_wrist_r_motor'
00070 
00071 def check_status(msg, joint_name):
00072     for stat in msg.status:
00073         rospy.loginfo('Joint: %s. Status: %s' % (joint_name, stat.name))
00074         if stat.name.find(joint_name) == -1:
00075             continue
00076         if stat.name.startswith('Transmission'):
00077             return stat.level
00078 
00079     return -1
00080 
00081 def mech_stats(flex_error = False, roll_error = False, start_time = 0.0):
00082     # Joint state is a sine, period 1s, Amplitude 2,
00083     trig_arg = rospy.get_time() - start_time
00084     
00085     sine = math.sin(trig_arg)
00086     cosine = math.cos(trig_arg)
00087     
00088     jnt_st = JointStatistics()
00089     jnt_st.name = FLEX_JOINT
00090     jnt_st.position = float(1 * sine) - 1.2
00091     jnt_st.is_calibrated = 1
00092     
00093     cont_st = JointStatistics()
00094     cont_st.name = ROLL_JOINT
00095     cont_st.position = 5 * float(0.5 * sine)
00096     cont_st.velocity = 2.5 * float(0.5 * cosine)
00097     cont_st.is_calibrated = 1
00098     
00099     cont_act_st = ActuatorStatistics()
00100     cont_act_st.name = ROLL_MOTOR
00101     cont_act_st.calibration_reading = False 
00102     wrapped_position = (cont_st.position % 6.28)
00103     if wrapped_position < 3.14:
00104         cont_act_st.calibration_reading = True
00105     if roll_error:
00106         cont_act_st.calibration_reading = not cont_act_st.calibration_reading
00107 
00108     act_st = ActuatorStatistics()
00109     act_st.name = FLEX_MOTOR
00110     act_st.calibration_reading = True
00111     if jnt_st.position > -1.1606:
00112         act_st.calibration_reading = False
00113     if flex_error:
00114         act_st.calibration_reading = not act_st.calibration_reading
00115         
00116     mech_st = MechanismStatistics()
00117     mech_st.actuator_statistics = [ act_st, cont_act_st ]
00118     mech_st.joint_statistics = [ jnt_st, cont_st ]
00119     mech_st.header.stamp = rospy.get_rostime()
00120     
00121     return mech_st
00122 
00123 def wrist_mech_stats(flex_error = False, roll_error = False, start_time = 0.0):
00124     # Joint state is a sine wave, period 1s
00125     trig_arg = rospy.get_time() - start_time
00126     
00127     sine = math.sin(trig_arg)
00128     cosine = math.cos(trig_arg)
00129     
00130     jnt_st = JointStatistics()
00131     jnt_st.name = WRIST_FLEX_JOINT
00132     jnt_st.position = float(1 * sine) - 1.2
00133     jnt_st.is_calibrated = 1
00134     
00135     cont_st = JointStatistics()
00136     cont_st.name = WRIST_ROLL_JOINT
00137     cont_st.position = 5 * float(0.5 * sine)
00138     cont_st.velocity = 2.5 * float(0.5 * cosine)
00139     cont_st.is_calibrated = 1
00140     
00141     cont_act_st = ActuatorStatistics()
00142     cont_act_st.name = WRIST_ROLL_MOTOR
00143     cont_act_st.calibration_reading = False 
00144     wrapped_position = (cont_st.position % 6.28)
00145     if wrapped_position > 3.14159 + 1.5707 or wrapped_position < 1.5707:
00146         cont_act_st.calibration_reading = True
00147     if roll_error:
00148         cont_act_st.calibration_reading = not cont_act_st.calibration_reading
00149 
00150     act_st = ActuatorStatistics()
00151     act_st.name = WRIST_FLEX_MOTOR
00152     act_st.calibration_reading = True
00153     if jnt_st.position > -0.54105:
00154         act_st.calibration_reading = False
00155     if flex_error:
00156         act_st.calibration_reading = not act_st.calibration_reading
00157         
00158     mech_st = MechanismStatistics()
00159     mech_st.actuator_statistics = [ act_st, cont_act_st ]
00160     mech_st.joint_statistics = [ jnt_st, cont_st ]
00161     mech_st.header.stamp = rospy.get_rostime()
00162     
00163     return mech_st
00164 
00165 
00166 class TransListenerErrorTest(unittest.TestCase):
00167     def __init__(self, *args):
00168         super(TransListenerErrorTest, self).__init__(*args)
00169 
00170         self._mutex = threading.Lock()
00171         rospy.init_node('test_trans_listener_error')
00172         self._ignore_time = 5 # Ignore values for about 5 seconds
00173         self._start_time = rospy.get_time()
00174         self._ok = True
00175         self._message = None
00176         self._level = 0
00177 
00178         self._roll_error = rospy.get_param("~roll_error", False)
00179         self._flex_error = rospy.get_param("~flex_error", False)
00180         self._roll_warn = rospy.get_param("~roll_warn", False)
00181         self._flex_warn = rospy.get_param("~flex_warn", False)
00182 
00183         self._wrist = rospy.get_param("~wrist", False)
00184         self._flex_joint = WRIST_FLEX_JOINT if self._wrist else FLEX_JOINT
00185         self._roll_joint = WRIST_ROLL_JOINT if self._wrist else ROLL_JOINT
00186         self._flex_motor = WRIST_FLEX_MOTOR if self._wrist else FLEX_MOTOR
00187         self._roll_motor = WRIST_ROLL_MOTOR if self._wrist else ROLL_MOTOR
00188 
00189         self._start_time = rospy.get_time()
00190 
00191         self._diag_msg = None
00192         self._trans_status = False
00193         self._trans_msg = None
00194         
00195         self._diag_sub  = rospy.Subscriber('/diagnostics', DiagnosticArray, self._diag_cb)
00196         self.mech_pub   = rospy.Publisher('mechanism_statistics', MechanismStatistics)
00197         self._reset_srv = rospy.Service('pr2_etherCAT/reset_motors', Empty, self.on_reset)
00198         self._halt_srv  = rospy.Service('pr2_etherCAT/halt_motors', Empty, self.on_halt)
00199         self._trans_sub = rospy.Subscriber('pr2_transmission_check/transmission_status', Bool, self._trans_cb)
00200 
00201     def _trans_cb(self, msg):
00202         with self._mutex:
00203             self._trans_msg = msg
00204             self._trans_status = msg.data
00205 
00206     def _diag_cb(self, msg):
00207         with self._mutex:
00208             self._diag_msg = msg
00209 
00210     def on_halt(self, srv):
00211         rospy.loginfo('Halt motors called')
00212         with self._mutex:
00213             self._ok = False
00214         return EmptyResponse()
00215 
00216     def on_reset(self, srv):
00217         rospy.loginfo('Reset motors called')
00218         with self._mutex:
00219             self._ok = True
00220         return EmptyResponse()
00221 
00222     def publish_mech_stats(self, flex_error = False, roll_error = False):
00223         if self._wrist:
00224             mech_st = wrist_mech_stats(flex_error, roll_error, self._start_time)
00225         else:
00226             mech_st = mech_stats(flex_error, roll_error, self._start_time)
00227 
00228         self.mech_pub.publish(mech_st)
00229     
00230     def test_monitor(self):
00231         while not rospy.is_shutdown():
00232             sleep(0.1)
00233             self.publish_mech_stats()
00234             if rospy.get_time() - self._start_time > DURATION:
00235                 break
00236         rospy.loginfo('Sent some normal data')
00237         with self._mutex:
00238             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00239 
00240             self.assert_(self._diag_msg, "No diagnostics message received")
00241             self.assert_(self._trans_msg, "Transmission status topic didn't publish")
00242             self.assert_(self._trans_status, "Transmission status topic reported error!")
00243             
00244             lvl = check_status(self._diag_msg, self._flex_joint)
00245             self.assert_(lvl  == 0, "Flex transmission wasn't OK. Level: %d" % lvl)
00246 
00247             lvl = check_status(self._diag_msg, self._roll_joint)
00248             self.assert_(lvl  == 0, "Roll transmission wasn't OK. Level: %d" % lvl)
00249 
00250         for i in range(0, 10): # 10 consecutive errors
00251             self.publish_mech_stats(self._flex_error, self._roll_error)
00252             sleep(0.1)
00253 
00254         # Publish one value if we have warnings
00255         self.publish_mech_stats(self._flex_warn, self._roll_warn)
00256         sleep(0.1)
00257 
00258         for i in range(0, 10): # Some OK values
00259             self.publish_mech_stats()
00260             sleep(0.1)
00261                 
00262         with self._mutex:
00263             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00264             
00265             if self._roll_error or self._flex_error:
00266                 self.assert_(not self._ok, "Transmission listener didn't call halt motors!")
00267                 self.assert_(not self._trans_status, "Transmission status topic didn't report error")
00268             else:
00269                 self.assert_(self._ok, "Transmission listener called halt motors!")
00270                 self.assert_(self._trans_status, "Transmission status topic reported error")
00271 
00272 
00273             flex_val = 2 if self._flex_error else 0
00274             lvl = check_status(self._diag_msg, self._flex_joint)
00275             self.assert_(lvl  == flex_val, "Flex transmission level incorrect. Level: %d. Expected: %d" % (lvl, flex_val))
00276 
00277             roll_val = 2 if self._roll_error else 0
00278             lvl = check_status(self._diag_msg, self._roll_joint)
00279             self.assert_(lvl  == roll_val, "Roll transmission level incorrect. Level: %d. Expected: %d" % (lvl, roll_val))
00280 
00281             
00282 
00283 
00284 if __name__ == '__main__':
00285     print 'SYS ARGS:', sys.argv
00286     rostest.run(PKG, sys.argv[0], TransListenerErrorTest, sys.argv)


pr2_transmission_check
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:53:50