22 from src.proto.grpc.testing
import messages_pb2
23 from src.proto.grpc.testing
import test_pb2_grpc
# Shared test parameters. Units are inferred from the `_S` / `_US` name
# suffixes (seconds / microseconds) — TODO confirm against the test server.
# _RESPONSE_INTERVAL_US is _SHORT_TIMEOUT_S multiplied by 1000 * 1000 and
# truncated to an int.
31 _SHORT_TIMEOUT_S = 1.0
33 _NUM_STREAM_RESPONSES = 5
34 _REQUEST_PAYLOAD_SIZE = 7
35 _RESPONSE_PAYLOAD_SIZE = 7
36 _RESPONSE_INTERVAL_US =
int(_SHORT_TIMEOUT_S * 1000 * 1000)
# NOTE(review): the lines below are fragments of interceptor helper classes;
# their `class` / `def` headers and parts of their bodies are not included in
# this excerpt, so only what is visible is described.
# The first fragment forwards the call unchanged by returning the awaited
# continuation.
43 return await continuation(client_call_details, request)
# Hook through which a test case can verify the interceptor's end state; the
# body is not visible here.
45 def assert_in_final_state(self, test: unittest.TestCase):
# Tail of a class header: the class derives from
# aio.UnaryStreamClientInterceptor.
50 aio.UnaryStreamClientInterceptor):
# This variant keeps the awaited continuation result in `call` rather than
# returning it directly; what it does with `call` afterwards is not visible.
54 call = await continuation(client_call_details, request)
# Compares _NUM_STREAM_RESPONSES with a value on a line not shown —
# presumably the number of responses the interceptor observed; confirm.
58 def assert_in_final_state(self, test: unittest.TestCase):
59 test.assertEqual(_NUM_STREAM_RESPONSES,
# Asynchronous per-test teardown hook. Its body is not included in this
# excerpt.
68 async
def tearDown(self):
# test_intercepts: for each of the two interceptor classes, opens an insecure
# aio channel configured with one interceptor instance, issues
# StreamingOutputCall, iterates the responses and then checks the call's final
# status, metadata and state before closing the channel.
# NOTE(review): `request`, `response_cnt` and `self._server_target` are set
# on lines that are not part of this excerpt.
71 async
def test_intercepts(self):
72 for interceptor_class
in (_UnaryStreamInterceptorEmpty,
73 _UnaryStreamInterceptorWithResponseIterator):
# subTest reports a failure per interceptor class instead of aborting the loop.
75 with self.subTest(name=interceptor_class):
76 interceptor = interceptor_class()
# The same response-parameter entry is repeated _NUM_STREAM_RESPONSES times.
79 request.response_parameters.extend([
81 ] * _NUM_STREAM_RESPONSES)
83 channel = aio.insecure_channel(self._server_target,
84 interceptors=[interceptor])
85 stub = test_pb2_grpc.TestServiceStub(channel)
86 call = stub.StreamingOutputCall(request)
88 await call.wait_for_connection()
91 async
for response
in call:
# NOTE(review): assertIs(type(x), T) is an exact-type check; assertIsInstance
# is the more common idiom — confirm the strict check is intended.
93 self.assertIs(
type(response),
94 messages_pb2.StreamingOutputCallResponse)
95 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
96 len(response.payload.body))
# After the stream is exhausted: OK status, empty metadata, empty details and
# debug string.
98 self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
99 self.assertEqual(await call.code(), grpc.StatusCode.OK)
100 self.assertEqual(await call.initial_metadata(), aio.Metadata())
101 self.assertEqual(await call.trailing_metadata(), aio.Metadata())
102 self.assertEqual(await call.details(),
'')
103 self.assertEqual(await call.debug_error_string(),
'')
# Cancelling at this point must report False, and the call must be done but
# not cancelled.
104 self.assertEqual(call.cancel(),
False)
105 self.assertEqual(call.cancelled(),
False)
106 self.assertEqual(call.done(),
True)
# Let the interceptor verify its own end state against this test case.
108 interceptor.assert_in_final_state(self)
110 await channel.close()
# For each interceptor class: builds a channel with one interceptor instance,
# starts StreamingOutputCall, drains the response stream and closes the
# channel.
# NOTE(review): judging by the test name, a done callback is registered on the
# call before the interceptor task finishes; the registration and the
# assertions on it are on lines not included in this excerpt — confirm.
112 async
def test_add_done_callback_interceptor_task_not_finished(self):
113 for interceptor_class
in (_UnaryStreamInterceptorEmpty,
114 _UnaryStreamInterceptorWithResponseIterator):
116 with self.subTest(name=interceptor_class):
117 interceptor = interceptor_class()
# The same response-parameter entry is repeated _NUM_STREAM_RESPONSES times.
120 request.response_parameters.extend([
122 ] * _NUM_STREAM_RESPONSES)
124 channel = aio.insecure_channel(self._server_target,
125 interceptors=[interceptor])
126 stub = test_pb2_grpc.TestServiceStub(channel)
127 call = stub.StreamingOutputCall(request)
# Consume every response; the loop body is not visible here.
131 async
for response
in call:
136 await channel.close()
# Same visible flow as the "task_not_finished" variant, except that this test
# awaits call.wait_for_connection() before draining the response stream.
# NOTE(review): judging by the test name, the done callback is registered
# after the interceptor task has completed; the registration and its
# assertions are on lines not included in this excerpt — confirm.
138 async
def test_add_done_callback_interceptor_task_finished(self):
139 for interceptor_class
in (_UnaryStreamInterceptorEmpty,
140 _UnaryStreamInterceptorWithResponseIterator):
142 with self.subTest(name=interceptor_class):
143 interceptor = interceptor_class()
# The same response-parameter entry is repeated _NUM_STREAM_RESPONSES times.
146 request.response_parameters.extend([
148 ] * _NUM_STREAM_RESPONSES)
150 channel = aio.insecure_channel(self._server_target,
151 interceptors=[interceptor])
152 stub = test_pb2_grpc.TestServiceStub(channel)
153 call = stub.StreamingOutputCall(request)
# Wait until the call is connected before going further.
158 await call.wait_for_connection()
# Consume every response; the loop body is not visible here.
162 async
for response
in call:
167 await channel.close()
# Reads the stream with explicit call.read() calls instead of `async for`,
# then checks that both the test's own counter and the interceptor's
# response_iterator.response_cnt equal _NUM_STREAM_RESPONSES and that the call
# ended with status OK.
# NOTE(review): `interceptor`, `request` and `response_cnt` are created on
# lines that are not part of this excerpt.
169 async
def test_response_iterator_using_read(self):
172 channel = aio.insecure_channel(self._server_target,
173 interceptors=[interceptor])
174 stub = test_pb2_grpc.TestServiceStub(channel)
177 request.response_parameters.extend(
179 _NUM_STREAM_RESPONSES)
181 call = stub.StreamingOutputCall(request)
# NOTE(review): the loop variable `response` is overwritten at once by the
# awaited read below, so the range value itself is never used.
184 for response
in range(_NUM_STREAM_RESPONSES):
185 response = await call.read()
187 self.assertIs(
type(response),
188 messages_pb2.StreamingOutputCallResponse)
189 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
191 self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
# The interceptor's response iterator must have counted every response too.
192 self.assertEqual(interceptor.response_iterator.response_cnt,
193 _NUM_STREAM_RESPONSES)
194 self.assertEqual(await call.code(), grpc.StatusCode.OK)
196 await channel.close()
# For each interceptor class, installs two instances of that class on one
# channel and checks that the streamed responses still have the expected type
# and payload size, that the expected number arrive, and that the call ends
# with status OK.
# NOTE(review): `request` and `response_cnt` are set on lines that are not
# part of this excerpt.
198 async
def test_multiple_interceptors_response_iterator(self):
199 for interceptor_class
in (_UnaryStreamInterceptorEmpty,
200 _UnaryStreamInterceptorWithResponseIterator):
202 with self.subTest(name=interceptor_class):
# Two separate instances of the same class.
204 interceptors = [interceptor_class(), interceptor_class()]
206 channel = aio.insecure_channel(self._server_target,
207 interceptors=interceptors)
208 stub = test_pb2_grpc.TestServiceStub(channel)
# The same response-parameter entry is repeated _NUM_STREAM_RESPONSES times.
211 request.response_parameters.extend([
213 ] * _NUM_STREAM_RESPONSES)
215 call = stub.StreamingOutputCall(request)
218 async
for response
in call:
220 self.assertIs(
type(response),
221 messages_pb2.StreamingOutputCallResponse)
222 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
223 len(response.payload.body))
225 self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
226 self.assertEqual(await call.code(), grpc.StatusCode.OK)
228 await channel.close()
# For each interceptor class, targets UNREACHABLE_TARGET and expects iterating
# the call to raise aio.AioRpcError with status UNAVAILABLE; afterwards the
# call must be done and report the same status code.
# NOTE(review): `request` is created on a line that is not part of this
# excerpt.
230 async
def test_intercepts_response_iterator_rpc_error(self):
231 for interceptor_class
in (_UnaryStreamInterceptorEmpty,
232 _UnaryStreamInterceptorWithResponseIterator):
234 with self.subTest(name=interceptor_class):
236 channel = aio.insecure_channel(
237 UNREACHABLE_TARGET, interceptors=[interceptor_class()])
239 stub = test_pb2_grpc.TestServiceStub(channel)
240 call = stub.StreamingOutputCall(request)
# The error is expected to surface while iterating; the loop body is not
# visible here.
242 with self.assertRaises(aio.AioRpcError)
as exception_context:
243 async
for response
in call:
# The status must match on both the raised exception and the call object.
246 self.assertEqual(grpc.StatusCode.UNAVAILABLE,
247 exception_context.exception.code())
249 self.assertTrue(call.done())
250 self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code())
251 await channel.close()
# Cancels the call once the interceptor has been entered. The local
# interceptor sets `interceptor_reached` as its first visible action; the test
# waits on that event, cancels, and expects iteration to raise
# asyncio.CancelledError with the call ending CANCELLED and both metadata
# values None.
# NOTE(review): `request` and the rest of the interceptor body are on lines
# not included in this excerpt. Presumably the interceptor then blocks on
# `wait_for_ever` so the RPC never starts — confirm.
253 async
def test_cancel_before_rpc(self):
255 interceptor_reached = asyncio.Event()
# A future created on the test's loop; its use is not visible in this excerpt.
256 wait_for_ever = self.
loop.create_future()
258 class Interceptor(aio.UnaryStreamClientInterceptor):
260 async
def intercept_unary_stream(self, continuation,
261 client_call_details, request):
262 interceptor_reached.set()
265 channel = aio.insecure_channel(UNREACHABLE_TARGET,
266 interceptors=[Interceptor()])
268 stub = test_pb2_grpc.TestServiceStub(channel)
269 call = stub.StreamingOutputCall(request)
# Right after creation the call is neither cancelled nor done.
271 self.assertFalse(call.cancelled())
272 self.assertFalse(call.done())
# Cancel only once the interceptor is known to be running; cancel() must
# report True.
274 await interceptor_reached.wait()
275 self.assertTrue(call.cancel())
277 with self.assertRaises(asyncio.CancelledError):
278 async
for response
in call:
281 self.assertTrue(call.cancelled())
282 self.assertTrue(call.done())
283 self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
284 self.assertEqual(await call.initial_metadata(),
None)
285 self.assertEqual(await call.trailing_metadata(),
None)
286 await channel.close()
# Cancels the call after the interceptor has invoked the continuation. The
# local interceptor awaits the continuation and only then sets
# `interceptor_reached`; the test waits on that event, cancels, and expects
# iteration to raise asyncio.CancelledError with the call ending CANCELLED and
# both metadata values None.
# NOTE(review): `request` and the rest of the interceptor body are on lines
# not included in this excerpt.
288 async
def test_cancel_after_rpc(self):
290 interceptor_reached = asyncio.Event()
# A future created on the test's loop; its use is not visible in this excerpt.
291 wait_for_ever = self.
loop.create_future()
293 class Interceptor(aio.UnaryStreamClientInterceptor):
295 async
def intercept_unary_stream(self, continuation,
296 client_call_details, request):
# The event is set only after the continuation has been awaited.
297 call = await continuation(client_call_details, request)
298 interceptor_reached.set()
301 channel = aio.insecure_channel(UNREACHABLE_TARGET,
302 interceptors=[Interceptor()])
304 stub = test_pb2_grpc.TestServiceStub(channel)
305 call = stub.StreamingOutputCall(request)
# Right after creation the call is neither cancelled nor done.
307 self.assertFalse(call.cancelled())
308 self.assertFalse(call.done())
310 await interceptor_reached.wait()
311 self.assertTrue(call.cancel())
313 with self.assertRaises(asyncio.CancelledError):
314 async
for response
in call:
317 self.assertTrue(call.cancelled())
318 self.assertTrue(call.done())
319 self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
320 self.assertEqual(await call.initial_metadata(),
None)
321 self.assertEqual(await call.trailing_metadata(),
None)
322 await channel.close()
# Expects iterating the call to raise asyncio.CancelledError, after which the
# call must be cancelled, done and report status CANCELLED.
# NOTE(review): the channel arguments, the creation of `request` and whatever
# triggers the cancellation (presumably a call.cancel() inside the loop body)
# are on lines not included in this excerpt — confirm.
324 async
def test_cancel_consuming_response_iterator(self):
326 request.response_parameters.extend(
328 _NUM_STREAM_RESPONSES)
330 channel = aio.insecure_channel(
333 stub = test_pb2_grpc.TestServiceStub(channel)
334 call = stub.StreamingOutputCall(request)
336 with self.assertRaises(asyncio.CancelledError):
337 async
for response
in call:
340 self.assertTrue(call.cancelled())
341 self.assertTrue(call.done())
342 self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
343 await channel.close()
# Uses a local interceptor that awaits the continuation, then expects
# iterating the resulting call to raise asyncio.CancelledError and the call to
# end cancelled, done and with status CANCELLED.
# NOTE(review): judging by the test name, the interceptor cancels the call it
# got from the continuation; that statement and the creation of `request` are
# on lines not included in this excerpt — confirm.
345 async
def test_cancel_by_the_interceptor(self):
347 class Interceptor(aio.UnaryStreamClientInterceptor):
349 async
def intercept_unary_stream(self, continuation,
350 client_call_details, request):
351 call = await continuation(client_call_details, request)
355 channel = aio.insecure_channel(UNREACHABLE_TARGET,
356 interceptors=[Interceptor()])
358 stub = test_pb2_grpc.TestServiceStub(channel)
359 call = stub.StreamingOutputCall(request)
361 with self.assertRaises(asyncio.CancelledError):
362 async
for response
in call:
365 self.assertTrue(call.cancelled())
366 self.assertTrue(call.done())
367 self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
368 await channel.close()
# Checks that an exception raised inside an interceptor reaches the caller:
# the local interceptor raises InterceptorException without calling the
# continuation, and iterating the call is expected to raise that same
# exception type.
# NOTE(review): `request` and the body of InterceptorException are on lines
# not included in this excerpt.
370 async
def test_exception_raised_by_interceptor(self):
# Exception type local to this test, so the assertion cannot match an
# unrelated error.
372 class InterceptorException(Exception):
375 class Interceptor(aio.UnaryStreamClientInterceptor):
377 async
def intercept_unary_stream(self, continuation,
378 client_call_details, request):
379 raise InterceptorException
381 channel = aio.insecure_channel(UNREACHABLE_TARGET,
382 interceptors=[Interceptor()])
384 stub = test_pb2_grpc.TestServiceStub(channel)
385 call = stub.StreamingOutputCall(request)
387 with self.assertRaises(InterceptorException):
388 async
for response
in call:
391 await channel.close()
# Script entry point: enable DEBUG-level logging, then run this module's tests
# with verbose output.
394 if __name__ ==
'__main__':
395 logging.basicConfig(level=logging.DEBUG)
396 unittest.main(verbosity=2)