log_performance_MM.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2018, Dataspeed Inc.
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without modification,
9 # are permitted provided that the following conditions are met:
10 #
11 # * Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above copyright notice,
14 # this list of conditions and the following disclaimer in the documentation
15 # and/or other materials provided with the distribution.
16 # * Neither the name of Dataspeed Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from this
18 # software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
21 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 import rospy
32 import csv
33 import math
34 import sys
35 import argparse
36 
37 from dbw_fca_msgs.msg import BrakeCmd, BrakeReport, BrakeInfoReport
38 from dbw_fca_msgs.msg import ThrottleCmd, ThrottleReport, ThrottleInfoReport
39 from dbw_fca_msgs.msg import Gear, GearCmd, GearReport
40 from dbw_fca_msgs.msg import SteeringCmd, SteeringReport
41 
42 parser = argparse.ArgumentParser(description='Log Pacfica performance data')
43 parser.add_argument('--steer', action='store_true', default=False, help='Enable steering tests')
44 parser.add_argument('--brake', action='store_true', default=False, help='Enable brake tests')
45 parser.add_argument('--lever', action='store_true', default=False, help='Enable lever tests')
46 parser.add_argument('--gas', action='store_true', default=False, help='Enable gas tests')
47 parser.add_argument('--quick', action='store_true', default=False, help='Do a couple of quick tests')
48 parser.add_argument('--all', action='store_true', default=False, help='Enable all tests (default)')
49 tests, _ = parser.parse_known_args()
50 if (tests.steer or tests.brake or tests.lever or tests.gas):
51  tests.all = False
52 else:
53  tests.all = True
54 
55 if tests.all:
56  tests.steer = tests.brake = tests.lever = tests.gas = True
57 tests.strict_MM = False # set to true to release to MM
58 
59 #
60 #static const struct {float pedal; float torque;} BRAKE_TABLE[] = {
61 #// Duty, Nm
62 # {0.150, 0},
63 # {0.166, 0},
64 # {0.168, 4},
65 # {0.200, 56},
66 # {0.225, 194},
67 # {0.250, 456},
68 # {0.300, 1312},
69 # {0.350, 2352},
70 # {0.400, 3716},
71 # {0.434, 4740},
72 # {0.566, 6888},
73 # {0.600, 6888},
74 #};
75 #static const struct {float pedal; float percent;} THROTTLE_TABLE[] = {
76 #// Duty, %
77 # {0.080, 0.000},
78 # {0.114, 0.001},
79 # {0.497, 0.500},
80 # {0.890, 0.998},
81 # {0.892, 1.000},
82 #};
83 
84 # The Tests below actuate the Brake, Steering, Gas (Throttle), and Shift Lever
85 # with the purpose of measuring the performace of these individual systems in terms
86 # of response time and accuracy. All data is logged to csv so that the system performance
87 # can be analyzed easily.
88 
89 # {0.080},
90 # {0.114},
91 # {0.497},
92 # {0.890},
93 # {0.892},
94 brake_duty = [ 0.150, 0.166, 0.168, 0.200, 0.225, 0.250, 0.300, 0.350, 0.400, 0.434, 0.566, 0.600]
95 brake_Nm = [ 0.000, 0.000, 4.000, 56.00, 194.0, 456.0, 1312, 2352, 3716, 4740, 6888, 6888]
96 
97 throttle_pct = [ 0.000, 0.000, 0.001, 0.500, 0.998, 1.000, 1.000]
98 throttle_duty = [ 0.000, 0.080, 0.114, 0.497, 0.890, 0.892, 1.000]
99 
100 dont_check = False
101 
102 # timing specification
103 command_resolution_s = 0.04
104 initial_check_wait_s = 1.0
105 first_cmd_hold_s = 5.0
106 
107 test_counts = {}
109 
111  if dont_check: return
112  # Make sure the system is in a safe configuration at the start of a test
113  if not self.all_ready(['SteeringReport', 'GearReport']):
114  self.stop()
115  return
116  steering_report = self.get_msg('SteeringReport')
117  if steering_report.speed > 0.0:
118  rospy.logerr('Speed check failed. Vehicle is moving.')
119  self.stop()
120  return
121  gear_report = self.get_msg('GearReport')
122  # this wont work for gear testing!
123  if self.name() != "LeverTest":
124  if not gear_report.state.gear == gear_report.state.PARK:
125  rospy.logerr('Gear check failed. Vehicle not in park.')
126  self.stop()
127  return
128 
129  def tick(self, event):
130  if self.done: return
131  if self.repeat_ct == self.repeats:
132  self.repeat_ct = 0
133  self.idx += 1
134  else:
135  self.repeat_ct += 1
136 
137  start_ct = -int(math.ceil(first_cmd_hold_s/command_resolution_s))
138  if self.repeat_ct < start_ct:
139  pass # wait a bit for system to be ready
140  elif self.repeat_ct == start_ct:
142  else:
143  try:
144  value = self.sequence[self.idx]
145  self.pub(value)
146  except IndexError:
147  self.stop()
148 
149  def __init__(self, description, sequence, units="percent", num_reps=1, period_s=1):
150  self.description = description
151  self.repeats = int(math.ceil(period_s/command_resolution_s))
152  self.sequence = sequence
153  self.units = units
154  self.done = False
155  # give a counted name to each test
156  global test_counts
157  try:
158  test_counts[self.name()] += 1
159  except:
160  test_counts[self.name()] = 1
161  self.test_num = test_counts[self.name()]
162 
163  def name(self):
164  return self.__class__.__name__
165 
166  def test_name(self):
167  return "%s%d" % (self.name(), self.test_num)
168 
169  def start(self, msgs):
170  self.msgs = msgs
171 
172  rospy.loginfo("Starting %s: %s" % (self.test_name(), self.description))
173  self.csv_file = open(self.test_name() + '.csv', 'w')
174  self.csv_writer = csv.writer(self.csv_file, delimiter=',')
175  self.write_csv_header(self.csv_writer)
176  # we want to wait a bit and check initial conditions before actually performing the tests
177  self.repeat_ct = -int((initial_check_wait_s + first_cmd_hold_s)/command_resolution_s)
178  self.idx = 0
179  self.done = False
180  self.pedal_cmd = 0.0
181  self.start_time = rospy.get_rostime()
182 
183  def stop(self):
184  rospy.loginfo('Saving csv file')
185  self.csv_file.close()
186  self.done = True
187 
188  def get_msg(self, msg_name):
189  _, msg = self.msgs[msg_name]
190  return msg
191 
192  def mark_not_ready(self, msg_name):
193  try:
194  ready, msg = self.msgs[msg_name]
195  if ready:
196  self.msgs[msg_name] = (False, msg)
197  except:
198  rospy.logerr("%s is not a valid message name" % msg_name)
200  for msg_name, _ in self.msgs.items():
201  mark_not_ready(msg_name)
202 
203  def is_ready(self, msg_name):
204  try:
205  ready, _ = self.msgs[msg_name]
206  if not ready:
207  rospy.logwarn('no new %s messages' % msg_name)
208  return ready
209  except:
210  return False
211 
212  def all_ready(self, msg_names):
213  for name in msg_names:
214  if not self.is_ready(name):
215  return False
216  return True
217 
218  def elapsed_time_s(self):
219  return (rospy.get_rostime() - self.start_time).to_sec() - initial_check_wait_s - first_cmd_hold_s + 0.02
220 
221 # Brake Tests
222 # Make sure GEAR=PARK and SPEED=0
223 # Save the folowing data:
224 # /vehicle/brake_cmd:pedal_cmd
225 # /vehicle/brake_report:pedal_output
226 # /vehicle/brake_info_report:brake_pc
227 # /vehicle/brake_info_report:brake_pressure
229 
230  def write_csv_header(self, csv_writer):
231  csv_writer.writerow(['Elapsed Time (s)', 'Brake Cmd (%)', 'Measured (%)', 'Pressure (bar)', 'User Input'])
232 
233  def pub(self, brake_val):
234  if self.repeat_ct == 0:
235  rospy.loginfo("Setting brake to %f" % brake_val)
236  msg = BrakeCmd()
237  if brake_val >= 0:
238  msg.enable = True
239  if self.units == "percent":
240  msg.pedal_cmd_type = BrakeCmd.CMD_PERCENT
241  self.pedal_cmd = brake_val/100.0
242  msg.pedal_cmd = self.pedal_cmd
243  elif self.units == "torque":
244  msg.pedal_cmd_type = BrakeCmd.CMD_TORQUE_RQ
245  self.pedal_cmd = brake_val
246  msg.pedal_cmd = self.pedal_cmd
247  else:
248  raise Exception("units %s unsupported for BrakeTest" % self.units)
249  brake_pub.publish(msg)
250 
251  # we are watching these, lets make sure we keep getting messages
252  self.check_conditions()
253  self.mark_not_ready('BrakeReport')
254  self.mark_not_ready('BrakeInfoReport')
255 
256  def check_conditions(self):
257  if dont_check: return
258  # Make sure we are getting new messages
259  if not self.all_ready(['BrakeReport', 'BrakeInfoReport']):
260  self.stop()
261  return
262  # TODO: check if brake_report.enabled = True?
263  # simple check doesn't work due to startup condition
264 
265  def recv_msg(self, msg):
266  # don't record start sequence
267  if self.repeat_ct < 0: return
268  # align reporting to brake report
269  if type(msg).__name__ == 'BrakeReport':
270  if self.units == 'percent':
271  self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
272  "{:7.03f}".format(self.pedal_cmd*100),
273  "{:7.03f}".format(msg.pedal_cmd),
274  "{:7.03f}".format(msg.pedal_input),
275  "{:7.03f}".format(msg.pedal_output),
276  "{:d}".format(msg.user_input)])
277  elif self.units == 'torque':
278  self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
279  "{:7.03f}".format(msg.pedal_cmd),
280  "{:7.03f}".format(msg.pedal_input),
281  "{:7.03f}".format(msg.pedal_output),
282  "{:7.03f}".format(self.pedal_cmd)])
283 
284 # Steering Tests
285 # Make sure GEAR=PARK and SPEED=0
286 # Save the following data:
287 # /vehicle/steering_cmd:steering_wheel_angle_cmd
288 # /vehicle/steering_report:steering_wheel_cmd
289 # /vehicle/steering_report:steering_wheel_angle
291 
292  def write_csv_header(self, csv_writer):
294  csv_writer.writerow(['Elapsed Time (s)', 'Steering Cmd Sent (Degrees)', 'Steering Cmd Reported (Degrees)', 'Measured (Degrees)'])
295 
296  def pub(self, steering_val):
297  if self.repeat_ct == 0:
298  rospy.loginfo("Setting steering to %f" % steering_val)
299  msg = SteeringCmd()
300  msg.enable = True
301  if self.units == "percent": # a percent corresponds to 5 degrees
302  self.steering_wheel_cmd = math.radians(steering_val*5)
303  elif self.units == "degrees":
304  self.steering_wheel_cmd = math.radians(steering_val)
305  else:
306  raise Exception("units %s unsupported for SteeringTest" % self.units)
307  msg.steering_wheel_angle_cmd = self.steering_wheel_cmd
308  steering_pub.publish(msg)
309 
310  # we are watching these, lets make sure we keep getting messages
311  self.check_conditions()
312  self.mark_not_ready('SteeringReport')
313 
314  def check_conditions(self):
315  if dont_check: return
316  # Make sure we are getting new messages
317  if not self.all_ready(['SteeringReport']):
318  self.stop()
319  return
320  # TODO: check if brake_report.enabled = True?
321  # simple check doesn't work due to startup condition
322 
323  def recv_msg(self, msg):
324  # don't record start sequence
325  if self.repeat_ct < 0: return
326  # align reporting to steering report
327  if type(msg).__name__ == 'SteeringReport':
328  self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
329  "{: 6.1f}".format(math.degrees(self.steering_wheel_cmd)),
330  "{: 6.1f}".format(math.degrees(msg.steering_wheel_cmd)),
331  "{: 6.1f}".format(math.degrees(msg.steering_wheel_angle))])
332 
333  lock2lock_degrees = math.degrees(SteeringCmd().ANGLE_MAX*2.0)
334 
335 # Gas Tests
336 # Make sure GEAR=PARK and SPEED=0
337 # Save the following data:
338 # /vehicle/throttle_cmd:pedal_cmd
339 # /vehicle/throttle_report:pedal_output
340 # /vehicle/throttle_info_report:throttle_pc
342 
343  def write_csv_header(self, csv_writer):
344  csv_writer.writerow(['Elapsed Time (s)', 'Throttle Cmd (%)', 'Measured (%)'])
345 
346  def pub(self, throttle_val):
347  if self.repeat_ct == 0:
348  rospy.loginfo("Setting throttle to %f" % throttle_val)
349  msg = ThrottleCmd()
350  if throttle_val >= 0:
351  msg.enable = True
352  msg.pedal_cmd_type = ThrottleCmd.CMD_PERCENT
353  if self.units == "percent":
354  self.pedal_cmd = throttle_val/100.0
355  msg.pedal_cmd = self.pedal_cmd
356  else:
357  raise Exception("units %s unsupported for ThrottleTest" % self.units)
358  throttle_pub.publish(msg)
359 
360  # we are watching these, lets make sure we keep getting messages
361  self.check_conditions()
362  self.mark_not_ready('ThrottleReport')
363  self.mark_not_ready('ThrottleInfoReport')
364 
365  def check_conditions(self):
366  if dont_check: return
367  # Make sure we are getting new messages
368  if not self.all_ready(['ThrottleReport', 'ThrottleInfoReport']):
369  self.stop()
370  return
371  # TODO: check if throttle_report.enabled = True?
372  # simple check doesn't work due to startup condition
373 
374  def recv_msg(self, msg):
375  # don't record start sequence
376  if self.repeat_ct < 0: return
377  # align reporting to throttle report
378  if type(msg).__name__ == 'ThrottleInfoReport':
379  self.csv_writer.writerow(["{:10.06f}".format(self.elapsed_time_s()),
380  "{:5.01f}".format(self.pedal_cmd*100),
381  "{:5.01f}".format(msg.throttle_pc)])
382 
383 # Lever Tests
384 # "To perfomer tests in safety condition, must be activated parking brake lever.
385 # This test have to guarantee that there no necessity to active brake pedal by Magneti Marelli ACU.
386 # IF THERE IS NECESSITY TO MANAGE BRAKE PEDAL PRESSION DURING LEVER CHANGE, THIS HAS TO BE COMMUNICATED TO MAGNETI MARELLI
387 # Make sure SPEED=0
388 # Save the folowing data:
389 # /vehicle/gear_cmd:cmd
390 # /vehicle/gear_report:state
391 # TODO: add lever tests class
393 
394  def write_csv_header(self, csv_writer):
395  csv_writer.writerow(['Elapsed Time (s)', 'Lever Cmd Requested', 'Lever Cmd Reported', 'Lever Position Reported'])
396 
397  def pub(self, gear):
398  if self.repeat_ct == 0:
399  rospy.loginfo("Setting gear to %s" % gear)
400  msg = GearCmd()
401  if self.units == "gear":
402  if gear == 'P':
403  msg.cmd.gear = Gear().PARK
404  elif gear == 'R':
405  msg.cmd.gear = Gear().REVERSE
406  elif gear == 'D':
407  msg.cmd.gear = Gear().DRIVE
408  elif gear == 'N':
409  msg.cmd.gear = Gear().NEUTRAL
410  else:
411  raise Exception("Gear %s is not supported" % gear)
412  self.cmd = gear
413  else:
414  raise Exception("units %s unsupported for LeverTest" % self.units)
415  gear_pub.publish(msg)
416  # publish a brake message as well
417  bmsg = BrakeCmd()
418  bmsg.enable = True
419  bmsg.pedal_cmd_type = BrakeCmd.CMD_PERCENT
420  bmsg.pedal_cmd = 0.4
421  brake_pub.publish(bmsg)
422 
423  # we are watching these, lets make sure we keep getting messages
424  self.check_conditions()
425  #self.mark_not_ready('GearReport') # these dont come fast enough
426 
427  def check_conditions(self):
428  if dont_check: return
429  # Make sure we are getting new messages
430  if not self.all_ready(['GearReport']):
431  self.stop()
432  return
433 
434  def to_gear(self, gear_num):
435  if gear_num == Gear().PARK:
436  return 'P'
437  elif gear_num == Gear().REVERSE:
438  return 'R'
439  elif gear_num == Gear().DRIVE:
440  return 'D'
441  elif gear_num == Gear().NEUTRAL:
442  return 'N'
443  elif gear_num == Gear().LOW:
444  return 'L'
445  else:
446  return 'None'
447 
448  def recv_msg(self, msg):
449  # don't record start sequence
450  if self.repeat_ct < 0: return
451  # align reporting to gear report
452  if type(msg).__name__ == 'GearReport':
453  self.csv_writer.writerow(["{:.06f}".format(self.elapsed_time_s()),
454  self.cmd, self.to_gear(msg.cmd.gear), self.to_gear(msg.state.gear)])
455 
456 def reset_gear(gear):
457  rospy.loginfo("Resetting gear to %s before starting test" % gear)
458  times = int(5/command_resolution_s)
459  r = rospy.Rate(1/command_resolution_s)
460  for i in range(0, times):
461  msg = GearCmd()
462  if gear == 'P':
463  msg.cmd.gear = Gear().PARK
464  elif gear == 'R':
465  msg.cmd.gear = Gear().REVERSE
466  elif gear == 'D':
467  msg.cmd.gear = Gear().DRIVE
468  elif gear == 'N':
469  msg.cmd.gear = Gear().NEUTRAL
470  else:
471  raise Exception("Gear %s is not supported" % gear)
472  gear_pub.publish(msg)
473 
474  bmsg = BrakeCmd()
475  bmsg.enable = True
476  bmsg.pedal_cmd_type = BrakeCmd.CMD_PERCENT
477  bmsg.pedal_cmd = 0.4
478  brake_pub.publish(bmsg)
479  r.sleep()
480 
481 perf_tests = []
482 if tests.brake:
483  perf_tests.append(BrakeTest("Square wave", [ 0, 10, 0, 10, 0, 10, 0, 10, 0, 10], period_s=1, num_reps=4))
484  perf_tests.append(BrakeTest("Square wave", [ 0, 50, 0, 50, 0, 50, 0, 50], period_s=1, num_reps=4))
485  if not tests.strict_MM:
486  perf_tests.append(BrakeTest("Square wave", [ 0, 100, 0, 100, 0, 100, 0, 100, 0, 100, 0, 100, 0, 100, 0, 100], period_s=1, num_reps=4))
487  perf_tests.append(BrakeTest("Square wave", [ 0, 100, 100, 100, 100, 100, 100, 100, 100], period_s=1, num_reps=4))
488  if not tests.quick:
489  perf_tests.append(BrakeTest("Increasing impulse wave",
490  [ 20, 0, 20.5, 0, 21, 0, 21.5, 0, 22, 0, 22.5, 0, 23, 0, 23.5, 0, 24, 0, 24.5, 0, 25],
491  period_s=5, num_reps=4))
492  perf_tests.append(BrakeTest("Square wave",
493  [0, 10, 0, 15, 0, 20, 0, 25, 0, 30, 0, 35, 0, 40, 0, 45, 0, 50,
494  0, 55, 0, 60, 0, 65, 0, 70, 0, 75, 0, 80, 0, 85, 0, 90, 0, 95, 0, 100],
495  period_s=10, num_reps=4))
496  perf_tests.append(BrakeTest("Square Wave", [ 0, 10, 15, 20, 25, 30, 35, 40, 45, 50,
497  55, 60, 65, 70, 75, 80, 85, 90, 95, 100],
498  period_s=10, num_reps=4))
499  perf_tests.append(BrakeTest("Increasing Wave",
500  [ 20, 21, 20, 21.5, 20, 22, 20, 22.5, 20, 23, 20, 23.5, 20, 24, 20, 24.5, 20, 25, 20, 25.5,
501  20, 26, 20, 26.5, 20, 27, 20, 27.5, 20, 28, 20, 28.5, 20, 29, 20, 29.5, 20, 30],
502  period_s=10, num_reps=4))
503 if tests.steer:
504  perf_tests.append(SteeringTest("Square wave",
505  [0, SteeringTest.lock2lock_degrees / 4, 0, SteeringTest.lock2lock_degrees / 4, 0, SteeringTest.lock2lock_degrees / 4, 0, SteeringTest.lock2lock_degrees / 4, 0],
506  period_s=2, units="degrees", num_reps=5)) # 0 degree - Max( from zero to lock side)
507  perf_tests.append(SteeringTest("Increasing wave",
508  [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
509  period_s=2, num_reps=2))
510  perf_tests.append(SteeringTest("Increasing wave",
511  [0, 10, 0, 20, 0, 30, 0, 40, 0, 50, 0, 60, 0, 70, 0, 80, 0, 90, 0, 100, 0],
512  period_s=2, num_reps=2))
513  # perf_tests.append(SteeringTest("Increasing impulsive wave",
514  # [ 20, 0, 20.1, 0, 20.2, 0, 20.3, 0, 20.4, 0, 20.5, 0, 20.6, 0, 20.7, 0, 20.8, 0, 20.9, 0, 21],
515  # units="degrees", period_s=5, num_reps=4))
516  perf_tests.append(SteeringTest("Increasing impulsive wave",
517  [0, 0.5, 0, -0.5, 0, 1, 0, -1, 0, 1.5, 0, -1.5, 0, 2, 0, -2, 0, 2.5, 0, -2.5],
518  units="degrees", period_s=2, num_reps=2))
519 
520 if tests.gas:
521  perf_tests.append(ThrottleTest("Step test", [ 0, 25 ], num_reps=4))
522  perf_tests.append(ThrottleTest("Step test", [ 0, 50 ], num_reps=4))
523  perf_tests.append(ThrottleTest("Step test", [ 0, 75 ], num_reps=4))
524  perf_tests.append(ThrottleTest("Step test", [ 0, 100 ], num_reps=4))
525 
526 if tests.lever:
527  perf_tests.append(LeverTest("Transition", [ 'P', 'N'], num_reps=2, units="gear"))
528  perf_tests.append(LeverTest("Transition", [ 'P', 'D'], num_reps=2, units="gear"))
529  perf_tests.append(LeverTest("Transition", [ 'P', 'R'], num_reps=2, units="gear"))
530  perf_tests.append(LeverTest("Transition", [ 'N', 'D'], num_reps=2, units="gear"))
531  perf_tests.append(LeverTest("Transition", [ 'N', 'R'], num_reps=2, units="gear"))
532  perf_tests.append(LeverTest("Transition", [ 'N', 'P'], num_reps=2, units="gear"))
533  perf_tests.append(LeverTest("Transition", [ 'D', 'N'], num_reps=2, units="gear"))
534  perf_tests.append(LeverTest("Transition", [ 'D', 'P'], num_reps=2, units="gear"))
535  perf_tests.append(LeverTest("Transition", [ 'D', 'R'], num_reps=2, units="gear"))
536  perf_tests.append(LeverTest("Transition", [ 'R', 'D'], num_reps=2, units="gear"))
537  perf_tests.append(LeverTest("Transition", [ 'R', 'P'], num_reps=2, units="gear"))
538  perf_tests.append(LeverTest("Transition", [ 'R', 'N'], num_reps=2, units="gear"))
539 
540  # just to put it back in park
541  perf_tests.append(LeverTest("Transition", [ 'P'], num_reps=2, units="gear"))
542 
543 class TestNode:
544  def __init__(self):
545  rospy.init_node('test_runner')
546  self.current_test = None
547 
548  # Variables for logging
549  self.brake_cmd = 0.0
550 
551  # Keep all of the different types of message and whether or not they are ready
552  self.msgs = {} # ( Ready?, Report )
553  self.msgs['BrakeReport'] = ( False, BrakeReport() )
554  self.msgs['BrakeInfoReport'] = ( False, BrakeInfoReport() )
555  self.msgs['GearReport'] = ( False, GearReport() )
556  self.msgs['SteeringReport'] = ( False, SteeringReport() )
557  self.msgs['ThrottleReport'] = ( False, ThrottleReport() )
558  self.msgs['ThrottleInfoReport'] = ( False, ThrottleInfoReport() )
559 
560  # Open CSV file
561  self.csv_file = open('brake_sweep_data.csv', 'w')
562  self.csv_writer = csv.writer(self.csv_file, delimiter=',')
563  self.csv_writer.writerow(['Brake (%)', 'Measured (%)', 'Pressure (bar)'])
564 
565  # Publishers and subscribers
566  # Note: publishers are global so that each of the specific test classes can publish to them
567  # without having to explicity allocate and manage them
568  global brake_pub, gear_pub, steering_pub, throttle_pub
569  brake_pub = rospy.Publisher('/vehicle/brake_cmd', BrakeCmd, queue_size=1)
570  gear_pub = rospy.Publisher('/vehicle/gear_cmd', GearCmd, queue_size=1)
571  steering_pub = rospy.Publisher('/vehicle/steering_cmd', SteeringCmd, queue_size=1)
572  throttle_pub = rospy.Publisher('/vehicle/throttle_cmd', ThrottleCmd, queue_size=1)
573 
574  rospy.Subscriber('/vehicle/brake_report', BrakeReport, self.recv_msg)
575  rospy.Subscriber('/vehicle/brake_info_report', BrakeInfoReport, self.recv_msg)
576  rospy.Subscriber('/vehicle/throttle_report', ThrottleReport, self.recv_msg)
577  rospy.Subscriber('/vehicle/throttle_info_report', ThrottleInfoReport, self.recv_msg)
578  rospy.Subscriber('/vehicle/gear_report', GearReport, self.recv_msg)
579  rospy.Subscriber('/vehicle/steering_report', SteeringReport, self.recv_msg)
580 
581  rospy.Timer(rospy.Duration(command_resolution_s), self.timer_process)
582 
583  def timer_process(self, event):
584  if not self.current_test is None and not self.current_test.done:
585  self.current_test.tick(event)
586 
587  # consolidate all of these into one that uses a dictionary
588  def recv_msg(self, msg):
589  self.msgs[type(msg).__name__] = ( True, msg)
590  if not self.current_test is None and not self.current_test.done:
591  self.current_test.recv_msg(msg)
592 
593  def run_tests(self):
594  rospy.sleep(rospy.Duration(5))
595  if rospy.is_shutdown():
596  rospy.loginfo("Stopping test")
597  sys.exit(-1)
598  for test in perf_tests:
599  if test.__class__.__name__ == "LeverTest":
600  reset_gear(test.sequence[0])
601  test.start(self.msgs)
602  self.current_test = test
603  while not rospy.is_shutdown() and not test.done:
604  rospy.sleep(rospy.Duration(0.01))
605  if rospy.is_shutdown():
606  break
607 
608 if __name__ == '__main__':
609  try:
610  node = TestNode()
611  node.run_tests()
612  except rospy.ROSInterruptException:
613  rospy.loginfo("Stopping test")
614  pass
def __init__(self, description, sequence, units="percent", num_reps=1, period_s=1)
def write_csv_header(self, csv_writer)
def write_csv_header(self, csv_writer)
def mark_not_ready(self, msg_name)
def write_csv_header(self, csv_writer)
def write_csv_header(self, csv_writer)


dbw_fca_can
Author(s): Kevin Hallenbeck
autogenerated on Wed May 12 2021 02:14:05