navigation_status.py
Go to the documentation of this file.
1 import rospy
2 from threading import Lock
3 from clearpath_navigation_msgs.msg import DistanceToGoal
4 from clearpath_navigation_msgs.msg import MotionState
5 from geometry_msgs.msg import PoseArray
6 from clearpath_navigation_msgs.msg import Progress
7 from clearpath_navigation_msgs.msg import NavigationState
8 from geometry_msgs.msg import Twist
9 
10 
11 # The name of the topic for the distance to goal. This needs to match the CPR OutdoorNav API.
12 DISTANCE_TO_GOAL_TOPIC_NAME = "/navigation/distance_to_goal"
13 # The name of the topic for the motion state. This needs to match the CPR OutdoorNav API.
14 MOTION_STATE_TOPIC_NAME = "/navigation/motion_state"
15 # The name of the topic for the navigation path. This needs to match the CPR OutdoorNav API.
16 NAV_PATH_TOPIC_NAME = "/navigation/path"
17 # The name of the topic for the navigation progress. This needs to match the CPR OutdoorNav API.
18 NAV_PROGRESS_TOPIC_NAME = "/navigation/progress"
19 # The name of the topic for the navigation commanded velocity. This needs to match the CPR OutdoorNav API.
20 NAV_CMD_VEL_TOPIC_NAME = "/navigation/cmd_vel"
21 # The name of the topic for the navigation commanded velocity. This needs to match the CPR OutdoorNav API.
22 PLATFORM_VEL_TOPIC_NAME = "/platform/cmd_vel"
23 
24 
26  """Create ROS subscribers for navigation status topics and save the results."""
27 
28  def __init__(self, store_data=False, msg_warn_period=10.0):
29  """Subscribes for updates to the various navigation topics"""
30 
31  self._store_data = store_data
32  self._status_lock = Lock()
33 
34  self._last_platform_vel = Twist()
35  self._platform_vel_sub = rospy.Subscriber(
36  PLATFORM_VEL_TOPIC_NAME,
37  Twist, self._platformVelCallback)
38 
39  self._last_nav_cmd_vel = Twist()
40  self._nav_vel_sub = rospy.Subscriber(
41  NAV_CMD_VEL_TOPIC_NAME,
42  Twist, self._navVelCallback)
43 
44  self._distance_to_goal = DistanceToGoal()
45  self._distance_to_goal_sub = rospy.Subscriber(
46  DISTANCE_TO_GOAL_TOPIC_NAME,
47  DistanceToGoal,
49 
50  self._motion_state = MotionState()
51  self._motion_state_sub = rospy.Subscriber(
52  MOTION_STATE_TOPIC_NAME,
53  MotionState,
55 
56  self._new_path_received = False
57  self._planned_path = PoseArray()
58  self._planned_path_sub = rospy.Subscriber(
59  NAV_PATH_TOPIC_NAME,
60  PoseArray,
62 
63  self._progress = Progress()
64  self._progress_sub = rospy.Subscriber(
65  NAV_PROGRESS_TOPIC_NAME,
66  Progress,
67  self._progressCallback)
68 
69  self._no_new_msg_period = msg_warn_period
70  self._last_platform_vel_msg_time = rospy.get_time()
71  self._last_nav_cmd_vel_msg_time = rospy.get_time()
72  self._last_distance_msg_time = rospy.get_time()
73  self._last_motion_state_msg_time = rospy.get_time()
74  self._last_progress_msg_time = rospy.get_time()
75  self._timer = rospy.Timer(period=rospy.Duration(0.2), callback=self._msgChecker, oneshot=False)
76 
77  self._time = []
78  self._platform_vel = {
79  "lin_x": [],
80  "ang_z": [],
81  }
82  self._nav_cmd_vel = {
83  "lin_x": [],
84  "ang_z": [],
85  }
86  self._nav_path = {
87  "x": [],
88  "y": [],
89  }
90 
91  def _platformVelCallback(self, msg):
92  """Updates the platform velocity.
93 
94  Parameters
95  ----------
96  msg : geometry_msgs.msg.Twist
97  The updated platform velocity
98  """
99 
100  self._last_platform_vel_msg_time = rospy.get_time()
101  with self._status_lock:
102  self._last_platform_vel = msg
103 
104  def _navVelCallback(self, msg):
105  """Updates the navigation commanded velocity.
106 
107  Parameters
108  ----------
109  msg : geometry_msgs.msg.Twist
110  The updated navigation commanded velocity
111  """
112 
113  self._last_nav_cmd_vel_msg_time = rospy.get_time()
114  with self._status_lock:
115  self._last_nav_cmd_vel = msg
116  if self._store_data:
117  self._time.append(rospy.get_time())
118  self._platform_vel["lin_x"].append(self._last_platform_vel.linear.x)
119  self._platform_vel["ang_z"].append(self._last_platform_vel.angular.z)
120  self._nav_cmd_vel["lin_x"].append(self._last_nav_cmd_vel.linear.x)
121  self._nav_cmd_vel["ang_z"].append(self._last_nav_cmd_vel.angular.z)
122 
123  def _distanceToGoalCallback(self, msg):
124  """Updates the distance to goal state.
125 
126  Parameters
127  ----------
128  msg : clearpath_navigation_msgs.msg.DistanceToGoal
129  The updated distance to goal state
130  """
131 
132  self._last_distance_msg_time = rospy.get_time()
133  with self._status_lock:
134  self._distance_to_goal = msg
135 
136  def _motionStateCallback(self, msg):
137  """Updates the motion state.
138 
139  Parameters
140  ----------
141  msg : clearpath_navigation_msgs.msg.MotionState
142  The updated motion state
143  """
144 
145  self._last_motion_state_msg_time = rospy.get_time()
146  with self._status_lock:
147  self._motion_state = msg
148 
149  def _plannedPathCallback(self, msg):
150  """Updates the planned path state.
151 
152  Parameters
153  ----------
154  msg : geometry_msgs.msg.PoseArray
155  The updated planned path state
156  """
157 
158  self._new_path_received = True
159  with self._status_lock:
160  self._planned_path = msg
161  if self._store_data:
162  for i in range(len(self._planned_path.poses)):
163  self._nav_path["x"].append(self._planned_path.poses[i].position.x)
164  self._nav_path["y"].append(self._planned_path.poses[i].position.y)
165 
166  def _progressCallback(self, msg):
167  """Updates the progress state.
168 
169  Parameters
170  ----------
171  msg : clearpath_navigation_msgs.msg.Progress
172  The updated progress state
173  """
174 
175  self._last_progress_msg_time = rospy.get_time()
176  with self._status_lock:
177  self._progress = msg
178 
179  def _msgChecker(self, event):
180  """Check to see if messages have not been received within a certain amount of time"""
181 
182  now = rospy.get_time()
183  if now - self._last_platform_vel_msg_time > self._no_new_msg_period:
184  rospy.logwarn_throttle(self._no_new_msg_period, "No new platform velocity message received in the last %0.1f seconds", self._no_new_msg_period)
185  if now - self._last_nav_cmd_vel_msg_time > self._no_new_msg_period:
186  rospy.logwarn_throttle(self._no_new_msg_period, "No new navigation command velocity message received in the last %0.1f seconds", self._no_new_msg_period)
187  if now - self._last_distance_msg_time > self._no_new_msg_period:
188  rospy.logwarn_throttle(self._no_new_msg_period, "No new distance to goal message received in the last %0.1f seconds", self._no_new_msg_period)
189  if now - self._last_motion_state_msg_time > self._no_new_msg_period:
190  rospy.logwarn_throttle(self._no_new_msg_period, "No new motion state received in the last %0.1f seconds", self._no_new_msg_period)
191  if now - self._last_progress_msg_time > self._no_new_msg_period:
192  rospy.logwarn_throttle(self._no_new_msg_period, "No new progress message received in the last %0.1f seconds", self._no_new_msg_period)
193 
194  def report(self):
195  """Logs all localization information."""
196 
197  with self._status_lock:
198  rospy.loginfo(" Navigation:")
199  rospy.loginfo(" Distance to Goal: Euclidean: %f, Path: %f" % (
200  self._distance_to_goal.euclidean,
201  self._distance_to_goal.path))
202  rospy.loginfo(" Motion State: Motion: %d, Indicator: %d, Direction: %d" % (
203  self._motion_state.motion,
204  self._motion_state.indicator,
205  self._motion_state.direction))
206  if self._new_path_received:
207  rospy.loginfo(" Planned Path:")
208  for pose in self._planned_path.poses:
209  rospy.loginfo(" (%f, %f)" % (
210  pose.position.x, pose.position.y))
211  self._new_path_received = False
212  rospy.loginfo(" Progress: Path: %f, Goal: %f, Mission: %f" % (
213  self._progress.path_progress,
214  self._progress.goal_progress,
215  self._progress.mission_progress))
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._navVelCallback
def _navVelCallback(self, msg)
Definition: navigation_status.py:104
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._progress_sub
_progress_sub
Definition: navigation_status.py:64
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._plannedPathCallback
def _plannedPathCallback(self, msg)
Definition: navigation_status.py:149
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._last_nav_cmd_vel
_last_nav_cmd_vel
Definition: navigation_status.py:39
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._last_progress_msg_time
_last_progress_msg_time
Definition: navigation_status.py:74
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._new_path_received
_new_path_received
Definition: navigation_status.py:56
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._msgChecker
def _msgChecker(self, event)
Definition: navigation_status.py:179
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor.__init__
def __init__(self, store_data=False, msg_warn_period=10.0)
Definition: navigation_status.py:28
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._last_distance_msg_time
_last_distance_msg_time
Definition: navigation_status.py:72
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._progress
_progress
Definition: navigation_status.py:63
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._nav_vel_sub
_nav_vel_sub
Definition: navigation_status.py:40
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor.report
def report(self)
Definition: navigation_status.py:194
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._motion_state
_motion_state
Definition: navigation_status.py:50
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._nav_path
_nav_path
Definition: navigation_status.py:86
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._no_new_msg_period
_no_new_msg_period
Definition: navigation_status.py:69
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._nav_cmd_vel
_nav_cmd_vel
Definition: navigation_status.py:82
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._last_nav_cmd_vel_msg_time
_last_nav_cmd_vel_msg_time
Definition: navigation_status.py:71
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._platform_vel_sub
_platform_vel_sub
Definition: navigation_status.py:35
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._timer
_timer
Definition: navigation_status.py:75
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._platformVelCallback
def _platformVelCallback(self, msg)
Definition: navigation_status.py:91
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._platform_vel
_platform_vel
Definition: navigation_status.py:78
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._progressCallback
def _progressCallback(self, msg)
Definition: navigation_status.py:166
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._motionStateCallback
def _motionStateCallback(self, msg)
Definition: navigation_status.py:136
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._status_lock
_status_lock
Definition: navigation_status.py:32
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._last_motion_state_msg_time
_last_motion_state_msg_time
Definition: navigation_status.py:73
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._store_data
_store_data
Definition: navigation_status.py:31
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._motion_state_sub
_motion_state_sub
Definition: navigation_status.py:51
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._distance_to_goal
_distance_to_goal
Definition: navigation_status.py:44
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor
Definition: navigation_status.py:25
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._last_platform_vel_msg_time
_last_platform_vel_msg_time
Definition: navigation_status.py:70
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._distanceToGoalCallback
def _distanceToGoalCallback(self, msg)
Definition: navigation_status.py:123
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._distance_to_goal_sub
_distance_to_goal_sub
Definition: navigation_status.py:45
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._time
_time
Definition: navigation_status.py:77
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._last_platform_vel
_last_platform_vel
Definition: navigation_status.py:34
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._planned_path_sub
_planned_path_sub
Definition: navigation_status.py:58
clearpath_onav_api_examples_lib.navigation_status.NavigationStatusMonitor._planned_path
_planned_path
Definition: navigation_status.py:57


clearpath_onav_api_examples_lib
Author(s):
autogenerated on Sun Nov 12 2023 03:29:19