Source code for concert_schedulers.resource_pool_requester.requester
#
# License: BSD
#
# https://raw.github.com/robotics-in-concert/rocon_concert/license/LICENSE
#
"""
.. module:: resource_pool_requester.requester
The resource pool requester.
"""
##############################################################################
# Imports
##############################################################################
import threading
import rospy
import unique_id
import concert_scheduler_requests
import scheduler_msgs.msg as scheduler_msgs
import concert_msgs.msg as concert_msgs
import rocon_uri
##############################################################################
# Methods
##############################################################################
[docs]def request_completely_unallocated(request):
'''
Quick check to see if a request's resources have been entirely
lost (i.e. unallocated). This will trigger the responder to
cleanup (i.e. release the request).
:param request:
:type request: scheduler_msgs.Request
:returns: true or false if entirely unallocated or not.
:rtype: bool
'''
for resource in request.msg.resources:
if rocon_uri.parse(resource.uri).name.string != concert_msgs.Strings.SCHEDULER_UNALLOCATED_RESOURCE:
return False
return True
##############################################################################
# Classes
##############################################################################
[docs]class ResourcePoolRequester(object):
'''
A variant on the basic requester that can handle some higher level
formulation and management of requests.
'''
__slots__ = [
'_requester', # the base requester class from concert_scheduler_requests
'_resource_groups',
'_state', # state of the requester (see State class)
'_feedback',
'_high_priority', # priority to set for necessary (minimum) resource requirements
'_low_priority', # priority to set for optional resource requirements
'_recovery_start', # the timestamp recorded when moving from State.ALIVE -> State.RECOVERING (used for recovery timeout)
'_lock'
]
class State(object):
PENDING = 'pending' # pending resource allocations for minimum requirements of the resource groups
ALIVE = 'alive' # minimum resource requirements have been allocated - it is now functioning
RECOVERING = 'recovering' # was alive, but resource allocations fell below minimum requirements and trying to recover
timeout = rospy.Duration(10.0) # timeout for dropping 'alive' state status.
[docs] def __init__(self,
resource_groups,
feedback,
high_priority=scheduler_msgs.Request.HIGH_PRIORITY,
low_priority=scheduler_msgs.Request.LOW_PRIORITY,
uuid=None,
topic=concert_scheduler_requests.common.SCHEDULER_TOPIC,
frequency=concert_scheduler_requests.common.HEARTBEAT_HZ):
'''
Loads the requester up with a collecation of resource groups that have min/max requirements
for each resource type.
:param resource_groups: many resource groups that will form the bases for this requester
:type resource_groups: resource_group.MinMaxResourceGroup[]
:param func feedback: external feedback callback function
:param int high_priority: priority to set for necessary (minimum) resource requirements
:param int low_priority: priority to set for optional resource requirements
'''
self._requester = concert_scheduler_requests.Requester(self._requester_feedback, uuid, 0, topic, frequency)
self._resource_groups = resource_groups
self._feedback = feedback
self._state = self.State.PENDING
self._high_priority = high_priority
self._low_priority = low_priority
self._lock = threading.Lock()
self._issue_minimum_request()
self._requester.send_requests()
def _issue_minimum_request(self):
initial_resources = []
for resource_group in self._resource_groups:
initial_resources.extend(resource_group.initial_resources())
unused_minimum_request_uuid = self._requester.new_request(initial_resources, priority=self._high_priority)
[docs] def cancel_all_requests(self):
'''
Exactly as it says! Used typically when shutting down or when
it's lost more allocated resources than the minimum required (in which case it
cancels everything and starts reissuing new requests).
'''
#self._lock.acquire()
rospy.loginfo("Requester : cancelling all requests")
self._requester.cancel_all()
self._requester.send_requests()
#self._lock.release()
def _requester_feedback(self, request_set):
'''
This returns requests processed by the scheduler with whatever necessary modifications
that were made on the original requests.
@param request_set : the modified requests
@type dic { uuid.UUID : scheduler_msgs.ResourceRequest }
'''
# call self._feedback in here
#print("Request set: %s" % request_set)
########################################
# Update all resource tracking info
########################################
for resource_group in self._resource_groups:
resource_group.reset_scheduler_flags()
for request in request_set.values():
high_priority_flag = True if request.msg.priority == self._high_priority else False
if request.msg.status == scheduler_msgs.Request.NEW or request.msg.status == scheduler_msgs.Request.WAITING:
self._flag_resource_trackers(request.msg.resources, tracking=True, allocated=False, high_priority_flag=high_priority_flag)
elif request.msg.status == scheduler_msgs.Request.GRANTED:
self._flag_resource_trackers(request.msg.resources, tracking=True, allocated=True, high_priority_flag=high_priority_flag)
if request_completely_unallocated(request):
rospy.loginfo("Requester : cancelling request [has been completely unallocated]")
request.cancel()
elif request.msg.status == scheduler_msgs.Request.CLOSED:
self._flag_resource_trackers(request.msg.resources, tracking=False, allocated=False)
#for resource_group in self._resource_groups:
# print("\n%s" % str(resource_group))
########################################
# Requester state transitions
########################################
tentatively_alive = True
for resource_group in self._resource_groups:
if not resource_group.is_alive():
tentatively_alive = False
break
# alive state changes
if self._state == self.State.PENDING and tentatively_alive:
self._state = self.State.ALIVE
rospy.loginfo("Requester : state change [%s->%s]" % (self.State.PENDING, self.State.ALIVE))
elif self._state == self.State.ALIVE and not tentatively_alive:
rospy.loginfo("Requester : state change [%s->%s]" % (self.State.ALIVE, self.State.RECOVERING))
self._state = self.State.RECOVERING
self._recovery_start = rospy.Time.now()
# Not practical, but currently the safe non-thread way.
#self._requester.cancel_all()
#self._issue_minimum_request()
# maybe not such a great idea - the requester is not officially thread-safe,
# but it's interesting to test the problems with doing this
thread = threading.Thread(target=self._check_recovery_timeout)
thread.start()
elif self._state == self.State.RECOVERING and tentatively_alive:
self._state = self.State.ALIVE
# else moving from RECOVERING to PENDING is handled by the thread function
########################################
# Check optional request requirements
########################################
if self._state == self.State.ALIVE:
for resource_group in self._resource_groups:
(resource, high_priority_flag) = resource_group.requires_new_request()
if resource is not None:
priority = self._high_priority if high_priority_flag else self._low_priority
unused_request_uuid = self._requester.new_request([resource], priority=priority)
def _check_recovery_timeout(self):
'''
Thread worker function that monitors the requester state while it's trying to
recover the minimum necessary requirements to run the resource pool. If
nothing happens for the duration of this wait, then it cancels all requests (deallocating
any resources) and reissues brand new requests as needed.
In the future this probably has to change as we can't dictate to the service how long
it should wait before cancelling requests. Ultimately the service itself should handle
what happens if this timeout is reached - it needs to cleanup (i.e. finalise
whatever it is doing first such as sending robots back to home base) before finally issuing
the cancel order for the request.
'''
while (rospy.Time.now() - self._recovery_start) < self.State.timeout and not rospy.is_shutdown() and self._state == self.State.RECOVERING:
rospy.rostime.wallsleep(1.0)
if (rospy.Time.now() - self._recovery_start) > self.State.timeout:
self.cancel_all_requests()
rospy.logwarn("Requester : timed out trying to recover necessary resource pool state.")
rospy.logwarn("Requester : issuing new minimum request.")
self._issue_minimum_request()
self._state = self.State.PENDING
def _flag_resource_trackers(self, resources, tracking, allocated, high_priority_flag=False):
'''
Update the flags in the resource trackers for each resource. This is used to
follow whether a particular resource is getting tracked, or is already allocated so
that we can determine if new requests should be made and at what priority.
'''
for resource in resources:
# do a quick check to make sure individual resources haven't been previously allocated, and then lost
# WARNING : this unallocated check doesn't actually work - the requester isn't sending us back this info yet.
if rocon_uri.parse(resource.uri).name.string == concert_msgs.Strings.SCHEDULER_UNALLOCATED_RESOURCE:
tracking = False
allocated = False
resource_tracker = self._find_resource_tracker(unique_id.toHexString(resource.id))
if resource_tracker is None:
pass # should raise an exception
else:
resource_tracker.tracking = tracking
resource_tracker.allocated = allocated
resource_tracker.high_priority_flag = high_priority_flag
def _find_resource_tracker(self, key):
'''
@param key : unique identifier for the resource
@type hex string
@return the resource tracker corresponding to the key
@type ResourceTracker or None
'''
for resource_group in self._resource_groups:
resource_tracker = resource_group.find_resource_tracker(key)
if resource_tracker is not None:
return resource_tracker
return None