15 from concurrent
import futures
16 import multiprocessing
23 from src.proto.grpc.testing
import benchmark_service_pb2_grpc
24 from src.proto.grpc.testing
import control_pb2
25 from src.proto.grpc.testing
import stats_pb2
26 from src.proto.grpc.testing
import worker_service_pb2_grpc
36 """Python Worker Server implementation."""
43 config =
next(request_iterator).setup
45 cores = multiprocessing.cpu_count()
47 start_time = time.time()
50 for request
in request_iterator:
51 end_time = time.time()
53 if request.mark.reset:
59 end_time = time.time()
60 elapsed_time = end_time - start_time
61 stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
62 time_user=elapsed_time,
63 time_system=elapsed_time)
64 return control_pb2.ServerStatus(stats=stats, port=port, cores=cores)
67 if config.async_server_threads == 0:
70 server_threads = multiprocessing.cpu_count() * 5
72 server_threads = config.async_server_threads
73 server = test_common.test_server(max_workers=server_threads)
74 if config.server_type == control_pb2.ASYNC_SERVER:
76 benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
78 elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
79 resp_size = config.payload_config.bytebuf_params.resp_size
81 method_implementations = {
89 'grpc.testing.BenchmarkService', method_implementations)
90 server.add_generic_rpc_handlers((handler,))
92 raise Exception(
'Unsupported server type {}'.
format(
98 server_port = config.port
100 if config.HasField(
'security_params'):
102 ((resources.private_key(), resources.certificate_chain()),))
103 port = server.add_secure_port(
'[::]:{}'.
format(server_port),
106 port = server.add_insecure_port(
'[::]:{}'.
format(server_port))
108 return (server, port)
111 config =
next(request_iterator).setup
114 config.histogram_params.max_possible)
115 start_time = time.time()
118 for i
in range(config.client_channels):
119 server = config.server_targets[i %
len(config.server_targets)]
121 client_runners.append(runner)
124 end_time = time.time()
128 for request
in request_iterator:
129 end_time = time.time()
131 if request.mark.reset:
133 start_time = time.time()
137 for runner
in client_runners:
141 latencies = qps_data.get_data()
142 end_time = time.time()
143 elapsed_time = end_time - start_time
144 stats = stats_pb2.ClientStats(latencies=latencies,
145 time_elapsed=elapsed_time,
146 time_user=elapsed_time,
147 time_system=elapsed_time)
148 return control_pb2.ClientStatus(stats=stats)
152 if config.client_type == control_pb2.SYNC_CLIENT:
153 if config.rpc_type == control_pb2.UNARY:
155 server, config, qps_data)
156 elif config.rpc_type == control_pb2.STREAMING:
158 server, config, qps_data)
159 elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
162 server, config, qps_data)
163 elif config.client_type == control_pb2.ASYNC_CLIENT:
164 if config.rpc_type == control_pb2.UNARY:
166 server, config, qps_data)
168 raise Exception(
'Async streaming client not supported')
170 raise Exception(
'Unsupported client type {}'.
format(
174 load_factor = float(config.client_channels)
175 if config.load_params.WhichOneof(
'load') ==
'closed_loop':
177 client, config.outstanding_rpcs_per_channel, no_ping_pong)
179 alpha = config.load_params.poisson.offered_load / load_factor
183 yield random.expovariate(alpha)
190 return control_pb2.CoreResponse(cores=multiprocessing.cpu_count())
194 return control_pb2.Void()