# Software License Agreement (BSD License)
#
# Copyright (C) 2013-2014, Jack O'Quin
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the author nor of other contributors may be
# used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
.. module:: scheduler_node
This Python module implements the scheduler interface for the
`Robotics in Concert`_ (ROCON) project. It tracks resources and
allocates them to ROCON services.
.. include:: weblinks.rst
"""
# enable some python3 compatibility options:
from __future__ import absolute_import, print_function, unicode_literals
import rospy
import threading
from concert_scheduler_requests import Scheduler, TransitionError
from concert_scheduler_requests.priority_queue import (
PriorityQueue, QueueElement)
from scheduler_msgs.msg import CurrentStatus, Request
from concert_resource_pool import (
FailedToStartRappError, InvalidRequestError,
ResourcePool, SchedulerClients)
[docs]class SimpleSchedulerNode(object):
""" Simple ROCON scheduler node.
:param node_name: (str) Default name of scheduler node.
:param period: (:class:`rospy.Duration`) Rescheduling time quantum.
"""
def __init__(self, node_name='simple_scheduler',
period=rospy.Duration(1.0)):
""" Constructor. """
rospy.init_node(node_name)
self.ready_queue = PriorityQueue()
""" Queue of waiting requests. """
self.blocked_queue = PriorityQueue()
""" Queue of blocked requests. """
self.period = period
""" Time duration between periodic rescheduling. """
self.notification_set = set()
""" Set of requester identifiers to notify. """
self.timer = rospy.Timer(self.period, self.reschedule)
self.lock = threading.RLock()
""" Big Scheduler Lock. """
self.pool = SchedulerClients(lock=self.lock)
""" Resource pool of known ROCON clients. """
self.sch = Scheduler(self.callback, lock=self.lock)
""" Scheduler request handler. """
# Handle messages until canceled.
rospy.spin()
[docs] def callback(self, rset):
""" Scheduler request callback.
Called in the scheduler callback thread holding the Big
Scheduler Lock.
See: :class:`.concert_scheduler_requests.Scheduler` documentation.
"""
rospy.logdebug('scheduler callback:')
for rq in rset.values():
rospy.logdebug(' ' + str(rq))
if rq.msg.status == Request.NEW:
self.queue(rq, rset.requester_id)
elif rq.msg.status == Request.CANCELING:
self.free(rq, rset.requester_id)
self.dispatch() # try to allocate ready requests
[docs] def dispatch(self):
""" Grant any available resources to ready requests.
Notifies all affected requesters.
"""
while len(self.ready_queue) > 0:
# Try to allocate top element in the ready queue.
elem = self.ready_queue.pop()
rospy.logdebug('trying to dispatch ' + str(elem))
resources = []
try:
resources = self.pool.allocate(elem.request)
except InvalidRequestError as ex:
self.reject_request(elem, ex)
continue # skip to next queue element
if not resources: # top request cannot be satisfied?
# Return it to head of queue.
self.ready_queue.add(elem)
break # stop looking
try:
self.pool.start_resources(resources)
elem.request.grant(resources)
rospy.loginfo('Request granted: ' + str(elem.request.uuid))
except (FailedToStartRappError, TransitionError) as e:
# Return allocated resources to the pool.
rospy.logerr(str(e))
self.pool.release_resources(resources)
self.notification_set.add(elem.requester_id)
# notify all affected requesters
self.notify_requesters()
# notify of resource pool changes, if necessary
self.pool.notify_resources()
[docs] def free(self, request, requester_id):
""" Free all resources allocated for this *request*.
:param request: (:class:`.ActiveRequest`)
:param requester_id: (:class:`uuid.UUID`) Unique requester identifier.
"""
self.pool.release_request(request)
rospy.loginfo('Request canceled: ' + str(request.uuid))
request.close()
# remove request from any queues
request_id = request.uuid
for queue in [self.ready_queue, self.blocked_queue]:
if request_id in queue:
queue.remove(request_id)
break # should not be in any other queue
self.notification_set.add(requester_id)
[docs] def notify_requesters(self):
""" Notify affected requesters.
:pre: self.notification_set contains requesters to notify.
:post: self.notification_set is empty.
"""
for requester_id in self.notification_set:
try:
self.sch.notify(requester_id)
except KeyError: # requester now missing?
# shut down this requester
self.shutdown_requester(requester_id)
self.notification_set.clear()
[docs] def queue(self, request, requester_id):
""" Add *request* to ready queue, making it wait.
:param request: resource request to be queued.
:type request: :class:`.ActiveRequest`
:param requester_id: Unique requester identifier.
:type requester_id: :class:`uuid.UUID`
"""
try:
request.wait(reason=Request.BUSY)
except TransitionError: # request no longer active?
return
self.ready_queue.add(QueueElement(request, requester_id))
rospy.loginfo('Request queued: ' + str(request.uuid))
self.notification_set.add(requester_id)
[docs] def reject_request(self, element, exception):
""" Reject an invalid queue *element*.
:param element: Queue element to reject.
:type element: :class:`.QueueElement`
:param exception: Associated exception object.
"""
rospy.logwarn(str(exception))
if hasattr(Request, "INVALID"): # new reason code defined?
element.request.cancel(Request.INVALID)
else:
element.request.cancel(Request.UNAVAILABLE)
self.notification_set.add(element.requester_id)
[docs] def reschedule(self, event):
""" Periodic rescheduling thread.
Moves requests that cannot be satisfied with
currently-available resources to the blocked queue. Also
checks the blocked queue for requests that can be satisfied
due to newly-arrived resources.
Uses the Big Scheduler Lock to serialize these changes with
operations done within the scheduler callback thread.
"""
with self.lock:
# Check ready queue for requests that cannot be resolved
# using currently-available resources.
if len(self.ready_queue) > 0:
rospy.loginfo('reschedule ready ' + str(self.ready_queue))
while len(self.ready_queue) > 0:
# see if head of ready queue can be scheduled
elem = self.ready_queue.peek()
# see if all available or allocated resources would suffice
criteria = {CurrentStatus.AVAILABLE, CurrentStatus.ALLOCATED}
if self.pool.match_list(elem.request.msg.resources, criteria):
# request not blocked
break # do not look further
# move elem to blocked_queue
self.ready_queue.pop() # remove elem from ready queue
rospy.loginfo('Request blocked: '
+ str(elem.request.uuid))
elem.request.wait(reason=Request.UNAVAILABLE)
self.blocked_queue.add(elem)
self.notification_set.add(elem.requester_id)
# Check blocked queue for requests that can be resolved
# using currently-available resources.
for elem in self.blocked_queue.values():
rospy.logdebug('checking request ' + str(elem.request.uuid))
criteria = {CurrentStatus.AVAILABLE, CurrentStatus.ALLOCATED}
if self.pool.match_list(elem.request.msg.resources, criteria):
rospy.loginfo('request ' + str(elem.request.uuid)
+ ' no longer blocked')
self.blocked_queue.remove(elem)
self.ready_queue.add(elem)
self.notification_set.add(elem.requester_id)
# try to allocate any remaining ready requests
self.dispatch()
[docs] def shutdown_requester(self, requester_id):
""" Shut down this requester, recovering all resources assigned. """
for queue in [self.ready_queue, self.blocked_queue]:
for rq in queue:
if rq.requester_id == requester_id:
self.free(rq, requester_id)
[docs]def main():
""" Scheduler node main entry point. """
node = SimpleSchedulerNode()