client_unary_stream_interceptor_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import asyncio
15 import datetime
16 import logging
17 import unittest
18 
19 import grpc
20 from grpc.experimental import aio
21 
22 from src.proto.grpc.testing import messages_pb2
23 from src.proto.grpc.testing import test_pb2_grpc
24 from tests.unit.framework.common import test_constants
25 from tests_aio.unit._common import CountingResponseIterator
26 from tests_aio.unit._common import inject_callbacks
27 from tests_aio.unit._constants import UNREACHABLE_TARGET
28 from tests_aio.unit._test_base import AioTestBase
29 from tests_aio.unit._test_server import start_test_server
30 
# Short timeout, in seconds.
_SHORT_TIMEOUT_S = 1.0

# Number of streamed responses each test requests from the server.
_NUM_STREAM_RESPONSES = 5
# Payload sizes; the tests compare len(response.payload.body) against the
# response size.
_REQUEST_PAYLOAD_SIZE = 7
_RESPONSE_PAYLOAD_SIZE = 7
# The short timeout converted from seconds to whole microseconds.
_RESPONSE_INTERVAL_US = int(_SHORT_TIMEOUT_S * 1000 * 1000)
37 
38 
class _UnaryStreamInterceptorEmpty(aio.UnaryStreamClientInterceptor):
    """Pass-through interceptor that forwards the call unchanged."""

    async def intercept_unary_stream(self, continuation, client_call_details,
                                     request):
        """Invoke the continuation and return whatever it produces."""
        return await continuation(client_call_details, request)

    def assert_in_final_state(self, test: unittest.TestCase):
        """Nothing to verify: this interceptor keeps no state."""
47 
48 
class _UnaryStreamInterceptorWithResponseIterator(
        aio.UnaryStreamClientInterceptor):
    """Interceptor that returns a counting response iterator, not the call."""

    async def intercept_unary_stream(self, continuation, client_call_details,
                                     request):
        """Wrap the continued call in a CountingResponseIterator.

        The wrapper is kept on ``self.response_iterator`` so that
        ``assert_in_final_state`` can later inspect its ``response_cnt``.
        """
        call = await continuation(client_call_details, request)
        self.response_iterator = CountingResponseIterator(call)
        return self.response_iterator

    def assert_in_final_state(self, test: unittest.TestCase):
        """Assert the wrapper counted exactly _NUM_STREAM_RESPONSES responses."""
        test.assertEqual(_NUM_STREAM_RESPONSES,
                         self.response_iterator.response_cnt)
