execute_step_plan_widget.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import os
00004 
00005 import rospy
00006 import rospkg
00007 import actionlib
00008 
00009 from python_qt_binding import loadUi
00010 from python_qt_binding.QtCore import Signal
00011 from python_qt_binding.QtGui import QWidget, QHBoxLayout, QVBoxLayout, QPushButton, QComboBox
00012 
00013 from vigir_footstep_planning_msgs.footstep_planning_msgs import *
00014 from vigir_footstep_planning_msgs.msg import StepPlan, ExecuteStepPlanAction, ExecuteStepPlanGoal
00015 from vigir_footstep_planning_lib.error_status_widget import *
00016 from vigir_footstep_planning_lib.logging import *
00017 from vigir_footstep_planning_lib.topic_widget import *
00018 
00019 
00020 # widget for sending step plans to controller
00021 class QExecuteStepPlanWidget(QWidgetWithLogger):
00022 
00023     step_plan_sub = None
00024     execute_step_plan_client = None
00025     step_plan = None
00026 
00027     step_queue_size_signal = Signal(int)
00028     executed_steps_signal = Signal(int)
00029 
00030     def __init__(self, parent=None, logger=Logger(), step_plan_topic=str()):
00031         super(QExecuteStepPlanWidget, self).__init__(parent, logger)
00032 
00033         # start widget
00034         vbox = QVBoxLayout()
00035         vbox.setMargin(0)
00036         vbox.setContentsMargins(0, 0, 0, 0)
00037 
00038         # step plan input topic selection
00039         if len(step_plan_topic) == 0:
00040             step_plan_topic_widget = QTopicWidget(topic_type='vigir_footstep_planning_msgs/StepPlan')
00041             step_plan_topic_widget.topic_changed_signal.connect(self._init_step_plan_subscriber)
00042             vbox.addWidget(step_plan_topic_widget)
00043         else:
00044             self._init_step_plan_subscriber(step_plan_topic)
00045 
00046         # execute action server topic selection
00047         execute_topic_widget = QTopicWidget(topic_type='vigir_footstep_planning_msgs/ExecuteStepPlanAction', is_action_topic=True)
00048         execute_topic_widget.topic_changed_signal.connect(self._init_execute_action_client)
00049         vbox.addWidget(execute_topic_widget)
00050 
00051         # load execute widget from ui
00052         execute_step_plan_widget = QWidget()
00053         rp = rospkg.RosPack()
00054         ui_file = os.path.join(rp.get_path('vigir_footstep_planning_lib'), 'resource', 'execute_step_plan.ui')
00055         loadUi(ui_file, execute_step_plan_widget, {'QWidget': QWidget})
00056         vbox.addWidget(execute_step_plan_widget)
00057 
00058         # execute
00059         self.execute_button = execute_step_plan_widget.executePushButton
00060         self.execute_button.clicked.connect(self.execute_button_callback)
00061 
00062         # repeat
00063         self.repeat_button = execute_step_plan_widget.repeatPushButton
00064         self.repeat_button.clicked.connect(self.execute_button_callback)
00065 
00066         # stop
00067         self.stop_button = execute_step_plan_widget.stopPushButton
00068         self.stop_button.clicked.connect(self.stop_button_callback)
00069 
00070         # progress bar
00071         self.execution_progress_bar = execute_step_plan_widget.executionProgressBar
00072         self.execution_progress_bar.setFormat("0/0")
00073         self.step_queue_size_signal.connect(self.execution_progress_bar.setMaximum)
00074         self.executed_steps_signal.connect(self.execution_progress_bar.setValue)
00075 
00076         # status info
00077         self.status_line_edit = execute_step_plan_widget.statusLineEdit
00078 
00079         # step queue info
00080         self.step_queue_line_edit = execute_step_plan_widget.stepQueueLineEdit
00081 
00082         # first changeable step info
00083         self.first_changeable_step_line_edit = execute_step_plan_widget.firstChangeableLineEdit
00084 
00085         # end widget
00086         self.setLayout(vbox)
00087 
00088         self.reset_ui()
00089 
00090         # init widget
00091         if len(step_plan_topic) == 0:
00092             step_plan_topic_widget.emit_topic_name()
00093         execute_topic_widget.emit_topic_name()
00094 
00095     def reset_ui(self):
00096         self.execute_button.setEnabled(False)
00097         self.execute_button.setText("Execute [empty]")
00098         self.repeat_button.setEnabled(False)
00099         self.repeat_button.setText("Repeat [empty]")
00100         self.stop_button.setEnabled(False)
00101         self.execution_progress_bar.reset()
00102         self.execution_progress_bar.setFormat("0/0")
00103         self.status_line_edit.setText("N/A")
00104         self.step_queue_line_edit.setText("N/A [empty]")
00105         self.first_changeable_step_line_edit.setText("N/A")
00106 
00107     def shutdown_plugin(self):
00108         print "Shutting down ..."
00109         if self.step_plan_sub is None:
00110             self.step_plan_sub.unregister()
00111         print "Done!"
00112 
00113     def _init_step_plan_subscriber(self, topic_name):
00114         if len(topic_name) > 0:
00115             if self.step_plan_sub is not None:
00116                 self.step_plan_sub.unregister()
00117             self.step_plan_sub = rospy.Subscriber(topic_name, StepPlan, self.step_plan_callback)
00118             print "Step Plan topic changed: " + topic_name
00119 
00120     def _init_execute_action_client(self, topic_name):
00121         if len(topic_name) > 0:
00122             self.execute_step_plan_client = actionlib.SimpleActionClient(topic_name, ExecuteStepPlanAction)
00123             self.execute_button.setEnabled(self.step_plan is not None)
00124             self.repeat_button.setEnabled(False)
00125             self.stop_button.setEnabled(False)
00126             print "Execution topic changed: " + topic_name
00127 
00128     def step_plan_callback(self, step_plan):
00129         if not len(step_plan.steps):
00130             return
00131 
00132         self.step_plan = step_plan
00133         self.execute_button.setText("Execute [" + str(self.step_plan.steps[0].step_index) + "; " +
00134                                     str(self.step_plan.steps[-1].step_index) + "]")
00135 
00136         self.execute_button.setEnabled(self.execute_step_plan_client is not None)
00137 
00138     def execute_button_callback(self):
00139         if self.execute_step_plan_client.wait_for_server(rospy.Duration(0.5)):
00140             # bring ui into execution mode
00141             self.reset_ui()
00142             self.repeat_button.setEnabled(True)
00143             self.repeat_button.setText("Repeat [" + str(self.step_plan.steps[0].step_index) + "; " +
00144                                         str(self.step_plan.steps[-1].step_index) + "]")
00145             self.stop_button.setEnabled(True)
00146             self.step_queue_size_signal.emit(0)
00147 
00148             self.logger.log_info("Requesting footstep plan execution...")
00149             goal = ExecuteStepPlanGoal()
00150             goal.step_plan = self.step_plan
00151             self.execute_step_plan_client.send_goal(goal, self.execute_step_plan_done_cb, self.execute_step_plan_active_cb, self.execute_step_plan_feedback_cb)
00152         else:
00153             self.logger.log_error("Can't connect to footstep controller action server!")
00154 
00155     def stop_button_callback(self):
00156         self.stop_button.setEnabled(False)
00157         self.execute_step_plan_client.cancel_goal()
00158         self.logger.log_info("Preempting step plan.")
00159 
00160     # called by action client
00161     def execute_step_plan_active_cb(self):
00162         self.logger.log_info("Execution of step plan started.")
00163 
00164     # called by action client
00165     def execute_step_plan_done_cb(self, goal_status, result):
00166         self.stop_button.setEnabled(False)
00167         if goal_status == 3:
00168             self.logger.log_info("Execution of step plan finished.")
00169         else:
00170             self.stop_button.setEnabled(False)
00171             self.step_queue_size_signal.emit(len(self.step_plan.steps))
00172             self.status_line_edit.setText("FAILED")
00173             self.logger.log_error("Execution of step plan failed!")
00174 
00175     # called by action client
00176     def execute_step_plan_feedback_cb(self, feedback):
00177         # update progress bar
00178         if feedback.last_performed_step_index >= 0:
00179             if feedback.queue_size > 0:
00180                 self.step_queue_size_signal.emit(feedback.last_queued_step_index)
00181             self.executed_steps_signal.emit(feedback.last_performed_step_index)
00182             self.execution_progress_bar.setFormat(str(feedback.last_performed_step_index) + "/" +
00183                                                   str(self.execution_progress_bar.maximum()))
00184 
00185         # update info fields
00186         self.status_line_edit.setText(walk_controller_state_to_string(feedback.controller_state))
00187 
00188         if feedback.queue_size > 0:
00189             self.step_queue_line_edit.setText(str(feedback.currently_executing_step_index) + " [" +
00190                                               str(feedback.first_queued_step_index) + "; " +
00191                                               str(feedback.last_queued_step_index) + "]")
00192 
00193             self.first_changeable_step_line_edit.setText(str(feedback.first_changeable_step_index))
00194         else:
00195             self.step_queue_line_edit.setText("N/A [empty]")
00196             self.first_changeable_step_line_edit.setText("N/A")


vigir_footstep_planning_lib
Author(s): Alexander Stumpf
autogenerated on Sat Jun 8 2019 19:01:44