Source code for py_trees.demos.action

#!/usr/bin/env python
#
# License: BSD
#   https://raw.githubusercontent.com/stonier/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
.. argparse::
   :module: py_trees.demos.action
   :func: command_line_argument_parser
   :prog: py-trees-demo-action-behaviour

.. image:: images/action.gif
"""

##############################################################################
# Imports
##############################################################################

import argparse
import atexit
import multiprocessing
import py_trees
import time

import py_trees.console as console

##############################################################################
# Classes
##############################################################################


def description():
    """
    Build the text shown as the description of the command line help.

    Returns:
        str: the plain bullet-point summary of the demo; when the console
        supports colours, the summary is additionally framed by green banner
        lines with a centred, bold white title.
    """
    summary_lines = [
        "Demonstrates the characteristics of a typical 'action' behaviour.\n",
        "\n",
        "* Mocks an external process and connects to it in the setup() method\n",
        "* Kickstarts new goals with the external process in the initialise() method\n",
        "* Monitors the ongoing goal status in the update() method\n",
        "* Determines RUNNING/SUCCESS pending feedback from the external process\n",
    ]
    content = "".join(summary_lines)

    # Without colour support the bare summary is all that is returned.
    if not py_trees.console.has_colours:
        return content

    banner_line = console.green + "*" * 79 + "\n" + console.reset
    title_line = console.bold_white + "Action Behaviour".center(79) + "\n" + console.reset
    framed = [
        "\n",
        banner_line,
        title_line,
        banner_line,
        "\n",
        content,
        "\n",
        banner_line,
    ]
    return "".join(framed)


def epilog():
    """
    Build the text shown at the bottom of the command line help.

    Returns:
        str or None: a cyan coloured closing line when the console supports
        colours, otherwise None (no epilog at all).
    """
    if not py_trees.console.has_colours:
        return None
    closing_line = "And his noodly appendage reached forth to tickle the blessed...\n"
    return console.cyan + closing_line + console.reset


def command_line_argument_parser():
    """
    Create the argument parser for this demo.

    The parser defines no options of its own; it carries the description and
    epilog texts and uses the raw description formatter so that their line
    breaks are shown exactly as written.

    Returns:
        argparse.ArgumentParser: the configured parser
    """
    parser = argparse.ArgumentParser(
        description=description(),
        epilog=epilog(),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    return parser


def planning(pipe_connection):
    """
    Emulates an external process which might accept long running planning jobs.

    The worker idles until any message arrives on the pipe; the content of
    that message is ignored, its arrival alone starts (or restarts) a job.
    While a job is active, progress is reported back once per cycle as a
    one-element list ``[percentage_complete]``, rising in steps of 10. After
    reporting 100 the worker goes idle again until the next message. Each
    cycle ends with a 0.5 second sleep.

    The loop ends quietly on a keyboard interrupt, and also when the pipe
    becomes unusable (for example because the other end was closed), so the
    mock process does not die with a traceback when its peer goes away.

    Args:
        pipe_connection: this process' end of the pipe; it must provide
            ``poll()``, ``recv()`` and ``send()``.
    """
    idle = True
    percentage_complete = 0
    try:
        while True:
            if pipe_connection.poll():
                # Only the arrival matters: any message is a new goal, and a
                # goal arriving mid-job restarts the count from zero.
                pipe_connection.recv()
                percentage_complete = 0
                idle = False
            if not idle:
                percentage_complete += 10
                pipe_connection.send([percentage_complete])
                if percentage_complete == 100:
                    idle = True
            time.sleep(0.5)
    except KeyboardInterrupt:
        pass
    except (EOFError, OSError):
        # recv() raises EOFError and send() raises an OSError subclass
        # (e.g. BrokenPipeError) once the other end has been closed; there is
        # nobody left to plan for, so simply stop.
        pass
class Action(py_trees.behaviour.Behaviour):
    """
    Connects to a subprocess to initiate a goal, and monitors the progress
    of that goal at each tick until the goal is completed, at which time
    the behaviour itself returns with success or failure (depending on
    success or failure of the goal itself).

    This is typical of a behaviour that is connected to an external process
    responsible for driving hardware, conducting a plan, or a long running
    processing pipeline (e.g. planning/vision).

    Key point - this behaviour itself should not be doing any work!
    """

    def __init__(self, name="Action"):
        """
        Construct the behaviour; nothing is connected or started here.

        Args:
            name (str): name of the behaviour
        """
        super(Action, self).__init__(name)
        class_name = self.__class__.__name__
        self.logger.debug("%s.__init__()" % (class_name))

    def setup(self, unused_timeout=15):
        """
        Create the pipe and start the mock planning process.

        The parent end of the pipe is kept for talking to the process, the
        child end is handed to it. Termination of the process is registered
        with atexit so that it does not outlive the interpreter.

        Args:
            unused_timeout: not used by this behaviour

        Returns:
            bool: always True
        """
        class_name = self.__class__.__name__
        self.logger.debug("%s.setup()->connections to an external process" % (class_name))
        parent_end, child_end = multiprocessing.Pipe()
        self.parent_connection = parent_end
        self.child_connection = child_end
        worker = multiprocessing.Process(target=planning, args=(child_end,))
        self.planning = worker
        atexit.register(worker.terminate)
        worker.start()
        return True

    def initialise(self):
        """
        Send a new goal to the planning process and reset the locally
        stored completion percentage to zero.
        """
        class_name = self.__class__.__name__
        self.logger.debug("%s.initialise()->sending new goal" % (class_name))
        self.parent_connection.send(['new goal'])
        self.percentage_completion = 0

    def update(self):
        """
        Check for progress feedback and decide upon the new status.

        At most one pending message is read from the pipe. The behaviour
        succeeds only on a tick in which a message was read and it reported
        100 percent; on every other tick it keeps running. The feedback
        message is refreshed either way.

        Returns:
            py_trees.Status.SUCCESS once 100 percent has just been reported,
            py_trees.Status.RUNNING otherwise
        """
        class_name = self.__class__.__name__
        finished = False
        if self.parent_connection.poll():
            # Feedback arrives as a one-element list holding the percentage.
            progress = self.parent_connection.recv().pop()
            self.percentage_completion = progress
            finished = progress == 100

        if finished:
            new_status = py_trees.Status.SUCCESS
            self.feedback_message = "Processing finished"
            self.logger.debug("%s.update()[%s->%s][%s]" % (class_name, self.status, new_status, self.feedback_message))
            return new_status

        self.feedback_message = "{0}%".format(self.percentage_completion)
        self.logger.debug("%s.update()[%s][%s]" % (class_name, self.status, self.feedback_message))
        return py_trees.Status.RUNNING

    def terminate(self, new_status):
        """
        Log the status transition; the planning process and the pipe are
        left untouched.

        Args:
            new_status: the status the behaviour is switching to
        """
        class_name = self.__class__.__name__
        self.logger.debug("%s.terminate()[%s->%s]" % (class_name, self.status, new_status))

##############################################################################
# Main
##############################################################################

def main():
    """
    Entry point for the demo script.

    Parses the command line (the parser defines no options of its own),
    prints the demo description and switches the py_trees logging level to
    DEBUG. An Action behaviour is then set up and ticked twelve times with
    half a second between ticks. A keyboard interrupt ends the run quietly.
    """
    command_line_argument_parser().parse_args()
    print(description())
    py_trees.logging.level = py_trees.logging.Level.DEBUG

    demo_behaviour = Action()
    demo_behaviour.setup()
    try:
        for _ in range(12):
            demo_behaviour.tick_once()
            time.sleep(0.5)
        print("\n")
    except KeyboardInterrupt:
        pass