61 
62 
class TestUnaryStreamClientInterceptor(AioTestBase):
    """Tests for client-side unary-stream interceptors on aio channels.

    Each test builds a channel with one or more interceptors, starts a
    ``StreamingOutputCall`` through it and checks what the resulting call
    object reports.
    """

    async def setUp(self):
        # One test server per test; its address is the channel target.
        self._server_target, self._server = await start_test_server()

    async def tearDown(self):
        await self._server.stop(None)

    async def test_intercepts(self):
        """A fully consumed intercepted stream ends OK with every response."""
        for interceptor_class in (_UnaryStreamInterceptorEmpty,
                                  _UnaryStreamInterceptorWithResponseIterator):

            with self.subTest(name=interceptor_class):
                interceptor = interceptor_class()

                request = messages_pb2.StreamingOutputCallRequest()
                request.response_parameters.extend([
                    messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
                ] * _NUM_STREAM_RESPONSES)

                channel = aio.insecure_channel(self._server_target,
                                               interceptors=[interceptor])
                stub = test_pb2_grpc.TestServiceStub(channel)
                call = stub.StreamingOutputCall(request)

                await call.wait_for_connection()

                response_cnt = 0
                async for response in call:
                    response_cnt += 1
                    self.assertIs(type(response),
                                  messages_pb2.StreamingOutputCallResponse)
                    self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
                                     len(response.payload.body))

                self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
                self.assertEqual(await call.code(), grpc.StatusCode.OK)
                self.assertEqual(await call.initial_metadata(), aio.Metadata())
                self.assertEqual(await call.trailing_metadata(), aio.Metadata())
                self.assertEqual(await call.details(), '')
                self.assertEqual(await call.debug_error_string(), '')
                # The call has already finished, so cancel() reports False.
                self.assertEqual(call.cancel(), False)
                self.assertEqual(call.cancelled(), False)
                self.assertEqual(call.done(), True)

                interceptor.assert_in_final_state(self)

                await channel.close()

    async def test_add_done_callback_interceptor_task_not_finished(self):
        """Callbacks injected right after the call is created are validated.

        Unlike the ``..._task_finished`` variant, this test does not wait for
        the connection before injecting the callbacks.
        """
        for interceptor_class in (_UnaryStreamInterceptorEmpty,
                                  _UnaryStreamInterceptorWithResponseIterator):

            with self.subTest(name=interceptor_class):
                interceptor = interceptor_class()

                request = messages_pb2.StreamingOutputCallRequest()
                request.response_parameters.extend([
                    messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
                ] * _NUM_STREAM_RESPONSES)

                channel = aio.insecure_channel(self._server_target,
                                               interceptors=[interceptor])
                stub = test_pb2_grpc.TestServiceStub(channel)
                call = stub.StreamingOutputCall(request)

                validation = inject_callbacks(call)

                # Drain the stream so the call can complete.
                async for response in call:
                    pass

                await validation

                await channel.close()

    async def test_add_done_callback_interceptor_task_finished(self):
        """Callbacks injected after the connection is ready are validated."""
        for interceptor_class in (_UnaryStreamInterceptorEmpty,
                                  _UnaryStreamInterceptorWithResponseIterator):

            with self.subTest(name=interceptor_class):
                interceptor = interceptor_class()

                request = messages_pb2.StreamingOutputCallRequest()
                request.response_parameters.extend([
                    messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
                ] * _NUM_STREAM_RESPONSES)

                channel = aio.insecure_channel(self._server_target,
                                               interceptors=[interceptor])
                stub = test_pb2_grpc.TestServiceStub(channel)
                call = stub.StreamingOutputCall(request)

                # This ensures that the callbacks will be registered
                # with the intercepted call rather than saving in the
                # pending state list.
                await call.wait_for_connection()

                validation = inject_callbacks(call)

                # Drain the stream so the call can complete.
                async for response in call:
                    pass

                await validation

                await channel.close()

    async def test_response_iterator_using_read(self):
        """Responses pulled with ``call.read()`` pass through the iterator."""
        interceptor = _UnaryStreamInterceptorWithResponseIterator()

        channel = aio.insecure_channel(self._server_target,
                                       interceptors=[interceptor])
        stub = test_pb2_grpc.TestServiceStub(channel)

        request = messages_pb2.StreamingOutputCallRequest()
        request.response_parameters.extend(
            [messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)] *
            _NUM_STREAM_RESPONSES)

        call = stub.StreamingOutputCall(request)

        response_cnt = 0
        # The loop index is unused; each iteration reads one response.
        for _ in range(_NUM_STREAM_RESPONSES):
            response = await call.read()
            response_cnt += 1
            self.assertIs(type(response),
                          messages_pb2.StreamingOutputCallResponse)
            self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))

        self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
        self.assertEqual(interceptor.response_iterator.response_cnt,
                         _NUM_STREAM_RESPONSES)
        self.assertEqual(await call.code(), grpc.StatusCode.OK)

        await channel.close()

    async def test_multiple_interceptors_response_iterator(self):
        """Two interceptors of the same class still deliver every response."""
        for interceptor_class in (_UnaryStreamInterceptorEmpty,
                                  _UnaryStreamInterceptorWithResponseIterator):

            with self.subTest(name=interceptor_class):

                interceptors = [interceptor_class(), interceptor_class()]

                channel = aio.insecure_channel(self._server_target,
                                               interceptors=interceptors)
                stub = test_pb2_grpc.TestServiceStub(channel)

                request = messages_pb2.StreamingOutputCallRequest()
                request.response_parameters.extend([
                    messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
                ] * _NUM_STREAM_RESPONSES)

                call = stub.StreamingOutputCall(request)

                response_cnt = 0
                async for response in call:
                    response_cnt += 1
                    self.assertIs(type(response),
                                  messages_pb2.StreamingOutputCallResponse)
                    self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
                                     len(response.payload.body))

                self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
                self.assertEqual(await call.code(), grpc.StatusCode.OK)

                await channel.close()

    async def test_intercepts_response_iterator_rpc_error(self):
        """Iterating a call to an unreachable target raises UNAVAILABLE."""
        for interceptor_class in (_UnaryStreamInterceptorEmpty,
                                  _UnaryStreamInterceptorWithResponseIterator):

            with self.subTest(name=interceptor_class):

                channel = aio.insecure_channel(
                    UNREACHABLE_TARGET, interceptors=[interceptor_class()])
                request = messages_pb2.StreamingOutputCallRequest()
                stub = test_pb2_grpc.TestServiceStub(channel)
                call = stub.StreamingOutputCall(request)

                with self.assertRaises(aio.AioRpcError) as exception_context:
                    async for response in call:
                        pass

                self.assertEqual(grpc.StatusCode.UNAVAILABLE,
                                 exception_context.exception.code())

                self.assertTrue(call.done())
                self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code())
                await channel.close()

    async def test_cancel_before_rpc(self):
        """Cancelling while the interceptor is blocked before continuing."""

        interceptor_reached = asyncio.Event()
        # Never resolved by the test: keeps the interceptor suspended.
        wait_for_ever = self.loop.create_future()

        class Interceptor(aio.UnaryStreamClientInterceptor):

            async def intercept_unary_stream(self, continuation,
                                             client_call_details, request):
                # Blocks without ever invoking the continuation.
                interceptor_reached.set()
                await wait_for_ever

        channel = aio.insecure_channel(UNREACHABLE_TARGET,
                                       interceptors=[Interceptor()])
        request = messages_pb2.StreamingOutputCallRequest()
        stub = test_pb2_grpc.TestServiceStub(channel)
        call = stub.StreamingOutputCall(request)

        self.assertFalse(call.cancelled())
        self.assertFalse(call.done())

        await interceptor_reached.wait()
        self.assertTrue(call.cancel())

        with self.assertRaises(asyncio.CancelledError):
            async for response in call:
                pass

        self.assertTrue(call.cancelled())
        self.assertTrue(call.done())
        self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
        self.assertEqual(await call.initial_metadata(), None)
        self.assertEqual(await call.trailing_metadata(), None)
        await channel.close()

    async def test_cancel_after_rpc(self):
        """Cancelling while the interceptor is blocked after continuing."""

        interceptor_reached = asyncio.Event()
        # Never resolved by the test: keeps the interceptor suspended.
        wait_for_ever = self.loop.create_future()

        class Interceptor(aio.UnaryStreamClientInterceptor):

            async def intercept_unary_stream(self, continuation,
                                             client_call_details, request):
                # The continuation runs first; its result is not needed
                # because the interceptor then blocks and never returns it.
                await continuation(client_call_details, request)
                interceptor_reached.set()
                await wait_for_ever

        channel = aio.insecure_channel(UNREACHABLE_TARGET,
                                       interceptors=[Interceptor()])
        request = messages_pb2.StreamingOutputCallRequest()
        stub = test_pb2_grpc.TestServiceStub(channel)
        call = stub.StreamingOutputCall(request)

        self.assertFalse(call.cancelled())
        self.assertFalse(call.done())

        await interceptor_reached.wait()
        self.assertTrue(call.cancel())

        with self.assertRaises(asyncio.CancelledError):
            async for response in call:
                pass

        self.assertTrue(call.cancelled())
        self.assertTrue(call.done())
        self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
        self.assertEqual(await call.initial_metadata(), None)
        self.assertEqual(await call.trailing_metadata(), None)
        await channel.close()

    async def test_cancel_consuming_response_iterator(self):
        """Cancelling from inside the response loop ends it with CANCELLED."""
        request = messages_pb2.StreamingOutputCallRequest()
        request.response_parameters.extend(
            [messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)] *
            _NUM_STREAM_RESPONSES)

        channel = aio.insecure_channel(
            self._server_target,
            interceptors=[_UnaryStreamInterceptorWithResponseIterator()])
        stub = test_pb2_grpc.TestServiceStub(channel)
        call = stub.StreamingOutputCall(request)

        with self.assertRaises(asyncio.CancelledError):
            async for response in call:
                call.cancel()

        self.assertTrue(call.cancelled())
        self.assertTrue(call.done())
        self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
        await channel.close()

    async def test_cancel_by_the_interceptor(self):
        """A call cancelled inside the interceptor surfaces as cancelled."""

        class Interceptor(aio.UnaryStreamClientInterceptor):

            async def intercept_unary_stream(self, continuation,
                                             client_call_details, request):
                call = await continuation(client_call_details, request)
                call.cancel()
                return call

        channel = aio.insecure_channel(UNREACHABLE_TARGET,
                                       interceptors=[Interceptor()])
        request = messages_pb2.StreamingOutputCallRequest()
        stub = test_pb2_grpc.TestServiceStub(channel)
        call = stub.StreamingOutputCall(request)

        with self.assertRaises(asyncio.CancelledError):
            async for response in call:
                pass

        self.assertTrue(call.cancelled())
        self.assertTrue(call.done())
        self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
        await channel.close()

    async def test_exception_raised_by_interceptor(self):
        """An exception raised by the interceptor reaches the call's consumer."""

        class InterceptorException(Exception):
            pass

        class Interceptor(aio.UnaryStreamClientInterceptor):

            async def intercept_unary_stream(self, continuation,
                                             client_call_details, request):
                raise InterceptorException

        channel = aio.insecure_channel(UNREACHABLE_TARGET,
                                       interceptors=[Interceptor()])
        request = messages_pb2.StreamingOutputCallRequest()
        stub = test_pb2_grpc.TestServiceStub(channel)
        call = stub.StreamingOutputCall(request)

        with self.assertRaises(InterceptorException):
            async for response in call:
                pass

        await channel.close()
