_channel_close_test.py
Go to the documentation of this file.
1 # Copyright 2018 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests server and client side compression."""
15 
16 import itertools
17 import logging
18 import threading
19 import time
20 import unittest
21 
22 import grpc
23 
24 from tests.unit import test_common
25 from tests.unit.framework.common import test_constants
26 
27 _BEAT = 0.5
28 _SOME_TIME = 5
29 _MORE_TIME = 10
30 
31 _STREAM_URI = 'Meffod'
32 _UNARY_URI = 'MeffodMan'
33 
34 
36 
37  request_streaming = True
38  response_streaming = True
39  request_deserializer = None
40  response_serializer = None
41 
42  def stream_stream(self, request_iterator, servicer_context):
43  for request in request_iterator:
44  yield request * 2
45 
46 
48 
49  request_streaming = False
50  response_streaming = False
51  request_deserializer = None
52  response_serializer = None
53 
54  def unary_unary(self, request, servicer_context):
55  return request * 2
56 
57 
58 _STREAMING_METHOD_HANDLER = _StreamingMethodHandler()
59 _UNARY_METHOD_HANDLER = _UnaryMethodHandler()
60 
61 
63 
64  def service(self, handler_call_details):
65  if handler_call_details.method == _STREAM_URI:
66  return _STREAMING_METHOD_HANDLER
67  else:
68  return _UNARY_METHOD_HANDLER
69 
70 
71 _GENERIC_HANDLER = _GenericHandler()
72 
73 
74 class _Pipe(object):
75 
76  def __init__(self, values):
77  self._condition = threading.Condition()
78  self._values = list(values)
79  self._open = True
80 
81  def __iter__(self):
82  return self
83 
84  def _next(self):
85  with self._condition:
86  while not self._values and self._open:
87  self._condition.wait()
88  if self._values:
89  return self._values.pop(0)
90  else:
91  raise StopIteration()
92 
93  def next(self):
94  return self._next()
95 
96  def __next__(self):
97  return self._next()
98 
99  def add(self, value):
100  with self._condition:
101  self._values.append(value)
102  self._condition.notify()
103 
104  def close(self):
105  with self._condition:
106  self._open = False
107  self._condition.notify()
108 
109  def __enter__(self):
110  return self
111 
112  def __exit__(self, type, value, traceback):
113  self.close()
114 
115 
116 class ChannelCloseTest(unittest.TestCase):
117 
118  def setUp(self):
119  self._server = test_common.test_server(
120  max_workers=test_constants.THREAD_CONCURRENCY)
121  self._server.add_generic_rpc_handlers((_GENERIC_HANDLER,))
122  self._port = self._server.add_insecure_port('[::]:0')
123  self._server.start()
124 
125  def tearDown(self):
126  self._server.stop(None)
127 
129  channel = grpc.insecure_channel('localhost:{}'.format(self._port))
130  multi_callable = channel.stream_stream(_STREAM_URI)
131  request_iterator = _Pipe(())
132  response_iterator = multi_callable(request_iterator)
133  channel.close()
134  request_iterator.close()
135 
136  self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
137 
139  channel = grpc.insecure_channel('localhost:{}'.format(self._port))
140  multi_callable = channel.stream_stream(_STREAM_URI)
141  request_iterator = _Pipe((b'abc',))
142  response_iterator = multi_callable(request_iterator)
143  next(response_iterator)
144  channel.close()
145  request_iterator.close()
146 
147  self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
148 
150  with grpc.insecure_channel('localhost:{}'.format(
151  self._port)) as channel: # pylint: disable=bad-continuation
152  multi_callable = channel.stream_stream(_STREAM_URI)
153  request_iterator = _Pipe((b'abc',))
154  response_iterator = multi_callable(request_iterator)
155  next(response_iterator)
156  request_iterator.close()
157 
158  self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
159 
161  with grpc.insecure_channel('localhost:{}'.format(
162  self._port)) as channel: # pylint: disable=bad-continuation
163  multi_callable = channel.stream_stream(_STREAM_URI)
164  request_iterators = tuple(
165  _Pipe((b'abc',))
166  for _ in range(test_constants.THREAD_CONCURRENCY))
167  response_iterators = []
168  for request_iterator in request_iterators:
169  response_iterator = multi_callable(request_iterator)
170  next(response_iterator)
171  response_iterators.append(response_iterator)
172  for request_iterator in request_iterators:
173  request_iterator.close()
174 
175  for response_iterator in response_iterators:
176  self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
177 
179  channel = grpc.insecure_channel('localhost:{}'.format(self._port))
180  multi_callable = channel.stream_stream(_STREAM_URI)
181  request_iterator = _Pipe((b'abc',))
182  response_iterator = multi_callable(request_iterator)
183  next(response_iterator)
184  start = time.time()
185  end = start + _MORE_TIME
186 
187  def sleep_some_time_then_close():
188  time.sleep(_SOME_TIME)
189  channel.close()
190 
191  for _ in range(test_constants.THREAD_CONCURRENCY):
192  close_thread = threading.Thread(target=sleep_some_time_then_close)
193  close_thread.start()
194  while True:
195  request_iterator.add(b'def')
196  time.sleep(_BEAT)
197  if end < time.time():
198  break
199  request_iterator.close()
200 
201  self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
202 
204  with grpc.insecure_channel('localhost:{}'.format(
205  self._port)) as channel:
206  stream_multi_callable = channel.stream_stream(_STREAM_URI)
207  endless_iterator = itertools.repeat(b'abc')
208  stream_response_iterator = stream_multi_callable(endless_iterator)
209  future = channel.unary_unary(_UNARY_URI).future(b'abc')
210 
211  def on_done_callback(future):
212  raise Exception("This should not cause a deadlock.")
213 
214  future.add_done_callback(on_done_callback)
215  future.result()
216 
217 
218 if __name__ == '__main__':
219  logging.basicConfig()
220  unittest.main(verbosity=2)
tests.unit._channel_close_test.ChannelCloseTest.test_many_concurrent_closes
def test_many_concurrent_closes(self)
Definition: _channel_close_test.py:178
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.unit._channel_close_test.ChannelCloseTest
Definition: _channel_close_test.py:116
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._channel_close_test.ChannelCloseTest.test_exception_in_callback
def test_exception_in_callback(self)
Definition: _channel_close_test.py:203
tests.unit._channel_close_test._Pipe.next
def next(self)
Definition: _channel_close_test.py:93
tests.unit._channel_close_test._GenericHandler
Definition: _channel_close_test.py:62
tests.unit._channel_close_test._StreamingMethodHandler
Definition: _channel_close_test.py:35
tests.unit._channel_close_test._Pipe.add
def add(self, value)
Definition: _channel_close_test.py:99
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.unit._channel_close_test.ChannelCloseTest._port
_port
Definition: _channel_close_test.py:122
tests.unit._channel_close_test._UnaryMethodHandler
Definition: _channel_close_test.py:47
tests.unit._channel_close_test._Pipe._values
_values
Definition: _channel_close_test.py:78
tests.unit._channel_close_test._Pipe._condition
_condition
Definition: _channel_close_test.py:77
tests.unit._channel_close_test._Pipe._next
def _next(self)
Definition: _channel_close_test.py:84
tests.unit._channel_close_test._Pipe
Definition: _channel_close_test.py:74
tests.unit._channel_close_test._StreamingMethodHandler.stream_stream
def stream_stream(self, request_iterator, servicer_context)
Definition: _channel_close_test.py:42
tests.unit._exit_scenarios.future
future
Definition: _exit_scenarios.py:217
tests.unit._channel_close_test.ChannelCloseTest.test_context_manager_close_while_many_calls_active
def test_context_manager_close_while_many_calls_active(self)
Definition: _channel_close_test.py:160
tests.unit._channel_close_test._Pipe.__init__
def __init__(self, values)
Definition: _channel_close_test.py:76
start
static uint64_t start
Definition: benchmark-pound.c:74
tests.unit._exit_scenarios.multi_callable
multi_callable
Definition: _exit_scenarios.py:216
tests.unit._channel_close_test._Pipe.__next__
def __next__(self)
Definition: _channel_close_test.py:96
tests.unit._channel_close_test._GenericHandler.service
def service(self, handler_call_details)
Definition: _channel_close_test.py:64
tests.unit._channel_close_test._Pipe.close
def close(self)
Definition: _channel_close_test.py:104
tests.unit._channel_close_test.ChannelCloseTest.test_close_while_call_active
def test_close_while_call_active(self)
Definition: _channel_close_test.py:138
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
tests.unit._channel_close_test._Pipe.__iter__
def __iter__(self)
Definition: _channel_close_test.py:81
tests.unit._channel_close_test._Pipe._open
_open
Definition: _channel_close_test.py:79
tests.unit._channel_close_test.ChannelCloseTest._server
_server
Definition: _channel_close_test.py:119
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit._channel_close_test.ChannelCloseTest.tearDown
def tearDown(self)
Definition: _channel_close_test.py:125
tests.unit._channel_close_test._UnaryMethodHandler.unary_unary
def unary_unary(self, request, servicer_context)
Definition: _channel_close_test.py:54
tests.unit._channel_close_test.ChannelCloseTest.test_close_immediately_after_call_invocation
def test_close_immediately_after_call_invocation(self)
Definition: _channel_close_test.py:128
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
tests.unit._channel_close_test.ChannelCloseTest.test_context_manager_close_while_call_active
def test_context_manager_close_while_call_active(self)
Definition: _channel_close_test.py:149
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit._channel_close_test._Pipe.__enter__
def __enter__(self)
Definition: _channel_close_test.py:109
tests.unit._channel_close_test.ChannelCloseTest.setUp
def setUp(self)
Definition: _channel_close_test.py:118
grpc.RpcMethodHandler
Definition: src/python/grpcio/grpc/__init__.py:1288
tests.unit._channel_close_test._Pipe.__exit__
def __exit__(self, type, value, traceback)
Definition: _channel_close_test.py:112


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38