control_status.py
Go to the documentation of this file.
1 import rospy
2 from threading import Lock
3 from clearpath_control_msgs.msg import ControlMode
4 from clearpath_control_msgs.msg import ControlSelectionState
5 
6 
7 # The name of the topic for current state. This needs to match the CPR OutdoorNav API.
8 CURRENT_STATE_TOPIC_NAME = "/control_selection/control_state"
9 # The name of the topic for current mode. This needs to match the CPR OutdoorNav API.
10 CURRENT_MODE_TOPIC_NAME = "/control_selection/current_mode"
11 
12 
14  """Create ROS subscribers for control status topics and save the results."""
15 
16  def __init__(self, msg_warn_period=10.0):
17  """Subscribes for updates to the various control topics"""
18 
19  self._status_lock = Lock()
20  self._control_state = ControlSelectionState()
21  self._control_state_sub = rospy.Subscriber(
22  CURRENT_STATE_TOPIC_NAME,
23  ControlSelectionState,
25 
26  self._current_mode = ControlMode()
27  self._current_mode_sub = rospy.Subscriber(
28  CURRENT_MODE_TOPIC_NAME,
29  ControlMode,
31 
32  self._no_new_msg_period = msg_warn_period
33  self._last_state_msg_time = rospy.get_time()
34  self._last_mode_msg_time = rospy.get_time()
35  self._timer = rospy.Timer(period=rospy.Duration(0.2), callback=self._msgChecker, oneshot=False)
36 
37  def _controlStateCallback(self, msg):
38  """Updates the control state.
39 
40  Parameters
41  ----------
42  msg : clearpath_control_msgs.msg.ControlSelectionState
43  The updated control state
44  """
45 
46  self._last_state_msg_time = rospy.get_time()
47  with self._status_lock:
48  self._control_state = msg
49 
50  def _controlModeCallback(self, msg):
51  """Updates the current mode.
52 
53  Parameters
54  ----------
55  msg : clearpath_control_msgs.msg.ControlMode
56  The updated control mode
57  """
58 
59  self._last_mode_msg_time = rospy.get_time()
60  with self._status_lock:
61  self._current_mode = msg
62 
63  def getAutonomyEnabled(self):
64  """Get value of the autonomy enabled state.
65 
66  Returns
67  -------
68  enabled : bool
69  True if autonomy is enabled, else False
70  """
71 
72  with self._status_lock:
73  return self._control_state.autonomy.enabled
74 
75  def getAutonomyPaused(self):
76  """Get value of the autonomy paused state.
77 
78  Returns
79  -------
80  enabled : bool
81  True if autonomy is paused, else False
82  """
83 
84  with self._status_lock:
85  return self._control_state.autonomy.paused
86 
87  def getCurrentMode(self):
88  """Get the current control mode.
89 
90  Returns
91  -------
92  mode : int8 (enum)
93  The current control mode (NEUTRAL, MANUAL, or AUTONOMY)
94  """
95 
96  with self._status_lock:
97  return self._current_mode.mode
98 
99  def _msgChecker(self, event):
100  """Check to see if messages have not been received within a certain amount of time"""
101 
102  now = rospy.get_time()
103  if now - self._last_state_msg_time > self._no_new_msg_period:
104  rospy.logwarn_throttle(self._no_new_msg_period, "No new control state message received in the last %0.1f seconds", self._no_new_msg_period)
105  if now - self._last_mode_msg_time > self._no_new_msg_period:
106  rospy.logwarn_throttle(self._no_new_msg_period, "No new current mode message received in the last %0.1f seconds", self._no_new_msg_period)
107 
108  def report(self):
109  """Logs all localization information."""
110 
111  with self._status_lock:
112  rospy.loginfo(" Control:")
113  rospy.loginfo(" State: Enabled: %s, Paused: %s " % (
114  self._control_state.autonomy.enabled,
115  self._control_state.autonomy.paused))
116  rospy.loginfo(" Mode: %d" % self._current_mode.mode)
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._control_state_sub
_control_state_sub
Definition: control_status.py:21
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor.getAutonomyPaused
def getAutonomyPaused(self)
Definition: control_status.py:75
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._timer
_timer
Definition: control_status.py:35
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor.getCurrentMode
def getCurrentMode(self)
Definition: control_status.py:87
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._current_mode_sub
_current_mode_sub
Definition: control_status.py:27
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._no_new_msg_period
_no_new_msg_period
Definition: control_status.py:32
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._status_lock
_status_lock
Definition: control_status.py:19
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor.__init__
def __init__(self, msg_warn_period=10.0)
Definition: control_status.py:16
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._control_state
_control_state
Definition: control_status.py:20
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._controlModeCallback
def _controlModeCallback(self, msg)
Definition: control_status.py:50
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor.getAutonomyEnabled
def getAutonomyEnabled(self)
Definition: control_status.py:63
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._controlStateCallback
def _controlStateCallback(self, msg)
Definition: control_status.py:37
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._last_state_msg_time
_last_state_msg_time
Definition: control_status.py:33
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor.report
def report(self)
Definition: control_status.py:108
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._msgChecker
def _msgChecker(self, event)
Definition: control_status.py:99
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor
Definition: control_status.py:13
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._current_mode
_current_mode
Definition: control_status.py:26
clearpath_onav_api_examples_lib.control_status.ControlStatusMonitor._last_mode_msg_time
_last_mode_msg_time
Definition: control_status.py:34


clearpath_onav_api_examples_lib
Author(s):
autogenerated on Sun Nov 12 2023 03:29:19