tests/tests_aio/unit/_common.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import asyncio
16 from typing import AsyncIterable
17 
18 import grpc
19 from grpc.aio._metadata import Metadata
20 from grpc.aio._typing import MetadataKey
21 from grpc.aio._typing import MetadataValue
22 from grpc.aio._typing import MetadatumType
23 from grpc.experimental import aio
24 
25 from tests.unit.framework.common import test_constants
26 
27 ADHOC_METHOD = '/test/AdHoc'
28 
29 
30 def seen_metadata(expected: Metadata, actual: Metadata):
31  return not bool(set(tuple(expected)) - set(tuple(actual)))
32 
33 
34 def seen_metadatum(expected_key: MetadataKey, expected_value: MetadataValue,
35  actual: Metadata) -> bool:
36  obtained = actual[expected_key]
37  return obtained == expected_value
38 
39 
40 async def block_until_certain_state(channel: aio.Channel,
41  expected_state: grpc.ChannelConnectivity):
42  state = channel.get_state()
43  while state != expected_state:
44  await channel.wait_for_state_change(state)
45  state = channel.get_state()
46 
47 
48 def inject_callbacks(call: aio.Call):
49  first_callback_ran = asyncio.Event()
50 
51  def first_callback(call):
52  # Validate that all resopnses have been received
53  # and the call is an end state.
54  assert call.done()
55  first_callback_ran.set()
56 
57  second_callback_ran = asyncio.Event()
58 
59  def second_callback(call):
60  # Validate that all responses have been received
61  # and the call is an end state.
62  assert call.done()
63  second_callback_ran.set()
64 
65  call.add_done_callback(first_callback)
66  call.add_done_callback(second_callback)
67 
68  async def validation():
69  await asyncio.wait_for(
70  asyncio.gather(first_callback_ran.wait(),
71  second_callback_ran.wait()),
72  test_constants.SHORT_TIMEOUT)
73 
74  return validation()
75 
76 
78 
79  def __init__(self, request_iterator):
80  self.request_cnt = 0
81  self._request_iterator = request_iterator
82 
83  async def _forward_requests(self):
84  async for request in self._request_iterator:
85  self.request_cnt += 1
86  yield request
87 
88  def __aiter__(self):
89  return self._forward_requests()
90 
91 
93 
94  def __init__(self, response_iterator):
95  self.response_cnt = 0
96  self._response_iterator = response_iterator
97 
98  async def _forward_responses(self):
99  async for response in self._response_iterator:
100  self.response_cnt += 1
101  yield response
102 
103  def __aiter__(self):
104  return self._forward_responses()
105 
106 
108  """A generic handler to plugin testing server methods on the fly."""
109  _handler: grpc.RpcMethodHandler
110 
111  def __init__(self):
112  self._handler = None
113 
114  def set_adhoc_handler(self, handler: grpc.RpcMethodHandler):
115  self._handler = handler
116 
117  def service(self, handler_call_details):
118  if handler_call_details.method == ADHOC_METHOD:
119  return self._handler
120  else:
121  return None
tests_aio.unit._common.AdhocGenericHandler
Definition: tests/tests_aio/unit/_common.py:107
tests_aio.unit._common.CountingResponseIterator._forward_responses
def _forward_responses(self)
Definition: tests/tests_aio/unit/_common.py:98
tests_aio.unit._common.CountingResponseIterator.__init__
def __init__(self, response_iterator)
Definition: tests/tests_aio/unit/_common.py:94
tests_aio.unit._common.CountingRequestIterator._forward_requests
def _forward_requests(self)
Definition: tests/tests_aio/unit/_common.py:83
tests_aio.unit._common.CountingResponseIterator
Definition: tests/tests_aio/unit/_common.py:92
tests_aio.unit._common.CountingResponseIterator._response_iterator
_response_iterator
Definition: tests/tests_aio/unit/_common.py:96
grpc.aio._metadata
Definition: src/python/grpcio/grpc/aio/_metadata.py:1
tests_aio.unit._common.CountingRequestIterator
Definition: tests/tests_aio/unit/_common.py:77
tests_aio.unit._common.CountingRequestIterator.__init__
def __init__(self, request_iterator)
Definition: tests/tests_aio/unit/_common.py:79
tests_aio.unit._common.CountingRequestIterator.__aiter__
def __aiter__(self)
Definition: tests/tests_aio/unit/_common.py:88
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit._common.seen_metadatum
bool seen_metadatum(MetadataKey expected_key, MetadataValue expected_value, Metadata actual)
Definition: tests/tests_aio/unit/_common.py:34
tests_aio.unit._common.CountingRequestIterator.request_cnt
request_cnt
Definition: tests/tests_aio/unit/_common.py:80
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
tests_aio.unit._common.CountingResponseIterator.__aiter__
def __aiter__(self)
Definition: tests/tests_aio/unit/_common.py:103
grpc.ChannelConnectivity
Definition: src/python/grpcio/grpc/__init__.py:212
tests_aio.unit._common.AdhocGenericHandler._handler
_handler
Definition: tests/tests_aio/unit/_common.py:112
tests_aio.unit._common.AdhocGenericHandler.service
def service(self, handler_call_details)
Definition: tests/tests_aio/unit/_common.py:117
tests_aio.unit._common.inject_callbacks
def inject_callbacks(aio.Call call)
Definition: tests/tests_aio/unit/_common.py:48
tests_aio.unit._common.AdhocGenericHandler.set_adhoc_handler
def set_adhoc_handler(self, grpc.RpcMethodHandler handler)
Definition: tests/tests_aio/unit/_common.py:114
tests_aio.unit._common.CountingRequestIterator._request_iterator
_request_iterator
Definition: tests/tests_aio/unit/_common.py:81
tests_aio.unit._common.CountingResponseIterator.response_cnt
response_cnt
Definition: tests/tests_aio/unit/_common.py:95
tests_aio.unit._common.AdhocGenericHandler.__init__
def __init__(self)
Definition: tests/tests_aio/unit/_common.py:111
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
grpc.aio._typing
Definition: _typing.py:1
tests_aio.unit._common.block_until_certain_state
def block_until_certain_state(aio.Channel channel, grpc.ChannelConnectivity expected_state)
Definition: tests/tests_aio/unit/_common.py:40
tests_aio.unit._common.seen_metadata
def seen_metadata(Metadata expected, Metadata actual)
Definition: tests/tests_aio/unit/_common.py:30
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
grpc.RpcMethodHandler
Definition: src/python/grpcio/grpc/__init__.py:1288


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38