14 """Tests behavior of the wait for connection API on client side."""
19 from typing
import Callable, Tuple
25 from src.proto.grpc.testing
import messages_pb2
26 from src.proto.grpc.testing
import test_pb2_grpc
32 _REQUEST = b
'\x01\x02\x03'
33 _TEST_METHOD =
'/test/Test'
35 _NUM_STREAM_RESPONSES = 5
36 _REQUEST_PAYLOAD_SIZE = 7
37 _RESPONSE_PAYLOAD_SIZE = 42
41 """Tests if wait_for_connection raises connectivity issue."""
49 async
def tearDown(self):
54 async
def test_unary_unary_ok(self):
58 await call.wait_for_connection()
61 self.assertIsInstance(response, messages_pb2.SimpleResponse)
63 async
def test_unary_stream_ok(self):
65 for _
in range(_NUM_STREAM_RESPONSES):
66 request.response_parameters.append(
69 call = self.
_stub.StreamingOutputCall(request)
72 await call.wait_for_connection()
75 async
for response
in call:
77 self.assertIs(
type(response),
78 messages_pb2.StreamingOutputCallResponse)
79 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
81 self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
82 self.assertEqual(await call.code(), grpc.StatusCode.OK)
84 async
def test_stream_unary_ok(self):
85 call = self.
_stub.StreamingInputCall()
88 await call.wait_for_connection()
93 for _
in range(_NUM_STREAM_RESPONSES):
94 await call.write(request)
95 await call.done_writing()
98 self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
99 self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
100 response.aggregated_payload_size)
102 self.assertEqual(await call.code(), grpc.StatusCode.OK)
104 async
def test_stream_stream_ok(self):
105 call = self.
_stub.FullDuplexCall()
108 await call.wait_for_connection()
111 request.response_parameters.append(
114 for _
in range(_NUM_STREAM_RESPONSES):
115 await call.write(request)
116 response = await call.read()
117 self.assertIsInstance(response,
118 messages_pb2.StreamingOutputCallResponse)
119 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
121 await call.done_writing()
123 self.assertEqual(grpc.StatusCode.OK, await call.code())
125 async
def test_unary_unary_error(self):
128 with self.assertRaises(aio.AioRpcError)
as exception_context:
129 await call.wait_for_connection()
130 rpc_error = exception_context.exception
131 self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
133 async
def test_unary_stream_error(self):
136 with self.assertRaises(aio.AioRpcError)
as exception_context:
137 await call.wait_for_connection()
138 rpc_error = exception_context.exception
139 self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
141 async
def test_stream_unary_error(self):
144 with self.assertRaises(aio.AioRpcError)
as exception_context:
145 await call.wait_for_connection()
146 rpc_error = exception_context.exception
147 self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
149 async
def test_stream_stream_error(self):
152 with self.assertRaises(aio.AioRpcError)
as exception_context:
153 await call.wait_for_connection()
154 rpc_error = exception_context.exception
155 self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
158 if __name__ ==
'__main__':
159 logging.basicConfig(level=logging.DEBUG)
160 unittest.main(verbosity=2)