dropped_packet_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2010, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 ##\author Kevin Watts
00036 ##\brief Tests that test monitor detects dropped packets
00037 
00038 
00039 """
00040 This tests that dropped etherCAT packets are detected as errors or
00041 ignored depending on late packets. This simulates diagnostics data from
00042 "EtherCAT Master" and checks the output from the test monitor.
00043 
00044 From #4848: If net drops > 10 per hour, this is a failure.
00045 "net drops" = dropped packets - late packets
00046 """
00047 
00048 from __future__ import with_statement
00049 
00050 PKG = 'pr2_hardware_test_monitor'
00051 import roslib; roslib.load_manifest(PKG)
00052 
00053 import unittest
00054 import rospy, rostest
00055 from time import sleep
00056 import sys
00057 
00058 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00059 from pr2_self_test_msgs.msg import TestStatus
00060 from std_msgs.msg import Bool
00061 import std_msgs.msg
00062 from std_srvs.srv import *
00063 import threading
00064 
00065 GRACE_TIME = 35 # 30 + 5 seconds for HW monitor to be ready
00066 IGNORE_TIME = 5
00067 
00068 # Generate diagnostic status message
00069 def _ecat_diag(pkts = 0, lates = 0):
00070     # Drops >= lates
00071     if pkts < lates:
00072         pkts = lates
00073 
00074     array = DiagnosticArray()
00075     stat = DiagnosticStatus()
00076     stat.name = 'EtherCAT Master'
00077     stat.level = 0
00078     stat.message = 'OK'
00079     stat.values.append(KeyValue('Dropped Packets', str(pkts)))
00080     stat.values.append(KeyValue('RX Late Packet', str(lates)))
00081 
00082     mcb_stat = DiagnosticStatus()
00083     mcb_stat.name = 'EtherCAT Device (my_motor)'
00084     mcb_stat.level = 0
00085     mcb_stat.values.append(KeyValue('Num encoder_errors', '0'))
00086 
00087     array.header.stamp = rospy.get_rostime()
00088     array.status.append(stat)
00089     array.status.append(mcb_stat)
00090     
00091     return array
00092 
00093 class TestDroppedPacket(unittest.TestCase):
00094     def __init__(self, *args):
00095         super(TestDroppedPacket, self).__init__(*args)
00096 
00097         self._mutex = threading.Lock()
00098         rospy.init_node('test_monitor_drop_pkt')
00099         self._ok = True
00100         self._level = 0
00101 
00102         self._start = rospy.get_time()
00103         self._last_msg = None
00104         self._max_lvl = -1
00105 
00106         self._halted = False
00107 
00108         self._num_drops = rospy.get_param('~num_drops', 10)
00109         self._num_lates = rospy.get_param('~num_lates', 0)
00110         self._increasing_drops = rospy.get_param('~increasing_drops', False)
00111 
00112         self._snapped = False
00113         self._snapshot_sub = rospy.Subscriber('snapshot_trigger', std_msgs.msg.Empty, self._snap_cb)
00114 
00115         self._halt_srv = rospy.Service('pr2_etherCAT/halt_motors', Empty, self.on_halt)
00116         self._diag_pub = rospy.Publisher('/diagnostics', DiagnosticArray)
00117 
00118         self._reset_test = rospy.ServiceProxy('reset_test', Empty)
00119 
00120         rospy.Subscriber('test_status', TestStatus, self._cb)
00121 
00122         self.motors_pub = rospy.Publisher('pr2_etherCAT/motors_halted', Bool)
00123         self.cal_pub = rospy.Publisher('calibrated', Bool, latch=True)
00124 
00125     def on_halt(self, srv):
00126         return EmptyResponse()
00127 
00128 
00129     def _snap_cb(self, msg):
00130         self._snapped = True
00131 
00132     def _cb(self, msg):
00133         with self._mutex:
00134             if not self._last_msg:
00135                 self._start = rospy.get_time()
00136 
00137             self._last_msg = msg
00138 
00139             if rospy.get_time() - self._start > IGNORE_TIME:
00140                 self._max_lvl = max(msg.test_ok, self._max_lvl)
00141 
00142     def test_drop_pkt(self):
00143         self.cal_pub.publish(True)
00144         while not rospy.is_shutdown():
00145             self._diag_pub.publish(_ecat_diag())
00146             self.motors_pub.publish(False)
00147             sleep(1.0)
00148             if rospy.get_time() - self._start > GRACE_TIME:
00149                 break
00150 
00151         # Publish single dropped packet
00152         self._diag_pub.publish(_ecat_diag(1))
00153         self.motors_pub.publish(False)
00154         sleep(1.0)
00155         # Make sure we're OK
00156 
00157         # Publish same status for 10 seconds
00158         if not self._increasing_drops:
00159             for i in range(0, 10):
00160                 self._diag_pub.publish(_ecat_diag(self._num_drops, self._num_lates))
00161                 self.motors_pub.publish(False)
00162                 sleep(1.0)
00163         else: # Test slowly increasing dropped packets
00164             drop_count = 0
00165             late_count = 0
00166             for i in range(0, max(self._num_drops, self._num_lates)):
00167                 if drop_count < self._num_drops:
00168                     drop_count += 1
00169                 if late_count < self._num_lates:
00170                     late_count += 1
00171                 self._diag_pub.publish(_ecat_diag(drop_count, late_count))
00172                 self.motors_pub.publish(False)
00173                 sleep(1.0)
00174 
00175         with self._mutex:
00176             self.assert_(not rospy.is_shutdown(), "Rospy shutdown")
00177 
00178             self.assert_(self._last_msg is not None, "No data from test monitor")
00179             
00180             # Check that we're in error state if we should be
00181             if self._num_drops - self._num_lates >= 10:
00182                 self.assert_(self._last_msg.test_ok == TestStatus.ERROR, "Test monitor reports that we're not in error state. Level: %d. Message: %s" % (self._last_msg.test_ok, self._last_msg.message))
00183                 # Check that snapshot trigger was called
00184                 self.assert_(self._snapped, "Snapshot trigger wasn't called, but we did halt")
00185             else:
00186                 self.assert_(self._last_msg.test_ok == TestStatus.RUNNING, "Test monitor reports that we're not running. Level: %d. Message: %s" % (self._last_msg.test_ok, self._last_msg.message))
00187                 # Check that snapshot trigger wasn't called
00188                 self.assert_(not self._snapped, "Snapshot trigger wasn't called, but we did halt")
00189 
00190 if __name__ == '__main__':
00191     if len(sys.argv) > 1 and sys.argv[1] == '-v':
00192         suite = unittest.TestSuite()
00193         suite.addTest(TestDroppedPacket('test_dropped_pkt'))
00194 
00195         unittest.TextTestRunner(verbosity=2).run(suite)
00196     else:
00197         rostest.run(PKG, sys.argv[0], TestDroppedPacket, sys.argv)


pr2_hardware_test_monitor
Author(s): Kevin Watts
autogenerated on Sat Dec 28 2013 17:54:19