client_stream_stream_interceptor_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 import unittest
16 
17 import grpc
18 from grpc.experimental import aio
19 
20 from src.proto.grpc.testing import messages_pb2
21 from src.proto.grpc.testing import test_pb2_grpc
22 from tests_aio.unit._common import CountingRequestIterator
23 from tests_aio.unit._common import CountingResponseIterator
24 from tests_aio.unit._test_base import AioTestBase
25 from tests_aio.unit._test_server import start_test_server
26 
27 _NUM_STREAM_RESPONSES = 5
28 _NUM_STREAM_REQUESTS = 5
29 _RESPONSE_PAYLOAD_SIZE = 7
30 
31 
32 class _StreamStreamInterceptorEmpty(aio.StreamStreamClientInterceptor):
33 
34  async def intercept_stream_stream(self, continuation, client_call_details,
35  request_iterator):
36  return await continuation(client_call_details, request_iterator)
37 
38  def assert_in_final_state(self, test: unittest.TestCase):
39  pass
40 
41 
43  aio.StreamStreamClientInterceptor):
44 
45  async def intercept_stream_stream(self, continuation, client_call_details,
46  request_iterator):
47  self.request_iterator = CountingRequestIterator(request_iterator)
48  call = await continuation(client_call_details, self.request_iterator)
50  return self.response_iterator
51 
52  def assert_in_final_state(self, test: unittest.TestCase):
53  test.assertEqual(_NUM_STREAM_REQUESTS,
54  self.request_iterator.request_cnt)
55  test.assertEqual(_NUM_STREAM_RESPONSES,
56  self.response_iterator.response_cnt)
57 
58 
60 
61  async def setUp(self):
62  self._server_target, self._server = await start_test_server()
63 
64  async def tearDown(self):
65  await self._server.stop(None)
66 
67  async def test_intercepts(self):
68 
69  for interceptor_class in (
70  _StreamStreamInterceptorEmpty,
71  _StreamStreamInterceptorWithRequestAndResponseIterator):
72 
73  with self.subTest(name=interceptor_class):
74  interceptor = interceptor_class()
75  channel = aio.insecure_channel(self._server_target,
76  interceptors=[interceptor])
77  stub = test_pb2_grpc.TestServiceStub(channel)
78 
79  # Prepares the request
81  request.response_parameters.append(
83  size=_RESPONSE_PAYLOAD_SIZE))
84 
85  async def request_iterator():
86  for _ in range(_NUM_STREAM_REQUESTS):
87  yield request
88 
89  call = stub.FullDuplexCall(request_iterator())
90 
91  await call.wait_for_connection()
92 
93  response_cnt = 0
94  async for response in call:
95  response_cnt += 1
96  self.assertIsInstance(
97  response, messages_pb2.StreamingOutputCallResponse)
98  self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
99  len(response.payload.body))
100 
101  self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
102  self.assertEqual(await call.code(), grpc.StatusCode.OK)
103  self.assertEqual(await call.initial_metadata(), aio.Metadata())
104  self.assertEqual(await call.trailing_metadata(), aio.Metadata())
105  self.assertEqual(await call.details(), '')
106  self.assertEqual(await call.debug_error_string(), '')
107  self.assertEqual(call.cancel(), False)
108  self.assertEqual(call.cancelled(), False)
109  self.assertEqual(call.done(), True)
110 
111  interceptor.assert_in_final_state(self)
112 
113  await channel.close()
114 
115  async def test_intercepts_using_write_and_read(self):
116  for interceptor_class in (
117  _StreamStreamInterceptorEmpty,
118  _StreamStreamInterceptorWithRequestAndResponseIterator):
119 
120  with self.subTest(name=interceptor_class):
121  interceptor = interceptor_class()
122  channel = aio.insecure_channel(self._server_target,
123  interceptors=[interceptor])
124  stub = test_pb2_grpc.TestServiceStub(channel)
125 
126  # Prepares the request
128  request.response_parameters.append(
130  size=_RESPONSE_PAYLOAD_SIZE))
131 
132  call = stub.FullDuplexCall()
133 
134  for _ in range(_NUM_STREAM_RESPONSES):
135  await call.write(request)
136  response = await call.read()
137  self.assertIsInstance(
138  response, messages_pb2.StreamingOutputCallResponse)
139  self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
140  len(response.payload.body))
141 
142  await call.done_writing()
143 
144  self.assertEqual(await call.code(), grpc.StatusCode.OK)
145  self.assertEqual(await call.initial_metadata(), aio.Metadata())
146  self.assertEqual(await call.trailing_metadata(), aio.Metadata())
147  self.assertEqual(await call.details(), '')
148  self.assertEqual(await call.debug_error_string(), '')
149  self.assertEqual(call.cancel(), False)
150  self.assertEqual(call.cancelled(), False)
151  self.assertEqual(call.done(), True)
152 
153  interceptor.assert_in_final_state(self)
154 
155  await channel.close()
156 
157  async def test_multiple_interceptors_request_iterator(self):
158  for interceptor_class in (
159  _StreamStreamInterceptorEmpty,
160  _StreamStreamInterceptorWithRequestAndResponseIterator):
161 
162  with self.subTest(name=interceptor_class):
163 
164  interceptors = [interceptor_class(), interceptor_class()]
165  channel = aio.insecure_channel(self._server_target,
166  interceptors=interceptors)
167  stub = test_pb2_grpc.TestServiceStub(channel)
168 
169  # Prepares the request
171  request.response_parameters.append(
173  size=_RESPONSE_PAYLOAD_SIZE))
174 
175  call = stub.FullDuplexCall()
176 
177  for _ in range(_NUM_STREAM_RESPONSES):
178  await call.write(request)
179  response = await call.read()
180  self.assertIsInstance(
181  response, messages_pb2.StreamingOutputCallResponse)
182  self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
183  len(response.payload.body))
184 
185  await call.done_writing()
186 
187  self.assertEqual(await call.code(), grpc.StatusCode.OK)
188  self.assertEqual(await call.initial_metadata(), aio.Metadata())
189  self.assertEqual(await call.trailing_metadata(), aio.Metadata())
190  self.assertEqual(await call.details(), '')
191  self.assertEqual(await call.debug_error_string(), '')
192  self.assertEqual(call.cancel(), False)
193  self.assertEqual(call.cancelled(), False)
194  self.assertEqual(call.done(), True)
195 
196  for interceptor in interceptors:
197  interceptor.assert_in_final_state(self)
198 
199  await channel.close()
200 
201 
202 if __name__ == '__main__':
203  logging.basicConfig(level=logging.DEBUG)
204  unittest.main(verbosity=2)
tests_aio.unit.client_stream_stream_interceptor_test.TestStreamStreamClientInterceptor.setUp
def setUp(self)
Definition: client_stream_stream_interceptor_test.py:61
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.client_stream_stream_interceptor_test._StreamStreamInterceptorEmpty
Definition: client_stream_stream_interceptor_test.py:32
tests_aio.unit._common.CountingResponseIterator
Definition: tests/tests_aio/unit/_common.py:92
tests_aio.unit.client_stream_stream_interceptor_test._StreamStreamInterceptorWithRequestAndResponseIterator.request_iterator
request_iterator
Definition: client_stream_stream_interceptor_test.py:46
tests_aio.unit._common.CountingRequestIterator
Definition: tests/tests_aio/unit/_common.py:77
tests_aio.unit.client_stream_stream_interceptor_test.TestStreamStreamClientInterceptor._server
_server
Definition: client_stream_stream_interceptor_test.py:62
grpc::experimental
Definition: include/grpcpp/channel.h:46
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
tests_aio.unit.client_stream_stream_interceptor_test._StreamStreamInterceptorEmpty.intercept_stream_stream
def intercept_stream_stream(self, continuation, client_call_details, request_iterator)
Definition: client_stream_stream_interceptor_test.py:34
tests_aio.unit.client_stream_stream_interceptor_test._StreamStreamInterceptorWithRequestAndResponseIterator
Definition: client_stream_stream_interceptor_test.py:43
tests_aio.unit._common
Definition: tests/tests_aio/unit/_common.py:1
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_aio.unit.client_stream_stream_interceptor_test._StreamStreamInterceptorWithRequestAndResponseIterator.response_iterator
response_iterator
Definition: client_stream_stream_interceptor_test.py:48
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.unit.client_stream_stream_interceptor_test.TestStreamStreamClientInterceptor
Definition: client_stream_stream_interceptor_test.py:59
tests_aio.unit.client_stream_stream_interceptor_test._StreamStreamInterceptorWithRequestAndResponseIterator.intercept_stream_stream
def intercept_stream_stream(self, continuation, client_call_details, request_iterator)
Definition: client_stream_stream_interceptor_test.py:45
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:47