service_pair_client.py
Go to the documentation of this file.
00001 #
00002 # License: BSD
00003 #   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
00004 #
00005 ##############################################################################
00006 # Imports
00007 ##############################################################################
00008 
00009 import threading
00010 import time
00011 import rospy
00012 import uuid
00013 import unique_id
00014 import functools
00015 
00016 # Local imports
00017 from .exceptions import ServicePairException
00018 
00019 ##############################################################################
00020 # Request Handlers
00021 ##############################################################################
00022 
00023 
00024 class RequestHandlerBase(object):
00025 
00026     def __init__(self, key):
00027         self.key = key  # uuid hexstring key (the request_handlers key)
00028         self.response = None
00029 
00030 
00031 class BlockingRequestHandler(RequestHandlerBase):
00032 
00033     def __init__(self, key):
00034         super(BlockingRequestHandler, self).__init__(key)
00035         self.event = threading.Event()
00036 
00037 
00038 class NonBlockingRequestHandler(RequestHandlerBase):
00039 
00040     def __init__(self, key, callback, error_callback):
00041         super(NonBlockingRequestHandler, self).__init__(key)
00042         self.timer = None
00043         self.callback = callback
00044         self.error_callback = error_callback
00045 
00046     def copy(self):
00047         '''
00048           The deepcopy function has some issues (related to threads),
00049           so using this independant copy method here. Note that this only
00050           ever gets used for non-blocking calls to help handle the
00051           race conditions between timeout handling and normal callback
00052           handling
00053         '''
00054         new_copy = NonBlockingRequestHandler(self.key, self.callback, self.error_callback)
00055         new_copy.timer = self.timer
00056         return new_copy
00057 
00058 ##############################################################################
00059 # Client Class
00060 ##############################################################################
00061 
00062 
00063 class ServicePairClient(object):
00064     '''
00065       The client side of a pubsub service pair.
00066     '''
00067     __slots__ = [
00068             '_publisher',
00069             '_subscriber',
00070             '_request_handlers',  # initiate, track and execute requests with these { hex string ids : dic of RequestHandler objects (Blocking/NonBlocking) }
00071             'ServicePairSpec',
00072             'ServicePairRequest',
00073             'ServicePairResponse',
00074             '_lock'               # prevent race conditions in handling of non-blocking callbacks and timeouts.
00075         ]
00076 
00077     ##########################################################################
00078     # Initialisation
00079     ##########################################################################
00080 
00081     def __init__(self, name, ServicePairSpec):
00082         '''
00083           @param name : resource name of service pair (e.g. testies for pair topics testies/request, testies/response)
00084           @type str
00085           @param ServicePairSpec : the pair type (e.g. rocon_service_pair_msgs.msg.TestiesPair)
00086           @type str
00087         '''
00088         try:
00089             p = ServicePairSpec()
00090             self.ServicePairSpec = ServicePairSpec
00091             self.ServicePairRequest = type(p.pair_request)
00092             self.ServicePairResponse = type(p.pair_response)
00093         except AttributeError:
00094             raise ServicePairException("Type is not an pair spec: %s" % str(ServicePairSpec))
00095         self._lock = threading.Lock()
00096         self._subscriber = rospy.Subscriber(name + "/response", self.ServicePairResponse, self._internal_callback)
00097         self._publisher = rospy.Publisher(name + "/request", self.ServicePairRequest)
00098         self._request_handlers = {}  # [uuid_msgs/UniqueId]
00099 
00100     def wait_for_service(self, timeout):
00101         '''
00102           Waits for the service pair server to appear.
00103 
00104           @param timeout : time to wait for data
00105           @type rospy.Duration
00106 
00107           @raise ROSException: if specified timeout is exceeded
00108           @raise ROSInterruptException: if shutdown interrupts wait
00109         '''
00110         timeout_time = time.time() + timeout.to_sec()
00111         while not rospy.is_shutdown() and time.time() < timeout_time:
00112             if self._subscriber.get_num_connections() > 0 and self._publisher.get_num_connections() > 0:
00113                 return
00114             rospy.rostime.wallsleep(0.1)
00115         if rospy.is_shutdown():
00116             raise rospy.ROSInterruptException("rospy shutdown")
00117         else:
00118             raise rospy.ROSException("timeout exceeded while waiting for service pair server %s" % self._subscriber.resolved_name[:-len('/response')])
00119 
00120     ##########################################################################
00121     # Execute Blocking/NonBlocking
00122     ##########################################################################
00123 
00124     def __call__(self, msg, timeout=None, callback=None, error_callback=None):
00125         '''
00126           Initiates and executes the client request to the server. The type of arguments
00127           supplied determines whether to apply blocking or non-blocking behaviour.
00128 
00129           @param msg : the request message
00130           @type <name>Request
00131 
00132           @param timeout : time to wait for data
00133           @type rospy.Duration
00134 
00135           @param callback : user callback invoked for responses of non-blocking calls
00136           @type method with arguments (uuid_msgs.UniqueID, <name>Response)
00137 
00138           @return msg/id : for blocking calls it is the response message, for non-blocking it is the unique id
00139           @rtype <name>Response/uuid_msgs.UniqueID
00140         '''
00141         pair_request_msg = self.ServicePairRequest()
00142         pair_request_msg.id = unique_id.toMsg(unique_id.fromRandom())
00143         pair_request_msg.request = msg
00144         key = unique_id.toHexString(pair_request_msg.id)
00145         if callback == None and error_callback == None:
00146             self._request_handlers[key] = BlockingRequestHandler(key)
00147             return self._make_blocking_call(self._request_handlers[key], pair_request_msg, timeout)
00148         else:
00149             request_handler = NonBlockingRequestHandler(key, callback, error_callback)
00150             self._request_handlers[key] = request_handler.copy()
00151             self._make_non_blocking_call(request_handler, pair_request_msg, timeout)
00152             return pair_request_msg.id
00153 
00154     ##########################################################################
00155     # Private Support Methods
00156     ##########################################################################
00157 
00158     def _make_blocking_call(self, request_handler, msg, timeout):
00159         '''
00160           @param request_handler : information and event handler for the request
00161           @type RequestHandler
00162 
00163           @param msg : the request pair message structure
00164           @type self.ServicePairRequest
00165         '''
00166         self._publisher.publish(msg)
00167         if timeout is None:
00168             request_handler.event.wait()
00169         else:
00170             request_handler.event.wait(timeout.to_sec())
00171         if request_handler.response is not None:
00172             response = request_handler.response
00173         else:
00174             response = None
00175         del self._request_handlers[request_handler.key]
00176         return response
00177 
00178     def _make_non_blocking_call(self, request_handler, msg, timeout):
00179         '''
00180           @param request_handler : a copy of information and event handler for the request (used for the timer)
00181           @type RequestHandler
00182 
00183           @param msg : the request pair message structure
00184           @type self.ServicePairRequest
00185         '''
00186         self._publisher.publish(msg)
00187         if timeout is not None:
00188             # bind the key so the timer callback knows which request to handle.
00189             delete_request_handler = functools.partial(self._timer_callback, request_handler=request_handler)
00190             request_handler.timer = rospy.Timer(timeout, delete_request_handler, oneshot=True)
00191 
00192     def _timer_callback(self, unused_event, request_handler):
00193         '''
00194           Handle a timeout for non-blocking requests. This will call the user's defined error callback function
00195           (with args: (uuid_msgs.UniqueID, str)).
00196 
00197           @param event : regular rospy timer event object (not used)
00198 
00199           @param request_handler : a copy of the handler that gets bound when this callback is passed into the timer
00200           @type NonBlockingRequestHandler
00201 
00202           @todo respond on the error callback.
00203         '''
00204         already_handled = False
00205         self._lock.acquire()
00206         try:
00207             del self._request_handlers[request_handler.key]
00208         except KeyError:
00209             already_handled = True
00210         self._lock.release()
00211         if not already_handled:
00212             if request_handler.error_callback is not None:
00213                 request_handler.error_callback(unique_id.toMsg(uuid.UUID(request_handler.key)), "timeout")
00214 
00215     def _internal_callback(self, msg):
00216         '''
00217           @param msg : message returned from the server (with pair id etc)
00218           @type self.ServicePairResponse
00219         '''
00220         # Check if it is a blocking call that has requested it.
00221         key = unique_id.toHexString(msg.id)
00222         already_handled = False
00223         non_blocking_request_handler = None
00224         self._lock.acquire()
00225         try:
00226             request_handler = self._request_handlers[key]
00227             request_handler.response = msg.response
00228             if isinstance(request_handler, BlockingRequestHandler):
00229                 request_handler.event.set()
00230                 already_handled = True
00231             else:  # NonBlocking
00232                 # make a copy and delete so we can release the lock. Process after.
00233                 non_blocking_request_handler = request_handler.copy()
00234                 del self._request_handlers[key]
00235         except KeyError:
00236             already_handled = True  # it's either a blocking, or a non-blocking call handled by the timeout
00237         self._lock.release()
00238         if not already_handled:
00239             # Could use EAFP approach here since they will almost never be None, but this is more readable
00240             if non_blocking_request_handler.callback is not None:
00241                 request_handler.callback(msg.id, msg.response)


rocon_python_comms
Author(s): Daniel Stonier
autogenerated on Fri May 2 2014 10:35:42