14 """Invocation-side implementation of gRPC Asyncio Python."""
18 from functools
import partial
22 from typing
import AsyncIterable, Optional, Tuple
25 from grpc
import _common
28 from .
import _base_call
29 from ._metadata
import Metadata
30 from ._typing
import DeserializingFunction
31 from ._typing
import DoneCallbackType
32 from ._typing
import MetadatumType
33 from ._typing
import RequestIterableType
34 from ._typing
import RequestType
35 from ._typing
import ResponseType
36 from ._typing
import SerializingFunction
# Names exported by a star-import of this module.  Kept as a tuple, with the
# same members and order as before.
__all__ = (
    'AioRpcError',
    'Call',
    'UnaryUnaryCall',
    'UnaryStreamCall',
)
# Details string handed to ``self._cancel`` when the application cancels the
# call itself.
_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!'

# Details string handed to ``self._cancel`` from the clean-up path that is
# guarded by ``hasattr(self, '_cython_call')``.
_GC_CANCELLATION_DETAILS = 'Cancelled upon garbage collection!'

# Messages carried by the ``asyncio.InvalidStateError`` exceptions raised in
# this module.
_RPC_ALREADY_FINISHED_DETAILS = 'RPC already finished.'
_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".'

# Message carried by the ``cygrpc.UsageError`` exceptions raised in this
# module.
_API_STYLE_ERROR = (
    'The iterator and read/write APIs may not be mixed on a single RPC.')
