#!/usr/bin/env python
#
# License: BSD
# https://raw.githubusercontent.com/stonier/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
A few behaviours to support the tutorials.
"""
##############################################################################
# Imports
##############################################################################
import dynamic_reconfigure.client
import py_trees
import rospy
import std_msgs.msg as std_msgs
##############################################################################
# Behaviours
##############################################################################
class FlashLedStrip(py_trees.behaviour.Behaviour):
    """
    This behaviour simply shoots a command off to the LEDStrip to flash
    a certain colour and returns :attr:`~py_trees.common.Status.RUNNING`.
    Note that this behaviour will never return with
    :attr:`~py_trees.common.Status.SUCCESS` but will send a clearing
    command to the LEDStrip if it is cancelled or interrupted by a higher
    priority behaviour.

    Args:
        name (:obj:`str`): name of the behaviour
        topic_name (:obj:`str`) : name of the led strip command topic
        colour (:obj:`str`) : colour to flash ['red', 'green', 'blue']
    """
    def __init__(self, name, topic_name="/led_strip/command", colour="red"):
        super(FlashLedStrip, self).__init__(name=name)
        self.topic_name = topic_name
        self.colour = colour
        # Created in setup(); kept as None until then so that terminate()
        # can tell whether there is anything to publish on.
        self.publisher = None

    def setup(self, timeout):
        """
        Create the (latched) publisher used to command the led strip.

        Args:
            timeout (:obj:`float`): time to wait (0.0 is blocking forever),
                unused by this behaviour

        Returns:
            :obj:`bool`: always True
        """
        self.publisher = rospy.Publisher(
            self.topic_name,
            std_msgs.String,
            queue_size=10,
            latch=True
        )
        self.feedback_message = "setup"
        return True

    def update(self):
        """
        Annoy the led strip to keep firing every time it ticks over (the led
        strip will clear itself if no command is forthcoming within a certain
        period of time). This behaviour will only finish if it is terminated
        or interrupted from above.

        Returns:
            :class:`~py_trees.common.Status`: always
            :attr:`~py_trees.common.Status.RUNNING`
        """
        self.logger.debug("%s.update()" % self.__class__.__name__)
        self.publisher.publish(std_msgs.String(self.colour))
        self.feedback_message = "flashing {0}".format(self.colour)
        return py_trees.common.Status.RUNNING

    def terminate(self, new_status):
        """
        Shoot off a clearing command (an empty string) to the led strip.

        Args:
            new_status (:class:`~py_trees.common.Status`): the behaviour is
                transitioning to this new status
        """
        if self.publisher is None:
            # setup() has not run, so nothing was ever sent to the led strip
            # and there is no publisher to send a clearing command with.
            return
        self.publisher.publish(std_msgs.String(""))
        self.feedback_message = "cleared"
class ScanContext(py_trees.behaviour.Behaviour):
    """
    Reconfigure the context for scanning and restore it afterwards.

    On initialisation this behaviour caches the current value of the
    ``enable`` parameter on the ``safety_sensors`` dynamic reconfigure
    server and of the ``duration`` parameter on the ``rotate`` server, then
    sets them to ``True`` and ``8.0`` respectively. While ticking it returns
    :attr:`~py_trees.common.Status.RUNNING` (never
    :attr:`~py_trees.common.Status.SUCCESS`), or
    :attr:`~py_trees.common.Status.FAILURE` if the reconfiguration did not
    succeed. On termination the cached values are written back.

    Args:
        name (:obj:`str`): name of the behaviour
    """
    def __init__(self, name):
        super(ScanContext, self).__init__(name=name)
        # True only while both parameters have been successfully reconfigured.
        self.initialised = False
        self._namespaces = ["safety_sensors",
                            "rotate",
                            ]
        # namespace -> dynamic reconfigure client (None until setup() connects)
        self._dynamic_reconfigure_clients = {
            namespace: None for namespace in self._namespaces
        }
        # namespace -> configuration fetched in initialise()
        self._dynamic_reconfigure_configurations = {}
        # Values cached in initialise() for restoration in terminate();
        # None means there is nothing to restore.
        self.safety_sensors_enable = None
        self.rotate_duration = None

    def setup(self, timeout):
        """
        Try and connect to the dynamic reconfigure server on the various
        namespaces. Namespaces that already have a client are skipped, so
        this may be called again after a failed attempt.

        Args:
            timeout (:obj:`float`): passed on to each dynamic reconfigure
                client as its connection timeout

        Returns:
            :obj:`bool`: False as soon as one connection attempt raises
            :class:`rospy.ROSException`, True otherwise
        """
        self.logger.debug("%s.setup()" % self.__class__.__name__)
        for namespace in self._namespaces:
            if self._dynamic_reconfigure_clients[namespace]:
                continue
            try:
                self._dynamic_reconfigure_clients[namespace] = dynamic_reconfigure.client.Client(
                    name=namespace,
                    timeout=timeout
                )
            except rospy.ROSException:
                rospy.logwarn(
                    "ScanContext [%s]: could not connect to dynamic reconfigure server [%s][%s secs]"
                    % (self.name, namespace, timeout)
                )
                self.feedback_message = "could not connect to dynamic reconfigure server [%s][%s secs]" % (namespace, timeout)
                return False
        return True

    def initialise(self):
        """
        Get various dyn reconf configurations and cache/set the new variables.

        ``self.initialised`` is set to True only if every parameter was
        reconfigured; otherwise it stays False and the feedback message
        describes the (last) failure.
        """
        self.logger.debug("%s.initialise()" % self.__class__.__name__)
        self.initialised = False
        unconnected = [
            namespace for namespace in self._namespaces
            if not self._dynamic_reconfigure_clients[namespace]
        ]
        if unconnected:
            # Without a client there is nothing to query; report it instead
            # of crashing on a None client.
            self.feedback_message = "no dynamic reconfigure client for %s, was setup() successful?" % unconnected
            return
        for namespace, client in self._dynamic_reconfigure_clients.items():
            self._dynamic_reconfigure_configurations[namespace] = client.get_configuration()

        # Cache the current values first so that terminate() can put them back.
        self.safety_sensors_enable = self._dynamic_reconfigure_configurations["safety_sensors"]["enable"]
        enabled = self._update_parameter("safety_sensors", "enable", True, "configure")
        self.rotate_duration = self._dynamic_reconfigure_configurations["rotate"]["duration"]
        extended = self._update_parameter("rotate", "duration", 8.0, "configure")

        if enabled and extended:
            self.initialised = True
            self.feedback_message = "reconfigured the context for scanning"

    def update(self):
        """
        Report whether the scanning context is in place.

        Returns:
            :class:`~py_trees.common.Status`:
            :attr:`~py_trees.common.Status.FAILURE` if initialisation did not
            succeed, otherwise :attr:`~py_trees.common.Status.RUNNING`
        """
        self.logger.debug("%s.update()" % self.__class__.__name__)
        if not self.initialised:
            return py_trees.common.Status.FAILURE
        # used under a parallel, never returns success
        return py_trees.common.Status.RUNNING

    def terminate(self, new_status):
        """
        Regardless of whether it succeeded or failed or is getting set to
        invalid we have to be absolutely sure to reset the navi context.

        Every value cached by :meth:`initialise` is written back, including
        after a partially failed initialisation, and the cache is then
        cleared so that repeated calls do not resend the values.

        Args:
            new_status (:class:`~py_trees.common.Status`): the behaviour is
                transitioning to this new status
        """
        if self.status != new_status:
            transition = "%s->%s" % (self.status, new_status)
        else:
            transition = "%s" % new_status
        self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, transition))
        if self.safety_sensors_enable is not None:
            self._update_parameter("safety_sensors", "enable", self.safety_sensors_enable, "reset")
            self.safety_sensors_enable = None
        if self.rotate_duration is not None:
            self._update_parameter("rotate", "duration", self.rotate_duration, "reset")
            self.rotate_duration = None
        self.initialised = False

    def _update_parameter(self, namespace, parameter, value, action):
        """
        Set a single dynamic reconfigure parameter, recording any failure in
        the feedback message.

        Args:
            namespace (:obj:`str`): namespace of the dynamic reconfigure server
            parameter (:obj:`str`): name of the parameter to set
            value: value to set it to
            action (:obj:`str`): verb used in the failure message
                ('configure' or 'reset')

        Returns:
            :obj:`bool`: False if the client raised
            :class:`dynamic_reconfigure.DynamicReconfigureParameterException`,
            True otherwise
        """
        try:
            self._dynamic_reconfigure_clients[namespace].update_configuration({parameter: value})
        except dynamic_reconfigure.DynamicReconfigureParameterException:
            self.feedback_message = "failed to %s the '%s' parameter [%s]" % (action, parameter, namespace)
            return False
        return True