14 """Test a corner-case at the level of the Cython API."""
29 def __init__(self, completion_queue, shutdown_tag):
48 thread = threading.Thread(target=in_thread)
70 def __init__(self, condition, completion_queue, due):
84 self.
_due.remove(event.tag)
90 thread = threading.Thread(target=in_thread)
115 server_completion_queue = cygrpc.CompletionQueue()
116 server = cygrpc.Server([(
117 b
'grpc.so_reuseport',
120 server.register_completion_queue(server_completion_queue)
121 port = server.add_http2_port(b
'[::]:0')
126 server_shutdown_tag =
'server_shutdown_tag'
129 server_driver.start()
131 client_condition = threading.Condition()
134 server_call_condition = threading.Condition()
135 server_send_initial_metadata_tag =
'server_send_initial_metadata_tag'
136 server_send_first_message_tag =
'server_send_first_message_tag'
137 server_send_second_message_tag =
'server_send_second_message_tag'
138 server_complete_rpc_tag =
'server_complete_rpc_tag'
139 server_call_due =
set((
140 server_send_initial_metadata_tag,
141 server_send_first_message_tag,
142 server_send_second_message_tag,
143 server_complete_rpc_tag,
145 server_call_completion_queue = cygrpc.CompletionQueue()
146 server_call_driver =
_QueueDriver(server_call_condition,
147 server_call_completion_queue,
149 server_call_driver.start()
151 server_rpc_tag =
'server_rpc_tag'
152 request_call_result = server.request_call(server_call_completion_queue,
153 server_completion_queue,
156 client_receive_initial_metadata_tag =
'client_receive_initial_metadata_tag'
157 client_complete_rpc_tag =
'client_complete_rpc_tag'
158 client_call = channel.segregated_call(
159 _EMPTY_FLAGS, b
'/twinkies',
None,
None, _EMPTY_METADATA,
None, (
162 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
164 client_receive_initial_metadata_tag,
168 cygrpc.SendInitialMetadataOperation(
169 _EMPTY_METADATA, _EMPTY_FLAGS),
170 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
171 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
173 client_complete_rpc_tag,
177 client_call.next_event)
179 server_rpc_event = server_driver.first_event()
181 with server_call_condition:
182 server_send_initial_metadata_start_batch_result = (
183 server_rpc_event.call.start_server_batch([
184 cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
186 ], server_send_initial_metadata_tag))
187 server_send_first_message_start_batch_result = (
188 server_rpc_event.call.start_server_batch([
189 cygrpc.SendMessageOperation(b
'\x07', _EMPTY_FLAGS),
190 ], server_send_first_message_tag))
191 server_send_initial_metadata_event = server_call_driver.event_with_tag(
192 server_send_initial_metadata_tag)
193 server_send_first_message_event = server_call_driver.event_with_tag(
194 server_send_first_message_tag)
195 with server_call_condition:
196 server_send_second_message_start_batch_result = (
197 server_rpc_event.call.start_server_batch([
198 cygrpc.SendMessageOperation(b
'\x07', _EMPTY_FLAGS),
199 ], server_send_second_message_tag))
200 server_complete_rpc_start_batch_result = (
201 server_rpc_event.call.start_server_batch([
202 cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
203 cygrpc.SendStatusFromServerOperation(
204 (), cygrpc.StatusCode.ok, b
'test details',
206 ], server_complete_rpc_tag))
207 server_send_second_message_event = server_call_driver.event_with_tag(
208 server_send_second_message_tag)
209 server_complete_rpc_event = server_call_driver.event_with_tag(
210 server_complete_rpc_tag)
211 server_call_driver.events()
213 client_recieve_initial_metadata_event = client_receive_initial_metadata_event_future.result(
216 client_receive_first_message_tag =
'client_receive_first_message_tag'
217 client_call.operate([
218 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
219 ], client_receive_first_message_tag)
220 client_receive_first_message_event = client_call.next_event()
222 client_call_cancel_result = client_call.cancel(
223 cygrpc.StatusCode.cancelled,
'Cancelled during test!')
224 client_complete_rpc_event = client_call.next_event()
226 channel.close(cygrpc.StatusCode.unknown,
'Channel closed!')
227 server.shutdown(server_completion_queue, server_shutdown_tag)
228 server.cancel_all_calls()
229 server_driver.events()
231 self.assertEqual(cygrpc.CallError.ok, request_call_result)
232 self.assertEqual(cygrpc.CallError.ok,
233 server_send_initial_metadata_start_batch_result)
234 self.assertIs(server_rpc_tag, server_rpc_event.tag)
235 self.assertEqual(cygrpc.CompletionType.operation_complete,
236 server_rpc_event.completion_type)
237 self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
240 if __name__ ==
'__main__':
241 unittest.main(verbosity=2)