control_input_manager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 from geometry_msgs.msg import TwistStamped, Twist
4 
5 HALT = Twist()
6 
7 class CommandHandle(object):
8  """Manages timeouts and relaying incoming messages for a given control input"""
9  # Class variables
10  prev_cmd = None
11 
12  def __init__(self, sub_topic, pub_topic, timeout, stamped, sub_is_stamped=True, publish_redundant_halts=True, frame_id=''):
13  self.sub_topic = sub_topic
14  self.pub_topic = pub_topic
15  self.timeout = float(timeout)
16  self.is_stamped = stamped
17  self.sub_is_stamped = sub_is_stamped
18  self.publish_redundant_halts = publish_redundant_halts
19  self.frame_id = frame_id
20  self._pub_type = TwistStamped if stamped else Twist
21  self._sub_type = TwistStamped if sub_is_stamped else Twist
22 
23  try:
24  self.timeout = float(timeout)
25  except ValueError:
26  raise ValueError('Invalid `timeout` value for control_input: {TIMEOUT}'
27  .format(TIMEOUT=timeout))
28 
29  if not isinstance(stamped, bool):
30  raise ValueError('Invalid `stamped` value for control_input: {STAMPED}'
31  .format(STAMPED=stamped))
32 
33  self.pub = rospy.Publisher(self.pub_topic, self._pub_type, queue_size=1)
34  self.sub = rospy.Subscriber(self.sub_topic, self._sub_type, self.callback, queue_size=5)
35 
36  rospy.loginfo(
37  'Configuring Command Input Handler\n' +
38  'Subscribed Topic: {TOPIC}\n'.format(TOPIC=self.sub_topic) +
39  'Published Topic: {TOPIC}\n'.format(TOPIC=self.pub_topic) +
40  'Timeout: {TIMEOUT}\n'.format(TIMEOUT=self.timeout) +
41  'Input Topic Stamped: {STAMPED}\n'.format(STAMPED=self.sub_is_stamped) +
42  'Output Topic Stamped: {STAMPED}\n'.format(STAMPED=self.is_stamped) +
43  'Publishing Redundant Halt Commands: {HALT}\n'.format(HALT=self.publish_redundant_halts)
44  )
45 
46  def callback(self, data):
47  """Passes through message and strip strips header if necessary"""
48  if self.sub_is_stamped:
49  twist_cmd = data.twist
50  else:
51  twist_cmd = data
52 
53  if not self.publish_redundant_halts and (self.prev_cmd == HALT and twist_cmd == HALT):
54  return
55 
56  self.prev_cmd = twist_cmd
57 
58  if type(data) == TwistStamped and self.expired(data):
59  return
60 
61  self.pub.publish(self.format_cmd_data(data))
62 
63 
64  def expired(self, data):
65  """Tests message stamp to see if it has timeout"""
66  now = rospy.Time.now()
67  return 0.0 < self.timeout <= (now - data.header.stamp).to_sec()
68 
69  def format_cmd_data(self, data):
70  if (self._sub_type == TwistStamped and self._pub_type == TwistStamped) or \
71  (self._sub_type == Twist and self._pub_type == Twist):
72  return data
73  elif self._sub_type == TwistStamped and self._pub_type == Twist:
74  return data.twist
75  else:
76  data_stamped = TwistStamped()
77  data_stamped.header.frame_id = self.frame_id
78  data_stamped.header.stamp = rospy.Time.now()
79  data_stamped.twist = data
80  return data_stamped
81 
82 
84  """Creates a handle for each input. Drop inputs if timeout is not a number or
85  stamped is not a boolean.
86  """
87 
88  def __init__(self, control_inputs):
89  self.input_handles = []
90  for idx, control_input in enumerate(control_inputs):
91  try:
92  command_handle = CommandHandle(**control_input)
93  self.input_handles.append(command_handle)
94  except ValueError as e:
95  rospy.logfatal('control_input manager ({IDX}) was not created. {params}'
96  .format(IDX=idx, params=control_input))
97  rospy.logfatal(e)
98 
99  def run(self):
100  rospy.spin()
101 
102 
103 def check_params(control_inputs):
104  required_parameters = {'pub_topic', 'sub_topic', 'timeout', 'stamped'}
105  optional_parameters = {'sub_is_stamped', 'publish_redundant_halts'}
106  for control_input in control_inputs:
107  params = set(control_input.keys())
108  missing_params = required_parameters - params
109  unrecognized_params = params - required_parameters - optional_parameters
110  if len(missing_params) != 0:
111  err_msg = "Missing the parameters: " + ', '.join(list(missing_params))
112  raise ValueError(err_msg)
113  elif len(unrecognized_params) != 0:
114  rospy.logerr('Unrecognized parameters: ' + ', '.join(list(unrecognized_params)))
115 
116 
117 def main():
118  rospy.init_node('control_input_manager')
119  control_inputs = rospy.get_param('~control_inputs')
120  check_params(control_inputs)
121  cim = ControlInputManager(control_inputs)
122  cim.run()
123 
124 
125 if __name__ == '__main__':
126  main()
def __init__(self, sub_topic, pub_topic, timeout, stamped, sub_is_stamped=True, publish_redundant_halts=True, frame_id='')
def check_params(control_inputs)


rr_control_input_manager
Author(s):
autogenerated on Thu Sep 10 2020 03:38:35