Source code for py_trees_ros.tutorials.jobs

#!/usr/bin/env python
#
# License: BSD
#   https://raw.githubusercontent.com/stonier/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
A few behaviours to support the tutorials.
"""

##############################################################################
# Imports
##############################################################################

import copy
import move_base_msgs.msg as move_base_msgs
import py_trees
import py_trees_msgs.msg as py_trees_msgs
import py_trees_ros
import rospy
import std_msgs.msg as std_msgs
import threading

##############################################################################
# Behaviours
##############################################################################


[docs]class Scan(object): """ A job handler that instantiates a subtree for scanning to be executed by a behaviour tree. """
[docs] def __init__(self): """ Tune into a channel for incoming goal requests. This is a simple subscriber here but more typically would be a service or action interface. """ self._subscriber = rospy.Subscriber("/dashboard/scan", std_msgs.Empty, self.incoming) self._goal = None self._lock = threading.Lock()
@property def goal(self): """ Getter for the variable indicating whether or not a goal has recently been received but not yet handled. It simply makes sure it is wrapped with the appropriate locking. """ with self._lock: g = copy.copy(self._goal) or self._goal return g @goal.setter def goal(self, value): """ Setter for the variable indicating whether or not a goal has recently been received but not yet handled. It simply makes sure it is wrapped with the appropriate locking. """ with self._lock: self._goal = value
[docs] def incoming(self, msg): """ Incoming goal callback. Args: msg (:class:`~std_msgs.Empty`): incoming goal message """ if self.goal: rospy.logerr("Scan: rejecting new goal, previous still in the pipeline") else: self.goal = msg
[docs] def create_report_string(self, subtree_root): """ Introspect the subtree root to determine an appropriate human readable status report string. Args: subtree_root (:class:`~py_trees.behaviour.Behaviour`): introspect the subtree Returns: :obj:`str`: human readable substring """ if subtree_root.tip().has_parent_with_name("Cancelling?"): return "cancelling" else: return "scanning"
[docs] @staticmethod def create_root(goal=std_msgs.Empty()): """ Create the job subtree based on the incoming goal specification. Args: goal (:class:`~std_msgs.msg.Empty`): incoming goal specification Returns: :class:`~py_trees.behaviour.Behaviour`: subtree root """ # beahviors root = py_trees.composites.Selector(name="Scan or Die") die = py_trees_ros.tutorials.behaviours.FlashLedStrip( name="Uh Oh", colour="red") ere_we_go = py_trees.composites.Sequence(name="Ere we Go") undock = py_trees_ros.actions.ActionClient( name="UnDock", action_namespace="/dock", action_spec=py_trees_msgs.DockAction, action_goal=py_trees_msgs.DockGoal(False), override_feedback_message_on_running="undocking" ) scan_or_be_cancelled = py_trees.composites.Selector("Scan or Be Cancelled") cancelling = py_trees.composites.Sequence("Cancelling?") is_cancel_requested = py_trees.blackboard.CheckBlackboardVariable( name="Cancel?", variable_name='event_cancel_button', expected_value=True ) move_home_after_cancel = py_trees_ros.actions.ActionClient( name="Move Home", action_namespace="/move_base", action_spec=move_base_msgs.MoveBaseAction, action_goal=move_base_msgs.MoveBaseGoal() ) move_out_and_scan = py_trees.composites.Sequence("Move Out and Scan") move_base = py_trees_ros.actions.ActionClient( name="Move Out", action_namespace="/move_base", action_spec=move_base_msgs.MoveBaseAction, action_goal=move_base_msgs.MoveBaseGoal() ) scanning = py_trees.composites.Parallel(name="Scanning", policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE) scan_context_switch = py_trees_ros.tutorials.behaviours.ScanContext("Context Switch") rospy.logwarn("Setting up rotating") scan_rotate = py_trees_ros.actions.ActionClient( name="Rotate", action_namespace="/rotate", action_spec=py_trees_msgs.RotateAction, action_goal=py_trees_msgs.RotateGoal(), override_feedback_message_on_running="rotating" ) scan_flash_blue = py_trees_ros.tutorials.behaviours.FlashLedStrip(name="Flash Blue", colour="blue") move_home_after_scan = py_trees_ros.actions.ActionClient( name="Move Home", action_namespace="/move_base", action_spec=move_base_msgs.MoveBaseAction, action_goal=move_base_msgs.MoveBaseGoal() ) celebrate = py_trees.composites.Parallel(name="Celebrate", policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE) celebrate_flash_green = py_trees_ros.tutorials.behaviours.FlashLedStrip(name="Flash Green", colour="green") celebrate_pause = py_trees.timers.Timer("Pause", duration=3.0) dock = py_trees_ros.actions.ActionClient( name="Dock", action_namespace="/dock", action_spec=py_trees_msgs.DockAction, action_goal=py_trees_msgs.DockGoal(True), override_feedback_message_on_running="docking" ) # Subtree root.add_children([ere_we_go, die]) ere_we_go.add_children([undock, scan_or_be_cancelled, dock, celebrate]) scan_or_be_cancelled.add_children([cancelling, move_out_and_scan]) cancelling.add_children([is_cancel_requested, move_home_after_cancel]) move_out_and_scan.add_children([move_base, scanning, move_home_after_scan]) scanning.add_children([scan_context_switch, scan_rotate, scan_flash_blue]) celebrate.add_children([celebrate_flash_green, celebrate_pause]) return root