stress_test.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *is % allowed in string
17  */
18 
19 #include <memory>
20 #include <string>
21 #include <thread>
22 #include <utility>
23 #include <vector>
24 
25 #include "absl/flags/flag.h"
26 
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/grpcpp.h>
31 
32 #include "src/proto/grpc/testing/metrics.grpc.pb.h"
33 #include "src/proto/grpc/testing/metrics.pb.h"
39 
41 
42 ABSL_FLAG(int32_t, metrics_port, 8081, "The metrics server port.");
43 
44 // TODO(Capstan): Consider using absl::Duration
45 ABSL_FLAG(int32_t, sleep_duration_ms, 0,
46  "The duration (in millisec) between two"
47  " consecutive test calls (per server) issued by the server.");
48 
49 // TODO(Capstan): Consider using absl::Duration
50 ABSL_FLAG(int32_t, test_duration_secs, -1,
51  "The length of time (in seconds) to run"
52  " the test. Enter -1 if the test should run continuously until"
53  " forcefully terminated.");
54 
55 ABSL_FLAG(std::string, server_addresses, "localhost:8080",
56  "The list of server addresses. The format is: \n"
57  " \"<name_1>:<port_1>,<name_2>:<port_1>...<name_N>:<port_N>\"\n"
58  " Note: <name> can be servername or IP address.");
59 
60 ABSL_FLAG(int32_t, num_channels_per_server, 1,
61  "Number of channels for each server");
62 
63 ABSL_FLAG(int32_t, num_stubs_per_channel, 1,
64  "Number of stubs per each channels to server. This number also "
65  "indicates the max number of parallel RPC calls on each channel "
66  "at any given time.");
67 
68 // TODO(sreek): Add more test cases here in future
70  "List of test cases to call along with the"
71  " relative weights in the following format:\n"
72  " \"<testcase_1:w_1>,<testcase_2:w_2>...<testcase_n:w_n>\"\n"
73  " The following testcases are currently supported:\n"
74  " empty_unary\n"
75  " large_unary\n"
76  " large_compressed_unary\n"
77  " client_streaming\n"
78  " server_streaming\n"
79  " server_compressed_streaming\n"
80  " slow_consumer\n"
81  " half_duplex\n"
82  " ping_pong\n"
83  " cancel_after_begin\n"
84  " cancel_after_first_response\n"
85  " timeout_on_sleeping_server\n"
86  " empty_stream\n"
87  " status_code_and_message\n"
88  " custom_metadata\n"
89  " Example: \"empty_unary:20,large_unary:10,empty_stream:70\"\n"
90  " The above will execute 'empty_unary', 20% of the time,"
91  " 'large_unary', 10% of the time and 'empty_stream' the remaining"
92  " 70% of the time");
93 
95  "Severity level of messages that should be logged. Any messages "
96  "greater than or equal to the level set here will be logged. "
97  "The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 "
98  "(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)");
99 
100 ABSL_FLAG(bool, do_not_abort_on_transient_failures, true,
101  "If set to 'true', abort() is not called in case of transient "
102  "failures like temporary connection failures.");
103 
104 // Options from client.cc (for compatibility with interop test).
105 // TODO(sreek): Consolidate overlapping options
106 ABSL_FLAG(bool, use_alts, false,
107  "Whether to use alts. Enable alts will disable tls.");
108 ABSL_FLAG(bool, use_tls, false, "Whether to use tls.");
109 ABSL_FLAG(bool, use_test_ca, false, "False to use SSL roots for google");
110 ABSL_FLAG(std::string, server_host_override, "",
111  "Override the server host which is sent in HTTP header");
112 
113 using grpc::testing::ALTS;
119 using grpc::testing::TLS;
123 
125 
126 // A simple wrapper to grp_default_log() function. This only logs messages at or
127 // above the current log level (set in 'log_level' variable)
129  if (args->severity >= log_level) {
131  }
132 }
133 
135  TestCaseType test_case = UNKNOWN_TEST;
136 
137  for (auto it = kTestCaseList.begin(); it != kTestCaseList.end(); it++) {
138  if (test_name == it->second) {
139  test_case = it->first;
140  break;
141  }
142  }
143 
144  return test_case;
145 }
146 
147 // Converts a string of comma delimited tokens to a vector of tokens
148 bool ParseCommaDelimitedString(const std::string& comma_delimited_str,
149  std::vector<std::string>& tokens) {
150  size_t bpos = 0;
151  size_t epos = std::string::npos;
152 
153  while ((epos = comma_delimited_str.find(',', bpos)) != std::string::npos) {
154  tokens.emplace_back(comma_delimited_str.substr(bpos, epos - bpos));
155  bpos = epos + 1;
156  }
157 
158  tokens.emplace_back(comma_delimited_str.substr(bpos)); // Last token
159  return true;
160 }
161 
162 // Input: Test case string "<testcase_name:weight>,<testcase_name:weight>...."
163 // Output:
164 // - Whether parsing was successful (return value)
165 // - Vector of (test_type_enum, weight) pairs returned via 'tests' parameter
167  std::vector<std::pair<TestCaseType, int>>& tests) {
168  bool is_success = true;
169 
170  std::vector<std::string> tokens;
172 
173  for (auto it = tokens.begin(); it != tokens.end(); it++) {
174  // Token is in the form <test_name>:<test_weight>
175  size_t colon_pos = it->find(':');
176  if (colon_pos == std::string::npos) {
177  gpr_log(GPR_ERROR, "Error in parsing test case string: %s", it->c_str());
178  is_success = false;
179  break;
180  }
181 
182  std::string test_name = it->substr(0, colon_pos);
183  int weight = std::stoi(it->substr(colon_pos + 1));
184  TestCaseType test_case = GetTestTypeFromName(test_name);
185  if (test_case == UNKNOWN_TEST) {
186  gpr_log(GPR_ERROR, "Unknown test case: %s", test_name.c_str());
187  is_success = false;
188  break;
189  }
190 
191  tests.emplace_back(std::make_pair(test_case, weight));
192  }
193 
194  return is_success;
195 }
196 
197 // For debugging purposes
198 void LogParameterInfo(const std::vector<std::string>& addresses,
199  const std::vector<std::pair<TestCaseType, int>>& tests) {
200  gpr_log(GPR_INFO, "server_addresses: %s",
201  absl::GetFlag(FLAGS_server_addresses).c_str());
202  gpr_log(GPR_INFO, "test_cases : %s", absl::GetFlag(FLAGS_test_cases).c_str());
203  gpr_log(GPR_INFO, "sleep_duration_ms: %d",
204  absl::GetFlag(FLAGS_sleep_duration_ms));
205  gpr_log(GPR_INFO, "test_duration_secs: %d",
206  absl::GetFlag(FLAGS_test_duration_secs));
207  gpr_log(GPR_INFO, "num_channels_per_server: %d",
208  absl::GetFlag(FLAGS_num_channels_per_server));
209  gpr_log(GPR_INFO, "num_stubs_per_channel: %d",
210  absl::GetFlag(FLAGS_num_stubs_per_channel));
211  gpr_log(GPR_INFO, "log_level: %d", absl::GetFlag(FLAGS_log_level));
212  gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s",
213  absl::GetFlag(FLAGS_do_not_abort_on_transient_failures) ? "true"
214  : "false");
215 
216  int num = 0;
217  for (auto it = addresses.begin(); it != addresses.end(); it++) {
218  gpr_log(GPR_INFO, "%d:%s", ++num, it->c_str());
219  }
220 
221  num = 0;
222  for (auto it = tests.begin(); it != tests.end(); it++) {
223  TestCaseType test_case = it->first;
224  int weight = it->second;
225  gpr_log(GPR_INFO, "%d. TestCaseType: %d, Weight: %d", ++num, test_case,
226  weight);
227  }
228 }
229 
230 int main(int argc, char** argv) {
231  grpc::testing::InitTest(&argc, &argv, true);
232 
233  if (absl::GetFlag(FLAGS_log_level) > GPR_LOG_SEVERITY_ERROR ||
234  absl::GetFlag(FLAGS_log_level) < GPR_LOG_SEVERITY_DEBUG) {
235  gpr_log(GPR_ERROR, "log_level should be an integer between %d and %d",
237  return 1;
238  }
239 
240  // Change the default log function to TestLogFunction which respects the
241  // log_level setting.
242  log_level = absl::GetFlag(FLAGS_log_level);
244 
245  srand(time(nullptr));
246 
247  // Parse the server addresses
248  std::vector<std::string> server_addresses;
249  ParseCommaDelimitedString(absl::GetFlag(FLAGS_server_addresses),
251 
252  // Parse test cases and weights
253  if (absl::GetFlag(FLAGS_test_cases).length() == 0) {
254  gpr_log(GPR_ERROR, "No test cases supplied");
255  return 1;
256  }
257 
258  std::vector<std::pair<TestCaseType, int>> tests;
259  if (!ParseTestCasesString(absl::GetFlag(FLAGS_test_cases), tests)) {
260  gpr_log(GPR_ERROR, "Error in parsing test cases string %s ",
261  absl::GetFlag(FLAGS_test_cases).c_str());
262  return 1;
263  }
264 
266 
267  WeightedRandomTestSelector test_selector(tests);
268  MetricsServiceImpl metrics_service;
269 
270  gpr_log(GPR_INFO, "Starting test(s)..");
271 
272  std::vector<std::thread> test_threads;
273  std::vector<std::unique_ptr<StressTestInteropClient>> clients;
274 
275  // Create and start the test threads.
276  // Note that:
277  // - Each server can have multiple channels (as configured by
278  // FLAGS_num_channels_per_server).
279  //
280  // - Each channel can have multiple stubs (as configured by
281  // FLAGS_num_stubs_per_channel). This is to test calling multiple RPCs in
282  // parallel on the same channel.
283  int thread_idx = 0;
284  int server_idx = -1;
285  char buffer[256];
286  transport_security security_type =
287  absl::GetFlag(FLAGS_use_alts)
288  ? ALTS
289  : (absl::GetFlag(FLAGS_use_tls) ? TLS : INSECURE);
290  for (auto it = server_addresses.begin(); it != server_addresses.end(); it++) {
291  ++server_idx;
292  // Create channel(s) for each server
293  for (int channel_idx = 0;
294  channel_idx < absl::GetFlag(FLAGS_num_channels_per_server);
295  channel_idx++) {
296  gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(),
297  channel_idx);
298  grpc::testing::ChannelCreationFunc channel_creation_func =
299  std::bind(static_cast<std::shared_ptr<grpc::Channel> (*)(
300  const std::string&, const std::string&,
303  *it, absl::GetFlag(FLAGS_server_host_override),
304  security_type, !absl::GetFlag(FLAGS_use_test_ca));
305 
306  // Create stub(s) for each channel
307  for (int stub_idx = 0;
308  stub_idx < absl::GetFlag(FLAGS_num_stubs_per_channel); stub_idx++) {
309  clients.emplace_back(new StressTestInteropClient(
310  ++thread_idx, *it, channel_creation_func, test_selector,
311  absl::GetFlag(FLAGS_test_duration_secs),
312  absl::GetFlag(FLAGS_sleep_duration_ms),
313  absl::GetFlag(FLAGS_do_not_abort_on_transient_failures)));
314 
315  bool is_already_created = false;
316  // QpsGauge name
317  std::snprintf(buffer, sizeof(buffer),
318  "/stress_test/server_%d/channel_%d/stub_%d/qps",
319  server_idx, channel_idx, stub_idx);
320 
321  test_threads.emplace_back(std::thread(
322  &StressTestInteropClient::MainLoop, clients.back().get(),
323  metrics_service.CreateQpsGauge(buffer, &is_already_created)));
324 
325  // The QpsGauge should not have been already created
326  GPR_ASSERT(!is_already_created);
327  }
328  }
329  }
330 
331  // Start metrics server before waiting for the stress test threads
332  std::unique_ptr<grpc::Server> metrics_server;
333  if (absl::GetFlag(FLAGS_metrics_port) > 0) {
334  metrics_server =
335  metrics_service.StartServer(absl::GetFlag(FLAGS_metrics_port));
336  }
337 
338  // Wait for the stress test threads to complete
339  for (auto it = test_threads.begin(); it != test_threads.end(); it++) {
340  it->join();
341  }
342 
343  return 0;
344 }
grpc::testing::InitTest
void InitTest(int *argc, char ***argv, bool remove_flags)
Definition: test_config_cc.cc:28
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
regen-readme.it
it
Definition: regen-readme.py:15
log.h
log_level
static int log_level
Definition: stress_test.cc:124
gpr_default_log
void gpr_default_log(gpr_log_func_args *args)
grpc::testing::kTestCaseList
const vector< pair< TestCaseType, std::string > > kTestCaseList
Definition: stress_interop_client.h:58
stress_interop_client.h
grpc::testing::transport_security
transport_security
Definition: create_test_channel.h:34
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
time.h
GPR_LOG_SEVERITY_DEBUG
@ GPR_LOG_SEVERITY_DEBUG
Definition: include/grpc/impl/codegen/log.h:46
create_test_channel.h
gpr_log_func_args
Definition: include/grpc/impl/codegen/log.h:77
TestLogFunction
void TestLogFunction(gpr_log_func_args *args)
Definition: stress_test.cc:128
GPR_LOG_SEVERITY_INFO
@ GPR_LOG_SEVERITY_INFO
Definition: include/grpc/impl/codegen/log.h:47
grpc::testing::MetricsServiceImpl::StartServer
std::unique_ptr< grpc::Server > StartServer(int port)
Definition: metrics_server.cc:100
grpc::testing::MetricsServiceImpl::CreateQpsGauge
std::shared_ptr< QpsGauge > CreateQpsGauge(const std::string &name, bool *already_present)
Definition: metrics_server.cc:83
LogParameterInfo
void LogParameterInfo(const std::vector< std::string > &addresses, const std::vector< std::pair< TestCaseType, int >> &tests)
Definition: stress_test.cc:198
clients
static client_t clients[NUM_CLIENTS]
Definition: test-pipe-connect-multiple.c:40
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
metrics_server.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc::testing::WeightedRandomTestSelector
Definition: stress_interop_client.h:77
grpcpp.h
run_interop_tests.server_addresses
dictionary server_addresses
Definition: run_interop_tests.py:1390
interop_client.h
absl::GetFlag
ABSL_MUST_USE_RESULT T GetFlag(const absl::Flag< T > &flag)
Definition: abseil-cpp/absl/flags/flag.h:98
GPR_LOG_SEVERITY_ERROR
@ GPR_LOG_SEVERITY_ERROR
Definition: include/grpc/impl/codegen/log.h:48
ABSL_FLAG
ABSL_FLAG(int32_t, metrics_port, 8081, "The metrics server port.")
buffer
char buffer[1024]
Definition: libuv/docs/code/idle-compute/main.c:8
grpc::CreateTestChannel
std::shared_ptr< Channel > CreateTestChannel(const std::string &server, const std::string &cred_type, const std::string &override_hostname, bool use_prod_roots, const std::shared_ptr< CallCredentials > &creds, const ChannelArguments &args)
Definition: create_test_channel.cc:88
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
weight
uint32_t weight
Definition: weighted_target.cc:84
tests
static BloatyTestEntry tests[]
Definition: bloaty_test_pe.cc:112
grpc::testing::MetricsServiceImpl
Definition: metrics_server.h:73
ParseCommaDelimitedString
bool ParseCommaDelimitedString(const std::string &comma_delimited_str, std::vector< std::string > &tokens)
Definition: stress_test.cc:148
grpc::testing::StressTestInteropClient
Definition: stress_interop_client.h:92
grpc::testing::ChannelCreationFunc
std::function< std::shared_ptr< Channel >void)> ChannelCreationFunc
Definition: interop_client.h:38
grpc::testing::UNKNOWN_TEST
@ UNKNOWN_TEST
Definition: stress_interop_client.h:38
ParseTestCasesString
bool ParseTestCasesString(const std::string &test_cases, std::vector< std::pair< TestCaseType, int >> &tests)
Definition: stress_test.cc:166
grpc::testing::ALTS
@ ALTS
Definition: create_test_channel.h:34
tests
Definition: src/python/grpcio_tests/tests/__init__.py:1
grpc::testing::INSECURE
@ INSECURE
Definition: create_test_channel.h:34
GetTestTypeFromName
TestCaseType GetTestTypeFromName(const std::string &test_name)
Definition: stress_test.cc:134
grpc::testing::TestCaseType
TestCaseType
Definition: stress_interop_client.h:37
xds_manager.num
num
Definition: xds_manager.py:56
test_config.h
python_utils.upload_rbe_results.test_cases
list test_cases
Definition: upload_rbe_results.py:197
gpr_set_log_function
GPRAPI void gpr_set_log_function(gpr_log_func func)
Definition: log.cc:143
length
std::size_t length
Definition: abseil-cpp/absl/time/internal/test_util.cc:57
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
main
int main(int argc, char **argv)
Definition: stress_test.cc:230
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
create_channel.h
grpc::testing::TLS
@ TLS
Definition: create_test_channel.h:34


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:21