14 """Test making many calls and immediately cancelling most of them."""
28 _SERVER_SHUTDOWN_TAG =
'server_shutdown'
29 _REQUEST_CALL_TAG =
'request_call'
30 _RECEIVE_CLOSE_ON_SERVER_TAG =
'receive_close_on_server'
31 _RECEIVE_MESSAGE_TAG =
'receive_message'
32 _SERVER_COMPLETE_CALL_TAG =
'server_complete_call'
34 _SUCCESS_CALL_FRACTION = 1.0 / 8.0
35 _SUCCESSFUL_CALLS =
int(test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION)
36 _UNSUCCESSFUL_CALLS = test_constants.RPC_CONCURRENCY - _SUCCESSFUL_CALLS
49 return (event.tag
is _RECEIVE_CLOSE_ON_SERVER_TAG
and
50 event.batch_operations[0].cancelled())
55 def __init__(self, state, completion_queue, rpc_event):
62 with self.
_state.condition:
63 self.
_state.parked_handlers += 1
64 if self.
_state.parked_handlers == test_constants.THREAD_CONCURRENCY:
65 self.
_state.condition.notify_all()
66 while not self.
_state.handlers_released:
67 self.
_state.condition.wait()
70 self.
_call.start_server_batch(
71 (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
72 _RECEIVE_CLOSE_ON_SERVER_TAG)
73 self.
_call.start_server_batch(
74 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
82 cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
84 cygrpc.SendMessageOperation(b
'\x79\x57', _EMPTY_FLAGS),
85 cygrpc.SendStatusFromServerOperation(
86 _EMPTY_METADATA, cygrpc.StatusCode.ok, b
'test details!',
89 self.
_call.start_server_batch(operations,
90 _SERVER_COMPLETE_CALL_TAG)
95 def _serve(state, server, server_completion_queue, thread_pool):
96 for _
in range(test_constants.RPC_CONCURRENCY):
97 call_completion_queue = cygrpc.CompletionQueue()
98 server.request_call(call_completion_queue, server_completion_queue,
100 rpc_event = server_completion_queue.poll()
101 thread_pool.submit(
_Handler(state, call_completion_queue, rpc_event))
102 with state.condition:
103 state.handled_rpcs += 1
104 if test_constants.RPC_CONCURRENCY <= state.handled_rpcs:
105 state.condition.notify_all()
106 server_completion_queue.poll()
111 def __init__(self, condition, completion_queue, due):
125 self.
_due.remove(event.tag)
131 thread = threading.Thread(target=in_thread)
144 server_thread_pool = logging_pool.pool(
145 test_constants.THREAD_CONCURRENCY)
147 server_completion_queue = cygrpc.CompletionQueue()
148 server = cygrpc.Server([(
149 b
'grpc.so_reuseport',
152 server.register_completion_queue(server_completion_queue)
153 port = server.add_http2_port(b
'[::]:0')
155 channel = cygrpc.Channel(
'localhost:{}'.
format(port).
encode(),
None,
160 server_thread_args = (
163 server_completion_queue,
166 server_thread = threading.Thread(target=_serve, args=server_thread_args)
167 server_thread.start()
169 client_condition = threading.Condition()
172 with client_condition:
174 for index
in range(test_constants.RPC_CONCURRENCY):
175 tag =
'client_complete_call_{0:04d}_tag'.
format(index)
176 client_call = channel.integrated_call(
177 _EMPTY_FLAGS, b
'/twinkies',
None,
None, _EMPTY_METADATA,
180 cygrpc.SendInitialMetadataOperation(
181 _EMPTY_METADATA, _EMPTY_FLAGS),
182 cygrpc.SendMessageOperation(b
'\x45\x56',
184 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
185 cygrpc.ReceiveInitialMetadataOperation(
187 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
188 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
193 client_calls.append(client_call)
196 channel.next_call_event()
for _
in range(_SUCCESSFUL_CALLS)))
198 with state.condition:
200 if state.parked_handlers < test_constants.THREAD_CONCURRENCY:
201 state.condition.wait()
202 elif state.handled_rpcs < test_constants.RPC_CONCURRENCY:
203 state.condition.wait()
205 state.handlers_released =
True
206 state.condition.notify_all()
209 client_events_future.result()
210 with client_condition:
211 for client_call
in client_calls:
212 client_call.cancel(cygrpc.StatusCode.cancelled,
'Cancelled!')
213 for _
in range(_UNSUCCESSFUL_CALLS):
214 channel.next_call_event()
216 channel.close(cygrpc.StatusCode.unknown,
'Cancelled on channel close!')
217 with state.condition:
218 server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG)
221 if __name__ ==
'__main__':
222 unittest.main(verbosity=2)