aio/benchmark/benchmark_client.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """The Python AsyncIO Benchmark Clients."""
15 
16 import abc
17 import asyncio
18 import logging
19 import random
20 import time
21 
22 import grpc
23 from grpc.experimental import aio
24 
25 from src.proto.grpc.testing import benchmark_service_pb2_grpc
26 from src.proto.grpc.testing import control_pb2
27 from src.proto.grpc.testing import messages_pb2
28 from tests.qps import histogram
29 from tests.unit import resources
30 
31 
class GenericStub(object):
    """Hand-built stub for the BenchmarkService methods.

    The multi-callables are created directly from the fully-qualified method
    paths and no serializer or deserializer is supplied to the channel.
    """

    def __init__(self, channel: aio.Channel):
        """Create one multi-callable per benchmark method on `channel`."""
        unary_path = '/grpc.testing.BenchmarkService/UnaryCall'
        server_streaming_path = (
            '/grpc.testing.BenchmarkService/StreamingFromServer')
        bidi_streaming_path = '/grpc.testing.BenchmarkService/StreamingCall'

        self.UnaryCall = channel.unary_unary(unary_path)
        self.StreamingFromServer = channel.unary_stream(server_streaming_path)
        self.StreamingCall = channel.stream_stream(bidi_streaming_path)
41 
42 
class BenchmarkClient(abc.ABC):
    """Base class of the AsyncIO benchmark clients.

    It owns the channel, the stub, the pre-built request, the latency
    histogram and the configured per-channel concurrency. Subclasses extend
    run() and stop() with the actual RPC traffic.
    """

    def __init__(self, address: str, config: control_pb2.ClientConfig,
                 hist: histogram.Histogram):
        """Build the channel, stub and request described by `config`.

        Args:
            address: Target passed to the channel constructor.
            config: Client configuration; channel_args, security_params,
                payload_config and outstanding_rpcs_per_channel are read.
            hist: Histogram that receives every recorded query time.
        """
        # Disables underlying reuse of subchannels
        unique_option = (('iv', random.random()),)

        # Parses the channel argument from config: string-valued arguments are
        # passed through, everything else is coerced to int.
        channel_args = tuple(
            (arg.name, arg.str_value) if arg.HasField('str_value') else (
                arg.name, int(arg.int_value)) for arg in config.channel_args)

        # Creates the channel
        if config.HasField('security_params'):
            channel_credentials = grpc.ssl_channel_credentials(
                resources.test_root_certificates(),)
            server_host_override_option = ((
                'grpc.ssl_target_name_override',
                config.security_params.server_host_override,
            ),)
            self._channel = aio.secure_channel(
                address, channel_credentials,
                unique_option + channel_args + server_host_override_option)
        else:
            self._channel = aio.insecure_channel(address,
                                                 options=unique_option +
                                                 channel_args)

        # Creates the stub
        if config.payload_config.WhichOneof('payload') == 'simple_params':
            # Protobuf payload: generated stub plus a SimpleRequest message.
            self._generic = False
            self._stub = benchmark_service_pb2_grpc.BenchmarkServiceStub(
                self._channel)
            payload = messages_pb2.Payload(
                body=b'\0' * config.payload_config.simple_params.req_size)
            self._request = messages_pb2.SimpleRequest(
                payload=payload,
                response_size=config.payload_config.simple_params.resp_size)
        else:
            # Any other payload kind: generic stub plus a zero-filled bytes
            # request sized by bytebuf_params.
            self._generic = True
            self._stub = GenericStub(self._channel)
            self._request = b'\0' * config.payload_config.bytebuf_params.req_size

        self._hist = hist
        # NOTE(review): this assignment was missing from the listing; the
        # attribute is known only from the file's member index, so the empty
        # list initial value should be confirmed against the upstream file.
        self._response_callbacks = []
        self._concurrency = config.outstanding_rpcs_per_channel

    async def run(self) -> None:
        """Wait until the channel is ready."""
        await self._channel.channel_ready()

    async def stop(self) -> None:
        """Close the channel."""
        await self._channel.close()

    def _record_query_time(self, query_time: float) -> None:
        """Add `query_time`, scaled by 1e9, to the histogram.

        Callers in this file pass a difference of clock readings in seconds,
        so the recorded value is in nanoseconds.
        """
        self._hist.add(query_time * 1e9)
