"""Testing the wait_for_ready mechanism."""
26 from src.proto.grpc.testing
import messages_pb2
27 from src.proto.grpc.testing
import test_pb2_grpc
# Number of iterations performed by the streaming RPC helpers in this file.
_NUM_STREAM_RESPONSES = 5
# Presumably the request payload size in bytes -- its use is not visible
# in this excerpt; confirm against the helpers that build requests.
_REQUEST_PAYLOAD_SIZE = 7
# Expected length of each response's payload body (asserted in the
# full-duplex helper).
_RESPONSE_PAYLOAD_SIZE = 42
41 timeout=test_constants.LONG_TIMEOUT,
42 wait_for_ready=wait_for_ready)
47 for _
in range(_NUM_STREAM_RESPONSES):
48 request.response_parameters.append(
51 call = stub.StreamingOutputCall(request,
52 timeout=test_constants.LONG_TIMEOUT,
53 wait_for_ready=wait_for_ready)
55 for _
in range(_NUM_STREAM_RESPONSES):
57 assert await call.code() == grpc.StatusCode.OK
65 for _
in range(_NUM_STREAM_RESPONSES):
68 await stub.StreamingInputCall(
gen(),
69 timeout=test_constants.LONG_TIMEOUT,
70 wait_for_ready=wait_for_ready)
74 call = stub.FullDuplexCall(timeout=test_constants.LONG_TIMEOUT,
75 wait_for_ready=wait_for_ready)
78 request.response_parameters.append(
81 for _
in range(_NUM_STREAM_RESPONSES):
82 await call.write(request)
83 response = await call.read()
84 assert _RESPONSE_PAYLOAD_SIZE ==
len(response.payload.body)
86 await call.done_writing()
87 assert await call.code() == grpc.StatusCode.OK
92 _perform_unary_stream,
93 _perform_stream_unary,
94 _perform_stream_stream,
102 self.
_channel = aio.insecure_channel(f
"{address}:{self._port}")
106 async
def tearDown(self):
109 async
def _connection_fails_fast(self, wait_for_ready):
110 for action
in _RPC_ACTIONS:
111 with self.subTest(name=action):
112 with self.assertRaises(aio.AioRpcError)
as exception_context:
114 rpc_error = exception_context.exception
115 self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
async def test_call_wait_for_ready_default(self):
    """RPC should fail immediately after connection failed.

    Covers the default case: ``wait_for_ready`` is passed as ``None``
    (i.e. left unset) to the shared fail-fast check.
    """
    await self._connection_fails_fast(None)
async def test_call_wait_for_ready_disabled(self):
    """RPC should fail immediately after connection failed.

    Covers the explicitly disabled case: ``wait_for_ready`` is passed
    as ``False`` to the shared fail-fast check.
    """
    await self._connection_fails_fast(False)
125 @unittest.skipIf(platform.system() ==
'Windows',
126 'https://github.com/grpc/grpc/pull/26729')
127 async
def test_call_wait_for_ready_enabled(self):
128 """RPC will wait until the connection is ready."""
129 for action
in _RPC_ACTIONS:
130 with self.subTest(name=action.__name__):
135 await _common.block_until_certain_state(
136 self.
_channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE)
145 if server
is not None:
146 await server.stop(
None)
# Script entry point: enable debug logging, then run the tests verbosely.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)