driver.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/cpp/qps/driver.h"
20 
21 #include <cinttypes>
22 #include <deque>
23 #include <list>
24 #include <thread>
25 #include <unordered_map>
26 #include <vector>
27 
28 #include "google/protobuf/timestamp.pb.h"
29 
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/create_channel.h>
36 
37 #include "src/core/lib/gpr/env.h"
40 #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
41 #include "test/core/util/port.h"
43 #include "test/cpp/qps/client.h"
44 #include "test/cpp/qps/histogram.h"
46 #include "test/cpp/qps/stats.h"
48 
49 using std::deque;
50 using std::list;
51 using std::unique_ptr;
52 using std::vector;
53 
54 namespace grpc {
55 namespace testing {
57  absl::string_view host;
59  grpc_core::SplitHostPort(worker.c_str(), &host, &port);
60  return std::string(host.data(), host.size());
61 }
62 
63 static deque<string> get_workers(const string& env_name) {
64  deque<string> out;
65  char* env = gpr_getenv(env_name.c_str());
66  if (!env) {
67  env = gpr_strdup("");
68  }
69  char* p = env;
70  if (strlen(env) != 0) {
71  for (;;) {
72  char* comma = strchr(p, ',');
73  if (comma) {
74  out.emplace_back(p, comma);
75  p = comma + 1;
76  } else {
77  out.emplace_back(p);
78  break;
79  }
80  }
81  }
82  if (out.empty()) {
84  "Environment variable \"%s\" does not contain a list of QPS "
85  "workers to use. Set it to a comma-separated list of "
86  "hostname:port pairs, starting with hosts that should act as "
87  "servers. E.g. export "
88  "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
89  env_name.c_str(), env_name.c_str());
90  }
91  gpr_free(env);
92  return out;
93 }
94 
// Picks the credential type to use for one worker.
//
// Returns the entry of `per_worker_credential_types` keyed by `worker_addr`
// when there is one, otherwise the default `credential_type`.
std::string GetCredType(
    const std::string& worker_addr,
    const std::map<std::string, std::string>& per_worker_credential_types,
    const std::string& credential_type) {
  const auto it = per_worker_credential_types.find(worker_addr);
  return it != per_worker_credential_types.end() ? it->second
                                                 : credential_type;
}
105 
106 // helpers for postprocess_scenario_result
107 static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
108 static double SystemTime(const ClientStats& s) { return s.time_system(); }
109 static double UserTime(const ClientStats& s) { return s.time_user(); }
110 static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
111 static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
112 static double ServerWallTime(const ServerStats& s) { return s.time_elapsed(); }
113 static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
114 static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
115 static double ServerTotalCpuTime(const ServerStats& s) {
116  return s.total_cpu_time();
117 }
118 static double ServerIdleCpuTime(const ServerStats& s) {
119  return s.idle_cpu_time();
120 }
121 static int Cores(int n) { return n; }
122 
123 static bool IsSuccess(const Status& s) {
124  if (s.ok()) return true;
125  // Since we shutdown servers and clients at the same time, they both can
126  // observe cancellation. Thus, we consider CANCELLED as good status.
127  if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
128  return true;
129  }
130  // Since we shutdown servers and clients at the same time, server can close
131  // the socket before the client attempts to do that, and vice versa. Thus
132  // receiving a "Socket closed" error is fine.
133  if (s.error_message() == "Socket closed") return true;
134  return false;
135 }
136 
137 // Postprocess ScenarioResult and populate result summary.
138 static void postprocess_scenario_result(ScenarioResult* result) {
139  // Get latencies from ScenarioResult latencies histogram and populate to
140  // result summary.
142  histogram.MergeProto(result->latencies());
143  result->mutable_summary()->set_latency_50(histogram.Percentile(50));
144  result->mutable_summary()->set_latency_90(histogram.Percentile(90));
145  result->mutable_summary()->set_latency_95(histogram.Percentile(95));
146  result->mutable_summary()->set_latency_99(histogram.Percentile(99));
147  result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));
148 
149  // Calculate qps and cpu load for each client and then aggregate results for
150  // all clients
151  double qps = 0;
152  double client_system_cpu_load = 0, client_user_cpu_load = 0;
153  for (int i = 0; i < result->client_stats_size(); i++) {
154  auto client_stat = result->client_stats(i);
155  qps += client_stat.latencies().count() / client_stat.time_elapsed();
156  client_system_cpu_load +=
157  client_stat.time_system() / client_stat.time_elapsed();
158  client_user_cpu_load +=
159  client_stat.time_user() / client_stat.time_elapsed();
160  }
161  // Calculate cpu load for each server and then aggregate results for all
162  // servers
163  double server_system_cpu_load = 0, server_user_cpu_load = 0;
164  for (int i = 0; i < result->server_stats_size(); i++) {
165  auto server_stat = result->server_stats(i);
166  server_system_cpu_load +=
167  server_stat.time_system() / server_stat.time_elapsed();
168  server_user_cpu_load +=
169  server_stat.time_user() / server_stat.time_elapsed();
170  }
171  result->mutable_summary()->set_qps(qps);
172  // Populate the percentage of cpu load to result summary.
173  result->mutable_summary()->set_server_system_time(100 *
174  server_system_cpu_load);
175  result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
176  result->mutable_summary()->set_client_system_time(100 *
177  client_system_cpu_load);
178  result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);
179 
180  // For Non-linux platform, get_cpu_usage() is not implemented. Thus,
181  // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
182  if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
183  result->mutable_summary()->set_server_cpu_usage(0);
184  } else {
185  auto server_cpu_usage =
186  100 - 100 * average(result->server_stats(), ServerIdleCpuTime) /
187  average(result->server_stats(), ServerTotalCpuTime);
188  result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
189  }
190 
191  // Calculate and populate successful request per second and failed requests
192  // per seconds to result summary.
193  auto time_estimate = average(result->client_stats(), WallTime);
194  if (result->request_results_size() > 0) {
195  int64_t successes = 0;
196  int64_t failures = 0;
197  for (int i = 0; i < result->request_results_size(); i++) {
198  const RequestResultCount& rrc = result->request_results(i);
199  if (rrc.status_code() == 0) {
200  successes += rrc.count();
201  } else {
202  failures += rrc.count();
203  }
204  }
205  result->mutable_summary()->set_successful_requests_per_second(
206  successes / time_estimate);
207  result->mutable_summary()->set_failed_requests_per_second(failures /
208  time_estimate);
209  }
210 
211  // Fill in data for other metrics required in result summary
212  auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
213  result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
214  result->mutable_summary()->set_client_polls_per_request(
215  sum(result->client_stats(), CliPollCount) / histogram.Count());
216  result->mutable_summary()->set_server_polls_per_request(
217  sum(result->server_stats(), SvrPollCount) / histogram.Count());
218 
219  auto server_queries_per_cpu_sec =
220  histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
221  sum(result->server_stats(), ServerUserTime));
222  auto client_queries_per_cpu_sec =
223  histogram.Count() / (sum(result->client_stats(), SystemTime) +
224  sum(result->client_stats(), UserTime));
225 
226  result->mutable_summary()->set_server_queries_per_cpu_sec(
227  server_queries_per_cpu_sec);
228  result->mutable_summary()->set_client_queries_per_cpu_sec(
229  client_queries_per_cpu_sec);
230 }
231 
232 struct ClientData {
233  unique_ptr<WorkerService::Stub> stub;
234  unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
235 };
236 
237 struct ServerData {
238  unique_ptr<WorkerService::Stub> stub;
239  unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
240 };
241 
242 static void FinishClients(const std::vector<ClientData>& clients,
243  const ClientArgs& client_mark) {
244  gpr_log(GPR_INFO, "Finishing clients");
245  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
246  auto client = &clients[i];
247  if (!client->stream->Write(client_mark)) {
248  gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
249  GPR_ASSERT(false);
250  }
251  if (!client->stream->WritesDone()) {
252  gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
253  GPR_ASSERT(false);
254  }
255  }
256 }
257 
259  const std::vector<ClientData>& clients, Histogram& merged_latencies,
260  std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
261  gpr_log(GPR_INFO, "Receiving final status from clients");
262  ClientStatus client_status;
263  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
264  auto client = &clients[i];
265  // Read the client final status
266  if (client->stream->Read(&client_status)) {
267  gpr_log(GPR_INFO, "Received final status from client %zu", i);
268  const auto& stats = client_status.stats();
269  merged_latencies.MergeProto(stats.latencies());
270  for (int i = 0; i < stats.request_results_size(); i++) {
271  merged_statuses[stats.request_results(i).status_code()] +=
272  stats.request_results(i).count();
273  }
274  result.add_client_stats()->CopyFrom(stats);
275  // Check that final status was should be the last message on the client
276  // stream.
277  // TODO(jtattermusch): note that that waiting for Read to return can take
278  // long on some scenarios (e.g. unconstrained streaming_from_server). See
279  // https://github.com/grpc/grpc/blob/3bd0cd208ea549760a2daf595f79b91b247fe240/test/cpp/qps/server_async.cc#L176
280  // where the shutdown delay pretty much determines the wait here.
281  GPR_ASSERT(!client->stream->Read(&client_status));
282  } else {
283  gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
284  GPR_ASSERT(false);
285  }
286  }
287 }
288 
289 static void ShutdownClients(const std::vector<ClientData>& clients,
290  ScenarioResult& result) {
291  gpr_log(GPR_INFO, "Shutdown clients");
292  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
293  auto client = &clients[i];
294  Status s = client->stream->Finish();
295  // Since we shutdown servers and clients at the same time, clients can
296  // observe cancellation. Thus, we consider both OK and CANCELLED as good
297  // status.
298  const bool success = IsSuccess(s);
299  result.add_client_success(success);
300  if (!success) {
301  gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
302  s.error_message().c_str());
303  GPR_ASSERT(false);
304  }
305  }
306 }
307 
308 static void FinishServers(const std::vector<ServerData>& servers,
309  const ServerArgs& server_mark) {
310  gpr_log(GPR_INFO, "Finishing servers");
311  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
312  auto server = &servers[i];
313  if (!server->stream->Write(server_mark)) {
314  gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
315  GPR_ASSERT(false);
316  }
317  if (!server->stream->WritesDone()) {
318  gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
319  GPR_ASSERT(false);
320  }
321  }
322 }
323 
324 static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
325  ScenarioResult& result) {
326  gpr_log(GPR_INFO, "Receiving final status from servers");
327  ServerStatus server_status;
328  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
329  auto server = &servers[i];
330  // Read the server final status
331  if (server->stream->Read(&server_status)) {
332  gpr_log(GPR_INFO, "Received final status from server %zu", i);
333  result.add_server_stats()->CopyFrom(server_status.stats());
334  result.add_server_cores(server_status.cores());
335  // That final status should be the last message on the server stream
336  GPR_ASSERT(!server->stream->Read(&server_status));
337  } else {
338  gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
339  GPR_ASSERT(false);
340  }
341  }
342 }
343 
344 static void ShutdownServers(const std::vector<ServerData>& servers,
345  ScenarioResult& result) {
346  gpr_log(GPR_INFO, "Shutdown servers");
347  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
348  auto server = &servers[i];
349  Status s = server->stream->Finish();
350  // Since we shutdown servers and clients at the same time, servers can
351  // observe cancellation. Thus, we consider both OK and CANCELLED as good
352  // status.
353  const bool success = IsSuccess(s);
354  result.add_server_success(success);
355  if (!success) {
356  gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
357  s.error_message().c_str());
358  GPR_ASSERT(false);
359  }
360  }
361 }
362 
// Non-static global list of in-process servers; nullptr until RunScenario is
// called with run_inproc set, which allocates it. RunScenario deletes it
// before returning.
363 std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
364 
// Runs one benchmark scenario against a set of QPS workers and returns the
// collected ScenarioResult.
//
// Visible flow: optionally spawn local workers, start `num_servers` servers
// and `num_clients` clients over WorkerService streams, exchange marks with
// them, wait, then gather the final client statistics and fill in the result.
// Most failures along the way are logged and then trip GPR_ASSERT(false).
//
// NOTE(review): this listing skips several of its own line numbers inside
// this function (377, 384, 457, 573-575, 615, 619, 643, 649-650, 666), so
// some statements are not visible here. Confirm against the real file before
// changing any code.
365 std::unique_ptr<ScenarioResult> RunScenario(
366  const ClientConfig& initial_client_config, size_t num_clients,
367  const ServerConfig& initial_server_config, size_t num_servers,
368  int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
369  const std::string& qps_server_target_override,
370  const std::string& credential_type,
371  const std::map<std::string, std::string>& per_worker_credential_types,
372  bool run_inproc, int32_t median_latency_collection_interval_millis) {
373  if (run_inproc) {
374  g_inproc_servers = new std::vector<grpc::testing::Server*>;
375  }
376  // Log everything from the driver
  // NOTE(review): listing line 377 (presumably the call that raises log
  // verbosity) is missing here.
378 
379  // ClientContext allocations (all are destroyed at scope exit)
380  list<ClientContext> contexts;
381  auto alloc_context = [](list<ClientContext>* contexts) {
382  contexts->emplace_back();
383  auto context = &contexts->back();
  // NOTE(review): listing line 384 is missing here.
385  return context;
386  };
387 
388  // To be added to the result, containing the final configuration used for
389  // client and config (including host, etc.)
390  ClientConfig result_client_config;
391 
392  // Get client, server lists; ignore if inproc test
393  auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
394  ClientConfig client_config = initial_client_config;
395 
396  // Spawn some local workers if desired
  // Only the magnitude of spawn_local_worker_count sets how many are spawned;
  // its sign decides whether they go to the front or the back of `workers`.
397  vector<unique_ptr<QpsWorker>> local_workers;
398  for (int i = 0; i < abs(spawn_local_worker_count); i++) {
399  // act as if we're a new test -- gets a good rng seed
400  static bool called_init = false;
401  if (!called_init) {
402  char args_buf[100];
403  strcpy(args_buf, "some-benchmark");
404  char* args[] = {args_buf};
405  int argc = 1;
406  grpc_test_init(&argc, args);
407  called_init = true;
408  }
409 
410  char addr[256];
411  // we use port # of -1 to indicate inproc
412  int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
413  local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
414  sprintf(addr, "localhost:%d", driver_port);
  // Servers are taken from the front of `workers` below, so a negative count
  // makes the local workers the first candidates for the server role.
415  if (spawn_local_worker_count < 0) {
416  workers.push_front(addr);
417  } else {
418  workers.push_back(addr);
419  }
420  }
421  GPR_ASSERT(!workers.empty());
422 
423  // if num_clients is set to <=0, do dynamic sizing: all workers
424  // except for servers are clients
  // num_clients is a size_t, so this condition can only be true when it is 0.
425  if (num_clients <= 0) {
426  num_clients = workers.size() - num_servers;
427  }
428 
429  // TODO(ctiller): support running multiple configurations, and binpack
430  // client/server pairs
431  // to available workers
432  GPR_ASSERT(workers.size() >= num_clients + num_servers);
433 
434  // Trim to just what we need
435  workers.resize(num_clients + num_servers);
436 
437  // Start servers
  // workers[0 .. num_servers) act as servers; the rest act as clients.
438  std::vector<ServerData> servers(num_servers);
439  std::unordered_map<string, std::deque<int>> hosts_cores;
440  ChannelArguments channel_args;
441 
442  for (size_t i = 0; i < num_servers; i++) {
443  gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
444  workers[i].c_str(), i);
445  if (!run_inproc) {
446  servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
447  workers[i],
448  GetCredType(workers[i], per_worker_credential_types, credential_type),
449  nullptr /* call creds */, {} /* interceptor creators */));
450  } else {
451  servers[i].stub = WorkerService::NewStub(
452  local_workers[i]->InProcessChannel(channel_args));
453  }
454 
455  const ServerConfig& server_config = initial_server_config;
456  if (server_config.core_limit() != 0) {
  // NOTE(review): listing line 457 (the head of the call that takes the
  // message below) is missing here.
458  "server config core limit is set but ignored by driver");
459  GPR_ASSERT(false);
460  }
461 
462  ServerArgs args;
463  *args.mutable_setup() = server_config;
464  servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
465  if (!servers[i].stream->Write(args)) {
466  gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
467  GPR_ASSERT(false);
468  }
469  ServerStatus init_status;
470  if (!servers[i].stream->Read(&init_status)) {
471  gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
472  GPR_ASSERT(false);
473  }
  // Register this server as a target for the clients: an inproc name for
  // in-process runs, otherwise the worker's host joined with the port the
  // server reported in its initial status.
