_cancel_many_calls_test.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test making many calls and immediately cancelling most of them."""
15 
16 import threading
17 import unittest
18 
19 from grpc._cython import cygrpc
20 from grpc.framework.foundation import logging_pool
21 
22 from tests.unit._cython import test_utilities
23 from tests.unit.framework.common import test_constants
24 
25 _EMPTY_FLAGS = 0
26 _EMPTY_METADATA = ()
27 
28 _SERVER_SHUTDOWN_TAG = 'server_shutdown'
29 _REQUEST_CALL_TAG = 'request_call'
30 _RECEIVE_CLOSE_ON_SERVER_TAG = 'receive_close_on_server'
31 _RECEIVE_MESSAGE_TAG = 'receive_message'
32 _SERVER_COMPLETE_CALL_TAG = 'server_complete_call'
33 
34 _SUCCESS_CALL_FRACTION = 1.0 / 8.0
35 _SUCCESSFUL_CALLS = int(test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION)
36 _UNSUCCESSFUL_CALLS = test_constants.RPC_CONCURRENCY - _SUCCESSFUL_CALLS
37 
38 
39 class _State(object):
40 
41  def __init__(self):
42  self.condition = threading.Condition()
43  self.handlers_released = False
44  self.parked_handlers = 0
45  self.handled_rpcs = 0
46 
47 
49  return (event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and
50  event.batch_operations[0].cancelled())
51 
52 
53 class _Handler(object):
54 
55  def __init__(self, state, completion_queue, rpc_event):
56  self._state = state
57  self._lock = threading.Lock()
58  self._completion_queue = completion_queue
59  self._call = rpc_event.call
60 
61  def __call__(self):
62  with self._state.condition:
63  self._state.parked_handlers += 1
64  if self._state.parked_handlers == test_constants.THREAD_CONCURRENCY:
65  self._state.condition.notify_all()
66  while not self._state.handlers_released:
67  self._state.condition.wait()
68 
69  with self._lock:
70  self._call.start_server_batch(
71  (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
72  _RECEIVE_CLOSE_ON_SERVER_TAG)
73  self._call.start_server_batch(
74  (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
75  _RECEIVE_MESSAGE_TAG)
76  first_event = self._completion_queue.poll()
77  if _is_cancellation_event(first_event):
78  self._completion_queue.poll()
79  else:
80  with self._lock:
81  operations = (
82  cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
83  _EMPTY_FLAGS),
84  cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS),
85  cygrpc.SendStatusFromServerOperation(
86  _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!',
87  _EMPTY_FLAGS),
88  )
89  self._call.start_server_batch(operations,
90  _SERVER_COMPLETE_CALL_TAG)
91  self._completion_queue.poll()
92  self._completion_queue.poll()
93 
94 
95 def _serve(state, server, server_completion_queue, thread_pool):
96  for _ in range(test_constants.RPC_CONCURRENCY):
97  call_completion_queue = cygrpc.CompletionQueue()
98  server.request_call(call_completion_queue, server_completion_queue,
99  _REQUEST_CALL_TAG)
100  rpc_event = server_completion_queue.poll()
101  thread_pool.submit(_Handler(state, call_completion_queue, rpc_event))
102  with state.condition:
103  state.handled_rpcs += 1
104  if test_constants.RPC_CONCURRENCY <= state.handled_rpcs:
105  state.condition.notify_all()
106  server_completion_queue.poll()
107 
108 
109 class _QueueDriver(object):
110 
111  def __init__(self, condition, completion_queue, due):
112  self._condition = condition
113  self._completion_queue = completion_queue
114  self._due = due
115  self._events = []
116  self._returned = False
117 
118  def start(self):
119 
120  def in_thread():
121  while True:
122  event = self._completion_queue.poll()
123  with self._condition:
124  self._events.append(event)
125  self._due.remove(event.tag)
126  self._condition.notify_all()
127  if not self._due:
128  self._returned = True
129  return
130 
131  thread = threading.Thread(target=in_thread)
132  thread.start()
133 
134  def events(self, at_least):
135  with self._condition:
136  while len(self._events) < at_least:
137  self._condition.wait()
138  return tuple(self._events)
139 
140 
141 class CancelManyCallsTest(unittest.TestCase):
142 
144  server_thread_pool = logging_pool.pool(
145  test_constants.THREAD_CONCURRENCY)
146 
147  server_completion_queue = cygrpc.CompletionQueue()
148  server = cygrpc.Server([(
149  b'grpc.so_reuseport',
150  0,
151  )], False)
152  server.register_completion_queue(server_completion_queue)
153  port = server.add_http2_port(b'[::]:0')
154  server.start()
155  channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None,
156  None)
157 
158  state = _State()
159 
160  server_thread_args = (
161  state,
162  server,
163  server_completion_queue,
164  server_thread_pool,
165  )
166  server_thread = threading.Thread(target=_serve, args=server_thread_args)
167  server_thread.start()
168 
169  client_condition = threading.Condition()
170  client_due = set()
171 
172  with client_condition:
173  client_calls = []
174  for index in range(test_constants.RPC_CONCURRENCY):
175  tag = 'client_complete_call_{0:04d}_tag'.format(index)
176  client_call = channel.integrated_call(
177  _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA,
178  None, ((
179  (
180  cygrpc.SendInitialMetadataOperation(
181  _EMPTY_METADATA, _EMPTY_FLAGS),
182  cygrpc.SendMessageOperation(b'\x45\x56',
183  _EMPTY_FLAGS),
184  cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
185  cygrpc.ReceiveInitialMetadataOperation(
186  _EMPTY_FLAGS),
187  cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
188  cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
189  ),
190  tag,
191  ),))
192  client_due.add(tag)
193  client_calls.append(client_call)
194 
195  client_events_future = test_utilities.SimpleFuture(lambda: tuple(
196  channel.next_call_event() for _ in range(_SUCCESSFUL_CALLS)))
197 
198  with state.condition:
199  while True:
200  if state.parked_handlers < test_constants.THREAD_CONCURRENCY:
201  state.condition.wait()
202  elif state.handled_rpcs < test_constants.RPC_CONCURRENCY:
203  state.condition.wait()
204  else:
205  state.handlers_released = True
206  state.condition.notify_all()
207  break
208 
209  client_events_future.result()
210  with client_condition:
211  for client_call in client_calls:
212  client_call.cancel(cygrpc.StatusCode.cancelled, 'Cancelled!')
213  for _ in range(_UNSUCCESSFUL_CALLS):
214  channel.next_call_event()
215 
216  channel.close(cygrpc.StatusCode.unknown, 'Cancelled on channel close!')
217  with state.condition:
218  server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG)
219 
220 
221 if __name__ == '__main__':
222  unittest.main(verbosity=2)
http2_test_server.format
format
Definition: http2_test_server.py:118
tests.unit._cython
Definition: src/python/grpcio_tests/tests/unit/_cython/__init__.py:1
grpc.framework.foundation
Definition: src/python/grpcio/grpc/framework/foundation/__init__.py:1
tests.unit._cython._cancel_many_calls_test._QueueDriver._returned
_returned
Definition: _cancel_many_calls_test.py:116
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.unit._cython._cancel_many_calls_test._serve
def _serve(state, server, server_completion_queue, thread_pool)
Definition: _cancel_many_calls_test.py:95
tests.unit._cython._cancel_many_calls_test._State.__init__
def __init__(self)
Definition: _cancel_many_calls_test.py:41
tests.unit._cython._cancel_many_calls_test._QueueDriver
Definition: _cancel_many_calls_test.py:109
tests.unit._cython._cancel_many_calls_test._QueueDriver._completion_queue
_completion_queue
Definition: _cancel_many_calls_test.py:113
tests.unit._cython._cancel_many_calls_test._Handler.__init__
def __init__(self, state, completion_queue, rpc_event)
Definition: _cancel_many_calls_test.py:55
tests.unit._cython._cancel_many_calls_test._State.handled_rpcs
handled_rpcs
Definition: _cancel_many_calls_test.py:45
tests.unit._cython._cancel_many_calls_test._QueueDriver._due
_due
Definition: _cancel_many_calls_test.py:114
tests.unit._cython._cancel_many_calls_test._QueueDriver.start
def start(self)
Definition: _cancel_many_calls_test.py:118
tests.unit._cython._cancel_many_calls_test._Handler.__call__
def __call__(self)
Definition: _cancel_many_calls_test.py:61
grpc._common.encode
def encode(s)
Definition: grpc/_common.py:68
tests.unit._cython._cancel_many_calls_test._QueueDriver._events
_events
Definition: _cancel_many_calls_test.py:115
tests.unit._cython._cancel_many_calls_test._State
Definition: _cancel_many_calls_test.py:39
tests.unit._cython._cancel_many_calls_test._Handler._completion_queue
_completion_queue
Definition: _cancel_many_calls_test.py:58
tests.unit._cython._cancel_many_calls_test.CancelManyCallsTest
Definition: _cancel_many_calls_test.py:141
tests.unit._cython._cancel_many_calls_test._State.parked_handlers
parked_handlers
Definition: _cancel_many_calls_test.py:44
xds_interop_client.int
int
Definition: xds_interop_client.py:113
tests.unit._cython._cancel_many_calls_test._is_cancellation_event
def _is_cancellation_event(event)
Definition: _cancel_many_calls_test.py:48
tests.unit._cython._cancel_many_calls_test._State.handlers_released
handlers_released
Definition: _cancel_many_calls_test.py:43
tests.unit._cython._cancel_many_calls_test._Handler
Definition: _cancel_many_calls_test.py:53
tests.unit._cython._cancel_many_calls_test._QueueDriver._condition
_condition
Definition: _cancel_many_calls_test.py:112
tests.unit._cython._cancel_many_calls_test._QueueDriver.events
def events(self, at_least)
Definition: _cancel_many_calls_test.py:134
tests.unit._cython.test_utilities.SimpleFuture
Definition: _cython/test_utilities.py:20
tests.unit._cython._cancel_many_calls_test._State.condition
condition
Definition: _cancel_many_calls_test.py:42
tests.unit._cython._cancel_many_calls_test.CancelManyCallsTest.testCancelManyCalls
def testCancelManyCalls(self)
Definition: _cancel_many_calls_test.py:143
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
tests.unit._cython._cancel_many_calls_test._Handler._lock
_lock
Definition: _cancel_many_calls_test.py:57
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
grpc._cython
Definition: src/python/grpcio/grpc/_cython/__init__.py:1
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.unit._cython._cancel_many_calls_test._QueueDriver.__init__
def __init__(self, condition, completion_queue, due)
Definition: _cancel_many_calls_test.py:111
tests.unit._cython._cancel_many_calls_test._Handler._state
_state
Definition: _cancel_many_calls_test.py:56
tests.unit._cython._cancel_many_calls_test._Handler._call
_call
Definition: _cancel_many_calls_test.py:59


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38