log_performance_MM.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 # Software License Agreement (BSD License)
00004 #
00005 # Copyright (c) 2018, Dataspeed Inc.
00006 # All rights reserved.
00007 #
00008 # Redistribution and use in source and binary forms, with or without modification,
00009 # are permitted provided that the following conditions are met:
00010 # 
00011 #     * Redistributions of source code must retain the above copyright notice,
00012 #       this list of conditions and the following disclaimer.
00013 #     * Redistributions in binary form must reproduce the above copyright notice,
00014 #       this list of conditions and the following disclaimer in the documentation
00015 #       and/or other materials provided with the distribution.
00016 #     * Neither the name of Dataspeed Inc. nor the names of its
00017 #       contributors may be used to endorse or promote products derived from this
00018 #       software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00021 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00022 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00023 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
00024 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00025 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
00026 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00027 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
00028 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00029 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 
00031 import rospy
00032 import csv
00033 import math
00034 import sys
00035 import argparse
00036 
00037 from dbw_fca_msgs.msg import BrakeCmd, BrakeReport, BrakeInfoReport
00038 from dbw_fca_msgs.msg import ThrottleCmd, ThrottleReport, ThrottleInfoReport
00039 from dbw_fca_msgs.msg import Gear, GearCmd, GearReport
00040 from dbw_fca_msgs.msg import SteeringCmd, SteeringReport
00041 
00042 parser = argparse.ArgumentParser(description='Log Pacfica performance data')
00043 parser.add_argument('--steer', action='store_true', default=False, help='Enable steering tests')
00044 parser.add_argument('--brake', action='store_true', default=False, help='Enable brake tests')
00045 parser.add_argument('--lever', action='store_true', default=False, help='Enable lever tests')
00046 parser.add_argument('--gas',   action='store_true', default=False, help='Enable gas tests')
00047 parser.add_argument('--quick', action='store_true', default=False, help='Do a couple of quick tests')
00048 parser.add_argument('--all',   action='store_true', default=False, help='Enable all tests (default)')
00049 tests, _ = parser.parse_known_args()
00050 if (tests.steer or tests.brake or tests.lever or tests.gas):
00051     tests.all = False
00052 else:
00053     tests.all = True
00054 
00055 if tests.all:
00056     tests.steer = tests.brake = tests.lever = tests.gas = True
00057 tests.strict_MM = False # set to true to release to MM
00058 
00059 #
00060 #static const struct {float pedal; float torque;} BRAKE_TABLE[] = {
00061 #// Duty,   Nm
00062 # {0.150,    0},
00063 # {0.166,    0},
00064 # {0.168,    4},
00065 # {0.200,   56},
00066 # {0.225,  194},
00067 # {0.250,  456},
00068 # {0.300, 1312},
00069 # {0.350, 2352},
00070 # {0.400, 3716},
00071 # {0.434, 4740},
00072 # {0.566, 6888},
00073 # {0.600, 6888},
00074 #};
00075 #static const struct {float pedal; float percent;} THROTTLE_TABLE[] = {
00076 #// Duty,   %
00077 # {0.080, 0.000},
00078 # {0.114, 0.001},
00079 # {0.497, 0.500},
00080 # {0.890, 0.998},
00081 # {0.892, 1.000},
00082 #};
00083 
00084 # The Tests below actuate the Brake, Steering, Gas (Throttle), and Shift Lever
00085 # with the purpose of measuring the performace of these individual systems in terms
00086 # of response time and accuracy. All data is logged to csv so that the system performance
00087 # can be analyzed easily.
00088 
00089 # {0.080},
00090 # {0.114},
00091 # {0.497},
00092 # {0.890},
00093 # {0.892},
00094 brake_duty = [ 0.150, 0.166, 0.168, 0.200, 0.225, 0.250, 0.300, 0.350, 0.400, 0.434, 0.566, 0.600]
00095 brake_Nm =   [ 0.000, 0.000, 4.000, 56.00, 194.0, 456.0, 1312,  2352,  3716,  4740,  6888,  6888]
00096 
00097 throttle_pct = [ 0.000, 0.000, 0.001, 0.500, 0.998, 1.000, 1.000]
00098 throttle_duty = [ 0.000, 0.080, 0.114, 0.497, 0.890, 0.892, 1.000]
00099 
00100 dont_check = False
00101 
00102 # timing specification
00103 command_resolution_s = 0.04
00104 initial_check_wait_s = 1.0
00105 first_cmd_hold_s     = 5.0
00106 
00107 test_counts = {}
00108 class CarSysTest:
00109 
00110     def check_initial_conditions(self):
00111         if dont_check: return 
00112         # Make sure the system is in a safe configuration at the start of a test
00113         if not self.all_ready(['SteeringReport', 'GearReport']):
00114             self.stop()
00115             return
00116         steering_report = self.get_msg('SteeringReport')
00117         if steering_report.speed > 0.0:
00118             rospy.logerr('Speed check failed. Vehicle is moving.')
00119             self.stop()
00120             return
00121         gear_report = self.get_msg('GearReport')
00122         # this wont work for gear testing!
00123         if self.name() != "LeverTest":
00124             if not gear_report.state.gear == gear_report.state.PARK:
00125                 rospy.logerr('Gear check failed. Vehicle not in park.')
00126                 self.stop()
00127                 return
00128 
00129     def tick(self, event):
00130         if self.done: return
00131         if self.repeat_ct == self.repeats:
00132             self.repeat_ct = 0
00133             self.idx += 1
00134         else:
00135             self.repeat_ct += 1
00136 
00137         start_ct = -int(math.ceil(first_cmd_hold_s/command_resolution_s))
00138         if self.repeat_ct < start_ct:
00139             pass # wait a bit for system to be ready
00140         elif self.repeat_ct == start_ct:
00141             self.check_initial_conditions()
00142         else:
00143             try:
00144                 value = self.sequence[self.idx]
00145                 self.pub(value)
00146             except IndexError:
00147                 self.stop()
00148 
00149     def __init__(self, description, sequence, units="percent", num_reps=1, period_s=1):
00150         self.description = description
00151         self.repeats = int(math.ceil(period_s/command_resolution_s))
00152         self.sequence = sequence
00153         self.units = units
00154         self.done = False
00155         # give a counted name to each test
00156         global test_counts
00157         try:
00158             test_counts[self.name()] += 1
00159         except:
00160             test_counts[self.name()] = 1
00161         self.test_num = test_counts[self.name()]
00162 
00163     def name(self):
00164         return self.__class__.__name__
00165 
00166     def test_name(self):
00167         return "%s%d" % (self.name(), self.test_num)
00168 
00169     def start(self, msgs):
00170         self.msgs = msgs
00171 
00172         rospy.loginfo("Starting %s: %s" % (self.test_name(), self.description))
00173         self.csv_file = open(self.test_name() + '.csv', 'w')
00174         self.csv_writer = csv.writer(self.csv_file, delimiter=',')
00175         self.write_csv_header(self.csv_writer)
00176         # we want to wait a bit and check initial conditions before actually performing the tests
00177         self.repeat_ct = -int((initial_check_wait_s + first_cmd_hold_s)/command_resolution_s)
00178         self.idx = 0
00179         self.done = False
00180         self.pedal_cmd = 0.0
00181         self.start_time = rospy.get_rostime()
00182 
00183     def stop(self):
00184         rospy.loginfo('Saving csv file')
00185         self.csv_file.close()
00186         self.done = True
00187 
00188     def get_msg(self, msg_name):
00189         _, msg = self.msgs[msg_name]
00190         return msg
00191 
00192     def mark_not_ready(self, msg_name):
00193         try:
00194             ready, msg = self.msgs[msg_name]
00195             if ready:
00196                 self.msgs[msg_name] = (False, msg)
00197         except:
00198             rospy.logerr("%s is not a valid message name" % msg_name)
00199     def mark_all_not_ready(self):
00200         for msg_name, _ in self.msgs.items():
00201             mark_not_ready(msg_name)
00202 
00203     def is_ready(self, msg_name):
00204         try:
00205             ready, _ = self.msgs[msg_name]
00206             if not ready:
00207                 rospy.logwarn('no new %s messages' % msg_name)
00208             return ready
00209         except:
00210             return False
00211 
00212     def all_ready(self, msg_names):
00213         for name in msg_names:
00214             if not self.is_ready(name):
00215                 return False
00216         return True
00217 
00218     def elapsed_time_s(self):
00219         return (rospy.get_rostime() - self.start_time).to_sec() - initial_check_wait_s - first_cmd_hold_s + 0.02
00220 
00221 # Brake Tests
00222 #    Make sure GEAR=PARK and SPEED=0
00223 # Save the folowing data:
00224 #    /vehicle/brake_cmd:pedal_cmd
00225 #    /vehicle/brake_report:pedal_output
00226 #    /vehicle/brake_info_report:brake_pc
00227 #    /vehicle/brake_info_report:brake_pressure
00228 class BrakeTest(CarSysTest):
00229 
00230     def write_csv_header(self, csv_writer):
00231         csv_writer.writerow(['Elapsed Time (s)', 'Brake Cmd (%)', 'Measured (%)', 'Pressure (bar)', 'User Input'])
00232 
00233     def pub(self, brake_val):
00234         if self.repeat_ct == 0:
00235             rospy.loginfo("Setting brake to %f" % brake_val)
00236         msg = BrakeCmd()
00237         if brake_val >= 0:
00238             msg.enable = True
00239             if self.units == "percent":
00240                 msg.pedal_cmd_type = BrakeCmd.CMD_PERCENT
00241                 self.pedal_cmd = brake_val/100.0
00242                 msg.pedal_cmd = self.pedal_cmd
00243             elif self.units == "torque":
00244                 msg.pedal_cmd_type = BrakeCmd.CMD_TORQUE_RQ
00245                 self.pedal_cmd = brake_val
00246                 msg.pedal_cmd = self.pedal_cmd
00247             else:
00248                 raise Exception("units %s unsupported for BrakeTest" % self.units)
00249             brake_pub.publish(msg)
00250 
00251         # we are watching these, lets make sure we keep getting messages
00252         self.check_conditions()
00253         self.mark_not_ready('BrakeReport')
00254         self.mark_not_ready('BrakeInfoReport')
00255 
00256     def check_conditions(self):
00257         if dont_check: return
00258         # Make sure we are getting new messages
00259         if not self.all_ready(['BrakeReport', 'BrakeInfoReport']):
00260             self.stop()
00261             return
00262         # TODO: check if brake_report.enabled = True?
00263         # simple check doesn't work due to startup condition
00264 
00265     def recv_msg(self, msg):
00266         # don't record start sequence
00267         if self.repeat_ct < 0: return
00268         # align reporting to brake report
00269         if type(msg).__name__ == 'BrakeReport':
00270             if self.units == 'percent':
00271                 self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
00272                                           "{:7.03f}".format(self.pedal_cmd*100),
00273                                           "{:7.03f}".format(msg.pedal_cmd),
00274                                           "{:7.03f}".format(msg.pedal_input),
00275                                           "{:7.03f}".format(msg.pedal_output),
00276                                           "{:d}".format(msg.user_input)])
00277             elif self.units == 'torque':
00278                 self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
00279                                           "{:7.03f}".format(msg.pedal_cmd),
00280                                           "{:7.03f}".format(msg.pedal_input),
00281                                           "{:7.03f}".format(msg.pedal_output),
00282                                           "{:7.03f}".format(self.pedal_cmd)])
00283 
00284 # Steering Tests
00285 #    Make sure GEAR=PARK and SPEED=0
00286 # Save the following data:
00287 #    /vehicle/steering_cmd:steering_wheel_angle_cmd
00288 #    /vehicle/steering_report:steering_wheel_cmd
00289 #    /vehicle/steering_report:steering_wheel_angle
00290 class SteeringTest(CarSysTest):
00291 
00292     def write_csv_header(self, csv_writer):
00293         self.steering_wheel_cmd = 0.0
00294         csv_writer.writerow(['Elapsed Time (s)', 'Steering Cmd Sent (Degrees)', 'Steering Cmd Reported (Degrees)', 'Measured (Degrees)'])
00295 
00296     def pub(self, steering_val):
00297         if self.repeat_ct == 0:
00298             rospy.loginfo("Setting steering to %f" % steering_val)
00299         msg = SteeringCmd()
00300         msg.enable = True
00301         if self.units == "percent": # a percent corresponds to 5 degrees
00302             self.steering_wheel_cmd = math.radians(steering_val*5)
00303         elif self.units == "degrees":
00304             self.steering_wheel_cmd = math.radians(steering_val)
00305         else:
00306             raise Exception("units %s unsupported for SteeringTest" % self.units)
00307         msg.steering_wheel_angle_cmd = self.steering_wheel_cmd
00308         steering_pub.publish(msg)
00309 
00310         # we are watching these, lets make sure we keep getting messages
00311         self.check_conditions()
00312         self.mark_not_ready('SteeringReport')
00313 
00314     def check_conditions(self):
00315         if dont_check: return
00316         # Make sure we are getting new messages
00317         if not self.all_ready(['SteeringReport']):
00318             self.stop()
00319             return
00320         # TODO: check if brake_report.enabled = True?
00321         # simple check doesn't work due to startup condition
00322 
00323     def recv_msg(self, msg):
00324         # don't record start sequence
00325         if self.repeat_ct < 0: return
00326         # align reporting to steering report
00327         if type(msg).__name__ == 'SteeringReport':
00328             self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
00329                                       "{: 6.1f}".format(math.degrees(self.steering_wheel_cmd)),
00330                                       "{: 6.1f}".format(math.degrees(msg.steering_wheel_cmd)),
00331                                       "{: 6.1f}".format(math.degrees(msg.steering_wheel_angle))])
00332 
00333     lock2lock_degrees = math.degrees(SteeringCmd().ANGLE_MAX*2.0)
00334 
00335 # Gas Tests
00336 #    Make sure GEAR=PARK and SPEED=0
00337 # Save the following data:
00338 #    /vehicle/throttle_cmd:pedal_cmd
00339 #    /vehicle/throttle_report:pedal_output
00340 #    /vehicle/throttle_info_report:throttle_pc
00341 class ThrottleTest(CarSysTest):
00342 
00343     def write_csv_header(self, csv_writer):
00344         csv_writer.writerow(['Elapsed Time (s)', 'Throttle Cmd (%)', 'Measured (%)'])
00345 
00346     def pub(self, throttle_val):
00347         if self.repeat_ct == 0:
00348             rospy.loginfo("Setting throttle to %f" % throttle_val)
00349         msg = ThrottleCmd()
00350         if throttle_val >= 0:
00351             msg.enable = True
00352             msg.pedal_cmd_type = ThrottleCmd.CMD_PERCENT
00353             if self.units == "percent":
00354                 self.pedal_cmd = throttle_val/100.0
00355                 msg.pedal_cmd = self.pedal_cmd
00356             else:
00357                 raise Exception("units %s unsupported for ThrottleTest" % self.units)
00358             throttle_pub.publish(msg)
00359 
00360         # we are watching these, lets make sure we keep getting messages
00361         self.check_conditions()
00362         self.mark_not_ready('ThrottleReport')
00363         self.mark_not_ready('ThrottleInfoReport')
00364 
00365     def check_conditions(self):
00366         if dont_check: return
00367         # Make sure we are getting new messages
00368         if not self.all_ready(['ThrottleReport', 'ThrottleInfoReport']):
00369             self.stop()
00370             return
00371         # TODO: check if throttle_report.enabled = True?
00372         # simple check doesn't work due to startup condition
00373 
00374     def recv_msg(self, msg):
00375         # don't record start sequence
00376         if self.repeat_ct < 0: return
00377         # align reporting to throttle report
00378         if type(msg).__name__ == 'ThrottleInfoReport':
00379             self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
00380                                       "{:5.01f}".format(self.pedal_cmd*100),
00381                                       "{:5.01f}".format(msg.throttle_pc)])
00382 
00383 # Lever Tests
00384 # "To perfomer tests in safety condition, must be activated parking brake lever.
00385 # This test have to guarantee that there no necessity to active brake pedal by Magneti Marelli ACU.
00386 # IF THERE IS NECESSITY TO MANAGE BRAKE PEDAL PRESSION DURING LEVER CHANGE, THIS HAS TO BE COMMUNICATED TO MAGNETI MARELLI
00387 #    Make sure SPEED=0
00388 # Save the folowing data:
00389 #    /vehicle/gear_cmd:cmd
00390 #    /vehicle/gear_report:state
00391 # TODO: add lever tests class
00392 class LeverTest(CarSysTest):
00393 
00394     def write_csv_header(self, csv_writer):
00395         csv_writer.writerow(['Elapsed Time (s)', 'Lever Cmd Requested', 'Lever Cmd Reported', 'Lever Position Reported'])
00396 
00397     def pub(self, gear):
00398         if self.repeat_ct == 0:
00399             rospy.loginfo("Setting gear to %s" % gear)
00400         msg = GearCmd()
00401         if self.units == "gear":
00402             if gear == 'P':
00403                 msg.cmd.gear = Gear().PARK
00404             elif gear == 'R':
00405                 msg.cmd.gear = Gear().REVERSE
00406             elif gear == 'D':
00407                 msg.cmd.gear = Gear().DRIVE
00408             elif gear == 'N':
00409                 msg.cmd.gear = Gear().NEUTRAL
00410             else:
00411                 raise Exception("Gear %s is not supported" % gear)
00412             self.cmd = gear
00413         else:
00414             raise Exception("units %s unsupported for LeverTest" % self.units)
00415         gear_pub.publish(msg)
00416         # publish a brake message as well
00417         bmsg = BrakeCmd()
00418         bmsg.enable = True
00419         bmsg.pedal_cmd_type = BrakeCmd.CMD_PERCENT
00420         bmsg.pedal_cmd = 0.4
00421         brake_pub.publish(bmsg)
00422 
00423         # we are watching these, lets make sure we keep getting messages
00424         self.check_conditions()
00425         #self.mark_not_ready('GearReport') # these dont come fast enough
00426 
00427     def check_conditions(self):
00428         if dont_check: return
00429         # Make sure we are getting new messages
00430         if not self.all_ready(['GearReport']):
00431             self.stop()
00432             return
00433 
00434     def to_gear(self, gear_num):
00435         if gear_num == Gear().PARK:
00436             return 'P'
00437         elif gear_num == Gear().REVERSE:
00438             return 'R'
00439         elif gear_num == Gear().DRIVE:
00440             return 'D'
00441         elif gear_num == Gear().NEUTRAL:
00442             return 'N'
00443         elif gear_num == Gear().LOW:
00444             return 'L'
00445         else:
00446             return 'None'
00447 
00448     def recv_msg(self, msg):
00449         # don't record start sequence
00450         if self.repeat_ct < 0: return
00451         # align reporting to gear report
00452         if type(msg).__name__ == 'GearReport':
00453             self.csv_writer.writerow(["{:.06f}".format(self.elapsed_time_s()),
00454                                       self.cmd, self.to_gear(msg.cmd.gear), self.to_gear(msg.state.gear)])
00455 
00456 def reset_gear(gear):
00457     rospy.loginfo("Resetting gear to %s before starting test" % gear)
00458     times = int(5/command_resolution_s)
00459     r = rospy.Rate(1/command_resolution_s)
00460     for i in range(0, times):
00461         msg = GearCmd()
00462         if gear == 'P':
00463             msg.cmd.gear = Gear().PARK
00464         elif gear == 'R':
00465             msg.cmd.gear = Gear().REVERSE
00466         elif gear == 'D':
00467             msg.cmd.gear = Gear().DRIVE
00468         elif gear == 'N':
00469             msg.cmd.gear = Gear().NEUTRAL
00470         else:
00471             raise Exception("Gear %s is not supported" % gear)
00472         gear_pub.publish(msg)
00473 
00474         bmsg = BrakeCmd()
00475         bmsg.enable = True
00476         bmsg.pedal_cmd_type = BrakeCmd.CMD_PERCENT
00477         bmsg.pedal_cmd = 0.4
00478         brake_pub.publish(bmsg)
00479         r.sleep()
00480 
00481 perf_tests = []
00482 if tests.brake:
00483     perf_tests.append(BrakeTest("Square wave", [ 0, 10, 0, 10, 0, 10, 0, 10, 0, 10], period_s=1, num_reps=4))
00484     perf_tests.append(BrakeTest("Square wave", [ 0, 50, 0, 50, 0, 50, 0, 50], period_s=1, num_reps=4))
00485     if not tests.strict_MM:
00486         perf_tests.append(BrakeTest("Square wave", [ 0, 100, 0, 100, 0, 100, 0, 100, 0, 100, 0, 100, 0, 100, 0, 100], period_s=1, num_reps=4))
00487         perf_tests.append(BrakeTest("Square wave", [ 0, 100, 100, 100, 100, 100, 100, 100, 100], period_s=1, num_reps=4))
00488     if not tests.quick:
00489         perf_tests.append(BrakeTest("Increasing impulse wave",
00490                                     [ 20, 0, 20.5, 0, 21, 0, 21.5, 0, 22, 0, 22.5, 0, 23, 0, 23.5, 0, 24, 0, 24.5, 0, 25],
00491                                     period_s=5, num_reps=4))
00492         perf_tests.append(BrakeTest("Square wave",
00493                                     [0, 10, 0, 15, 0, 20, 0, 25, 0, 30, 0, 35, 0, 40, 0, 45, 0, 50,
00494                                      0, 55, 0, 60, 0, 65, 0, 70, 0, 75, 0, 80, 0, 85, 0, 90, 0, 95, 0, 100],
00495                                     period_s=10, num_reps=4))
00496         perf_tests.append(BrakeTest("Square Wave", [ 0, 10, 15, 20, 25, 30, 35, 40, 45, 50,
00497                                                      55, 60, 65, 70, 75, 80, 85, 90, 95, 100],
00498                                     period_s=10, num_reps=4))
00499         perf_tests.append(BrakeTest("Increasing Wave",
00500                                     [ 20, 21, 20, 21.5, 20, 22, 20, 22.5, 20, 23, 20, 23.5, 20, 24, 20, 24.5, 20, 25, 20, 25.5,
00501                                       20, 26, 20, 26.5, 20, 27, 20, 27.5, 20, 28, 20, 28.5, 20, 29, 20, 29.5, 20, 30],
00502                                     period_s=10, num_reps=4))
00503 if tests.steer:
00504     perf_tests.append(SteeringTest("Square wave",
00505                                    [0, SteeringTest.lock2lock_degrees / 4, 0, SteeringTest.lock2lock_degrees / 4, 0, SteeringTest.lock2lock_degrees / 4, 0, SteeringTest.lock2lock_degrees / 4, 0],
00506                                    period_s=2, units="degrees", num_reps=5))  # 0 degree - Max( from zero to lock side)
00507     perf_tests.append(SteeringTest("Increasing wave",
00508                                    [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
00509                                    period_s=2, num_reps=2))
00510     perf_tests.append(SteeringTest("Increasing wave",
00511                                    [0, 10, 0, 20, 0, 30, 0, 40, 0, 50, 0, 60, 0, 70, 0, 80, 0, 90, 0, 100, 0],
00512                                    period_s=2, num_reps=2))
00513     # perf_tests.append(SteeringTest("Increasing impulsive wave",
00514     #                [ 20, 0, 20.1, 0, 20.2, 0, 20.3, 0, 20.4, 0,  20.5, 0, 20.6, 0, 20.7, 0, 20.8, 0, 20.9, 0, 21],
00515     #                units="degrees", period_s=5, num_reps=4))
00516     perf_tests.append(SteeringTest("Increasing impulsive wave",
00517                                    [0, 0.5, 0, -0.5, 0, 1, 0, -1, 0, 1.5, 0, -1.5, 0, 2, 0, -2, 0, 2.5, 0, -2.5],
00518                                    units="degrees", period_s=2, num_reps=2))
00519 
00520 if tests.gas:
00521     perf_tests.append(ThrottleTest("Step test", [ 0, 25 ], num_reps=4))
00522     perf_tests.append(ThrottleTest("Step test", [ 0, 50 ], num_reps=4))
00523     perf_tests.append(ThrottleTest("Step test", [ 0, 75 ], num_reps=4))
00524     perf_tests.append(ThrottleTest("Step test", [ 0, 100 ], num_reps=4))
00525 
00526 if tests.lever:
00527     perf_tests.append(LeverTest("Transition", [ 'P', 'N'], num_reps=2, units="gear"))
00528     perf_tests.append(LeverTest("Transition", [ 'P', 'D'], num_reps=2, units="gear"))
00529     perf_tests.append(LeverTest("Transition", [ 'P', 'R'], num_reps=2, units="gear"))
00530     perf_tests.append(LeverTest("Transition", [ 'N', 'D'], num_reps=2, units="gear"))
00531     perf_tests.append(LeverTest("Transition", [ 'N', 'R'], num_reps=2, units="gear"))
00532     perf_tests.append(LeverTest("Transition", [ 'N', 'P'], num_reps=2, units="gear"))
00533     perf_tests.append(LeverTest("Transition", [ 'D', 'N'], num_reps=2, units="gear"))
00534     perf_tests.append(LeverTest("Transition", [ 'D', 'P'], num_reps=2, units="gear"))
00535     perf_tests.append(LeverTest("Transition", [ 'D', 'R'], num_reps=2, units="gear"))
00536     perf_tests.append(LeverTest("Transition", [ 'R', 'D'], num_reps=2, units="gear"))
00537     perf_tests.append(LeverTest("Transition", [ 'R', 'P'], num_reps=2, units="gear"))
00538     perf_tests.append(LeverTest("Transition", [ 'R', 'N'], num_reps=2, units="gear"))
00539 
00540     # just to put it back in park
00541     perf_tests.append(LeverTest("Transition", [ 'P'], num_reps=2, units="gear"))
00542 
00543 class TestNode:
00544     def __init__(self):
00545         rospy.init_node('test_runner')
00546         self.current_test = None
00547 
00548         # Variables for logging
00549         self.brake_cmd = 0.0
00550 
00551         # Keep all of the different types of message and whether or not they are ready
00552         self.msgs = {} # ( Ready?, Report )
00553         self.msgs['BrakeReport'] = ( False, BrakeReport() )
00554         self.msgs['BrakeInfoReport'] = ( False, BrakeInfoReport() )
00555         self.msgs['GearReport'] = ( False, GearReport() )
00556         self.msgs['SteeringReport'] = ( False, SteeringReport() )
00557         self.msgs['ThrottleReport'] = ( False, ThrottleReport() )
00558         self.msgs['ThrottleInfoReport'] = ( False, ThrottleInfoReport() )
00559 
00560         # Open CSV file
00561         self.csv_file = open('brake_sweep_data.csv', 'w')
00562         self.csv_writer = csv.writer(self.csv_file, delimiter=',')
00563         self.csv_writer.writerow(['Brake (%)', 'Measured (%)', 'Pressure (bar)'])
00564 
00565         # Publishers and subscribers
00566         # Note: publishers are global so that each of the specific test classes can publish to them
00567         # without having to explicity allocate and manage them
00568         global brake_pub, gear_pub, steering_pub, throttle_pub
00569         brake_pub    = rospy.Publisher('/vehicle/brake_cmd',    BrakeCmd,    queue_size=1)
00570         gear_pub     = rospy.Publisher('/vehicle/gear_cmd',     GearCmd,     queue_size=1)
00571         steering_pub = rospy.Publisher('/vehicle/steering_cmd', SteeringCmd, queue_size=1)
00572         throttle_pub = rospy.Publisher('/vehicle/throttle_cmd', ThrottleCmd, queue_size=1)
00573 
00574         rospy.Subscriber('/vehicle/brake_report',         BrakeReport,         self.recv_msg)
00575         rospy.Subscriber('/vehicle/brake_info_report',    BrakeInfoReport,     self.recv_msg)
00576         rospy.Subscriber('/vehicle/throttle_report',      ThrottleReport,      self.recv_msg)
00577         rospy.Subscriber('/vehicle/throttle_info_report', ThrottleInfoReport,  self.recv_msg)
00578         rospy.Subscriber('/vehicle/gear_report',          GearReport,          self.recv_msg)
00579         rospy.Subscriber('/vehicle/steering_report',      SteeringReport,      self.recv_msg)
00580         
00581         rospy.Timer(rospy.Duration(command_resolution_s), self.timer_process)
00582 
00583     def timer_process(self, event):
00584         if not self.current_test is None and not self.current_test.done:
00585             self.current_test.tick(event)
00586 
00587     # consolidate all of these into one that uses a dictionary
00588     def recv_msg(self, msg):
00589         self.msgs[type(msg).__name__] = ( True, msg)
00590         if not self.current_test is None and not self.current_test.done:
00591             self.current_test.recv_msg(msg)
00592 
00593     def run_tests(self):
00594         rospy.sleep(rospy.Duration(5))
00595         if rospy.is_shutdown():
00596             rospy.loginfo("Stopping test")
00597             sys.exit(-1)
00598         for test in perf_tests:
00599             if test.__class__.__name__ == "LeverTest":
00600                 reset_gear(test.sequence[0])
00601             test.start(self.msgs)
00602             self.current_test = test
00603             while not rospy.is_shutdown() and not test.done:
00604                 rospy.sleep(rospy.Duration(0.01))
00605             if rospy.is_shutdown():
00606                 break
00607 
00608 if __name__ == '__main__':
00609     try:
00610         node = TestNode()
00611         node.run_tests()
00612     except rospy.ROSInterruptException:
00613         rospy.loginfo("Stopping test")
00614         pass


dbw_fca_can
Author(s): Kevin Hallenbeck
autogenerated on Sat May 4 2019 02:40:31