Source code for rocon_scheduler_requests.scheduler

# Software License Agreement (BSD License)
#
# Copyright (C) 2013-2014, Jack O'Quin
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of the author nor of other contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
.. module:: scheduler

Python interface for ROCON schedulers handling resource requests.

.. _UUID: http://en.wikipedia.org/wiki/Uuid

"""

# enable some python3 compatibility options:
from __future__ import absolute_import, print_function, unicode_literals

import rospy
import threading
import unique_id

# ROS messages
from scheduler_msgs.msg import Request, SchedulerRequests

# internal modules
from . import common
from .transitions import ActiveRequest, RequestSet


class _RequesterStatus:
    """
    This class tracks the status of all resource requests made by a
    single requester.  It subscribes to the requester feedback topic,
    and provides updated information when appropriate.

    Being internal to the :class:`.Scheduler`, methods of this class
    are invoked already holding the :ref:`Big Scheduler Lock
    <Big_Scheduler_Lock>`.

    :param sched: (:class:`.Scheduler`) Scheduler object with which
        this requester is connected.

    :param msg: (scheduler_msgs/SchedulerRequests) Initial resource
        allocation requests.

    """

    def __init__(self, sched, msg):
        """ Constructor. """
        self.last_msg_time = msg.header.stamp
        self.sched = sched
        """ Scheduler serving this requester. """
        self.requester_id = unique_id.fromMsg(msg.requester)
        """ :class:`uuid.UUID` of this requester. """
        self.rset = RequestSet(msg, contents=ActiveRequest)
        """ All active requests for this requester. """

        feedback_topic = common.feedback_topic(self.requester_id,
                                               self.sched.topic)
        rospy.loginfo('requester feedback topic: ' + feedback_topic)
        self.pub = rospy.Publisher(feedback_topic, SchedulerRequests,
                                   latch=True)

    def contact(self):
        """ Contact newly-connected requester. """
        # Cancel any out-of-date requests the requester had lying around.
        self.rset.cancel_out_of_date(reason=Request.TIMEOUT)
        self.sched.callback(self.rset)  # handle initial message
        self.send_feedback()

    def send_feedback(self):
        """ Send feedback message to requester. """
        self.pub.publish(self.rset.to_msg())

    def update(self, msg):
        """ Update requester status.

        :param msg: Latest resource allocation request.
        :type msg: scheduler_msgs/SchedulerRequests

        """
        self.last_msg_time = msg.header.stamp
        # Make a new RequestSet from this message
        new_rset = RequestSet(msg, contents=ActiveRequest)
        if self.rset != new_rset:       # something new?
            self.rset.merge(new_rset)
            self.sched.callback(self.rset)
            if self.rset != new_rset:   # still different?
                self.send_feedback()

    def timeout(self, limit, event):
        """ Check for requester timeout.

        :param limit: Time *limit* since last message received.
        :type limit: :class:`rospy.Duration`
        :param event: Current :class:`rospy.TimerEvent` object.
        :returns: True if *limit* exceeded, False if still active.

        """
        lost = (event.current_real - self.last_msg_time) > limit
        if lost:                # lost contact with this requester?
            # Cancel every active request, so the callback will
            # recover everything it had allocated.
            self.rset.cancel_all(reason=Request.TIMEOUT)
            self.sched.callback(self.rset)
            # No one left to notify.
        return lost


[docs]class Scheduler: """ This class is used by a ROCON scheduler to manage all the resource requests sent by various ROCON services. It subscribes to the ROCON scheduler topic, handling resource requests as they are received. :param callback: Callback function invoked with the updated :class:`.RequestSet` when requests arrive. :param frequency: requester heartbeat frequency in Hz. :type frequency: float :param topic: Topic name for resource allocation requests. :type topic: str .. describe:: callback(rset) :param rset: (:class:`.RequestSet`) The current status of all requests for some active requester. The *callback* function is called when new or updated requests are received, already holding the :ref:`Big Scheduler Lock <Big_Scheduler_Lock>`. It is expected to iterate over its :class:`.RequestSet`, checking the status of every :class:`.ActiveRequest` it contains, modifying them appropriately, then return without waiting. The results will be sent to the requester after this callback returns. Usage example: .. literalinclude:: ../tests/example_scheduler.py """ def __init__(self, callback, frequency=common.HEARTBEAT_HZ, topic=common.SCHEDULER_TOPIC): """ Constructor. """ self.callback = callback """ Callback function for request updates. """ self.lock = threading.RLock() """ .. _Big_Scheduler_Lock: Big Scheduler Lock. This recursive Python threading lock serializes access to scheduler data. The scheduler *callback* is always invoked holding it. All :class:`.Scheduler` methods acquire it automatically, whenever needed. In any other thread, acquire it when updating shared request set objects. Never hold it when sleeping or waiting for I/O. """ self.requesters = {} """ Dictionary of active requesters and their requests. """ self.topic = topic """ Scheduler request topic name. """ rospy.loginfo('scheduler request topic: ' + self.topic) self.sub = rospy.Subscriber(self.topic, SchedulerRequests, self._allocate_resources, queue_size=1, tcp_nodelay=True) self.duration = rospy.Duration(1.0 / frequency) self.time_limit = self.duration * 4.0 self.timer = rospy.Timer(self.duration, self._watchdog) def _allocate_resources(self, msg): """ Scheduler resource allocation message handler. """ with self.lock: rqr_id = unique_id.fromMsg(msg.requester) rqr = self.requesters.get(rqr_id) if rqr: # known requester? rqr.update(msg) else: # new requester rqr = _RequesterStatus(self, msg) self.requesters[rqr_id] = rqr rqr.contact() def _watchdog(self, event): """ Scheduler request watchdog timer handler. """ # Must iterate over a copy of the dictionary items, because # some may be deleted inside the loop. with self.lock: for rqr_id, rqr in self.requesters.items(): if rqr.timeout(self.time_limit, event): del self.requesters[rqr_id]
[docs] def notify(self, requester_id): """ Notify requester of status updates. :param requester_id: Requester to notify. :type requester_id: uuid.UUID :raises: :exc:`KeyError` if unknown requester identifier. """ with self.lock: self.requesters[requester_id].send_feedback()