trans_error_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2010, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 ##\author Kevin Watts
00036 
00037 ##\brief Tests receipt of test monitor messages from life tests
00038 
00039 from __future__ import with_statement
00040 
00041 DURATION = 10
00042 
00043 PKG = 'pr2_hardware_test_monitor'
00044 import roslib; roslib.load_manifest(PKG)
00045 
00046 import unittest
00047 import rospy, rostest
00048 from time import sleep
00049 import sys
00050 from optparse import OptionParser
00051 import math
00052 
00053 from std_srvs.srv import Empty, EmptyResponse
00054 from pr2_mechanism_msgs.msg import MechanismStatistics, JointStatistics, ActuatorStatistics
00055 from pr2_self_test_msgs.msg import TestStatus
00056 import threading
00057 
00058 class TransListenerErrorTest(unittest.TestCase):
00059     def __init__(self, *args):
00060         super(TransListenerErrorTest, self).__init__(*args)
00061 
00062         self._mutex = threading.Lock()
00063         rospy.init_node('test_trans_listener_error')
00064         self._ignore_time = 5 # Ignore values for about 5 seconds
00065         self._start_time = rospy.get_time()
00066         self._ok = True
00067         self._message = None
00068         self._level = 0
00069 
00070         self._roll_error = rospy.get_param("~roll_error", False)
00071         self._flex_error = rospy.get_param("~flex_error", False)
00072 
00073         self._start_time = rospy.get_time()
00074 
00075  
00076         rospy.Subscriber('test_status', TestStatus, self.cb)
00077 
00078         self.mech_pub = rospy.Publisher('mechanism_statistics', MechanismStatistics)
00079         self._reset_srv = rospy.Service('pr2_etherCAT/reset_motors', Empty, self.on_reset)
00080         self._halt_srv = rospy.Service('pr2_etherCAT/halt_motors', Empty, self.on_halt)
00081 
00082     def on_halt(self, srv):
00083         with self._mutex:
00084             self._ok = False
00085         return EmptyResponse()
00086 
00087     def on_reset(self, srv):
00088         with self._mutex:
00089             self._ok = True
00090         return EmptyResponse()
00091 
00092     def publish_mech_stats(self, flex_error = False, roll_error = False):
00093         # Joint state is a sine, period 1s, Amplitude 2,
00094         trig_arg = rospy.get_time() - self._start_time
00095 
00096         sine = math.sin(trig_arg)
00097         cosine = math.cos(trig_arg)
00098 
00099         jnt_st = JointStatistics()
00100         jnt_st.name = 'r_elbow_flex_joint'
00101         jnt_st.position = float(1 * sine) - 1.2
00102         jnt_st.is_calibrated = 1
00103 
00104         cont_st = JointStatistics()
00105         cont_st.name = 'r_forearm_roll_joint'
00106         cont_st.position = 5 * float(0.5 * sine)
00107         cont_st.velocity = 2.5 * float(0.5 * cosine)
00108         cont_st.is_calibrated = 1
00109 
00110         cont_act_st = ActuatorStatistics()
00111         cont_act_st.name = 'r_forearm_roll_motor'
00112         cont_act_st.calibration_reading = False 
00113         wrapped_position = (cont_st.position % 6.28)
00114         if wrapped_position < 3.14:
00115             cont_act_st.calibration_reading = True
00116         if roll_error:
00117             cont_act_st.calibration_reading = not cont_act_st.calibration_reading
00118 
00119         act_st = ActuatorStatistics()
00120         act_st.name = 'r_elbow_flex_motor'
00121         act_st.calibration_reading = True
00122         if jnt_st.position > -1.1606:
00123             act_st.calibration_reading = False
00124         if flex_error:
00125             act_st.calibration_reading = not act_st.calibration_reading
00126 
00127         mech_st = MechanismStatistics()
00128         mech_st.actuator_statistics = [ act_st, cont_act_st ]
00129         mech_st.joint_statistics = [ jnt_st, cont_st ]
00130         mech_st.header.stamp = rospy.get_rostime()
00131 
00132         self.mech_pub.publish(mech_st)
00133     
00134     def cb(self, msg):
00135         if rospy.get_time() - self._start_time < self._ignore_time:
00136             return
00137 
00138         with self._mutex:
00139             self._message = msg.message
00140             self._level = msg.test_ok
00141 
00142     def test_monitor(self):
00143         while not rospy.is_shutdown():
00144             sleep(0.1)
00145             self.publish_mech_stats()
00146             if rospy.get_time() - self._start_time > DURATION:
00147                 break
00148         rospy.loginfo('Sent some normal data')
00149         with self._mutex:
00150             self.assert_(self._message, "No data from test monitor")
00151             self.assert_(self._ok, "Not OK after sending only good data. Message: %s" % self._message)
00152 
00153         for i in range(0, 10): # 10 consecutive errors
00154             self.publish_mech_stats(self._flex_error, self._roll_error)
00155             sleep(0.1)
00156 
00157         for i in range(0, 10): # Some OK values
00158             self.publish_mech_stats()
00159             sleep(0.1)
00160                 
00161         with self._mutex:
00162             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00163             self.assert_(self._message, "No data from test monitor")
00164             self.assert_(self._level == 2, "Transmission listener didn't report error. Level: %d. Message: %s" % (self._level, self._message))
00165             self.assert_(not self._ok, "Transmission listener didn't call halt motors!")
00166 
00167 
00168 if __name__ == '__main__':
00169     print 'SYS ARGS:', sys.argv
00170     rostest.run(PKG, sys.argv[0], TransListenerErrorTest, sys.argv)


pr2_hardware_test_monitor
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:54:19