25 #include <gmock/gmock.h>
26 #include <gtest/gtest.h>
28 #include "absl/memory/memory.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
56 #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
57 #include "src/proto/grpc/testing/echo.grpc.pb.h"
82 using std::chrono::system_clock;
84 using grpc::lb::v1::LoadBalancer;
85 using grpc::lb::v1::LoadBalanceRequest;
86 using grpc::lb::v1::LoadBalanceResponse;
// Service config JSON used as the default `service_config_json` argument of
// the resolution helpers in this file. The visible pieces of the literal place
// a single "grpclb" entry (with an empty config object) inside
// "loadBalancingConfig".
92 constexpr
char kDefaultServiceConfig[] =
94 " \"loadBalancingConfig\":[\n"
95 " { \"grpclb\":{} }\n"
// Service base types for the test servers: the echo test service and the
// LoadBalancer service, each wrapped in CountedService. BackendServiceImpl and
// BalancerServiceImpl derive from these aliases.
// NOTE(review): CountedService is declared outside this excerpt; presumably it
// supplies IncreaseRequestCount()/IncreaseResponseCount() — confirm.
99 using BackendService = CountedService<TestServiceImpl>;
100 using BalancerService = CountedService<LoadBalancer::Service>;
// Metadata key/value pair tied to the channel's call credentials. ResetStub()
// passes both when building credentials, and BackendServiceImpl checks that a
// client metadata entry with this key carries this value.
102 const char g_kCallCredsMdKey[] =
"Balancer should not ...";
103 const char g_kCallCredsMdValue[] =
"... receive me";
// Backend service used by the tests. Only part of the class is visible in this
// excerpt: the visible handler code validates the call-credentials metadata
// and bumps the request and response counters.
105 class BackendServiceImpl :
public BackendService {
107 BackendServiceImpl() {}
// Look up the call-credentials key among the metadata sent by the client.
112 auto call_credentials_entry =
113 context->client_metadata().find(g_kCallCredsMdKey);
// The entry is optional: its value is only checked when the key is present.
115 if (call_credentials_entry !=
context->client_metadata().end()) {
116 EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
118 IncreaseRequestCount();
120 IncreaseResponseCount();
129 std::set<std::string>
clients() {
// Converts the textual IPv4 address `ip_str` into its packed binary form and
// returns those bytes as a std::string of length sizeof(ip4).
// inet_pton() must report success (return 1); otherwise GPR_ASSERT fires.
// NOTE(review): the declaration of `ip4` is not visible in this excerpt.
144 std::string Ip4ToPackedString(
const char* ip_str) {
146 GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
// The address object is copied out as raw bytes; no text formatting happens.
147 return std::string(
reinterpret_cast<const char*
>(&ip4),
sizeof(ip4));
// Returns the raw bytes of `ip6` as a std::string of length sizeof(ip6).
// NOTE(review): the declaration of `ip6` and the step that fills it from
// `ip_str` are not visible in this excerpt; presumably this mirrors
// Ip4ToPackedString with an IPv6 address — confirm.
150 std::string Ip6ToPackedString(
const char* ip_str) {
153 return std::string(
reinterpret_cast<const char*
>(&ip6),
sizeof(ip6));
// Combines `other` into this ClientStats and returns a ClientStats reference.
// The visible lines read other's failed-to-send and known-received counters
// and iterate over other.drop_token_counts.
// NOTE(review): the left-hand sides of the assignments are not visible in this
// excerpt; presumably each matching field is incremented — confirm.
163 ClientStats&
operator+=(
const ClientStats& other) {
167 other.num_calls_finished_with_client_failed_to_send;
169 other.num_calls_finished_known_received;
170 for (
const auto& p : other.drop_token_counts) {
// Fake balancer service used by the tests. This excerpt shows the class head,
// the constructor, and parts of the bidirectional-stream handler body; the
// handler's own signature is not visible here.
185 class BalancerServiceImpl :
public BalancerService {
// Stream: server side of the LoadBalanceRequest/LoadBalanceResponse stream.
// ResponseDelayPair: a response paired with an int (passed to SendResponse).
187 using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
188 using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
190 explicit BalancerServiceImpl(
int client_load_reporting_interval_seconds)
192 client_load_reporting_interval_seconds) {}
// If serverlists are already marked done, jump straight to the `done` label
// (the label itself is not visible in this excerpt).
198 if (serverlist_done_)
goto done;
203 context->client_metadata().end());
205 std::vector<ResponseDelayPair> responses_and_delays;
// Only handle the initial request when the incoming message carries one.
210 if (
request.has_initial_request()) {
215 IncreaseRequestCount();
217 request.DebugString().c_str());
// Build the initial response (touching its client_stats_report_interval field)
// and write it to the stream.
221 LoadBalanceResponse initial_response;
222 initial_response.mutable_initial_response()
223 ->mutable_client_stats_report_interval()
225 stream->Write(initial_response);
// Send every collected response, handing its paired int to SendResponse.
232 for (
const auto& response_and_delay : responses_and_delays) {
233 SendResponse(
stream, response_and_delay.first,
234 response_and_delay.second);
// Keep going until serverlists are marked done.
238 while (!serverlist_done_) {
247 this,
request.DebugString().c_str());
// Copy the counters from the request's client_stats into a ClientStats record.
249 ClientStats load_report;
250 load_report.num_calls_started =
251 request.client_stats().num_calls_started();
252 load_report.num_calls_finished =
253 request.client_stats().num_calls_finished();
254 load_report.num_calls_finished_with_client_failed_to_send =
256 .num_calls_finished_with_client_failed_to_send();
257 load_report.num_calls_finished_known_received =
258 request.client_stats().num_calls_finished_known_received();
// One drop_token_counts entry per calls_finished_with_drop element, keyed by
// its load-balance token and holding its num_calls value.
259 for (
const auto& drop_token_count :
260 request.client_stats().calls_finished_with_drop()) {
262 .drop_token_counts[drop_token_count.load_balance_token()] =
263 drop_token_count.num_calls();
// Queue the report (moved, not copied); WaitForLoadReport() pops from this
// same queue.
268 load_report_queue_.emplace_back(
std::move(load_report));
278 void add_response(
const LoadBalanceResponse&
response,
int send_after_ms) {
285 serverlist_done_ =
false;
287 load_report_queue_.clear();
291 NotifyDoneWithServerlists();
// Takes the oldest entry out of load_report_queue_: the front element is moved
// into a local ClientStats and then popped from the queue.
// While the queue is empty the function loops; the waiting step inside that
// loop and the final return statement are not visible in this excerpt.
295 ClientStats WaitForLoadReport() {
297 if (load_report_queue_.empty()) {
298 while (load_report_queue_.empty()) {
302 ClientStats load_report =
std::move(load_report_queue_.front());
303 load_report_queue_.pop_front();
// Marks serverlists as done: sets serverlist_done_ to true unless it is
// already set. Anything done after the flag is set (for example waking a
// waiter) is not visible in this excerpt.
307 void NotifyDoneWithServerlists() {
309 if (!serverlist_done_) {
310 serverlist_done_ =
true;
315 std::vector<std::string> service_names() {
329 IncreaseResponseCount();
346 GrpclbEnd2endTest(
size_t num_backends,
size_t num_balancers,
347 int client_load_reporting_interval_seconds)
352 client_load_reporting_interval_seconds) {}
354 static void SetUpTestCase() {
// Per-test setup (overrides the fixture hook). Visible steps: work out whether
// localhost resolves only to IPv6, create a FakeResolverResponseGenerator, and
// append ServerThread objects for backends and balancers.
367 void SetUp()
override {
368 bool localhost_resolves_to_ipv4 =
false;
369 bool localhost_resolves_to_ipv6 =
false;
// Tail of a call that receives the flag by pointer; the start of the call is
// not visible in this excerpt.
371 &localhost_resolves_to_ipv6);
// True only when localhost resolves to IPv6 and not to IPv4.
372 ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
374 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
377 backends_.emplace_back(
new ServerThread<BackendServiceImpl>(
"backend"));
382 balancers_.emplace_back(
new ServerThread<BalancerServiceImpl>(
// Per-test teardown: shuts down all backends first, then calls Shutdown() on
// every balancer server thread.
389 void TearDown()
override {
390 ShutdownAllBackends();
391 for (
auto& balancer :
balancers_) balancer->Shutdown();
394 void StartAllBackends() {
// Calls Shutdown() on every entry of backends_.
400 void ShutdownAllBackends() {
401 for (
auto& backend :
backends_) backend->Shutdown();
// Prepares channel arguments and credentials for the test stub. The channel
// and stub creation themselves are not visible in this excerpt.
//   fallback_timeout: forwarded to SetGrpclbFallbackTimeout() only when > 0.
//   subchannel_cache_delay_ms: only acted upon when > 0.
// `expected_targets` is a parameter declared on a line not shown here.
406 void ResetStub(
int fallback_timeout = 0,
408 int subchannel_cache_delay_ms = 0) {
409 ChannelArguments
args;
410 if (fallback_timeout > 0)
args.SetGrpclbFallbackTimeout(fallback_timeout);
413 if (!expected_targets.empty()) {
416 if (subchannel_cache_delay_ms > 0) {
420 std::ostringstream uri;
// The call-credentials metadata key/value pair that BackendServiceImpl checks.
427 g_kCallCredsMdKey, g_kCallCredsMdValue);
428 std::shared_ptr<ChannelCredentials> creds(
// Drop this function's reference on channel_creds.
432 channel_creds->
Unref();
// Calls ResetCounters() on the service of every backend.
437 void ResetBackendCounters() {
438 for (
auto& backend :
backends_) backend->service_.ResetCounters();
// Accumulates (via ClientStats::operator+=) the result of WaitForLoadReport()
// on a balancer's service into a single ClientStats.
// NOTE(review): the loop header that defines `balancer` and the return
// statement are not visible in this excerpt.
441 ClientStats WaitForLoadReports() {
442 ClientStats client_stats;
444 client_stats += balancer->service_.WaitForLoadReport();
// Examines backends with index in [start_index, stop_index) and returns bool.
// A stop_index of 0 means "up to backends_.size()".
// The per-backend check inside the loop is not visible in this excerpt.
449 bool SeenAllBackends(
size_t start_index = 0,
size_t stop_index = 0) {
450 if (stop_index == 0) stop_index =
backends_.size();
451 for (
size_t i = start_index;
i < stop_index; ++
i) {
// Takes its counters by pointer (num_total, num_ok, num_failure, plus a
// further parameter that is not visible here; callers pass &num_drops).
// The visible branch recognizes a balancer-directed drop by comparing the
// status' error message with a fixed string.
457 void SendRpcAndCount(
int* num_total,
int* num_ok,
int* num_failure,
463 if (
status.error_message() ==
"drop directed by grpclb balancer") {
// Warm-up helper. Sends RPCs until SeenAllBackends(start_index, stop_index)
// holds, then keeps sending until the total number of requests is a multiple
// of num_requests_multiple_of. Finally resets the backend counters and logs a
// summary.
// Returns a tuple of three ints; callers in this file unpack it as
// (num_ok, num_failure, num_drops).
472 std::tuple<int, int, int> WaitForAllBackends(
int num_requests_multiple_of = 1,
473 size_t start_index = 0,
474 size_t stop_index = 0) {
479 while (!SeenAllBackends(start_index, stop_index)) {
480 SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
// Pad the request count up to the requested multiple.
482 while (num_total % num_requests_multiple_of != 0) {
483 SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
485 ResetBackendCounters();
487 "Performed %d warm up requests (a multiple of %d) against the "
488 "backends. %d succeeded, %d failed, %d dropped.",
489 num_total, num_requests_multiple_of, num_ok, num_failure,
494 void WaitForBackend(
size_t backend_idx) {
498 ResetBackendCounters();
507 const std::vector<AddressData>& address_data) {
509 for (
const auto&
addr : address_data) {
518 const_cast<char*
>(
addr.balancer_name.c_str()));
521 addresses.emplace_back(address.
addr, address.
len,
args);
527 const std::vector<AddressData>& balancer_address_data,
528 const std::vector<AddressData>& backend_address_data = {},
529 const char* service_config_json = kDefaultServiceConfig) {
532 CreateLbAddressesFromAddressDataList(backend_address_data);
535 nullptr, service_config_json, &
error);
538 CreateLbAddressesFromAddressDataList(balancer_address_data);
// Builds a list of AddressData{balancers_[i]->port_, ""} entries and passes it
// to SetNextResolution() together with an empty backend list and
// `service_config_json` (defaults to kDefaultServiceConfig).
// The loop header that defines `i` is not visible in this excerpt.
544 void SetNextResolutionAllBalancers(
545 const char* service_config_json = kDefaultServiceConfig) {
546 std::vector<AddressData> addresses;
548 addresses.emplace_back(AddressData{
balancers_[
i]->port_,
""});
550 SetNextResolution(addresses, {}, service_config_json);
// Takes balancer addresses, optional backend addresses (default: empty) and a
// service config JSON string (default: kDefaultServiceConfig), and forwards
// all three unchanged to a call whose callee is not visible in this excerpt.
553 void SetNextResolution(
554 const std::vector<AddressData>& balancer_address_data,
555 const std::vector<AddressData>& backend_address_data = {},
556 const char* service_config_json = kDefaultServiceConfig) {
559 balancer_address_data, backend_address_data, service_config_json);
// Same parameter list and defaults as SetNextResolution(). The three arguments
// are forwarded unchanged to a call whose callee is not visible in this
// excerpt.
563 void SetNextReresolutionResponse(
564 const std::vector<AddressData>& balancer_address_data,
565 const std::vector<AddressData>& backend_address_data = {},
566 const char* service_config_json = kDefaultServiceConfig) {
569 balancer_address_data, backend_address_data, service_config_json);
// Returns a vector<int> built while iterating i over
// [start_index, stop_index). A stop_index of 0 means "up to backends_.size()".
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// appends backends_[i]->port_ — confirm.
573 std::vector<int> GetBackendPorts(
size_t start_index = 0,
574 size_t stop_index = 0)
const {
575 if (stop_index == 0) stop_index =
backends_.size();
576 std::vector<int> backend_ports;
577 for (
size_t i = start_index;
i < stop_index; ++
i) {
580 return backend_ports;
583 void ScheduleResponseForBalancer(
size_t i,
584 const LoadBalanceResponse&
response,
// Builds a LoadBalanceResponse from a list of backend ports and a collection
// of drop-token entries (that second parameter's declaration is not visible in
// this excerpt).
589 LoadBalanceResponse BuildResponseForBackends(
590 const std::vector<int>& backend_ports,
// For each drop-token entry, emit `second` server entries whose load-balance
// token is `first`.
594 for (
size_t i = 0;
i < drop_token_count.second; ++
i) {
597 server->set_load_balance_token(drop_token_count.first);
// One server entry per backend port.
600 for (
const int& backend_port : backend_ports) {
// Visible alternative of the address expression: packed bytes of 127.0.0.1.
603 : Ip4ToPackedString(
"127.0.0.1"));
604 server->set_port(backend_port);
// Function-local static: the counter value persists across calls.
605 static int token_count = 0;
606 server->set_load_balance_token(
615 const bool local_response = (
response ==
nullptr);
616 if (local_response)
response =
new EchoResponse;
619 if (!expected_status.ok()) {
620 auto*
error =
request.mutable_param()->mutable_expected_error();
621 error->set_code(expected_status.error_code());
622 error->set_error_message(expected_status.error_message());
628 if (local_response)
delete response;
// Repeats its loop body `times` times (default 1); `timeout_ms` defaults to
// 1000. The visible tail streams status.error_message() into a failure
// message.
// NOTE(review): the remaining parameter(s) and the RPC call itself are not
// visible in this excerpt; callers in this file pass a bool as third argument.
632 void CheckRpcSendOk(
const size_t times = 1,
const int timeout_ms = 1000,
634 for (
size_t i = 0;
i < times; ++
i) {
638 <<
" message=" <<
status.error_message();
643 void CheckRpcSendFailure() {
648 template <
typename T>
649 struct ServerThread {
650 template <
typename...
Args>
666 thread_ = absl::make_unique<std::thread>(
680 std::shared_ptr<ServerCredentials> creds(
new SecureServerCredentials(
// Fixture state: the echo-service client stub, plus the owned server threads
// hosting the backend services and the balancer services.
712 std::unique_ptr<grpc::testing::EchoTestService::Stub>
stub_;
713 std::vector<std::unique_ptr<ServerThread<BackendServiceImpl>>>
backends_;
714 std::vector<std::unique_ptr<ServerThread<BalancerServiceImpl>>>
balancers_;
// Fixture built as GrpclbEnd2endTest(4, 1, 0). Following the base
// constructor's parameter order, that is 4 backends, 1 balancer, and a client
// load reporting interval of 0 seconds.
721 class SingleBalancerTest :
public GrpclbEnd2endTest {
723 SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {}
// Basic single-balancer flow (only part of the test is visible here): resolve
// to all balancers, schedule on balancer 0 a response listing every backend
// port with an empty drop-token argument, wait until all backends have been
// seen, and finally mark serverlists as done on balancer 0.
726 TEST_F(SingleBalancerTest, Vanilla) {
727 SetNextResolutionAllBalancers();
728 const size_t kNumRpcsPerAddress = 100;
729 ScheduleResponseForBalancer(
730 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
734 WaitForAllBackends();
742 balancers_[0]->service_.NotifyDoneWithServerlists();
// Schedules three responses on balancer 0 (only part of the test is visible):
//   1. all backends, last argument 0;
//   2. backends from index 2 onwards, last argument 1000;
//   3. backends from index 1 onwards, last argument 1000.
// NOTE(review): the last argument is presumably a send delay in ms — confirm.
752 TEST_F(SingleBalancerTest, SubchannelCaching) {
755 SetNextResolutionAllBalancers();
757 ScheduleResponseForBalancer(
758 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
760 ScheduleResponseForBalancer(
761 0, BuildResponseForBackends(GetBackendPorts(2), {}), 1000);
763 ScheduleResponseForBalancer(
764 0, BuildResponseForBackends(GetBackendPorts(1), {}), 1000);
766 WaitForAllBackends();
791 balancers_[0]->service_.NotifyDoneWithServerlists();
// After warming up against all backends, checks that the status actually
// received matches the expected one in both error code and error message.
// How `actual` and `expected` are produced is not visible in this excerpt.
798 TEST_F(SingleBalancerTest, ReturnServerStatus) {
799 SetNextResolutionAllBalancers();
800 ScheduleResponseForBalancer(
801 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
803 WaitForAllBackends();
809 EXPECT_EQ(actual.error_code(), expected.error_code());
810 EXPECT_EQ(actual.error_message(), expected.error_message());
// Uses a service config whose loadBalancingConfig lists "does_not_exist" ahead
// of "grpclb", then expects one RPC to succeed against the backends announced
// by balancer 0.
813 TEST_F(SingleBalancerTest, SelectGrpclbWithMigrationServiceConfig) {
814 SetNextResolutionAllBalancers(
816 " \"loadBalancingConfig\":[\n"
817 " { \"does_not_exist\":{} },\n"
818 " { \"grpclb\":{} }\n"
821 ScheduleResponseForBalancer(
822 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
823 CheckRpcSendOk(1, 1000 ,
true );
824 balancers_[0]->service_.NotifyDoneWithServerlists();
833 TEST_F(SingleBalancerTest,
834 SelectGrpclbWithMigrationServiceConfigAndNoAddresses) {
836 ResetStub(kFallbackTimeoutMs);
837 SetNextResolution({}, {},
839 " \"loadBalancingConfig\":[\n"
840 " { \"does_not_exist\":{} },\n"
841 " { \"grpclb\":{} }\n"
857 TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
858 SetNextResolutionAllBalancers(
860 " \"loadBalancingConfig\":[\n"
862 " \"childPolicy\":[\n"
863 " { \"pick_first\":{} }\n"
868 ScheduleResponseForBalancer(
869 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
871 CheckRpcSendOk(
kNumRpcs, 1000 ,
true );
872 balancers_[0]->service_.NotifyDoneWithServerlists();
887 TEST_F(SingleBalancerTest, SwapChildPolicy) {
888 SetNextResolutionAllBalancers(
890 " \"loadBalancingConfig\":[\n"
892 " \"childPolicy\":[\n"
893 " { \"pick_first\":{} }\n"
898 ScheduleResponseForBalancer(
899 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
901 CheckRpcSendOk(
kNumRpcs, 1000 ,
true );
909 SetNextResolutionAllBalancers();
910 WaitForAllBackends();
911 CheckRpcSendOk(
kNumRpcs, 1000 ,
true );
918 balancers_[0]->service_.NotifyDoneWithServerlists();
927 TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
928 SetNextResolutionAllBalancers();
930 std::vector<int> ports;
933 const size_t kNumRpcsPerAddress = 10;
934 ScheduleResponseForBalancer(0, BuildResponseForBackends(ports, {}), 0);
938 CheckRpcSendOk(kNumRpcsPerAddress * ports.size());
944 balancers_[0]->service_.NotifyDoneWithServerlists();
947 TEST_F(SingleBalancerTest, SecureNaming) {
949 SetNextResolution({AddressData{
balancers_[0]->port_,
"lb"}});
950 const size_t kNumRpcsPerAddress = 100;
951 ScheduleResponseForBalancer(
952 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
956 WaitForAllBackends();
964 balancers_[0]->service_.NotifyDoneWithServerlists();
973 TEST_F(SingleBalancerTest, SecureNamingDeathTest) {
980 SetNextResolution({AddressData{
balancers_[0]->port_,
"woops"}});
// Balancer 0 first sends a default-constructed (empty) response and later a
// response listing all backends, scheduled with kServerlistDelayMs. One RPC is
// sent with a deadline of twice that delay, and the measured elapsed time is
// expected to exceed kServerlistDelayMs.
986 TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
987 SetNextResolutionAllBalancers();
989 const int kCallDeadlineMs = kServerlistDelayMs * 2;
991 ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
993 ScheduleResponseForBalancer(
994 0, BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs);
997 CheckRpcSendOk(1, kCallDeadlineMs,
true );
998 const auto ellapsed_ms =
999 std::chrono::duration_cast<std::chrono::milliseconds>(
1005 EXPECT_GT(ellapsed_ms.count(), kServerlistDelayMs);
1006 balancers_[0]->service_.NotifyDoneWithServerlists();
1013 TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
1014 SetNextResolutionAllBalancers();
1015 const size_t kNumUnreachableServers = 5;
1016 std::vector<int> ports;
1017 for (
size_t i = 0;
i < kNumUnreachableServers; ++
i) {
1020 ScheduleResponseForBalancer(0, BuildResponseForBackends(ports, {}), 0);
1024 balancers_[0]->service_.NotifyDoneWithServerlists();
1031 TEST_F(SingleBalancerTest, Fallback) {
1032 SetNextResolutionAllBalancers();
1035 const size_t kNumBackendsInResolution =
backends_.size() / 2;
1037 ResetStub(kFallbackTimeoutMs);
1038 std::vector<AddressData> balancer_addresses;
1039 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1040 std::vector<AddressData> backend_addresses;
1041 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1042 backend_addresses.emplace_back(AddressData{
backends_[
i]->port_,
""});
1044 SetNextResolution(balancer_addresses, backend_addresses);
1047 ScheduleResponseForBalancer(
1049 BuildResponseForBackends(
1050 GetBackendPorts(kNumBackendsInResolution ), {}),
1051 kServerlistDelayMs);
1054 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1060 CheckRpcSendOk(kNumBackendsInResolution);
1065 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1068 for (
size_t i = kNumBackendsInResolution;
i <
backends_.size(); ++
i) {
1074 for (
size_t i = kNumBackendsInResolution;
i <
backends_.size(); ++
i) {
1080 CheckRpcSendOk(
backends_.size() - kNumBackendsInResolution);
1085 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1088 for (
size_t i = kNumBackendsInResolution;
i <
backends_.size(); ++
i) {
1092 balancers_[0]->service_.NotifyDoneWithServerlists();
1099 TEST_F(SingleBalancerTest, FallbackUpdate) {
1100 SetNextResolutionAllBalancers();
1103 const size_t kNumBackendsInResolution =
backends_.size() / 3;
1104 const size_t kNumBackendsInResolutionUpdate =
backends_.size() / 3;
1106 ResetStub(kFallbackTimeoutMs);
1107 std::vector<AddressData> balancer_addresses;
1108 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1109 std::vector<AddressData> backend_addresses;
1110 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1111 backend_addresses.emplace_back(AddressData{
backends_[
i]->port_,
""});
1113 SetNextResolution(balancer_addresses, backend_addresses);
1116 ScheduleResponseForBalancer(
1118 BuildResponseForBackends(
1119 GetBackendPorts(kNumBackendsInResolution +
1120 kNumBackendsInResolutionUpdate ),
1122 kServerlistDelayMs);
1125 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1131 CheckRpcSendOk(kNumBackendsInResolution);
1136 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1139 for (
size_t i = kNumBackendsInResolution;
i <
backends_.size(); ++
i) {
1143 balancer_addresses.clear();
1144 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1145 backend_addresses.clear();
1146 for (
size_t i = kNumBackendsInResolution;
1147 i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++
i) {
1148 backend_addresses.emplace_back(AddressData{
backends_[
i]->port_,
""});
1150 SetNextResolution(balancer_addresses, backend_addresses);
1154 for (
size_t i = kNumBackendsInResolution;
1155 i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++
i) {
1161 CheckRpcSendOk(kNumBackendsInResolutionUpdate);
1166 for (
size_t i = 0;
i < kNumBackendsInResolution; ++
i) {
1169 for (
size_t i = kNumBackendsInResolution;
1170 i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++
i) {
1173 for (
size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
1180 for (
size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
1187 CheckRpcSendOk(
backends_.size() - kNumBackendsInResolution -
1188 kNumBackendsInResolutionUpdate);
1194 i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++
i) {
1197 for (
size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
1202 balancers_[0]->service_.NotifyDoneWithServerlists();
1209 TEST_F(SingleBalancerTest,
1210 FallbackAfterStartup_LoseContactWithBalancerThenBackends) {
1212 const size_t kNumFallbackBackends = 2;
1213 const size_t kNumBalancerBackends =
backends_.size() - kNumFallbackBackends;
1214 std::vector<AddressData> backend_addresses;
1215 for (
size_t i = 0;
i < kNumFallbackBackends; ++
i) {
1216 backend_addresses.emplace_back(AddressData{
backends_[
i]->port_,
""});
1218 std::vector<AddressData> balancer_addresses;
1220 balancer_addresses.emplace_back(AddressData{
balancers_[
i]->port_,
""});
1222 SetNextResolution(balancer_addresses, backend_addresses);
1223 ScheduleResponseForBalancer(
1224 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
1228 WaitForAllBackends(1 ,
1229 kNumFallbackBackends );
1232 CheckRpcSendOk(100 * kNumBalancerBackends);
1233 for (
size_t i = kNumFallbackBackends;
i <
backends_.size(); ++
i) {
1237 for (
size_t i = kNumFallbackBackends;
i <
backends_.size(); ++
i) {
1240 WaitForAllBackends(1 , 0 ,
1241 kNumFallbackBackends );
1245 for (
size_t i = kNumFallbackBackends;
i <
backends_.size(); ++
i) {
1248 CheckRpcSendOk(100 * kNumBalancerBackends);
1249 for (
size_t i = 0;
i < kNumFallbackBackends; ++
i) {
1255 ScheduleResponseForBalancer(
1256 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
1258 WaitForAllBackends(1 ,
1259 kNumFallbackBackends );
1262 TEST_F(SingleBalancerTest,
1263 FallbackAfterStartup_LoseContactWithBackendsThenBalancer) {
1265 const size_t kNumFallbackBackends = 2;
1266 const size_t kNumBalancerBackends =
backends_.size() - kNumFallbackBackends;
1267 std::vector<AddressData> backend_addresses;
1268 for (
size_t i = 0;
i < kNumFallbackBackends; ++
i) {
1269 backend_addresses.emplace_back(AddressData{
backends_[
i]->port_,
""});
1271 std::vector<AddressData> balancer_addresses;
1273 balancer_addresses.emplace_back(AddressData{
balancers_[
i]->port_,
""});
1275 SetNextResolution(balancer_addresses, backend_addresses);
1276 ScheduleResponseForBalancer(
1277 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
1281 WaitForAllBackends(1 ,
1282 kNumFallbackBackends );
1285 for (
size_t i = kNumFallbackBackends;
i <
backends_.size(); ++
i) {
1288 CheckRpcSendFailure();
1291 WaitForAllBackends(1 , 0 ,
1292 kNumFallbackBackends );
1296 for (
size_t i = kNumFallbackBackends;
i <
backends_.size(); ++
i) {
1299 CheckRpcSendOk(100 * kNumBalancerBackends);
1300 for (
size_t i = 0;
i < kNumFallbackBackends; ++
i) {
1306 ScheduleResponseForBalancer(
1307 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
1309 WaitForAllBackends(1 ,
1310 kNumFallbackBackends );
1313 TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
1315 ResetStub(kFallbackTimeoutMs);
1317 std::vector<AddressData> balancer_addresses;
1318 balancer_addresses.emplace_back(
1320 std::vector<AddressData> backend_addresses;
1321 backend_addresses.emplace_back(AddressData{
backends_[0]->port_,
""});
1322 SetNextResolution(balancer_addresses, backend_addresses);
1325 CheckRpcSendOk( 1, 1000,
1329 TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
1331 ResetStub(kFallbackTimeoutMs);
1333 std::vector<AddressData> balancer_addresses;
1334 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1335 std::vector<AddressData> backend_addresses;
1336 backend_addresses.emplace_back(AddressData{
backends_[0]->port_,
""});
1337 SetNextResolution(balancer_addresses, backend_addresses);
1339 balancers_[0]->service_.NotifyDoneWithServerlists();
1342 CheckRpcSendOk( 1, 1000,
// The resolver returns balancer 0 plus backend 0 as a directly resolved
// backend address. Balancer 0 then sends a response whose fallback_response
// field is set, with no serverlist scheduled in the visible lines. The test
// ends by checking an RPC; that call's remaining arguments are not visible.
1346 TEST_F(SingleBalancerTest, FallbackControlledByBalancer_BeforeFirstServerlist) {
1348 ResetStub(kFallbackTimeoutMs);
1350 std::vector<AddressData> balancer_addresses;
1351 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1352 std::vector<AddressData> backend_addresses;
1353 backend_addresses.emplace_back(AddressData{
backends_[0]->port_,
""});
1354 SetNextResolution(balancer_addresses, backend_addresses);
// mutable_fallback_response() marks the response as a fallback response.
1356 LoadBalanceResponse
resp;
1357 resp.mutable_fallback_response();
1358 ScheduleResponseForBalancer(0,
resp, 0);
1361 CheckRpcSendOk( 1, 1000,
// The resolver returns balancer 0 plus backend 0 as a directly resolved
// backend address. Balancer 0 is scheduled to send, in order: a serverlist
// containing only backend 1, a fallback response, and the same serverlist
// again (last arguments 0, 100 and 100). The assertions are not visible here.
1365 TEST_F(SingleBalancerTest, FallbackControlledByBalancer_AfterFirstServerlist) {
1367 std::vector<AddressData> balancer_addresses;
1368 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1369 std::vector<AddressData> backend_addresses;
1370 backend_addresses.emplace_back(AddressData{
backends_[0]->port_,
""});
1371 SetNextResolution(balancer_addresses, backend_addresses);
1375 LoadBalanceResponse serverlist_resp =
1376 BuildResponseForBackends({
backends_[1]->port_}, {});
1377 LoadBalanceResponse fallback_resp;
1378 fallback_resp.mutable_fallback_response();
1379 ScheduleResponseForBalancer(0, serverlist_resp, 0);
1380 ScheduleResponseForBalancer(0, fallback_resp, 100);
1381 ScheduleResponseForBalancer(0, serverlist_resp, 100);
1389 TEST_F(SingleBalancerTest, BackendsRestart) {
1390 SetNextResolutionAllBalancers();
1391 const size_t kNumRpcsPerAddress = 100;
1392 ScheduleResponseForBalancer(
1393 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
1399 ShutdownAllBackends();
1400 CheckRpcSendFailure();
1403 CheckRpcSendOk(1 , 2000 ,
1411 TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
1412 constexpr
char kServiceConfigWithTarget[] =
1414 " \"loadBalancingConfig\":[\n"
1416 " \"serviceName\":\"test_service\"\n"
1421 SetNextResolutionAllBalancers(kServiceConfigWithTarget);
1422 ScheduleResponseForBalancer(
1423 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
1427 WaitForAllBackends();
// Fixture built as GrpclbEnd2endTest(4, 3, 0). Following the base
// constructor's parameter order, that is 4 backends, 3 balancers, and a client
// load reporting interval of 0 seconds.
1431 class UpdatesTest :
public GrpclbEnd2endTest {
1433 UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
1436 TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
1437 SetNextResolutionAllBalancers();
1438 const std::vector<int> first_backend{GetBackendPorts()[0]};
1439 const std::vector<int> second_backend{GetBackendPorts()[1]};
1440 ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
1442 ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
1465 std::vector<AddressData> addresses;
1466 addresses.emplace_back(AddressData{
balancers_[1]->port_,
""});
1468 SetNextResolution(addresses);
1493 TEST_F(UpdatesTest, UpdateBalancersRepeated) {
1494 SetNextResolutionAllBalancers();
1495 const std::vector<int> first_backend{GetBackendPorts()[0]};
1496 const std::vector<int> second_backend{GetBackendPorts()[0]};
1498 ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
1500 ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
1514 balancers_[0]->service_.NotifyDoneWithServerlists();
1524 std::vector<AddressData> addresses;
1525 addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1526 addresses.emplace_back(AddressData{
balancers_[1]->port_,
""});
1527 addresses.emplace_back(AddressData{
balancers_[2]->port_,
""});
1529 SetNextResolution(addresses);
1542 balancers_[0]->service_.NotifyDoneWithServerlists();
1545 addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1546 addresses.emplace_back(AddressData{
balancers_[1]->port_,
""});
1548 SetNextResolution(addresses);
1561 balancers_[0]->service_.NotifyDoneWithServerlists();
1564 TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
1565 std::vector<AddressData> addresses;
1566 addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1567 SetNextResolution(addresses);
1568 const std::vector<int> first_backend{GetBackendPorts()[0]};
1569 const std::vector<int> second_backend{GetBackendPorts()[1]};
1571 ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
1573 ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
1584 gpr_log(
GPR_INFO,
"********** ABOUT TO KILL BALANCER 0 *************");
1606 addresses.emplace_back(AddressData{
balancers_[1]->port_,
""});
1608 SetNextResolution(addresses);
1639 TEST_F(UpdatesTest, ReresolveDeadBackend) {
1643 std::vector<AddressData> balancer_addresses;
1644 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1645 std::vector<AddressData> backend_addresses;
1646 backend_addresses.emplace_back(AddressData{
backends_[0]->port_,
""});
1647 SetNextResolution(balancer_addresses, backend_addresses);
1652 balancer_addresses.clear();
1653 balancer_addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1654 backend_addresses.clear();
1655 backend_addresses.emplace_back(AddressData{
backends_[1]->port_,
""});
1656 SetNextReresolutionResponse(balancer_addresses, backend_addresses);
1666 gpr_log(
GPR_INFO,
"********** ABOUT TO KILL BACKEND 0 *************");
1680 balancers_[0]->service_.NotifyDoneWithServerlists();
1681 balancers_[1]->service_.NotifyDoneWithServerlists();
1682 balancers_[2]->service_.NotifyDoneWithServerlists();
// Fixture built as GrpclbEnd2endTest(4, 3, 2). Following the base
// constructor's parameter order, that is 4 backends, 3 balancers, and a client
// load reporting interval of 2 seconds.
1696 class UpdatesWithClientLoadReportingTest :
public GrpclbEnd2endTest {
1698 UpdatesWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 3, 2) {}
1701 TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
1702 const std::vector<int> first_backend{GetBackendPorts()[0]};
1703 const std::vector<int> second_backend{GetBackendPorts()[1]};
1704 ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
1706 ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
1711 std::vector<AddressData> addresses;
1712 addresses.emplace_back(AddressData{
balancers_[0]->port_,
""});
1713 SetNextResolution(addresses);
1715 addresses.emplace_back(AddressData{
balancers_[1]->port_,
""});
1716 SetNextReresolutionResponse(addresses);
1726 gpr_log(
GPR_INFO,
"********** ABOUT TO KILL BACKEND 0 *************");
1730 CheckRpcSendFailure();
1742 gpr_log(
GPR_INFO,
"********** ABOUT TO KILL BALANCER 0 *************");
// The balancer's response carries drop entries: 1 for "rate_limiting" and 2
// for "load_balancing". The test sends kNumRpcsPerAddress RPCs per address
// (backends plus drop entries), counts those rejected with the balancer drop
// message, and expects exactly kNumRpcsPerAddress drops per drop entry.
1773 TEST_F(SingleBalancerTest, Drop) {
1774 SetNextResolutionAllBalancers();
1775 const size_t kNumRpcsPerAddress = 100;
1776 const int num_of_drop_by_rate_limiting_addresses = 1;
1777 const int num_of_drop_by_load_balancing_addresses = 2;
1778 const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
1779 num_of_drop_by_load_balancing_addresses;
1780 const int num_total_addresses =
num_backends_ + num_of_drop_addresses;
1781 ScheduleResponseForBalancer(
1783 BuildResponseForBackends(
1785 {{
"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1786 {
"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1789 WaitForAllBackends();
1791 size_t num_drops = 0;
1792 for (
size_t i = 0;
i < kNumRpcsPerAddress * num_total_addresses; ++
i) {
1796 status.error_message() ==
"drop directed by grpclb balancer") {
1800 <<
" message=" <<
status.error_message();
1804 EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
// The only scheduled response has an empty backend list and one drop entry
// each for "rate_limiting" and "load_balancing". The resulting status is
// expected to carry the balancer drop message.
1815 TEST_F(SingleBalancerTest, DropAllFirst) {
1816 SetNextResolutionAllBalancers();
1818 const int num_of_drop_by_rate_limiting_addresses = 1;
1819 const int num_of_drop_by_load_balancing_addresses = 1;
1820 ScheduleResponseForBalancer(
1822 BuildResponseForBackends(
1823 {}, {{
"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1824 {
"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1828 EXPECT_EQ(
status.error_message(),
"drop directed by grpclb balancer");
// Balancer 0 first announces all backends, then a second response with an
// empty backend list and one drop entry each for "rate_limiting" and
// "load_balancing". The final status is expected to carry the balancer drop
// message.
1831 TEST_F(SingleBalancerTest, DropAll) {
1832 SetNextResolutionAllBalancers();
1833 ScheduleResponseForBalancer(
1834 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
1835 const int num_of_drop_by_rate_limiting_addresses = 1;
1836 const int num_of_drop_by_load_balancing_addresses = 1;
1837 ScheduleResponseForBalancer(
1839 BuildResponseForBackends(
1840 {}, {{
"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1841 {
"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1853 EXPECT_EQ(
status.error_message(),
"drop directed by grpclb balancer");
// Fixture built as GrpclbEnd2endTest(4, 1, 3). Following the base
// constructor's parameter order, that is 4 backends, 1 balancer, and a client
// load reporting interval of 3 seconds.
1856 class SingleBalancerWithClientLoadReportingTest :
public GrpclbEnd2endTest {
1858 SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 3) {}
// Load-reporting variant of the basic flow. After warming up against all
// backends and marking serverlists done, the test keeps accumulating load
// reports until the finished-call count equals the started-call count, then
// checks the aggregated counters (no call may have failed to send).
1861 TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
1862 SetNextResolutionAllBalancers();
1863 const size_t kNumRpcsPerAddress = 100;
1864 ScheduleResponseForBalancer(
1865 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
1868 int num_failure = 0;
// WaitForAllBackends() reports the warm-up outcome as (ok, failure, drops).
1870 std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
1877 balancers_[0]->service_.NotifyDoneWithServerlists();
1883 ClientStats client_stats;
// Body of a do/while loop: add reports until finished == started.
1885 client_stats += WaitForLoadReports();
1886 }
while (client_stats.num_calls_finished !=
1889 client_stats.num_calls_started);
1891 client_stats.num_calls_finished);
1892 EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
1894 client_stats.num_calls_finished_known_received);
1898 TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
1899 SetNextResolutionAllBalancers();
1900 const size_t kNumBackendsFirstPass = 2;
1901 const size_t kNumBackendsSecondPass =
1902 backends_.size() - kNumBackendsFirstPass;
1904 ScheduleResponseForBalancer(
1906 BuildResponseForBackends(GetBackendPorts(0, kNumBackendsFirstPass), {}),
1910 int num_failure = 0;
1912 std::tie(num_ok, num_failure, num_drops) =
1913 WaitForAllBackends( 1, 0,
1914 kNumBackendsFirstPass);
1915 balancers_[0]->service_.NotifyDoneWithServerlists();
1916 ClientStats client_stats = WaitForLoadReports();
1917 EXPECT_EQ(
static_cast<size_t>(num_ok), client_stats.num_calls_started);
1918 EXPECT_EQ(
static_cast<size_t>(num_ok), client_stats.num_calls_finished);
1919 EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
1921 client_stats.num_calls_finished_known_received);
1927 ResetBackendCounters();
1928 CheckRpcSendOk(kNumBackendsFirstPass);
1930 for (
size_t i = 0;
i < kNumBackendsFirstPass; ++
i) {
1935 ScheduleResponseForBalancer(
1936 0, BuildResponseForBackends(GetBackendPorts(kNumBackendsFirstPass), {}),
1943 backends_[3]->service_.request_count() == 0);
1945 CheckRpcSendOk(kNumBackendsSecondPass);
1946 balancers_[0]->service_.NotifyDoneWithServerlists();
1948 client_stats = WaitForLoadReports();
1949 EXPECT_EQ(kNumBackendsSecondPass + 1, client_stats.num_calls_started);
1950 EXPECT_EQ(kNumBackendsSecondPass + 1, client_stats.num_calls_finished);
1951 EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
1953 client_stats.num_calls_finished_known_received);
1957 TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
1958 SetNextResolutionAllBalancers();
1959 const size_t kNumRpcsPerAddress = 3;
1960 const int num_of_drop_by_rate_limiting_addresses = 2;
1961 const int num_of_drop_by_load_balancing_addresses = 1;
1962 const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
1963 num_of_drop_by_load_balancing_addresses;
1964 const int num_total_addresses =
num_backends_ + num_of_drop_addresses;
1965 ScheduleResponseForBalancer(
1967 BuildResponseForBackends(
1969 {{
"rate_limiting", num_of_drop_by_rate_limiting_addresses},
1970 {
"load_balancing", num_of_drop_by_load_balancing_addresses}}),
1973 int num_warmup_ok = 0;
1974 int num_warmup_failure = 0;
1975 int num_warmup_drops = 0;
1976 std::tie(num_warmup_ok, num_warmup_failure, num_warmup_drops) =
1977 WaitForAllBackends(num_total_addresses );
1978 const int num_total_warmup_requests =
1979 num_warmup_ok + num_warmup_failure + num_warmup_drops;
1980 size_t num_drops = 0;
1981 for (
size_t i = 0;
i < kNumRpcsPerAddress * num_total_addresses; ++
i) {
1985 status.error_message() ==
"drop directed by grpclb balancer") {
1989 <<
" message=" <<
status.error_message();
1993 EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
1998 balancers_[0]->service_.NotifyDoneWithServerlists();
2004 const ClientStats client_stats = WaitForLoadReports();
2006 kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
2007 client_stats.num_calls_started);
2009 kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
2010 client_stats.num_calls_finished);
2011 EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
2013 client_stats.num_calls_finished_known_received);
2017 const int num_times_drop_addresses_hit =
2018 num_warmup_drops / num_of_drop_addresses;
2020 client_stats.drop_token_counts,
2023 (kNumRpcsPerAddress + num_times_drop_addresses_hit)),
2026 (kNumRpcsPerAddress + num_times_drop_addresses_hit) * 2)));