14 """Interceptors implementation of gRPC Asyncio Python."""
15 from abc
import ABCMeta
16 from abc
import abstractmethod
20 from typing
import (AsyncIterable, Awaitable, Callable, Iterator, Optional,
26 from .
import _base_call
27 from ._call
import AioRpcError
28 from ._call
import StreamStreamCall
29 from ._call
import StreamUnaryCall
30 from ._call
import UnaryStreamCall
31 from ._call
import UnaryUnaryCall
32 from ._call
import _API_STYLE_ERROR
33 from ._call
import _RPC_ALREADY_FINISHED_DETAILS
34 from ._call
import _RPC_HALF_CLOSED_DETAILS
35 from ._metadata
import Metadata
36 from ._typing
import DeserializingFunction
37 from ._typing
import DoneCallbackType
38 from ._typing
import RequestIterableType
39 from ._typing
import RequestType
40 from ._typing
import ResponseIterableType
41 from ._typing
import ResponseType
42 from ._typing
import SerializingFunction
43 from ._utils
import _timeout_to_deadline
45 _LOCAL_CANCELLATION_DETAILS =
'Locally cancelled by application!'
49 """Affords intercepting incoming RPCs on the service-side.
51 This is an EXPERIMENTAL API.
60 """Intercepts incoming RPCs before handing them over to a handler.
63 continuation: A function that takes a HandlerCallDetails and
64 proceeds to invoke the next interceptor in the chain, if any,
65 or the RPC handler lookup logic, with the call details passed
66 as an argument, and returns an RpcMethodHandler instance if
67 the RPC is considered serviced, or None otherwise.
68 handler_call_details: A HandlerCallDetails describing the RPC.
71 An RpcMethodHandler with which the RPC may be serviced if the
72 interceptor chooses to service this RPC, or None otherwise.
77 collections.namedtuple(
79 (
'method',
'timeout',
'metadata',
'credentials',
'wait_for_ready')),
81 """Describes an RPC to be invoked.
83 This is an EXPERIMENTAL API.
86 method: The method name of the RPC.
87 timeout: An optional duration of time in seconds to allow for the RPC.
88 metadata: Optional metadata to be transmitted to the service-side of
90 credentials: An optional CallCredentials for the RPC.
91 wait_for_ready: This is an EXPERIMENTAL argument. An optional
92 flag to enable :term:`wait_for_ready` mechanism.
96 timeout: Optional[float]
97 metadata: Optional[Metadata]
99 wait_for_ready: Optional[bool]
103 """Base class used for all Aio Client Interceptor classes"""
107 """Affords intercepting unary-unary invocations."""
111 self, continuation: Callable[[ClientCallDetails, RequestType],
113 client_call_details: ClientCallDetails,
114 request: RequestType) -> Union[UnaryUnaryCall, ResponseType]:
115 """Intercepts a unary-unary invocation asynchronously.
118 continuation: A coroutine that proceeds with the invocation by
119 executing the next interceptor in the chain or invoking the
120 actual RPC on the underlying Channel. It is the interceptor's
121 responsibility to call it if it decides to move the RPC forward.
122 The interceptor can use
123 `call = await continuation(client_call_details, request)`
124 to continue with the RPC. `continuation` returns the call to the
126 client_call_details: A ClientCallDetails object describing the
128 request: The request value for the RPC.
131 An object with the RPC response.
134 AioRpcError: Indicating that the RPC terminated with non-OK status.
135 asyncio.CancelledError: Indicating that the RPC was canceled.
140 """Affords intercepting unary-stream invocations."""
144 self, continuation: Callable[[ClientCallDetails, RequestType],
146 client_call_details: ClientCallDetails, request: RequestType
147 ) -> Union[ResponseIterableType, UnaryStreamCall]:
148 """Intercepts a unary-stream invocation asynchronously.
150 The function could return the call object or an asynchronous
151 iterator, in case of being an asyncrhonous iterator this will
152 become the source of the reads done by the caller.
155 continuation: A coroutine that proceeds with the invocation by
156 executing the next interceptor in the chain or invoking the
157 actual RPC on the underlying Channel. It is the interceptor's
158 responsibility to call it if it decides to move the RPC forward.
159 The interceptor can use
160 `call = await continuation(client_call_details, request)`
161 to continue with the RPC. `continuation` returns the call to the
163 client_call_details: A ClientCallDetails object describing the
165 request: The request value for the RPC.
168 The RPC Call or an asynchronous iterator.
171 AioRpcError: Indicating that the RPC terminated with non-OK status.
172 asyncio.CancelledError: Indicating that the RPC was canceled.
177 """Affords intercepting stream-unary invocations."""
182 continuation: Callable[[ClientCallDetails, RequestType],
184 client_call_details: ClientCallDetails,
185 request_iterator: RequestIterableType,
186 ) -> StreamUnaryCall:
187 """Intercepts a stream-unary invocation asynchronously.
189 Within the interceptor the usage of the call methods like `write` or
190 even awaiting the call should be done carefully, since the caller
191 could be expecting an untouched call, for example for start writing
195 continuation: A coroutine that proceeds with the invocation by
196 executing the next interceptor in the chain or invoking the
197 actual RPC on the underlying Channel. It is the interceptor's
198 responsibility to call it if it decides to move the RPC forward.
199 The interceptor can use
200 `call = await continuation(client_call_details, request_iterator)`
201 to continue with the RPC. `continuation` returns the call to the
203 client_call_details: A ClientCallDetails object describing the
205 request_iterator: The request iterator that will produce requests
212 AioRpcError: Indicating that the RPC terminated with non-OK status.
213 asyncio.CancelledError: Indicating that the RPC was canceled.
218 """Affords intercepting stream-stream invocations."""
223 continuation: Callable[[ClientCallDetails, RequestType],
225 client_call_details: ClientCallDetails,
226 request_iterator: RequestIterableType,
227 ) -> Union[ResponseIterableType, StreamStreamCall]:
228 """Intercepts a stream-stream invocation asynchronously.
230 Within the interceptor the usage of the call methods like `write` or
231 even awaiting the call should be done carefully, since the caller
232 could be expecting an untouched call, for example for start writing
235 The function could return the call object or an asynchronous
236 iterator, in case of being an asyncrhonous iterator this will
237 become the source of the reads done by the caller.
240 continuation: A coroutine that proceeds with the invocation by
241 executing the next interceptor in the chain or invoking the
242 actual RPC on the underlying Channel. It is the interceptor's
243 responsibility to call it if it decides to move the RPC forward.
244 The interceptor can use
245 `call = await continuation(client_call_details, request_iterator)`
246 to continue with the RPC. `continuation` returns the call to the
248 client_call_details: A ClientCallDetails object describing the
250 request_iterator: The request iterator that will produce requests
254 The RPC Call or an asynchronous iterator.
257 AioRpcError: Indicating that the RPC terminated with non-OK status.
258 asyncio.CancelledError: Indicating that the RPC was canceled.
263 """Base implementation for all intercepted call arities.
265 Interceptors might have some work to do before the RPC invocation with
266 the capacity of changing the invocation parameters, and some work to do
267 after the RPC invocation with the capacity for accessing to the wrapped
270 It handles also early and later cancellations, when the RPC has not even
271 started and the execution is still held by the interceptors or when the
272 RPC has finished but again the execution is still held by the interceptors.
274 Once the RPC is finally executed, all methods are finally done against the
275 intercepted call, being at the same time the same call returned to the
278 As a base class for all of the interceptors implements the logic around
279 final status, metadata and cancellation.
282 _interceptors_task: asyncio.Task
283 _pending_add_done_callbacks: Sequence[DoneCallbackType]
285 def __init__(self, interceptors_task: asyncio.Task) ->
None:
295 self, interceptors_task: asyncio.Task) ->
None:
300 call_completed =
False
303 call = interceptors_task.result()
305 call_completed =
True
306 except (AioRpcError, asyncio.CancelledError):
307 call_completed =
True
316 call.add_done_callback(callback)
335 except asyncio.CancelledError:
346 except AioRpcError
as err:
347 return err.code() == grpc.StatusCode.CANCELLED
348 except asyncio.CancelledError:
351 return call.cancelled()
359 except (AioRpcError, asyncio.CancelledError):
371 except (AioRpcError, asyncio.CancelledError):
379 call.add_done_callback(callback)
382 raise NotImplementedError()
387 except AioRpcError
as err:
388 return err.initial_metadata()
389 except asyncio.CancelledError:
392 return await call.initial_metadata()
397 except AioRpcError
as err:
398 return err.trailing_metadata()
399 except asyncio.CancelledError:
402 return await call.trailing_metadata()
404 async
def code(self) -> grpc.StatusCode:
407 except AioRpcError
as err:
409 except asyncio.CancelledError:
410 return grpc.StatusCode.CANCELLED
412 return await call.code()
417 except AioRpcError
as err:
419 except asyncio.CancelledError:
420 return _LOCAL_CANCELLATION_DETAILS
422 return await call.details()
427 except AioRpcError
as err:
428 return err.debug_error_string()
429 except asyncio.CancelledError:
432 return await call.debug_error_string()
436 return await call.wait_for_connection()
442 call =
yield from self._interceptors_task.
__await__()
443 response =
yield from call.__await__()
448 _response_aiter: Optional[AsyncIterable[ResponseType]]
456 self) -> ResponseType:
457 call = await self._interceptors_task
458 async
for response
in call:
467 async
def read(self) -> ResponseType:
476 _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
477 _write_to_iterator_queue: Optional[asyncio.Queue]
478 _status_code_task: Optional[asyncio.Task]
480 _FINISH_ITERATOR_SENTINEL = object()
483 self, request_iterator: Optional[RequestIterableType]
484 ) -> RequestIterableType:
486 if request_iterator
is None:
497 return request_iterator
500 await self._interceptors_task
504 if value
is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL:
509 call: InterceptedCall):
519 return_when=asyncio.FIRST_COMPLETED)
521 async
def write(self, request: RequestType) ->
None:
526 raise cygrpc.UsageError(_API_STYLE_ERROR)
529 call = await self._interceptors_task
530 except (asyncio.CancelledError, AioRpcError):
531 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
534 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
535 elif call._done_writing_flag:
536 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
541 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
544 """Signal peer that client is done writing.
546 This method is idempotent.
552 raise cygrpc.UsageError(_API_STYLE_ERROR)
555 call = await self._interceptors_task
556 except asyncio.CancelledError:
557 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
560 _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call)
565 """Used for running a `UnaryUnaryCall` wrapped by interceptors.
567 For the `__await__` method is it is proxied to the intercepted call only when
568 the interceptor task is finished.
571 _loop: asyncio.AbstractEventLoop
572 _channel: cygrpc.AioChannel
575 def __init__(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
576 request: RequestType, timeout: Optional[float],
579 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
580 method: bytes, request_serializer: SerializingFunction,
581 response_deserializer: DeserializingFunction,
582 loop: asyncio.AbstractEventLoop) ->
None:
585 interceptors_task = loop.create_task(
586 self.
_invoke(interceptors, method, timeout, metadata, credentials,
587 wait_for_ready, request, request_serializer,
588 response_deserializer))
593 self, interceptors: Sequence[UnaryUnaryClientInterceptor],
594 method: bytes, timeout: Optional[float],
595 metadata: Optional[Metadata],
597 wait_for_ready: Optional[bool], request: RequestType,
598 request_serializer: SerializingFunction,
599 response_deserializer: DeserializingFunction) -> UnaryUnaryCall:
600 """Run the RPC call wrapped in interceptors"""
602 async
def _run_interceptor(
603 interceptors: Iterator[UnaryUnaryClientInterceptor],
604 client_call_details: ClientCallDetails,
607 interceptor =
next(interceptors,
None)
610 continuation = functools.partial(_run_interceptor, interceptors)
612 call_or_response = await interceptor.intercept_unary_unary(
613 continuation, client_call_details, request)
616 return call_or_response
623 client_call_details.metadata,
624 client_call_details.credentials,
625 client_call_details.wait_for_ready, self.
_channel,
626 client_call_details.method, request_serializer,
627 response_deserializer, self.
_loop)
630 credentials, wait_for_ready)
631 return await _run_interceptor(
iter(interceptors), client_call_details,
635 raise NotImplementedError()
640 """Used for running a `UnaryStreamCall` wrapped by interceptors."""
642 _loop: asyncio.AbstractEventLoop
643 _channel: cygrpc.AioChannel
647 def __init__(self, interceptors: Sequence[UnaryStreamClientInterceptor],
648 request: RequestType, timeout: Optional[float],
651 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
652 method: bytes, request_serializer: SerializingFunction,
653 response_deserializer: DeserializingFunction,
654 loop: asyncio.AbstractEventLoop) ->
None:
659 interceptors_task = loop.create_task(
660 self.
_invoke(interceptors, method, timeout, metadata, credentials,
661 wait_for_ready, request, request_serializer,
662 response_deserializer))
667 self, interceptors: Sequence[UnaryUnaryClientInterceptor],
668 method: bytes, timeout: Optional[float],
669 metadata: Optional[Metadata],
671 wait_for_ready: Optional[bool], request: RequestType,
672 request_serializer: SerializingFunction,
673 response_deserializer: DeserializingFunction) -> UnaryStreamCall:
674 """Run the RPC call wrapped in interceptors"""
676 async
def _run_interceptor(
677 interceptors: Iterator[UnaryStreamClientInterceptor],
678 client_call_details: ClientCallDetails,
679 request: RequestType,
682 interceptor =
next(interceptors,
None)
685 continuation = functools.partial(_run_interceptor, interceptors)
687 call_or_response_iterator = await interceptor.intercept_unary_stream(
688 continuation, client_call_details, request)
690 if isinstance(call_or_response_iterator,
696 call_or_response_iterator)
701 client_call_details.metadata,
702 client_call_details.credentials,
703 client_call_details.wait_for_ready, self.
_channel,
704 client_call_details.method, request_serializer,
705 response_deserializer, self.
_loop)
710 credentials, wait_for_ready)
711 return await _run_interceptor(
iter(interceptors), client_call_details,
715 raise NotImplementedError()
719 _InterceptedStreamRequestMixin,
721 """Used for running a `StreamUnaryCall` wrapped by interceptors.
723 For the `__await__` method is it is proxied to the intercepted call only when
724 the interceptor task is finished.
727 _loop: asyncio.AbstractEventLoop
728 _channel: cygrpc.AioChannel
731 def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor],
732 request_iterator: Optional[RequestIterableType],
733 timeout: Optional[float], metadata: Metadata,
735 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
736 method: bytes, request_serializer: SerializingFunction,
737 response_deserializer: DeserializingFunction,
738 loop: asyncio.AbstractEventLoop) ->
None:
742 interceptors_task = loop.create_task(
743 self.
_invoke(interceptors, method, timeout, metadata, credentials,
744 wait_for_ready, request_iterator, request_serializer,
745 response_deserializer))
750 self, interceptors: Sequence[StreamUnaryClientInterceptor],
751 method: bytes, timeout: Optional[float],
752 metadata: Optional[Metadata],
754 wait_for_ready: Optional[bool],
755 request_iterator: RequestIterableType,
756 request_serializer: SerializingFunction,
757 response_deserializer: DeserializingFunction) -> StreamUnaryCall:
758 """Run the RPC call wrapped in interceptors"""
760 async
def _run_interceptor(
761 interceptors: Iterator[UnaryUnaryClientInterceptor],
762 client_call_details: ClientCallDetails,
763 request_iterator: RequestIterableType
766 interceptor =
next(interceptors,
None)
769 continuation = functools.partial(_run_interceptor, interceptors)
771 return await interceptor.intercept_stream_unary(
772 continuation, client_call_details, request_iterator)
777 client_call_details.metadata,
778 client_call_details.credentials,
779 client_call_details.wait_for_ready, self.
_channel,
780 client_call_details.method, request_serializer,
781 response_deserializer, self.
_loop)
784 credentials, wait_for_ready)
785 return await _run_interceptor(
iter(interceptors), client_call_details,
789 raise NotImplementedError()
793 _InterceptedStreamRequestMixin,
795 """Used for running a `StreamStreamCall` wrapped by interceptors."""
797 _loop: asyncio.AbstractEventLoop
798 _channel: cygrpc.AioChannel
802 def __init__(self, interceptors: Sequence[StreamStreamClientInterceptor],
803 request_iterator: Optional[RequestIterableType],
804 timeout: Optional[float], metadata: Metadata,
806 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
807 method: bytes, request_serializer: SerializingFunction,
808 response_deserializer: DeserializingFunction,
809 loop: asyncio.AbstractEventLoop) ->
None:
815 interceptors_task = loop.create_task(
816 self.
_invoke(interceptors, method, timeout, metadata, credentials,
817 wait_for_ready, request_iterator, request_serializer,
818 response_deserializer))
823 self, interceptors: Sequence[StreamStreamClientInterceptor],
824 method: bytes, timeout: Optional[float],
825 metadata: Optional[Metadata],
827 wait_for_ready: Optional[bool],
828 request_iterator: RequestIterableType,
829 request_serializer: SerializingFunction,
830 response_deserializer: DeserializingFunction) -> StreamStreamCall:
831 """Run the RPC call wrapped in interceptors"""
833 async
def _run_interceptor(
834 interceptors: Iterator[StreamStreamClientInterceptor],
835 client_call_details: ClientCallDetails,
836 request_iterator: RequestIterableType
839 interceptor =
next(interceptors,
None)
842 continuation = functools.partial(_run_interceptor, interceptors)
844 call_or_response_iterator = await interceptor.intercept_stream_stream(
845 continuation, client_call_details, request_iterator)
847 if isinstance(call_or_response_iterator,
853 call_or_response_iterator)
859 client_call_details.metadata,
860 client_call_details.credentials,
861 client_call_details.wait_for_ready, self.
_channel,
862 client_call_details.method, request_serializer,
863 response_deserializer, self.
_loop)
867 credentials, wait_for_ready)
868 return await _run_interceptor(
iter(interceptors), client_call_details,
872 raise NotImplementedError()
876 """Final UnaryUnaryCall class finished with a response."""
877 _response: ResponseType
879 def __init__(self, response: ResponseType) ->
None:
892 raise NotImplementedError()
895 raise NotImplementedError()
903 async
def code(self) -> grpc.StatusCode:
904 return grpc.StatusCode.OK
923 class _StreamCallResponseIterator:
925 _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
926 _response_iterator: AsyncIterable[ResponseType]
930 response_iterator: AsyncIterable[ResponseType]) ->
None:
955 async
def code(self) -> grpc.StatusCode:
973 """UnaryStreamCall class wich uses an alternative response iterator."""
975 async
def read(self) -> ResponseType:
978 raise NotImplementedError()
983 """StreamStreamCall class wich uses an alternative response iterator."""
985 async
def read(self) -> ResponseType:
988 raise NotImplementedError()
990 async
def write(self, request: RequestType) ->
None:
994 raise NotImplementedError()
1000 raise NotImplementedError()
1004 return self.
_call._done_writing_flag