14 """Tests behavior of the Call classes."""
24 from src.proto.grpc.testing
import messages_pb2
25 from src.proto.grpc.testing
import test_pb2_grpc
# Shared tuning knobs for the call tests in this module.

# Base unit of time for the tests: one second, expressed as float seconds.
_SHORT_TIMEOUT_S = datetime.timedelta(seconds=1).total_seconds()

# Number of messages written/read by the streaming test loops.
_NUM_STREAM_RESPONSES = 5
# Expected length of ``response.payload.body`` in streamed responses.
_RESPONSE_PAYLOAD_SIZE = 42
# Per-request payload size; multiplied by _NUM_STREAM_RESPONSES when checking
# ``aggregated_payload_size`` of a stream-unary response.
_REQUEST_PAYLOAD_SIZE = 7
# Status details text the tests expect after a local cancellation.
_LOCAL_CANCEL_DETAILS_EXPECTATION = 'Locally cancelled by application!'
# _SHORT_TIMEOUT_S converted to whole microseconds; passed as ``interval_us``.
_RESPONSE_INTERVAL_US = int(_SHORT_TIMEOUT_S * 1000 * 1000)
# Largest signed 32-bit value (about 35.8 minutes in microseconds); passed as
# ``interval_us`` by tests that cancel the call instead of waiting for it.
_INFINITE_INTERVAL_US = 2**31 - 1
47 async
def tearDown(self):
57 self.assertTrue(
str(call)
is not None)
58 self.assertTrue(repr(call)
is not None)
62 self.assertTrue(
str(call)
is not None)
63 self.assertTrue(repr(call)
is not None)
65 async
def test_call_ok(self):
68 self.assertFalse(call.done())
72 self.assertTrue(call.done())
73 self.assertIsInstance(response, messages_pb2.SimpleResponse)
74 self.assertEqual(await call.code(), grpc.StatusCode.OK)
78 response_retry = await call
79 self.assertIs(response, response_retry)
81 async
def test_call_rpc_error(self):
82 async
with aio.insecure_channel(UNREACHABLE_TARGET)
as channel:
83 stub = test_pb2_grpc.TestServiceStub(channel)
87 with self.assertRaises(aio.AioRpcError)
as exception_context:
90 self.assertEqual(grpc.StatusCode.UNAVAILABLE,
91 exception_context.exception.code())
93 self.assertTrue(call.done())
94 self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code())
96 async
def test_call_code_awaitable(self):
98 self.assertEqual(await call.code(), grpc.StatusCode.OK)
100 async
def test_call_details_awaitable(self):
102 self.assertEqual(
'', await call.details())
104 async
def test_call_initial_metadata_awaitable(self):
106 self.assertEqual(aio.Metadata(), await call.initial_metadata())
108 async
def test_call_trailing_metadata_awaitable(self):
110 self.assertEqual(aio.Metadata(), await call.trailing_metadata())
112 async
def test_call_initial_metadata_cancelable(self):
113 coro_started = asyncio.Event()
118 await call.initial_metadata()
120 task = self.
loop.create_task(coro())
121 await coro_started.wait()
126 self.assertEqual(aio.Metadata(), await call.initial_metadata())
128 async
def test_call_initial_metadata_multiple_waiters(self):
132 return await call.initial_metadata()
134 task1 = self.
loop.create_task(coro())
135 task2 = self.
loop.create_task(coro())
138 expected = [aio.Metadata()
for _
in range(2)]
139 self.assertEqual(expected, await asyncio.gather(*[task1, task2]))
141 async
def test_call_code_cancelable(self):
142 coro_started = asyncio.Event()
149 task = self.
loop.create_task(coro())
150 await coro_started.wait()
155 self.assertEqual(grpc.StatusCode.OK, await call.code())
157 async
def test_call_code_multiple_waiters(self):
161 return await call.code()
163 task1 = self.
loop.create_task(coro())
164 task2 = self.
loop.create_task(coro())
168 self.assertEqual([grpc.StatusCode.OK, grpc.StatusCode.OK], await
169 asyncio.gather(task1, task2))
171 async
def test_cancel_unary_unary(self):
174 self.assertFalse(call.cancelled())
176 self.assertTrue(call.cancel())
177 self.assertFalse(call.cancel())
179 with self.assertRaises(asyncio.CancelledError):
183 self.assertTrue(call.cancelled())
184 self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
185 self.assertEqual(await call.details(),
186 'Locally cancelled by application!')
188 async
def test_cancel_unary_unary_in_task(self):
189 coro_started = asyncio.Event()
192 async
def another_coro():
196 task = self.
loop.create_task(another_coro())
197 await coro_started.wait()
199 self.assertFalse(task.done())
202 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
204 with self.assertRaises(asyncio.CancelledError):
207 async
def test_passing_credentials_fails_over_insecure_channel(self):
212 with self.assertRaisesRegex(
214 "Call credentials are only valid on secure channels"):
216 credentials=call_credentials)
222 channel = aio.insecure_channel(UNREACHABLE_TARGET)
224 stub = test_pb2_grpc.TestServiceStub(channel)
225 call = stub.StreamingOutputCall(request)
227 with self.assertRaises(aio.AioRpcError)
as exception_context:
228 async
for response
in call:
231 self.assertEqual(grpc.StatusCode.UNAVAILABLE,
232 exception_context.exception.code())
234 self.assertTrue(call.done())
235 self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code())
236 await channel.close()
238 async
def test_cancel_unary_stream(self):
241 for _
in range(_NUM_STREAM_RESPONSES):
242 request.response_parameters.append(
244 size=_RESPONSE_PAYLOAD_SIZE,
245 interval_us=_RESPONSE_INTERVAL_US,
249 call = self.
_stub.StreamingOutputCall(request)
250 self.assertFalse(call.cancelled())
252 response = await call.read()
253 self.assertIs(
type(response), messages_pb2.StreamingOutputCallResponse)
254 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
256 self.assertTrue(call.cancel())
257 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
258 self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await
260 self.assertFalse(call.cancel())
262 with self.assertRaises(asyncio.CancelledError):
264 self.assertTrue(call.cancelled())
266 async
def test_multiple_cancel_unary_stream(self):
269 for _
in range(_NUM_STREAM_RESPONSES):
270 request.response_parameters.append(
272 size=_RESPONSE_PAYLOAD_SIZE,
273 interval_us=_RESPONSE_INTERVAL_US,
277 call = self.
_stub.StreamingOutputCall(request)
278 self.assertFalse(call.cancelled())
280 response = await call.read()
281 self.assertIs(
type(response), messages_pb2.StreamingOutputCallResponse)
282 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
284 self.assertTrue(call.cancel())
285 self.assertFalse(call.cancel())
286 self.assertFalse(call.cancel())
287 self.assertFalse(call.cancel())
289 with self.assertRaises(asyncio.CancelledError):
292 async
def test_early_cancel_unary_stream(self):
293 """Test cancellation before receiving messages."""
296 for _
in range(_NUM_STREAM_RESPONSES):
297 request.response_parameters.append(
299 size=_RESPONSE_PAYLOAD_SIZE,
300 interval_us=_RESPONSE_INTERVAL_US,
304 call = self.
_stub.StreamingOutputCall(request)
306 self.assertFalse(call.cancelled())
307 self.assertTrue(call.cancel())
308 self.assertFalse(call.cancel())
310 with self.assertRaises(asyncio.CancelledError):
313 self.assertTrue(call.cancelled())
315 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
316 self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await
319 async
def test_late_cancel_unary_stream(self):
320 """Test cancellation after received all messages."""
323 for _
in range(_NUM_STREAM_RESPONSES):
324 request.response_parameters.append(
328 call = self.
_stub.StreamingOutputCall(request)
330 for _
in range(_NUM_STREAM_RESPONSES):
331 response = await call.read()
332 self.assertIs(
type(response),
333 messages_pb2.StreamingOutputCallResponse)
334 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
340 self.assertIn(await call.code(),
341 [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED])
343 async
def test_too_many_reads_unary_stream(self):
344 """Test calling read after received all messages fails."""
347 for _
in range(_NUM_STREAM_RESPONSES):
348 request.response_parameters.append(
352 call = self.
_stub.StreamingOutputCall(request)
354 for _
in range(_NUM_STREAM_RESPONSES):
355 response = await call.read()
356 self.assertIs(
type(response),
357 messages_pb2.StreamingOutputCallResponse)
358 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
359 self.assertIs(await call.read(), aio.EOF)
362 self.assertEqual(await call.code(), grpc.StatusCode.OK)
363 self.assertIs(await call.read(), aio.EOF)
365 async
def test_unary_stream_async_generator(self):
366 """Sunny day test case for unary_stream."""
369 for _
in range(_NUM_STREAM_RESPONSES):
370 request.response_parameters.append(
374 call = self.
_stub.StreamingOutputCall(request)
375 self.assertFalse(call.cancelled())
377 async
for response
in call:
378 self.assertIs(
type(response),
379 messages_pb2.StreamingOutputCallResponse)
380 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
382 self.assertEqual(await call.code(), grpc.StatusCode.OK)
384 async
def test_cancel_unary_stream_in_task_using_read(self):
385 coro_started = asyncio.Event()
389 request.response_parameters.append(
391 size=_RESPONSE_PAYLOAD_SIZE,
392 interval_us=_INFINITE_INTERVAL_US,
396 call = self.
_stub.StreamingOutputCall(request)
398 async
def another_coro():
402 task = self.
loop.create_task(another_coro())
403 await coro_started.wait()
405 self.assertFalse(task.done())
408 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
410 with self.assertRaises(asyncio.CancelledError):
413 async
def test_cancel_unary_stream_in_task_using_async_for(self):
414 coro_started = asyncio.Event()
418 request.response_parameters.append(
420 size=_RESPONSE_PAYLOAD_SIZE,
421 interval_us=_INFINITE_INTERVAL_US,
425 call = self.
_stub.StreamingOutputCall(request)
427 async
def another_coro():
432 task = self.
loop.create_task(another_coro())
433 await coro_started.wait()
435 self.assertFalse(task.done())
438 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
440 with self.assertRaises(asyncio.CancelledError):
443 async
def test_time_remaining(self):
446 request.response_parameters.append(
449 request.response_parameters.append(
451 size=_RESPONSE_PAYLOAD_SIZE,
452 interval_us=_RESPONSE_INTERVAL_US,
455 call = self.
_stub.StreamingOutputCall(request,
456 timeout=_SHORT_TIMEOUT_S * 2)
458 response = await call.read()
459 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
462 remained_time = call.time_remaining()
463 self.assertGreater(remained_time, _SHORT_TIMEOUT_S * 3 / 2)
464 self.assertLess(remained_time, _SHORT_TIMEOUT_S * 5 / 2)
466 response = await call.read()
467 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
470 remained_time = call.time_remaining()
471 self.assertGreater(remained_time, _SHORT_TIMEOUT_S / 2)
472 self.assertLess(remained_time, _SHORT_TIMEOUT_S * 3 / 2)
474 self.assertEqual(grpc.StatusCode.OK, await call.code())
476 async
def test_empty_responses(self):
479 for _
in range(_NUM_STREAM_RESPONSES):
480 request.response_parameters.append(
484 call = self.
_stub.StreamingOutputCall(request)
486 for _
in range(_NUM_STREAM_RESPONSES):
487 response = await call.read()
488 self.assertIs(
type(response),
489 messages_pb2.StreamingOutputCallResponse)
490 self.assertEqual(b
'', response.SerializeToString())
492 self.assertEqual(grpc.StatusCode.OK, await call.code())
498 call = self.
_stub.StreamingInputCall()
505 for _
in range(_NUM_STREAM_RESPONSES):
506 await call.write(request)
509 self.assertFalse(call.done())
510 self.assertFalse(call.cancelled())
511 self.assertTrue(call.cancel())
512 self.assertTrue(call.cancelled())
514 await call.done_writing()
516 with self.assertRaises(asyncio.CancelledError):
519 async
def test_early_cancel_stream_unary(self):
520 call = self.
_stub.StreamingInputCall()
523 self.assertFalse(call.done())
524 self.assertFalse(call.cancelled())
525 self.assertTrue(call.cancel())
526 self.assertTrue(call.cancelled())
528 with self.assertRaises(asyncio.InvalidStateError):
532 await call.done_writing()
534 with self.assertRaises(asyncio.CancelledError):
537 async
def test_write_after_done_writing(self):
538 call = self.
_stub.StreamingInputCall()
545 for _
in range(_NUM_STREAM_RESPONSES):
546 await call.write(request)
549 await call.done_writing()
551 with self.assertRaises(asyncio.InvalidStateError):
554 response = await call
555 self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
556 self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
557 response.aggregated_payload_size)
559 self.assertEqual(await call.code(), grpc.StatusCode.OK)
561 async
def test_error_in_async_generator(self):
564 request.response_parameters.append(
566 size=_RESPONSE_PAYLOAD_SIZE,
567 interval_us=_RESPONSE_INTERVAL_US,
571 request_iterator_received_the_exception = asyncio.Event()
573 async
def request_iterator():
574 with self.assertRaises(asyncio.CancelledError):
575 for _
in range(_NUM_STREAM_RESPONSES):
577 await asyncio.sleep(_SHORT_TIMEOUT_S)
578 request_iterator_received_the_exception.set()
580 call = self.
_stub.StreamingInputCall(request_iterator())
583 async
def cancel_later():
584 await asyncio.sleep(_SHORT_TIMEOUT_S * 2)
587 cancel_later_task = self.
loop.create_task(cancel_later())
589 with self.assertRaises(asyncio.CancelledError):
592 await request_iterator_received_the_exception.wait()
595 await cancel_later_task
597 async
def test_normal_iterable_requests(self):
601 requests = [request] * _NUM_STREAM_RESPONSES
604 call = self.
_stub.StreamingInputCall(requests)
607 response = await call
608 self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
609 self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
610 response.aggregated_payload_size)
612 self.assertEqual(await call.code(), grpc.StatusCode.OK)
614 async
def test_call_rpc_error(self):
615 async
with aio.insecure_channel(UNREACHABLE_TARGET)
as channel:
616 stub = test_pb2_grpc.TestServiceStub(channel)
619 call = stub.StreamingInputCall()
620 with self.assertRaises(aio.AioRpcError)
as exception_context:
623 self.assertEqual(grpc.StatusCode.UNAVAILABLE,
624 exception_context.exception.code())
626 self.assertTrue(call.done())
627 self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code())
630 call = self.
_stub.StreamingInputCall(timeout=_SHORT_TIMEOUT_S)
633 with self.assertRaises(aio.AioRpcError)
as exception_context:
636 rpc_error = exception_context.exception
637 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())
638 self.assertTrue(call.done())
639 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, await call.code())
644 _STREAM_OUTPUT_REQUEST_ONE_RESPONSE.response_parameters.append(
648 _STREAM_OUTPUT_REQUEST_ONE_EMPTY_RESPONSE.response_parameters.append(
656 call = self.
_stub.FullDuplexCall()
658 for _
in range(_NUM_STREAM_RESPONSES):
659 await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE)
660 response = await call.read()
661 self.assertIsInstance(response,
662 messages_pb2.StreamingOutputCallResponse)
663 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
666 self.assertFalse(call.done())
667 self.assertFalse(call.cancelled())
668 self.assertTrue(call.cancel())
669 self.assertTrue(call.cancelled())
670 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
async def test_cancel_with_pending_read(self):
    """Cancel a full-duplex call after writing a request but reading nothing.

    Expects ``cancel()`` to succeed on the still-running call and the final
    status code to be ``CANCELLED``.
    """
    call = self._stub.FullDuplexCall()

    await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE)

    # Cancel while the call is still in flight; no read has been issued.
    self.assertFalse(call.done())
    self.assertFalse(call.cancelled())
    self.assertTrue(call.cancel())
    self.assertTrue(call.cancelled())
    self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
684 async
def test_cancel_with_ongoing_read(self):
685 call = self.
_stub.FullDuplexCall()
686 coro_started = asyncio.Event()
688 async
def read_coro():
692 read_task = self.
loop.create_task(read_coro())
693 await coro_started.wait()
694 self.assertFalse(read_task.done())
697 self.assertFalse(call.done())
698 self.assertFalse(call.cancelled())
699 self.assertTrue(call.cancel())
700 self.assertTrue(call.cancelled())
701 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
async def test_early_cancel(self):
    """Cancel a full-duplex call immediately after creating it.

    No message is written or read before ``cancel()``; the call must report
    itself cancelled and finish with status ``CANCELLED``.
    """
    call = self._stub.FullDuplexCall()

    # Cancel right away, before any write or read.
    self.assertFalse(call.done())
    self.assertFalse(call.cancelled())
    self.assertTrue(call.cancel())
    self.assertTrue(call.cancelled())
    self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
async def test_cancel_after_done_writing(self):
    """Cancel a full-duplex call after the write side has been closed.

    ``done_writing()`` is awaited first; the call is then expected to be
    not yet done, so ``cancel()`` succeeds and the status is ``CANCELLED``.
    """
    call = self._stub.FullDuplexCall()
    await call.done_writing()

    # Closing the write side alone must not have finished the call yet.
    self.assertFalse(call.done())
    self.assertFalse(call.cancelled())
    self.assertTrue(call.cancel())
    self.assertTrue(call.cancelled())
    self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
async def test_late_cancel(self):
    """Cancelling a full-duplex call that already finished is a no-op.

    After the call has completed with ``OK``, ``cancel()`` must return
    ``False``, ``cancelled()`` must stay ``False`` and the status code must
    remain ``OK``.
    """
    call = self._stub.FullDuplexCall()
    await call.done_writing()
    self.assertEqual(grpc.StatusCode.OK, await call.code())

    # The call is complete, so the cancellation attempt is rejected.
    self.assertTrue(call.done())
    self.assertFalse(call.cancelled())
    self.assertFalse(call.cancel())
    self.assertFalse(call.cancelled())

    # The final status is unchanged by the rejected cancel.
    self.assertEqual(grpc.StatusCode.OK, await call.code())
async def test_async_generator(self):
    """Drive a full-duplex call with an async generator of requests.

    Two requests are yielded; every response received through ``async for``
    must carry a payload body of ``_RESPONSE_PAYLOAD_SIZE`` bytes and the
    call must end with status ``OK``.
    """

    async def request_generator():
        yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE
        yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE

    call = self._stub.FullDuplexCall(request_generator())
    async for response in call:
        self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))

    self.assertEqual(await call.code(), grpc.StatusCode.OK)
async def test_too_many_reads(self):
    """Reading past the end of a full-duplex call keeps returning ``aio.EOF``.

    ``_NUM_STREAM_RESPONSES`` requests are sent via an async generator and
    the same number of responses are read; the next ``read()`` must return
    ``aio.EOF``, both before and after the final status has been awaited.
    """

    async def request_generator():
        for _ in range(_NUM_STREAM_RESPONSES):
            yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE

    call = self._stub.FullDuplexCall(request_generator())
    for _ in range(_NUM_STREAM_RESPONSES):
        response = await call.read()
        self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
    # One read more than there are responses.
    self.assertIs(await call.read(), aio.EOF)

    self.assertEqual(await call.code(), grpc.StatusCode.OK)
    # Reading again once the status is known still yields EOF.
    self.assertIs(await call.read(), aio.EOF)
async def test_read_write_after_done_writing(self):
    """After ``done_writing()``, writes fail but reads still work.

    Two requests are written before the write side is closed. A further
    ``write()`` must raise ``asyncio.InvalidStateError``, while the two
    responses can still be read and the call ends with status ``OK``.
    """
    call = self._stub.FullDuplexCall()

    # Send two requests, then close the write side.
    await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE)
    await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE)
    await call.done_writing()

    # Writing after done_writing() is rejected.
    with self.assertRaises(asyncio.InvalidStateError):
        await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE)

    # Reading is unaffected: one response per request written above.
    response = await call.read()
    self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
    response = await call.read()
    self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))

    self.assertEqual(await call.code(), grpc.StatusCode.OK)
