#
# License: BSD
#
# https://raw.github.com/robotics-in-concert/rocon_concert/license/LICENSE
#
"""
.. module:: compatibility_tree_scheduler.scheduler
The compatibility tree scheduler node.
"""
##############################################################################
# Imports
##############################################################################
import threading
import copy
import rospy
import unique_id
import uuid
import concert_msgs.msg as concert_msgs
import scheduler_msgs.msg as scheduler_msgs
import concert_scheduler_requests
import rocon_uri
import rocon_python_utils
import concert_schedulers.common as common
from concert_schedulers.common.exceptions import FailedToAllocateException
from .compatibility_tree import create_compatibility_tree, prune_compatibility_tree, CompatibilityTree
from .ros_parameters import setup_ros_parameters
##############################################################################
# Classes
##############################################################################
class CompatibilityTreeScheduler(object):
"""
An implementation of the `concert_scheduler_requests`_ abstract Scheduler that
uses the compatibility tree algorithm.
    It has the following characteristics:

    * attempts to resolve the highest priority request first
    * requests are resolved singly (resolving across multiple requests simultaneously is quite difficult)
    * if the highest priority requests are unresolved, lower priority requests are blocked
    * compatibility tree resolution of a single request works across:

      * branches - the required resource (e.g. rocon_apps/teleop)
      * leaves - concert clients that can satisfy that resource (e.g. turtlebot_1)

    * resource allocation follows two rules (see the worked example below):

      * always consider leaves on the lowest count branches first
      * always take the least visible leaf away from low count branches
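
    For example (illustrative only), suppose rocon_apps/teleop can be satisfied by
    [turtlebot_1, turtlebot_2] while rocon_apps/delivery can be satisfied by
    [turtlebot_2, turtlebot_3]. Both branches have two leaves, but turtlebot_3 is
    visible on only one branch, so delivery is given turtlebot_3, leaving turtlebot_2
    free to satisfy teleop.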
.. _`concert_scheduler_requests`: http://wiki.ros.org/concert_scheduler_requests
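
    A minimal usage sketch - the topic names here are assumptions for illustration,
    not anything mandated by this class:

    .. code-block:: python

        import rospy
        from concert_schedulers.compatibility_tree_scheduler.scheduler import CompatibilityTreeScheduler

        rospy.init_node('compatibility_tree_scheduler')
        scheduler = CompatibilityTreeScheduler(
            concert_clients_topic_name='/concert/conductor/concert_clients',
            requests_topic_name='/concert/scheduler/requests'
        )
        scheduler.spin()  # alias for rospy.spin()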
"""
    __slots__ = [
        '_subscribers',
        '_publishers',
        '_request_sets',
        '_lock',
        'spin',
        '_scheduler',
        '_clients',
        '_parameters',
    ]
##########################################################################
# Initialisation
##########################################################################
    def __init__(self, concert_clients_topic_name, requests_topic_name):
'''
:param str concert_clients_topic_name: concert client joining/leaving notifications
:param str requests_topic_name: incoming requests for resources topic
'''
self._subscribers = {} # ros subscribers
self._publishers = {} # ros publishers
        self._request_sets = {}  # concert_scheduler_requests.transitions.RequestSet (contains ResourceReply objects)
self._clients = {} # common.ConcertClient.gateway_name : common.ConcertClient of all concert clients
self._lock = threading.Lock()
self._scheduler = concert_scheduler_requests.Scheduler(callback=self._requester_update, topic=requests_topic_name)
self._setup_ros_api(concert_clients_topic_name)
self._parameters = setup_ros_parameters()
# aliases
self.spin = rospy.spin
def _setup_ros_api(self, concert_clients_topic_name):
self._subscribers['concert_client_changes'] = rospy.Subscriber(concert_clients_topic_name, concert_msgs.ConcertClients, self._ros_subscriber_concert_client_changes)
self._publishers['resource_pool'] = rospy.Publisher('~resource_pool', scheduler_msgs.KnownResources, latch=True, queue_size=10)
##########################################################################
# Ros api handlers
##########################################################################
def _ros_subscriber_concert_client_changes(self, msg):
"""
Receives a list of all concert clients every time the state of the
list changes (i.e. not periodically).
@param msg : concert_msgs.ConcertClients
"""
pending_notifications = []
self._lock.acquire()
invited_clients = msg.clients + msg.missing_clients # both connected and missing (lost wireless connection)
# new_clients: concert_msgs.ConcertClient[]
new_clients = [client for client in invited_clients if client.gateway_name not in self._clients.keys()]
# lost_clients: common.ConcertClient[]
lost_clients = [client for client in self._clients.values() if client.gateway_name not in [c.gateway_name for c in invited_clients]]
# work over the client list
for client in new_clients:
rospy.loginfo("Scheduler : new concert client [%s]" % client.name)
self._clients[client.gateway_name] = common.ConcertClient(client) # default setting is unallocated
for client in lost_clients:
if client.allocated:
rospy.logwarn("Scheduler : lost allocated concert client [%s]" % client.name)
client_platform_uri = rocon_uri.parse(client.msg.platform_info.uri)
client_platform_uri.name = client.msg.gateway_name
for request_set in self._request_sets.values():
found = False
for request in request_set.values():
for resource in request.msg.resources:
# could use a better way to check for equality than this
if resource.uri == str(client_platform_uri):
client_platform_uri.name = concert_msgs.Strings.SCHEDULER_UNALLOCATED_RESOURCE
resource.uri = str(client_platform_uri)
found = True
break
if found:
pending_notifications.append(request_set.requester_id)
# @todo might want to consider changing the request status if all resources have been unallocated
break
if found:
break
# @todo might want to validate that we unallocated since we did detect an allocated flag
del self._clients[client.gateway_name]
if new_clients or lost_clients:
self._publish_resource_pool()
# should check for some things here, e.g. can we verify a client is allocated or not?
pending_notifications.extend(self._update(external_update=True))
self._lock.release()
# can't put this inside the lock scope since we can have the following deadlock scenario:
#
# 1) scheduler node's scheduler_requests subscriber callback thread
# - scheduler is handling a scheduler_requests subscriber callback
# - BIG LOCK.acquire()
# - rqr.update()
# - user callback
# - CompatibilityTreeScheduler._update()
# - self._lock.acquire()
# 2) this conductor client subscriber callback thread
# - self._lock.acquire()
# - scheduler.notify()
# - BIG LOCK.acquire()
#
# but does this have consequences? what if the update callback works on this request set
# and publishes it and then we publish again here?
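        # the workaround used here, sketched (equivalent to the loop below): gather the
        # requester ids while holding the lock, then notify once it has been released
        #
        #     with self._lock:
        #         pending_notifications = self._update(external_update=True)
        #     for requester_id in pending_notifications:
        #         self._scheduler.notify(requester_id)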
for requester_id in pending_notifications:
self._scheduler.notify(requester_id) # publishes the request set
def _publish_resource_pool(self):
'''
Publishes the current resource pool. This is called whenever the state of the scheduler's
known resources changes.
'''
msg = scheduler_msgs.KnownResources()
msg.header.stamp = rospy.Time.now()
msg.resources = [client.toMsg() for client in self._clients.values()]
self._publishers['resource_pool'].publish(msg)
def _requester_update(self, request_set):
'''
        Callback used by the concert_scheduler_requests scheduler for processing incoming resource requests.
        This fires every time an incoming message arrives from a requester, which is periodic,
        not merely upon state changes.
        @param request_set : a snapshot of all requests from a single requester in their current state.
        @type concert_scheduler_requests.transitions.RequestSet
'''
#rospy.logwarn("Scheduler : requester update callback")
self._lock.acquire()
# this might cause a problem if this gets blocked while in the middle of processing a lost
# allocation (where we change the request set's resource info). Would this next line overwrite
# our changes or would we be getting the updated request set?
self._request_sets[request_set.requester_id.hex] = request_set
self._update()
self._lock.release()
def _update(self, external_update=False):
"""
        Logic for allocating resources after there has been a state change in either the list of
        available concert clients or the incoming resource requests.
        We have to be careful with sending notifications when this is called from outside the
        scheduler thread, so in that case notifications are deferred and returned to the caller.
        :param bool external_update: true if called from outside the scheduler's requester update thread.
        :returns: list of requester ids that have notifications pending.
"""
        there_be_requests = False
        for request_set in self._request_sets.values():
            if request_set.keys():
                there_be_requests = True
                break
        if not there_be_requests:
            return []  # nothing to do
pending_notifications = []
########################################
# Sort the request sets
########################################
unallocated_clients = [client for client in self._clients.values() if not client.allocated]
new_replies = []
pending_replies = []
releasing_replies = []
for request_set in self._request_sets.values():
new_replies.extend([r for r in request_set.values() if r.msg.status == scheduler_msgs.Request.NEW])
pending_replies.extend([r for r in request_set.values() if r.msg.status == scheduler_msgs.Request.WAITING])
#pending_replies.extend([r for r in request_set.values() if r.msg.status == scheduler_msgs.Request.NEW or r.msg.status == scheduler_msgs.Request.WAITING])
releasing_replies.extend([r for r in request_set.values() if r.msg.status == scheduler_msgs.Request.CANCELING])
        # get all requests for compatibility tree processing and sort by priority (highest first,
        # so that a failed high priority request can block everything queued behind it)
        # this is a bit inefficient, should just sort the request set directly? modifying it directly may not be right though
        pending_replies[:] = sorted(pending_replies, key=lambda request: request.msg.priority, reverse=True)
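        # e.g. (illustrative only) incoming priorities [0, 10, 5] are processed as [10, 5, 0],
        # so a failure at priority 10 blocks the requests at priorities 5 and 0 behind it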
########################################
# New
########################################
for reply in new_replies:
reply.wait()
########################################
# Pending
########################################
resource_pool_state_changed = False
# used to check if we should block further allocations to lower priorities
# important to use this variable because we might have a group of requests with equal priorities
# and we don't want to block the whole group because the first one failed.
last_failed_priority = None
        reallocated_clients = {}  # dict of resource uri strings -> request uuid hex strings
for reply in pending_replies:
request = reply.msg
request_id = unique_id.toHexString(request.id)
if last_failed_priority is not None and request.priority < last_failed_priority:
rospy.loginfo("Scheduler : ignoring lower priority requests until higher priorities are filled")
break
# add preemptible clients to the candidates
if self._parameters['enable_preemptions']:
allocatable_clients = unallocated_clients + [client for client in self._clients.values() if (client.allocated and client.allocated_priority < request.priority)]
else:
allocatable_clients = unallocated_clients
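            # e.g. (illustrative only) with preemptions enabled, an incoming request at priority 10
            # may steal a client allocated at priority 5, but not one allocated at priority 10 or above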
if not allocatable_clients:
# this gets spammy...
#rospy.loginfo("Scheduler : no resources available to satisfy request [%s]" % request_id)
last_failed_priority = request.priority
continue
compatibility_tree = create_compatibility_tree(request.resources, allocatable_clients)
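            # illustrative shape only - one branch per requested resource, whose leaves are the
            # clients compatible with that resource, e.g.
            #   rocon_apps/teleop   : [turtlebot_1, turtlebot_2]
            #   rocon_apps/delivery : [turtlebot_2]
            # pruning then whittles each branch down towards a single allocatable leaf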
if self._parameters['debug_show_compatibility_tree']:
compatibility_tree.print_branches("Compatibility Tree")
pruned_branches = prune_compatibility_tree(compatibility_tree, verbosity=self._parameters['debug_show_compatibility_tree'])
pruned_compatibility_tree = CompatibilityTree(pruned_branches)
if self._parameters['debug_show_compatibility_tree']:
pruned_compatibility_tree.print_branches("Pruned Tree", ' ')
if pruned_compatibility_tree.is_valid():
rospy.loginfo("Scheduler : compatibility tree is valid, attempting to allocate [%s]" % request_id)
last_failed_priority = None
resources = []
failed_to_allocate = False
for branch in pruned_compatibility_tree.branches:
if failed_to_allocate: # nested break catch
break
for leaf in branch.leaves: # there should be but one
                        # this info is actually embedded in self._clients
try:
if leaf.allocated:
old_request_id = leaf.reallocate(request_id, request.priority, branch.limb)
reallocated_clients[leaf.msg.platform_info.uri] = old_request_id
rospy.loginfo("Scheduler : pre-empted and reallocated [%s]" % leaf.name)
else:
leaf.allocate(request_id, request.priority, branch.limb)
rospy.loginfo("Scheduler : allocated [%s]" % leaf.name)
except FailedToAllocateException as e:
rospy.logwarn("Scheduler : failed to (re)allocate [%s][%s]" % (leaf.name, str(e)))
failed_to_allocate = True
break
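                        # rewrite the requested resource uri so the requester can see exactly which
                        # client satisfied it, e.g. (illustrative only)
                        #   'rocon:/turtlebot' -> 'rocon:/turtlebot/gateway_name'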
resource = copy.deepcopy(branch.limb)
uri = rocon_uri.parse(leaf.msg.platform_info.uri) # leaf.msg is concert_msgs/ConcertClient
uri.name = leaf.msg.gateway_name.lower().replace(' ', '_') # store the unique name of the concert client
resource.uri = str(uri)
resources.append(resource)
if failed_to_allocate:
rospy.logwarn("Scheduler : aborting request allocation [%s]" % request_id)
# aborting request allocation
for branch in pruned_compatibility_tree.branches:
for leaf in branch.leaves: # there should be but one
if leaf.allocated:
leaf.abandon()
last_failed_priority = request.priority
else:
reply.grant(resources)
resource_pool_state_changed = True
# remove allocated clients from the unallocated list so they don't get doubly allocated on the next request in line
newly_allocated_client_names = []
for branch in pruned_compatibility_tree.branches:
newly_allocated_client_names.extend([leaf.name for leaf in branch.leaves if leaf.allocated])
unallocated_clients[:] = [client for client in unallocated_clients if client.name not in newly_allocated_client_names]
else:
last_failed_priority = request.priority
rospy.loginfo("Scheduler : insufficient resources to satisfy request [%s]" % request_id)
########################################
# Preempted resource handling
########################################
# this is basically equivalent to what is in the concert client changes
# subscriber callback...can we move this somewhere centralised?
        for (resource_uri_string, request_id_hexstring) in reallocated_clients.items():
request_id = uuid.UUID(request_id_hexstring)
            # awkward that the requester id isn't saved anywhere, so we have to search
            # through each requester's set of requests to find the request id we want
found = False
for request_set in self._request_sets.values():
for request in request_set.values():
if request.uuid == request_id:
for resource in request.msg.resources:
# could use a better way to check for equality than this
if resource.uri == resource_uri_string:
updated_resource_uri = rocon_uri.parse(resource_uri_string)
updated_resource_uri.name = concert_msgs.Strings.SCHEDULER_UNALLOCATED_RESOURCE
resource.uri = str(updated_resource_uri)
found = True
break
if found:
break
if found:
if external_update:
pending_notifications.append(request_set.requester_id)
else:
self._scheduler.notify(request_set.requester_id)
# @todo might want to consider changing the request status if all resources have been unallocated
break
########################################
# Releasing
########################################
for reply in releasing_replies:
if reply.msg.reason == scheduler_msgs.Request.NONE:
rospy.loginfo("Scheduler : releasing resources from cancelled request [%s][none]" % ([resource.rapp for resource in reply.msg.resources]))
elif reply.msg.reason == scheduler_msgs.Request.TIMEOUT:
rospy.loginfo("Scheduler : releasing resources from cancelled request [%s][scheduler-requester watchdog timeout]" % ([resource.rapp for resource in reply.msg.resources]))
else:
rospy.loginfo("Scheduler : releasing resources from cancelled request [%s][%s]" % ([resource.rapp for resource in reply.msg.resources], reply.msg.reason))
for resource in reply.msg.resources:
try:
resource_name = rocon_uri.parse(resource.uri).name.string
for key, value in self._clients.items():
if resource_name == rocon_python_utils.ros.get_ros_friendly_name(key):
value.abandon()
                except KeyError as e:
                    # nothing was allocated to that resource yet (i.e. unique gateway_name was not yet set)
                    rospy.logwarn("Scheduler : error while releasing resource [%s]" % str(e))
reply.close()
#reply.msg.status = scheduler_msgs.Request.RELEASED
# Publish an update?
if resource_pool_state_changed or releasing_replies:
self._publish_resource_pool()
return pending_notifications