plot_optimal_control_results.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 #
4 # Software License Agreement
5 #
6 # Copyright (c) 2020, Christoph Roesmann, All rights reserved.
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <https://www.gnu.org/licenses/>.
20 #
21 # Authors: Christoph Roesmann
22 
23 import rospy, math
24 from mpc_local_planner_msgs.msg import OptimalControlResult
25 import numpy as np
26 import matplotlib.pyplot as plt
27 import matplotlib.ticker as ticker
28 from threading import Lock
29 
31 
32  def __init__(self, plot_states, topic_name):
33 
34  self.initialized = False
35  self.plot_states = plot_states
36  self.dim_states = 0
37  self.dim_controls = 0
38  self.x_fig = plt.Figure()
39  self.x_axes = []
40  self.u_fig = plt.Figure()
41  self.u_axes = []
42  self.tx = []
43  self.x = []
44  self.tu = []
45  self.u = []
46  self.mutex = Lock()
47 
48  self.sub = rospy.Subscriber(topic_name, OptimalControlResult, self.ocp_result_callback, queue_size = 1)
49  rospy.loginfo("Plotting OCP results published on '%s'.",topic_name)
50  rospy.loginfo("Make sure to enable rosparam 'controller/publish_ocp_results' in the mpc_local_planner.")
51 
52  def ocp_result_callback(self, data):
53  rospy.loginfo_once("First message received.")
54  if not self.initialized:
55  self.dim_states = data.dim_states
56  self.dim_controls = data.dim_controls
57 
58  # Read data
59  self.mutex.acquire()
60  if self.plot_states:
61  self.tx = np.array(data.time_states)
62  self.x = np.matrix(data.states)
63  self.x = np.reshape(self.x, (self.tx.size, int(self.dim_states)))
64  self.tu = np.array(data.time_controls)
65  self.u = np.matrix(data.controls)
66  self.u = np.reshape(self.u, (self.tu.size, int(self.dim_controls)))
67  self.mutex.release()
68 
70  if self.plot_states:
71  self.x_fig, self.x_axes = plt.subplots(self.dim_states, sharex=True)
72  self.x_axes[0].set_title('States')
73  for idx, ax in enumerate(self.x_axes):
74  ax.set_ylabel("x" + str(idx))
75  self.x_axes[-1].set_xlabel('Time [s]')
76 
77  self.u_fig, self.u_axes = plt.subplots(self.dim_controls, sharex=True)
78  self.u_axes[0].set_title('Controls')
79  for idx, ax in enumerate(self.u_axes):
80  ax.set_ylabel("u" + str(idx))
81  self.u_axes[-1].set_xlabel('Time [s]')
82  plt.ion()
83  plt.show()
84 
85 
86  def plot(self):
87  # We recreate the plot every time, not fast, but okay for now....
88  self.mutex.acquire()
89  if self.plot_states:
90  for idx, ax in enumerate(self.x_axes):
91  ax.cla()
92  ax.grid()
93  ax.plot(self.tx, self.x[:,idx])
94  ax.get_yaxis().set_major_formatter(ticker.FuncFormatter(lambda x, p: "%.2f" % x))
95  ax.set_ylabel("x" + str(idx))
96  self.x_axes[0].set_title('States')
97  self.x_axes[-1].set_xlabel('Time [s]')
98  self.x_fig.canvas.draw()
99 
100  for idx, ax in enumerate(self.u_axes):
101  ax.cla()
102  ax.grid()
103  ax.step(self.tu, self.u[:, idx], where='post')
104  ax.get_yaxis().set_major_formatter(ticker.FuncFormatter(lambda x, p: "%.2f" % x))
105  ax.set_ylabel("u" + str(idx))
106  self.u_axes[0].set_title('Controls')
107  self.u_axes[-1].set_xlabel('Time [s]')
108  self.u_fig.canvas.draw()
109  self.mutex.release()
110 
111  def start(self, rate):
112  r = rospy.Rate(rate) # define rate here
113  while not rospy.is_shutdown():
114  if not self.initialized and (self.dim_states > 0 or self.dim_controls > 0):
115  self.initializePlotWindows()
116  self.initialized = True
117  if self.initialized:
118  self.plot()
119  r.sleep()
120 
121 
122 
123 if __name__ == '__main__':
124  try:
125 
126  rospy.init_node("ocp_result_plotter", anonymous=True)
127 
128  topic_name = "/test_mpc_optim_node/ocp_result"
129  topic_name = rospy.get_param('~ocp_result_topic', topic_name)
130 
131  plot_states = rospy.get_param('~plot_states', False)
132 
133  result_plotter = OcpResultPlotter(plot_states, topic_name)
134  rate = 2
135  topic_name = rospy.get_param('~plot_rate', rate)
136  result_plotter.start(rate)
137  except rospy.ROSInterruptException:
138  pass


mpc_local_planner
Author(s): Christoph Rösmann
autogenerated on Mon Feb 28 2022 22:53:18