_channel_ready_future_test.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of grpc.channel_ready_future."""
15 
16 import logging
17 import threading
18 import unittest
19 
20 import grpc
21 
22 from tests.unit import thread_pool
23 from tests.unit.framework.common import test_constants
24 
25 
class _Callback(object):
    """Thread-safe receiver for the single argument of a done-callback.

    One thread hands a value in through accept_value (for example as a
    future's done-callback) while another thread waits for it in
    block_until_called.
    """

    def __init__(self):
        self._condition = threading.Condition()
        self._value = None
        # Tracked separately from _value so that a callback legitimately
        # invoked with None still releases waiters instead of hanging them.
        self._called = False

    def accept_value(self, value):
        """Store value and wake every thread blocked in block_until_called.

        Args:
          value: The object delivered to the callback; may be None.
        """
        with self._condition:
            self._value = value
            self._called = True
            self._condition.notify_all()

    def block_until_called(self):
        """Block until accept_value has been invoked.

        Returns:
          The value most recently passed to accept_value.
        """
        with self._condition:
            # Re-check in a loop: a wakeup alone does not prove that
            # accept_value has run.
            while not self._called:
                self._condition.wait()
            return self._value
42 
43 
class ChannelReadyFutureTest(unittest.TestCase):
    """Checks the future returned by grpc.channel_ready_future."""

    def test_lonely_channel_connectivity(self):
        """A future that does not mature in time keeps running until cancelled.

        NOTE(review): presumably nothing listens on localhost:12345, which
        is why result() is expected to time out -- confirm for the CI hosts.
        """
        channel = grpc.insecure_channel('localhost:12345')
        # Release the channel even when an assertion below fails.
        self.addCleanup(channel.close)
        callback = _Callback()

        ready_future = grpc.channel_ready_future(channel)
        ready_future.add_done_callback(callback.accept_value)
        with self.assertRaises(grpc.FutureTimeoutError):
            ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
        # Timing out must not change the future's state.
        self.assertFalse(ready_future.cancelled())
        self.assertFalse(ready_future.done())
        self.assertTrue(ready_future.running())
        ready_future.cancel()
        # Cancellation fires the done-callback with the future itself.
        value_passed_to_callback = callback.block_until_called()
        self.assertIs(ready_future, value_passed_to_callback)
        self.assertTrue(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())

    def test_immediately_connectable_channel_connectivity(self):
        """A future for a channel to a started local server matures."""
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None)
        server = grpc.server(recording_thread_pool,
                             options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        # Cleanups run last-in-first-out, so the channel (registered below)
        # is closed before the server is stopped, as in the original order.
        self.addCleanup(server.stop, None)
        channel = grpc.insecure_channel('localhost:{}'.format(port))
        self.addCleanup(channel.close)
        callback = _Callback()

        ready_future = grpc.channel_ready_future(channel)
        ready_future.add_done_callback(callback.accept_value)
        self.assertIsNone(
            ready_future.result(timeout=test_constants.LONG_TIMEOUT))
        value_passed_to_callback = callback.block_until_called()
        self.assertIs(ready_future, value_passed_to_callback)
        self.assertFalse(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())
        # Cancellation after maturity has no effect.
        ready_future.cancel()
        self.assertFalse(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())
        # NOTE(review): presumably was_used() reports whether work was ever
        # submitted to the pool -- confirm in tests/unit/thread_pool.py.
        self.assertFalse(recording_thread_pool.was_used())
94 
95 
if __name__ == '__main__':
    # Configure root logging before the tests start, then run them verbosely.
    logging.basicConfig()
    unittest.main(verbosity=2)
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._channel_ready_future_test._Callback._value
_value
Definition: _channel_ready_future_test.py:30
tests.unit._channel_ready_future_test._Callback.block_until_called
def block_until_called(self)
Definition: _channel_ready_future_test.py:37
tests.unit._channel_ready_future_test._Callback.__init__
def __init__(self)
Definition: _channel_ready_future_test.py:28
tests.unit._channel_ready_future_test.ChannelReadyFutureTest.test_lonely_channel_connectivity
def test_lonely_channel_connectivity(self)
Definition: _channel_ready_future_test.py:46
tests.unit._channel_ready_future_test.ChannelReadyFutureTest.test_immediately_connectable_channel_connectivity
def test_immediately_connectable_channel_connectivity(self)
Definition: _channel_ready_future_test.py:66
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._channel_ready_future_test._Callback.accept_value
def accept_value(self, value)
Definition: _channel_ready_future_test.py:32
grpc.channel_ready_future
def channel_ready_future(channel)
Definition: src/python/grpcio/grpc/__init__.py:1945
tests.unit._channel_ready_future_test._Callback._condition
_condition
Definition: _channel_ready_future_test.py:29
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
grpc.FutureTimeoutError
Exception raised when waiting on a Future (for example via result(timeout=...)) times out.
Definition: src/python/grpcio/grpc/__init__.py:40
tests.unit.thread_pool.RecordingThreadPool
Definition: thread_pool.py:19
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit._channel_ready_future_test._Callback
Definition: _channel_ready_future_test.py:26
tests.unit._channel_ready_future_test.ChannelReadyFutureTest
Definition: _channel_ready_future_test.py:44


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38