main_widget.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from python_qt_binding import loadUi
4 from python_qt_binding.QtCore import Qt, QTimer, qWarning, Signal, Slot
5 from python_qt_binding.QtGui import QIcon
6 from python_qt_binding.QtWidgets import QAction, QMenu, QWidget, QTreeWidgetItem
7 import rospy
8 import rospkg
9 import roslib
10 from roslib.message import get_message_class
11 from rqt_py_common import topic_helpers
12 from trajectory_msgs.msg import JointTrajectory
13 from control_msgs.msg import FollowJointTrajectoryActionGoal
14 from moveit_msgs.msg import DisplayTrajectory
15 import numpy as np
16 from .plot_widget import PlotWidget
17 
18 
19 class MainWidget(QWidget):
20  draw_curves = Signal(object, object)
21 
22  def __init__(self):
23  super(MainWidget, self).__init__()
24  self.setObjectName('MainWidget')
25 
26  rospack = rospkg.RosPack()
27  ui_file = rospack.get_path(
28  'rqt_joint_trajectory_plot') + '/resource/JointTrajectoryPlot.ui'
29  loadUi(ui_file, self)
30 
31  self.refresh_button.setIcon(QIcon.fromTheme('view-refresh'))
32  self.pause_button.setIcon(QIcon.fromTheme('media-playback-pause'))
33 
34  self.handler = None
35  self.joint_names = []
37  self.timer = QTimer(self)
38  self.timer.timeout.connect(self.update)
39  self.plot_widget = PlotWidget(self)
40  self.plot_layout.addWidget(self.plot_widget)
41  self.draw_curves.connect(self.plot_widget.draw_curves)
42 
43  self.time = None
44  (self.dis, self.vel, self.acc, self.eff) = ({}, {}, {}, {})
45 
46  # refresh topics list in the combobox
47  self.refresh_topics()
48  self.change_topic()
49 
50  self.refresh_button.clicked.connect(self.refresh_topics)
51  self.topic_combox.currentIndexChanged.connect(self.change_topic)
52  self.select_tree.itemChanged.connect(self.update_checkbox)
53 
54  def refresh_topics(self):
55  '''
56  Refresh topic list in the combobox
57  '''
58  topic_list = rospy.get_published_topics()
59  if topic_list is None:
60  return
61  self.topic_combox.clear()
62  self.topic_name_class_map = {}
63  for (name, type) in topic_list:
64  if type in [ 'trajectory_msgs/JointTrajectory',
65  'control_msgs/FollowJointTrajectoryActionGoal',
66  'moveit_msgs/DisplayTrajectory']:
67  self.topic_name_class_map[name] = get_message_class(type)
68  self.topic_combox.addItem(name)
69 
70  def change_topic(self):
71  topic_name = self.topic_combox.currentText()
72  if not topic_name:
73  return
74  if self.handler:
75  self.handler.unregister()
76  self.joint_names = []
77  self.handler = rospy.Subscriber(
78  topic_name, rospy.AnyMsg, self.callback, topic_name)
79 
80  def close(self):
81  if self.handler:
82  self.handler.unregister()
83  self.handler = None
84 
85  def refresh_tree(self):
86  self.select_tree.itemChanged.disconnect()
87  self.select_tree.clear()
88  for joint_name in self.joint_names:
89  item = QTreeWidgetItem(self.select_tree)
90  item.setText(0, joint_name)
91  item.setCheckState(0, Qt.Unchecked)
92  item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
93  for traj_name in ['position', 'velocity', 'acceleration', 'effort']:
94  sub_item = QTreeWidgetItem(item)
95  sub_item.setText(0, traj_name)
96  sub_item.setCheckState(0, Qt.Unchecked)
97  sub_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
98  self.select_tree.itemChanged.connect(self.update_checkbox)
99 
100  def callback(self, anymsg, topic_name):
101  if self.pause_button.isChecked():
102  return
103  # In case of control_msgs/FollowJointTrajectoryActionGoal
104  # set trajectory_msgs/JointTrajectory to 'msg'
105  # Convert AnyMsg to trajectory_msgs/JointTrajectory
106  msg_class = self.topic_name_class_map[topic_name]
107  if msg_class == JointTrajectory:
108  msg = JointTrajectory().deserialize(anymsg._buff)
109  elif msg_class == FollowJointTrajectoryActionGoal:
110  msg = FollowJointTrajectoryActionGoal().deserialize(anymsg._buff).goal.trajectory
111  elif msg_class == DisplayTrajectory:
112  if DisplayTrajectory().deserialize(anymsg._buff).trajectory.__len__() > 0:
113  msg = DisplayTrajectory().deserialize(anymsg._buff).trajectory.pop().joint_trajectory
114  else:
115  rospy.logwarn("Received planned trajectory has no waypoints in it. Nothing to plot!")
116  return
117  else:
118  rospy.logerr('Wrong message type %s'%msg_class)
119  return
120  self.time = np.array([0.0] * len(msg.points))
121  (self.dis, self.vel, self.acc, self.eff) = ({}, {}, {}, {})
122  for joint_name in msg.joint_names:
123  self.dis[joint_name] = np.array([0.0] * len(msg.points))
124  self.vel[joint_name] = np.array([0.0] * len(msg.points))
125  self.acc[joint_name] = np.array([0.0] * len(msg.points))
126  self.eff[joint_name] = np.array([0.0] * len(msg.points))
127  for i in range(len(msg.points)):
128  point = msg.points[i]
129  self.time[i] = point.time_from_start.to_sec()
130  for j in range(len(msg.joint_names)):
131  joint_name = msg.joint_names[j]
132  if point.positions:
133  self.dis[joint_name][i] = point.positions[j]
134  if point.velocities:
135  self.vel[joint_name][i] = point.velocities[j]
136  if point.accelerations:
137  self.acc[joint_name][i] = point.accelerations[j]
138  if point.effort:
139  self.eff[joint_name][i] = point.effort[j]
140  if self.joint_names != msg.joint_names:
141  self.joint_names = msg.joint_names
142  self.refresh_tree()
143  self.joint_names = msg.joint_names
144  self.plot_graph()
145 
146  def plot_graph(self):
147  '''
148  Emit changed signal to call plot_widet.draw_curves()
149  '''
150  curve_names = []
151  data = {}
152  data_list = [self.dis, self.vel, self.acc, self.eff]
153  traj_names = ['position', 'velocity', 'acceleration', 'effort']
154  # Create curve name and data from checked items
155  for i in range(self.select_tree.topLevelItemCount()):
156  joint_item = self.select_tree.topLevelItem(i)
157  for n in range(len(traj_names)):
158  item = joint_item.child(n)
159  if item.checkState(0):
160  joint_name = joint_item.text(0)
161  curve_name = joint_name + ' ' + traj_names[n]
162  curve_names.append(curve_name)
163  data[curve_name] = (self.time, data_list[n][joint_name])
164  self.draw_curves.emit(curve_names, data)
165 
166  def update_checkbox(self, item, column):
167  self.recursive_check(item)
168  self.plot_graph()
169 
170  def recursive_check(self, item):
171  check_state = item.checkState(0)
172  for i in range(item.childCount()):
173  item.child(i).setCheckState(0, check_state)
174  self.recursive_check(item.child(i))
def callback(self, anymsg, topic_name)
Definition: main_widget.py:100


rqt_joint_trajectory_plot
Author(s): Ryosuke Tajima
autogenerated on Mon Nov 2 2020 03:13:10