main_widget.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 from python_qt_binding import loadUi
00004 from python_qt_binding.QtCore import Qt, QTimer, qWarning, Signal, Slot
00005 from python_qt_binding.QtGui import QIcon
00006 from python_qt_binding.QtWidgets import QAction, QMenu, QWidget, QTreeWidgetItem
00007 import rospy
00008 import rospkg
00009 import roslib
00010 from roslib.message import get_message_class
00011 from rqt_py_common import topic_helpers
00012 from trajectory_msgs.msg import JointTrajectory
00013 from control_msgs.msg import FollowJointTrajectoryActionGoal
00014 import numpy as np
00015 from .plot_widget import PlotWidget
00016 
00017 
00018 class MainWidget(QWidget):
00019     draw_curves = Signal(object, object)
00020 
00021     def __init__(self):
00022         super(MainWidget, self).__init__()
00023         self.setObjectName('MainWidget')
00024 
00025         rospack = rospkg.RosPack()
00026         ui_file = rospack.get_path(
00027             'rqt_joint_trajectory_plot') + '/resource/JointTrajectoryPlot.ui'
00028         loadUi(ui_file, self)
00029 
00030         self.refresh_button.setIcon(QIcon.fromTheme('view-refresh'))
00031         self.pause_button.setIcon(QIcon.fromTheme('media-playback-pause'))
00032 
00033         self.handler = None
00034         self.joint_names = []
00035         self.topic_name_class_map = {}
00036         self.timer = QTimer(self)
00037         self.timer.timeout.connect(self.update)
00038         self.plot_widget = PlotWidget(self)
00039         self.plot_layout.addWidget(self.plot_widget)
00040         self.draw_curves.connect(self.plot_widget.draw_curves)
00041 
00042         self.time = None
00043         (self.dis, self.vel, self.acc, self.eff) = ({}, {}, {}, {})
00044 
00045         # refresh topics list in the combobox
00046         self.refresh_topics()
00047         self.change_topic()
00048 
00049         self.refresh_button.clicked.connect(self.refresh_topics)
00050         self.topic_combox.currentIndexChanged.connect(self.change_topic)
00051         self.select_tree.itemChanged.connect(self.update_checkbox)
00052 
00053     def refresh_topics(self):
00054         '''
00055         Refresh topic list in the combobox
00056         '''
00057         topic_list = rospy.get_published_topics()
00058         if topic_list is None:
00059             return
00060         self.topic_combox.clear()
00061         self.topic_name_class_map = {}
00062         for (name, type) in topic_list:
00063             if type in [ 'trajectory_msgs/JointTrajectory',
00064                          'control_msgs/FollowJointTrajectoryActionGoal' ]:
00065                 self.topic_name_class_map[name] = get_message_class(type)
00066                 self.topic_combox.addItem(name)
00067 
00068     def change_topic(self):
00069         topic_name = self.topic_combox.currentText()
00070         if not topic_name:
00071             return
00072         if self.handler:
00073             self.handler.unregister()
00074         self.joint_names = []
00075         self.handler = rospy.Subscriber(
00076             topic_name, rospy.AnyMsg, self.callback, topic_name)
00077 
00078     def close(self):
00079         if self.handler:
00080             self.handler.unregister()
00081             self.handler = None
00082 
00083     def refresh_tree(self):
00084         self.select_tree.itemChanged.disconnect()
00085         self.select_tree.clear()
00086         for joint_name in self.joint_names:
00087             item = QTreeWidgetItem(self.select_tree)
00088             item.setText(0, joint_name)
00089             item.setCheckState(0, Qt.Unchecked)
00090             item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
00091             for traj_name in ['position', 'velocity', 'acceleration', 'effort']:
00092                 sub_item = QTreeWidgetItem(item)
00093                 sub_item.setText(0, traj_name)
00094                 sub_item.setCheckState(0, Qt.Unchecked)
00095                 sub_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
00096         self.select_tree.itemChanged.connect(self.update_checkbox)
00097 
00098     def callback(self, anymsg, topic_name):
00099         if self.pause_button.isChecked():
00100             return
00101         # In case of control_msgs/FollowJointTrajectoryActionGoal
00102         # set trajectory_msgs/JointTrajectory to 'msg'
00103         # Convert AnyMsg to trajectory_msgs/JointTrajectory
00104         msg_class = self.topic_name_class_map[topic_name]
00105         if msg_class == JointTrajectory:
00106             msg = JointTrajectory().deserialize(anymsg._buff)
00107         elif msg_class == FollowJointTrajectoryActionGoal:
00108             msg = FollowJointTrajectoryActionGoal().deserialize(anymsg._buff).goal.trajectory
00109         else:
00110             rospy.logerr('Wrong message type %s'%msg_class)
00111             return;
00112         self.time = np.array([0.0] * len(msg.points))
00113         (self.dis, self.vel, self.acc, self.eff) = ({}, {}, {}, {})
00114         for joint_name in msg.joint_names:
00115             self.dis[joint_name] = np.array([0.0] * len(msg.points))
00116             self.vel[joint_name] = np.array([0.0] * len(msg.points))
00117             self.acc[joint_name] = np.array([0.0] * len(msg.points))
00118             self.eff[joint_name] = np.array([0.0] * len(msg.points))
00119         for i in range(len(msg.points)):
00120             point = msg.points[i]
00121             self.time[i] = point.time_from_start.to_sec()
00122             for j in range(len(msg.joint_names)):
00123                 joint_name = msg.joint_names[j]
00124                 if point.positions:
00125                     self.dis[joint_name][i] = point.positions[j]
00126                 if point.velocities:
00127                     self.vel[joint_name][i] = point.velocities[j]
00128                 if point.accelerations:
00129                     self.acc[joint_name][i] = point.accelerations[j]
00130                 if point.effort:
00131                     self.eff[joint_name][i] = point.effort[j]
00132         if self.joint_names != msg.joint_names:
00133             self.joint_names = msg.joint_names
00134             self.refresh_tree()
00135         self.joint_names = msg.joint_names
00136         self.plot_graph()
00137 
00138     def plot_graph(self):
00139         '''
00140         Emit changed signal to call plot_widet.draw_curves()
00141         '''
00142         curve_names = []
00143         data = {}
00144         data_list = [self.dis, self.vel, self.acc, self.eff]
00145         traj_names = ['position', 'velocity', 'acceleration', 'effort']
00146         # Create curve name and data from checked items
00147         for i in range(self.select_tree.topLevelItemCount()):
00148             joint_item = self.select_tree.topLevelItem(i)
00149             for n in range(len(traj_names)):
00150                 item = joint_item.child(n)
00151                 if item.checkState(0):
00152                     joint_name = joint_item.text(0)
00153                     curve_name = joint_name + ' ' + traj_names[n]
00154                     curve_names.append(curve_name)
00155                     data[curve_name] = (self.time, data_list[n][joint_name])
00156         self.draw_curves.emit(curve_names, data)
00157 
00158     def update_checkbox(self, item, column):
00159         self.recursive_check(item)
00160         self.plot_graph()
00161 
00162     def recursive_check(self, item):
00163         check_state = item.checkState(0)
00164         for i in range(item.childCount()):
00165             item.child(i).setCheckState(0, check_state)
00166             self.recursive_check(item.child(i))


rqt_joint_trajectory_plot
Author(s): Ryosuke Tajima
autogenerated on Thu Jun 6 2019 20:46:49