24 #include <gmock/gmock.h>
25 #include <gtest/gtest.h>
27 #include "absl/memory/memory.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_format.h"
30 #include "absl/strings/str_join.h"
31 #include "absl/strings/string_view.h"
68 #include "src/proto/grpc/testing/echo.grpc.pb.h"
69 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
77 using grpc::testing::EchoRequest;
78 using grpc::testing::EchoResponse;
// Message text for outgoing echo requests: this file passes it to
// request->set_message() when it fills in an EchoRequest.
84 constexpr
char kRequestMessage[] =
"Live long and prosper.";
97 if (
request->has_param() &&
request->param().has_backend_metrics()) {
99 auto* recorder =
context->ExperimentalGetCallMetricRecorder();
101 recorder->RecordCpuUtilizationMetric(
load_report_.cpu_utilization())
102 .RecordMemoryUtilizationMetric(
load_report_.mem_utilization());
104 recorder->RecordRequestCostMetric(
p.first,
p.second);
107 recorder->RecordUtilizationMetric(
p.first,
p.second);
113 int request_count() {
118 void ResetCounters() {
123 std::set<std::string>
clients() {
142 class FakeResolverResponseGeneratorWrapper {
144 explicit FakeResolverResponseGeneratorWrapper(
bool ipv6_only)
149 FakeResolverResponseGeneratorWrapper(
150 FakeResolverResponseGeneratorWrapper&& other) noexcept {
155 void SetNextResolution(
156 const std::vector<int>& ports,
const char* service_config_json =
nullptr,
157 const char* attribute_key =
nullptr,
158 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
162 BuildFakeResults(
ipv6_only_, ports, service_config_json, attribute_key,
166 void SetNextResolutionUponError(
const std::vector<int>& ports) {
172 void SetFailureOnReresolution() {
183 bool ipv6_only,
const std::vector<int>& ports,
184 const char* service_config_json =
nullptr,
185 const char* attribute_key =
nullptr,
186 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
190 for (
const int&
port : ports) {
197 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface>>
199 if (attribute !=
nullptr) {
200 attributes[attribute_key] = attribute->Copy();
205 if (
result.addresses->empty()) {
206 result.resolution_note =
"fake resolver empty address list";
208 if (service_config_json !=
nullptr) {
211 nullptr, service_config_json, &
error);
224 ClientLbEnd2endTest()
226 creds_(
new SecureChannelCredentials(
229 static void SetUpTestCase() {
239 void SetUp()
override {
241 bool localhost_resolves_to_ipv4 =
false;
242 bool localhost_resolves_to_ipv6 =
false;
244 &localhost_resolves_to_ipv6);
245 ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
248 void TearDown()
override {
249 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
257 void CreateServers(
size_t num_servers,
258 std::vector<int> ports = std::vector<int>()) {
260 for (
size_t i = 0;
i < num_servers; ++
i) {
262 if (ports.size() == num_servers)
port = ports[
i];
269 void StartServers(
size_t num_servers,
270 std::vector<int> ports = std::vector<int>()) {
271 CreateServers(num_servers,
std::move(ports));
272 for (
size_t i = 0;
i < num_servers; ++
i) {
277 std::vector<int> GetServersPorts(
size_t start_index = 0,
278 size_t stop_index = 0) {
279 if (stop_index == 0) stop_index =
servers_.size();
280 std::vector<int> ports;
281 for (
size_t i = start_index;
i < stop_index; ++
i) {
287 FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
288 return FakeResolverResponseGeneratorWrapper(
ipv6_only_);
291 std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
292 const std::shared_ptr<Channel>&
channel) {
293 return grpc::testing::EchoTestService::NewStub(
channel);
296 std::shared_ptr<Channel> BuildChannel(
298 const FakeResolverResponseGeneratorWrapper& response_generator,
299 ChannelArguments
args = ChannelArguments()) {
300 if (!lb_policy_name.empty()) {
301 args.SetLoadBalancingPolicyName(lb_policy_name);
304 response_generator.Get());
309 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
312 EchoResponse local_response;
314 EchoRequest local_request;
316 request->set_message(kRequestMessage);
317 request->mutable_param()->set_echo_metadata(
true);
329 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
331 xds::data::orca::v3::OrcaLoadReport* load_report =
nullptr,
335 EchoRequest* request_ptr =
nullptr;
336 if (load_report !=
nullptr) {
338 auto params =
request.mutable_param();
339 auto backend_metrics = params->mutable_backend_metrics();
340 *backend_metrics = *load_report;
345 <<
"From " << location.
file() <<
":" << location.
line()
346 <<
"\nError: " <<
status.error_message() <<
" "
347 <<
status.error_details();
349 <<
"From " << location.
file() <<
":" << location.
line();
352 void CheckRpcSendFailure(
354 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
359 << location.
file() <<
":" << location.
line();
362 << location.
file() <<
":" << location.
line();
377 explicit ServerData(
int port = 0)
385 thread_ = absl::make_unique<std::thread>(
387 while (!server_ready_) {
390 server_ready_ =
false;
399 std::shared_ptr<ServerCredentials> creds(
new SecureServerCredentials(
406 server_ready_ =
true;
419 server_->GetHealthCheckService()->SetServingStatus(
service, serving);
423 void ResetCounters() {
427 bool SeenAllServers(
size_t start_index = 0,
size_t stop_index = 0) {
428 if (stop_index == 0) stop_index =
servers_.size();
429 for (
size_t i = start_index;
i < stop_index; ++
i) {
439 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
440 size_t start_index = 0,
size_t stop_index = 0,
443 if (stop_index == 0) stop_index =
servers_.size();
446 "========= WAITING FOR BACKENDS [%" PRIuPTR
", %" PRIuPTR
448 start_index, stop_index);
449 while (!SeenAllServers(start_index, stop_index)) {
451 if (status_check !=
nullptr) {
455 <<
" code=" <<
status.error_code() <<
" message=\""
456 <<
status.error_message() <<
"\" at " << location.
file() <<
":"
460 <<
" at " << location.
file() <<
":" << location.
line();
468 const std::unique_ptr<grpc::testing::EchoTestService::Stub>&
stub,
471 WaitForServers(location,
stub, server_index, server_index + 1,
475 bool WaitForChannelState(
483 if (predicate(
state))
break;
484 if (!
channel->WaitForStateChange(
state, deadline))
return false;
505 void UpdateConnectionOrder(
506 const std::vector<std::unique_ptr<ServerData>>&
servers,
507 std::vector<int>* connection_order) {
508 for (
size_t i = 0;
i <
servers.size(); ++
i) {
512 std::find(connection_order->begin(), connection_order->end(), i);
513 if (
it == connection_order->end()) {
514 connection_order->push_back(i);
523 std::shared_ptr<ChannelCredentials>
creds_;
527 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
528 const int kNumServers = 3;
529 StartServers(kNumServers);
530 auto response_generator = BuildResolverResponseGenerator();
531 auto channel = BuildChannel(
"", response_generator);
544 response_generator.SetNextResolution(GetServersPorts());
549 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
551 const int kNumServers = 1;
552 StartServers(kNumServers);
554 ChannelArguments
args;
556 auto response_generator = BuildResolverResponseGenerator();
557 auto channel = BuildChannel(
"", response_generator,
args);
563 response_generator.SetNextResolution(GetServersPorts());
572 gpr_log(
GPR_INFO,
"*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***");
573 response_generator.SetNextResolution(GetServersPorts());
// Suite alias: PickFirstTest is the same type as ClientLbEnd2endTest, so
// TEST_F(PickFirstTest, ...) cases run on that fixture under their own
// suite name.
582 using PickFirstTest = ClientLbEnd2endTest;
584 TEST_F(PickFirstTest, Basic) {
586 const int kNumServers = 3;
587 StartServers(kNumServers);
588 auto response_generator = BuildResolverResponseGenerator();
590 "", response_generator);
592 response_generator.SetNextResolution(GetServersPorts());
593 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
598 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
599 const int request_count =
servers_[
i]->service_.request_count();
600 if (request_count == kNumServers) {
611 TEST_F(PickFirstTest, ProcessPending) {
613 auto response_generator = BuildResolverResponseGenerator();
615 "", response_generator);
617 response_generator.SetNextResolution({
servers_[0]->port_});
624 auto second_response_generator = BuildResolverResponseGenerator();
625 auto second_channel = BuildChannel(
"", second_response_generator);
626 auto second_stub = BuildStub(second_channel);
627 second_response_generator.SetNextResolution({
servers_[0]->port_});
631 TEST_F(PickFirstTest, SelectsReadyAtStartup) {
632 ChannelArguments
args;
633 constexpr
int kInitialBackOffMs = 5000;
638 CreateServers(2, ports);
640 auto response_generator1 = BuildResolverResponseGenerator();
641 auto channel1 = BuildChannel(
"pick_first", response_generator1,
args);
642 auto stub1 = BuildStub(channel1);
643 response_generator1.SetNextResolution(ports);
649 auto response_generator2 = BuildResolverResponseGenerator();
650 auto channel2 = BuildChannel(
"pick_first", response_generator2,
args);
651 response_generator2.SetNextResolution(ports);
654 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 ));
657 TEST_F(PickFirstTest, BackOffInitialReconnect) {
658 ChannelArguments
args;
659 constexpr
int kInitialBackOffMs = 100;
663 auto response_generator = BuildResolverResponseGenerator();
664 auto channel = BuildChannel(
"pick_first", response_generator,
args);
666 response_generator.SetNextResolution(ports);
671 StartServers(1, ports);
689 TEST_F(PickFirstTest, BackOffMinReconnect) {
690 ChannelArguments
args;
691 constexpr
int kMinReconnectBackOffMs = 1000;
694 auto response_generator = BuildResolverResponseGenerator();
695 auto channel = BuildChannel(
"pick_first", response_generator,
args);
697 response_generator.SetNextResolution(ports);
700 ConnectionDelayInjector delay_injector(
702 delay_injector.Start();
715 TEST_F(PickFirstTest, ResetConnectionBackoff) {
716 ChannelArguments
args;
717 constexpr
int kInitialBackOffMs = 1000;
720 auto response_generator = BuildResolverResponseGenerator();
721 auto channel = BuildChannel(
"pick_first", response_generator,
args);
723 response_generator.SetNextResolution(ports);
728 StartServers(1, ports);
749 TEST_F(ClientLbEnd2endTest,
750 ResetConnectionBackoffNextAttemptStartsImmediately) {
752 ConnectionHoldInjector injector;
756 ChannelArguments
args;
759 auto response_generator = BuildResolverResponseGenerator();
760 auto channel = BuildChannel(
"pick_first", response_generator,
args);
762 response_generator.SetNextResolution({
port});
764 auto hold1 = injector.AddHold(
port);
775 auto hold2 = injector.AddHold(
port);
797 TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) {
799 ConnectionHoldInjector injector;
809 auto response_generator1 = BuildResolverResponseGenerator();
810 auto channel1 = BuildChannel(
"pick_first", response_generator1);
811 auto stub1 = BuildStub(channel1);
812 response_generator1.SetNextResolution(ports1);
815 auto hold_channel1_port2 = injector.AddHold(ports1[2]);
818 channel1->GetState(
true);
821 hold_channel1_port2->Wait();
824 auto response_generator2 = BuildResolverResponseGenerator();
825 auto channel2 = BuildChannel(
"pick_first", response_generator2);
826 response_generator2.SetNextResolution(ports2);
828 auto hold_channel2_port0 = injector.AddHold(ports2[0]);
831 channel2->GetState(
true);
834 hold_channel2_port0->Wait();
837 auto hold_channel1_port0 = injector.AddHold(ports1[0]);
841 hold_channel1_port2->Resume();
845 hold_channel1_port0->Wait();
852 auto hold_channel2_port2 = injector.AddHold(ports2[2]);
855 hold_channel2_port0->Resume();
858 hold_channel2_port2->Wait();
863 hold_channel2_port0 = injector.AddHold(ports2[0]);
865 hold_channel2_port2->Resume();
868 hold_channel2_port0->Wait();
872 gpr_log(
GPR_INFO,
"=== RESUMING CHANNEL 1 PORT 0 AND CHANNEL 2 PORT 0 ===");
873 hold_channel1_port0->Resume();
874 hold_channel2_port0->Resume();
877 TEST_F(PickFirstTest, Updates) {
879 const int kNumServers = 3;
880 StartServers(kNumServers);
881 auto response_generator = BuildResolverResponseGenerator();
882 auto channel = BuildChannel(
"pick_first", response_generator);
885 std::vector<int> ports;
889 response_generator.SetNextResolution(ports);
896 response_generator.SetNextResolution(ports);
900 channel_state =
channel->GetState(
true );
903 servers_[0]->service_.ResetCounters();
908 response_generator.SetNextResolution(ports);
916 response_generator.SetNextResolution(ports);
926 TEST_F(PickFirstTest, UpdateSuperset) {
928 const int kNumServers = 3;
929 StartServers(kNumServers);
930 auto response_generator = BuildResolverResponseGenerator();
931 auto channel = BuildChannel(
"pick_first", response_generator);
934 std::vector<int> ports;
938 response_generator.SetNextResolution(ports);
942 servers_[0]->service_.ResetCounters();
948 response_generator.SetNextResolution(ports);
959 TEST_F(PickFirstTest, UpdateToUnconnected) {
960 const int kNumServers = 2;
961 CreateServers(kNumServers);
963 auto response_generator = BuildResolverResponseGenerator();
964 auto channel = BuildChannel(
"pick_first", response_generator);
967 std::vector<int> ports;
971 response_generator.SetNextResolution(ports);
981 response_generator.SetNextResolution(ports);
992 TEST_F(PickFirstTest, GlobalSubchannelPool) {
994 const int kNumServers = 1;
995 StartServers(kNumServers);
996 std::vector<int> ports = GetServersPorts();
998 auto response_generator1 = BuildResolverResponseGenerator();
999 auto channel1 = BuildChannel(
"pick_first", response_generator1);
1000 auto stub1 = BuildStub(channel1);
1001 response_generator1.SetNextResolution(ports);
1002 auto response_generator2 = BuildResolverResponseGenerator();
1003 auto channel2 = BuildChannel(
"pick_first", response_generator2);
1004 auto stub2 = BuildStub(channel2);
1005 response_generator2.SetNextResolution(ports);
1017 TEST_F(PickFirstTest, LocalSubchannelPool) {
1019 const int kNumServers = 1;
1020 StartServers(kNumServers);
1021 std::vector<int> ports = GetServersPorts();
1023 ChannelArguments
args;
1025 auto response_generator1 = BuildResolverResponseGenerator();
1026 auto channel1 = BuildChannel(
"pick_first", response_generator1,
args);
1027 auto stub1 = BuildStub(channel1);
1028 response_generator1.SetNextResolution(ports);
1029 auto response_generator2 = BuildResolverResponseGenerator();
1030 auto channel2 = BuildChannel(
"pick_first", response_generator2,
args);
1031 auto stub2 = BuildStub(channel2);
1032 response_generator2.SetNextResolution(ports);
1044 TEST_F(PickFirstTest, ManyUpdates) {
1045 const int kNumUpdates = 1000;
1046 const int kNumServers = 3;
1047 StartServers(kNumServers);
1048 auto response_generator = BuildResolverResponseGenerator();
1049 auto channel = BuildChannel(
"pick_first", response_generator);
1051 std::vector<int> ports = GetServersPorts();
1052 for (
size_t i = 0;
i < kNumUpdates; ++
i) {
1053 std::shuffle(ports.begin(), ports.end(),
1054 std::mt19937(std::random_device()()));
1055 response_generator.SetNextResolution(ports);
1064 TEST_F(PickFirstTest, ReresolutionNoSelected) {
1066 const int kNumServers = 3;
1067 const int kNumAliveServers = 1;
1068 StartServers(kNumAliveServers);
1069 std::vector<int> alive_ports, dead_ports;
1070 for (
size_t i = 0;
i < kNumServers; ++
i) {
1071 if (i < kNumAliveServers) {
1077 auto response_generator = BuildResolverResponseGenerator();
1078 auto channel = BuildChannel(
"pick_first", response_generator);
1082 response_generator.SetNextResolution(dead_ports);
1084 for (
size_t i = 0;
i < 10; ++
i) {
1085 CheckRpcSendFailure(
1087 "failed to connect to all addresses; last error: "
1088 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1089 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1093 response_generator.SetNextResolutionUponError(alive_ports);
1100 "failed to connect to all addresses; last error: "
1101 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1102 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)"));
1110 TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) {
1112 StartServers(1, ports);
1113 auto response_generator = BuildResolverResponseGenerator();
1114 auto channel = BuildChannel(
"pick_first", response_generator);
1116 response_generator.SetNextResolution(ports);
1123 StartServers(1, ports);
1127 TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
1130 CreateServers(2, ports);
1132 auto response_generator = BuildResolverResponseGenerator();
1133 auto channel = BuildChannel(
"pick_first", response_generator);
1135 response_generator.SetNextResolution(ports);
1142 StartServers(2, ports);
1146 TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
1148 StartServers(1, ports);
1149 auto response_generator = BuildResolverResponseGenerator();
1150 auto channel_1 = BuildChannel(
"pick_first", response_generator);
1151 auto stub_1 = BuildStub(channel_1);
1152 response_generator.SetNextResolution(ports);
1159 StartServers(1, ports);
1161 auto response_generator_2 = BuildResolverResponseGenerator();
1162 auto channel_2 = BuildChannel(
"pick_first", response_generator_2);
1163 auto stub_2 = BuildStub(channel_2);
1164 response_generator_2.SetNextResolution(ports);
1168 EXPECT_EQ(
"failed to connect to all addresses",
status.error_message());
1174 EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
1177 StartServers(1, ports);
1184 EXPECT_EQ(
"pick_first", channel_1->GetLoadBalancingPolicyName());
1186 EXPECT_EQ(
"pick_first", channel_2->GetLoadBalancingPolicyName());
1189 TEST_F(PickFirstTest, IdleOnDisconnect) {
1191 const int kNumServers = 1;
1192 StartServers(kNumServers);
1193 auto response_generator = BuildResolverResponseGenerator();
1195 BuildChannel(
"", response_generator);
1197 response_generator.SetNextResolution(GetServersPorts());
1201 response_generator.SetFailureOnReresolution();
1208 TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) {
1209 auto response_generator = BuildResolverResponseGenerator();
1211 BuildChannel(
"", response_generator);
1218 response_generator.SetNextResolution({
servers_[0]->port_});
1228 "Phase 2: Resolver update pointing to remaining "
1229 "(not started) servers.");
1230 response_generator.SetNextResolution(GetServersPorts(1 ));
1240 WaitForChannelNotReady(
channel.get());
1253 WaitForChannelReady(
channel.get());
1256 EXPECT_EQ(
"failed to connect to all addresses",
status.error_message());
1260 TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
1262 const int kNumServers = 1;
1263 StartServers(kNumServers);
1264 auto response_generator = BuildResolverResponseGenerator();
1266 BuildChannel(
"", response_generator);
1268 response_generator.SetNextResolution(GetServersPorts());
1277 response_generator.SetNextResolution({});
1283 response_generator.SetNextResolution(GetServersPorts());
1289 StaysTransientFailureOnFailedConnectionAttemptUntilReady) {
1295 ChannelArguments
args;
1298 auto response_generator = BuildResolverResponseGenerator();
1299 auto channel = BuildChannel(
"", response_generator,
args);
1301 response_generator.SetNextResolution(ports);
1304 CheckRpcSendFailure(
1306 "failed to connect to all addresses; last error: "
1307 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1308 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1312 StartServers(1, {ports.back()});
// Suite alias: RoundRobinTest is the same type as ClientLbEnd2endTest, so
// TEST_F(RoundRobinTest, ...) cases run on that fixture under their own
// suite name.
1324 using RoundRobinTest = ClientLbEnd2endTest;
1326 TEST_F(RoundRobinTest, Basic) {
1328 const int kNumServers = 3;
1329 StartServers(kNumServers);
1330 auto response_generator = BuildResolverResponseGenerator();
1331 auto channel = BuildChannel(
"round_robin", response_generator);
1333 response_generator.SetNextResolution(GetServersPorts());
1337 }
while (!SeenAllServers());
1342 std::vector<int> connection_order;
1343 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
1345 UpdateConnectionOrder(
servers_, &connection_order);
1349 const auto expected = std::vector<int>{0, 1, 2};
1355 TEST_F(RoundRobinTest, ProcessPending) {
1357 auto response_generator = BuildResolverResponseGenerator();
1358 auto channel = BuildChannel(
"round_robin", response_generator);
1360 response_generator.SetNextResolution({
servers_[0]->port_});
1367 auto second_response_generator = BuildResolverResponseGenerator();
1368 auto second_channel = BuildChannel(
"round_robin", second_response_generator);
1369 auto second_stub = BuildStub(second_channel);
1370 second_response_generator.SetNextResolution({
servers_[0]->port_});
1374 TEST_F(RoundRobinTest, Updates) {
1376 const int kNumServers = 3;
1377 StartServers(kNumServers);
1378 auto response_generator = BuildResolverResponseGenerator();
1379 auto channel = BuildChannel(
"round_robin", response_generator);
1383 std::vector<int> ports = {
servers_[0]->port_};
1384 response_generator.SetNextResolution(ports);
1396 response_generator.SetNextResolution(ports);
1410 response_generator.SetNextResolution(ports);
1423 response_generator.SetNextResolution(ports);
1434 response_generator.SetNextResolution(ports);
1435 WaitForChannelNotReady(
channel.get());
1437 "empty address list: fake resolver empty address list");
1438 servers_[0]->service_.ResetCounters();
1443 response_generator.SetNextResolution(ports);
1450 TEST_F(RoundRobinTest, UpdateInError) {
1452 auto response_generator = BuildResolverResponseGenerator();
1453 auto channel = BuildChannel(
"round_robin", response_generator);
1456 response_generator.SetNextResolution(GetServersPorts(0, 1));
1458 for (
size_t i = 0;
i < 10; ++
i) {
1464 servers_[0]->service_.ResetCounters();
1468 response_generator.SetNextResolution(ports);
1477 for (
size_t i = 0;
i < 10; ++
i) {
1486 servers_[1]->service_.request_count());
1489 TEST_F(RoundRobinTest, ManyUpdates) {
1491 const int kNumServers = 3;
1492 StartServers(kNumServers);
1493 auto response_generator = BuildResolverResponseGenerator();
1494 auto channel = BuildChannel(
"round_robin", response_generator);
1496 std::vector<int> ports = GetServersPorts();
1497 for (
size_t i = 0;
i < 1000; ++
i) {
1498 std::shuffle(ports.begin(), ports.end(),
1499 std::mt19937(std::random_device()()));
1500 response_generator.SetNextResolution(ports);
1507 TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
1511 auto response_generator = BuildResolverResponseGenerator();
1512 auto channel = BuildChannel(
"round_robin", response_generator);
1516 response_generator.SetNextResolution(ports);
1522 response_generator.SetNextResolutionUponError(ports);
1533 TEST_F(RoundRobinTest, TransientFailure) {
1535 const int kNumServers = 3;
1536 StartServers(kNumServers);
1537 auto response_generator = BuildResolverResponseGenerator();
1538 auto channel = BuildChannel(
"round_robin", response_generator);
1540 response_generator.SetNextResolution(GetServersPorts());
1543 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
1550 CheckRpcSendFailure(
1552 "connections to all backends failing; last error: "
1553 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1554 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1557 TEST_F(RoundRobinTest, TransientFailureAtStartup) {
1560 auto response_generator = BuildResolverResponseGenerator();
1561 auto channel = BuildChannel(
"round_robin", response_generator);
1563 response_generator.SetNextResolution({
1568 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
1575 CheckRpcSendFailure(
1577 "connections to all backends failing; last error: "
1578 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1579 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1582 TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
1584 ConnectionHoldInjector injector;
1589 auto response_generator = BuildResolverResponseGenerator();
1590 auto channel = BuildChannel(
"round_robin", response_generator);
1592 response_generator.SetNextResolution({
port});
1600 WaitForChannelState(
channel.get(), predicate,
true));
1602 auto hold = injector.AddHold(
port);
1611 for (
size_t i = 0;
i < 5; ++
i) {
1612 CheckRpcSendFailure(
1614 "connections to all backends failing; last error: "
1615 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1616 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1622 TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
1624 ConnectionHoldInjector injector;
1630 auto response_generator = BuildResolverResponseGenerator();
1631 auto channel = BuildChannel(
"round_robin", response_generator);
1633 response_generator.SetNextResolution(ports);
1636 CheckRpcSendFailure(
1638 "connections to all backends failing; last error: "
1639 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1640 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1642 auto hold1 = injector.AddHold(ports[0]);
1643 auto hold2 = injector.AddHold(ports[1]);
1655 if (
status.error_message() ==
1656 "connections to all backends failing; last error: "
1657 "UNKNOWN: Survey says... Bzzzzt!") {
1663 "connections to all backends failing; last error: "
1664 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1665 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)"));
1673 TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
1675 ConnectionHoldInjector injector;
1680 auto response_generator = BuildResolverResponseGenerator();
1681 auto channel = BuildChannel(
"round_robin", response_generator);
1683 response_generator.SetNextResolution(GetServersPorts());
1686 std::atomic<bool> shutdown{
false};
1693 while (!shutdown.load()) {
1725 gpr_log(
GPR_ERROR,
"=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===");
1726 hold1->WaitForCompletion();
1729 shutdown.store(
true);
1733 TEST_F(RoundRobinTest, SingleReconnect) {
1734 const int kNumServers = 3;
1735 StartServers(kNumServers);
1736 const auto ports = GetServersPorts();
1737 auto response_generator = BuildResolverResponseGenerator();
1738 auto channel = BuildChannel(
"round_robin", response_generator);
1740 response_generator.SetNextResolution(ports);
1744 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
1749 for (
size_t i = 0;
i <
servers_.size(); ++
i) {
1752 const auto pre_death =
servers_[0]->service_.request_count();
1764 for (
int i = 0;
i < 10 * kNumServers; ++
i) {
1767 const auto post_death =
servers_[0]->service_.request_count();
1781 TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
1783 ChannelArguments
args;
1784 args.SetServiceConfigJSON(
1785 "{\"healthCheckConfig\": "
1786 "{\"serviceName\": \"health_check_service_name\"}}");
1787 auto response_generator = BuildResolverResponseGenerator();
1788 auto channel = BuildChannel(
"round_robin", response_generator,
args);
1790 response_generator.SetNextResolution({
servers_[0]->port_});
1795 TEST_F(RoundRobinTest, HealthChecking) {
1798 const int kNumServers = 3;
1799 StartServers(kNumServers);
1800 ChannelArguments
args;
1801 args.SetServiceConfigJSON(
1802 "{\"healthCheckConfig\": "
1803 "{\"serviceName\": \"health_check_service_name\"}}");
1804 auto response_generator = BuildResolverResponseGenerator();
1805 auto channel = BuildChannel(
"round_robin", response_generator,
args);
1807 response_generator.SetNextResolution(GetServersPorts());
1810 "*** initial state: unknown health check service name for "
1817 servers_[0]->SetServingStatus(
"health_check_service_name",
true);
1819 for (
int i = 0;
i < 10; ++
i) {
1827 servers_[2]->SetServingStatus(
"health_check_service_name",
true);
1829 for (
int i = 0;
i < 10; ++
i) {
1837 servers_[1]->SetServingStatus(
"health_check_service_name",
true);
1839 for (
int i = 0;
i < 9; ++
i) {
1850 servers_[0]->SetServingStatus(
"health_check_service_name",
false);
1853 for (
int i = 0;
i < kNumServers; ++
i) {
1857 servers_[2]->service_.request_count() != 2);
1861 servers_[1]->SetServingStatus(
"health_check_service_name",
false);
1862 servers_[2]->SetServingStatus(
"health_check_service_name",
false);
1865 "connections to all backends failing; last error: "
1866 "UNAVAILABLE: backend unhealthy");
1871 TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
1874 const int kNumServers = 3;
1875 StartServers(kNumServers);
1876 servers_[0]->SetServingStatus(
"health_check_service_name",
true);
1877 servers_[1]->SetServingStatus(
"health_check_service_name",
true);
1878 servers_[2]->SetServingStatus(
"health_check_service_name",
true);
1879 ChannelArguments
args;
1880 args.SetServiceConfigJSON(
1881 "{\"healthCheckConfig\": "
1882 "{\"serviceName\": \"health_check_service_name\"}}");
1883 auto response_generator = BuildResolverResponseGenerator();
1884 auto channel = BuildChannel(
"round_robin", response_generator,
args);
1886 response_generator.SetNextResolution(GetServersPorts());
1891 response_generator.SetNextResolution(GetServersPorts());
1893 for (
size_t i = 0;
i < 100;
i++) {
1898 TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
1901 const int kNumServers = 1;
1902 StartServers(kNumServers);
1904 ChannelArguments
args;
1905 args.SetServiceConfigJSON(
1906 "{\"healthCheckConfig\": "
1907 "{\"serviceName\": \"health_check_service_name\"}}");
1908 auto response_generator1 = BuildResolverResponseGenerator();
1909 auto channel1 = BuildChannel(
"round_robin", response_generator1,
args);
1910 auto stub1 = BuildStub(channel1);
1911 std::vector<int> ports = GetServersPorts();
1912 response_generator1.SetNextResolution(ports);
1915 auto response_generator2 = BuildResolverResponseGenerator();
1916 auto channel2 = BuildChannel(
"round_robin", response_generator2,
args);
1917 auto stub2 = BuildStub(channel2);
1918 response_generator2.SetNextResolution(ports);
1923 "connections to all backends failing; last error: "
1924 "UNAVAILABLE: backend unhealthy");
1926 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1929 servers_[0]->SetServingStatus(
"health_check_service_name",
true);
1937 TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
1940 const int kNumServers = 1;
1941 StartServers(kNumServers);
1943 ChannelArguments
args;
1944 args.SetServiceConfigJSON(
1945 "{\"healthCheckConfig\": "
1946 "{\"serviceName\": \"health_check_service_name\"}}");
1947 auto response_generator1 = BuildResolverResponseGenerator();
1948 auto channel1 = BuildChannel(
"round_robin", response_generator1,
args);
1949 auto stub1 = BuildStub(channel1);
1950 std::vector<int> ports = GetServersPorts();
1951 response_generator1.SetNextResolution(ports);
1954 ChannelArguments args2;
1955 args2.SetServiceConfigJSON(
1956 "{\"healthCheckConfig\": "
1957 "{\"serviceName\": \"health_check_service_name2\"}}");
1958 auto response_generator2 = BuildResolverResponseGenerator();
1959 auto channel2 = BuildChannel(
"round_robin", response_generator2, args2);
1960 auto stub2 = BuildStub(channel2);
1961 response_generator2.SetNextResolution(ports);
1963 servers_[0]->SetServingStatus(
"health_check_service_name2",
true);
1968 "connections to all backends failing; last error: "
1969 "UNAVAILABLE: backend unhealthy");
1971 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1974 servers_[0]->SetServingStatus(
"health_check_service_name",
true);
1983 HealthCheckingServiceNameChangesAfterSubchannelsCreated) {
1986 const int kNumServers = 1;
1987 StartServers(kNumServers);
1989 const char* kServiceConfigJson =
1990 "{\"healthCheckConfig\": "
1991 "{\"serviceName\": \"health_check_service_name\"}}";
1992 auto response_generator = BuildResolverResponseGenerator();
1993 auto channel = BuildChannel(
"round_robin", response_generator);
1995 std::vector<int> ports = GetServersPorts();
1996 response_generator.SetNextResolution(ports, kServiceConfigJson);
1997 servers_[0]->SetServingStatus(
"health_check_service_name",
true);
2001 const char* kServiceConfigJson2 =
2002 "{\"healthCheckConfig\": "
2003 "{\"serviceName\": \"health_check_service_name2\"}}";
2004 response_generator.SetNextResolution(ports, kServiceConfigJson2);
// Fixture derived from ClientLbEnd2endTest that accumulates
// grpc_core::PickArgsSeen values in args_seen_list_.
2014 class ClientLbPickArgsTest :
public ClientLbEnd2endTest {
// Per-test setup; runs the base fixture's SetUp() first.
2016 void SetUp()
override {
2017 ClientLbEnd2endTest::SetUp();
2021 static void SetUpTestCase() {
// Returns a std::vector<grpc_core::PickArgsSeen> by value.
// NOTE(review): presumably a copy of args_seen_list_ -- confirm.
2028 std::vector<grpc_core::PickArgsSeen> args_seen_list() {
2034 const std::vector<grpc_core::PickArgsSeen>& args_seen_list) {
2035 std::vector<std::string> entries;
// Walk every recorded pick and, within each pick, every metadata pair.
2036 for (
const auto& args_seen : args_seen_list) {
2038 for (
const auto& p : args_seen.metadata) {
// Append the observed pick args to the list held by `self`.
2052 self->args_seen_list_.emplace_back(args_seen);
// Uses a single backend and a channel configured with the "test_pick_args_lb"
// policy, then inspects the pick args collected by the fixture.
2062 TEST_F(ClientLbPickArgsTest, Basic) {
2063 const int kNumServers = 1;
2064 StartServers(kNumServers);
2065 auto response_generator = BuildResolverResponseGenerator();
2066 auto channel = BuildChannel(
"test_pick_args_lb", response_generator);
// Give the fake resolver the backend ports.
2068 response_generator.SetNextResolution(GetServersPorts());
// Take the list of pick args recorded by the fixture.
2078 auto pick_args_seen_list = args_seen_list();
2082 "/grpc.testing.EchoTestService/Echo"),
2088 << ArgsSeenListString(pick_args_seen_list);
// Builds and returns an xds::data::orca::v3::OrcaLoadReport populated from
// `backend_metric_data`, including its request_cost and utilization entries.
2095 xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport(
2097 xds::data::orca::v3::OrcaLoadReport load_report;
// Store each request_cost value in the report's request_cost map under `name`.
// NOTE(review): `name` is presumably derived from p.first -- confirm.
2100 for (
const auto& p : backend_metric_data.
request_cost) {
2102 (*load_report.mutable_request_cost())[
name] =
p.second;
// Same treatment for the utilization entries.
2104 for (
const auto& p : backend_metric_data.
utilization) {
2106 (*load_report.mutable_utilization())[
name] =
p.second;
// Fixture derived from ClientLbEnd2endTest that captures what
// ReportTrailerIntercepted observes: the last status, a count of intercepted
// trailers, the trailing metadata and, when supplied, a backend load report.
2111 class ClientLbInterceptTrailingMetadataTest :
public ClientLbEnd2endTest {
// Per-test setup; runs the base fixture's SetUp() first.
2113 void SetUp()
override {
2114 ClientLbEnd2endTest::SetUp();
// Suite-level setup; ReportTrailerIntercepted is passed as a callback argument.
2118 static void SetUpTestCase() {
2121 ReportTrailerIntercepted);
// NOTE(review): presumably returns trailers_intercepted_ -- confirm.
2126 int trailers_intercepted() {
2147 static void ReportTrailerIntercepted(
// Record the status, bump the interception counter and keep the trailing
// metadata taken from args_seen.
2152 self->last_status_ = args_seen.
status;
2153 self->trailers_intercepted_++;
2154 self->trailing_metadata_ = args_seen.
metadata;
// backend_metric_data may be null; convert it to a load report only when set.
2155 if (backend_metric_data !=
nullptr) {
2156 self->load_report_ =
2157 BackendMetricDataToOrcaLoadReport(*backend_metric_data);
2169 ClientLbInterceptTrailingMetadataTest*
// Runs against a channel built with the "intercept_trailing_metadata_lb"
// policy and checks that the channel reports that policy name.
2172 TEST_F(ClientLbInterceptTrailingMetadataTest, StatusOk) {
2174 auto response_generator = BuildResolverResponseGenerator();
2176 BuildChannel(
"intercept_trailing_metadata_lb", response_generator);
// Give the fake resolver the backend ports.
2178 response_generator.SetNextResolution(GetServersPorts());
// The channel must report the LB policy it was built with.
2182 EXPECT_EQ(
"intercept_trailing_metadata_lb",
2183 channel->GetLoadBalancingPolicyName());
// Runs against a channel built with the "intercept_trailing_metadata_lb"
// policy, using a request whose expected_error parameter is populated.
2188 TEST_F(ClientLbInterceptTrailingMetadataTest, StatusFailed) {
2190 auto response_generator = BuildResolverResponseGenerator();
2192 BuildChannel(
"intercept_trailing_metadata_lb", response_generator);
// Give the fake resolver the backend ports.
2194 response_generator.SetNextResolution(GetServersPorts());
// Fill in the request's expected_error, including its error message.
2196 auto* expected_error =
request.mutable_param()->mutable_expected_error();
2198 expected_error->set_error_message(
"bummer, man");
// Runs against a channel built with the "intercept_trailing_metadata_lb"
// policy whose fake resolver is given the backend ports.
2208 TEST_F(ClientLbInterceptTrailingMetadataTest,
2209 StatusCancelledWithoutStartingRecvTrailingMetadata) {
2211 auto response_generator = BuildResolverResponseGenerator();
2213 BuildChannel(
"intercept_trailing_metadata_lb", response_generator);
2214 response_generator.SetNextResolution(GetServersPorts());
// Single-backend test using the "intercept_trailing_metadata_lb" policy with
// extra channel arguments.
// NOTE(review): going by the test name, channel_args presumably turns retries
// off -- confirm against the channel_args configuration.
2230 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
2231 const int kNumServers = 1;
2233 StartServers(kNumServers);
2234 auto response_generator = BuildResolverResponseGenerator();
2235 ChannelArguments channel_args;
// Build the channel with the intercepting policy and the custom arguments.
2237 auto channel = BuildChannel(
"intercept_trailing_metadata_lb",
2238 response_generator, channel_args);
// Give the fake resolver the backend ports.
2240 response_generator.SetNextResolution(GetServersPorts());
// The channel must report the LB policy it was built with.
2245 EXPECT_EQ(
"intercept_trailing_metadata_lb",
2246 channel->GetLoadBalancingPolicyName());
// Single-backend test using the "intercept_trailing_metadata_lb" policy on a
// channel whose service config carries a retry policy.
2258 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
2259 const int kNumServers = 1;
2261 StartServers(kNumServers);
// Service config with a retryPolicy for grpc.testing.EchoTestService:
// maxAttempts 3, backoff from 1s up to 120s with multiplier 1.6, and
// ABORTED as the only retryable status code.
2262 ChannelArguments
args;
2263 args.SetServiceConfigJSON(
2265 " \"methodConfig\": [ {\n"
2267 " { \"service\": \"grpc.testing.EchoTestService\" }\n"
2269 " \"retryPolicy\": {\n"
2270 " \"maxAttempts\": 3,\n"
2271 " \"initialBackoff\": \"1s\",\n"
2272 " \"maxBackoff\": \"120s\",\n"
2273 " \"backoffMultiplier\": 1.6,\n"
2274 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
2278 auto response_generator = BuildResolverResponseGenerator();
2280 BuildChannel(
"intercept_trailing_metadata_lb", response_generator,
args);
// Give the fake resolver the backend ports.
2282 response_generator.SetNextResolution(GetServersPorts());
// The channel must report the LB policy it was built with.
2287 EXPECT_EQ(
"intercept_trailing_metadata_lb",
2288 channel->GetLoadBalancingPolicyName());
// Single-backend test that compares the load report captured by the fixture
// against a locally built OrcaLoadReport, field by field.
2300 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
2301 const int kNumServers = 1;
2303 StartServers(kNumServers);
// Reference report: CPU and memory utilization plus two named request costs
// ("foo", "bar") and two named utilizations ("baz", "quux").
2304 xds::data::orca::v3::OrcaLoadReport load_report;
2305 load_report.set_cpu_utilization(0.5);
2306 load_report.set_mem_utilization(0.75);
2307 auto* request_cost = load_report.mutable_request_cost();
2308 (*request_cost)[
"foo"] = 0.8;
2309 (*request_cost)[
"bar"] = 1.4;
2310 auto* utilization = load_report.mutable_utilization();
2311 (*utilization)[
"baz"] = 1.1;
2312 (*utilization)[
"quux"] = 0.9;
2313 auto response_generator = BuildResolverResponseGenerator();
2315 BuildChannel(
"intercept_trailing_metadata_lb", response_generator);
// Give the fake resolver the backend ports.
2317 response_generator.SetNextResolution(GetServersPorts());
// Load report captured by the fixture.
2320 auto actual = backend_load_report();
// The scalar utilization fields must equal the reference values.
2324 EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization());
2325 EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization());
// request_cost: same number of entries, then look each received key up in the
// reference map.
2326 EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size());
2327 for (
const auto& p : actual->request_cost()) {
2328 auto it = load_report.request_cost().find(
p.first);
// utilization: same size check and per-key lookup.
2332 EXPECT_EQ(actual->utilization().size(), load_report.utilization().size());
2333 for (
const auto& p : actual->utilization()) {
2334 auto it = load_report.utilization().find(
p.first);
// The channel must report the LB policy it was built with.
2340 EXPECT_EQ(
"intercept_trailing_metadata_lb",
2341 channel->GetLoadBalancingPolicyName());
// Fixture derived from ClientLbEnd2endTest that collects the string form of
// observed addresses in addresses_seen_. It also provides an Attribute
// implementation of AttributeInterface backed by the string str_.
2349 class ClientLbAddressTest :
public ClientLbEnd2endTest {
// Returns a newly allocated Attribute constructed from str_.
2357 std::unique_ptr<AttributeInterface>
Copy()
const override {
2358 return absl::make_unique<Attribute>(
str_);
// Compares str_ values via std::string::compare semantics. `other` is
// downcast to Attribute with static_cast, i.e. without a runtime type check.
2361 int Cmp(
const AttributeInterface* other)
const override {
2362 return str_.compare(
static_cast<const Attribute*
>(other)->
str_);
// Per-test setup; runs the base fixture's SetUp() first.
2371 void SetUp()
override {
2372 ClientLbEnd2endTest::SetUp();
2376 static void SetUpTestCase() {
// Returns a const reference to a vector of strings.
// NOTE(review): presumably addresses_seen_ -- confirm.
2383 const std::vector<std::string>& addresses_seen() {
// Record the address's ToString() output in the list held by `self`.
2392 self->addresses_seen_.emplace_back(address.
ToString());
// Single-backend test using the "address_test_lb" policy; the resolver result
// carries an Attribute built from "foo".
2404 TEST_F(ClientLbAddressTest, Basic) {
2405 const int kNumServers = 1;
2406 StartServers(kNumServers);
2407 auto response_generator = BuildResolverResponseGenerator();
2408 auto channel = BuildChannel(
"address_test_lb", response_generator);
// Resolve to the backend ports with no service config (nullptr) and attach an
// Attribute constructed from "foo".
2411 response_generator.SetNextResolution(GetServersPorts(),
nullptr,
2413 absl::make_unique<Attribute>(
"foo"));
// Build one expected string per backend port.
2418 std::vector<std::string> expected;
2419 for (
const int port : GetServersPorts()) {
2420 expected.emplace_back(
// Fixture derived from ClientLbEnd2endTest that holds backend metric reports
// in backend_metric_reports_.
2431 class OobBackendMetricTest :
public ClientLbEnd2endTest {
// A report is an int paired with an OrcaLoadReport.
// NOTE(review): the int looks like a backend port -- confirm.
2433 using BackendMetricReport =
2434 std::pair<
int , xds::data::orca::v3::OrcaLoadReport>;
// Per-test setup; runs the base fixture's SetUp() first.
2436 void SetUp()
override {
2437 ClientLbEnd2endTest::SetUp();
// Suite-level setup; BackendMetricCallback is passed as a callback argument.
2441 static void SetUpTestCase() {
2444 BackendMetricCallback);
// With no report available, return absl::nullopt.
2451 if (backend_metric_reports_.empty())
return absl::nullopt;
// Drop the front element of backend_metric_reports_.
2453 backend_metric_reports_.pop_front();
2458 static void BackendMetricCallback(
// Convert the incoming backend metric data into an OrcaLoadReport.
2461 auto load_report = BackendMetricDataToOrcaLoadReport(backend_metric_data);
2475 TEST_F(OobBackendMetricTest, Basic) {
2478 constexpr
char kMetricName[] =
"foo";
2479 servers_[0]->orca_service_.SetCpuUtilization(0.1);
2480 servers_[0]->orca_service_.SetMemoryUtilization(0.2);
2481 servers_[0]->orca_service_.SetNamedUtilization(kMetricName, 0.3);
2483 auto response_generator = BuildResolverResponseGenerator();
2484 auto channel = BuildChannel(
"oob_backend_metric_test_lb", response_generator);
2486 response_generator.SetNextResolution(GetServersPorts());
2491 channel->GetLoadBalancingPolicyName());
2493 for (
size_t i = 0;
i < 5; ++
i) {
2494 auto report = GetBackendMetricReport();
2495 if (report.has_value()) {
2497 EXPECT_EQ(report->second.cpu_utilization(), 0.1);
2498 EXPECT_EQ(report->second.mem_utilization(), 0.2);
2500 report->second.utilization(),
2510 servers_[0]->orca_service_.SetNamedUtilization(kMetricName, 0.6);
2511 servers_[0]->orca_service_.SetMemoryUtilization(0.5);
2512 servers_[0]->orca_service_.SetCpuUtilization(0.4);
2514 for (
size_t i = 0;
i < 5; ++
i) {
2515 auto report = GetBackendMetricReport();
2516 if (report.has_value()) {
2518 if (report->second.cpu_utilization() != 0.1) {
2519 EXPECT_EQ(report->second.cpu_utilization(), 0.4);
2520 EXPECT_EQ(report->second.mem_utilization(), 0.5);
2522 report->second.utilization(),