25 #include <type_traits>
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_format.h"
40 #include "src/proto/grpc/testing/empty.pb.h"
41 #include "src/proto/grpc/testing/messages.pb.h"
42 #include "src/proto/grpc/testing/test.grpc.pb.h"
// Shared sizing/timing parameters for the interop test cases in this file.

// Per-message sizes for request-streaming calls; the request-streaming loop
// iterates over these and adds each entry to an aggregated payload size.
51 const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
// Per-message sizes passed to ResponseParameters::set_size() when asking the
// server for streamed responses; the number of entries is also the number of
// responses the streaming tests expect to read back.
52 const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
// Number of response messages requested (and expected to be read) by the
// slow-consumer response-streaming test.
53 const int kNumResponseMessages = 2000;
// Size passed to set_size() for each response requested by the slow-consumer
// test.
54 const int kResponseMessageSize = 1030;
// NOTE(review): presumably the per-message delay the slow consumer waits
// between reads, in milliseconds -- its use is not visible in this excerpt;
// confirm against the slow-consumer test body.
55 const int kReceiveDelayMilliSeconds = 20;
// Length of the request payload body used by the large-unary style tests
// (passed as the byte count to set_body()).
56 const int kLargeRequestSize = 271828;
// Response size requested from the server by the large-unary style tests
// (passed to set_response_size() / set_size()).
57 const int kLargeResponseSize = 314159;
59 void NoopChecks(
const InteropClientContextInspector& ,
63 void UnaryCompressionChecks(
const InteropClientContextInspector& inspector,
67 inspector.GetCallCompressionAlgorithm();
68 if (
request->response_compressed().value()) {
72 "Failure: Requested compression but got uncompressed response "
86 : channel_creation_func_(
std::
move(channel_creation_func)),
88 new_stub_every_call_(new_stub_every_call) {
91 if (!new_stub_every_call) {
97 if (new_stub_every_call_) {
104 UnimplementedService::Stub*
106 if (unimplemented_service_stub_ ==
nullptr) {
107 unimplemented_service_stub_ = UnimplementedService::NewStub(
channel_);
109 return unimplemented_service_stub_.get();
113 channel_ = channel_creation_func_();
114 if (!new_stub_every_call_) {
120 bool new_stub_every_test_case,
121 bool do_not_abort_on_transient_failures)
140 if (s.error_code() == expected_code) {
145 "Error status code: %d (expected: %d), message: %s,"
147 s.error_code(), expected_code, s.error_message().c_str(),
148 optional_debug_string.c_str());
186 request->set_response_size(kLargeResponseSize);
188 request->mutable_payload()->set_body(
payload.c_str(), kLargeRequestSize);
189 if (
request->has_expect_compressed()) {
190 if (
request->expect_compressed().value()) {
214 "Sending a large unary rpc with compute engine credentials ...");
217 request.set_fill_username(
true);
218 request.set_fill_oauth_scope(
true);
229 const char* oauth_scope_str =
response.oauth_scope().c_str();
238 "Sending a unary rpc with raw oauth2 access token credentials ...");
241 request.set_fill_username(
true);
242 request.set_fill_oauth_scope(
true);
255 const char* oauth_scope_str =
response.oauth_scope().c_str();
262 gpr_log(
GPR_DEBUG,
"Sending a unary rpc with per-rpc JWT access token ...");
265 request.set_fill_username(
true);
269 std::shared_ptr<CallCredentials> creds =
288 "Sending a large unary rpc with JWT token credentials ...");
291 request.set_fill_username(
true);
306 "Sending a large unary rpc with GoogleDefaultCredentials...");
309 request.set_fill_username(
true);
340 probe_req.mutable_expect_compressed()->set_value(
true);
342 probe_req.set_response_size(kLargeResponseSize);
343 probe_req.mutable_payload()->set_body(
std::string(kLargeRequestSize,
'\0'));
354 gpr_log(
GPR_DEBUG,
"Compressed unary request probe succeeded. Proceeding.");
356 const std::vector<bool> compressions = {
true,
false};
357 for (
size_t i = 0;
i < compressions.size();
i++) {
365 request.mutable_expect_compressed()->set_value(compressions[
i]);
380 const std::vector<bool> compressions = {
true,
false};
381 for (
size_t i = 0;
i < compressions.size();
i++) {
389 request.mutable_response_compressed()->set_value(compressions[
i]);
421 std::unique_ptr<ClientWriter<StreamingInputCallRequest>>
stream(
424 int aggregated_payload_size = 0;
425 for (
size_t i = 0;
i < request_stream_sizes.size(); ++
i) {
432 aggregated_payload_size += request_stream_sizes[
i];
450 for (
unsigned int i = 0;
i < response_stream_sizes.size(); ++
i) {
452 response_parameter->set_size(response_stream_sizes[
i]);
455 std::unique_ptr<ClientReader<StreamingOutputCallResponse>>
stream(
465 if (
i < response_stream_sizes.size()) {
469 "DoResponseStreaming(): Read fewer streams (%d) than "
470 "response_stream_sizes.size() (%" PRIuPTR
")",
471 i, response_stream_sizes.size());
491 probe_req.mutable_expect_compressed()->set_value(
true);
492 probe_req.mutable_payload()->set_body(
std::string(27182,
'\0'));
496 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
499 if (!probe_stream->Write(probe_req)) {
503 Status s = probe_stream->Finish();
511 "Compressed streaming request probe succeeded. Proceeding.");
518 std::unique_ptr<ClientWriter<StreamingInputCallRequest>>
stream(
522 request.mutable_expect_compressed()->set_value(
true);
532 request.mutable_expect_compressed()->set_value(
false);
545 const std::vector<bool> compressions = {
true,
false};
546 const std::vector<int> sizes = {31415, 92653};
552 GPR_ASSERT(compressions.size() == sizes.size());
553 for (
size_t i = 0;
i < sizes.size();
i++) {
556 compressions[
i] ?
"true" :
"false", sizes[
i]);
558 gpr_log(
GPR_DEBUG,
"Sending request streaming rpc %s.", log_suffix.c_str());
561 request.add_response_parameters();
562 response_parameter->mutable_compressed()->set_value(compressions[
i]);
563 response_parameter->set_size(sizes[
i]);
565 std::unique_ptr<ClientReader<StreamingOutputCallResponse>>
stream(
577 if (
request.response_parameters(
k).compressed().value()) {
587 if (
k < sizes.size()) {
591 "%s(): Responses read (k=%" PRIuPTR
592 ") is less than the expected number of messages (%" PRIuPTR
").",
593 __func__,
k, sizes.size());
602 gpr_log(
GPR_DEBUG,
"Receiving response streaming rpc with slow consumer ...");
607 for (
int i = 0;
i < kNumResponseMessages; ++
i) {
609 response_parameter->set_size(kResponseMessageSize);
612 std::unique_ptr<ClientReader<StreamingOutputCallResponse>>
stream(
626 if (
i < kNumResponseMessages) {
628 "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
629 "less than the expected messages (i.e kNumResponseMessages = %d)",
630 i, kNumResponseMessages);
654 for (
unsigned int i = 0;
i < response_stream_sizes.size(); ++
i) {
655 response_parameter->set_size(response_stream_sizes[
i]);
672 if (
i < response_stream_sizes.size()) {
676 "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
677 "number of messages response_stream_sizes.size() (%" PRIuPTR
")",
678 i, response_stream_sizes.size());
704 for (
unsigned int i = 0;
i < request_stream_sizes.size(); ++
i) {
705 response_parameter->set_size(response_stream_sizes[
i]);
742 std::unique_ptr<ClientWriter<StreamingInputCallRequest>>
stream(
768 response_parameter->set_size(31415);
793 "Sending Ping Pong streaming rpc with a short deadline...");
839 "Sending RPC with a request for status code 2 and message");
842 const std::string test_msg =
"This is a test message";
849 requested_status->set_code(test_code);
850 requested_status->set_message(test_msg);
864 requested_status = streaming_request.mutable_response_status();
865 requested_status->set_code(test_code);
866 requested_status->set_message(test_msg);
867 stream->Write(streaming_request);
870 while (
stream->Read(&streaming_response)) {
886 "Sending RPC with a request for status code 2 and message - \\t\\ntest "
887 "with whitespace\\r\\nand Unicode BMP ☺ and non-BMP 😈\\t\\n");
890 "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n";
895 requested_status->set_code(test_code);
896 requested_status->set_message(test_msg);
908 const int rpcCount = 100;
912 request.set_fill_server_id(
true);
913 for (
int i = 0;
i < rpcCount;
i++) {
923 if (
response.server_id() != server_id) {
925 response.server_id().c_str(), server_id.c_str());
935 const std::string kInitialMetadataValue(
"test_initial_metadata_value");
937 "x-grpc-test-echo-trailing-bin");
938 const std::string kTrailingBinValue(
"\x0a\x0b\x0a\x0b\x0a\x0b");
947 request.set_response_size(kLargeResponseSize);
949 request.mutable_payload()->set_body(
payload.c_str(), kLargeRequestSize);
980 response_parameter->set_size(kLargeResponseSize);
982 request.mutable_payload()->set_body(
payload.c_str(), kLargeRequestSize);
1023 std::tuple<bool, int32_t, std::string, std::string>
1025 const bool reset_channel,
1026 const int32_t max_acceptable_per_iteration_latency_ms) {
1035 request.set_response_size(kLargeResponseSize);
1037 request.mutable_payload()->set_body(
payload.c_str(), kLargeRequestSize);
1038 if (reset_channel) {
1047 }
else if (elapsed_ms > max_acceptable_per_iteration_latency_ms) {
1049 "%d ms exceeds max acceptable latency: %d ms, peer: %s", elapsed_ms,
1050 max_acceptable_per_iteration_latency_ms,
context.
peer());
1059 const bool reset_channel_per_iteration,
const int32_t soak_iterations,
1061 const int32_t max_acceptable_per_iteration_latency_ms,
1062 const int32_t min_time_ms_between_rpcs,
1063 const int32_t overall_timeout_seconds) {
1064 std::vector<std::tuple<bool, int32_t, std::string, std::string>>
results;
1072 int total_failures = 0;
1074 i < soak_iterations &&
1081 reset_channel_per_iteration, max_acceptable_per_iteration_latency_ms);
1082 bool success = std::get<0>(
result);
1089 "soak iteration: %d elapsed_ms: %d peer: %s failed: %s",
i,
1090 elapsed_ms, peer.c_str(), debug_string.c_str());
1094 i, elapsed_ms, peer.c_str());
1100 double latency_ms_median =
1102 double latency_ms_90th =
1106 if (iterations_ran < soak_iterations) {
1109 "soak test consumed all %d seconds of time and quit early, only "
1110 "having ran %d out of desired %d iterations. "
1111 "total_failures: %d. "
1112 "max_failures_threshold: %d. "
1113 "median_soak_iteration_latency: %lf ms. "
1114 "90th_soak_iteration_latency: %lf ms. "
1115 "worst_soak_iteration_latency: %lf ms. "
1116 "Some or all of the iterations that did run were unexpectedly slow. "
1117 "See breakdown above for which iterations succeeded, failed, and "
1118 "why for more info.",
1119 overall_timeout_seconds, iterations_ran, soak_iterations,
1120 total_failures, max_failures, latency_ms_median, latency_ms_90th,
1123 }
else if (total_failures > max_failures) {
1125 "soak test ran: %d iterations. total_failures: %d exceeds "
1126 "max_failures_threshold: %d. "
1127 "median_soak_iteration_latency: %lf ms. "
1128 "90th_soak_iteration_latency: %lf ms. "
1129 "worst_soak_iteration_latency: %lf ms. "
1130 "See breakdown above for which iterations succeeded, failed, and "
1131 "why for more info.",
1132 soak_iterations, total_failures, max_failures, latency_ms_median,
1133 latency_ms_90th, latency_ms_worst);
1137 "soak test ran: %d iterations. total_failures: %d is within "
1138 "max_failures_threshold: %d. "
1139 "median_soak_iteration_latency: %lf ms. "
1140 "90th_soak_iteration_latency: %lf ms. "
1141 "worst_soak_iteration_latency: %lf ms. "
1142 "See breakdown above for which iterations succeeded, failed, and "
1143 "why for more info.",
1144 soak_iterations, total_failures, max_failures, latency_ms_median,
1145 latency_ms_90th, latency_ms_worst);
1151 int64_t max_acceptable_per_iteration_latency_ms,
1152 int32_t soak_min_time_ms_between_rpcs,
int32_t overall_timeout_seconds) {
1156 max_failures, max_acceptable_per_iteration_latency_ms,
1157 soak_min_time_ms_between_rpcs, overall_timeout_seconds);
1164 int64_t max_acceptable_per_iteration_latency_ms,
1165 int32_t soak_min_time_ms_between_rpcs,
int32_t overall_timeout_seconds) {
1166 gpr_log(
GPR_DEBUG,
"Sending %d RPCs, tearing down the channel each time...",
1170 max_failures, max_acceptable_per_iteration_latency_ms,
1171 soak_min_time_ms_between_rpcs, overall_timeout_seconds);
1184 for (
int i = 0;
i < soak_iterations; ++
i) {