tests_aio/unit/_test_server.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import asyncio
16 import datetime
17 
18 import grpc
19 from grpc.experimental import aio
20 
21 from src.proto.grpc.testing import empty_pb2
22 from src.proto.grpc.testing import messages_pb2
23 from src.proto.grpc.testing import test_pb2_grpc
24 from tests.unit import resources
25 from tests_aio.unit import _constants
26 
27 _INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
28 _TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
29 
30 
31 async def _maybe_echo_metadata(servicer_context):
32  """Copies metadata from request to response if it is present."""
33  invocation_metadata = dict(servicer_context.invocation_metadata())
34  if _INITIAL_METADATA_KEY in invocation_metadata:
35  initial_metadatum = (_INITIAL_METADATA_KEY,
36  invocation_metadata[_INITIAL_METADATA_KEY])
37  await servicer_context.send_initial_metadata((initial_metadatum,))
38  if _TRAILING_METADATA_KEY in invocation_metadata:
39  trailing_metadatum = (_TRAILING_METADATA_KEY,
40  invocation_metadata[_TRAILING_METADATA_KEY])
41  servicer_context.set_trailing_metadata((trailing_metadatum,))
42 
43 
44 async def _maybe_echo_status(request: messages_pb2.SimpleRequest,
45  servicer_context):
46  """Echos the RPC status if demanded by the request."""
47  if request.HasField('response_status'):
48  await servicer_context.abort(request.response_status.code,
49  request.response_status.message)
50 
51 
52 class TestServiceServicer(test_pb2_grpc.TestServiceServicer):
53 
54  async def UnaryCall(self, request, context):
55  await _maybe_echo_metadata(context)
56  await _maybe_echo_status(request, context)
58  payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE,
59  body=b'\x00' * request.response_size))
60 
61  async def EmptyCall(self, request, context):
62  return empty_pb2.Empty()
63 
64  async def StreamingOutputCall(
65  self, request: messages_pb2.StreamingOutputCallRequest,
66  unused_context):
67  for response_parameters in request.response_parameters:
68  if response_parameters.interval_us != 0:
69  await asyncio.sleep(
70  datetime.timedelta(microseconds=response_parameters.
71  interval_us).total_seconds())
72  if response_parameters.size != 0:
74  payload=messages_pb2.Payload(type=request.response_type,
75  body=b'\x00' *
76  response_parameters.size))
77  else:
79 
80  # Next methods are extra ones that are registred programatically
81  # when the sever is instantiated. They are not being provided by
82  # the proto file.
83  async def UnaryCallWithSleep(self, unused_request, unused_context):
84  await asyncio.sleep(_constants.UNARY_CALL_WITH_SLEEP_VALUE)
86 
87  async def StreamingInputCall(self, request_async_iterator, unused_context):
88  aggregate_size = 0
89  async for request in request_async_iterator:
90  if request.payload is not None and request.payload.body:
91  aggregate_size += len(request.payload.body)
93  aggregated_payload_size=aggregate_size)
94 
95  async def FullDuplexCall(self, request_async_iterator, context):
96  await _maybe_echo_metadata(context)
97  async for request in request_async_iterator:
98  await _maybe_echo_status(request, context)
99  for response_parameters in request.response_parameters:
100  if response_parameters.interval_us != 0:
101  await asyncio.sleep(
102  datetime.timedelta(microseconds=response_parameters.
103  interval_us).total_seconds())
104  if response_parameters.size != 0:
106  payload=messages_pb2.Payload(type=request.payload.type,
107  body=b'\x00' *
108  response_parameters.size))
109  else:
111 
112 
113 def _create_extra_generic_handler(servicer: TestServiceServicer):
114  # Add programatically extra methods not provided by the proto file
115  # that are used during the tests
116  rpc_method_handlers = {
117  'UnaryCallWithSleep':
119  servicer.UnaryCallWithSleep,
120  request_deserializer=messages_pb2.SimpleRequest.FromString,
121  response_serializer=messages_pb2.SimpleResponse.
122  SerializeToString)
123  }
124  return grpc.method_handlers_generic_handler('grpc.testing.TestService',
125  rpc_method_handlers)
126 
127 
128 async def start_test_server(port=0,
129  secure=False,
130  server_credentials=None,
131  interceptors=None):
132  server = aio.server(options=(('grpc.so_reuseport', 0),),
133  interceptors=interceptors)
134  servicer = TestServiceServicer()
135  test_pb2_grpc.add_TestServiceServicer_to_server(servicer, server)
136 
137  server.add_generic_rpc_handlers((_create_extra_generic_handler(servicer),))
138 
139  if secure:
140  if server_credentials is None:
141  server_credentials = grpc.ssl_server_credentials([
142  (resources.private_key(), resources.certificate_chain())
143  ])
144  port = server.add_secure_port('[::]:%d' % port, server_credentials)
145  else:
146  port = server.add_insecure_port('[::]:%d' % port)
147 
148  await server.start()
149 
150  # NOTE(lidizheng) returning the server to prevent it from deallocation
151  return 'localhost:%d' % port, server
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
messages_pb2.StreamingInputCallResponse
StreamingInputCallResponse
Definition: messages_pb2.py:618
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_server._maybe_echo_metadata
def _maybe_echo_metadata(servicer_context)
Definition: tests_aio/unit/_test_server.py:31
grpc::experimental
Definition: include/grpcpp/channel.h:46
grpc.ssl_server_credentials
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False)
Definition: src/python/grpcio/grpc/__init__.py:1709
tests_aio.unit._test_server.TestServiceServicer
Definition: tests_aio/unit/_test_server.py:52
tests_aio.unit._test_server._maybe_echo_status
def _maybe_echo_status(messages_pb2.SimpleRequest request, servicer_context)
Definition: tests_aio/unit/_test_server.py:44
tests_aio.unit._test_server._create_extra_generic_handler
def _create_extra_generic_handler(TestServiceServicer servicer)
Definition: tests_aio/unit/_test_server.py:113
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
tests_aio.unit._test_server.TestServiceServicer.UnaryCall
def UnaryCall(self, request, context)
Definition: tests_aio/unit/_test_server.py:54
grpc.method_handlers_generic_handler
def method_handlers_generic_handler(service, method_handlers)
Definition: src/python/grpcio/grpc/__init__.py:1590
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
messages_pb2.StreamingOutputCallResponse
StreamingOutputCallResponse
Definition: messages_pb2.py:639
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
messages_pb2.SimpleResponse
SimpleResponse
Definition: messages_pb2.py:604


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:39