Go to the documentation of this file.
19 #ifndef TEST_QPS_CLIENT_H
20 #define TEST_QPS_CLIENT_H
24 #include <condition_variable>
27 #include <unordered_map>
30 #include "absl/memory/memory.h"
31 #include "absl/strings/match.h"
42 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
43 #include "src/proto/grpc/testing/payloads.pb.h"
52 #define INPROC_NAME_PREFIX "qpsinproc:"
57 template <
class RequestType>
72 const PayloadConfig& payload_config) {
73 if (payload_config.has_bytebuf_params()) {
75 }
else if (payload_config.has_simple_params()) {
77 req->set_response_size(payload_config.simple_params().resp_size());
78 req->mutable_payload()->set_type(
80 int size = payload_config.simple_params().req_size();
81 std::unique_ptr<char[]> body(
new char[
size]);
82 req->mutable_payload()->set_body(body.get(),
size);
83 }
else if (payload_config.has_complex_params()) {
88 req->set_response_size(0);
89 req->mutable_payload()->set_type(
99 if (payload_config.has_bytebuf_params()) {
101 static_cast<size_t>(payload_config.bytebuf_params().req_size());
102 std::unique_ptr<char[]>
buf(
new char[req_sz]);
139 for (StatusHistogram::const_iterator
it =
from.begin();
it !=
from.end();
141 (*to)[
it->first] +=
it->second;
166 std::vector<Histogram> to_merge(
threads_.size());
167 std::vector<StatusHistogram> to_merge_status(
threads_.size());
170 threads_[
i]->BeginSwap(&to_merge[
i], &to_merge_status[
i]);
175 latencies.
Merge(to_merge[
i]);
178 timer_result =
timer->Mark();
183 threads_[
i]->MergeStatsInto(&latencies, &statuses);
185 timer_result =
timer_->Mark();
192 std::vector<double> medians_per_interval =
193 threads_[0]->GetMedianPerIntervalList();
195 gpr_log(
GPR_INFO,
"Number of medians: %zu", medians_per_interval.size());
196 for (
size_t j = 0; j < medians_per_interval.size(); j++) {
206 for (StatusHistogram::const_iterator
it = statuses.begin();
207 it != statuses.end(); ++
it) {
208 RequestResultCount* rrc =
stats.add_request_results();
209 rrc->set_status_code(
it->first);
210 rrc->set_count(
it->second);
212 stats.set_time_elapsed(timer_result.
wall);
215 stats.set_cq_poll_count(poll_count);
265 std::lock_guard<std::mutex>
g(
mu_);
271 std::unique_lock<std::mutex>
g(
mu_);
281 std::lock_guard<std::mutex>
g(
mu_);
359 std::unique_ptr<RandomDistInterface> random_dist;
360 switch (
load.load_case()) {
361 case LoadParams::kClosedLoop:
364 case LoadParams::kPoisson:
365 random_dist = absl::make_unique<ExpDist>(
load.poisson().offered_load() /
426 template <
class StubType,
class RequestType>
430 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
433 for (
int i = 0;
i <
config.client_channels();
i++) {
440 config.median_latency_collection_interval_millis() / 1e3;
448 int connect_deadline_seconds = 10;
452 char* channel_connect_timeout_str =
453 gpr_getenv(
"QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
454 if (channel_connect_timeout_str !=
nullptr &&
455 strcmp(channel_connect_timeout_str,
"") != 0) {
456 connect_deadline_seconds = atoi(channel_connect_timeout_str);
459 "Waiting for up to %d seconds for all channels to connect",
460 connect_deadline_seconds);
461 gpr_free(channel_connect_timeout_str);
466 size_t num_remaining = 0;
468 if (!c.is_inproc()) {
475 channel->NotifyOnStateChange(last_observed, connect_deadline, &
cq,
480 while (num_remaining > 0) {
495 channel->NotifyOnStateChange(last_observed, connect_deadline, &
cq,
510 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
514 args.SetInt(
"shard_to_ensure_no_subchannel_merges", shard);
518 if (
config.has_security_params() &&
519 config.security_params().cred_type().empty()) {
529 !
config.security_params().use_test_ca(),
530 std::shared_ptr<CallCredentials>(),
args);
535 tgt.erase(0, inproc_pfx.length());
536 int srv_num = std::stoi(tgt);
537 channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(
args);
548 for (
const auto& channel_arg :
config.channel_args()) {
549 if (channel_arg.value_case() == ChannelArg::kStrValue) {
550 args->SetString(channel_arg.name(), channel_arg.str_value());
551 }
else if (channel_arg.value_case() == ChannelArg::kIntValue) {
552 args->SetInt(channel_arg.name(), channel_arg.int_value());
564 std::function<std::unique_ptr<StubType>(
const std::shared_ptr<Channel>&)>
void Merge(const Histogram &h)
const char kTlsCredentialsType[]
std::function< std::unique_ptr< StubType >const std::shared_ptr< Channel > &)> create_stub_
GPRAPI unsigned gpr_cpu_num_cores(void)
void init(const RandomDistInterface &r, int threads, int entries=1000000)
std::unordered_map< int, int64_t > StatusHistogram
void set_status(int status)
std::vector< double > GetMedianPerIntervalList()
ClientStats Mark(bool reset)
gpr_free(creds_file_name)
std::unique_ptr< Client > CreateGenericAsyncStreamingClient(const ClientConfig &config)
return memset(p, 0, total)
gpr_timespec NextIssueTime(int thread_idx)
Histogram histogram_per_interval_
gpr_event start_requests_
ClientRequestCreator(ByteBuffer *req, const PayloadConfig &payload_config)
double Percentile(double pctile) const
std::unique_ptr< UsageTimer > timer_
bool StartsWith(absl::string_view text, absl::string_view prefix) noexcept
void AwaitThreadsCompletion()
GPRAPI void gpr_event_set(gpr_event *ev, void *value)
int last_reset_poll_count_
const RequestType * request()
void SetupLoadTest(const ClientConfig &config, size_t num_threads)
void FillProto(HistogramData *p)
void set_channel_args(const ClientConfig &config, ChannelArguments *args)
std::condition_variable threads_complete_
void MergeStatusHistogram(const StatusHistogram &from, StatusHistogram *to)
Channels represent a connection to an endpoint. Created by CreateChannel.
std::function< gpr_timespec()> NextIssuer(int thread_idx)
GPRAPI gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type clock_type)
StatusHistogram statuses_
std::unique_ptr< StubType > stub_
wrapped_grpc_channel * channel
char * gpr_getenv(const char *name)
int64_t next(int thread_num)
void StartThreads(size_t num_threads)
#define INPROC_NAME_PREFIX
std::vector< gpr_timespec > next_time_
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
double GetLatencyCollectionIntervalInSeconds()
#define gpr_atm_acq_load(p)
#define gpr_atm_rel_store(p, value)
Thread(Client *client, size_t idx)
ClientChannelInfo(const std::string &target, const ClientConfig &config, std::function< std::unique_ptr< StubType >(std::shared_ptr< Channel >)> create_stub, int shard)
GPRAPI void gpr_event_init(gpr_event *ev)
void grpc_stats_collect(grpc_stats_data *output)
std::shared_ptr< Channel > CreateTestChannel(const std::string &server, const std::string &cred_type, const std::string &override_hostname, bool use_prod_roots, const std::shared_ptr< CallCredentials > &creds, const ChannelArguments &args)
virtual void ThreadFunc(size_t thread_idx, Client::Thread *t)=0
ClientImpl(const ClientConfig &config, std::function< std::unique_ptr< StubType >(std::shared_ptr< Channel >)> create_stub)
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
std::shared_ptr< Channel > channel_
GPRAPI void * gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline)
ClientRequestCreator(SimpleRequest *req, const PayloadConfig &payload_config)
static void * tag(intptr_t t)
void MergeStatsInto(Histogram *hist, StatusHistogram *s)
void WaitForChannelsToConnect()
void CoreStatsToProto(const grpc_stats_data &core, Stats *proto)
gpr_atm thread_pool_done_
std::unique_ptr< Client > CreateSynchronousClient(const ClientConfig &config)
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
struct gpr_timespec gpr_timespec
size_t threads_remaining_
void MaybeStartRequests()
std::vector< ClientChannelInfo > channels_
static volatile int num_threads
Thread & operator=(const Thread &)
void UpdateHistogram(HistogramEntry *entry)
std::vector< std::unique_ptr< Thread > > threads_
std::vector< double > medians_each_interval_list_
double interval_start_time_
void BeginSwap(Histogram *n, StatusHistogram *s)
std::unique_ptr< Client > CreateCallbackClient(const ClientConfig &config)
InterarrivalTimer interarrival_timer_
ClientRequestCreator(RequestType *, const PayloadConfig &)
std::mutex thread_completion_mu_
def create_stub(channel, args)
double median_latency_collection_interval_seconds_
std::unique_ptr< Client > CreateAsyncClient(const ClientConfig &config)
static uv_thread_t thread
static grpc_completion_queue * cq
GPRAPI gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type clock_type)
virtual void DestroyMultithreading()=0
virtual int GetPollCount()
grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:46