"""Tests behavior of the grpc.aio.Channel class."""
23 from src.proto.grpc.testing
import messages_pb2
24 from src.proto.grpc.testing
import test_pb2_grpc
31 _UNARY_CALL_METHOD =
'/grpc.testing.TestService/UnaryCall'
32 _UNARY_CALL_METHOD_WITH_SLEEP =
'/grpc.testing.TestService/UnaryCallWithSleep'
33 _STREAMING_OUTPUT_CALL_METHOD =
'/grpc.testing.TestService/StreamingOutputCall'
35 _INVOCATION_METADATA = (
36 (
'x-grpc-test-echo-initial',
'initial-md-value'),
37 (
'x-grpc-test-echo-trailing-bin', b
'\x00\x02'),
40 _NUM_STREAM_RESPONSES = 5
41 _REQUEST_PAYLOAD_SIZE = 7
42 _RESPONSE_PAYLOAD_SIZE = 42
50 async
def tearDown(self):
53 async
def test_async_context(self):
54 async
with aio.insecure_channel(self._server_target)
as channel:
55 hi = channel.unary_unary(
57 request_serializer=messages_pb2.SimpleRequest.SerializeToString,
58 response_deserializer=messages_pb2.SimpleResponse.FromString)
61 async
def test_unary_unary(self):
62 async
with aio.insecure_channel(self._server_target)
as channel:
63 hi = channel.unary_unary(
65 request_serializer=messages_pb2.SimpleRequest.SerializeToString,
66 response_deserializer=messages_pb2.SimpleResponse.FromString)
69 self.assertIsInstance(response, messages_pb2.SimpleResponse)
71 async
def test_unary_call_times_out(self):
72 async
with aio.insecure_channel(self._server_target)
as channel:
73 hi = channel.unary_unary(
74 _UNARY_CALL_METHOD_WITH_SLEEP,
75 request_serializer=messages_pb2.SimpleRequest.SerializeToString,
76 response_deserializer=messages_pb2.SimpleResponse.FromString,
81 timeout=UNARY_CALL_WITH_SLEEP_VALUE / 2)
83 _, details = grpc.StatusCode.DEADLINE_EXCEEDED.value
84 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
85 exception_context.exception.code())
86 self.assertEqual(details.title(),
87 exception_context.exception.details())
88 self.assertIsNotNone(exception_context.exception.initial_metadata())
90 exception_context.exception.trailing_metadata())
92 @unittest.skipIf(os.name ==
'nt',
93 'TODO: https://github.com/grpc/grpc/issues/21658')
94 async
def test_unary_call_does_not_times_out(self):
95 async
with aio.insecure_channel(self._server_target)
as channel:
96 hi = channel.unary_unary(
97 _UNARY_CALL_METHOD_WITH_SLEEP,
98 request_serializer=messages_pb2.SimpleRequest.SerializeToString,
99 response_deserializer=messages_pb2.SimpleResponse.FromString,
103 timeout=UNARY_CALL_WITH_SLEEP_VALUE * 5)
104 self.assertEqual(await call.code(), grpc.StatusCode.OK)
106 async
def test_unary_stream(self):
107 channel = aio.insecure_channel(self._server_target)
108 stub = test_pb2_grpc.TestServiceStub(channel)
112 for _
in range(_NUM_STREAM_RESPONSES):
113 request.response_parameters.append(
117 call = stub.StreamingOutputCall(request)
121 async
for response
in call:
123 self.assertIs(
type(response),
124 messages_pb2.StreamingOutputCallResponse)
125 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
127 self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
128 self.assertEqual(await call.code(), grpc.StatusCode.OK)
129 await channel.close()
131 async
def test_stream_unary_using_write(self):
132 channel = aio.insecure_channel(self._server_target)
133 stub = test_pb2_grpc.TestServiceStub(channel)
136 call = stub.StreamingInputCall()
143 for _
in range(_NUM_STREAM_RESPONSES):
144 await call.write(request)
145 await call.done_writing()
148 response = await call
149 self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
150 self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
151 response.aggregated_payload_size)
153 self.assertEqual(await call.code(), grpc.StatusCode.OK)
154 await channel.close()
156 async
def test_stream_unary_using_async_gen(self):
157 channel = aio.insecure_channel(self._server_target)
158 stub = test_pb2_grpc.TestServiceStub(channel)
165 for _
in range(_NUM_STREAM_RESPONSES):
169 call = stub.StreamingInputCall(
gen())
172 response = await call
173 self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
174 self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
175 response.aggregated_payload_size)
177 self.assertEqual(await call.code(), grpc.StatusCode.OK)
178 await channel.close()
180 async
def test_stream_stream_using_read_write(self):
181 channel = aio.insecure_channel(self._server_target)
182 stub = test_pb2_grpc.TestServiceStub(channel)
185 call = stub.FullDuplexCall()
189 request.response_parameters.append(
192 for _
in range(_NUM_STREAM_RESPONSES):
193 await call.write(request)
194 response = await call.read()
195 self.assertIsInstance(response,
196 messages_pb2.StreamingOutputCallResponse)
197 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
199 await call.done_writing()
201 self.assertEqual(grpc.StatusCode.OK, await call.code())
202 await channel.close()
204 async
def test_stream_stream_using_async_gen(self):
205 channel = aio.insecure_channel(self._server_target)
206 stub = test_pb2_grpc.TestServiceStub(channel)
210 request.response_parameters.append(
214 for _
in range(_NUM_STREAM_RESPONSES):
218 call = stub.FullDuplexCall(
gen())
220 async
for response
in call:
221 self.assertIsInstance(response,
222 messages_pb2.StreamingOutputCallResponse)
223 self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
225 self.assertEqual(grpc.StatusCode.OK, await call.code())
226 await channel.close()
# Script entry point: when the module is executed directly (not imported),
# enable DEBUG-level logging and run the tests with verbose output.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)