474  if (run_inproc) {
475  std::string cli_target(INPROC_NAME_PREFIX);
476  cli_target += std::to_string(i);
477  client_config.add_server_targets(cli_target);
478  } else {
479  std::string host = get_host(workers[i]);
480  std::string cli_target =
481  grpc_core::JoinHostPort(host.c_str(), init_status.port());
482  client_config.add_server_targets(cli_target.c_str());
483  }
484  }
485  if (qps_server_target_override.length() > 0) {
486  // overriding the qps server target only makes since if there is <= 1
487  // servers
488  GPR_ASSERT(num_servers <= 1);
489  client_config.clear_server_targets();
490  client_config.add_server_targets(qps_server_target_override);
491  }
492  client_config.set_median_latency_collection_interval_millis(
493  median_latency_collection_interval_millis);
494 
495  // Targets are all set by now
496  result_client_config = client_config;
497  // Start clients
498  std::vector<ClientData> clients(num_clients);
499  size_t channels_allocated = 0;
500  for (size_t i = 0; i < num_clients; i++) {
501  const auto& worker = workers[i + num_servers];
502  gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
503  worker.c_str(), i + num_servers);
504  if (!run_inproc) {
505  clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
506  worker,
507  GetCredType(worker, per_worker_credential_types, credential_type),
508  nullptr /* call creds */, {} /* interceptor creators */));
509  } else {
510  clients[i].stub = WorkerService::NewStub(
511  local_workers[i + num_servers]->InProcessChannel(channel_args));
512  }
513  ClientConfig per_client_config = client_config;
514 
515  if (initial_client_config.core_limit() != 0) {
516  gpr_log(GPR_ERROR, "client config core limit set but ignored");
517  GPR_ASSERT(false);
518  }
519 
520  // Reduce channel count so that total channels specified is held regardless
521  // of the number of clients available
  // Integer division of the channels not yet handed out by the clients not
  // yet configured.
