# Software License Agreement (BSD License)
#
# Copyright (C) 2014, Jack O'Quin
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the author nor of other contributors may be
# used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
.. module:: priority_queue
This module provides queue containers for scheduler requests for the
`Robotics in Concert`_ (ROCON) project.
.. include:: weblinks.rst
"""
# enable some python3 compatibility options:
from __future__ import absolute_import, print_function, unicode_literals
import copy
import heapq
import itertools
[docs]class PriorityQueue(object):
""" This is a container class for ROCON_ scheduler request queue elements.
:param iterable: Iterable yielding initial contents, either
:class:`.QueueElement` objects, or something that behaves
similarly.
This implementation is based on the :py:mod:`heapq` module and
uses some of the ideas explained in its `priority queue
implementation notes`_.
.. describe:: len(queue)
:returns: The number of elements in the *queue*.
.. describe:: request in queue
:param request: (:class:`uuid.UUID` or :class:`.QueueElement`)
*request* to query.
:returns: ``True`` if *request* is in the *queue*.
"""
def __init__(self, iterable=[]):
self._queue = []
""" Priority queue of :class:`.QueueElement`. """
self._requests = {}
""" Dictionary of queued requests. """
for element in iterable:
self.add(element)
def __contains__(self, request):
return hash(request) in self._requests
def __len__(self):
return len(self._requests)
def __str__(self):
contents = sorted(self.values())
rval = 'queue: '
for elem in contents:
rval += '\n ' + str(elem)
return rval
[docs] def add(self, element, priority=None):
""" Add a new *element* to the queue.
:param element: Queue *element* to add.
:type element: :class:`.QueueElement`
:param priority: (Optional) new priority for this *element*.
:type priority: int
If a request with the same identifier was already in the
queue, its old element is removed and replaced by the new
*element*.
If a new *priority* is specified, the priority of the original
request is updated. That is the only safe way to change the
priority of an *element* that is already queued.
.. warning:: Changing priority via some other name for that
request would break the queue implementation.
"""
if hash(element) in self._requests: # already in the queue?
self.remove(element) # mark that copy inactive
element = copy.copy(element)
element.active = True
if priority is not None:
element.request.msg.priority = priority
self._requests[hash(element)] = element
heapq.heappush(self._queue, element)
[docs] def peek(self):
""" Return the top-priority element from the queue head
without removing it.
:raises: :exc:`IndexError` if queue was empty.
"""
# Return the top element that was not previously removed.
for element in self._queue:
if element.active: # not previously removed?
return element
raise IndexError('peek at an empty priority queue')
[docs] def pop(self):
""" Remove the top-priority element from the queue head.
:raises: :exc:`IndexError` if queue was empty.
"""
# Return the top element that was not previously removed.
while self._queue:
element = heapq.heappop(self._queue)
if element.active: # not previously removed?
del self._requests[hash(element)]
return element
raise IndexError('pop from an empty priority queue')
[docs] def remove(self, request_id):
""" Remove element corresponding to *request_id*.
:param request_id: Identifier of the request to remove.
:type request_id: :class:`uuid.UUID` or :class:`.QueueElement`
:raises: :exc:`KeyError` if *request_id* not in the queue.
"""
# Remove it from the dictionary and mark it inactive, but
# leave it in the queue to avoid re-sorting.
element = self._requests.pop(hash(request_id))
element.active = False
[docs] def values(self):
""" Current queue contents.
:returns: iterable with active queue elements in random order.
"""
return self._requests.values()
[docs]class QueueElement(object):
""" Request queue element class.
:param request: Corresponding scheduler request object.
:type request: :class:`.ActiveRequest`
:param requester_id: Unique identifier of requester.
:type requester_id: :class:`uuid.UUID`
Queue elements need fit into normal Python dictionaries, so they
provide the required ``hash()`` operator, based on the unique ID
of that *request*.
.. describe:: hash(element)
:returns: (int) Hash signature for *element*.
Python 3 requires that hashable objects must also provide an
equals operator. The hash signatures of equal requests must be
equal.
.. describe:: element == other
:returns: ``True`` if *element* and *other* have the same
*request* ID (*not* their *requester_id* values).
.. describe:: element != other
:returns: ``True`` if *element* and *other* do not have the
same *request* ID.
Queue elements need to sort in the normal Python way, so they
provide the required ``<`` operator. The ``__cmp__`` method is
not used, because Python 3 does not allow it. **But**, we want
requests with higher-numbered priorities to sort *ahead* of lower
priority ones, so :py:mod:`heapq` and other Python modules work
properly.
.. describe:: element < other
:returns: ``True`` if *element* has higher priority than *other*, or
their priorities are the same and *element* has a lower sequence
number.
This class does *not* provide a total ordering. The ``==`` and
``<`` operators test completely different fields. However, the
*request* identifiers are unique, so no two valid queue elements
should ever compare both equal and less, although that situation
could be constructed artificially.
.. describe:: str(element)
:returns: String representation of this queue element, empty if
it is not active.
"""
_sequence = itertools.count()
""" Class variable: next available sequence number. """
def __init__(self, request, requester_id):
self.request = request
""" Corresponding scheduler :class:`ActiveRequest` object. """
self.requester_id = requester_id
""" :class:`uuid.UUID` of requester. """
self.sequence = next(self.__class__._sequence)
"""
Unique sequence number of this queue element. All elements
created earlier have lower numbers, those created afterward
will be higher.
"""
self.active = True
""" ``True`` unless this element has been removed from its queue. """
def __eq__(self, other):
return self.request.msg.id == other.request.msg.id
def __hash__(self):
return hash(self.request.uuid)
def __lt__(self, other):
return (self.request.msg.priority > other.request.msg.priority
or (self.request.msg.priority == other.request.msg.priority
and self.sequence < other.sequence))
def __ne__(self, other):
return self.request.msg.id != other.request.msg.id
def __str__(self):
""" Generate string representation. """
if not self.active:
return ''
return 'id: ' + str(self.requester_id) \
+ '\n seq: ' + str(self.sequence) \
+ '\n request: ' + str(self.request)