qps/benchmark_client.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
15 
16 import abc
17 from concurrent import futures
18 import threading
19 import time
20 
21 import grpc
22 from six.moves import queue
23 
24 from src.proto.grpc.testing import benchmark_service_pb2_grpc
25 from src.proto.grpc.testing import messages_pb2
26 from tests.unit import resources
27 from tests.unit import test_common
28 
29 _TIMEOUT = 60 * 60 * 24
30 
31 
32 class GenericStub(object):
33 
34  def __init__(self, channel):
35  self.UnaryCall = channel.unary_unary(
36  '/grpc.testing.BenchmarkService/UnaryCall')
37  self.StreamingFromServer = channel.unary_stream(
38  '/grpc.testing.BenchmarkService/StreamingFromServer')
39  self.StreamingCall = channel.stream_stream(
40  '/grpc.testing.BenchmarkService/StreamingCall')
41 
42 
44  """Benchmark client interface that exposes a non-blocking send_request()."""
45 
46  __metaclass__ = abc.ABCMeta
47 
48  def __init__(self, server, config, hist):
49  # Create the stub
50  if config.HasField('security_params'):
52  resources.test_root_certificates())
53  channel = test_common.test_secure_channel(
54  server, creds, config.security_params.server_host_override)
55  else:
56  channel = grpc.insecure_channel(server)
57 
58  # waits for the channel to be ready before we start sending messages
60 
61  if config.payload_config.WhichOneof('payload') == 'simple_params':
62  self._generic = False
63  self._stub = benchmark_service_pb2_grpc.BenchmarkServiceStub(
64  channel)
65  payload = messages_pb2.Payload(
66  body=bytes(b'\0' *
67  config.payload_config.simple_params.req_size))
69  payload=payload,
70  response_size=config.payload_config.simple_params.resp_size)
71  else:
72  self._generic = True
73  self._stub = GenericStub(channel)
74  self._request = bytes(b'\0' *
75  config.payload_config.bytebuf_params.req_size)
76 
77  self._hist = hist
79 
80  def add_response_callback(self, callback):
81  """callback will be invoked as callback(client, query_time)"""
82  self._response_callbacks.append(callback)
83 
84  @abc.abstractmethod
85  def send_request(self):
86  """Non-blocking wrapper for a client's request operation."""
87  raise NotImplementedError()
88 
89  def start(self):
90  pass
91 
92  def stop(self):
93  pass
94 
95  def _handle_response(self, client, query_time):
96  self._hist.add(query_time * 1e9) # Report times in nanoseconds
97  for callback in self._response_callbacks:
98  callback(client, query_time)
99 
100 
102 
103  def __init__(self, server, config, hist):
104  super(UnarySyncBenchmarkClient, self).__init__(server, config, hist)
105  self._pool = futures.ThreadPoolExecutor(
106  max_workers=config.outstanding_rpcs_per_channel)
107 
108  def send_request(self):
109  # Send requests in separate threads to support multiple outstanding rpcs
110  # (See src/proto/grpc/testing/control.proto)
111  self._pool.submit(self._dispatch_request)
112 
113  def stop(self):
114  self._pool.shutdown(wait=True)
115  self._stub = None
116 
117  def _dispatch_request(self):
118  start_time = time.time()
119  self._stub.UnaryCall(self._request, _TIMEOUT)
120  end_time = time.time()
121  self._handle_response(self, end_time - start_time)
122 
123 
125 
126  def send_request(self):
127  # Use the Future callback api to support multiple outstanding rpcs
128  start_time = time.time()
129  response_future = self._stub.UnaryCall.future(self._request, _TIMEOUT)
130  response_future.add_done_callback(
131  lambda resp: self._response_received(start_time, resp))
132 
133  def _response_received(self, start_time, resp):
134  resp.result()
135  end_time = time.time()
136  self._handle_response(self, end_time - start_time)
137 
138  def stop(self):
139  self._stub = None
140 
141 
142 class _SyncStream(object):
143 
144  def __init__(self, stub, generic, request, handle_response):
145  self._stub = stub
146  self._generic = generic
147  self._request = request
148  self._handle_response = handle_response
149  self._is_streaming = False
150  self._request_queue = queue.Queue()
151  self._send_time_queue = queue.Queue()
152 
153  def send_request(self):
154  self._send_time_queue.put(time.time())
155  self._request_queue.put(self._request)
156 
157  def start(self):
158  self._is_streaming = True
159  response_stream = self._stub.StreamingCall(self._request_generator(),
160  _TIMEOUT)
161  for _ in response_stream:
162  self._handle_response(
163  self,
164  time.time() - self._send_time_queue.get_nowait())
165 
166  def stop(self):
167  self._is_streaming = False
168 
170  while self._is_streaming:
171  try:
172  request = self._request_queue.get(block=True, timeout=1.0)
173  yield request
174  except queue.Empty:
175  pass
176 
177 
179 
180  def __init__(self, server, config, hist):
181  super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
182  self._pool = futures.ThreadPoolExecutor(
183  max_workers=config.outstanding_rpcs_per_channel)
184  self._streams = [
185  _SyncStream(self._stub, self._generic, self._request,
186  self._handle_response)
187  for _ in range(config.outstanding_rpcs_per_channel)
188  ]
189  self._curr_stream = 0
190 
191  def send_request(self):
192  # Use a round_robin scheduler to determine what stream to send on
193  self._streams[self._curr_stream].send_request()
194  self._curr_stream = (self._curr_stream + 1) % len(self._streams)
195 
196  def start(self):
197  for stream in self._streams:
198  self._pool.submit(stream.start)
199 
200  def stop(self):
201  for stream in self._streams:
202  stream.stop()
203  self._pool.shutdown(wait=True)
204  self._stub = None
205 
206 
208 
209  def __init__(self, server, config, hist):
210  super(ServerStreamingSyncBenchmarkClient,
211  self).__init__(server, config, hist)
212  if config.outstanding_rpcs_per_channel == 1:
213  self._pool = None
214  else:
215  self._pool = futures.ThreadPoolExecutor(
216  max_workers=config.outstanding_rpcs_per_channel)
217  self._rpcs = []
218  self._sender = None
219 
220  def send_request(self):
221  if self._pool is None:
222  self._sender = threading.Thread(
223  target=self._one_stream_streaming_rpc, daemon=True)
224  self._sender.start()
225  else:
226  self._pool.submit(self._one_stream_streaming_rpc)
227 
229  response_stream = self._stub.StreamingFromServer(
230  self._request, _TIMEOUT)
231  self._rpcs.append(response_stream)
232  start_time = time.time()
233  for _ in response_stream:
234  self._handle_response(self, time.time() - start_time)
235  start_time = time.time()
236 
237  def stop(self):
238  for call in self._rpcs:
239  call.cancel()
240  if self._sender is not None:
241  self._sender.join()
242  if self._pool is not None:
243  self._pool.shutdown(wait=False)
244  self._stub = None
tests.qps.benchmark_client._SyncStream.stop
def stop(self)
Definition: qps/benchmark_client.py:166
tests.qps.benchmark_client._SyncStream._is_streaming
_is_streaming
Definition: qps/benchmark_client.py:149
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
tests.qps.benchmark_client.UnaryAsyncBenchmarkClient
Definition: qps/benchmark_client.py:124
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.qps.benchmark_client._SyncStream.start
def start(self)
Definition: qps/benchmark_client.py:157
tests.qps.benchmark_client.GenericStub.__init__
def __init__(self, channel)
Definition: qps/benchmark_client.py:34
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
tests.qps.benchmark_client.BenchmarkClient.stop
def stop(self)
Definition: qps/benchmark_client.py:92
tests.qps.benchmark_client.BenchmarkClient.send_request
def send_request(self)
Definition: qps/benchmark_client.py:85
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.qps.benchmark_client.BenchmarkClient._request
_request
Definition: qps/benchmark_client.py:68
tests.qps.benchmark_client._SyncStream.__init__
def __init__(self, stub, generic, request, handle_response)
Definition: qps/benchmark_client.py:144
tests.qps.benchmark_client.UnarySyncBenchmarkClient._pool
_pool
Definition: qps/benchmark_client.py:105
send_request
Definition: ares_private.h:147
tests.qps.benchmark_client.GenericStub
Definition: qps/benchmark_client.py:32
tests.qps.benchmark_client.StreamingSyncBenchmarkClient._streams
_streams
Definition: qps/benchmark_client.py:184
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient.send_request
def send_request(self)
Definition: qps/benchmark_client.py:220
tests.qps.benchmark_client._SyncStream._handle_response
_handle_response
Definition: qps/benchmark_client.py:148
tests.qps.benchmark_client._SyncStream._request_generator
def _request_generator(self)
Definition: qps/benchmark_client.py:169
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient
Definition: qps/benchmark_client.py:207
tests.qps.benchmark_client.UnaryAsyncBenchmarkClient._response_received
def _response_received(self, start_time, resp)
Definition: qps/benchmark_client.py:133
tests.qps.benchmark_client.StreamingSyncBenchmarkClient
Definition: qps/benchmark_client.py:178
tests.qps.benchmark_client._SyncStream._request
_request
Definition: qps/benchmark_client.py:147
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient._rpcs
_rpcs
Definition: qps/benchmark_client.py:217
tests.qps.benchmark_client.GenericStub.StreamingCall
StreamingCall
Definition: qps/benchmark_client.py:39
tests.qps.benchmark_client.BenchmarkClient._handle_response
def _handle_response(self, client, query_time)
Definition: qps/benchmark_client.py:95
tests.qps.benchmark_client.UnarySyncBenchmarkClient.__init__
def __init__(self, server, config, hist)
Definition: qps/benchmark_client.py:103
tests.qps.benchmark_client._SyncStream._send_time_queue
_send_time_queue
Definition: qps/benchmark_client.py:151
tests.qps.benchmark_client.BenchmarkClient._stub
_stub
Definition: qps/benchmark_client.py:63
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient.stop
def stop(self)
Definition: qps/benchmark_client.py:237
tests.qps.benchmark_client.BenchmarkClient._response_callbacks
_response_callbacks
Definition: qps/benchmark_client.py:78
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient._one_stream_streaming_rpc
def _one_stream_streaming_rpc(self)
Definition: qps/benchmark_client.py:228
tests.qps.benchmark_client._SyncStream
Definition: qps/benchmark_client.py:142
tests.qps.benchmark_client.UnarySyncBenchmarkClient
Definition: qps/benchmark_client.py:101
tests.qps.benchmark_client.StreamingSyncBenchmarkClient.stop
def stop(self)
Definition: qps/benchmark_client.py:200
tests.qps.benchmark_client.BenchmarkClient
Definition: qps/benchmark_client.py:43
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
tests.qps.benchmark_client.UnarySyncBenchmarkClient.stop
def stop(self)
Definition: qps/benchmark_client.py:113
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
tests.qps.benchmark_client.UnarySyncBenchmarkClient._dispatch_request
def _dispatch_request(self)
Definition: qps/benchmark_client.py:117
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient._pool
_pool
Definition: qps/benchmark_client.py:213
tests.qps.benchmark_client.GenericStub.UnaryCall
UnaryCall
Definition: qps/benchmark_client.py:35
tests.qps.benchmark_client._SyncStream.send_request
def send_request(self)
Definition: qps/benchmark_client.py:153
add
static void add(const char *beg, const char *end, char ***ss, size_t *ns)
Definition: debug/trace.cc:96
tests.qps.benchmark_client.UnarySyncBenchmarkClient.send_request
def send_request(self)
Definition: qps/benchmark_client.py:108
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient._sender
_sender
Definition: qps/benchmark_client.py:218
tests.qps.benchmark_client.StreamingSyncBenchmarkClient._curr_stream
_curr_stream
Definition: qps/benchmark_client.py:189
grpc.channel_ready_future
def channel_ready_future(channel)
Definition: src/python/grpcio/grpc/__init__.py:1945
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.qps.benchmark_client.StreamingSyncBenchmarkClient.start
def start(self)
Definition: qps/benchmark_client.py:196
tests.qps.benchmark_client.StreamingSyncBenchmarkClient.send_request
def send_request(self)
Definition: qps/benchmark_client.py:191
tests.qps.benchmark_client._SyncStream._request_queue
_request_queue
Definition: qps/benchmark_client.py:150
bytes
uint8 bytes[10]
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/coded_stream_unittest.cc:153
tests.qps.benchmark_client.GenericStub.StreamingFromServer
StreamingFromServer
Definition: qps/benchmark_client.py:37
tests.qps.benchmark_client.BenchmarkClient._hist
_hist
Definition: qps/benchmark_client.py:77
grpc.ssl_channel_credentials
def ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None)
Definition: src/python/grpcio/grpc/__init__.py:1607
tests.qps.benchmark_client.UnaryAsyncBenchmarkClient.send_request
def send_request(self)
Definition: qps/benchmark_client.py:126
tests.qps.benchmark_client.BenchmarkClient.add_response_callback
def add_response_callback(self, callback)
Definition: qps/benchmark_client.py:80
tests.qps.benchmark_client._SyncStream._generic
_generic
Definition: qps/benchmark_client.py:146
tests.qps.benchmark_client.StreamingSyncBenchmarkClient._pool
_pool
Definition: qps/benchmark_client.py:182
tests.qps.benchmark_client.ServerStreamingSyncBenchmarkClient.__init__
def __init__(self, server, config, hist)
Definition: qps/benchmark_client.py:209
tests.qps.benchmark_client.BenchmarkClient._generic
_generic
Definition: qps/benchmark_client.py:62
tests.qps.benchmark_client.StreamingSyncBenchmarkClient.__init__
def __init__(self, server, config, hist)
Definition: qps/benchmark_client.py:180
tests.qps.benchmark_client.UnaryAsyncBenchmarkClient.stop
def stop(self)
Definition: qps/benchmark_client.py:138
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.qps.benchmark_client.BenchmarkClient.start
def start(self)
Definition: qps/benchmark_client.py:89
tests.qps.benchmark_client.BenchmarkClient.__init__
def __init__(self, server, config, hist)
Definition: qps/benchmark_client.py:48
tests.qps.benchmark_client._SyncStream._stub
_stub
Definition: qps/benchmark_client.py:145


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:46