786 async
def test_error_in_async_generator(self):
789 request.response_parameters.append(
791 size=_RESPONSE_PAYLOAD_SIZE,
792 interval_us=_RESPONSE_INTERVAL_US,
796 request_iterator_received_the_exception = asyncio.Event()
798 async
def request_iterator():
799 with self.assertRaises(asyncio.CancelledError):
800 for _
in range(_NUM_STREAM_RESPONSES):
802 await asyncio.sleep(_SHORT_TIMEOUT_S)
803 request_iterator_received_the_exception.set()
805 call = self.
_stub.FullDuplexCall(request_iterator())
808 async
def cancel_later():
809 await asyncio.sleep(_SHORT_TIMEOUT_S * 2)
812 cancel_later_task = self.
loop.create_task(cancel_later())
814 with self.assertRaises(asyncio.CancelledError):
815 async
for response
in call:
816 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
817 len(response.payload.body))
819 await request_iterator_received_the_exception.wait()
821 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
823 await cancel_later_task
async def test_normal_iterable_requests(self):
    """Drive a full-duplex call with a plain (synchronous) iterator.

    The requests are supplied as ``iter(list)`` rather than an async
    generator; every response must carry a payload body of
    ``_RESPONSE_PAYLOAD_SIZE`` bytes and the call must end with ``OK``.
    """
    requests = [_STREAM_OUTPUT_REQUEST_ONE_RESPONSE] * _NUM_STREAM_RESPONSES

    call = self._stub.FullDuplexCall(iter(requests))
    async for response in call:
        self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))

    self.assertEqual(await call.code(), grpc.StatusCode.OK)
async def test_empty_ping_pong(self):
    """Alternate writes and reads where every response serializes to nothing.

    Each of the ``_NUM_STREAM_RESPONSES`` rounds writes one request and
    reads one response, whose serialized form must be ``b''``. The write
    side is then closed and the call must end with status ``OK``.
    """
    call = self._stub.FullDuplexCall()
    for _ in range(_NUM_STREAM_RESPONSES):
        await call.write(_STREAM_OUTPUT_REQUEST_ONE_EMPTY_RESPONSE)
        response = await call.read()
        self.assertEqual(b'', response.SerializeToString())
    await call.done_writing()
    self.assertEqual(await call.code(), grpc.StatusCode.OK)
# Script entry point: enable DEBUG logging, then run this module's tests
# with verbose per-test output.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)