46 _OK_CALL_REPRESENTATION = (
'<{} of RPC that terminated with:\n'
51 _NON_OK_CALL_REPRESENTATION = (
'<{} of RPC that terminated with:\n'
54 '\tdebug_error_string = "{}"\n'
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
61 """An implementation of RpcError to be used by the asynchronous API.
63 A raised RpcError is a snapshot of the final status of the RPC; its values
64 are already determined. Hence, its methods no longer need to be coroutines.
68 _details: Optional[str]
69 _initial_metadata: Optional[Metadata]
70 _trailing_metadata: Optional[Metadata]
71 _debug_error_string: Optional[str]
74 code: grpc.StatusCode,
75 initial_metadata: Metadata,
76 trailing_metadata: Metadata,
77 details: Optional[str] =
None,
78 debug_error_string: Optional[str] =
None) ->
None:
82 code: The status code with which the RPC has been finalized.
83 details: Optional details explaining the reason of the error.
84 initial_metadata: Optional initial metadata that could be sent by the
86 trailing_metadata: Optional metadata that could be sent by the Server.
96 def code(self) -> grpc.StatusCode:
97 """Accesses the status code sent by the server.
100 The `grpc.StatusCode` status code.
105 """Accesses the details sent by the server.
108 The description of the error.
113 """Accesses the initial metadata sent by the server.
116 The initial metadata received.
121 """Accesses the trailing metadata sent by the server.
124 The trailing metadata received.
129 """Accesses the debug error string sent by the server.
132 The debug error string received.
137 """Assembles the error string for the RPC error."""
138 return _NON_OK_CALL_REPRESENTATION.format(self.__class__.__name__,
150 status: cygrpc.AioRpcStatus) -> AioRpcError:
152 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()],
153 Metadata.from_tuple(initial_metadata),
154 Metadata.from_tuple(status.trailing_metadata()),
155 details=status.details(),
156 debug_error_string=status.debug_error_string(),
161 """Base implementation of client RPC Call object.
163 Implements logic around final status, metadata and cancellation.
165 _loop: asyncio.AbstractEventLoop
167 _cython_call: cygrpc._AioCall
168 _metadata: Tuple[MetadatumType, ...]
169 _request_serializer: SerializingFunction
170 _response_deserializer: DeserializingFunction
172 def __init__(self, cython_call: cygrpc._AioCall, metadata: Metadata,
173 request_serializer: SerializingFunction,
174 response_deserializer: DeserializingFunction,
175 loop: asyncio.AbstractEventLoop) ->
None:
184 if hasattr(self,
'_cython_call'):
186 self.
_cancel(_GC_CANCELLATION_DETAILS)
192 """Forwards the application cancellation reasoning."""
200 return self.
_cancel(_LOCAL_CANCELLATION_DETAILS)
206 cb = partial(callback, self)
214 return Metadata.from_tuple(raw_metadata_tuple)
217 raw_metadata_tuple = (await
219 return Metadata.from_tuple(raw_metadata_tuple)
221 async
def code(self) -> grpc.StatusCode:
223 return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code]
233 raise asyncio.CancelledError()
234 code = await self.
code()
235 if code != grpc.StatusCode.OK:
256 _call_response: asyncio.Task
269 """Wait till the ongoing RPC request finishes."""
272 except asyncio.CancelledError:
286 if response
is cygrpc.EOF:
288 raise asyncio.CancelledError()
297 _message_aiter: AsyncIterable[ResponseType]
298 _preparation: asyncio.Task
299 _response_style: _APIStyle
310 raise cygrpc.UsageError(_API_STYLE_ERROR)
320 message = await self.
_read()
321 while message
is not cygrpc.EOF:
323 message = await self.
_read()
334 async
def _read(self) -> ResponseType:
340 raw_response = await self.
_cython_call.receive_serialized_message()
341 except asyncio.CancelledError:
346 if raw_response
is cygrpc.EOF:
349 return _common.deserialize(raw_response,
352 async
def read(self) -> ResponseType:
358 response_message = await self.
_read()
360 if response_message
is cygrpc.EOF:
363 return response_message
367 _metadata_sent: asyncio.Event
368 _done_writing_flag: bool
369 _async_request_poller: Optional[asyncio.Task]
370 _request_style: _APIStyle
373 self, request_iterator: Optional[RequestIterableType]):
378 if request_iterator
is not None:
388 raise cygrpc.UsageError(_API_STYLE_ERROR)
402 self, request_iterator: RequestIterableType) ->
None:
404 if inspect.isasyncgen(request_iterator)
or hasattr(
405 request_iterator,
'__aiter__'):
406 async
for request
in request_iterator:
408 await self.
_write(request)
409 except AioRpcError
as rpc_error:
411 'Exception while consuming the request_iterator: %s',
415 for request
in request_iterator:
417 await self.
_write(request)
418 except AioRpcError
as rpc_error:
420 'Exception while consuming the request_iterator: %s',
429 _LOGGER.debug(
'Client request_iterator raised exception:\n%s',
430 traceback.format_exc())
433 async
def _write(self, request: RequestType) ->
None:
435 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
437 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
443 serialized_request = _common.serialize(request,
446 await self.
_cython_call.send_serialized_message(serialized_request)
447 except cygrpc.InternalError:
449 except asyncio.CancelledError:
463 except asyncio.CancelledError:
468 async
def write(self, request: RequestType) ->
None:
470 await self.
_write(request)
473 """Signal peer that client is done writing.
475 This method is idempotent.
487 """Object for managing unary-unary RPC calls.
489 Returned when an instance of `UnaryUnaryMultiCallable` object is called.
491 _request: RequestType
492 _invocation_task: asyncio.Task
495 def __init__(self, request: RequestType, deadline: Optional[float],
498 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
499 method: bytes, request_serializer: SerializingFunction,
500 response_deserializer: DeserializingFunction,
501 loop: asyncio.AbstractEventLoop) ->
None:
503 channel.call(method, deadline, credentials, wait_for_ready),
504 metadata, request_serializer, response_deserializer, loop)
510 serialized_request = _common.serialize(self.
_request,
519 except asyncio.CancelledError:
524 return _common.deserialize(serialized_response,
536 """Object for managing unary-stream RPC calls.
538 Returned when an instance of `UnaryStreamMultiCallable` object is called.
540 _request: RequestType
541 _send_unary_request_task: asyncio.Task
544 def __init__(self, request: RequestType, deadline: Optional[float],
547 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
548 method: bytes, request_serializer: SerializingFunction,
549 response_deserializer: DeserializingFunction,
550 loop: asyncio.AbstractEventLoop) ->
None:
552 channel.call(method, deadline, credentials, wait_for_ready),
553 metadata, request_serializer, response_deserializer, loop)
560 serialized_request = _common.serialize(self.
_request,
565 except asyncio.CancelledError:
578 """Object for managing stream-unary RPC calls.
580 Returned when an instance of `StreamUnaryMultiCallable` object is called.
584 def __init__(self, request_iterator: Optional[RequestIterableType],
585 deadline: Optional[float], metadata: Metadata,
587 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
588 method: bytes, request_serializer: SerializingFunction,
589 response_deserializer: DeserializingFunction,
590 loop: asyncio.AbstractEventLoop) ->
None:
592 channel.call(method, deadline, credentials, wait_for_ready),
593 metadata, request_serializer, response_deserializer, loop)
602 except asyncio.CancelledError:
607 return _common.deserialize(serialized_response,
615 """Object for managing stream-stream RPC calls.
617 Returned when an instance of `StreamStreamMultiCallable` object is called.
619 _initializer: asyncio.Task
622 def __init__(self, request_iterator: Optional[RequestIterableType],
623 deadline: Optional[float], metadata: Metadata,
625 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
626 method: bytes, request_serializer: SerializingFunction,
627 response_deserializer: DeserializingFunction,
628 loop: asyncio.AbstractEventLoop) ->
None:
630 channel.call(method, deadline, credentials, wait_for_ready),
631 metadata, request_serializer, response_deserializer, loop)
637 """This method prepares the RPC for receiving/sending messages.
639 All other operations around the stream should only happen after the
640 completion of this method.
645 except asyncio.CancelledError: