scheduler

Python interface for ROCON schedulers handling resource requests.

class rocon_scheduler_requests.scheduler.Scheduler(callback, frequency=0.25, topic='rocon_scheduler')[source]

This class is used by a ROCON scheduler to manage all the resource requests sent by various ROCON services. It subscribes to the ROCON scheduler topic, handling resource requests as they are received.

Parameters:
  • callback – Callback function invoked with the updated RequestSet when requests arrive.
  • frequency (float) – requester heartbeat frequency in Hz.
  • topic (str) – Topic name for resource allocation requests.
callback(rset)
Param rset:(RequestSet) The current status of all requests for some active requester.

The callback function is called when new or updated requests are received, already holding the Big Scheduler Lock. It is expected to iterate over its RequestSet, checking the status of every ActiveRequest it contains, modifying them appropriately, then return without waiting. The results will be sent to the requester after this callback returns.

Usage example:

#!/usr/bin/env python
""" Scheduler usage example. """
from collections import deque
import rospy
from scheduler_msgs.msg import Request, Resource
from rocon_scheduler_requests import Scheduler, TransitionError


class ExampleScheduler:

    def __init__(self):
        rospy.init_node("example_scheduler")
        # simplifying assumptions: all requests want a single robot,
        # and any of these will do:
        self.avail = deque([            # FIFO queue of available robots
            Resource(rapp='example_rapp', uri='rocon:/turtlebot/roberto'),
            Resource(rapp='example_rapp', uri='rocon:/turtlebot/marvin')])
        self.ready_queue = deque()      # FIFO queue of waiting requests
        self.sch = Scheduler(self.callback)
        rospy.spin()

    def callback(self, rset):
        """ Scheduler request callback. """
        rospy.logdebug('scheduler callback:')
        for rq in rset.values():
            rospy.logdebug('  ' + str(rq))
            if rq.msg.status == Request.NEW:
                self.queue(rset.requester_id, rq)
            elif rq.msg.status == Request.CANCELING:
                self.free(rset.requester_id, rq)

    def dispatch(self):
        """ Grant any available resources to waiting requests. """
        while len(self.ready_queue) > 0:
            if len(self.avail) == 0:    # no resources available?
                return
            resource = self.avail.popleft()
            requester_id, rq = self.ready_queue.popleft()
            try:                        # grant request & notify requester
                rq.grant([resource])
                self.sch.notify(requester_id)
                rospy.loginfo('Request granted: ' + str(rq.uuid))
            except (TransitionError, KeyError):
                # request no longer active or requester missing?
                # Put resource back at the front of the queue.
                self.avail.appendleft(resource)

    def free(self, requester_id, rq):
        """ Free all resources allocated for this request. """
        self.avail.extend(rq.allocations)
        rospy.loginfo('Request canceled: ' + str(rq.uuid))
        rq.close()
        self.dispatch()                 # grant waiting requests

    def queue(self, requester_id, rq):
        """ Add request to ready queue, making it wait. """
        try:
            rq.wait(reason=Request.BUSY)
        except TransitionError:         # request no longer active?
            return
        self.ready_queue.append((requester_id, rq))
        rospy.loginfo('Request queued: ' + str(rq.uuid))
        self.dispatch()

if __name__ == '__main__':
    node = ExampleScheduler()
callback = None

Callback function for request updates.

lock = None

Big Scheduler Lock.

This recursive Python threading lock serializes access to scheduler data. The scheduler callback is always invoked holding it. All Scheduler methods acquire it automatically, whenever needed.

In any other thread, acquire it when updating shared request set objects. Never hold it when sleeping or waiting for I/O.

requesters = None

Dictionary of active requesters and their requests.

topic = None

Scheduler request topic name.

notify(requester_id)[source]

Notify requester of status updates.

Parameters:requester_id (uuid.UUID) – Requester to notify.
Raises :KeyError if unknown requester identifier.

Previous topic

requester

Next topic

transitions

This Page