_channel_connectivity_test.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of grpc._channel.Channel connectivity."""
15 
16 import logging
17 import threading
18 import time
19 import unittest
20 
21 import grpc
22 
23 from tests.unit import thread_pool
24 from tests.unit.framework.common import test_constants
25 
26 
def _ready_in_connectivities(connectivities):
    """Return whether READY appears anywhere in the observed connectivities.

    Args:
      connectivities: A container of the connectivity values recorded so far.

    Returns:
      True if grpc.ChannelConnectivity.READY is a member of connectivities,
      False otherwise.
    """
    return grpc.ChannelConnectivity.READY in connectivities
29 
30 
def _last_connectivity_is_not_ready(connectivities):
    """Return whether the most recently observed connectivity is not READY.

    Args:
      connectivities: A non-empty sequence of the connectivity values recorded
        so far; the last element is the most recent one.

    Returns:
      True if the last element is not grpc.ChannelConnectivity.READY.

    Raises:
      IndexError: If connectivities is empty.
    """
    return connectivities[-1] is not grpc.ChannelConnectivity.READY
33 
34 
class _Callback(object):
    """Records connectivity updates and lets callers wait on the history."""

    def __init__(self):
        # The condition's lock guards _connectivities; waiters on the
        # condition are woken each time a new value is recorded.
        self._condition = threading.Condition()
        self._connectivities = []

    def update(self, connectivity):
        """Append a connectivity value to the history and wake the waiters.

        Args:
          connectivity: The connectivity value to record.
        """
        with self._condition:
            self._connectivities.append(connectivity)
            # Wake every waiter, not just one: each waiter re-checks its own
            # predicate, and a single notify() could leave a thread whose
            # predicate is now satisfied blocked indefinitely.
            self._condition.notify_all()

    def connectivities(self):
        """Return an immutable snapshot of the values recorded so far."""
        with self._condition:
            return tuple(self._connectivities)

    def block_until_connectivities_satisfy(self, predicate):
        """Block until the recorded history satisfies the predicate.

        Args:
          predicate: A callable taking a tuple of the recorded connectivity
            values and returning a truthy value once the wait should end.

        Returns:
          The tuple snapshot of recorded values for which predicate returned
          a truthy value.
        """
        with self._condition:
            while True:
                connectivities = tuple(self._connectivities)
                if predicate(connectivities):
                    return connectivities
                # No timeout: this waits until update() records a new value.
                self._condition.wait()
58 
59 
class ChannelConnectivityTest(unittest.TestCase):
    """Exercises connectivity subscriptions on grpc channels."""

    def test_lonely_channel_connectivity(self):
        """A channel whose target never accepts a connection never reports READY."""
        callback = _Callback()

        # NOTE(review): assumes nothing is listening on localhost:12345.
        channel = grpc.insecure_channel('localhost:12345')
        channel.subscribe(callback.update, try_to_connect=False)
        first_connectivities = callback.block_until_connectivities_satisfy(bool)
        channel.subscribe(callback.update, try_to_connect=True)
        second_connectivities = callback.block_until_connectivities_satisfy(
            lambda connectivities: 2 <= len(connectivities))
        # Wait for a connection that will never happen.
        time.sleep(test_constants.SHORT_TIMEOUT)
        third_connectivities = callback.connectivities()
        # The same callback was subscribed twice, so it is unsubscribed twice.
        channel.unsubscribe(callback.update)
        fourth_connectivities = callback.connectivities()
        channel.unsubscribe(callback.update)
        fifth_connectivities = callback.connectivities()

        channel.close()

        self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
                                 first_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.READY, second_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.READY, third_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.READY, fourth_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.READY, fifth_connectivities)

    def test_immediately_connectable_channel_connectivity(self):
        """A channel to a running server stays IDLE until asked to connect."""
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None)
        server = grpc.server(recording_thread_pool,
                             options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        first_callback = _Callback()
        second_callback = _Callback()

        channel = grpc.insecure_channel('localhost:{}'.format(port))
        channel.subscribe(first_callback.update, try_to_connect=False)
        first_connectivities = first_callback.block_until_connectivities_satisfy(
            bool)
        # Wait for a connection that will never happen because try_to_connect=True
        # has not yet been passed.
        time.sleep(test_constants.SHORT_TIMEOUT)
        second_connectivities = first_callback.connectivities()
        channel.subscribe(second_callback.update, try_to_connect=True)
        third_connectivities = first_callback.block_until_connectivities_satisfy(
            lambda connectivities: 2 <= len(connectivities))
        fourth_connectivities = second_callback.block_until_connectivities_satisfy(
            bool)
        # Wait for a connection that will happen (or may already have happened).
        first_callback.block_until_connectivities_satisfy(
            _ready_in_connectivities)
        second_callback.block_until_connectivities_satisfy(
            _ready_in_connectivities)
        channel.close()
        server.stop(None)

        self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
                                 first_connectivities)
        self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
                                 second_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.TRANSIENT_FAILURE,
                         third_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.SHUTDOWN,
                         third_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.TRANSIENT_FAILURE,
                         fourth_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.SHUTDOWN,
                         fourth_connectivities)
        # Connecting alone must not have used the server's thread pool.
        self.assertFalse(recording_thread_pool.was_used())

    def test_reachable_then_unreachable_channel_connectivity(self):
        """A READY channel leaves READY once its server is stopped."""
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None)
        server = grpc.server(recording_thread_pool,
                             options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        callback = _Callback()

        channel = grpc.insecure_channel('localhost:{}'.format(port))
        channel.subscribe(callback.update, try_to_connect=True)
        callback.block_until_connectivities_satisfy(_ready_in_connectivities)
        # Now take down the server and confirm that channel readiness is repudiated.
        server.stop(None)
        callback.block_until_connectivities_satisfy(
            _last_connectivity_is_not_ready)
        channel.unsubscribe(callback.update)
        channel.close()
        self.assertFalse(recording_thread_pool.was_used())
152 
153 
# Script entry point: configure default logging, then run the tests verbosely.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._channel_connectivity_test._ready_in_connectivities
def _ready_in_connectivities(connectivities)
Definition: _channel_connectivity_test.py:27
tests.unit._channel_connectivity_test._Callback.connectivities
def connectivities(self)
Definition: _channel_connectivity_test.py:46
tests.unit._channel_connectivity_test._Callback._connectivities
_connectivities
Definition: _channel_connectivity_test.py:39
tests.unit._channel_connectivity_test.ChannelConnectivityTest.test_reachable_then_unreachable_channel_connectivity
def test_reachable_then_unreachable_channel_connectivity(self)
Definition: _channel_connectivity_test.py:133
tests.unit._channel_connectivity_test._Callback.block_until_connectivities_satisfy
def block_until_connectivities_satisfy(self, predicate)
Definition: _channel_connectivity_test.py:50
tests.unit._channel_connectivity_test.ChannelConnectivityTest.test_immediately_connectable_channel_connectivity
def test_immediately_connectable_channel_connectivity(self)
Definition: _channel_connectivity_test.py:88
tests.unit._channel_connectivity_test._Callback._condition
_condition
Definition: _channel_connectivity_test.py:38
tests.unit._channel_connectivity_test.ChannelConnectivityTest
Definition: _channel_connectivity_test.py:60
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests.unit._channel_connectivity_test._Callback.update
def update(self, connectivity)
Definition: _channel_connectivity_test.py:41
tests.unit._channel_connectivity_test._Callback
Definition: _channel_connectivity_test.py:35
tests.unit._channel_connectivity_test.ChannelConnectivityTest.test_lonely_channel_connectivity
def test_lonely_channel_connectivity(self)
Definition: _channel_connectivity_test.py:62
tests.unit._channel_connectivity_test._last_connectivity_is_not_ready
def _last_connectivity_is_not_ready(connectivities)
Definition: _channel_connectivity_test.py:31
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit.thread_pool.RecordingThreadPool
Definition: thread_pool.py:19
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.unit._channel_connectivity_test._Callback.__init__
def __init__(self)
Definition: _channel_connectivity_test.py:37


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38