522  size_t num_channels =
523  (client_config.client_channels() - channels_allocated) /
524  (num_clients - i);
525  channels_allocated += num_channels;
  // NOTE(review): PRIdPTR is a signed format; `i` and `num_channels` are
  // size_t.
526  gpr_log(GPR_DEBUG, "Client %" PRIdPTR " gets %" PRIdPTR " channels", i,
527  num_channels);
528  per_client_config.set_client_channels(num_channels);
529 
530  ClientArgs args;
531  *args.mutable_setup() = per_client_config;
532  clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
533  if (!clients[i].stream->Write(args)) {
534  gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
535  GPR_ASSERT(false);
536  }
537  }
538 
  // Initial statuses are read in a second pass, after every client has been
  // sent its setup.
539  for (size_t i = 0; i < num_clients; i++) {
540  ClientStatus init_status;
541  if (!clients[i].stream->Read(&init_status)) {
542  gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
543  GPR_ASSERT(false);
544  }
545  }
546 
547  // Send an initial mark: clients can use this to know that everything is ready
548  // to start
549  gpr_log(GPR_INFO, "Initiating");
550  ServerArgs server_mark;
551  server_mark.mutable_mark()->set_reset(true);
552  ClientArgs client_mark;
553  client_mark.mutable_mark()->set_reset(true);
554  ServerStatus server_status;
555  ClientStatus client_status;
556  for (size_t i = 0; i < num_clients; i++) {
557  auto client = &clients[i];
558  if (!client->stream->Write(client_mark)) {
559  gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
560  GPR_ASSERT(false);
561  }
562  }
563  for (size_t i = 0; i < num_clients; i++) {
564  auto client = &clients[i];
565  if (!client->stream->Read(&client_status)) {
566  gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
567  GPR_ASSERT(false);
568  }
569  }
570 
571  // Let everything warmup
572  gpr_log(GPR_INFO, "Warming up");
  // NOTE(review): listing lines 573-575 are missing here; presumably they
  // define `start` (used further down) and perform the warmup wait.
