tests_aio/unit/close_channel_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests behavior of closing a grpc.aio.Channel."""
15 
16 import asyncio
17 import logging
18 import unittest
19 
20 import grpc
21 from grpc.aio import _base_call
22 from grpc.experimental import aio
23 
24 from src.proto.grpc.testing import messages_pb2
25 from src.proto.grpc.testing import test_pb2_grpc
26 from tests_aio.unit._test_base import AioTestBase
27 from tests_aio.unit._test_server import start_test_server
28 
29 _UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep'
30 _LONG_TIMEOUT_THAT_SHOULD_NOT_EXPIRE = 60
31 
32 
34 
35  async def setUp(self):
36  self._server_target, self._server = await start_test_server()
37 
38  async def tearDown(self):
39  await self._server.stop(None)
40 
41  async def test_graceful_close(self):
42  channel = aio.insecure_channel(self._server_target)
43  UnaryCallWithSleep = channel.unary_unary(
44  _UNARY_CALL_METHOD_WITH_SLEEP,
45  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
46  response_deserializer=messages_pb2.SimpleResponse.FromString,
47  )
48 
49  call = UnaryCallWithSleep(messages_pb2.SimpleRequest())
50 
51  await channel.close(grace=_LONG_TIMEOUT_THAT_SHOULD_NOT_EXPIRE)
52 
53  self.assertEqual(grpc.StatusCode.OK, await call.code())
54 
55  async def test_none_graceful_close(self):
56  channel = aio.insecure_channel(self._server_target)
57  UnaryCallWithSleep = channel.unary_unary(
58  _UNARY_CALL_METHOD_WITH_SLEEP,
59  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
60  response_deserializer=messages_pb2.SimpleResponse.FromString,
61  )
62 
63  call = UnaryCallWithSleep(messages_pb2.SimpleRequest())
64 
65  await channel.close(None)
66 
67  self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
68 
69  async def test_close_unary_unary(self):
70  channel = aio.insecure_channel(self._server_target)
71  stub = test_pb2_grpc.TestServiceStub(channel)
72 
73  calls = [stub.UnaryCall(messages_pb2.SimpleRequest()) for _ in range(2)]
74 
75  await channel.close()
76 
77  for call in calls:
78  self.assertTrue(call.cancelled())
79 
80  async def test_close_unary_stream(self):
81  channel = aio.insecure_channel(self._server_target)
82  stub = test_pb2_grpc.TestServiceStub(channel)
83 
85  calls = [stub.StreamingOutputCall(request) for _ in range(2)]
86 
87  await channel.close()
88 
89  for call in calls:
90  self.assertTrue(call.cancelled())
91 
92  async def test_close_stream_unary(self):
93  channel = aio.insecure_channel(self._server_target)
94  stub = test_pb2_grpc.TestServiceStub(channel)
95 
96  calls = [stub.StreamingInputCall() for _ in range(2)]
97 
98  await channel.close()
99 
100  for call in calls:
101  self.assertTrue(call.cancelled())
102 
103  async def test_close_stream_stream(self):
104  channel = aio.insecure_channel(self._server_target)
105  stub = test_pb2_grpc.TestServiceStub(channel)
106 
107  calls = [stub.FullDuplexCall() for _ in range(2)]
108 
109  await channel.close()
110 
111  for call in calls:
112  self.assertTrue(call.cancelled())
113 
114  async def test_close_async_context(self):
115  async with aio.insecure_channel(self._server_target) as channel:
116  stub = test_pb2_grpc.TestServiceStub(channel)
117  calls = [
118  stub.UnaryCall(messages_pb2.SimpleRequest()) for _ in range(2)
119  ]
120 
121  for call in calls:
122  self.assertTrue(call.cancelled())
123 
124  async def test_channel_isolation(self):
125  async with aio.insecure_channel(self._server_target) as channel1:
126  async with aio.insecure_channel(self._server_target) as channel2:
127  stub1 = test_pb2_grpc.TestServiceStub(channel1)
128  stub2 = test_pb2_grpc.TestServiceStub(channel2)
129 
130  call1 = stub1.UnaryCall(messages_pb2.SimpleRequest())
131  call2 = stub2.UnaryCall(messages_pb2.SimpleRequest())
132 
133  self.assertFalse(call1.cancelled())
134  self.assertTrue(call2.cancelled())
135 
136 
137 if __name__ == '__main__':
138  logging.basicConfig(level=logging.DEBUG)
139  unittest.main(verbosity=2)
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
tests_aio.unit.close_channel_test.TestCloseChannel
Definition: tests_aio/unit/close_channel_test.py:33
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.close_channel_test.TestCloseChannel._server
_server
Definition: tests_aio/unit/close_channel_test.py:36
grpc.aio
Definition: src/python/grpcio/grpc/aio/__init__.py:1
grpc::experimental
Definition: include/grpcpp/channel.h:46
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
tests_aio.unit.close_channel_test.TestCloseChannel.setUp
def setUp(self)
Definition: tests_aio/unit/close_channel_test.py:35
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:56