14 """Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
17 from concurrent
import futures
22 from six.moves
import queue
24 from src.proto.grpc.testing
import benchmark_service_pb2_grpc
25 from src.proto.grpc.testing
import messages_pb2
29 _TIMEOUT = 60 * 60 * 24
36 '/grpc.testing.BenchmarkService/UnaryCall')
38 '/grpc.testing.BenchmarkService/StreamingFromServer')
40 '/grpc.testing.BenchmarkService/StreamingCall')
44 """Benchmark client interface that exposes a non-blocking send_request()."""
46 __metaclass__ = abc.ABCMeta
50 if config.HasField(
'security_params'):
52 resources.test_root_certificates())
53 channel = test_common.test_secure_channel(
54 server, creds, config.security_params.server_host_override)
61 if config.payload_config.WhichOneof(
'payload') ==
'simple_params':
63 self.
_stub = benchmark_service_pb2_grpc.BenchmarkServiceStub(
67 config.payload_config.simple_params.req_size))
70 response_size=config.payload_config.simple_params.resp_size)
75 config.payload_config.bytebuf_params.req_size)
81 """callback will be invoked as callback(client, query_time)"""
86 """Non-blocking wrapper for a client's request operation."""
87 raise NotImplementedError()
96 self._hist.
add(query_time * 1e9)
97 for callback
in self._response_callbacks:
104 super(UnarySyncBenchmarkClient, self).
__init__(server, config, hist)
105 self.
_pool = futures.ThreadPoolExecutor(
106 max_workers=config.outstanding_rpcs_per_channel)
114 self.
_pool.shutdown(wait=
True)
118 start_time = time.time()
120 end_time = time.time()
128 start_time = time.time()
129 response_future = self.
_stub.UnaryCall.future(self.
_request, _TIMEOUT)
130 response_future.add_done_callback(
135 end_time = time.time()
144 def __init__(self, stub, generic, request, handle_response):
161 for _
in response_stream:
181 super(StreamingSyncBenchmarkClient, self).
__init__(server, config, hist)
182 self.
_pool = futures.ThreadPoolExecutor(
183 max_workers=config.outstanding_rpcs_per_channel)
187 for _
in range(config.outstanding_rpcs_per_channel)
198 self.
_pool.submit(stream.start)
203 self.
_pool.shutdown(wait=
True)
210 super(ServerStreamingSyncBenchmarkClient,
211 self).
__init__(server, config, hist)
212 if config.outstanding_rpcs_per_channel == 1:
215 self.
_pool = futures.ThreadPoolExecutor(
216 max_workers=config.outstanding_rpcs_per_channel)
221 if self.
_pool is None:
222 self.
_sender = threading.Thread(
229 response_stream = self.
_stub.StreamingFromServer(
231 self.
_rpcs.append(response_stream)
232 start_time = time.time()
233 for _
in response_stream:
235 start_time = time.time()
238 for call
in self.
_rpcs:
242 if self.
_pool is not None:
243 self.
_pool.shutdown(wait=
False)