576 
577  // Start a run
578  gpr_log(GPR_INFO, "Starting");
579 
580  auto start_time = time(nullptr);
581 
  // Marks are written to all servers and clients first and the replies are
  // read afterwards.
582  for (size_t i = 0; i < num_servers; i++) {
583  auto server = &servers[i];
584  if (!server->stream->Write(server_mark)) {
585  gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
586  GPR_ASSERT(false);
587  }
588  }
589  for (size_t i = 0; i < num_clients; i++) {
590  auto client = &clients[i];
591  if (!client->stream->Write(client_mark)) {
592  gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
593  GPR_ASSERT(false);
594  }
595  }
596  for (size_t i = 0; i < num_servers; i++) {
597  auto server = &servers[i];
598  if (!server->stream->Read(&server_status)) {
599  gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
600  GPR_ASSERT(false);
601  }
602  }
603  for (size_t i = 0; i < num_clients; i++) {
604  auto client = &clients[i];
605  if (!client->stream->Read(&client_status)) {
606  gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
607  GPR_ASSERT(false);
608  }
609  }
610 
611  // Wait some time
612  gpr_log(GPR_INFO, "Running");
613  // Use gpr_sleep_until rather than this_thread::sleep_until to support
614  // compilers that don't work with this_thread
  // NOTE(review): listing line 615 (the head of the sleep call whose
  // arguments follow) is missing here. The visible arguments add
  // warmup_seconds + benchmark_seconds to `start`.
