15 from concurrent
import futures
16 from typing
import Any, Tuple
21 from src.proto.grpc.testing
import messages_pb2
22 from src.proto.grpc.testing
import test_pb2_grpc
24 LONG_UNARY_CALL_WITH_SLEEP_VALUE = 1
33 gevent.sleep(LONG_UNARY_CALL_WITH_SLEEP_VALUE)
44 port = server.add_insecure_port(
'[::]:%d' % port)
46 return 'localhost:%d' % port, server
52 rpc_method_handlers = {
55 servicer.UnaryCallWithSleep,
56 request_deserializer=messages_pb2.SimpleRequest.FromString,
57 response_serializer=messages_pb2.SimpleResponse.