usb_serial.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #***********************************************************
00003 #* Software License Agreement (BSD License)
00004 #*
00005 #*  Copyright (c) 2009, Willow Garage, Inc.
00006 #*  All rights reserved.
00007 #*
00008 #*  Redistribution and use in source and binary forms, with or without
00009 #*  modification, are permitted provided that the following conditions
00010 #*  are met:
00011 #*
00012 #*   * Redistributions of source code must retain the above copyright
00013 #*     notice, this list of conditions and the following disclaimer.
00014 #*   * Redistributions in binary form must reproduce the above
00015 #*     copyright notice, this list of conditions and the following
00016 #*     disclaimer in the documentation and/or other materials provided
00017 #*     with the distribution.
00018 #*   * Neither the name of the Willow Garage nor the names of its
00019 #*     contributors may be used to endorse or promote products derived
00020 #*     from this software without specific prior written permission.
00021 #*
00022 #*  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 #*  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 #*  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 #*  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 #*  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 #*  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 #*  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 #*  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 #*  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 #*  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 #*  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 #*  POSSIBILITY OF SUCH DAMAGE.
00034 #***********************************************************
00035 
00036 PKG = 'qualification' 
00037 
00038 import roslib; roslib.load_manifest(PKG)
00039 import rospy
00040 import serial
00041 import random              
00042 import threading
00043 import traceback
00044 
00045 from pr2_self_test_msgs.srv import TestResult, TestResultRequest
00046 
00047 
00048 def index_to_msg(lst, i):
00049     try:
00050         if lst[i]:
00051             return "OK"
00052         return "FAIL"
00053     except:
00054         return "N/A"
00055 
00056 class TestSerialPort:
00057     def __init__(self, port, rate, timeout):
00058         self.port = port
00059         self.serial = serial.Serial(port=port, baudrate=rate)
00060         self.timeout = timeout
00061         self.ok = [False] * 2
00062         self.thread = [None] * 2
00063         self.serial.timeout = 0
00064         while self.serial.read(1024) != '': # Clear the input buffer
00065             pass
00066         self.serial.timeout = timeout
00067 
00068         self.write_ok = False
00069         self.join_ok = False
00070 
00071     READ = 0
00072     WRITE = 1
00073     DIR = { 0: 'read', 1: 'write' }
00074 
00075     def start(self, direction, seed, length):
00076         rospy.logout( "start %s %s"%(self.port, TestSerialPort.DIR[direction]))
00077         self.ok[direction] = True
00078         self.thread[direction] = threading.Thread(target=self._run_test,args=[direction, seed, length])
00079         self.thread[direction].start()
00080 
00081     def join_is_error(self, direction):
00082         rospy.logout( "join %s %s"%(self.port, TestSerialPort.DIR[direction]))
00083         self.thread[direction].join(self.timeout + 5)
00084         if self.thread[direction].isAlive():
00085             rospy.logout( "Failed to join on %s test for %s"%(TestSerialPort.DIR[direction], self.port))
00086             self.ok[direction] = False
00087         return self.ok[direction]
00088 
00089     def _run_test(self, direction, seed, length):
00090         rospy.logout( "run %s %s"%(self.port, TestSerialPort.DIR[direction]))
00091         anyletter = [chr(i) for i in range(0,256)] 
00092         data = ''.join([random.Random(seed).choice(anyletter) for i in range(0,length)]) 
00093         if direction == TestSerialPort.READ:
00094             indata = self.serial.read(length)  
00095             if len(indata) != length:
00096                 rospy.logout( "Read %i bytes instead of %i in %s"%(len(indata), length, self.port))
00097                 return
00098             if data != indata:
00099                 rospy.logout( "Input data did not match expectations for %s"%(self.port))
00100                 return
00101         elif direction == TestSerialPort.WRITE:
00102             outlen = self.serial.write(data)
00103             # For now we can't check that the write occurred because write
00104             # always returns None. The documentation says it should return
00105             # the number of bytes written. I have filed a bug report with
00106             # upstream. Doesn't matter much for us, as if the bytes don't
00107             # go out, they won't come in.
00108             #if outlen != length:
00109             #    rospy.logout( "Wrote %i bytes instead of %i in %s"%(outlen, length, self.port))
00110             #    return
00111         else:
00112             raise ValueError
00113         # We only get here on success
00114         self.ok[direction] = True
00115 
00116 
00117 class QualUSBSerial:
00118     def __init__(self):
00119         rospy.init_node('usb_serial_test')
00120         self.data_sent = False
00121         
00122         self.result_service = rospy.ServiceProxy('test_result', TestResult)
00123 
00124         self.reads = []
00125         self.writes = []
00126 
00127         self.ports = []
00128         self.names = []
00129         
00130     def failure_call(self, exception_str = ''):
00131         rospy.logerr(exception_str)
00132         r = TestResultRequest()
00133         r.html_result = exception_str
00134         r.text_summary = 'Caught exception, automated test failure.'
00135         r.plots = []
00136         r.result = TestResultRequest.RESULT_FAIL
00137         self.send_results(r)
00138         rospy.logout( 'Test failed, waiting for shutdown')
00139         rospy.spin()
00140 
00141     def send_results(self, test_result):
00142         if self.data_sent:
00143             return
00144         
00145         rospy.wait_for_service('test_result', 15)
00146         self.result_service.call(test_result)
00147         self.data_sent = True
00148 
00149     def run_qual_test(self):
00150                           
00151         bytes = 16384
00152         rate = 115200
00153         
00154         ports = []
00155         seeds = []
00156         
00157         timeout = bytes * 10 / rate * 1.1 + 3
00158 
00159         ok = True
00160         try:
00161             for i in range(0,4):
00162                 self.names.append('/dev/ttyUSB%i'%i)
00163                 try:
00164                     ports.append(TestSerialPort(self.names[i], rate, timeout))
00165                 except serial.SerialException:
00166                     rospy.logout( 'couldnt open port: %d' % i)
00167                     self.failure_call("Error opening serial port %s. Device may not be connected." % self.names[i])
00168 
00169                 seeds.append(random.random())
00170     
00171             for i in range(0,4):
00172                 ports[i].start(TestSerialPort.READ, seeds[i^1], bytes)
00173                 
00174             for i in range(0,4):
00175                 ports[i].start(TestSerialPort.WRITE, seeds[i], bytes)
00176                 
00177             for i in range(0,4):
00178                 self.reads.append(ports[i].join_is_error(TestSerialPort.WRITE))
00179                 self.writes.append(ports[i].join_is_error(TestSerialPort.READ))
00180 
00181         except serial.SerialException:
00182             self.failure_call(traceback.format_exc())
00183         except KeyboardInterrupt:
00184             self.failure_call(traceback.format_exc())
00185 
00186         r = TestResultRequest()
00187         r.text_summary = 'USB to Serial Test: %s' % self._passed_msg()
00188         r.html_result = self._write_results()
00189         r.result = TestResultRequest.RESULT_FAIL
00190         if self._passed():
00191             r.result = TestResultRequest.RESULT_PASS
00192         
00193         self.send_results(r)
00194         
00195     def _passed_msg(self):
00196         if len(self.reads) != len(self.writes) or len(self.reads) != 4:
00197             return "Incomplete"
00198 
00199         if self._passed():
00200             return "PASS"
00201         return "FAIL"
00202 
00203     def _passed(self):
00204         if len(self.reads) != len(self.writes) or len(self.reads) != 4:
00205             return False
00206 
00207         ok = True
00208         for read in self.reads:
00209             ok = ok and read
00210         for write in self.writes:
00211             ok = ok and write
00212 
00213         if ok:
00214             return "PASS"
00215         return "FAIL"
00216 
00217     def _write_results(self):
00218         html = '<p>USB to serial send/receive test: %s</p>\n' % self._passed()
00219         html += '<table border="1" cellpadding="2" cellspacing="0">\n'
00220         html += '<tr><td><b>Port</b></td><td><b>Read</b></td><td><b>Write</b></td></tr>\n'
00221         for i in range(0, 4):
00222             html += '<tr><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (self.names[i], index_to_msg(self.reads, i), index_to_msg(self.writes, i))
00223         html += '</table>\n'
00224 
00225         return html
00226         
00227         
00228 
00229 if __name__ == '__main__':
00230     qt = QualUSBSerial()
00231     try:
00232         qt.run_qual_test()
00233     except:
00234         qt.failure_call(traceback.format_exc())
00235 
00236 


qualification
Author(s): Kevin Watts (watts@willowgarage.com), Josh Faust (jfaust@willowgarage.com)
autogenerated on Sat Dec 28 2013 17:57:34