platform_status.py
Go to the documentation of this file.
1 import rospy
2 from threading import Lock
3 from wireless_msgs.msg import Connection as WirelessConnection
4 from sensor_msgs.msg import BatteryState
5 
6 
7 # The name of the topic for the wireless connection. This needs to match the CPR OutdoorNav API.
8 WIRELESS_CONNECTION_TOPIC_NAME = "/onboard_systems/wireless/connection"
9 # The root name of the topic for the BMS. This needs to match the CPR OutdoorNav API.
10 BMS_ROOT_TOPIC_NAME = "/onboard_systems/bms"
11 
12 
14  """Create ROS subscribers for Platform API topics and save the results."""
15 
16  def __init__(self, num_bms=0, msg_warn_period=10.0):
17  """Subscribes for updates to the various platform topics
18 
19  Parameters
20  ----------
21  num_bms : int, optional
22  Number of BMS in the system (default is 0)
23  """
24 
25  self._status_lock = Lock()
26 
27  self._wireless_connection = WirelessConnection()
28  self._wireless_connection_sub = rospy.Subscriber(
29  WIRELESS_CONNECTION_TOPIC_NAME,
30  WirelessConnection,
32 
33  if num_bms > 0:
34  self._bms_state = [BatteryState()] * num_bms
35  self._bms_state_sub = [None] * num_bms
36  for i in range(num_bms):
37  topic = BMS_ROOT_TOPIC_NAME + "/[%d]/state" % i
38  self._bms_state_sub[i] = rospy.Subscriber(
39  topic, BatteryState, self._bmsStateCallback, i)
40  else:
41  self._bms_state = []
42 
43  self._no_new_msg_period = msg_warn_period
44  self._last_wireless_msg_time = rospy.get_time()
45  self._last_bms_msg_time = rospy.get_time()
46  self._timer = rospy.Timer(period=rospy.Duration(0.2), callback=self._msgChecker)
47 
49  """Updates the wireless connection state.
50 
51  Parameters
52  ----------
53  msg : wireless_msgs.msg.Connection
54  The updated wireless connection state
55  """
56 
57  self._last_wireless_msg_time = rospy.get_time()
58  with self._status_lock:
59  self._wireless_connection = msg
60 
61  def _bmsStateCallback(self, msg, index):
62  """Updates the BMS state.
63 
64  Parameters
65  ----------
66  msg : sensor_msgs.msg.BatteryState
67  The updated battery state
68 
69  index : int
70  The index of the BMS for the msg (in the case of multi-BMS systems)
71  """
72 
73  self._last_bms_msg_time = rospy.get_time()
74  with self._status_lock:
75  self._bms_state[index] = msg
76 
77 
78  def _msgChecker(self, event):
79  """Check to see if messages have not been received within a certain amount of time"""
80 
81  now = rospy.get_time()
82  if now - self._last_wireless_msg_time > self._no_new_msg_period:
83  rospy.logwarn_throttle(self._no_new_msg_period, "No new wireless message received in the last %0.1f seconds", self._no_new_msg_period)
84  if now - self._last_bms_msg_time > self._no_new_msg_period:
85  rospy.logwarn_throttle(self._no_new_msg_period, "No new bms messages received in the last %0.1f seconds", self._no_new_msg_period)
86 
87  def report(self):
88  """Logs all status information."""
89 
90  with self._status_lock:
91  rospy.loginfo(" Platform:")
92  rospy.loginfo(" Wireless connection: Bitrate %f Mbps; Link quality: %f" % (
93  self._wireless_connection.bitrate, self._wireless_connection.link_quality))
94  for i in range(len(self._bms_state)):
95  rospy.loginfo(" BMS[%d]: %f Volts" % (i, self._bms_state[i].voltage))
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._status_lock
_status_lock
Definition: platform_status.py:25
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._last_bms_msg_time
_last_bms_msg_time
Definition: platform_status.py:45
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor
Definition: platform_status.py:13
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._bms_state
_bms_state
Definition: platform_status.py:34
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._bms_state_sub
_bms_state_sub
Definition: platform_status.py:35
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._msgChecker
def _msgChecker(self, event)
Definition: platform_status.py:78
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor.report
def report(self)
Definition: platform_status.py:87
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor.__init__
def __init__(self, num_bms=0, msg_warn_period=10.0)
Definition: platform_status.py:16
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._wireless_connection_sub
_wireless_connection_sub
Definition: platform_status.py:28
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._last_wireless_msg_time
_last_wireless_msg_time
Definition: platform_status.py:44
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._wirelessConnectionCallback
def _wirelessConnectionCallback(self, msg)
Definition: platform_status.py:48
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._bmsStateCallback
def _bmsStateCallback(self, msg, index)
Definition: platform_status.py:61
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._no_new_msg_period
_no_new_msg_period
Definition: platform_status.py:43
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._timer
_timer
Definition: platform_status.py:46
clearpath_onav_api_examples_lib.platform_status.PlatformStatusMonitor._wireless_connection
_wireless_connection
Definition: platform_status.py:27


clearpath_onav_api_examples_lib
Author(s):
autogenerated on Sun Nov 12 2023 03:29:19