import asyncio
import collections
import logging
import multiprocessing
import os
import sys
import time
from typing import Tuple

import grpc
from grpc.experimental import aio

from src.proto.grpc.testing import benchmark_service_pb2_grpc
from src.proto.grpc.testing import control_pb2
from src.proto.grpc.testing import stats_pb2
from src.proto.grpc.testing import worker_service_pb2_grpc
# Helper modules from the gRPC Python test tree (paths assumed from the repo layout).
from tests.qps import histogram
from tests.unit import resources
from tests.unit.framework.common import get_socket
from tests_aio.benchmark import benchmark_client
from tests_aio.benchmark import benchmark_servicer
_NUM_CORES = multiprocessing.cpu_count()
_WORKER_ENTRY_FILE = os.path.join(
    os.path.split(os.path.abspath(__file__))[0], 'worker.py')

_LOGGER = logging.getLogger(__name__)
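
# The worker can serve benchmark traffic either in-process or, when the driver
# asks for more than one server/client process, by fanning out to child
# "sub worker" processes (defaulting to one per CPU core). Each sub worker is
# itself a full WorkerService that this process drives over a local channel.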
class _SubWorker(
        collections.namedtuple('_SubWorker',
                               ['process', 'port', 'channel', 'stub'])):
    """A data class that holds information about a child qps worker."""

    def __repr__(self):
        return f'<_SubWorker pid={self.process.pid} port={self.port}>'
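
# The driver polls workers with Mark requests; each poll is answered with a
# ServerStatus/ClientStatus snapshot, and a Mark with reset=True restarts the
# measurement window (see the request loops below).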
def _get_server_status(start_time: float, end_time: float,
                       port: int) -> control_pb2.ServerStatus:
    """Creates ServerStatus proto message."""
    end_time = time.monotonic()
    elapsed_time = end_time - start_time
    # User and system time are approximated with wall-clock elapsed time.
    stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
                                  time_user=elapsed_time,
                                  time_system=elapsed_time)
    return control_pb2.ServerStatus(stats=stats, port=port, cores=_NUM_CORES)
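
# Server construction: channel args from the ServerConfig become gRPC server
# options, the servicer is chosen from server_type, and the listening port is
# secured with test TLS credentials when security_params is set.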
def _create_server(config: control_pb2.ServerConfig) -> Tuple[aio.Server, int]:
    """Creates a server object according to the ServerConfig."""
    channel_args = tuple(
        (arg.name, arg.str_value) if arg.HasField('str_value') else
        (arg.name, int(arg.int_value)) for arg in config.channel_args)

    server = aio.server(options=channel_args + (('grpc.so_reuseport', 1),))
    if config.server_type == control_pb2.ASYNC_SERVER:
        servicer = benchmark_servicer.BenchmarkServicer()
        benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
            servicer, server)
    elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
        resp_size = config.payload_config.bytebuf_params.resp_size
        servicer = benchmark_servicer.GenericBenchmarkServicer(resp_size)
        method_implementations = {
            'StreamingCall':
                grpc.stream_stream_rpc_method_handler(servicer.StreamingCall),
            'UnaryCall':
                grpc.unary_unary_rpc_method_handler(servicer.UnaryCall),
        }
        handler = grpc.method_handlers_generic_handler(
            'grpc.testing.BenchmarkService', method_implementations)
        server.add_generic_rpc_handlers((handler,))
    else:
        raise NotImplementedError(
            'Unsupported server type {}'.format(config.server_type))
    if config.HasField('security_params'):
        server_credentials = grpc.ssl_server_credentials(
            ((resources.private_key(), resources.certificate_chain()),))
        port = server.add_secure_port('[::]:{}'.format(config.port),
                                      server_credentials)
    else:
        port = server.add_insecure_port('[::]:{}'.format(config.port))

    return server, port

def _get_client_status(
        start_time: float, end_time: float,
        qps_data: histogram.Histogram) -> control_pb2.ClientStatus:
    """Creates ClientStatus proto message."""
    latencies = qps_data.get_data()
    end_time = time.monotonic()
    elapsed_time = end_time - start_time
    # User and system time are approximated with wall-clock elapsed time.
    stats = stats_pb2.ClientStats(latencies=latencies,
                                  time_elapsed=elapsed_time,
                                  time_user=elapsed_time,
                                  time_system=elapsed_time)
    return control_pb2.ClientStatus(stats=stats)
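
# Only closed-loop load is supported: the async clients issue requests as fast
# as the configured concurrency allows, recording per-request latency into the
# shared qps_data histogram.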
def _create_client(
        server: str, config: control_pb2.ClientConfig,
        qps_data: histogram.Histogram) -> benchmark_client.BenchmarkClient:
    """Creates a client object according to the ClientConfig."""
    if config.load_params.WhichOneof('load') != 'closed_loop':
        raise NotImplementedError(
            f'Unsupported load parameter {config.load_params}')
    if config.client_type == control_pb2.ASYNC_CLIENT:
        if config.rpc_type == control_pb2.UNARY:
            client_type = benchmark_client.UnaryAsyncBenchmarkClient
        elif config.rpc_type == control_pb2.STREAMING:
            client_type = benchmark_client.StreamingAsyncBenchmarkClient
        elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
            client_type = benchmark_client.ServerStreamingAsyncBenchmarkClient
        else:
            raise NotImplementedError(
                f'Unsupported rpc_type [{config.rpc_type}]')
    else:
        raise NotImplementedError(
            f'Unsupported client type {config.client_type}')

    return client_type(server, config, qps_data)
152 """Picks an unused TCP port."""
159 """Creates a child qps worker as a subprocess."""
162 _LOGGER.info(
'Creating sub worker at port [%d]...', port)
163 process = await asyncio.create_subprocess_exec(sys.executable,
165 '--driver_port',
str(port))
166 _LOGGER.info(
'Created sub worker process for port [%d] at pid [%d]', port,
168 channel = aio.insecure_channel(f
'localhost:{port}')
169 _LOGGER.info(
'Waiting for sub worker at port [%d]', port)
170 await channel.channel_ready()
171 stub = worker_service_pb2_grpc.WorkerServiceStub(channel)
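
# WorkerService entry points. RunServer and RunClient are bidirectional
# streams driven by the benchmark driver; CoreCount and QuitWorker are simple
# unary RPCs.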
181 """Python Worker Server implementation."""
184 self.
_loop = asyncio.get_event_loop()

    async def _run_single_server(self, config, request_iterator, context):
        server, port = _create_server(config)
        await server.start()
        _LOGGER.info('Server started at port [%d]', port)

        start_time = time.monotonic()
        await context.write(_get_server_status(start_time, start_time, port))

        async for request in request_iterator:
            end_time = time.monotonic()
            status = _get_server_status(start_time, end_time, port)
            if request.mark.reset:
                start_time = end_time
            await context.write(status)
        await server.stop(None)
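
    # RunServer protocol: the first ServerArgs message carries the setup
    # (ServerConfig); every following message is a Mark asking for a stats
    # snapshot, optionally resetting the measurement window.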
    async def RunServer(self, request_iterator, context):
        config_request = await context.read()
        config = config_request.setup
        _LOGGER.info('Received ServerConfig: %s', config)

        if config.server_processes <= 0:
            _LOGGER.info('Using server_processes == [%d]', _NUM_CORES)
            config.server_processes = _NUM_CORES

        if config.port == 0:
            config.port = _pick_an_unused_port()
        _LOGGER.info('Port picked [%d]', config.port)

        if config.server_processes == 1:
            await self._run_single_server(config, request_iterator, context)
        else:
            sub_workers = await asyncio.gather(*[
                _create_sub_worker() for _ in range(config.server_processes)
            ])
            calls = [worker.stub.RunServer() for worker in sub_workers]
            config_request.setup.server_processes = 1
            for call in calls:
                await call.write(config_request)
                # An empty status indicates the peer is ready.
                await call.read()

            start_time = time.monotonic()
            await context.write(
                _get_server_status(start_time, start_time, config.port))

            _LOGGER.info('Servers are ready to serve.')

            async for request in request_iterator:
                end_time = time.monotonic()

                for call in calls:
                    await call.write(request)
                    # The sub worker's own report is not needed here.
                    await call.read()

                status = _get_server_status(start_time, end_time, config.port)
                if request.mark.reset:
                    start_time = end_time
                await context.write(status)

            for call in calls:
                await call.done_writing()
            for worker in sub_workers:
                await worker.stub.QuitWorker(control_pb2.Void())
                await worker.channel.close()
                _LOGGER.info('Waiting for [%s] to quit...', worker)
                await worker.process.wait()
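
    # Single-process client path: one benchmark client per configured channel,
    # all recording into a shared latency histogram.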
    async def _run_single_client(self, config, request_iterator, context):
        running_tasks = []
        qps_data = histogram.Histogram(config.histogram_params.resolution,
                                       config.histogram_params.max_possible)
        start_time = time.monotonic()

        # Create a client for each channel and run it as an asyncio task.
        for i in range(config.client_channels):
            server = config.server_targets[i % len(config.server_targets)]
            client = _create_client(server, config, qps_data)
            _LOGGER.info('Client created against server [%s]', server)
            running_tasks.append(self._loop.create_task(client.run()))

        end_time = time.monotonic()
        await context.write(_get_client_status(start_time, end_time, qps_data))

        # Respond to stat requests until the driver closes the stream.
        async for request in request_iterator:
            end_time = time.monotonic()
            status = _get_client_status(start_time, end_time, qps_data)
            if request.mark.reset:
                qps_data.reset()
                start_time = time.monotonic()
            await context.write(status)

        # Cancel the client tasks on the way out.
        for task in running_tasks:
            task.cancel()
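
    # RunClient mirrors RunServer: the first ClientArgs message carries the
    # setup (ClientConfig), later Mark messages request stats. In the
    # multi-process path each sub worker reports its own latency histogram,
    # and the results are merged before answering the driver.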
    async def RunClient(self, request_iterator, context):
        config_request = await context.read()
        config = config_request.setup
        _LOGGER.info('Received ClientConfig: %s', config)

        if config.client_processes <= 0:
            _LOGGER.info('client_processes can\'t be [%d]',
                         config.client_processes)
            _LOGGER.info('Using client_processes == [%d]', _NUM_CORES)
            config.client_processes = _NUM_CORES

        if config.client_processes == 1:
            await self._run_single_client(config, request_iterator, context)
        else:
            sub_workers = await asyncio.gather(*[
                _create_sub_worker() for _ in range(config.client_processes)
            ])
            calls = [worker.stub.RunClient() for worker in sub_workers]
            config_request.setup.client_processes = 1
            for call in calls:
                await call.write(config_request)
                # An empty status indicates the peer is ready.
                await call.read()

            start_time = time.monotonic()
            result = histogram.Histogram(config.histogram_params.resolution,
                                         config.histogram_params.max_possible)
            end_time = time.monotonic()
            await context.write(_get_client_status(start_time, end_time,
                                                   result))

            async for request in request_iterator:
                end_time = time.monotonic()

                for call in calls:
                    _LOGGER.debug('Fetching status...')
                    await call.write(request)
                    sub_status = await call.read()
                    result.merge(sub_status.stats.latencies)
                    _LOGGER.debug('Update from sub worker count=[%d]',
                                  sub_status.stats.latencies.count)

                status = _get_client_status(start_time, end_time, result)
                if request.mark.reset:
                    result.reset()
                    start_time = time.monotonic()
                _LOGGER.debug('Reporting count=[%d]',
                              status.stats.latencies.count)
                await context.write(status)

            for call in calls:
                await call.done_writing()
            for worker in sub_workers:
                await worker.stub.QuitWorker(control_pb2.Void())
                await worker.channel.close()
                _LOGGER.info('Waiting for sub worker [%s] to quit...', worker)
                await worker.process.wait()
                _LOGGER.info('Sub worker [%s] quit', worker)

    async def CoreCount(self, unused_request, unused_context):
        return control_pb2.CoreResponse(cores=_NUM_CORES)

    async def QuitWorker(self, unused_request, unused_context):
        _LOGGER.info('QuitWorker command received.')
        return control_pb2.Void()