Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 from __future__ import with_statement
00040
00041 DURATION = 10
00042
00043 PKG = 'pr2_hardware_test_monitor'
00044 import roslib; roslib.load_manifest(PKG)
00045
00046 import unittest
00047 import rospy, rostest
00048 from time import sleep
00049 import sys
00050 from optparse import OptionParser
00051 import math
00052
00053 from std_srvs.srv import Empty, EmptyResponse
00054 from pr2_mechanism_msgs.msg import MechanismStatistics, JointStatistics, ActuatorStatistics
00055 from pr2_self_test_msgs.msg import TestStatus
00056 import threading
00057
00058 class TransListenerErrorTest(unittest.TestCase):
00059 def __init__(self, *args):
00060 super(TransListenerErrorTest, self).__init__(*args)
00061
00062 self._mutex = threading.Lock()
00063 rospy.init_node('test_trans_listener_error')
00064 self._ignore_time = 5
00065 self._start_time = rospy.get_time()
00066 self._ok = True
00067 self._message = None
00068 self._level = 0
00069
00070 self._roll_error = rospy.get_param("~roll_error", False)
00071 self._flex_error = rospy.get_param("~flex_error", False)
00072
00073 self._start_time = rospy.get_time()
00074
00075
00076 rospy.Subscriber('test_status', TestStatus, self.cb)
00077
00078 self.mech_pub = rospy.Publisher('mechanism_statistics', MechanismStatistics)
00079 self._reset_srv = rospy.Service('pr2_etherCAT/reset_motors', Empty, self.on_reset)
00080 self._halt_srv = rospy.Service('pr2_etherCAT/halt_motors', Empty, self.on_halt)
00081
00082 def on_halt(self, srv):
00083 with self._mutex:
00084 self._ok = False
00085 return EmptyResponse()
00086
00087 def on_reset(self, srv):
00088 with self._mutex:
00089 self._ok = True
00090 return EmptyResponse()
00091
00092 def publish_mech_stats(self, flex_error = False, roll_error = False):
00093
00094 trig_arg = rospy.get_time() - self._start_time
00095
00096 sine = math.sin(trig_arg)
00097 cosine = math.cos(trig_arg)
00098
00099 jnt_st = JointStatistics()
00100 jnt_st.name = 'r_elbow_flex_joint'
00101 jnt_st.position = float(1 * sine) - 1.2
00102 jnt_st.is_calibrated = 1
00103
00104 cont_st = JointStatistics()
00105 cont_st.name = 'r_forearm_roll_joint'
00106 cont_st.position = 5 * float(0.5 * sine)
00107 cont_st.velocity = 2.5 * float(0.5 * cosine)
00108 cont_st.is_calibrated = 1
00109
00110 cont_act_st = ActuatorStatistics()
00111 cont_act_st.name = 'r_forearm_roll_motor'
00112 cont_act_st.calibration_reading = False
00113 wrapped_position = (cont_st.position % 6.28)
00114 if wrapped_position < 3.14:
00115 cont_act_st.calibration_reading = True
00116 if roll_error:
00117 cont_act_st.calibration_reading = not cont_act_st.calibration_reading
00118
00119 act_st = ActuatorStatistics()
00120 act_st.name = 'r_elbow_flex_motor'
00121 act_st.calibration_reading = True
00122 if jnt_st.position > -1.1606:
00123 act_st.calibration_reading = False
00124 if flex_error:
00125 act_st.calibration_reading = not act_st.calibration_reading
00126
00127 mech_st = MechanismStatistics()
00128 mech_st.actuator_statistics = [ act_st, cont_act_st ]
00129 mech_st.joint_statistics = [ jnt_st, cont_st ]
00130 mech_st.header.stamp = rospy.get_rostime()
00131
00132 self.mech_pub.publish(mech_st)
00133
00134 def cb(self, msg):
00135 if rospy.get_time() - self._start_time < self._ignore_time:
00136 return
00137
00138 with self._mutex:
00139 self._message = msg.message
00140 self._level = msg.test_ok
00141
00142 def test_monitor(self):
00143 while not rospy.is_shutdown():
00144 sleep(0.1)
00145 self.publish_mech_stats()
00146 if rospy.get_time() - self._start_time > DURATION:
00147 break
00148 rospy.loginfo('Sent some normal data')
00149 with self._mutex:
00150 self.assert_(self._message, "No data from test monitor")
00151 self.assert_(self._ok, "Not OK after sending only good data. Message: %s" % self._message)
00152
00153 for i in range(0, 10):
00154 self.publish_mech_stats(self._flex_error, self._roll_error)
00155 sleep(0.1)
00156
00157 for i in range(0, 10):
00158 self.publish_mech_stats()
00159 sleep(0.1)
00160
00161 with self._mutex:
00162 self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00163 self.assert_(self._message, "No data from test monitor")
00164 self.assert_(self._level == 2, "Transmission listener didn't report error. Level: %d. Message: %s" % (self._level, self._message))
00165 self.assert_(not self._ok, "Transmission listener didn't call halt motors!")
00166
00167
00168 if __name__ == '__main__':
00169 print 'SYS ARGS:', sys.argv
00170 rostest.run(PKG, sys.argv[0], TransListenerErrorTest, sys.argv)