00001
00002
00003
00004
00005
00006
00007
00008
00009 import threading
00010 import time
00011 import rospy
00012 import uuid
00013 import unique_id
00014 import functools
00015
00016
00017 from .exceptions import ServicePairException
00018
00019
00020
00021
00022
00023
00024 class RequestHandlerBase(object):
00025
00026 def __init__(self, key):
00027 self.key = key
00028 self.response = None
00029
00030
00031 class BlockingRequestHandler(RequestHandlerBase):
00032
00033 def __init__(self, key):
00034 super(BlockingRequestHandler, self).__init__(key)
00035 self.event = threading.Event()
00036
00037
00038 class NonBlockingRequestHandler(RequestHandlerBase):
00039
00040 def __init__(self, key, callback, error_callback):
00041 super(NonBlockingRequestHandler, self).__init__(key)
00042 self.timer = None
00043 self.callback = callback
00044 self.error_callback = error_callback
00045
00046 def copy(self):
00047 '''
00048 The deepcopy function has some issues (related to threads),
00049 so using this independant copy method here. Note that this only
00050 ever gets used for non-blocking calls to help handle the
00051 race conditions between timeout handling and normal callback
00052 handling
00053 '''
00054 new_copy = NonBlockingRequestHandler(self.key, self.callback, self.error_callback)
00055 new_copy.timer = self.timer
00056 return new_copy
00057
00058
00059
00060
00061
00062
00063 class ServicePairClient(object):
00064 '''
00065 The client side of a pubsub service pair.
00066 '''
00067 __slots__ = [
00068 '_publisher',
00069 '_subscriber',
00070 '_request_handlers',
00071 'ServicePairSpec',
00072 'ServicePairRequest',
00073 'ServicePairResponse',
00074 '_lock'
00075 ]
00076
00077
00078
00079
00080
00081 def __init__(self, name, ServicePairSpec):
00082 '''
00083 @param name : resource name of service pair (e.g. testies for pair topics testies/request, testies/response)
00084 @type str
00085 @param ServicePairSpec : the pair type (e.g. rocon_service_pair_msgs.msg.TestiesPair)
00086 @type str
00087 '''
00088 try:
00089 p = ServicePairSpec()
00090 self.ServicePairSpec = ServicePairSpec
00091 self.ServicePairRequest = type(p.pair_request)
00092 self.ServicePairResponse = type(p.pair_response)
00093 except AttributeError:
00094 raise ServicePairException("Type is not an pair spec: %s" % str(ServicePairSpec))
00095 self._lock = threading.Lock()
00096 self._subscriber = rospy.Subscriber(name + "/response", self.ServicePairResponse, self._internal_callback)
00097 self._publisher = rospy.Publisher(name + "/request", self.ServicePairRequest)
00098 self._request_handlers = {}
00099
00100 def wait_for_service(self, timeout):
00101 '''
00102 Waits for the service pair server to appear.
00103
00104 @param timeout : time to wait for data
00105 @type rospy.Duration
00106
00107 @raise ROSException: if specified timeout is exceeded
00108 @raise ROSInterruptException: if shutdown interrupts wait
00109 '''
00110 timeout_time = time.time() + timeout.to_sec()
00111 while not rospy.is_shutdown() and time.time() < timeout_time:
00112 if self._subscriber.get_num_connections() > 0 and self._publisher.get_num_connections() > 0:
00113 return
00114 rospy.rostime.wallsleep(0.1)
00115 if rospy.is_shutdown():
00116 raise rospy.ROSInterruptException("rospy shutdown")
00117 else:
00118 raise rospy.ROSException("timeout exceeded while waiting for service pair server %s" % self._subscriber.resolved_name[:-len('/response')])
00119
00120
00121
00122
00123
00124 def __call__(self, msg, timeout=None, callback=None, error_callback=None):
00125 '''
00126 Initiates and executes the client request to the server. The type of arguments
00127 supplied determines whether to apply blocking or non-blocking behaviour.
00128
00129 @param msg : the request message
00130 @type <name>Request
00131
00132 @param timeout : time to wait for data
00133 @type rospy.Duration
00134
00135 @param callback : user callback invoked for responses of non-blocking calls
00136 @type method with arguments (uuid_msgs.UniqueID, <name>Response)
00137
00138 @return msg/id : for blocking calls it is the response message, for non-blocking it is the unique id
00139 @rtype <name>Response/uuid_msgs.UniqueID
00140 '''
00141 pair_request_msg = self.ServicePairRequest()
00142 pair_request_msg.id = unique_id.toMsg(unique_id.fromRandom())
00143 pair_request_msg.request = msg
00144 key = unique_id.toHexString(pair_request_msg.id)
00145 if callback == None and error_callback == None:
00146 self._request_handlers[key] = BlockingRequestHandler(key)
00147 return self._make_blocking_call(self._request_handlers[key], pair_request_msg, timeout)
00148 else:
00149 request_handler = NonBlockingRequestHandler(key, callback, error_callback)
00150 self._request_handlers[key] = request_handler.copy()
00151 self._make_non_blocking_call(request_handler, pair_request_msg, timeout)
00152 return pair_request_msg.id
00153
00154
00155
00156
00157
00158 def _make_blocking_call(self, request_handler, msg, timeout):
00159 '''
00160 @param request_handler : information and event handler for the request
00161 @type RequestHandler
00162
00163 @param msg : the request pair message structure
00164 @type self.ServicePairRequest
00165 '''
00166 self._publisher.publish(msg)
00167 if timeout is None:
00168 request_handler.event.wait()
00169 else:
00170 request_handler.event.wait(timeout.to_sec())
00171 if request_handler.response is not None:
00172 response = request_handler.response
00173 else:
00174 response = None
00175 del self._request_handlers[request_handler.key]
00176 return response
00177
00178 def _make_non_blocking_call(self, request_handler, msg, timeout):
00179 '''
00180 @param request_handler : a copy of information and event handler for the request (used for the timer)
00181 @type RequestHandler
00182
00183 @param msg : the request pair message structure
00184 @type self.ServicePairRequest
00185 '''
00186 self._publisher.publish(msg)
00187 if timeout is not None:
00188
00189 delete_request_handler = functools.partial(self._timer_callback, request_handler=request_handler)
00190 request_handler.timer = rospy.Timer(timeout, delete_request_handler, oneshot=True)
00191
00192 def _timer_callback(self, unused_event, request_handler):
00193 '''
00194 Handle a timeout for non-blocking requests. This will call the user's defined error callback function
00195 (with args: (uuid_msgs.UniqueID, str)).
00196
00197 @param event : regular rospy timer event object (not used)
00198
00199 @param request_handler : a copy of the handler that gets bound when this callback is passed into the timer
00200 @type NonBlockingRequestHandler
00201
00202 @todo respond on the error callback.
00203 '''
00204 already_handled = False
00205 self._lock.acquire()
00206 try:
00207 del self._request_handlers[request_handler.key]
00208 except KeyError:
00209 already_handled = True
00210 self._lock.release()
00211 if not already_handled:
00212 if request_handler.error_callback is not None:
00213 request_handler.error_callback(unique_id.toMsg(uuid.UUID(request_handler.key)), "timeout")
00214
00215 def _internal_callback(self, msg):
00216 '''
00217 @param msg : message returned from the server (with pair id etc)
00218 @type self.ServicePairResponse
00219 '''
00220
00221 key = unique_id.toHexString(msg.id)
00222 already_handled = False
00223 non_blocking_request_handler = None
00224 self._lock.acquire()
00225 try:
00226 request_handler = self._request_handlers[key]
00227 request_handler.response = msg.response
00228 if isinstance(request_handler, BlockingRequestHandler):
00229 request_handler.event.set()
00230 already_handled = True
00231 else:
00232
00233 non_blocking_request_handler = request_handler.copy()
00234 del self._request_handlers[key]
00235 except KeyError:
00236 already_handled = True
00237 self._lock.release()
00238 if not already_handled:
00239
00240 if non_blocking_request_handler.callback is not None:
00241 request_handler.callback(msg.id, msg.response)