14 """Tests behavior of closing a grpc.aio.Channel."""
24 from src.proto.grpc.testing
import messages_pb2
25 from src.proto.grpc.testing
import test_pb2_grpc
# Method path handed to channel.unary_unary() by the graceful and
# non-graceful close tests in this module.
_UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep'
# Grace value passed to channel.close(grace=...) in test_graceful_close; as the
# name says, it is chosen so that it should not run out during the test.
# NOTE(review): presumably expressed in seconds — confirm against the
# grpc.aio.Channel.close documentation.
_LONG_TIMEOUT_THAT_SHOULD_NOT_EXPIRE = 60
# Async `tearDown` method; presumably the per-test cleanup hook — TODO confirm.
# NOTE(review): no body is visible in this excerpt (the embedded listing
# numbers jump from 38 straight to 41), so what it releases cannot be
# described from here — check the full file.
38 async
def tearDown(self):
# Graceful-close test: opens an insecure channel to self._server_target,
# builds a unary-unary callable for _UNARY_CALL_METHOD_WITH_SLEEP, closes the
# channel with grace=_LONG_TIMEOUT_THAT_SHOULD_NOT_EXPIRE, and then asserts
# that the call's status code is grpc.StatusCode.OK.
# NOTE(review): the closing paren of unary_unary( and the statement that
# binds `call` are not visible here (embedded listing numbers skip 47-50);
# presumably those lines were dropped from this excerpt — confirm against the
# full file.
41 async
def test_graceful_close(self):
42 channel = aio.insecure_channel(self._server_target)
43 UnaryCallWithSleep = channel.unary_unary(
44 _UNARY_CALL_METHOD_WITH_SLEEP,
45 request_serializer=messages_pb2.SimpleRequest.SerializeToString,
46 response_deserializer=messages_pb2.SimpleResponse.FromString,
# Close with an explicit grace value rather than None.
51 await channel.close(grace=_LONG_TIMEOUT_THAT_SHOULD_NOT_EXPIRE)
# The call is expected to report OK even though the channel was closed.
53 self.assertEqual(grpc.StatusCode.OK, await call.code())
# Non-graceful-close test: same setup as the graceful variant (insecure
# channel to self._server_target plus a unary-unary callable for
# _UNARY_CALL_METHOD_WITH_SLEEP), but the channel is closed with None as the
# grace argument and the call's status code is asserted to be
# grpc.StatusCode.CANCELLED.
# NOTE(review): the closing paren of unary_unary( and the statement that
# binds `call` are not visible here (embedded listing numbers skip 61-64);
# presumably dropped from this excerpt — confirm against the full file.
55 async
def test_none_graceful_close(self):
56 channel = aio.insecure_channel(self._server_target)
57 UnaryCallWithSleep = channel.unary_unary(
58 _UNARY_CALL_METHOD_WITH_SLEEP,
59 request_serializer=messages_pb2.SimpleRequest.SerializeToString,
60 response_deserializer=messages_pb2.SimpleResponse.FromString,
# Passing None as the grace argument (no grace value supplied).
65 await channel.close(
None)
# With no grace value the call is expected to end up CANCELLED.
67 self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
# Unary-unary close test: opens an insecure channel to self._server_target,
# wraps it in a TestServiceStub, and asserts that `call` reports cancelled().
# NOTE(review): the lines that start the call(s) and close the channel are
# not visible in this excerpt (embedded listing numbers skip 72-77), so
# `call` is unbound in what is shown — confirm against the full file.
69 async
def test_close_unary_unary(self):
70 channel = aio.insecure_channel(self._server_target)
71 stub = test_pb2_grpc.TestServiceStub(channel)
78 self.assertTrue(call.cancelled())
# Unary-stream close test: opens an insecure channel to self._server_target,
# wraps it in a TestServiceStub, starts two StreamingOutputCall calls with the
# same `request`, and asserts that `call` reports cancelled().
# NOTE(review): the definition of `request`, the channel close, and the
# binding of `call` (presumably a loop over `calls`) are not visible in this
# excerpt (embedded listing numbers skip 83-84 and 86-89) — confirm against
# the full file.
80 async
def test_close_unary_stream(self):
81 channel = aio.insecure_channel(self._server_target)
82 stub = test_pb2_grpc.TestServiceStub(channel)
# Two calls are kept in a list so more than one call can be checked.
85 calls = [stub.StreamingOutputCall(request)
for _
in range(2)]
90 self.assertTrue(call.cancelled())
# Stream-unary close test: opens an insecure channel to self._server_target,
# wraps it in a TestServiceStub, starts two StreamingInputCall calls, and
# asserts that `call` reports cancelled().
# NOTE(review): the channel close and the binding of `call` (presumably a
# loop over `calls`) are not visible in this excerpt (embedded listing
# numbers skip 97-100) — confirm against the full file.
92 async
def test_close_stream_unary(self):
93 channel = aio.insecure_channel(self._server_target)
94 stub = test_pb2_grpc.TestServiceStub(channel)
# Two calls are kept in a list so more than one call can be checked.
96 calls = [stub.StreamingInputCall()
for _
in range(2)]
101 self.assertTrue(call.cancelled())
# Stream-stream close test: opens an insecure channel to self._server_target,
# wraps it in a TestServiceStub, starts two FullDuplexCall calls, closes the
# channel with no arguments, and asserts that `call` reports cancelled().
# NOTE(review): `call` is not bound in the visible lines; presumably a
# `for call in calls:` line (embedded listing number 111) was dropped from
# this excerpt — confirm against the full file.
103 async
def test_close_stream_stream(self):
104 channel = aio.insecure_channel(self._server_target)
105 stub = test_pb2_grpc.TestServiceStub(channel)
# Two calls are kept in a list so more than one call can be checked.
107 calls = [stub.FullDuplexCall()
for _
in range(2)]
# close() is awaited without a grace argument here.
109 await channel.close()
112 self.assertTrue(call.cancelled())
# Async-context close test: uses aio.insecure_channel(self._server_target) as
# an `async with` context manager, builds a TestServiceStub on the channel,
# and asserts that `call` reports cancelled().
# NOTE(review): the lines that start the call(s) are not visible in this
# excerpt (embedded listing numbers skip 117-121), and with indentation lost
# it cannot be told from here whether the assertion sits inside or after the
# `async with` block — confirm against the full file.
114 async
def test_close_async_context(self):
115 async
with aio.insecure_channel(self._server_target)
as channel:
116 stub = test_pb2_grpc.TestServiceStub(channel)
122 self.assertTrue(call.cancelled())
# Channel-isolation test: opens two insecure channels to the same
# self._server_target via `async with`, builds one TestServiceStub per
# channel, and asserts that call1 is NOT cancelled while call2 IS cancelled.
# NOTE(review): the statements that bind call1/call2 are not visible in this
# excerpt (embedded listing numbers skip 129-132), and indentation is lost,
# so the nesting of the two `async with` blocks relative to the assertions
# cannot be confirmed from here — check the full file.
124 async
def test_channel_isolation(self):
125 async
with aio.insecure_channel(self._server_target)
as channel1:
126 async
with aio.insecure_channel(self._server_target)
as channel2:
# One stub per channel, so each call is tied to a distinct channel.
127 stub1 = test_pb2_grpc.TestServiceStub(channel1)
128 stub2 = test_pb2_grpc.TestServiceStub(channel2)
# Opposite expectations for the two calls.
133 self.assertFalse(call1.cancelled())
134 self.assertTrue(call2.cancelled())
# Script entry point: when this module is executed directly, enable
# DEBUG-level logging and hand control to the unittest runner with
# verbosity level 2.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)