616  start,
617  gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));
618 
  // NOTE(review): listing line 619 is missing here.
620 
621  // Finish a run
622  std::unique_ptr<ScenarioResult> result(new ScenarioResult);
623  Histogram merged_latencies;
624  std::unordered_map<int, int64_t> merged_statuses;
625 
626  // For the case where clients lead the test such as UNARY and
627  // STREAMING_FROM_CLIENT, clients need to finish completely while a server
628  // is running to prevent the clients from being stuck while waiting for
629  // the result.
630  bool client_finish_first =
631  (client_config.rpc_type() != STREAMING_FROM_SERVER);
632 
633  auto end_time = time(nullptr);
634 
635  FinishClients(clients, client_mark);
636 
637  if (!client_finish_first) {
638  FinishServers(servers, server_mark);
639  }
640 
641  ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
642  *result);
  // NOTE(review): listing line 643 is missing here.
644 
645  if (client_finish_first) {
646  FinishServers(servers, server_mark);
647  }
648 
  // NOTE(review): listing lines 649-650 are missing here.
651 
  // NOTE(review): the pointer is not reset after this delete, and it is only
  // re-allocated when run_inproc is true. A later call with run_inproc ==
  // false would appear to delete the stale pointer again; confirm.
652  delete g_inproc_servers;
653 
654  merged_latencies.FillProto(result->mutable_latencies());
655  for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
656  it != merged_statuses.end(); ++it) {
657  RequestResultCount* rrc = result->add_request_results();
658  rrc->set_status_code(it->first);
659  rrc->set_count(it->second);
660  }
661 
662  // Fill in start and end time for the test scenario
663  result->mutable_summary()->mutable_start_time()->set_seconds(start_time);
664  result->mutable_summary()->mutable_end_time()->set_seconds(end_time);
665 
  // NOTE(review): listing line 666 is missing here.
