25 #include <unordered_map>
28 #include "google/protobuf/timestamp.pb.h"
40 #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
51 using std::unique_ptr;
70 if (strlen(
env) != 0) {
72 char* comma = strchr(p,
',');
74 out.emplace_back(p, comma);
84 "Environment variable \"%s\" does not contain a list of QPS "
85 "workers to use. Set it to a comma-separated list of "
86 "hostname:port pairs, starting with hosts that should act as "
87 "servers. E.g. export "
88 "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
89 env_name.c_str(), env_name.c_str());
97 const std::map<std::string, std::string>& per_worker_credential_types,
99 auto it = per_worker_credential_types.find(worker_addr);
100 if (
it != per_worker_credential_types.end()) {
103 return credential_type;
107 static double WallTime(
const ClientStats& s) {
return s.time_elapsed(); }
108 static double SystemTime(
const ClientStats& s) {
return s.time_system(); }
109 static double UserTime(
const ClientStats& s) {
return s.time_user(); }
110 static double CliPollCount(
const ClientStats& s) {
return s.cq_poll_count(); }
111 static double SvrPollCount(
const ServerStats& s) {
return s.cq_poll_count(); }
116 return s.total_cpu_time();
119 return s.idle_cpu_time();
124 if (s.ok())
return true;
133 if (s.error_message() ==
"Socket closed")
return true;
147 result->mutable_summary()->set_latency_999(
histogram.Percentile(99.9));
152 double client_system_cpu_load = 0, client_user_cpu_load = 0;
153 for (
int i = 0;
i <
result->client_stats_size();
i++) {
154 auto client_stat =
result->client_stats(
i);
155 qps += client_stat.latencies().count() / client_stat.time_elapsed();
156 client_system_cpu_load +=
157 client_stat.time_system() / client_stat.time_elapsed();
158 client_user_cpu_load +=
159 client_stat.time_user() / client_stat.time_elapsed();
163 double server_system_cpu_load = 0, server_user_cpu_load = 0;
164 for (
int i = 0;
i <
result->server_stats_size();
i++) {
165 auto server_stat =
result->server_stats(
i);
166 server_system_cpu_load +=
167 server_stat.time_system() / server_stat.time_elapsed();
168 server_user_cpu_load +=
169 server_stat.time_user() / server_stat.time_elapsed();
171 result->mutable_summary()->set_qps(qps);
173 result->mutable_summary()->set_server_system_time(100 *
174 server_system_cpu_load);
175 result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
176 result->mutable_summary()->set_client_system_time(100 *
177 client_system_cpu_load);
178 result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);
183 result->mutable_summary()->set_server_cpu_usage(0);
185 auto server_cpu_usage =
188 result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
194 if (
result->request_results_size() > 0) {
197 for (
int i = 0;
i <
result->request_results_size();
i++) {
198 const RequestResultCount& rrc =
result->request_results(
i);
199 if (rrc.status_code() == 0) {
205 result->mutable_summary()->set_successful_requests_per_second(
207 result->mutable_summary()->set_failed_requests_per_second(
failures /
212 auto qps_per_server_core = qps /
sum(
result->server_cores(),
Cores);
213 result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
214 result->mutable_summary()->set_client_polls_per_request(
216 result->mutable_summary()->set_server_polls_per_request(
219 auto server_queries_per_cpu_sec =
222 auto client_queries_per_cpu_sec =
226 result->mutable_summary()->set_server_queries_per_cpu_sec(
227 server_queries_per_cpu_sec);
228 result->mutable_summary()->set_client_queries_per_cpu_sec(
229 client_queries_per_cpu_sec);
233 unique_ptr<WorkerService::Stub>
stub;
234 unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>>
stream;
238 unique_ptr<WorkerService::Stub>
stub;
239 unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>>
stream;
243 const ClientArgs& client_mark) {
245 for (
size_t i = 0, i_end =
clients.size();
i < i_end;
i++) {
247 if (!
client->stream->Write(client_mark)) {
251 if (!
client->stream->WritesDone()) {
260 std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult&
result) {
262 ClientStatus client_status;
263 for (
size_t i = 0, i_end =
clients.size();
i < i_end;
i++) {
266 if (
client->stream->Read(&client_status)) {
268 const auto&
stats = client_status.stats();
270 for (
int i = 0;
i <
stats.request_results_size();
i++) {
271 merged_statuses[
stats.request_results(
i).status_code()] +=
272 stats.request_results(
i).count();
292 for (
size_t i = 0, i_end =
clients.size();
i < i_end;
i++) {
299 result.add_client_success(success);
302 s.error_message().c_str());
309 const ServerArgs& server_mark) {
311 for (
size_t i = 0, i_end =
servers.size();
i < i_end;
i++) {
313 if (!
server->stream->Write(server_mark)) {
317 if (!
server->stream->WritesDone()) {
327 ServerStatus server_status;
328 for (
size_t i = 0, i_end =
servers.size();
i < i_end;
i++) {
331 if (
server->stream->Read(&server_status)) {
333 result.add_server_stats()->CopyFrom(server_status.stats());
334 result.add_server_cores(server_status.cores());
347 for (
size_t i = 0, i_end =
servers.size();
i < i_end;
i++) {
354 result.add_server_success(success);
357 s.error_message().c_str());
366 const ClientConfig& initial_client_config,
size_t num_clients,
367 const ServerConfig& initial_server_config,
size_t num_servers,
368 int warmup_seconds,
int benchmark_seconds,
int spawn_local_worker_count,
371 const std::map<std::string, std::string>& per_worker_credential_types,
372 bool run_inproc,
int32_t median_latency_collection_interval_millis) {
380 list<ClientContext> contexts;
381 auto alloc_context = [](list<ClientContext>* contexts) {
382 contexts->emplace_back();
383 auto context = &contexts->back();
397 vector<unique_ptr<QpsWorker>> local_workers;
398 for (
int i = 0;
i < abs(spawn_local_worker_count);
i++) {
400 static bool called_init =
false;
403 strcpy(args_buf,
"some-benchmark");
404 char*
args[] = {args_buf};
413 local_workers.emplace_back(
new QpsWorker(driver_port, 0, credential_type));
414 sprintf(
addr,
"localhost:%d", driver_port);
415 if (spawn_local_worker_count < 0) {
425 if (num_clients <= 0) {
426 num_clients =
workers.size() - num_servers;
435 workers.resize(num_clients + num_servers);
438 std::vector<ServerData>
servers(num_servers);
439 std::unordered_map<string, std::deque<int>> hosts_cores;
442 for (
size_t i = 0;
i < num_servers;
i++) {
451 servers[
i].stub = WorkerService::NewStub(
452 local_workers[
i]->InProcessChannel(channel_args));
455 const ServerConfig& server_config = initial_server_config;
456 if (server_config.core_limit() != 0) {
458 "server config core limit is set but ignored by driver");
463 *
args.mutable_setup() = server_config;
469 ServerStatus init_status;
477 client_config.add_server_targets(cli_target);
482 client_config.add_server_targets(cli_target.c_str());
485 if (qps_server_target_override.length() > 0) {
489 client_config.clear_server_targets();
490 client_config.add_server_targets(qps_server_target_override);
492 client_config.set_median_latency_collection_interval_millis(
493 median_latency_collection_interval_millis);
496 result_client_config = client_config;
498 std::vector<ClientData>
clients(num_clients);
499 size_t channels_allocated = 0;
500 for (
size_t i = 0;
i < num_clients;
i++) {
503 worker.c_str(),
i + num_servers);
510 clients[
i].stub = WorkerService::NewStub(
511 local_workers[
i + num_servers]->InProcessChannel(channel_args));
515 if (initial_client_config.core_limit() != 0) {
522 size_t num_channels =
523 (client_config.client_channels() - channels_allocated) /
525 channels_allocated += num_channels;
528 per_client_config.set_client_channels(num_channels);
531 *
args.mutable_setup() = per_client_config;
539 for (
size_t i = 0;
i < num_clients;
i++) {
540 ClientStatus init_status;
550 ServerArgs server_mark;
551 server_mark.mutable_mark()->set_reset(
true);
552 ClientArgs client_mark;
553 client_mark.mutable_mark()->set_reset(
true);
554 ServerStatus server_status;
555 ClientStatus client_status;
556 for (
size_t i = 0;
i < num_clients;
i++) {
558 if (!
client->stream->Write(client_mark)) {
563 for (
size_t i = 0;
i < num_clients;
i++) {
565 if (!
client->stream->Read(&client_status)) {
582 for (
size_t i = 0;
i < num_servers;
i++) {
584 if (!
server->stream->Write(server_mark)) {
589 for (
size_t i = 0;
i < num_clients;
i++) {
591 if (!
client->stream->Write(client_mark)) {
596 for (
size_t i = 0;
i < num_servers;
i++) {
598 if (!
server->stream->Read(&server_status)) {
603 for (
size_t i = 0;
i < num_clients;
i++) {
605 if (!
client->stream->Read(&client_status)) {
622 std::unique_ptr<ScenarioResult>
result(
new ScenarioResult);
624 std::unordered_map<int, int64_t> merged_statuses;
630 bool client_finish_first =
631 (client_config.rpc_type() != STREAMING_FROM_SERVER);
637 if (!client_finish_first) {
645 if (client_finish_first) {
656 it != merged_statuses.end(); ++
it) {
657 RequestResultCount* rrc =
result->add_request_results();
658 rrc->set_status_code(
it->first);
659 rrc->set_count(
it->second);
664 result->mutable_summary()->mutable_end_time()->set_seconds(
end_time);
672 const std::map<std::string, std::string>& per_worker_credential_types) {
687 ctx.set_wait_for_ready(
true);
691 s.error_message().c_str());