connectivity_test.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests behavior of the connectivity state."""
15 
16 import asyncio
17 import logging
18 import platform
19 import threading
20 import time
21 import unittest
22 
23 import grpc
24 from grpc.experimental import aio
25 
26 from tests.unit.framework.common import test_constants
27 from tests_aio.unit import _common
28 from tests_aio.unit._constants import UNREACHABLE_TARGET
29 from tests_aio.unit._test_base import AioTestBase
30 from tests_aio.unit._test_server import start_test_server
31 
32 
34 
35  async def setUp(self):
36  self._server_address, self._server = await start_test_server()
37 
38  async def tearDown(self):
39  await self._server.stop(None)
40 
41  @unittest.skipIf('aarch64' in platform.machine(),
42  'The transient failure propagation is slower on aarch64')
43  async def test_unavailable_backend(self):
44  async with aio.insecure_channel(UNREACHABLE_TARGET) as channel:
45  self.assertEqual(grpc.ChannelConnectivity.IDLE,
46  channel.get_state(False))
47  self.assertEqual(grpc.ChannelConnectivity.IDLE,
48  channel.get_state(True))
49 
50  # Should not time out
51  await asyncio.wait_for(
52  _common.block_until_certain_state(
53  channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE),
54  test_constants.SHORT_TIMEOUT)
55 
56  async def test_normal_backend(self):
57  async with aio.insecure_channel(self._server_address) as channel:
58  current_state = channel.get_state(True)
59  self.assertEqual(grpc.ChannelConnectivity.IDLE, current_state)
60 
61  # Should not time out
62  await asyncio.wait_for(
63  _common.block_until_certain_state(
64  channel, grpc.ChannelConnectivity.READY),
65  test_constants.SHORT_TIMEOUT)
66 
67  async def test_timeout(self):
68  async with aio.insecure_channel(self._server_address) as channel:
69  self.assertEqual(grpc.ChannelConnectivity.IDLE,
70  channel.get_state(False))
71 
72  # If timed out, the function should return None.
73  with self.assertRaises(asyncio.TimeoutError):
74  await asyncio.wait_for(
75  _common.block_until_certain_state(
76  channel, grpc.ChannelConnectivity.READY),
77  test_constants.SHORT_TIMEOUT)
78 
79  async def test_shutdown(self):
80  channel = aio.insecure_channel(self._server_address)
81 
82  self.assertEqual(grpc.ChannelConnectivity.IDLE,
83  channel.get_state(False))
84 
85  # Waiting for changes in a separate coroutine
86  wait_started = asyncio.Event()
87 
88  async def a_pending_wait():
89  wait_started.set()
90  await channel.wait_for_state_change(grpc.ChannelConnectivity.IDLE)
91 
92  pending_task = self.loop.create_task(a_pending_wait())
93  await wait_started.wait()
94 
95  await channel.close()
96 
97  self.assertEqual(grpc.ChannelConnectivity.SHUTDOWN,
98  channel.get_state(True))
99 
100  self.assertEqual(grpc.ChannelConnectivity.SHUTDOWN,
101  channel.get_state(False))
102 
103  # Make sure there isn't any exception in the task
104  await pending_task
105 
106  # It can raise exceptions since it is an usage error, but it should not
107  # segfault or abort.
108  with self.assertRaises(aio.UsageError):
109  await channel.wait_for_state_change(
110  grpc.ChannelConnectivity.SHUTDOWN)
111 
112 
113 if __name__ == '__main__':
114  logging.basicConfig(level=logging.DEBUG)
115  unittest.main(verbosity=2)
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.connectivity_test.TestConnectivityState
Definition: connectivity_test.py:33
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.connectivity_test.TestConnectivityState.setUp
def setUp(self)
Definition: connectivity_test.py:35
tests_aio.unit.connectivity_test.TestConnectivityState._server
_server
Definition: connectivity_test.py:36
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
run_xds_tests.test_timeout
def test_timeout(gcp, original_backend_service, instance_group)
Definition: run_xds_tests.py:1924
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests_aio.unit._constants
Definition: _constants.py:1
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:54