worker_servicer.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import asyncio
16 import collections
17 import logging
18 import multiprocessing
19 import os
20 import sys
21 import time
22 from typing import Tuple
23 
24 import grpc
25 from grpc.experimental import aio
26 
27 from src.proto.grpc.testing import benchmark_service_pb2_grpc
28 from src.proto.grpc.testing import control_pb2
29 from src.proto.grpc.testing import stats_pb2
30 from src.proto.grpc.testing import worker_service_pb2_grpc
31 from tests.qps import histogram
32 from tests.unit import resources
33 from tests.unit.framework.common import get_socket
34 from tests_aio.benchmark import benchmark_client
35 from tests_aio.benchmark import benchmark_servicer
36 
37 _NUM_CORES = multiprocessing.cpu_count()
38 _WORKER_ENTRY_FILE = os.path.join(
39  os.path.split(os.path.abspath(__file__))[0], 'worker.py')
40 
41 _LOGGER = logging.getLogger(__name__)
42 
43 
44 class _SubWorker(
45  collections.namedtuple('_SubWorker',
46  ['process', 'port', 'channel', 'stub'])):
47  """A data class that holds information about a child qps worker."""
48 
49  def _repr(self):
50  return f'<_SubWorker pid={self.process.pid} port={self.port}>'
51 
52  def __repr__(self):
53  return self._repr()
54 
55  def __str__(self):
56  return self._repr()
57 
58 
59 def _get_server_status(start_time: float, end_time: float,
60  port: int) -> control_pb2.ServerStatus:
61  """Creates ServerStatus proto message."""
62  end_time = time.monotonic()
63  elapsed_time = end_time - start_time
64  # TODO(lidiz) Collect accurate time system to compute QPS/core-second.
65  stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
66  time_user=elapsed_time,
67  time_system=elapsed_time)
68  return control_pb2.ServerStatus(stats=stats, port=port, cores=_NUM_CORES)
69 
70 
71 def _create_server(config: control_pb2.ServerConfig) -> Tuple[aio.Server, int]:
72  """Creates a server object according to the ServerConfig."""
73  channel_args = tuple(
74  (arg.name,
75  arg.str_value) if arg.HasField('str_value') else (arg.name,
76  int(arg.int_value))
77  for arg in config.channel_args)
78 
79  server = aio.server(options=channel_args + (('grpc.so_reuseport', 1),))
80  if config.server_type == control_pb2.ASYNC_SERVER:
82  benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
83  servicer, server)
84  elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
85  resp_size = config.payload_config.bytebuf_params.resp_size
87  method_implementations = {
88  'StreamingCall':
89  grpc.stream_stream_rpc_method_handler(servicer.StreamingCall),
90  'UnaryCall':
91  grpc.unary_unary_rpc_method_handler(servicer.UnaryCall),
92  }
94  'grpc.testing.BenchmarkService', method_implementations)
95  server.add_generic_rpc_handlers((handler,))
96  else:
97  raise NotImplementedError('Unsupported server type {}'.format(
98  config.server_type))
99 
100  if config.HasField('security_params'): # Use SSL
101  server_creds = grpc.ssl_server_credentials(
102  ((resources.private_key(), resources.certificate_chain()),))
103  port = server.add_secure_port('[::]:{}'.format(config.port),
104  server_creds)
105  else:
106  port = server.add_insecure_port('[::]:{}'.format(config.port))
107 
108  return server, port
109 
110 
112  start_time: float, end_time: float,
113  qps_data: histogram.Histogram) -> control_pb2.ClientStatus:
114  """Creates ClientStatus proto message."""
115  latencies = qps_data.get_data()
116  end_time = time.monotonic()
117  elapsed_time = end_time - start_time
118  # TODO(lidiz) Collect accurate time system to compute QPS/core-second.
119  stats = stats_pb2.ClientStats(latencies=latencies,
120  time_elapsed=elapsed_time,
121  time_user=elapsed_time,
122  time_system=elapsed_time)
123  return control_pb2.ClientStatus(stats=stats)
124 
125 
127  server: str, config: control_pb2.ClientConfig,
128  qps_data: histogram.Histogram) -> benchmark_client.BenchmarkClient:
129  """Creates a client object according to the ClientConfig."""
130  if config.load_params.WhichOneof('load') != 'closed_loop':
131  raise NotImplementedError(
132  f'Unsupported load parameter {config.load_params}')
133 
134  if config.client_type == control_pb2.ASYNC_CLIENT:
135  if config.rpc_type == control_pb2.UNARY:
137  elif config.rpc_type == control_pb2.STREAMING:
139  elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
141  else:
142  raise NotImplementedError(
143  f'Unsupported rpc_type [{config.rpc_type}]')
144  else:
145  raise NotImplementedError(
146  f'Unsupported client type {config.client_type}')
147 
148  return client_type(server, config, qps_data)
149 
150 
151 def _pick_an_unused_port() -> int:
152  """Picks an unused TCP port."""
153  _, port, sock = get_socket()
154  sock.close()
155  return port
156 
157 
158 async def _create_sub_worker() -> _SubWorker:
159  """Creates a child qps worker as a subprocess."""
160  port = _pick_an_unused_port()
161 
162  _LOGGER.info('Creating sub worker at port [%d]...', port)
163  process = await asyncio.create_subprocess_exec(sys.executable,
164  _WORKER_ENTRY_FILE,
165  '--driver_port', str(port))
166  _LOGGER.info('Created sub worker process for port [%d] at pid [%d]', port,
167  process.pid)
168  channel = aio.insecure_channel(f'localhost:{port}')
169  _LOGGER.info('Waiting for sub worker at port [%d]', port)
170  await channel.channel_ready()
171  stub = worker_service_pb2_grpc.WorkerServiceStub(channel)
172  return _SubWorker(
173  process=process,
174  port=port,
175  channel=channel,
176  stub=stub,
177  )
178 
179 
180 class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
181  """Python Worker Server implementation."""
182 
183  def __init__(self):
184  self._loop = asyncio.get_event_loop()
185  self._quit_event = asyncio.Event()
186 
187  async def _run_single_server(self, config, request_iterator, context):
188  server, port = _create_server(config)
189  await server.start()
190  _LOGGER.info('Server started at port [%d]', port)
191 
192  start_time = time.monotonic()
193  await context.write(_get_server_status(start_time, start_time, port))
194 
195  async for request in request_iterator:
196  end_time = time.monotonic()
197  status = _get_server_status(start_time, end_time, port)
198  if request.mark.reset:
199  start_time = end_time
200  await context.write(status)
201  await server.stop(None)
202 
203  async def RunServer(self, request_iterator, context):
204  config_request = await context.read()
205  config = config_request.setup
206  _LOGGER.info('Received ServerConfig: %s', config)
207 
208  if config.server_processes <= 0:
209  _LOGGER.info('Using server_processes == [%d]', _NUM_CORES)
210  config.server_processes = _NUM_CORES
211 
212  if config.port == 0:
213  config.port = _pick_an_unused_port()
214  _LOGGER.info('Port picked [%d]', config.port)
215 
216  if config.server_processes == 1:
217  # If server_processes == 1, start the server in this process.
218  await self._run_single_server(config, request_iterator, context)
219  else:
220  # If server_processes > 1, offload to other processes.
221  sub_workers = await asyncio.gather(
222  *[_create_sub_worker() for _ in range(config.server_processes)])
223 
224  calls = [worker.stub.RunServer() for worker in sub_workers]
225 
226  config_request.setup.server_processes = 1
227 
228  for call in calls:
229  await call.write(config_request)
230  # An empty status indicates the peer is ready
231  await call.read()
232 
233  start_time = time.monotonic()
234  await context.write(
236  start_time,
237  start_time,
238  config.port,
239  ))
240 
241  _LOGGER.info('Servers are ready to serve.')
242 
243  async for request in request_iterator:
244  end_time = time.monotonic()
245 
246  for call in calls:
247  await call.write(request)
248  # Reports from sub workers doesn't matter
249  await call.read()
250 
251  status = _get_server_status(
252  start_time,
253  end_time,
254  config.port,
255  )
256  if request.mark.reset:
257  start_time = end_time
258  await context.write(status)
259 
260  for call in calls:
261  await call.done_writing()
262 
263  for worker in sub_workers:
264  await worker.stub.QuitWorker(control_pb2.Void())
265  await worker.channel.close()
266  _LOGGER.info('Waiting for [%s] to quit...', worker)
267  await worker.process.wait()
268 
269  async def _run_single_client(self, config, request_iterator, context):
270  running_tasks = []
271  qps_data = histogram.Histogram(config.histogram_params.resolution,
272  config.histogram_params.max_possible)
273  start_time = time.monotonic()
274 
275  # Create a client for each channel as asyncio.Task
276  for i in range(config.client_channels):
277  server = config.server_targets[i % len(config.server_targets)]
278  client = _create_client(server, config, qps_data)
279  _LOGGER.info('Client created against server [%s]', server)
280  running_tasks.append(self._loop.create_task(client.run()))
281 
282  end_time = time.monotonic()
283  await context.write(_get_client_status(start_time, end_time, qps_data))
284 
285  # Respond to stat requests
286  async for request in request_iterator:
287  end_time = time.monotonic()
288  status = _get_client_status(start_time, end_time, qps_data)
289  if request.mark.reset:
290  qps_data.reset()
291  start_time = time.monotonic()
292  await context.write(status)
293 
294  # Cleanup the clients
295  for task in running_tasks:
296  task.cancel()
297 
298  async def RunClient(self, request_iterator, context):
299  config_request = await context.read()
300  config = config_request.setup
301  _LOGGER.info('Received ClientConfig: %s', config)
302 
303  if config.client_processes <= 0:
304  _LOGGER.info('client_processes can\'t be [%d]',
305  config.client_processes)
306  _LOGGER.info('Using client_processes == [%d]', _NUM_CORES)
307  config.client_processes = _NUM_CORES
308 
309  if config.client_processes == 1:
310  # If client_processes == 1, run the benchmark in this process.
311  await self._run_single_client(config, request_iterator, context)
312  else:
313  # If client_processes > 1, offload the work to other processes.
314  sub_workers = await asyncio.gather(
315  *[_create_sub_worker() for _ in range(config.client_processes)])
316 
317  calls = [worker.stub.RunClient() for worker in sub_workers]
318 
319  config_request.setup.client_processes = 1
320 
321  for call in calls:
322  await call.write(config_request)
323  # An empty status indicates the peer is ready
324  await call.read()
325 
326  start_time = time.monotonic()
327  result = histogram.Histogram(config.histogram_params.resolution,
328  config.histogram_params.max_possible)
329  end_time = time.monotonic()
330  await context.write(_get_client_status(start_time, end_time,
331  result))
332 
333  async for request in request_iterator:
334  end_time = time.monotonic()
335 
336  for call in calls:
337  _LOGGER.debug('Fetching status...')
338  await call.write(request)
339  sub_status = await call.read()
340  result.merge(sub_status.stats.latencies)
341  _LOGGER.debug('Update from sub worker count=[%d]',
342  sub_status.stats.latencies.count)
343 
344  status = _get_client_status(start_time, end_time, result)
345  if request.mark.reset:
346  result.reset()
347  start_time = time.monotonic()
348  _LOGGER.debug('Reporting count=[%d]',
349  status.stats.latencies.count)
350  await context.write(status)
351 
352  for call in calls:
353  await call.done_writing()
354 
355  for worker in sub_workers:
356  await worker.stub.QuitWorker(control_pb2.Void())
357  await worker.channel.close()
358  _LOGGER.info('Waiting for sub worker [%s] to quit...', worker)
359  await worker.process.wait()
360  _LOGGER.info('Sub worker [%s] quit', worker)
361 
362  @staticmethod
363  async def CoreCount(unused_request, unused_context):
364  return control_pb2.CoreResponse(cores=_NUM_CORES)
365 
366  async def QuitWorker(self, unused_request, unused_context):
367  _LOGGER.info('QuitWorker command received.')
368  self._quit_event.set()
369  return control_pb2.Void()
370 
371  async def wait_for_quit(self):
372  await self._quit_event.wait()
xds_interop_client.str
str
Definition: xds_interop_client.py:487
worker_servicer.WorkerServicer._run_single_client
def _run_single_client(self, config, request_iterator, context)
Definition: worker_servicer.py:269
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
benchmark_servicer.GenericBenchmarkServicer
Definition: benchmark_servicer.py:45
worker_servicer.WorkerServicer
Definition: worker_servicer.py:180
http2_test_server.format
format
Definition: http2_test_server.py:118
worker_servicer._get_server_status
control_pb2.ServerStatus _get_server_status(float start_time, float end_time, int port)
Definition: worker_servicer.py:59
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
worker_servicer._SubWorker._repr
def _repr(self)
Definition: worker_servicer.py:49
worker_servicer.WorkerServicer._loop
_loop
Definition: worker_servicer.py:184
worker_servicer._create_sub_worker
_SubWorker _create_sub_worker()
Definition: worker_servicer.py:158
xds_interop_client.int
int
Definition: xds_interop_client.py:113
benchmark_client.ServerStreamingAsyncBenchmarkClient
Definition: aio/benchmark/benchmark_client.py:161
worker_servicer._SubWorker.__str__
def __str__(self)
Definition: worker_servicer.py:55
grpc::experimental
Definition: include/grpcpp/channel.h:46
grpc.ssl_server_credentials
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False)
Definition: src/python/grpcio/grpc/__init__.py:1709
worker_servicer.WorkerServicer.CoreCount
def CoreCount(unused_request, unused_context)
Definition: worker_servicer.py:363
worker_servicer._create_server
Tuple[aio.Server, int] _create_server(control_pb2.ServerConfig config)
Definition: worker_servicer.py:71
tests.unit.framework.common.get_socket
def get_socket(bind_address='localhost', port=0, listen=True, sock_options=_DEFAULT_SOCK_OPTIONS)
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:26
worker_servicer._create_client
benchmark_client.BenchmarkClient _create_client(str server, control_pb2.ClientConfig config, histogram.Histogram qps_data)
Definition: worker_servicer.py:126
benchmark_client.UnaryAsyncBenchmarkClient
Definition: aio/benchmark/benchmark_client.py:101
tests.qps
Definition: src/python/grpcio_tests/tests/qps/__init__.py:1
worker_servicer.WorkerServicer.__init__
def __init__(self)
Definition: worker_servicer.py:183
benchmark_client.StreamingAsyncBenchmarkClient
Definition: aio/benchmark/benchmark_client.py:131
grpc.method_handlers_generic_handler
def method_handlers_generic_handler(service, method_handlers)
Definition: src/python/grpcio/grpc/__init__.py:1590
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
worker_servicer.WorkerServicer._run_single_server
def _run_single_server(self, config, request_iterator, context)
Definition: worker_servicer.py:187
worker_servicer._SubWorker.__repr__
def __repr__(self)
Definition: worker_servicer.py:52
client.run
def run(server_address, secure)
Definition: examples/python/xds/client.py:29
benchmark_client.BenchmarkClient
Definition: aio/benchmark/benchmark_client.py:43
worker_servicer.WorkerServicer.RunServer
def RunServer(self, request_iterator, context)
Definition: worker_servicer.py:203
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
grpc.stream_stream_rpc_method_handler
def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1570
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
worker_servicer.WorkerServicer.wait_for_quit
def wait_for_quit(self)
Definition: worker_servicer.py:371
worker_servicer.WorkerServicer.QuitWorker
def QuitWorker(self, unused_request, unused_context)
Definition: worker_servicer.py:366
worker_servicer.WorkerServicer._quit_event
_quit_event
Definition: worker_servicer.py:185
worker_servicer._SubWorker
Definition: worker_servicer.py:46
worker_servicer.WorkerServicer.RunClient
def RunClient(self, request_iterator, context)
Definition: worker_servicer.py:298
worker_servicer._pick_an_unused_port
int _pick_an_unused_port()
Definition: worker_servicer.py:151
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
benchmark_servicer.BenchmarkServicer
Definition: benchmark_servicer.py:26
worker_servicer._get_client_status
control_pb2.ClientStatus _get_client_status(float start_time, float end_time, histogram.Histogram qps_data)
Definition: worker_servicer.py:111


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:53