99 
100 
102 
class UnaryAsyncBenchmarkClient(BenchmarkClient):
    """Benchmark client that issues unary calls back to back."""

    def __init__(self, address: str, config: control_pb2.ClientConfig,
                 hist: histogram.Histogram):
        """Set up the base client plus the run/stop bookkeeping."""
        super().__init__(address, config, hist)
        self._running = None
        # Set once run() has finished, so stop() can wait for the senders.
        self._stopped = asyncio.Event()

    async def _send_request(self):
        """Send one unary request and record how long it took."""
        start_time = time.monotonic()
        await self._stub.UnaryCall(self._request)
        self._record_query_time(time.monotonic() - start_time)

    async def _send_indefinitely(self) -> None:
        """Keep sending requests until stop() clears the running flag."""
        while self._running:
            await self._send_request()

    async def run(self) -> None:
        """Wait for the channel, then drive `_concurrency` senders.

        Returns once every sender has observed the cleared running flag, or
        re-raises the first exception raised by a sender.
        """
        await super().run()
        self._running = True
        try:
            await asyncio.gather(*(self._send_indefinitely()
                                   for _ in range(self._concurrency)))
        finally:
            # Always signal completion; otherwise a failing sender would leave
            # stop() waiting on the event forever.
            self._stopped.set()

    async def stop(self) -> None:
        """Ask the senders to finish, wait for run() to end, close the channel."""
        self._running = False
        await self._stopped.wait()
        await super().stop()
129 
130 
132 
class StreamingAsyncBenchmarkClient(BenchmarkClient):
    """Benchmark client that ping-pongs messages over StreamingCall streams."""

    def __init__(self, address: str, config: control_pb2.ClientConfig,
                 hist: histogram.Histogram):
        """Set up the base client plus the run/stop bookkeeping."""
        super().__init__(address, config, hist)
        self._running = None
        # Set once run() has finished, so stop() can wait for the streams.
        self._stopped = asyncio.Event()

    async def _one_streaming_call(self):
        """Run one stream: write a request, read a reply, record the time."""
        call = self._stub.StreamingCall()
        while self._running:
            # Monotonic clock, as in the unary client: an interval measured
            # with the wall clock is skewed by system clock adjustments.
            start_time = time.monotonic()
            await call.write(self._request)
            await call.read()
            self._record_query_time(time.monotonic() - start_time)
        await call.done_writing()

    async def run(self):
        """Wait for the channel, then drive `_concurrency` streams.

        Returns once every stream has observed the cleared running flag, or
        re-raises the first exception raised by a stream.
        """
        await super().run()
        self._running = True
        try:
            await asyncio.gather(*(self._one_streaming_call()
                                   for _ in range(self._concurrency)))
        finally:
            # Always signal completion; otherwise a failing stream would leave
            # stop() waiting on the event forever.
            self._stopped.set()

    async def stop(self):
        """Ask the streams to finish, wait for run() to end, close the channel."""
        self._running = False
        await self._stopped.wait()
        await super().stop()
