channel_ready_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Testing the channel_ready function."""
15 
16 import asyncio
17 import gc
18 import logging
19 import socket
20 import time
21 import unittest
22 
23 import grpc
24 from grpc.experimental import aio
25 
26 from tests.unit.framework.common import get_socket
27 from tests.unit.framework.common import test_constants
28 from tests_aio.unit import _common
29 from tests_aio.unit._test_base import AioTestBase
30 from tests_aio.unit._test_server import start_test_server
31 
32 
34 
35  async def setUp(self):
36  address, self._port, self._socket = get_socket(
37  listen=False, sock_options=(socket.SO_REUSEADDR,))
38  self._channel = aio.insecure_channel(f"{address}:{self._port}")
39  self._socket.close()
40 
41  async def tearDown(self):
42  await self._channel.close()
43 
44  async def test_channel_ready_success(self):
45  # Start `channel_ready` as another Task
46  channel_ready_task = self.loop.create_task(
47  self._channel.channel_ready())
48 
49  # Wait for TRANSIENT_FAILURE
50  await _common.block_until_certain_state(
51  self._channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE)
52 
53  try:
54  # Start the server
55  _, server = await start_test_server(port=self._port)
56 
57  # The RPC should recover itself
58  await channel_ready_task
59  finally:
60  await server.stop(None)
61 
62  async def test_channel_ready_blocked(self):
63  with self.assertRaises(asyncio.TimeoutError):
64  await asyncio.wait_for(self._channel.channel_ready(),
65  test_constants.SHORT_TIMEOUT)
66 
67 
68 if __name__ == '__main__':
69  logging.basicConfig(level=logging.DEBUG)
70  unittest.main(verbosity=2)
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.channel_ready_test.TestChannelReady._channel
_channel
Definition: channel_ready_test.py:38
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests.unit.framework.common.get_socket
def get_socket(bind_address='localhost', port=0, listen=True, sock_options=_DEFAULT_SOCK_OPTIONS)
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:26
close
#define close
Definition: test-fs.c:48
tests_aio.unit.channel_ready_test.TestChannelReady._socket
_socket
Definition: channel_ready_test.py:36
tests_aio.unit.channel_ready_test.TestChannelReady.setUp
def setUp(self)
Definition: channel_ready_test.py:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests_aio.unit.channel_ready_test.TestChannelReady
Definition: channel_ready_test.py:33
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:44