Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import rospy
00032 from std_msgs.msg import Bool
00033 from dbw_mkz_msgs.msg import ThrottleCmd
00034 from dbw_mkz_msgs.msg import GearReport, SteeringReport
00035
00036 class ThrottleThrash:
00037 def __init__(self):
00038 rospy.init_node('throttle_thrash')
00039
00040
00041 self.shutdown = False
00042
00043
00044 self.dbw_enabled = False
00045 self.msg_steering_report_ready = False
00046 self.msg_gear_report = GearReport()
00047 self.msg_gear_report_ready = False
00048 self.msg_steering_report = SteeringReport()
00049 self.msg_steering_report_ready = False
00050
00051
00052 self.started = False
00053 rospy.loginfo('Preparing to thrash the throttle pedal command to try and induce a fault...')
00054 rospy.loginfo('Validating that vehicle is parked...')
00055
00056
00057 self.cmd_state = False
00058
00059
00060 self.pub = rospy.Publisher('/vehicle/throttle_cmd', ThrottleCmd, queue_size=10)
00061 rospy.Subscriber('/vehicle/dbw_enabled', Bool, self.recv_enabled)
00062 rospy.Subscriber('/vehicle/gear_report', GearReport, self.recv_gear)
00063 rospy.Subscriber('/vehicle/steering_report', SteeringReport, self.recv_steering)
00064 rospy.Timer(rospy.Duration(0.2), self.timer_process)
00065
00066 def timer_process(self, event):
00067
00068 if not self.msg_steering_report_ready:
00069 self.shutdown = True
00070 rospy.logerr('Speed check failed. No messages on topic \'/vehicle/steering_report\'')
00071 elif not self.msg_steering_report.speed == 0.0:
00072 self.shutdown = True
00073 rospy.logerr('Speed check failed. Vehicle speed is not zero.')
00074 if not self.msg_gear_report_ready:
00075 self.shutdown = True
00076 rospy.logerr('Gear check failed. No messages on topic \'/vehicle/gear_report\'')
00077 elif not self.msg_gear_report.state.gear == self.msg_gear_report.state.PARK:
00078 self.shutdown = True
00079 rospy.logerr('Gear check failed. Vehicle not in park.')
00080
00081
00082 if self.shutdown:
00083 rospy.signal_shutdown('')
00084 return
00085
00086
00087 if not self.dbw_enabled:
00088 rospy.logwarn('Drive-by-wire not enabled!')
00089
00090
00091 if not self.started:
00092 self.started = True
00093 rospy.Timer(rospy.Duration(0.020), self.timer_cmd)
00094 rospy.loginfo('Started thrashing the throttle pedal command to try and induce a fault.')
00095
00096
00097 self.msg_gear_report_ready = False
00098 self.msg_steering_report_ready = False
00099
00100 def timer_cmd(self, event):
00101 if not self.shutdown:
00102 msg = ThrottleCmd()
00103 msg.enable = True
00104 msg.pedal_cmd_type = ThrottleCmd.CMD_PEDAL
00105 if self.cmd_state:
00106 msg.pedal_cmd = 1.0
00107 else:
00108 msg.pedal_cmd = 0.0
00109 self.pub.publish(msg)
00110 self.cmd_state = not self.cmd_state
00111
00112 def recv_enabled(self, msg):
00113 self.dbw_enabled = msg.data
00114
00115 def recv_gear(self, msg):
00116 self.msg_gear_report = msg
00117 self.msg_gear_report_ready = True
00118
00119 def recv_steering(self, msg):
00120 self.msg_steering_report = msg
00121 self.msg_steering_report_ready = True
00122
00123 def shutdown_handler(self):
00124 pass
00125
00126 if __name__ == '__main__':
00127 try:
00128 node = ThrottleThrash()
00129 rospy.on_shutdown(node.shutdown_handler)
00130 rospy.spin()
00131 except rospy.ROSInterruptException:
00132 pass