tests_gevent/unit/_test_server.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from concurrent import futures
16 from typing import Any, Tuple
17 
18 import gevent
19 import grpc
20 
21 from src.proto.grpc.testing import messages_pb2
22 from src.proto.grpc.testing import test_pb2_grpc
23 
24 LONG_UNARY_CALL_WITH_SLEEP_VALUE = 1
25 
26 
27 class TestServiceServicer(test_pb2_grpc.TestServiceServicer):
28 
29  def UnaryCall(self, request, context):
31 
32  def UnaryCallWithSleep(self, unused_request, unused_context):
33  gevent.sleep(LONG_UNARY_CALL_WITH_SLEEP_VALUE)
35 
36 
37 def start_test_server(port: int = 0) -> Tuple[str, Any]:
38  server = grpc.server(futures.ThreadPoolExecutor())
39  servicer = TestServiceServicer()
40  test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(),
41  server)
42 
43  server.add_generic_rpc_handlers((_create_extra_generic_handler(servicer),))
44  port = server.add_insecure_port('[::]:%d' % port)
45  server.start()
46  return 'localhost:%d' % port, server
47 
48 
49 def _create_extra_generic_handler(servicer: TestServiceServicer) -> Any:
50  # Add programatically extra methods not provided by the proto file
51  # that are used during the tests
52  rpc_method_handlers = {
53  'UnaryCallWithSleep':
55  servicer.UnaryCallWithSleep,
56  request_deserializer=messages_pb2.SimpleRequest.FromString,
57  response_serializer=messages_pb2.SimpleResponse.
58  SerializeToString)
59  }
60  return grpc.method_handlers_generic_handler('grpc.testing.TestService',
61  rpc_method_handlers)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
tests_gevent.unit._test_server.TestServiceServicer
Definition: tests_gevent/unit/_test_server.py:27
tests_gevent.unit._test_server.TestServiceServicer.UnaryCall
def UnaryCall(self, request, context)
Definition: tests_gevent/unit/_test_server.py:29
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
grpc.method_handlers_generic_handler
def method_handlers_generic_handler(service, method_handlers)
Definition: src/python/grpcio/grpc/__init__.py:1590
tests_gevent.unit._test_server.start_test_server
Tuple[str, Any] start_test_server(int port=0)
Definition: tests_gevent/unit/_test_server.py:37
tests_gevent.unit._test_server.TestServiceServicer.UnaryCallWithSleep
def UnaryCallWithSleep(self, unused_request, unused_context)
Definition: tests_gevent/unit/_test_server.py:32
tests_gevent.unit._test_server._create_extra_generic_handler
Any _create_extra_generic_handler(TestServiceServicer servicer)
Definition: tests_gevent/unit/_test_server.py:49
messages_pb2.SimpleResponse
SimpleResponse
Definition: messages_pb2.py:604


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:39