_RESPONSE_COUNT = 32 * 1024

_SERVER_CODE = """
import grpc
from concurrent import futures
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc
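

# Streams request.response_count copies of a request.message_size-byte
# payload back to the client for each Benchmark call.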
class Handler(unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceServicer):

    def Benchmark(self, request, context):
        payload = b'\\x00\\x01' * int(request.message_size / 2)
        for _ in range(request.response_count):
            yield unary_stream_benchmark_pb2.BenchmarkResponse(response=payload)
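

# A single worker thread is enough: the benchmark client issues one call at a
# time.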
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:%d')
unary_stream_benchmark_pb2_grpc.add_UnaryStreamBenchmarkServiceServicer_to_server(
    Handler(), server)
server.start()
server.wait_for_termination()
""" % _PORT

from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2

_GRPC_CHANNEL_OPTIONS = [
    ('grpc.max_metadata_size', 16 * 1024 * 1024),
    ('grpc.max_receive_message_length', 64 * 1024 * 1024),
    # Opt into the experimental single-threaded unary-stream implementation,
    # the code path this benchmark measures.
    (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1),
]
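

# Run the benchmark server in a subprocess so that its work does not compete
# with the client under test for CPU or the GIL.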
@contextlib.contextmanager
def _running_server():
    server_process = subprocess.Popen([sys.executable, '-c', _SERVER_CODE],
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
    try:
        yield
    finally:
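        # Tear down the server and surface its output for debugging.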
        server_process.terminate()
        server_process.wait()
        sys.stdout.write("stdout: {}".format(server_process.stdout.read()))
        sys.stdout.write("stderr: {}".format(server_process.stderr.read()))
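

# Time a single unary-stream call, from sending the request to receiving the
# final response.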
def profile(message_size, response_count):
    request = unary_stream_benchmark_pb2.BenchmarkRequest(
        message_size=message_size, response_count=response_count)
    with grpc.insecure_channel('[::]:{}'.format(_PORT),
                               options=_GRPC_CHANNEL_OPTIONS) as channel:
        stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(
            channel)
        start = datetime.datetime.now()
        call = stub.Benchmark(request, wait_for_ready=True)
        # Drain the stream so the timed interval covers every response.
        for _ in call:
            pass
        end = datetime.datetime.now()
    return end - start


def main():
    with _running_server():
        # Iteration count is arbitrary here; raise it for steadier numbers.
        for _ in range(1000):
            latency = profile(_MESSAGE_SIZE, 1024)
            sys.stdout.write("{}\n".format(latency.total_seconds()))
            sys.stdout.flush()


if __name__ == '__main__':
    main()
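
# Assumed invocation, from the repository root (module name inferred from the
# imports above):
#   python -m src.python.grpcio_tests.tests.stress.unary_stream_benchmark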