667  return result;
668 }
669 
670 bool RunQuit(
671  const std::string& credential_type,
672  const std::map<std::string, std::string>& per_worker_credential_types) {
673  // Get client, server lists
674  bool result = true;
675  auto workers = get_workers("QPS_WORKERS");
676  if (workers.empty()) {
677  return false;
678  }
679 
680  for (size_t i = 0; i < workers.size(); i++) {
681  auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
682  workers[i],
683  GetCredType(workers[i], per_worker_credential_types, credential_type),
684  nullptr /* call creds */, {} /* interceptor creators */));
685  Void phony;
687  ctx.set_wait_for_ready(true);
688  Status s = stub->QuitWorker(&ctx, phony, &phony);
689  if (!s.ok()) {
690  gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
691  s.error_message().c_str());
692  result = false;
693  }
694  }
695  return result;
696 }
697 
698 } // namespace testing
699 } // namespace grpc
grpc::testing::FinishClients
static void FinishClients(const std::vector< ClientData > &clients, const ClientArgs &client_mark)
Definition: driver.cc:242
test_credentials_provider.h
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
grpc::testing::ServerUserTime
static double ServerUserTime(const ServerStats &s)
Definition: driver.cc:114
grpc::testing::ServerIdleCpuTime
static double ServerIdleCpuTime(const ServerStats &s)
Definition: driver.cc:118
regen-readme.it
it
Definition: regen-readme.py:15
log.h
port.h
ctx
Definition: benchmark-async.c:30
generate.env
env
Definition: generate.py:37
grpc::gpr_free
gpr_free(creds_file_name)
timers.h
grpc
Definition: grpcpp/alarm.h:33
driver.h
histogram.h
client
Definition: examples/python/async_streaming/client.py:1
grpc::testing::sum
double sum(const T &container, F functor)
Definition: test/cpp/qps/stats.h:30
grpc::ClientContext::set_wait_for_ready
void set_wait_for_ready(bool wait_for_ready)
Definition: grpcpp/impl/codegen/client_context.h:285
grpc::testing::Cores
static int Cores(int n)
Definition: driver.cc:121
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
end_time
static int64_t end_time
Definition: benchmark-getaddrinfo.c:38
grpc::testing::ShutdownClients
static void ShutdownClients(const std::vector< ClientData > &clients, ScenarioResult &result)
Definition: driver.cc:289
grpc::testing::Histogram::FillProto
void FillProto(HistogramData *p)
Definition: cpp/qps/histogram.h:53
qps_worker.h
grpc_core::SplitHostPort
bool SplitHostPort(absl::string_view name, absl::string_view *host, absl::string_view *port)
Definition: host_port.cc:88
grpc::testing::ShutdownServers
static void ShutdownServers(const std::vector< ServerData > &servers, ScenarioResult &result)
Definition: driver.cc:344
ctx
static struct test_ctx ctx
Definition: test-ipc-send-recv.c:65
env.h
grpc::testing::ServerData
Definition: driver.cc:237
start_time
static int64_t start_time
Definition: benchmark-getaddrinfo.c:37
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
grpc::testing::ClientData::stream
unique_ptr< ClientReaderWriter< ClientArgs, ClientStatus > > stream
Definition: driver.cc:234
GPR_LOG_SEVERITY_DEBUG
@ GPR_LOG_SEVERITY_DEBUG
Definition: include/grpc/impl/codegen/log.h:46
iterator
const typedef MCPhysReg * iterator
Definition: MCRegisterInfo.h:27
failures
std::atomic< uint64_t > failures
Definition: outlier_detection.cc:233
grpc::testing::CliPollCount
static double CliPollCount(const ClientStats &s)
Definition: driver.cc:110
grpc::testing::GetCredType
std::string GetCredType(const std::string &worker_addr, const std::map< std::string, std::string > &per_worker_credential_types, const std::string &credential_type)
Definition: driver.cc:95
grpc_test_init
void grpc_test_init(int *argc, char **argv)
Definition: test/core/util/test_config.cc:135
grpc::testing::WallTime
static double WallTime(const ClientStats &s)
Definition: driver.cc:107
string_util.h
start
static uint64_t start
Definition: benchmark-pound.c:74
grpc::testing::ReceiveFinalStatusFromClients
static void ReceiveFinalStatusFromClients(const std::vector< ClientData > &clients, Histogram &merged_latencies, std::unordered_map< int, int64_t > &merged_statuses, ScenarioResult &result)
Definition: driver.cc:258
clients
static client_t clients[NUM_CLIENTS]
Definition: test-pipe-connect-multiple.c:40
grpc::testing::SystemTime
static double SystemTime(const ClientStats &s)
Definition: driver.cc:108
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
gpr_getenv
char * gpr_getenv(const char *name)
grpc::testing::ServerTotalCpuTime
static double ServerTotalCpuTime(const ServerStats &s)
Definition: driver.cc:115
grpc::testing::get_workers
static deque< string > get_workers(const string &env_name)
Definition: driver.cc:63
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
absl::string_view::size
constexpr size_type size() const noexcept
Definition: abseil-cpp/absl/strings/string_view.h:277
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
grpc::testing::RunScenario
std::unique_ptr< ScenarioResult > RunScenario(const ClientConfig &initial_client_config, size_t num_clients, const ServerConfig &initial_server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, const std::string &qps_server_target_override, const std::string &credential_type, const std::map< std::string, std::string > &per_worker_credential_types, bool run_inproc, int32_t median_latency_collection_interval_millis)
Definition: driver.cc:365
grpc.StatusCode
Definition: src/python/grpcio/grpc/__init__.py:232
INPROC_NAME_PREFIX
#define INPROC_NAME_PREFIX
Definition: client.h:52
successes
std::atomic< uint64_t > successes
Definition: outlier_detection.cc:232
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
stats.h
worker
Definition: worker.py:1
grpc::testing::IsSuccess
static bool IsSuccess(const Status &s)
Definition: driver.cc:123
grpc::testing::QpsWorker
Definition: qps_worker.h:39
gpr_timer_set_enabled
void gpr_timer_set_enabled(int)
Definition: basic_timers.cc:294
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
gen_stats_data.stats
list stats
Definition: gen_stats_data.py:58
framework.rpc.grpc_csds.ClientConfig
ClientConfig
Definition: grpc_csds.py:40
grpc_core::JoinHostPort
std::string JoinHostPort(absl::string_view host, int port)
Definition: host_port.cc:32
grpc::testing::UserTime
static double UserTime(const ClientStats &s)
Definition: driver.cc:109
histogram
static grpc_histogram * histogram
Definition: test/core/fling/client.cc:34
channel.h
grpc::testing::g_inproc_servers
std::vector< grpc::testing::Server * > * g_inproc_servers
Definition: driver.cc:363
host_port.h
grpc::testing::ServerWallTime
static double ServerWallTime(const ServerStats &s)
Definition: driver.cc:112
grpc::testing::ServerData::stub
unique_ptr< WorkerService::Stub > stub
Definition: driver.cc:238
grpc::CreateTestChannel
std::shared_ptr< Channel > CreateTestChannel(const std::string &server, const std::string &cred_type, const std::string &override_hostname, bool use_prod_roots, const std::shared_ptr< CallCredentials > &creds, const ChannelArguments &args)
Definition: create_test_channel.cc:88
grpc::testing::ServerSystemTime
static double ServerSystemTime(const ServerStats &s)
Definition: driver.cc:113
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
grpc::testing::postprocess_scenario_result
static void postprocess_scenario_result(ScenarioResult *result)
Definition: driver.cc:138
grpc::ClientContext
Definition: grpcpp/impl/codegen/client_context.h:195
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
test_config.h
grpc::testing::ClientData
Definition: driver.cc:232
grpc::testing::FinishServers
static void FinishServers(const std::vector< ServerData > &servers, const ServerArgs &server_mark)
Definition: driver.cc:308
grpc::testing::ServerData::stream
unique_ptr< ClientReaderWriter< ServerArgs, ServerStatus > > stream
Definition: driver.cc:239
grpc::ChannelArguments
Definition: grpcpp/support/channel_arguments.h:39
client_context.h
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
server
Definition: examples/python/async_streaming/server.py:1
alloc.h
env
Definition: env.py:1
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
grpc::testing::get_host
static std::string get_host(const std::string &worker)
Definition: driver.cc:56
grpc::testing::SvrPollCount
static double SvrPollCount(const ServerStats &s)
Definition: driver.cc:111
grpc::testing::ReceiveFinalStatusFromServer
static void ReceiveFinalStatusFromServer(const std::vector< ServerData > &servers, ScenarioResult &result)
Definition: driver.cc:324
grpc::testing::ClientData::stub
unique_ptr< WorkerService::Stub > stub
Definition: driver.cc:233
grpc::testing::Histogram
Definition: cpp/qps/histogram.h:28
grpc.StatusCode.CANCELLED
tuple CANCELLED
Definition: src/python/grpcio/grpc/__init__.py:261
gpr_strdup
GPRAPI char * gpr_strdup(const char *src)
Definition: string.cc:39
workers
struct child_worker * workers
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc::testing::RunQuit
bool RunQuit(const std::string &credential_type, const std::map< std::string, std::string > &per_worker_credential_types)
Definition: driver.cc:670
gpr_timespec
Definition: gpr_types.h:50
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
absl::string_view::data
constexpr const_pointer data() const noexcept
Definition: abseil-cpp/absl/strings/string_view.h:336
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
grpc::testing::Histogram::MergeProto
void MergeProto(const HistogramData &p)
Definition: cpp/qps/histogram.h:65
gpr_set_log_verbosity
GPRAPI void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print)
Definition: log.cc:96
client.h
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
run_interop_tests.servers
servers
Definition: run_interop_tests.py:1288
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
gpr_time_from_seconds
GPRAPI gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:123
grpc::testing::average
double average(const T &container, F functor)
Definition: test/cpp/qps/stats.h:39
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:12