14 """Implementations of interoperability test methods."""
19 from tests
import bazel_namespace_package_hack
20 bazel_namespace_package_hack.sys_path_to_site_dir_hack()
30 from google
import auth
as google_auth
31 from google.auth
import environment_vars
as google_auth_environment_vars
32 from google.auth.transport
import grpc
as google_auth_transport_grpc
33 from google.auth.transport
import requests
as google_auth_transport_requests
36 from src.proto.grpc.testing
import empty_pb2
37 from src.proto.grpc.testing
import messages_pb2
39 _INITIAL_METADATA_KEY =
"x-grpc-test-echo-initial"
40 _TRAILING_METADATA_KEY =
"x-grpc-test-echo-trailing-bin"
44 if call.code() != expected_code:
45 raise ValueError(
'expected code %s, got %s' %
46 (expected_code, call.code()))
50 if call.details() != expected_details:
51 raise ValueError(
'expected message %s, got %s' %
52 (expected_details, call.details()))
61 if response.payload.type
is not expected_type:
62 raise ValueError(
'expected payload type %s, got %s' %
63 (expected_type,
type(response.payload.type)))
64 elif len(response.payload.body) != expected_length:
65 raise ValueError(
'expected payload body size %d, got %d' %
66 (expected_length,
len(response.payload.body)))
73 response_type=messages_pb2.COMPRESSABLE,
76 fill_username=fill_username,
77 fill_oauth_scope=fill_oauth_scope)
78 response_future = stub.UnaryCall.future(request,
79 credentials=call_credentials)
80 response = response_future.result()
86 response = stub.EmptyCall(empty_pb2.Empty())
87 if not isinstance(response, empty_pb2.Empty):
88 raise TypeError(
'response is of type "%s", not empty_pb2.Empty!' %
97 payload_body_sizes = (
104 for size
in payload_body_sizes)
106 for payload
in payloads)
107 response = stub.StreamingInputCall(requests)
108 if response.aggregated_payload_size != 74922:
109 raise ValueError(
'incorrect size %d!' %
110 response.aggregated_payload_size)
122 response_type=messages_pb2.COMPRESSABLE,
123 response_parameters=(
129 response_iterator = stub.StreamingOutputCall(request)
130 for index, response
in enumerate(response_iterator):
155 raise StopIteration()
175 request_response_sizes = (
181 request_payload_sizes = (
188 with _Pipe()
as pipe:
189 response_iterator = stub.FullDuplexCall(pipe)
190 for response_size, payload_size
in zip(request_response_sizes,
191 request_payload_sizes):
193 response_type=messages_pb2.COMPRESSABLE,
195 size=response_size),),
198 response =
next(response_iterator)
200 messages_pb2.COMPRESSABLE,
205 with _Pipe()
as pipe:
206 response_future = stub.StreamingInputCall.future(pipe)
207 response_future.cancel()
208 if not response_future.cancelled():
209 raise ValueError(
'expected cancelled method to return True')
210 if response_future.code()
is not grpc.StatusCode.CANCELLED:
211 raise ValueError(
'expected status code CANCELLED')
215 request_response_sizes = (
221 request_payload_sizes = (
227 with _Pipe()
as pipe:
228 response_iterator = stub.FullDuplexCall(pipe)
230 response_size = request_response_sizes[0]
231 payload_size = request_payload_sizes[0]
233 response_type=messages_pb2.COMPRESSABLE,
235 size=response_size),),
238 response =
next(response_iterator)
241 response_iterator.cancel()
244 next(response_iterator)
246 if rpc_error.code()
is not grpc.StatusCode.CANCELLED:
249 raise ValueError(
'expected call to be cancelled')
253 request_payload_size = 27182
254 with _Pipe()
as pipe:
255 response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
258 response_type=messages_pb2.COMPRESSABLE,
262 next(response_iterator)
264 if rpc_error.code()
is not grpc.StatusCode.DEADLINE_EXCEEDED:
267 raise ValueError(
'expected call to exceed deadline')
271 with _Pipe()
as pipe:
272 response_iterator = stub.FullDuplexCall(pipe)
275 next(response_iterator)
276 raise ValueError(
'expected exactly 0 responses')
277 except StopIteration:
282 details =
'test status message'
284 status = grpc.StatusCode.UNKNOWN
288 response_type=messages_pb2.COMPRESSABLE,
292 response_future = stub.UnaryCall.future(request)
296 with _Pipe()
as pipe:
297 response_iterator = stub.FullDuplexCall(pipe)
299 response_type=messages_pb2.COMPRESSABLE,
305 next(response_iterator)
307 assert rpc_error.code() == status
313 response_future = (test_service_stub.UnimplementedCall.future(
319 response_future = (unimplemented_service_stub.UnimplementedCall.future(
325 initial_metadata_value =
"test_initial_metadata_value"
326 trailing_metadata_value = b
"\x0a\x0b\x0a\x0b\x0a\x0b"
327 metadata = ((_INITIAL_METADATA_KEY, initial_metadata_value),
328 (_TRAILING_METADATA_KEY, trailing_metadata_value))
330 def _validate_metadata(response):
331 initial_metadata = dict(response.initial_metadata())
332 if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value:
333 raise ValueError(
'expected initial metadata %s, got %s' %
334 (initial_metadata_value,
335 initial_metadata[_INITIAL_METADATA_KEY]))
336 trailing_metadata = dict(response.trailing_metadata())
337 if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value:
338 raise ValueError(
'expected trailing metadata %s, got %s' %
339 (trailing_metadata_value,
340 trailing_metadata[_TRAILING_METADATA_KEY]))
344 response_type=messages_pb2.COMPRESSABLE,
347 response_future = stub.UnaryCall.future(request, metadata=metadata)
348 _validate_metadata(response_future)
351 with _Pipe()
as pipe:
352 response_iterator = stub.FullDuplexCall(pipe, metadata=metadata)
354 response_type=messages_pb2.COMPRESSABLE,
357 next(response_iterator)
359 _validate_metadata(response_iterator)
364 if args.default_service_account != response.username:
365 raise ValueError(
'expected username %s, got %s' %
366 (args.default_service_account, response.username))
370 json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
371 wanted_email = json.load(
open(json_key_filename,
'r'))[
'client_email']
373 if wanted_email != response.username:
374 raise ValueError(
'expected username %s, got %s' %
375 (wanted_email, response.username))
376 if args.oauth_scope.find(response.oauth_scope) == -1:
378 'expected to find oauth scope "{}" in received "{}"'.
format(
379 response.oauth_scope, args.oauth_scope))
383 json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
384 wanted_email = json.load(
open(json_key_filename,
'r'))[
'client_email']
386 if wanted_email != response.username:
387 raise ValueError(
'expected username %s, got %s' %
388 (wanted_email, response.username))
392 json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
393 wanted_email = json.load(
open(json_key_filename,
'r'))[
'client_email']
394 google_credentials, unused_project_id = google_auth.default(
395 scopes=[args.oauth_scope])
397 google_auth_transport_grpc.AuthMetadataPlugin(
398 credentials=google_credentials,
399 request=google_auth_transport_requests.Request()))
401 if wanted_email != response.username:
402 raise ValueError(
'expected username %s, got %s' %
403 (wanted_email, response.username))
407 details = b
'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.
decode(
410 status = grpc.StatusCode.UNKNOWN
414 response_type=messages_pb2.COMPRESSABLE,
418 response_future = stub.UnaryCall.future(request)
424 EMPTY_UNARY =
'empty_unary'
425 LARGE_UNARY =
'large_unary'
426 SERVER_STREAMING =
'server_streaming'
427 CLIENT_STREAMING =
'client_streaming'
428 PING_PONG =
'ping_pong'
429 CANCEL_AFTER_BEGIN =
'cancel_after_begin'
430 CANCEL_AFTER_FIRST_RESPONSE =
'cancel_after_first_response'
431 EMPTY_STREAM =
'empty_stream'
432 STATUS_CODE_AND_MESSAGE =
'status_code_and_message'
433 UNIMPLEMENTED_METHOD =
'unimplemented_method'
434 UNIMPLEMENTED_SERVICE =
'unimplemented_service'
435 CUSTOM_METADATA =
"custom_metadata"
436 COMPUTE_ENGINE_CREDS =
'compute_engine_creds'
437 OAUTH2_AUTH_TOKEN =
'oauth2_auth_token'
438 JWT_TOKEN_CREDS =
'jwt_token_creds'
439 PER_RPC_CREDS =
'per_rpc_creds'
440 TIMEOUT_ON_SLEEPING_SERVER =
'timeout_on_sleeping_server'
441 SPECIAL_STATUS_MESSAGE =
'special_status_message'
444 if self
is TestCase.EMPTY_UNARY:
446 elif self
is TestCase.LARGE_UNARY:
448 elif self
is TestCase.SERVER_STREAMING:
450 elif self
is TestCase.CLIENT_STREAMING:
452 elif self
is TestCase.PING_PONG:
454 elif self
is TestCase.CANCEL_AFTER_BEGIN:
456 elif self
is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
458 elif self
is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
460 elif self
is TestCase.EMPTY_STREAM:
462 elif self
is TestCase.STATUS_CODE_AND_MESSAGE:
464 elif self
is TestCase.UNIMPLEMENTED_METHOD:
466 elif self
is TestCase.UNIMPLEMENTED_SERVICE:
468 elif self
is TestCase.CUSTOM_METADATA:
470 elif self
is TestCase.COMPUTE_ENGINE_CREDS:
472 elif self
is TestCase.OAUTH2_AUTH_TOKEN:
474 elif self
is TestCase.JWT_TOKEN_CREDS:
476 elif self
is TestCase.PER_RPC_CREDS:
478 elif self
is TestCase.SPECIAL_STATUS_MESSAGE:
481 raise NotImplementedError(
'Test case "%s" not implemented!' %