aio/interop/methods.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Implementations of interoperability test methods."""
15 
16 import argparse
17 import asyncio
18 import collections
19 import datetime
20 import enum
21 import inspect
22 import json
23 import os
24 import threading
25 import time
26 from typing import Any, Optional, Union
27 
28 from google import auth as google_auth
29 from google.auth import environment_vars as google_auth_environment_vars
30 from google.auth.transport import grpc as google_auth_transport_grpc
31 from google.auth.transport import requests as google_auth_transport_requests
32 import grpc
33 from grpc.experimental import aio
34 
35 from src.proto.grpc.testing import empty_pb2
36 from src.proto.grpc.testing import messages_pb2
37 from src.proto.grpc.testing import test_pb2_grpc
38 
39 _INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
40 _TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
41 
42 
43 async def _expect_status_code(call: aio.Call,
44  expected_code: grpc.StatusCode) -> None:
45  code = await call.code()
46  if code != expected_code:
47  raise ValueError('expected code %s, got %s' %
48  (expected_code, await call.code()))
49 
50 
51 async def _expect_status_details(call: aio.Call, expected_details: str) -> None:
52  details = await call.details()
53  if details != expected_details:
54  raise ValueError('expected message %s, got %s' %
55  (expected_details, await call.details()))
56 
57 
58 async def _validate_status_code_and_details(call: aio.Call,
59  expected_code: grpc.StatusCode,
60  expected_details: str) -> None:
61  await _expect_status_code(call, expected_code)
62  await _expect_status_details(call, expected_details)
63 
64 
66  messages_pb2.SimpleResponse, messages_pb2.StreamingOutputCallResponse],
67  expected_type: Any,
68  expected_length: int) -> None:
69  if response.payload.type is not expected_type:
70  raise ValueError('expected payload type %s, got %s' %
71  (expected_type, type(response.payload.type)))
72  elif len(response.payload.body) != expected_length:
73  raise ValueError('expected payload body size %d, got %d' %
74  (expected_length, len(response.payload.body)))
75 
76 
78  stub: test_pb2_grpc.TestServiceStub, fill_username: bool,
79  fill_oauth_scope: bool, call_credentials: Optional[grpc.CallCredentials]
80 ) -> messages_pb2.SimpleResponse:
81  size = 314159
83  response_type=messages_pb2.COMPRESSABLE,
84  response_size=size,
85  payload=messages_pb2.Payload(body=b'\x00' * 271828),
86  fill_username=fill_username,
87  fill_oauth_scope=fill_oauth_scope)
88  response = await stub.UnaryCall(request, credentials=call_credentials)
89  _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
90  return response
91 
92 
93 async def _empty_unary(stub: test_pb2_grpc.TestServiceStub) -> None:
94  response = await stub.EmptyCall(empty_pb2.Empty())
95  if not isinstance(response, empty_pb2.Empty):
96  raise TypeError('response is of type "%s", not empty_pb2.Empty!' %
97  type(response))
98 
99 
100 async def _large_unary(stub: test_pb2_grpc.TestServiceStub) -> None:
101  await _large_unary_common_behavior(stub, False, False, None)
102 
103 
104 async def _client_streaming(stub: test_pb2_grpc.TestServiceStub) -> None:
105  payload_body_sizes = (
106  27182,
107  8,
108  1828,
109  45904,
110  )
111 
112  async def request_gen():
113  for size in payload_body_sizes:
115  payload=messages_pb2.Payload(body=b'\x00' * size))
116 
117  response = await stub.StreamingInputCall(request_gen())
118  if response.aggregated_payload_size != sum(payload_body_sizes):
119  raise ValueError('incorrect size %d!' %
120  response.aggregated_payload_size)
121 
122 
123 async def _server_streaming(stub: test_pb2_grpc.TestServiceStub) -> None:
124  sizes = (
125  31415,
126  9,
127  2653,
128  58979,
129  )
130 
132  response_type=messages_pb2.COMPRESSABLE,
133  response_parameters=(
134  messages_pb2.ResponseParameters(size=sizes[0]),
135  messages_pb2.ResponseParameters(size=sizes[1]),
136  messages_pb2.ResponseParameters(size=sizes[2]),
137  messages_pb2.ResponseParameters(size=sizes[3]),
138  ))
139  call = stub.StreamingOutputCall(request)
140  for size in sizes:
141  response = await call.read()
142  _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
143  size)
144 
145 
146 async def _ping_pong(stub: test_pb2_grpc.TestServiceStub) -> None:
147  request_response_sizes = (
148  31415,
149  9,
150  2653,
151  58979,
152  )
153  request_payload_sizes = (
154  27182,
155  8,
156  1828,
157  45904,
158  )
159 
160  call = stub.FullDuplexCall()
161  for response_size, payload_size in zip(request_response_sizes,
162  request_payload_sizes):
164  response_type=messages_pb2.COMPRESSABLE,
165  response_parameters=(messages_pb2.ResponseParameters(
166  size=response_size),),
167  payload=messages_pb2.Payload(body=b'\x00' * payload_size))
168 
169  await call.write(request)
170  response = await call.read()
171  _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
172  response_size)
173  await call.done_writing()
174  await _validate_status_code_and_details(call, grpc.StatusCode.OK, '')
175 
176 
177 async def _cancel_after_begin(stub: test_pb2_grpc.TestServiceStub):
178  call = stub.StreamingInputCall()
179  call.cancel()
180  if not call.cancelled():
181  raise ValueError('expected cancelled method to return True')
182  code = await call.code()
183  if code is not grpc.StatusCode.CANCELLED:
184  raise ValueError('expected status code CANCELLED')
185 
186 
187 async def _cancel_after_first_response(stub: test_pb2_grpc.TestServiceStub):
188  request_response_sizes = (
189  31415,
190  9,
191  2653,
192  58979,
193  )
194  request_payload_sizes = (
195  27182,
196  8,
197  1828,
198  45904,
199  )
200 
201  call = stub.FullDuplexCall()
202 
203  response_size = request_response_sizes[0]
204  payload_size = request_payload_sizes[0]
206  response_type=messages_pb2.COMPRESSABLE,
207  response_parameters=(messages_pb2.ResponseParameters(
208  size=response_size),),
209  payload=messages_pb2.Payload(body=b'\x00' * payload_size))
210 
211  await call.write(request)
212  await call.read()
213 
214  call.cancel()
215 
216  try:
217  await call.read()
218  except asyncio.CancelledError:
219  assert await call.code() is grpc.StatusCode.CANCELLED
220  else:
221  raise ValueError('expected call to be cancelled')
222 
223 
224 async def _timeout_on_sleeping_server(stub: test_pb2_grpc.TestServiceStub):
225  request_payload_size = 27182
226  time_limit = datetime.timedelta(seconds=1)
227 
228  call = stub.FullDuplexCall(timeout=time_limit.total_seconds())
229 
231  response_type=messages_pb2.COMPRESSABLE,
232  payload=messages_pb2.Payload(body=b'\x00' * request_payload_size),
233  response_parameters=(messages_pb2.ResponseParameters(
234  interval_us=int(time_limit.total_seconds() * 2 * 10**6)),))
235  await call.write(request)
236  await call.done_writing()
237  try:
238  await call.read()
239  except aio.AioRpcError as rpc_error:
240  if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
241  raise
242  else:
243  raise ValueError('expected call to exceed deadline')
244 
245 
246 async def _empty_stream(stub: test_pb2_grpc.TestServiceStub):
247  call = stub.FullDuplexCall()
248  await call.done_writing()
249  assert await call.read() == aio.EOF
250 
251 
252 async def _status_code_and_message(stub: test_pb2_grpc.TestServiceStub):
253  details = 'test status message'
254  status = grpc.StatusCode.UNKNOWN # code = 2
255 
256  # Test with a UnaryCall
257  request = messages_pb2.SimpleRequest(
258  response_type=messages_pb2.COMPRESSABLE,
259  response_size=1,
260  payload=messages_pb2.Payload(body=b'\x00'),
261  response_status=messages_pb2.EchoStatus(code=status.value[0],
262  message=details))
263  call = stub.UnaryCall(request)
264  await _validate_status_code_and_details(call, status, details)
265 
266  # Test with a FullDuplexCall
267  call = stub.FullDuplexCall()
269  response_type=messages_pb2.COMPRESSABLE,
270  response_parameters=(messages_pb2.ResponseParameters(size=1),),
271  payload=messages_pb2.Payload(body=b'\x00'),
272  response_status=messages_pb2.EchoStatus(code=status.value[0],
273  message=details))
274  await call.write(request) # sends the initial request.
275  await call.done_writing()
276  try:
277  await call.read()
278  except aio.AioRpcError as rpc_error:
279  assert rpc_error.code() == status
280  await _validate_status_code_and_details(call, status, details)
281 
282 
283 async def _unimplemented_method(stub: test_pb2_grpc.TestServiceStub):
284  call = stub.UnimplementedCall(empty_pb2.Empty())
285  await _expect_status_code(call, grpc.StatusCode.UNIMPLEMENTED)
286 
287 
288 async def _unimplemented_service(stub: test_pb2_grpc.UnimplementedServiceStub):
289  call = stub.UnimplementedCall(empty_pb2.Empty())
290  await _expect_status_code(call, grpc.StatusCode.UNIMPLEMENTED)
291 
292 
293 async def _custom_metadata(stub: test_pb2_grpc.TestServiceStub):
294  initial_metadata_value = "test_initial_metadata_value"
295  trailing_metadata_value = b"\x0a\x0b\x0a\x0b\x0a\x0b"
296  metadata = aio.Metadata(
297  (_INITIAL_METADATA_KEY, initial_metadata_value),
298  (_TRAILING_METADATA_KEY, trailing_metadata_value),
299  )
300 
301  async def _validate_metadata(call):
302  initial_metadata = await call.initial_metadata()
303  if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value:
304  raise ValueError('expected initial metadata %s, got %s' %
305  (initial_metadata_value,
306  initial_metadata[_INITIAL_METADATA_KEY]))
307 
308  trailing_metadata = await call.trailing_metadata()
309  if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value:
310  raise ValueError('expected trailing metadata %s, got %s' %
311  (trailing_metadata_value,
312  trailing_metadata[_TRAILING_METADATA_KEY]))
313 
314  # Testing with UnaryCall
315  request = messages_pb2.SimpleRequest(
316  response_type=messages_pb2.COMPRESSABLE,
317  response_size=1,
318  payload=messages_pb2.Payload(body=b'\x00'))
319  call = stub.UnaryCall(request, metadata=metadata)
320  await _validate_metadata(call)
321 
322  # Testing with FullDuplexCall
323  call = stub.FullDuplexCall(metadata=metadata)
325  response_type=messages_pb2.COMPRESSABLE,
326  response_parameters=(messages_pb2.ResponseParameters(size=1),))
327  await call.write(request)
328  await call.read()
329  await call.done_writing()
330  await _validate_metadata(call)
331 
332 
333 async def _compute_engine_creds(stub: test_pb2_grpc.TestServiceStub,
334  args: argparse.Namespace):
335  response = await _large_unary_common_behavior(stub, True, True, None)
336  if args.default_service_account != response.username:
337  raise ValueError('expected username %s, got %s' %
338  (args.default_service_account, response.username))
339 
340 
341 async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub,
342  args: argparse.Namespace):
343  json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
344  wanted_email = json.load(open(json_key_filename, 'r'))['client_email']
345  response = await _large_unary_common_behavior(stub, True, True, None)
346  if wanted_email != response.username:
347  raise ValueError('expected username %s, got %s' %
348  (wanted_email, response.username))
349  if args.oauth_scope.find(response.oauth_scope) == -1:
350  raise ValueError(
351  'expected to find oauth scope "{}" in received "{}"'.format(
352  response.oauth_scope, args.oauth_scope))
353 
354 
355 async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub):
356  json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
357  wanted_email = json.load(open(json_key_filename, 'r'))['client_email']
358  response = await _large_unary_common_behavior(stub, True, False, None)
359  if wanted_email != response.username:
360  raise ValueError('expected username %s, got %s' %
361  (wanted_email, response.username))
362 
363 
364 async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub,
365  args: argparse.Namespace):
366  json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS]
367  wanted_email = json.load(open(json_key_filename, 'r'))['client_email']
368  google_credentials, unused_project_id = google_auth.default(
369  scopes=[args.oauth_scope])
370  call_credentials = grpc.metadata_call_credentials(
371  google_auth_transport_grpc.AuthMetadataPlugin(
372  credentials=google_credentials,
373  request=google_auth_transport_requests.Request()))
374  response = await _large_unary_common_behavior(stub, True, False,
375  call_credentials)
376  if wanted_email != response.username:
377  raise ValueError('expected username %s, got %s' %
378  (wanted_email, response.username))
379 
380 
381 async def _special_status_message(stub: test_pb2_grpc.TestServiceStub):
382  details = b'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.decode(
383  'utf-8')
384  status = grpc.StatusCode.UNKNOWN # code = 2
385 
386  # Test with a UnaryCall
387  request = messages_pb2.SimpleRequest(
388  response_type=messages_pb2.COMPRESSABLE,
389  response_size=1,
390  payload=messages_pb2.Payload(body=b'\x00'),
391  response_status=messages_pb2.EchoStatus(code=status.value[0],
392  message=details))
393  call = stub.UnaryCall(request)
394  await _validate_status_code_and_details(call, status, details)
395 
396 
397 @enum.unique
398 class TestCase(enum.Enum):
399  EMPTY_UNARY = 'empty_unary'
400  LARGE_UNARY = 'large_unary'
401  SERVER_STREAMING = 'server_streaming'
402  CLIENT_STREAMING = 'client_streaming'
403  PING_PONG = 'ping_pong'
404  CANCEL_AFTER_BEGIN = 'cancel_after_begin'
405  CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
406  TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
407  EMPTY_STREAM = 'empty_stream'
408  STATUS_CODE_AND_MESSAGE = 'status_code_and_message'
409  UNIMPLEMENTED_METHOD = 'unimplemented_method'
410  UNIMPLEMENTED_SERVICE = 'unimplemented_service'
411  CUSTOM_METADATA = "custom_metadata"
412  COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
413  OAUTH2_AUTH_TOKEN = 'oauth2_auth_token'
414  JWT_TOKEN_CREDS = 'jwt_token_creds'
415  PER_RPC_CREDS = 'per_rpc_creds'
416  SPECIAL_STATUS_MESSAGE = 'special_status_message'
417 
418 
419 _TEST_CASE_IMPLEMENTATION_MAPPING = {
420  TestCase.EMPTY_UNARY: _empty_unary,
421  TestCase.LARGE_UNARY: _large_unary,
422  TestCase.SERVER_STREAMING: _server_streaming,
423  TestCase.CLIENT_STREAMING: _client_streaming,
424  TestCase.PING_PONG: _ping_pong,
425  TestCase.CANCEL_AFTER_BEGIN: _cancel_after_begin,
426  TestCase.CANCEL_AFTER_FIRST_RESPONSE: _cancel_after_first_response,
427  TestCase.TIMEOUT_ON_SLEEPING_SERVER: _timeout_on_sleeping_server,
428  TestCase.EMPTY_STREAM: _empty_stream,
429  TestCase.STATUS_CODE_AND_MESSAGE: _status_code_and_message,
430  TestCase.UNIMPLEMENTED_METHOD: _unimplemented_method,
431  TestCase.UNIMPLEMENTED_SERVICE: _unimplemented_service,
432  TestCase.CUSTOM_METADATA: _custom_metadata,
433  TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds,
434  TestCase.OAUTH2_AUTH_TOKEN: _oauth2_auth_token,
435  TestCase.JWT_TOKEN_CREDS: _jwt_token_creds,
436  TestCase.PER_RPC_CREDS: _per_rpc_creds,
437  TestCase.SPECIAL_STATUS_MESSAGE: _special_status_message,
438 }
439 
440 
442  case: TestCase,
443  stub: test_pb2_grpc.TestServiceStub,
444  args: Optional[argparse.Namespace] = None) -> None:
445  method = _TEST_CASE_IMPLEMENTATION_MAPPING.get(case)
446  if method is None:
447  raise NotImplementedError(f'Test case "{case}" not implemented!')
448  else:
449  num_params = len(inspect.signature(method).parameters)
450  if num_params == 1:
451  await method(stub)
452  elif num_params == 2:
453  if args is not None:
454  await method(stub, args)
455  else:
456  raise ValueError(f'Failed to run case [{case}]: args is None')
457  else:
458  raise ValueError(f'Invalid number of parameters [{num_params}]')
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
http2_test_server.format
format
Definition: http2_test_server.py:118
tests_aio.interop.methods._expect_status_details
None _expect_status_details(aio.Call call, str expected_details)
Definition: aio/interop/methods.py:51
tests_aio.interop.methods._unimplemented_service
def _unimplemented_service(test_pb2_grpc.UnimplementedServiceStub stub)
Definition: aio/interop/methods.py:288
grpc::testing::sum
double sum(const T &container, F functor)
Definition: test/cpp/qps/stats.h:30
tests_aio.interop.methods._custom_metadata
def _custom_metadata(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:293
tests_aio.interop.methods._cancel_after_begin
def _cancel_after_begin(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:177
tests_aio.interop.methods._expect_status_code
None _expect_status_code(aio.Call call, grpc.StatusCode expected_code)
Definition: aio/interop/methods.py:43
tests_aio.interop.methods._empty_stream
def _empty_stream(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:246
tests_aio.interop.methods.TestCase
Definition: aio/interop/methods.py:398
grpc.metadata_call_credentials
def metadata_call_credentials(metadata_plugin, name=None)
Definition: src/python/grpcio/grpc/__init__.py:1644
xds_interop_client.int
int
Definition: xds_interop_client.py:113
grpc::experimental
Definition: include/grpcpp/channel.h:46
grpc.StatusCode
Definition: src/python/grpcio/grpc/__init__.py:232
tests_aio.interop.methods._status_code_and_message
def _status_code_and_message(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:252
tests_aio.interop.methods._large_unary
None _large_unary(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:100
tests_aio.interop.methods._empty_unary
None _empty_unary(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:93
tests_aio.interop.methods._validate_payload_type_and_length
None _validate_payload_type_and_length(Union[messages_pb2.SimpleResponse, messages_pb2.StreamingOutputCallResponse] response, Any expected_type, int expected_length)
Definition: aio/interop/methods.py:65
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
tests_aio.interop.methods._cancel_after_first_response
def _cancel_after_first_response(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:187
tests_aio.interop.methods._large_unary_common_behavior
messages_pb2.SimpleResponse _large_unary_common_behavior(test_pb2_grpc.TestServiceStub stub, bool fill_username, bool fill_oauth_scope, Optional[grpc.CallCredentials] call_credentials)
Definition: aio/interop/methods.py:77
tests_aio.interop.methods._client_streaming
None _client_streaming(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:104
grpc._common.decode
def decode(b)
Definition: grpc/_common.py:75
messages_pb2.StreamingInputCallRequest
StreamingInputCallRequest
Definition: messages_pb2.py:611
tests_aio.interop.methods._special_status_message
def _special_status_message(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:381
open
#define open
Definition: test-fs.c:46
tests_aio.interop.methods._oauth2_auth_token
def _oauth2_auth_token(test_pb2_grpc.TestServiceStub stub, argparse.Namespace args)
Definition: aio/interop/methods.py:341
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.interop.methods._validate_status_code_and_details
None _validate_status_code_and_details(aio.Call call, grpc.StatusCode expected_code, str expected_details)
Definition: aio/interop/methods.py:58
method
NSString * method
Definition: ProtoMethod.h:28
tests_aio.interop.methods._timeout_on_sleeping_server
def _timeout_on_sleeping_server(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:224
tests_aio.interop.methods._compute_engine_creds
def _compute_engine_creds(test_pb2_grpc.TestServiceStub stub, argparse.Namespace args)
Definition: aio/interop/methods.py:333
grpc::CallCredentials
Definition: include/grpcpp/security/credentials.h:132
messages_pb2.EchoStatus
EchoStatus
Definition: messages_pb2.py:590
tests_aio.interop.methods._unimplemented_method
def _unimplemented_method(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:283
tests_aio.interop.methods._server_streaming
None _server_streaming(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:123
tests_aio.interop.methods._jwt_token_creds
def _jwt_token_creds(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:355
tests_aio.interop.methods.test_interoperability
None test_interoperability(TestCase case, test_pb2_grpc.TestServiceStub stub, Optional[argparse.Namespace] args=None)
Definition: aio/interop/methods.py:441
tests_aio.interop.methods._per_rpc_creds
def _per_rpc_creds(test_pb2_grpc.TestServiceStub stub, argparse.Namespace args)
Definition: aio/interop/methods.py:364
tests_aio.interop.methods._ping_pong
None _ping_pong(test_pb2_grpc.TestServiceStub stub)
Definition: aio/interop/methods.py:146


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:29