159 
160 
162 
class ServerStreamingAsyncBenchmarkClient(BenchmarkClient):
    """Benchmark client that reads responses from StreamingFromServer calls."""

    def __init__(self, address: str, config: control_pb2.ClientConfig,
                 hist: histogram.Histogram):
        """Set up the base client plus the run/stop bookkeeping."""
        super().__init__(address, config, hist)
        self._running = None
        # Set once run() has finished, so stop() can wait for the readers.
        self._stopped = asyncio.Event()

    async def _one_server_streaming_call(self):
        """Start one call and record the time taken by each read."""
        call = self._stub.StreamingFromServer(self._request)
        while self._running:
            # Monotonic clock, as in the unary client: an interval measured
            # with the wall clock is skewed by system clock adjustments.
            start_time = time.monotonic()
            await call.read()
            self._record_query_time(time.monotonic() - start_time)

    async def run(self):
        """Wait for the channel, then drive `_concurrency` reading calls.

        Returns once every reader has observed the cleared running flag, or
        re-raises the first exception raised by a reader.
        """
        await super().run()
        self._running = True
        try:
            await asyncio.gather(*(self._one_server_streaming_call()
                                   for _ in range(self._concurrency)))
        finally:
            # Always signal completion; otherwise a failing reader would leave
            # stop() waiting on the event forever.
            self._stopped.set()

    async def stop(self):
        """Ask the readers to finish, wait for run() to end, close the channel."""
        self._running = False
        await self._stopped.wait()
        await super().stop()
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
benchmark_client.BenchmarkClient._response_callbacks
_response_callbacks
Definition: aio/benchmark/benchmark_client.py:87
benchmark_client.ServerStreamingAsyncBenchmarkClient._running
_running
Definition: aio/benchmark/benchmark_client.py:165
benchmark_client.BenchmarkClient._hist
_hist
Definition: aio/benchmark/benchmark_client.py:86
benchmark_client.StreamingAsyncBenchmarkClient._one_streaming_call
def _one_streaming_call(self)
Definition: aio/benchmark/benchmark_client.py:139
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
benchmark_client.StreamingAsyncBenchmarkClient.run
def run(self)
Definition: aio/benchmark/benchmark_client.py:148
benchmark_client.UnaryAsyncBenchmarkClient.run
None run(self)
Definition: aio/benchmark/benchmark_client.py:118
benchmark_client.BenchmarkClient._stub
_stub
Definition: aio/benchmark/benchmark_client.py:74
benchmark_client.GenericStub
Definition: aio/benchmark/benchmark_client.py:32
xds_interop_client.int
int
Definition: xds_interop_client.py:113
benchmark_client.ServerStreamingAsyncBenchmarkClient
Definition: aio/benchmark/benchmark_client.py:161
grpc::experimental
Definition: include/grpcpp/channel.h:46
benchmark_client.UnaryAsyncBenchmarkClient._stopped
_stopped
Definition: aio/benchmark/benchmark_client.py:106
benchmark_client.UnaryAsyncBenchmarkClient.stop
None stop(self)
Definition: aio/benchmark/benchmark_client.py:125
benchmark_client.GenericStub.UnaryCall
UnaryCall
Definition: aio/benchmark/benchmark_client.py:35
benchmark_client.GenericStub.StreamingCall
StreamingCall
Definition: aio/benchmark/benchmark_client.py:39
benchmark_client.BenchmarkClient._record_query_time
None _record_query_time(self, float query_time)
Definition: aio/benchmark/benchmark_client.py:97
benchmark_client.BenchmarkClient.__init__
def __init__(self, str address, control_pb2.ClientConfig config, histogram.Histogram hist)
Definition: aio/benchmark/benchmark_client.py:46
benchmark_client.ServerStreamingAsyncBenchmarkClient._one_server_streaming_call
def _one_server_streaming_call(self)
Definition: aio/benchmark/benchmark_client.py:169
benchmark_client.BenchmarkClient.stop
None stop(self)
Definition: aio/benchmark/benchmark_client.py:94
benchmark_client.UnaryAsyncBenchmarkClient
Definition: aio/benchmark/benchmark_client.py:101
benchmark_client.BenchmarkClient._concurrency
_concurrency
Definition: aio/benchmark/benchmark_client.py:88
benchmark_client.BenchmarkClient._generic
_generic
Definition: aio/benchmark/benchmark_client.py:73
benchmark_client.UnaryAsyncBenchmarkClient.__init__
def __init__(self, str address, control_pb2.ClientConfig config, histogram.Histogram hist)
Definition: aio/benchmark/benchmark_client.py:103
benchmark_client.BenchmarkClient._channel
_channel
Definition: aio/benchmark/benchmark_client.py:63
close
#define close
Definition: test-fs.c:48
tests.qps
Definition: src/python/grpcio_tests/tests/qps/__init__.py:1
benchmark_client.GenericStub.__init__
def __init__(self, aio.Channel channel)
Definition: aio/benchmark/benchmark_client.py:34
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
benchmark_client.StreamingAsyncBenchmarkClient
Definition: aio/benchmark/benchmark_client.py:131
add
static void add(const char *beg, const char *end, char ***ss, size_t *ns)
Definition: debug/trace.cc:96
benchmark_client.UnaryAsyncBenchmarkClient._running
_running
Definition: aio/benchmark/benchmark_client.py:105
benchmark_client.ServerStreamingAsyncBenchmarkClient.__init__
def __init__(self, str address, control_pb2.ClientConfig config, histogram.Histogram hist)
Definition: aio/benchmark/benchmark_client.py:163
benchmark_client.ServerStreamingAsyncBenchmarkClient._stopped
_stopped
Definition: aio/benchmark/benchmark_client.py:166
benchmark_client.UnaryAsyncBenchmarkClient._send_request
def _send_request(self)
Definition: aio/benchmark/benchmark_client.py:109
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
benchmark_client.StreamingAsyncBenchmarkClient.__init__
def __init__(self, str address, control_pb2.ClientConfig config, histogram.Histogram hist)
Definition: aio/benchmark/benchmark_client.py:133
benchmark_client.BenchmarkClient
Definition: aio/benchmark/benchmark_client.py:43
grpc.ssl_channel_credentials
def ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None)
Definition: src/python/grpcio/grpc/__init__.py:1607
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
benchmark_client.ServerStreamingAsyncBenchmarkClient.stop
def stop(self)
Definition: aio/benchmark/benchmark_client.py:184
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
benchmark_client.GenericStub.StreamingFromServer
StreamingFromServer
Definition: aio/benchmark/benchmark_client.py:37
benchmark_client.StreamingAsyncBenchmarkClient.stop
def stop(self)
Definition: aio/benchmark/benchmark_client.py:155
benchmark_client.BenchmarkClient._request
_request
Definition: aio/benchmark/benchmark_client.py:78
benchmark_client.StreamingAsyncBenchmarkClient._stopped
_stopped
Definition: aio/benchmark/benchmark_client.py:136
benchmark_client.StreamingAsyncBenchmarkClient._running
_running
Definition: aio/benchmark/benchmark_client.py:135
benchmark_client.BenchmarkClient.run
None run(self)
Definition: aio/benchmark/benchmark_client.py:91
benchmark_client.ServerStreamingAsyncBenchmarkClient.run
def run(self)
Definition: aio/benchmark/benchmark_client.py:176
benchmark_client.UnaryAsyncBenchmarkClient._send_indefinitely
None _send_indefinitely(self)
Definition: aio/benchmark/benchmark_client.py:114


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:46