Source code for py_trees_ros.actions

#!/usr/bin/env python
#
# License: BSD
#   https://raw.github.com/stonier/py_trees_ros/license/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
Various instantiable templates for action client/server style behaviours.
"""

##############################################################################
# Imports
##############################################################################

import actionlib
import py_trees
import rospy
import actionlib_msgs.msg as actionlib_msgs

##############################################################################
# Behaviours
##############################################################################


[docs]class ActionClient(py_trees.behaviour.Behaviour): """ A generic action client interface. This simply sends a pre-configured goal to the action client. Cases where you might want to subclass and extend this behaviour: * Update the goal data in :meth:`~py_trees_ros.actions.ActionClient.initialise()` * e.g. use blackboard data to determine the new characteristics of your goal * Trigger true pre-emption by sending a new goal in :meth:`~py_trees_ros.actions.ActionClient.update()` Args: name (:obj:`str`): name of the behaviour action_spec (:obj:`any`): spec type for the action (e.g. move_base_msgs.msg.MoveBaseAction) action_goal (:obj:`any`): preconfigured action goal (e.g. move_base_msgs.msg.MoveBaseGoal()) action_namespace (:obj:`str`): where you can find the action topics override_feedback_message_on_running (:obj:`str`): override the feedback message from the server Feedback messages are often accompanied with detailed messages that continuously change - since these are not significant and we don't want to log every change due to these messages, you can provide an override here that simply signifies the action is running. @todo: a more comprehensive way of filtering/parsing feedback messages to customise a running state so that it can identify and react to 'significant' events while running. """ def __init__(self, name="Action Client", action_spec=None, action_goal=None, action_namespace="/action", override_feedback_message_on_running="moving"): super(ActionClient, self).__init__(name) self.action_client = None self.sent_goal = False self.action_spec = action_spec self.action_goal = action_goal self.action_namespace = action_namespace self.override_feedback_message_on_running = override_feedback_message_on_running
[docs] def setup(self, timeout): """ Args: timeout (:obj:`float`): time to wait (0.0 is blocking forever) Returns: :obj:`bool`: whether it timed out trying to setup """ self.logger.debug("%s.setup()" % self.__class__.__name__) self.action_client = actionlib.SimpleActionClient( self.action_namespace, self.action_spec ) if not self.action_client.wait_for_server(rospy.Duration(timeout)): self.logger.error("{0}.setup() could not connect to the action server at '{1}'".format(self.__class__.__name__, self.action_namespace)) self.action_client = None return False return True
[docs] def initialise(self): """ Reset the internal variables. """ self.logger.debug("{0}.initialise()".format(self.__class__.__name__)) self.sent_goal = False
[docs] def update(self): """ Check only to see whether the underlying action server has succeeded, is running, or has cancelled/aborted for some reason and map these to the usual behaviour return states. """ self.logger.debug("{0}.update()".format(self.__class__.__name__)) if not self.action_client: self.feedback_message = "no action client, did you call setup() on your tree?" return py_trees.Status.INVALID # pity there is no 'is_connected' api like there is for c++ if not self.sent_goal: self.action_client.send_goal(self.action_goal) self.sent_goal = True self.feedback_message = "sent goal to the action server" return py_trees.Status.RUNNING self.feedback_message = self.action_client.get_goal_status_text() if self.action_client.get_state() in [actionlib_msgs.GoalStatus.ABORTED, actionlib_msgs.GoalStatus.PREEMPTED]: return py_trees.Status.FAILURE result = self.action_client.get_result() if result: return py_trees.Status.SUCCESS else: self.feedback_message = self.override_feedback_message_on_running return py_trees.Status.RUNNING
[docs] def terminate(self, new_status): """ If running and the current goal has not already succeeded, cancel it. Args: new_status (:class:`~py_trees.common.Status`): the behaviour is transitioning to this new status """ self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status)) if self.action_client is not None and self.sent_goal: motion_state = self.action_client.get_state() if ((motion_state == actionlib_msgs.GoalStatus.PENDING) or (motion_state == actionlib_msgs.GoalStatus.ACTIVE) or (motion_state == actionlib_msgs.GoalStatus.PREEMPTING) or (motion_state == actionlib_msgs.GoalStatus.RECALLING)): self.action_client.cancel_goal() self.sent_goal = False