throttle_thrash.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2018, Dataspeed Inc.
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without modification,
9 # are permitted provided that the following conditions are met:
10 #
11 # * Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above copyright notice,
14 # this list of conditions and the following disclaimer in the documentation
15 # and/or other materials provided with the distribution.
16 # * Neither the name of Dataspeed Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from this
18 # software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
21 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 import rospy
32 from std_msgs.msg import Bool
33 from dbw_fca_msgs.msg import ThrottleCmd
34 from dbw_fca_msgs.msg import GearReport, SteeringReport
35 
37  def __init__(self):
38  rospy.init_node('throttle_thrash')
39 
40  # Shutdown
41  self.shutdown = False
42 
43  # Received messages
44  self.dbw_enabled = False
46  self.msg_gear_report = GearReport()
47  self.msg_gear_report_ready = False
48  self.msg_steering_report = SteeringReport()
49  self.msg_steering_report_ready = False
50 
51  # Parameters
52  self.started = False
53  rospy.loginfo('Preparing to thrash the throttle pedal command to try and induce a fault...')
54  rospy.loginfo('Validating that vehicle is parked...')
55 
56  # Command state
57  self.cmd_state = False
58 
59  # Publishers and subscribers
60  self.pub = rospy.Publisher('/vehicle/throttle_cmd', ThrottleCmd, queue_size=10)
61  rospy.Subscriber('/vehicle/dbw_enabled', Bool, self.recv_enabled)
62  rospy.Subscriber('/vehicle/gear_report', GearReport, self.recv_gear)
63  rospy.Subscriber('/vehicle/steering_report', SteeringReport, self.recv_steering)
64  rospy.Timer(rospy.Duration(0.2), self.timer_process)
65 
66  def timer_process(self, event):
67  # Check for safe conditions
68  if not self.msg_steering_report_ready:
69  self.shutdown = True
70  rospy.logerr('Speed check failed. No messages on topic \'/vehicle/steering_report\'')
71  elif not self.msg_steering_report.speed == 0.0:
72  self.shutdown = True
73  rospy.logerr('Speed check failed. Vehicle speed is not zero.')
74  if not self.msg_gear_report_ready:
75  self.shutdown = True
76  rospy.logerr('Gear check failed. No messages on topic \'/vehicle/gear_report\'')
77  elif not self.msg_gear_report.state.gear == self.msg_gear_report.state.PARK:
78  self.shutdown = True
79  rospy.logerr('Gear check failed. Vehicle not in park.')
80 
81  # Check if enabled
82  if self.shutdown:
83  rospy.signal_shutdown('')
84  return
85 
86  # Check if enabled
87  if not self.dbw_enabled:
88  rospy.logwarn('Drive-by-wire not enabled!')
89 
90  # Start command timers
91  if not self.started:
92  self.started = True
93  rospy.Timer(rospy.Duration(0.020), self.timer_cmd)
94  rospy.loginfo('Started thrashing the throttle pedal command to try and induce a fault.')
95 
96  # Prepare for next iteration
97  self.msg_gear_report_ready = False
98  self.msg_steering_report_ready = False
99 
100  def timer_cmd(self, event):
101  if not self.shutdown:
102  msg = ThrottleCmd()
103  msg.enable = True
104  msg.pedal_cmd_type = ThrottleCmd.CMD_PEDAL
105  if self.cmd_state:
106  msg.pedal_cmd = 1.0
107  else:
108  msg.pedal_cmd = 0.0
109  self.pub.publish(msg)
110  self.cmd_state = not self.cmd_state
111 
112  def recv_enabled(self, msg):
113  self.dbw_enabled = msg.data
114 
115  def recv_gear(self, msg):
116  self.msg_gear_report = msg
117  self.msg_gear_report_ready = True
118 
119  def recv_steering(self, msg):
120  self.msg_steering_report = msg
121  self.msg_steering_report_ready = True
122 
123  def shutdown_handler(self):
124  pass
125 
126 if __name__ == '__main__':
127  try:
128  node = ThrottleThrash()
129  rospy.on_shutdown(node.shutdown_handler)
130  rospy.spin()
131  except rospy.ROSInterruptException:
132  pass


dbw_fca_can
Author(s): Kevin Hallenbeck
autogenerated on Wed May 12 2021 02:14:05