execute_step_plan_widget.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 
5 import rospy
6 import rospkg
7 import actionlib
8 
9 from python_qt_binding import loadUi
10 from python_qt_binding.QtCore import Signal
11 from python_qt_binding.QtGui import QWidget, QHBoxLayout, QVBoxLayout, QPushButton, QComboBox
12 
14 from vigir_footstep_planning_msgs.msg import StepPlan, ExecuteStepPlanAction, ExecuteStepPlanGoal
18 
19 
20 # widget for sending step plans to controller
22 
23  step_plan_sub = None
24  execute_step_plan_client = None
25  step_plan = None
26 
27  step_queue_size_signal = Signal(int)
28  executed_steps_signal = Signal(int)
29 
30  def __init__(self, parent=None, logger=Logger(), step_plan_topic=str()):
31  super(QExecuteStepPlanWidget, self).__init__(parent, logger)
32 
33  # start widget
34  vbox = QVBoxLayout()
35  vbox.setMargin(0)
36  vbox.setContentsMargins(0, 0, 0, 0)
37 
38  # step plan input topic selection
39  if len(step_plan_topic) == 0:
40  step_plan_topic_widget = QTopicWidget(topic_type='vigir_footstep_planning_msgs/StepPlan')
41  step_plan_topic_widget.topic_changed_signal.connect(self._init_step_plan_subscriber)
42  vbox.addWidget(step_plan_topic_widget)
43  else:
44  self._init_step_plan_subscriber(step_plan_topic)
45 
46  # execute action server topic selection
47  execute_topic_widget = QTopicWidget(topic_type='vigir_footstep_planning_msgs/ExecuteStepPlanAction', is_action_topic=True)
48  execute_topic_widget.topic_changed_signal.connect(self._init_execute_action_client)
49  vbox.addWidget(execute_topic_widget)
50 
51  # load execute widget from ui
52  execute_step_plan_widget = QWidget()
53  rp = rospkg.RosPack()
54  ui_file = os.path.join(rp.get_path('vigir_footstep_planning_lib'), 'resource', 'execute_step_plan.ui')
55  loadUi(ui_file, execute_step_plan_widget, {'QWidget': QWidget})
56  vbox.addWidget(execute_step_plan_widget)
57 
58  # execute
59  self.execute_button = execute_step_plan_widget.executePushButton
60  self.execute_button.clicked.connect(self.execute_button_callback)
61 
62  # repeat
63  self.repeat_button = execute_step_plan_widget.repeatPushButton
64  self.repeat_button.clicked.connect(self.execute_button_callback)
65 
66  # stop
67  self.stop_button = execute_step_plan_widget.stopPushButton
68  self.stop_button.clicked.connect(self.stop_button_callback)
69 
70  # progress bar
71  self.execution_progress_bar = execute_step_plan_widget.executionProgressBar
72  self.execution_progress_bar.setFormat("0/0")
73  self.step_queue_size_signal.connect(self.execution_progress_bar.setMaximum)
74  self.executed_steps_signal.connect(self.execution_progress_bar.setValue)
75 
76  # status info
77  self.status_line_edit = execute_step_plan_widget.statusLineEdit
78 
79  # step queue info
80  self.step_queue_line_edit = execute_step_plan_widget.stepQueueLineEdit
81 
82  # first changeable step info
83  self.first_changeable_step_line_edit = execute_step_plan_widget.firstChangeableLineEdit
84 
85  # end widget
86  self.setLayout(vbox)
87 
88  self.reset_ui()
89 
90  # init widget
91  if len(step_plan_topic) == 0:
92  step_plan_topic_widget.emit_topic_name()
93  execute_topic_widget.emit_topic_name()
94 
95  def reset_ui(self):
96  self.execute_button.setEnabled(False)
97  self.execute_button.setText("Execute [empty]")
98  self.repeat_button.setEnabled(False)
99  self.repeat_button.setText("Repeat [empty]")
100  self.stop_button.setEnabled(False)
101  self.execution_progress_bar.reset()
102  self.execution_progress_bar.setFormat("0/0")
103  self.status_line_edit.setText("N/A")
104  self.step_queue_line_edit.setText("N/A [empty]")
105  self.first_changeable_step_line_edit.setText("N/A")
106 
107  def shutdown_plugin(self):
108  print "Shutting down ..."
109  if self.step_plan_sub is None:
110  self.step_plan_sub.unregister()
111  print "Done!"
112 
113  def _init_step_plan_subscriber(self, topic_name):
114  if len(topic_name) > 0:
115  if self.step_plan_sub is not None:
116  self.step_plan_sub.unregister()
117  self.step_plan_sub = rospy.Subscriber(topic_name, StepPlan, self.step_plan_callback)
118  print "Step Plan topic changed: " + topic_name
119 
120  def _init_execute_action_client(self, topic_name):
121  if len(topic_name) > 0:
122  self.execute_step_plan_client = actionlib.SimpleActionClient(topic_name, ExecuteStepPlanAction)
123  self.execute_button.setEnabled(self.step_plan is not None)
124  self.repeat_button.setEnabled(False)
125  self.stop_button.setEnabled(False)
126  print "Execution topic changed: " + topic_name
127 
128  def step_plan_callback(self, step_plan):
129  if not len(step_plan.steps):
130  return
131 
132  self.step_plan = step_plan
133  self.execute_button.setText("Execute [" + str(self.step_plan.steps[0].step_index) + "; " +
134  str(self.step_plan.steps[-1].step_index) + "]")
135 
136  self.execute_button.setEnabled(self.execute_step_plan_client is not None)
137 
139  if self.execute_step_plan_client.wait_for_server(rospy.Duration(0.5)):
140  # bring ui into execution mode
141  self.reset_ui()
142  self.repeat_button.setEnabled(True)
143  self.repeat_button.setText("Repeat [" + str(self.step_plan.steps[0].step_index) + "; " +
144  str(self.step_plan.steps[-1].step_index) + "]")
145  self.stop_button.setEnabled(True)
146  self.step_queue_size_signal.emit(0)
147 
148  self.logger.log_info("Requesting footstep plan execution...")
149  goal = ExecuteStepPlanGoal()
150  goal.step_plan = self.step_plan
151  self.execute_step_plan_client.send_goal(goal, self.execute_step_plan_done_cb, self.execute_step_plan_active_cb, self.execute_step_plan_feedback_cb)
152  else:
153  self.logger.log_error("Can't connect to footstep controller action server!")
154 
156  self.stop_button.setEnabled(False)
157  self.execute_step_plan_client.cancel_goal()
158  self.logger.log_info("Preempting step plan.")
159 
160  # called by action client
162  self.logger.log_info("Execution of step plan started.")
163 
164  # called by action client
165  def execute_step_plan_done_cb(self, goal_status, result):
166  self.stop_button.setEnabled(False)
167  if goal_status == 3:
168  self.logger.log_info("Execution of step plan finished.")
169  else:
170  self.stop_button.setEnabled(False)
171  self.step_queue_size_signal.emit(len(self.step_plan.steps))
172  self.status_line_edit.setText("FAILED")
173  self.logger.log_error("Execution of step plan failed!")
174 
175  # called by action client
176  def execute_step_plan_feedback_cb(self, feedback):
177  # update progress bar
178  if feedback.last_performed_step_index >= 0:
179  if feedback.queue_size > 0:
180  self.step_queue_size_signal.emit(feedback.last_queued_step_index)
181  self.executed_steps_signal.emit(feedback.last_performed_step_index)
182  self.execution_progress_bar.setFormat(str(feedback.last_performed_step_index) + "/" +
183  str(self.execution_progress_bar.maximum()))
184 
185  # update info fields
186  self.status_line_edit.setText(walk_controller_state_to_string(feedback.controller_state))
187 
188  if feedback.queue_size > 0:
189  self.step_queue_line_edit.setText(str(feedback.currently_executing_step_index) + " [" +
190  str(feedback.first_queued_step_index) + "; " +
191  str(feedback.last_queued_step_index) + "]")
192 
193  self.first_changeable_step_line_edit.setText(str(feedback.first_changeable_step_index))
194  else:
195  self.step_queue_line_edit.setText("N/A [empty]")
196  self.first_changeable_step_line_edit.setText("N/A")
def __init__(self, parent=None, logger=Logger(), step_plan_topic=str())


vigir_footstep_planning_lib
Author(s): Alexander Stumpf
autogenerated on Mon Jun 10 2019 15:47:33