14 """Functions that obviate explicit stubs and explicit channels."""
21 from typing
import (Any, AnyStr, Callable, Dict, Iterator, Optional, Sequence,
22 Tuple, TypeVar, Union)
27 RequestType = TypeVar(
'RequestType')
28 ResponseType = TypeVar(
'ResponseType')
30 OptionsType = Sequence[Tuple[str, str]]
34 _LOGGER = logging.getLogger(__name__)
36 _EVICTION_PERIOD_KEY =
"GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
37 if _EVICTION_PERIOD_KEY
in os.environ:
38 _EVICTION_PERIOD = datetime.timedelta(
39 seconds=float(os.environ[_EVICTION_PERIOD_KEY]))
40 _LOGGER.debug(
"Setting managed channel eviction period to %s",
43 _EVICTION_PERIOD = datetime.timedelta(minutes=10)
45 _MAXIMUM_CHANNELS_KEY =
"GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
46 if _MAXIMUM_CHANNELS_KEY
in os.environ:
47 _MAXIMUM_CHANNELS =
int(os.environ[_MAXIMUM_CHANNELS_KEY])
48 _LOGGER.debug(
"Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
50 _MAXIMUM_CHANNELS = 2**8
52 _DEFAULT_TIMEOUT_KEY =
"GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
53 if _DEFAULT_TIMEOUT_KEY
in os.environ:
54 _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
55 _LOGGER.debug(
"Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
57 _DEFAULT_TIMEOUT = 60.0
64 f
"Creating secure channel with credentials '{channel_credentials}', " +
65 f
"options '{options}' and compression '{compression}'")
67 credentials=channel_credentials,
69 compression=compression)
75 _lock: threading.RLock = threading.RLock()
76 _condition: threading.Condition = threading.Condition(lock=_lock)
77 _eviction_ready: threading.Event = threading.Event()
79 _mapping: Dict[CacheKey, Tuple[
grpc.Channel, datetime.datetime]]
80 _eviction_thread: threading.Thread
85 target=ChannelCache._perform_evictions, daemon=
True)
90 with ChannelCache._lock:
91 if ChannelCache._singleton
is None:
93 ChannelCache._eviction_ready.wait()
94 return ChannelCache._singleton
98 _LOGGER.debug(
"Evicting channel %s with configuration %s.", channel,
106 with ChannelCache._lock:
107 ChannelCache._eviction_ready.set()
108 if not ChannelCache._singleton._mapping:
109 ChannelCache._condition.wait()
110 elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
111 key =
next(
iter(ChannelCache._singleton._mapping.keys()))
112 ChannelCache._singleton._evict_locked(key)
115 key, (_, eviction_time) =
next(
116 iter(ChannelCache._singleton._mapping.items()))
117 now = datetime.datetime.now()
118 if eviction_time <= now:
119 ChannelCache._singleton._evict_locked(key)
122 time_to_eviction = (eviction_time - now).total_seconds()
128 ChannelCache._condition.wait(timeout=time_to_eviction)
130 def get_channel(self, target: str, options: Sequence[Tuple[str, str]],
134 if insecure
and channel_credentials:
135 raise ValueError(
"The insecure option is mutually exclusive with " +
136 "the channel_credentials option. Please use one " +
141 elif channel_credentials
is None:
142 _LOGGER.debug(
"Defaulting to SSL channel credentials.")
144 key = (target, options, channel_credentials, compression)
147 if channel_data
is not None:
148 channel = channel_data[0]
150 self.
_mapping[key] = (channel, datetime.datetime.now() +
156 self.
_mapping[key] = (channel, datetime.datetime.now() +
159 self.
_mapping) >= _MAXIMUM_CHANNELS:
160 self._condition.notify()
170 request: RequestType,
173 request_serializer: Optional[Callable[[Any], bytes]] =
None,
174 response_deserializer: Optional[Callable[[bytes], Any]] =
None,
175 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
177 insecure: bool =
False,
180 wait_for_ready: Optional[bool] =
None,
181 timeout: Optional[float] = _DEFAULT_TIMEOUT,
182 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] =
None
184 """Invokes a unary-unary RPC without an explicitly specified channel.
186 THIS IS AN EXPERIMENTAL API.
188 This is backed by a per-process cache of channels. Channels are evicted
189 from the cache after a fixed period by a background. Channels will also be
190 evicted if more than a configured maximum accumulate.
192 The default eviction period is 10 minutes. One may set the environment
193 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
195 The default maximum number of channels is 256. One may set the
196 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
200 request: An iterator that yields request values for the RPC.
201 target: The server address.
202 method: The name of the RPC method.
203 request_serializer: Optional :term:`serializer` for serializing the request
204 message. Request goes unserialized in case None is passed.
205 response_deserializer: Optional :term:`deserializer` for deserializing the response
206 message. Response goes undeserialized in case None is passed.
207 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
208 runtime) to configure the channel.
209 channel_credentials: A credential applied to the whole channel, e.g. the
210 return value of grpc.ssl_channel_credentials() or
211 grpc.insecure_channel_credentials().
212 insecure: If True, specifies channel_credentials as
213 :term:`grpc.insecure_channel_credentials()`. This option is mutually
214 exclusive with the `channel_credentials` option.
215 call_credentials: A call credential applied to each call individually,
216 e.g. the output of grpc.metadata_call_credentials() or
217 grpc.access_token_call_credentials().
218 compression: An optional value indicating the compression method to be
219 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
220 wait_for_ready: An optional flag indicating whether the RPC should fail
221 immediately if the connection is not ready at the time the RPC is
222 invoked, or if it should wait until the connection to the server
223 becomes ready. When using this option, the user will likely also want
224 to set a timeout. Defaults to True.
225 timeout: An optional duration of time in seconds to allow for the RPC,
226 after which an exception will be raised. If timeout is unspecified,
227 defaults to a timeout controlled by the
228 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
229 unset, defaults to 60 seconds. Supply a value of None to indicate that
230 no timeout should be enforced.
231 metadata: Optional metadata to send to the server.
234 The response to the RPC.
236 channel = ChannelCache.get().get_channel(target, options,
237 channel_credentials, insecure,
239 multicallable = channel.unary_unary(method, request_serializer,
240 response_deserializer)
241 wait_for_ready = wait_for_ready
if wait_for_ready
is not None else True
242 return multicallable(request,
244 wait_for_ready=wait_for_ready,
245 credentials=call_credentials,
251 request: RequestType,
254 request_serializer: Optional[Callable[[Any], bytes]] =
None,
255 response_deserializer: Optional[Callable[[bytes], Any]] =
None,
256 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
258 insecure: bool =
False,
261 wait_for_ready: Optional[bool] =
None,
262 timeout: Optional[float] = _DEFAULT_TIMEOUT,
263 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] =
None
264 ) -> Iterator[ResponseType]:
265 """Invokes a unary-stream RPC without an explicitly specified channel.
267 THIS IS AN EXPERIMENTAL API.
269 This is backed by a per-process cache of channels. Channels are evicted
270 from the cache after a fixed period by a background. Channels will also be
271 evicted if more than a configured maximum accumulate.
273 The default eviction period is 10 minutes. One may set the environment
274 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
276 The default maximum number of channels is 256. One may set the
277 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
281 request: An iterator that yields request values for the RPC.
282 target: The server address.
283 method: The name of the RPC method.
284 request_serializer: Optional :term:`serializer` for serializing the request
285 message. Request goes unserialized in case None is passed.
286 response_deserializer: Optional :term:`deserializer` for deserializing the response
287 message. Response goes undeserialized in case None is passed.
288 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
289 runtime) to configure the channel.
290 channel_credentials: A credential applied to the whole channel, e.g. the
291 return value of grpc.ssl_channel_credentials().
292 insecure: If True, specifies channel_credentials as
293 :term:`grpc.insecure_channel_credentials()`. This option is mutually
294 exclusive with the `channel_credentials` option.
295 call_credentials: A call credential applied to each call individually,
296 e.g. the output of grpc.metadata_call_credentials() or
297 grpc.access_token_call_credentials().
298 compression: An optional value indicating the compression method to be
299 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
300 wait_for_ready: An optional flag indicating whether the RPC should fail
301 immediately if the connection is not ready at the time the RPC is
302 invoked, or if it should wait until the connection to the server
303 becomes ready. When using this option, the user will likely also want
304 to set a timeout. Defaults to True.
305 timeout: An optional duration of time in seconds to allow for the RPC,
306 after which an exception will be raised. If timeout is unspecified,
307 defaults to a timeout controlled by the
308 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
309 unset, defaults to 60 seconds. Supply a value of None to indicate that
310 no timeout should be enforced.
311 metadata: Optional metadata to send to the server.
314 An iterator of responses.
316 channel = ChannelCache.get().get_channel(target, options,
317 channel_credentials, insecure,
319 multicallable = channel.unary_stream(method, request_serializer,
320 response_deserializer)
321 wait_for_ready = wait_for_ready
if wait_for_ready
is not None else True
322 return multicallable(request,
324 wait_for_ready=wait_for_ready,
325 credentials=call_credentials,
331 request_iterator: Iterator[RequestType],
334 request_serializer: Optional[Callable[[Any], bytes]] =
None,
335 response_deserializer: Optional[Callable[[bytes], Any]] =
None,
336 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
338 insecure: bool =
False,
341 wait_for_ready: Optional[bool] =
None,
342 timeout: Optional[float] = _DEFAULT_TIMEOUT,
343 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] =
None
345 """Invokes a stream-unary RPC without an explicitly specified channel.
347 THIS IS AN EXPERIMENTAL API.
349 This is backed by a per-process cache of channels. Channels are evicted
350 from the cache after a fixed period by a background. Channels will also be
351 evicted if more than a configured maximum accumulate.
353 The default eviction period is 10 minutes. One may set the environment
354 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
356 The default maximum number of channels is 256. One may set the
357 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
361 request_iterator: An iterator that yields request values for the RPC.
362 target: The server address.
363 method: The name of the RPC method.
364 request_serializer: Optional :term:`serializer` for serializing the request
365 message. Request goes unserialized in case None is passed.
366 response_deserializer: Optional :term:`deserializer` for deserializing the response
367 message. Response goes undeserialized in case None is passed.
368 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
369 runtime) to configure the channel.
370 channel_credentials: A credential applied to the whole channel, e.g. the
371 return value of grpc.ssl_channel_credentials().
372 call_credentials: A call credential applied to each call individually,
373 e.g. the output of grpc.metadata_call_credentials() or
374 grpc.access_token_call_credentials().
375 insecure: If True, specifies channel_credentials as
376 :term:`grpc.insecure_channel_credentials()`. This option is mutually
377 exclusive with the `channel_credentials` option.
378 compression: An optional value indicating the compression method to be
379 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
380 wait_for_ready: An optional flag indicating whether the RPC should fail
381 immediately if the connection is not ready at the time the RPC is
382 invoked, or if it should wait until the connection to the server
383 becomes ready. When using this option, the user will likely also want
384 to set a timeout. Defaults to True.
385 timeout: An optional duration of time in seconds to allow for the RPC,
386 after which an exception will be raised. If timeout is unspecified,
387 defaults to a timeout controlled by the
388 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
389 unset, defaults to 60 seconds. Supply a value of None to indicate that
390 no timeout should be enforced.
391 metadata: Optional metadata to send to the server.
394 The response to the RPC.
396 channel = ChannelCache.get().get_channel(target, options,
397 channel_credentials, insecure,
399 multicallable = channel.stream_unary(method, request_serializer,
400 response_deserializer)
401 wait_for_ready = wait_for_ready
if wait_for_ready
is not None else True
402 return multicallable(request_iterator,
404 wait_for_ready=wait_for_ready,
405 credentials=call_credentials,
411 request_iterator: Iterator[RequestType],
414 request_serializer: Optional[Callable[[Any], bytes]] =
None,
415 response_deserializer: Optional[Callable[[bytes], Any]] =
None,
416 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
418 insecure: bool =
False,
421 wait_for_ready: Optional[bool] =
None,
422 timeout: Optional[float] = _DEFAULT_TIMEOUT,
423 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] =
None
424 ) -> Iterator[ResponseType]:
425 """Invokes a stream-stream RPC without an explicitly specified channel.
427 THIS IS AN EXPERIMENTAL API.
429 This is backed by a per-process cache of channels. Channels are evicted
430 from the cache after a fixed period by a background. Channels will also be
431 evicted if more than a configured maximum accumulate.
433 The default eviction period is 10 minutes. One may set the environment
434 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
436 The default maximum number of channels is 256. One may set the
437 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
441 request_iterator: An iterator that yields request values for the RPC.
442 target: The server address.
443 method: The name of the RPC method.
444 request_serializer: Optional :term:`serializer` for serializing the request
445 message. Request goes unserialized in case None is passed.
446 response_deserializer: Optional :term:`deserializer` for deserializing the response
447 message. Response goes undeserialized in case None is passed.
448 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
449 runtime) to configure the channel.
450 channel_credentials: A credential applied to the whole channel, e.g. the
451 return value of grpc.ssl_channel_credentials().
452 call_credentials: A call credential applied to each call individually,
453 e.g. the output of grpc.metadata_call_credentials() or
454 grpc.access_token_call_credentials().
455 insecure: If True, specifies channel_credentials as
456 :term:`grpc.insecure_channel_credentials()`. This option is mutually
457 exclusive with the `channel_credentials` option.
458 compression: An optional value indicating the compression method to be
459 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
460 wait_for_ready: An optional flag indicating whether the RPC should fail
461 immediately if the connection is not ready at the time the RPC is
462 invoked, or if it should wait until the connection to the server
463 becomes ready. When using this option, the user will likely also want
464 to set a timeout. Defaults to True.
465 timeout: An optional duration of time in seconds to allow for the RPC,
466 after which an exception will be raised. If timeout is unspecified,
467 defaults to a timeout controlled by the
468 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
469 unset, defaults to 60 seconds. Supply a value of None to indicate that
470 no timeout should be enforced.
471 metadata: Optional metadata to send to the server.
474 An iterator of responses.
476 channel = ChannelCache.get().get_channel(target, options,
477 channel_credentials, insecure,
479 multicallable = channel.stream_stream(method, request_serializer,
480 response_deserializer)
481 wait_for_ready = wait_for_ready
if wait_for_ready
is not None else True
482 return multicallable(request_iterator,
484 wait_for_ready=wait_for_ready,
485 credentials=call_credentials,