fake_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2009, Willow Garage, Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without
00009 # modification, are permitted provided that the following conditions
00010 # are met:
00011 #
00012 #  * Redistributions of source code must retain the above copyright
00013 #    notice, this list of conditions and the following disclaimer.
00014 #  * Redistributions in binary form must reproduce the above
00015 #    copyright notice, this list of conditions and the following
00016 #    disclaimer in the documentation and/or other materials provided
00017 #    with the distribution.
00018 #  * Neither the name of the Willow Garage nor the names of its
00019 #    contributors may be used to endorse or promote products derived
00020 #    from this software without specific prior written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 
00035 ##\author Kevin Watts
00036 ##\brief Tests Test Manager system, simulates PR2 hardware undergoing life testing
00037 
00038 import roslib
00039 roslib.load_manifest('life_test')
00040 import wx
00041 import sys
00042 
00043 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00044 from std_srvs.srv import *
00045 from pr2_mechanism_msgs.msg import MechanismStatistics, JointStatistics, ActuatorStatistics
00046 from std_msgs.msg import Bool
00047 
00048 import os
00049 import time
00050 
00051 import rospy
00052 
00053 from time import sleep
00054 from wx import xrc
00055 
00056 import math
00057 import signal
00058 
00059 class FakeTestFrame(wx.Frame):
00060     def __init__(self, parent):
00061         wx.Frame.__init__(self, parent, wx.ID_ANY, 'Fake Test')
00062         
00063         self.diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00064         self.mech_pub = rospy.Publisher('mechanism_statistics', MechanismStatistics)
00065         self.trans_pub = rospy.Publisher('pr2_transmission_check/transmission_status', Bool)
00066         self.motors_pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00067 
00068         self._start_time = rospy.get_time()
00069 
00070         # Load XRC
00071         xrc_path = os.path.join(roslib.packages.get_pkg_dir('life_test'), 'xrc/gui.xrc')
00072 
00073         self._panel = xrc.XmlResource(xrc_path).LoadPanel(self, 'fake_panel')
00074         self._pub_check_box = xrc.XRCCTRL(self._panel, 'publish_check_box')
00075         self._level_choice = xrc.XRCCTRL(self._panel, 'level_choice')
00076 
00077         self.enum_param_ctrl = xrc.XRCCTRL(self._panel, 'enum_param_ctrl')
00078         self.range_param_ctrl = xrc.XRCCTRL(self._panel, 'range_param_ctrl')
00079 
00080         self._cal_box = xrc.XRCCTRL(self._panel, 'calibration_box')
00081         
00082         self._reset_srv = rospy.Service('pr2_etherCAT/reset_motors', Empty, self.on_reset)
00083         self._halt_srv = rospy.Service('pr2_etherCAT/halt_motors', Empty, self.on_halt)
00084 
00085         self.set_enum_ctrl()
00086         self.set_range_ctrl()
00087 
00088         self._mech_timer = wx.Timer(self, 2)
00089         self.Bind(wx.EVT_TIMER, self.on_mech_timer, self._mech_timer)
00090         self._last_mech_pub = rospy.get_time()
00091         self._mech_timer.Start(50, True)
00092 
00093         self._diag_timer = wx.Timer(self, 1)
00094         self.Bind(wx.EVT_TIMER, self.on_timer, self._diag_timer)
00095         self._last_publish = rospy.get_time()
00096         self._diag_timer.Start(500, True)
00097 
00098 
00099     def on_mech_timer(self, event = None):
00100         if not rospy.is_shutdown():
00101             self._mech_timer.Start(10, True)
00102         
00103         # Joint state is a sine, period 1s, Amplitude 2,
00104         trig_arg = rospy.get_time() - self._start_time
00105 
00106         sine = math.sin(trig_arg)
00107         cosine = math.cos(trig_arg)
00108 
00109         # Stop joint at zero if not running
00110         level =  self._level_choice.GetSelection()
00111         if level != 0:
00112             sine = 0
00113             cosine = 0
00114 
00115         jnt_st = JointStatistics()
00116         jnt_st.name = 'fake_joint'
00117         jnt_st.position = float(2 * sine)
00118         jnt_st.velocity = float(2 * cosine)
00119         jnt_st.is_calibrated = True
00120 
00121         cont_st = JointStatistics()
00122         cont_st.name = 'cont_joint'
00123         cont_st.position = 5 * float(0.5 * sine)
00124         cont_st.velocity = 2.5 * float(0.5 * cosine)
00125         cont_st.is_calibrated = True
00126 
00127         cont_act_st = ActuatorStatistics()
00128         cont_act_st.name = 'cont_motor'
00129         cont_act_st.calibration_reading = False
00130         wrapped_position = (cont_st.position % 6.28)
00131         if wrapped_position > 3.14 and self._cal_box.IsChecked():
00132             cont_act_st.calibration_reading = True
00133         
00134         act_st = ActuatorStatistics()
00135         act_st.name = 'fake_motor'
00136         act_st.calibration_reading = True
00137         if sine > 0.0 and self._cal_box.IsChecked():
00138             act_st.calibration_reading = False
00139 
00140         mech_st = MechanismStatistics()
00141         mech_st.actuator_statistics = [ act_st, cont_act_st ]
00142         mech_st.joint_statistics = [ jnt_st, cont_st ]
00143 
00144         self.trans_pub.publish(self._cal_box.IsChecked())
00145         self.mech_pub.publish(mech_st)
00146   
00147     def on_halt(self, srv):
00148         wx.CallAfter(self.set_level, 2)
00149         return EmptyResponse()
00150 
00151     def on_reset(self, srv):
00152         wx.CallAfter(self.set_level, 0)
00153         return EmptyResponse()
00154 
00155     def set_level(self, val):
00156         self._level_choice.SetSelection(val)
00157 
00158     def set_enum_ctrl(self):
00159         enum_param = rospy.get_param('test_choice', 'A')
00160         self.enum_param_ctrl.SetValue(enum_param)
00161 
00162     def set_range_ctrl(self):
00163         range_param = rospy.get_param('cycle_rate', 1.0)
00164         self.range_param_ctrl.SetValue(range_param)
00165 
00166     def on_timer(self, event = None):
00167         # Has to sleep to pick up sigint or something.
00168         sleep(0.1)
00169         if not rospy.is_shutdown():
00170             self._diag_timer.Start(1000, True)
00171 
00172         level_dict = { "OK": 0, "Warn": 1, "Error": 2 }
00173 
00174         level =  self._level_choice.GetSelection()
00175 
00176         choice = self._level_choice.GetStringSelection()
00177      
00178         if self._pub_check_box.IsChecked():
00179             self.publish_diag(level, str(choice))
00180 
00181     def publish_diag(self, level, choice):
00182         msg = DiagnosticArray()
00183         stat = DiagnosticStatus()
00184         msg.status.append(stat)
00185 
00186         stat.level = level
00187         stat.name = 'EtherCAT Master'
00188         stat.message = choice 
00189         stat.values.append(KeyValue(key='Dropped Packets', value='0'))
00190         stat.values.append(KeyValue(key='RX Late Packet', value='0'))
00191 
00192         mcb_stat = DiagnosticStatus()
00193         mcb_stat.level = 0
00194         mcb_stat.name = 'EtherCAT Device (cont_motor)'
00195         mcb_stat.message = 'OK'
00196         mcb_stat.values.append(KeyValue(key='Num encoder_errors', value='0'))
00197         msg.status.append(mcb_stat)
00198      
00199         self.diag_pub.publish(msg)
00200 
00201         halted = Bool()
00202         halted.data = level != 0
00203 
00204         self.motors_pub.publish(halted)
00205             
00206 
00207 class FakeTestApp(wx.App):
00208     def __init__(self):
00209         wx.App.__init__(self, clearSigInt = False)
00210 
00211     def OnInit(self):
00212         rospy.init_node("fake_test_node")
00213         self._frame = FakeTestFrame(None)
00214         self._frame.SetSize(wx.Size(224, 230))
00215         self._frame.Layout()
00216         self._frame.Centre()
00217         self._frame.Show(True)
00218 
00219         signal.signal(signal.SIGINT, signal.SIG_DFL)
00220 
00221         return True
00222 
00223 if __name__ == '__main__':
00224     try:
00225         app = FakeTestApp()
00226         app.MainLoop()
00227     except:
00228         print 'Caught exception in FakeTest!'
00229         import traceback
00230         traceback.print_exc()
00231         rospy.logerr(traceback.format_exc())


life_test
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:56:37