Source code for rocon_scheduler_requests.transitions

# Software License Agreement (BSD License)
#
# Copyright (C) 2013, Jack O'Quin
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of the author nor of other contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
.. module:: transitions

This module tracks resource request state transitions, which occur as
`scheduler_msgs/Request`_ messages flow between schedulers and
requesters.

As individual ``Request`` messages are passed back and forth between
the original requester and the scheduler, their ``status`` passes
through several state transitions.  Gray states are created by the
scheduler via solid transitions.  The dashed **cancel** transitions
may be initiated by either the requester or the scheduler.

.. graphviz:: state_transitions.dot

.. _scheduler_msgs:
    http://wiki.ros.org/scheduler_msgs
.. _`scheduler_msgs/Request`:
    http://docs.ros.org/api/scheduler_msgs/html/msg/Request.html

"""

# (unicode_literals not compatible with python2 uuid module)
from __future__ import absolute_import, print_function

# Ros dependencies
import rospy
import unique_id

from scheduler_msgs.msg import Request, SchedulerRequests
from . import TransitionError

# Starting and terminal request states:
STARTING_STATES = frozenset([Request.NEW, Request.RESERVED])
TERMINAL_STATES = frozenset([Request.CLOSED])

## State transition merge table.
#
#  An immutable set of (old, new) status pairs.  All pairs in the
#  table are considered valid state transitions.  Any others are not.
#
#  This table is only used for merging scheduler and requester request
#  sets when a new message arrives.  Data from the new message are
#  ignored unless the corresponding transition appears here.
#
TRANS_TABLE = frozenset([
    (Request.CANCELING, Request.CANCELING),
    (Request.CANCELING, Request.CLOSED),

    (Request.GRANTED, Request.CANCELING),
    (Request.GRANTED, Request.GRANTED),
    (Request.GRANTED, Request.PREEMPTING),

    (Request.NEW, Request.CANCELING),
    (Request.NEW, Request.GRANTED),
    (Request.NEW, Request.PREEMPTING),
    (Request.NEW, Request.WAITING),

    (Request.PREEMPTING, Request.CANCELING),
    (Request.PREEMPTING, Request.CLOSED),       # really??
    (Request.PREEMPTING, Request.PREEMPTING),

    (Request.RESERVED, Request.CANCELING),
    (Request.RESERVED, Request.GRANTED),
    (Request.RESERVED, Request.PREEMPTING),
    (Request.RESERVED, Request.RESERVED),
    (Request.RESERVED, Request.WAITING),

    (Request.WAITING, Request.CANCELING),
    (Request.WAITING, Request.GRANTED),
    (Request.WAITING, Request.PREEMPTING),
    (Request.WAITING, Request.WAITING)])


class _EventTranitions:
    """
    Define valid status transitions for a given event type.

    :param name: Human-readable name for this event type.
    :type name: str
    :param trans: Dictionary of valid status transitions.
    :type trans: dict

    """
    def __init__(self, name, trans):
        self.name = name
        """ Name of this event type. """
        self.trans = trans
        """ Dictionary of valid status transitions. """

##  Requester or scheduler transitions:
#
EVENT_CANCEL = _EventTranitions('cancel', {
    Request.CANCELING: Request.CANCELING,
    Request.CLOSED: Request.CLOSED,
    Request.GRANTED: Request.CANCELING,
    Request.NEW: Request.CANCELING,
    Request.PREEMPTING: Request.CANCELING,
    Request.RESERVED: Request.CANCELING,
    Request.WAITING: Request.CANCELING,
    })

##  Scheduler transitions:
#
EVENT_CLOSE = _EventTranitions('close', {
    Request.CANCELING: Request.CLOSED,
    Request.PREEMPTING: Request.CLOSED,
    })

EVENT_GRANT = _EventTranitions('grant', {
    Request.NEW: Request.GRANTED,
    Request.RESERVED: Request.GRANTED,
    Request.WAITING: Request.GRANTED,
    })

EVENT_PREEMPT = _EventTranitions('preempt', {
    Request.CANCELING: Request.CANCELING,
    Request.CLOSED: Request.CLOSED,
    Request.GRANTED: Request.PREEMPTING,
    Request.NEW: Request.NEW,
    Request.PREEMPTING: Request.PREEMPTING,
    Request.RESERVED: Request.RESERVED,
    Request.WAITING: Request.WAITING,
    })

EVENT_WAIT = _EventTranitions('wait', {
    Request.NEW: Request.WAITING,
    Request.RESERVED: Request.WAITING,
    Request.WAITING: Request.WAITING,
    })


[docs]class RequestBase(object): """ Base class for tracking the status of a single resource request. *Not for general use.* :param msg: ROCON scheduler request message. :type msg: scheduler_msgs/Request Requesters and schedulers will use one of the following derived classes, with higher-level interfaces creating it automatically: * Requester: :class:`.ResourceRequest` * Scheduler: :class:`.ActiveRequest` .. describe:: str(rq) :returns: String representation of this resource request. """ def __init__(self, msg): """ Constructor. """ self.msg = msg """ Corresponding *scheduler_msgs/Request* message. """ self.uuid = unique_id.fromMsg(msg.id) """ The :class:`uuid.UUID` of this request. """
[docs] def cancel(self, reason=None): """ Cancel a previously-requested resource. :param reason: Reason code for cancellation, or ``None``. *Always valid for both requesters and schedulers.* """ self._transition(EVENT_CANCEL, reason)
def __str__(self): """ Generate string representation. """ return 'id: ' + str(self.uuid) \ + '\n priority: ' + str(self.msg.priority) \ + '\n resources: ' + self._str_resources() \ + '\n status: ' + str(self.msg.status) def _str_resources(self): """ Format requested resource into a human-readable string. """ retval = '' for res in self.msg.resources: retval += '\n ' + res.uri + '#' + res.rapp return retval def _transition(self, event, reason=None): """ Update status for this resource request. :param event: Transition table for this type of *event*. :type event: :class:`._EventTranitions` :param reason: Reason code for transition, or ``None``. :raises: :exc:`.TransitionError` if not a valid transition. """ new_status = event.trans.get(self.msg.status) if new_status is None: raise TransitionError('invalid event ' + event.name + ' in state ' + str(self.msg.status)) self.msg.status = new_status if reason is not None: self.msg.reason = reason def _validate(self, new_status): """ Validate status update for this resource request. :param new_status: Proposed new status for this request. :returns: ``True`` if this is a valid state transition. """ return (self.msg.status, new_status) in TRANS_TABLE
[docs]class ResourceRequest(RequestBase): """ This represents a single resource request created by and for its original requester. :param msg: ROCON scheduler request message. :type msg: scheduler_msgs/Request Provides all attributes defined for :class:`.RequestBase`. """
[docs] def reconcile(self, update): """ Reconcile scheduler updates with requester status for a merge operation. :param update: Latest information for this request, or ``None`` if no longer present. :type update: :class:`.ResourceRequest` or ``None`` Only the requester creates new requests. If something is missing from the scheduler feedback, that just means the scheduler has not gotten to it yet. """ if update is None: # this request not yet known to scheduler? return # leave it alone if self._validate(update.msg.status): self.msg.status = update.msg.status self.msg.priority = update.msg.priority self.msg.resources = update.msg.resources if update.msg.availability != rospy.Time(): self.msg.availability = update.msg.availability # test gap
[docs]class ActiveRequest(RequestBase): """ This represents a single active resource known to the scheduler. :param msg: ROCON scheduler request message. :type msg: scheduler_msgs/Request Provides all attributes defined for :class:`.RequestBase`. """ def __init__(self, msg): """ Constructor """ super(ActiveRequest, self).__init__(msg) self.allocations = [] """ List of resources actually allocated for this request (not just those requested). """
[docs] def close(self): """ Close resource request. :raises: :exc:`.TransitionError` """ self._transition(EVENT_CLOSE)
[docs] def grant(self, resources): """ Grant some specific requested resources. :param resources: Exact resources granted. :type resources: list of ``scheduler_msgs/Resource`` :raises: :exc:`.TransitionError` The caller is responsible for ensuring that the granted resources really do fully satisfy this request. """ self._transition(EVENT_GRANT, reason=Request.NONE) self.msg.resources = resources self.allocations = resources
[docs] def reconcile(self, update): """ Reconcile updated request with current scheduler status for a merge operation. :param update: Latest information for this request, or ``None`` if no longer present. :type update: :class:`.ActiveRequest` or ``None`` """ # test gap: if update is None: # Only the requester creates new requests. Since no # update is present, the requester has either deleted this # request, or a new requester instance no longer knows # about it. So, consider it closed. update = ResourceRequest(self.msg) update.msg.status = Request.CLOSED if self._validate(update.msg.status): self.msg.status = update.msg.status self.msg.hold_time = update.msg.hold_time if (update.msg.status == Request.RESERVED and update.msg.availability != rospy.Time()): self.msg.availability = update.msg.availability
[docs] def preempt(self, reason=Request.NONE): """ Preempt a previously granted request. :param reason: Reason for preemption. :type reason: int Always valid for the scheduler, but has no effect unless the request was previously granted. """ if self.msg.status != Request.GRANTED: reason = self.msg.reason # leave reason unchanged self._transition(EVENT_PREEMPT, reason)
[docs] def wait(self, reason=Request.NONE): """ Put request in wait status until a suitable resource is available. :param reason: Reason for waiting. :type reason: int :raises: :exc:`.TransitionError` """ self._transition(EVENT_WAIT, reason)
[docs]class RequestSet: """ This class is a container for all the resource requests or responses for a single requester. It acts like a dictionary. :param reqs: Either a ``SchedulerRequests`` message or a list of ``Request`` messages, like the ``requests`` component of a ``SchedulerRequests`` message. :param requester_id: (:class:`uuid.UUID`) Unique ID this requester or ``None``. :param contents: Class from which to instantiate set members, either :class:`.ResourceRequest` (the default) for a requester or :class:`.ActiveRequest` for a scheduler. :raises: :exc:`TypeError` if the *requester_id* is not specified explicitly or as part of a ``SchedulerRequests`` message. :class:`.RequestSet` supports these standard container operations: .. describe:: len(rset) :returns: The number of requests in the set. .. describe:: rset[uuid] :returns: The item corresponding to *uuid*. :raises: :exc:`KeyError` if no such request. .. describe:: rset[uuid] = msg Assign a Request message for this *uuid*. :param uuid: (:class:`uuid.UUID`) UUID of the request. :param msg: (*scheduler_msgs/Request*) message to add. .. describe:: rset == other :returns: ``True`` if *rset* and *other* have the same contents. Ignores the difference between request and reply messages. .. describe:: rset != other :returns: ``True`` if *rset* and *other* have different contents. Ignores the difference between request and reply messages. .. describe:: str(rset) :returns: String representation of :class:`.RequestSet`. .. describe:: uuid in rset :returns: ``True`` if *rset* has a key *uuid*, else ``False``. .. describe:: uuid not in rset Equivalent to ``not uuid in rset``. These attributes are also provided: """ def __init__(self, reqs, requester_id=None, contents=ResourceRequest): """ Constructor. """ self.requester_id = requester_id """ :class:`uuid.UUID` of this requester. """ self.contents = contents """ Type of objects this request set contains. """ self.stamp = rospy.Time() # zero time stamp """ ROS time (:class:`rospy.Time`) of last update, or time zero. """ # reqs is either a SchedulerRequests message, or a list of # Request messages. if isinstance(reqs, SchedulerRequests): self.stamp = reqs.header.stamp self.requester_id = unique_id.fromMsg(reqs.requester) # Reset *reqs* to list of requests from the message. reqs = reqs.requests if self.requester_id is None: raise TypeError('Requester ID missing.') self.requests = {} """ Dictionary of active requests. """ for msg in reqs: rq = self.contents(msg) self.requests[rq.uuid] = rq def __contains__(self, uuid): """ Request set membership. """ return uuid in self.requests def __eq__(self, other): """ RequestSet equality operator. """ if self.requester_id != other.requester_id: return False # different requester if set(self.requests.keys()) != set(other.requests.keys()): return False # different request IDs for rqid, rq in self.requests.items(): other_msg = other[rqid].msg if rq.msg.status != other_msg.status: return False if rq.msg.priority != other_msg.priority: return False if rq.msg.availability != other_msg.availability: return False if rq.msg.hold_time != other_msg.hold_time: return False if rq.msg.resources != other_msg.resources: return False return True def __getitem__(self, uuid): """ :param uuid: UUID of desired request. :type uuid: :class:`uuid.UUID` :returns: named item. :raises: :exc:`KeyError` if no such request """ return self.requests[uuid] def __len__(self): """ Number of requests. """ return len(self.requests) def __ne__(self, other): """ RequestSet != operator. """ return not self == other def __setitem__(self, uuid, msg): """ Assign a Request message for this *uuid*. """ self.requests[uuid] = self.contents(msg) # test gap def __str__(self): rval = 'requester_id: ' + str(self.requester_id) + '\nrequests:' for rq in self.requests.values(): rval += '\n ' + str(rq) return rval
[docs] def cancel_all(self, reason=None): """ Cancel every active request in this set. :param reason: Reason code for mass cancellation, or ``None``. """ # test gap: for rq in self.requests.values(): rq.cancel(reason=reason)
[docs] def cancel_out_of_date(self, reason=None): """ Cancel every out-of-date request in this set. Only requests in a starting state are preserved. This is done whenever a requester first connects to the scheduler. If it presents requests that had previously been granted or preempted, they will be canceled and then closed. :param reason: Reason code for mass cancellation, or ``None``. """ # test gap: for rq in self.requests.values(): if rq.msg.status not in STARTING_STATES: rq.cancel(reason=reason)
[docs] def get(self, uuid, default=None): """ Get request, if known. :param uuid: UUID of desired request. :type uuid: :class:`uuid.UUID` :param default: value to return if no such request. :returns: named item, if successful; else *default*. """ return self.requests.get(uuid, default)
[docs] def items(self): """ :returns: all (key, value) pairs for this :class:`.RequestSet`. :rtype: list (Python2) or dictionary view (Python3) """ return self.requests.items()
[docs] def keys(self): """ :returns: all UUIDs for this :class:`.RequestSet`. :rtype: list (Python2) or dictionary view (Python3) """ return self.requests.keys() # test gap
def _list_requests(self): """ Return a list of resource requests suitable for inclusion in a ``SchedulerRequests`` message. :returns: list of *scheduler_msgs/Request* messages. """ return [rq.msg for rq in self.requests.values()]
[docs] def merge(self, updates): """ Merge new request information into this RequestSet. :param updates: Request set containing updated information. :type updates: :class:`.RequestSet` This is *not* a :py:meth:`set.update` or :py:meth:`set.union` operation: * New elements from the *updates* will be added, but only if they are in an initial state (NEW or RESERVED). * Existing elements will be reconciled with the corresponding *updates* status. * Any element reaching a terminal status known by both sides of the protocol will be deleted. """ # Add any new requests not previously known. for rid, new_rq in updates.items(): if (rid not in self.requests and new_rq.msg.status in STARTING_STATES): self.requests[rid] = self.contents(new_rq.msg) # test gap # Reconcile each existing request with the updates. Make a # copy of the dictionary items, so it can be altered in the loop. for rid, rq in self.requests.items(): new_rq = updates.get(rid) if ((rq.msg.status == Request.CANCELING and new_rq.msg.status == Request.CLOSED) or (rq.msg.status == Request.CLOSED and new_rq is None)): del self.requests[rid] # no longer needed else: rq.reconcile(new_rq)
[docs] def to_msg(self, stamp=None): """ Convert to ROS ``scheduler_msgs/SchedulerRequest`` message. :param stamp: Time stamp for message header. If ``None``, use current time. :type stamp: rospy.Time :returns: corresponding ``scheduler_msgs/SchedulerRequests`` """ msg = SchedulerRequests(requester=unique_id.toMsg(self.requester_id), requests=self._list_requests()) if stamp is None: stamp = rospy.Time.now() # test gap msg.header.stamp = stamp return msg
[docs] def values(self): """ :returns: all requests for this :class:`.RequestSet`. :rtype: list (Python2) or dictionary view (Python3) """ return self.requests.values() # test gap