14 """Invocation-side implementation of gRPC Asyncio Python."""
18 from typing
import Any, Iterable, List, Optional, Sequence
21 from grpc
import _common
22 from grpc
import _compression
23 from grpc
import _grpcio_metadata
26 from .
import _base_call
27 from .
import _base_channel
28 from ._call
import StreamStreamCall
29 from ._call
import StreamUnaryCall
30 from ._call
import UnaryStreamCall
31 from ._call
import UnaryUnaryCall
32 from ._interceptor
import ClientInterceptor
33 from ._interceptor
import InterceptedStreamStreamCall
34 from ._interceptor
import InterceptedStreamUnaryCall
35 from ._interceptor
import InterceptedUnaryStreamCall
36 from ._interceptor
import InterceptedUnaryUnaryCall
37 from ._interceptor
import StreamStreamClientInterceptor
38 from ._interceptor
import StreamUnaryClientInterceptor
39 from ._interceptor
import UnaryStreamClientInterceptor
40 from ._interceptor
import UnaryUnaryClientInterceptor
41 from ._metadata
import Metadata
42 from ._typing
import ChannelArgumentType
43 from ._typing
import DeserializingFunction
44 from ._typing
import RequestIterableType
45 from ._typing
import SerializingFunction
46 from ._utils
import _timeout_to_deadline
48 _USER_AGENT =
'grpc-python-asyncio/{}'.
format(_grpcio_metadata.__version__)
50 if sys.version_info[1] < 7:
53 return asyncio.Task.all_tasks()
57 return asyncio.all_tasks()
62 compression_channel_argument = _compression.create_channel_option(
64 user_agent_channel_argument = ((
65 cygrpc.ChannelArgKey.primary_user_agent_string,
68 return tuple(base_options
69 ) + compression_channel_argument + user_agent_channel_argument
73 """Base class of all multi callable objects.
75 Handles the initialization logic and stores common attributes.
77 _loop: asyncio.AbstractEventLoop
78 _channel: cygrpc.AioChannel
80 _request_serializer: SerializingFunction
81 _response_deserializer: DeserializingFunction
82 _interceptors: Optional[Sequence[ClientInterceptor]]
83 _references: List[Any]
84 _loop: asyncio.AbstractEventLoop
89 channel: cygrpc.AioChannel,
91 request_serializer: SerializingFunction,
92 response_deserializer: DeserializingFunction,
93 interceptors: Optional[Sequence[ClientInterceptor]],
94 references: List[Any],
95 loop: asyncio.AbstractEventLoop,
107 metadata: Optional[Metadata] =
None,
109 """Initialise the final metadata for the current call from the provided
110 <metadata> and/or <compression> values.
115 *_compression.augment_metadata(metadata, compression))
126 timeout: Optional[float] =
None,
127 metadata: Optional[Metadata] =
None,
129 wait_for_ready: Optional[bool] =
None,
136 metadata, credentials, wait_for_ready,
157 timeout: Optional[float] =
None,
158 metadata: Optional[Metadata] =
None,
160 wait_for_ready: Optional[bool] =
None,
174 self.
_interceptors, request, deadline, metadata, credentials,
187 request_iterator: Optional[RequestIterableType] =
None,
188 timeout: Optional[float] =
None,
189 metadata: Optional[Metadata] =
None,
191 wait_for_ready: Optional[bool] =
None,
200 credentials, wait_for_ready, self.
_channel,
218 request_iterator: Optional[RequestIterableType] =
None,
219 timeout: Optional[float] =
None,
220 metadata: Optional[Metadata] =
None,
222 wait_for_ready: Optional[bool] =
None,
231 credentials, wait_for_ready, self.
_channel,
245 _loop: asyncio.AbstractEventLoop
246 _channel: cygrpc.AioChannel
247 _unary_unary_interceptors: List[UnaryUnaryClientInterceptor]
248 _unary_stream_interceptors: List[UnaryStreamClientInterceptor]
249 _stream_unary_interceptors: List[StreamUnaryClientInterceptor]
250 _stream_stream_interceptors: List[StreamStreamClientInterceptor]
252 def __init__(self, target: str, options: ChannelArgumentType,
255 interceptors: Optional[Sequence[ClientInterceptor]]):
259 target: The target to which to connect.
260 options: Configuration options for the channel.
261 credentials: A cygrpc.ChannelCredentials or None.
262 compression: An optional value indicating the compression method to be
263 used over the lifetime of the channel.
264 interceptors: An optional list of interceptors that will be used for
265 intercepting any RPC executed with this channel.
272 if interceptors
is not None:
273 for interceptor
in interceptors:
274 if isinstance(interceptor, UnaryUnaryClientInterceptor):
276 elif isinstance(interceptor, UnaryStreamClientInterceptor):
278 elif isinstance(interceptor, StreamUnaryClientInterceptor):
280 elif isinstance(interceptor, StreamStreamClientInterceptor):
284 "Interceptor {} must be ".
format(interceptor) +
285 "{} or ".
format(UnaryUnaryClientInterceptor.__name__) +
286 "{} or ".
format(UnaryStreamClientInterceptor.__name__) +
287 "{} or ".
format(StreamUnaryClientInterceptor.__name__) +
288 "{}. ".
format(StreamStreamClientInterceptor.__name__))
290 self.
_loop = cygrpc.get_working_loop()
292 _common.encode(target),
315 stack = task.get_stack(limit=1)
316 except AttributeError
as attribute_error:
331 if 'frame' in str(attribute_error):
342 candidate = frame.f_locals.get(
'self')
345 if hasattr(candidate,
'_channel'):
347 if candidate._channel
is not self.
_channel:
349 elif hasattr(candidate,
'_cython_call'):
351 if candidate._cython_call._channel
is not self.
_channel:
355 raise cygrpc.InternalError(
356 f
'Unrecognized call object: {candidate}')
358 calls.append(candidate)
359 call_tasks.append(task)
363 if grace
and call_tasks:
364 await asyncio.wait(call_tasks, timeout=grace)
373 async
def close(self, grace: Optional[float] =
None):
377 if hasattr(self,
'_channel'):
383 result = self.
_channel.check_connectivity_state(try_to_connect)
384 return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]
388 last_observed_state: grpc.ChannelConnectivity,
390 assert await self.
_channel.watch_connectivity_state(
391 last_observed_state.value[0],
None)
394 state = self.
get_state(try_to_connect=
True)
395 while state != grpc.ChannelConnectivity.READY:
397 state = self.
get_state(try_to_connect=
True)
402 request_serializer: Optional[SerializingFunction] =
None,
403 response_deserializer: Optional[DeserializingFunction] =
None
404 ) -> UnaryUnaryMultiCallable:
407 response_deserializer,
414 request_serializer: Optional[SerializingFunction] =
None,
415 response_deserializer: Optional[DeserializingFunction] =
None
416 ) -> UnaryStreamMultiCallable:
419 response_deserializer,
426 request_serializer: Optional[SerializingFunction] =
None,
427 response_deserializer: Optional[DeserializingFunction] =
None
428 ) -> StreamUnaryMultiCallable:
431 response_deserializer,
438 request_serializer: Optional[SerializingFunction] =
None,
439 response_deserializer: Optional[DeserializingFunction] =
None
440 ) -> StreamStreamMultiCallable:
443 response_deserializer,
450 options: Optional[ChannelArgumentType] =
None,
452 interceptors: Optional[Sequence[ClientInterceptor]] =
None):
453 """Creates an insecure asynchronous Channel to a server.
456 target: The server address.
457 options: An optional list of key-value pairs (:term:`channel_arguments`
458 in gRPC Core runtime) to configure the channel.
459 compression: An optional value indicating the compression method to be
460 used over the lifetime of the channel. This is an EXPERIMENTAL option.
461 interceptors: An optional sequence of interceptors that will be executed for
462 any call executed with this channel.
467 return Channel(target, ()
if options
is None else options,
None,
468 compression, interceptors)
473 options: Optional[ChannelArgumentType] =
None,
475 interceptors: Optional[Sequence[ClientInterceptor]] =
None):
476 """Creates a secure asynchronous Channel to a server.
479 target: The server address.
480 credentials: A ChannelCredentials instance.
481 options: An optional list of key-value pairs (:term:`channel_arguments`
482 in gRPC Core runtime) to configure the channel.
483 compression: An optional value indicating the compression method to be
484 used over the lifetime of the channel. This is an EXPERIMENTAL option.
485 interceptors: An optional sequence of interceptors that will be executed for
486 any call executed with this channel.
491 return Channel(target, ()
if options
is None else options,
492 credentials._credentials, compression, interceptors)