26 _SSL_HOST_OVERRIDE = b
'foo.test.google.fr'
27 _CALL_CREDENTIALS_METADATA_KEY =
'call-creds-key'
28 _CALL_CREDENTIALS_METADATA_VALUE =
'call-creds-value'
34 _CALL_CREDENTIALS_METADATA_KEY,
35 _CALL_CREDENTIALS_METADATA_VALUE,
36 ),), cygrpc.StatusCode.ok, b
'')
42 completion_queue = cygrpc.CompletionQueue()
46 server = cygrpc.Server(
set([(
53 channel = cygrpc.Channel(b
'[::]:0',
None,
None)
54 channel.close(cygrpc.StatusCode.cancelled,
'Test method anyway!')
57 cygrpc.MetadataPluginCallCredentials(_metadata_plugin,
61 server = cygrpc.Server([(
65 completion_queue = cygrpc.CompletionQueue()
66 server.register_completion_queue(completion_queue)
67 port = server.add_http2_port(b
'[::]:0')
68 self.assertIsInstance(port, int)
73 completion_queue = cygrpc.CompletionQueue()
74 server = cygrpc.Server([
80 server.add_http2_port(b
'[::]:0')
81 server.register_completion_queue(completion_queue)
83 shutdown_tag = object()
84 server.shutdown(completion_queue, shutdown_tag)
85 event = completion_queue.poll()
86 self.assertEqual(cygrpc.CompletionType.operation_complete,
87 event.completion_type)
88 self.assertIs(shutdown_tag, event.tag)
95 def setUpMixin(self, server_credentials, client_credentials, host_override):
102 if server_credentials:
106 self.
port = self.
server.add_http2_port(b
'[::]:0')
109 if client_credentials:
110 client_channel_arguments = ((
111 cygrpc.ChannelArgKey.ssl_target_name_override,
116 client_channel_arguments, client_credentials)
137 """Perform the operations with given call, queue, and deadline.
139 Invocation errors are reported with as an exception with `description`
140 in the message. Performs the operations asynchronously, returning a
147 call_result = call.start_client_batch(operations, tag)
148 self.assertEqual(cygrpc.CallError.ok, call_result)
149 event = queue.poll(deadline=deadline)
150 self.assertEqual(cygrpc.CompletionType.operation_complete,
151 event.completion_type)
152 self.assertTrue(event.success)
153 self.assertIs(tag, event.tag)
154 except Exception
as error:
155 raise Exception(
"Error in '{}': {}".
format(
156 description, error.message))
162 DEADLINE = time.time() + 5
163 DEADLINE_TOLERANCE = 0.25
164 CLIENT_METADATA_ASCII_KEY =
'key'
165 CLIENT_METADATA_ASCII_VALUE =
'val'
166 CLIENT_METADATA_BIN_KEY =
'key-bin'
167 CLIENT_METADATA_BIN_VALUE = b
'\0' * 1000
168 SERVER_INITIAL_METADATA_KEY =
'init_me_me_me'
169 SERVER_INITIAL_METADATA_VALUE =
'whodawha?'
170 SERVER_TRAILING_METADATA_KEY =
'california_is_in_a_drought'
171 SERVER_TRAILING_METADATA_VALUE =
'zomg it is'
172 SERVER_STATUS_CODE = cygrpc.StatusCode.ok
173 SERVER_STATUS_DETAILS =
'our work is never over'
174 REQUEST = b
'in death a member of project mayhem has a name'
175 RESPONSE = b
'his name is robert paulson'
178 server_request_tag = object()
183 self.assertEqual(cygrpc.CallError.ok, request_call_result)
185 client_call_tag = object()
186 client_initial_metadata = (
188 CLIENT_METADATA_ASCII_KEY,
189 CLIENT_METADATA_ASCII_VALUE,
192 CLIENT_METADATA_BIN_KEY,
193 CLIENT_METADATA_BIN_VALUE,
197 0, METHOD, self.
host_argument, DEADLINE, client_initial_metadata,
201 cygrpc.SendInitialMetadataOperation(
202 client_initial_metadata, _EMPTY_FLAGS),
203 cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
204 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
205 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
206 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
207 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
216 self.assertEqual(cygrpc.CompletionType.operation_complete,
217 request_event.completion_type)
218 self.assertIsInstance(request_event.call, cygrpc.Call)
219 self.assertIs(server_request_tag, request_event.tag)
221 test_common.metadata_transmitted(client_initial_metadata,
222 request_event.invocation_metadata))
223 self.assertEqual(METHOD, request_event.call_details.method)
224 self.assertEqual(self.
expected_host, request_event.call_details.host)
225 self.assertLess(abs(DEADLINE - request_event.call_details.deadline),
228 server_call_tag = object()
229 server_call = request_event.call
230 server_initial_metadata = ((
231 SERVER_INITIAL_METADATA_KEY,
232 SERVER_INITIAL_METADATA_VALUE,
234 server_trailing_metadata = ((
235 SERVER_TRAILING_METADATA_KEY,
236 SERVER_TRAILING_METADATA_VALUE,
238 server_start_batch_result = server_call.start_server_batch([
239 cygrpc.SendInitialMetadataOperation(server_initial_metadata,
241 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
242 cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS),
243 cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
244 cygrpc.SendStatusFromServerOperation(
245 server_trailing_metadata, SERVER_STATUS_CODE,
246 SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
248 self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
251 client_event = client_event_future.result()
253 self.assertEqual(6,
len(client_event.batch_operations))
254 found_client_op_types =
set()
255 for client_result
in client_event.batch_operations:
257 self.assertNotIn(client_result.type(), found_client_op_types)
258 found_client_op_types.add(client_result.type())
259 if client_result.type(
260 ) == cygrpc.OperationType.receive_initial_metadata:
262 test_common.metadata_transmitted(
263 server_initial_metadata,
264 client_result.initial_metadata()))
265 elif client_result.type() == cygrpc.OperationType.receive_message:
266 self.assertEqual(RESPONSE, client_result.message())
267 elif client_result.type(
268 ) == cygrpc.OperationType.receive_status_on_client:
270 test_common.metadata_transmitted(
271 server_trailing_metadata,
272 client_result.trailing_metadata()))
273 self.assertEqual(SERVER_STATUS_DETAILS, client_result.details())
274 self.assertEqual(SERVER_STATUS_CODE, client_result.code())
277 cygrpc.OperationType.send_initial_metadata,
278 cygrpc.OperationType.send_message,
279 cygrpc.OperationType.send_close_from_client,
280 cygrpc.OperationType.receive_initial_metadata,
281 cygrpc.OperationType.receive_message,
282 cygrpc.OperationType.receive_status_on_client
283 ]), found_client_op_types)
285 self.assertEqual(5,
len(server_event.batch_operations))
286 found_server_op_types =
set()
287 for server_result
in server_event.batch_operations:
288 self.assertNotIn(server_result.type(), found_server_op_types)
289 found_server_op_types.add(server_result.type())
290 if server_result.type() == cygrpc.OperationType.receive_message:
291 self.assertEqual(REQUEST, server_result.message())
292 elif server_result.type(
293 ) == cygrpc.OperationType.receive_close_on_server:
294 self.assertFalse(server_result.cancelled())
297 cygrpc.OperationType.send_initial_metadata,
298 cygrpc.OperationType.receive_message,
299 cygrpc.OperationType.send_message,
300 cygrpc.OperationType.receive_close_on_server,
301 cygrpc.OperationType.send_status_from_server
302 ]), found_server_op_types)
308 DEADLINE = time.time() + 5
309 DEADLINE_TOLERANCE = 0.25
315 server_request_tag = object()
323 cygrpc.SendInitialMetadataOperation(empty_metadata,
325 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
331 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
337 client_call.next_event)
340 server_call = request_event.call
342 def perform_server_operations(operations, description):
345 DEADLINE, description)
347 server_event_future = perform_server_operations([
348 cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
349 ],
"Server prologue")
351 client_initial_metadata_event_future.result()
352 server_event_future.result()
356 client_call.operate([
357 cygrpc.SendMessageOperation(b
'', _EMPTY_FLAGS),
358 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
361 client_call.next_event)
362 server_event_future = perform_server_operations([
363 cygrpc.SendMessageOperation(b
'', _EMPTY_FLAGS),
364 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
367 client_message_event_future.result()
368 server_event_future.result()
371 client_call.operate([
372 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
373 ],
"Client epilogue")
376 client_call.next_event(),
377 client_call.next_event(),
380 server_event_future = perform_server_operations([
381 cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
382 cygrpc.SendStatusFromServerOperation(
383 empty_metadata, cygrpc.StatusCode.ok, b
'', _EMPTY_FLAGS)
384 ],
"Server epilogue")
386 client_events_future.result()
387 server_event_future.result()
402 server_credentials = cygrpc.server_credentials_ssl(
404 cygrpc.SslPemKeyCertPair(resources.private_key(),
405 resources.certificate_chain())
407 client_credentials = cygrpc.SSLChannelCredentials(
408 resources.test_root_certificates(),
None,
None)
409 self.
setUpMixin(server_credentials, client_credentials,
416 if __name__ ==
'__main__':
417 unittest.main(verbosity=2)