14 """Testing the channel_ready function."""
37 listen=
False, sock_options=(socket.SO_REUSEADDR,))
38 self.
_channel = aio.insecure_channel(f
"{address}:{self._port}")
41 async
def tearDown(self):
44 async
def test_channel_ready_success(self):
46 channel_ready_task = self.
loop.create_task(
50 await _common.block_until_certain_state(
51 self.
_channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE)
58 await channel_ready_task
60 await server.stop(
None)
62 async
def test_channel_ready_blocked(self):
63 with self.assertRaises(asyncio.TimeoutError):
64 await asyncio.wait_for(self.
_channel.channel_ready(),
65 test_constants.SHORT_TIMEOUT)
68 if __name__ ==
'__main__':
69 logging.basicConfig(level=logging.DEBUG)
70 unittest.main(verbosity=2)