392 
393 
# Script entry point: enable debug logging, then run the tests verbosely.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
tests_aio.unit.client_unary_stream_interceptor_test._UnaryStreamInterceptorEmpty
Definition: client_unary_stream_interceptor_test.py:39
tests_aio.unit.client_unary_stream_interceptor_test.TestUnaryStreamClientInterceptor
Definition: client_unary_stream_interceptor_test.py:63
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit._common.CountingResponseIterator
Definition: tests/tests_aio/unit/_common.py:92
tests_aio.unit.client_unary_stream_interceptor_test._UnaryStreamInterceptorWithResponseIterator
Definition: client_unary_stream_interceptor_test.py:50
xds_interop_client.int
int
Definition: xds_interop_client.py:113
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.client_unary_stream_interceptor_test._UnaryStreamInterceptorWithResponseIterator.intercept_unary_stream
def intercept_unary_stream(self, continuation, client_call_details, request)
Definition: client_unary_stream_interceptor_test.py:52
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
tests_aio.unit._common.inject_callbacks
def inject_callbacks(aio.Call call)
Definition: tests/tests_aio/unit/_common.py:48
tests_aio.unit.client_unary_stream_interceptor_test._UnaryStreamInterceptorEmpty.intercept_unary_stream
def intercept_unary_stream(self, continuation, client_call_details, request)
Definition: client_unary_stream_interceptor_test.py:41
tests_aio.unit.client_unary_stream_interceptor_test.TestUnaryStreamClientInterceptor._server
_server
Definition: client_unary_stream_interceptor_test.py:66
tests_aio.unit._common
Definition: tests/tests_aio/unit/_common.py:1
tests_aio.unit.client_unary_stream_interceptor_test.TestUnaryStreamClientInterceptor.setUp
def setUp(self)
Definition: client_unary_stream_interceptor_test.py:65
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.unit._constants
Definition: _constants.py:1
tests_aio.unit.client_unary_stream_interceptor_test._UnaryStreamInterceptorWithResponseIterator.response_iterator
response_iterator
Definition: client_unary_stream_interceptor_test.py:54
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:55