19 #include <gmock/gmock.h>
20 #include <gtest/gtest.h>
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
30 #include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
31 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
39 using ::envoy::config::cluster::v3::CustomClusterType;
40 using ::envoy::config::endpoint::v3::HealthStatus;
41 using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;
43 class RingHashTest :
public XdsEnd2endTest {
45 void SetUp()
override {
47 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
49 ChannelArguments
args;
50 args.SetPointerWithVtable(
58 const std::vector<int>& ports) {
60 for (
int port : ports) {
66 addresses.emplace_back(address.
addr, address.
len,
nullptr);
85 XdsTest, RingHashTest,
86 ::
testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
89 TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
90 CreateAndStartBackends(2);
91 const char* kNewCluster1Name =
"new_cluster_1";
92 const char* kNewEdsService1Name =
"new_eds_service_name_1";
93 const char* kNewCluster2Name =
"new_cluster_2";
94 const char* kNewEdsService2Name =
"new_eds_service_name_2";
96 EdsResourceArgs args1({
97 {
"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}},
99 EdsResourceArgs args2({
100 {
"locality0", CreateEndpointsForBackends()},
102 balancer_->ads_service()->SetEdsResource(
103 BuildEdsResource(args1, kNewEdsService1Name));
104 balancer_->ads_service()->SetEdsResource(
105 BuildEdsResource(args2, kNewEdsService2Name));
107 Cluster new_cluster1 = default_cluster_;
108 new_cluster1.set_name(kNewCluster1Name);
109 new_cluster1.mutable_eds_cluster_config()->set_service_name(
110 kNewEdsService1Name);
111 balancer_->ads_service()->SetCdsResource(new_cluster1);
112 Cluster new_cluster2 = default_cluster_;
113 new_cluster2.set_name(kNewCluster2Name);
114 new_cluster2.mutable_eds_cluster_config()->set_service_name(
115 kNewEdsService2Name);
116 balancer_->ads_service()->SetCdsResource(new_cluster2);
118 auto cluster = default_cluster_;
119 cluster.set_lb_policy(Cluster::RING_HASH);
120 CustomClusterType* custom_cluster =
cluster.mutable_cluster_type();
121 custom_cluster->set_name(
"envoy.clusters.aggregate");
122 ClusterConfig cluster_config;
123 cluster_config.add_clusters(kNewCluster1Name);
124 cluster_config.add_clusters(kNewCluster2Name);
125 custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
126 balancer_->ads_service()->SetCdsResource(
cluster);
128 auto new_route_config = default_route_config_;
129 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
130 auto* hash_policy =
route->mutable_route()->add_hash_policy();
131 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
132 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
150 AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
151 CreateAndStartBackends(1);
152 const char* kEdsClusterName =
"eds_cluster";
153 const char* kLogicalDNSClusterName =
"logical_dns_cluster";
155 EdsResourceArgs
args({
157 {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
158 kDefaultLocalityWeight,
161 {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
162 kDefaultLocalityWeight,
165 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
167 Cluster eds_cluster = default_cluster_;
168 eds_cluster.set_name(kEdsClusterName);
169 balancer_->ads_service()->SetCdsResource(eds_cluster);
171 auto logical_dns_cluster = default_cluster_;
172 logical_dns_cluster.set_name(kLogicalDNSClusterName);
173 logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
174 auto* address = logical_dns_cluster.mutable_load_assignment()
179 ->mutable_socket_address();
180 address->set_address(kServerName);
181 address->set_port_value(443);
182 balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
184 auto cluster = default_cluster_;
185 cluster.set_lb_policy(Cluster::RING_HASH);
186 CustomClusterType* custom_cluster =
cluster.mutable_cluster_type();
187 custom_cluster->set_name(
"envoy.clusters.aggregate");
188 ClusterConfig cluster_config;
189 cluster_config.add_clusters(kEdsClusterName);
190 cluster_config.add_clusters(kLogicalDNSClusterName);
191 custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
192 balancer_->ads_service()->SetCdsResource(
cluster);
194 auto new_route_config = default_route_config_;
195 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
196 auto* hash_policy =
route->mutable_route()->add_hash_policy();
197 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
198 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
204 result.addresses = CreateAddressListFromPortList(GetBackendPorts());
209 ConnectionDelayInjector delay_injector(
211 delay_injector.Start();
214 CheckRpcSendOk(
DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
219 TEST_P(RingHashTest, ChannelIdHashing) {
220 CreateAndStartBackends(4);
221 auto cluster = default_cluster_;
222 cluster.set_lb_policy(Cluster::RING_HASH);
223 balancer_->ads_service()->SetCdsResource(
cluster);
224 auto new_route_config = default_route_config_;
225 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
226 auto* hash_policy =
route->mutable_route()->add_hash_policy();
227 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
228 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
230 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
231 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
247 TEST_P(RingHashTest, HeaderHashing) {
248 CreateAndStartBackends(4);
249 auto cluster = default_cluster_;
250 cluster.set_lb_policy(Cluster::RING_HASH);
251 balancer_->ads_service()->SetCdsResource(
cluster);
252 auto new_route_config = default_route_config_;
253 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
254 auto* hash_policy =
route->mutable_route()->add_hash_policy();
255 hash_policy->mutable_header()->set_header_name(
"address_hash");
256 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
258 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
259 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
263 std::vector<std::pair<std::string, std::string>>
metadata = {
264 {
"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
265 std::vector<std::pair<std::string, std::string>> metadata1 = {
266 {
"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
267 std::vector<std::pair<std::string, std::string>> metadata2 = {
268 {
"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
269 std::vector<std::pair<std::string, std::string>> metadata3 = {
270 {
"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
272 const auto rpc_options1 = RpcOptions().set_metadata(
std::move(metadata1));
273 const auto rpc_options2 = RpcOptions().set_metadata(
std::move(metadata2));
274 const auto rpc_options3 = RpcOptions().set_metadata(
std::move(metadata3));
276 WaitForBackendOptions(), rpc_options);
278 WaitForBackendOptions(), rpc_options1);
280 WaitForBackendOptions(), rpc_options2);
282 WaitForBackendOptions(), rpc_options3);
294 TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) {
295 CreateAndStartBackends(4);
296 auto cluster = default_cluster_;
297 cluster.set_lb_policy(Cluster::RING_HASH);
298 balancer_->ads_service()->SetCdsResource(
cluster);
299 auto new_route_config = default_route_config_;
300 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
301 auto* hash_policy =
route->mutable_route()->add_hash_policy();
302 hash_policy->mutable_header()->set_header_name(
"address_hash");
303 hash_policy->mutable_header()
304 ->mutable_regex_rewrite()
306 ->set_regex(
"[0-9]+");
307 hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
309 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
311 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
312 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
313 std::vector<std::pair<std::string, std::string>>
metadata = {
314 {
"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
315 std::vector<std::pair<std::string, std::string>> metadata1 = {
316 {
"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
317 std::vector<std::pair<std::string, std::string>> metadata2 = {
318 {
"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
319 std::vector<std::pair<std::string, std::string>> metadata3 = {
320 {
"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
322 const auto rpc_options1 = RpcOptions().set_metadata(
std::move(metadata1));
323 const auto rpc_options2 = RpcOptions().set_metadata(
std::move(metadata2));
324 const auto rpc_options3 = RpcOptions().set_metadata(
std::move(metadata3));
342 TEST_P(RingHashTest, NoHashPolicy) {
343 CreateAndStartBackends(2);
344 const double kDistribution50Percent = 0.5;
345 const double kErrorTolerance = 0.05;
346 const uint32_t kRpcTimeoutMs = 10000;
348 ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
349 auto cluster = default_cluster_;
351 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
353 cluster.set_lb_policy(Cluster::RING_HASH);
354 balancer_->ads_service()->SetCdsResource(
cluster);
355 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
356 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
360 WaitForBackendOptions(),
361 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
363 const int request_count_1 =
backends_[0]->backend_service()->request_count();
364 const int request_count_2 =
backends_[1]->backend_service()->request_count();
372 TEST_P(RingHashTest, EndpointWeights) {
373 CreateAndStartBackends(3);
374 const double kDistribution50Percent = 0.5;
375 const double kDistribution25Percent = 0.25;
376 const double kErrorTolerance = 0.05;
377 const uint32_t kRpcTimeoutMs = 10000;
379 ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
380 auto cluster = default_cluster_;
382 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
384 cluster.set_lb_policy(Cluster::RING_HASH);
385 balancer_->ads_service()->SetCdsResource(
cluster);
389 EdsResourceArgs
args(
391 {CreateEndpoint(0, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
393 CreateEndpoint(1, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
395 CreateEndpoint(2, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
397 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
401 WaitForBackendOptions(),
402 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
406 const int request_count_0 =
backends_[0]->backend_service()->request_count();
407 const int request_count_1 =
backends_[1]->backend_service()->request_count();
408 const int request_count_2 =
backends_[2]->backend_service()->request_count();
419 TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {
420 CreateAndStartBackends(2);
421 auto cluster = default_cluster_;
422 cluster.set_lb_policy(Cluster::RING_HASH);
423 balancer_->ads_service()->SetCdsResource(
cluster);
424 auto new_route_config = default_route_config_;
425 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
426 auto* hash_policy =
route->mutable_route()->add_hash_policy();
427 hash_policy->mutable_header()->set_header_name(
"header_not_present");
428 hash_policy->set_terminal(
true);
429 auto* hash_policy2 =
route->mutable_route()->add_hash_policy();
430 hash_policy2->mutable_header()->set_header_name(
"address_hash");
431 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
433 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
434 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
435 std::vector<std::pair<std::string, std::string>>
metadata = {
436 {
"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
445 TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) {
446 CreateAndStartBackends(2);
447 const double kDistribution50Percent = 0.5;
448 const double kErrorTolerance = 0.05;
449 const uint32_t kRpcTimeoutMs = 10000;
451 ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
452 auto cluster = default_cluster_;
454 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
456 cluster.set_lb_policy(Cluster::RING_HASH);
457 balancer_->ads_service()->SetCdsResource(
cluster);
458 auto new_route_config = default_route_config_;
459 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
460 auto* hash_policy =
route->mutable_route()->add_hash_policy();
461 hash_policy->mutable_header()->set_header_name(
"header_not_present");
462 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
464 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
465 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
466 std::vector<std::pair<std::string, std::string>>
metadata = {
473 WaitForBackendOptions(),
474 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
476 const int request_count_1 =
backends_[0]->backend_service()->request_count();
477 const int request_count_2 =
backends_[1]->backend_service()->request_count();
486 TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) {
487 CreateAndStartBackends(2);
488 const double kDistribution50Percent = 0.5;
489 const double kErrorTolerance = 0.05;
490 const uint32_t kRpcTimeoutMs = 10000;
492 ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
493 auto cluster = default_cluster_;
495 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
497 cluster.set_lb_policy(Cluster::RING_HASH);
498 balancer_->ads_service()->SetCdsResource(
cluster);
499 auto new_route_config = default_route_config_;
500 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
501 auto* hash_policy_unsupported_1 =
route->mutable_route()->add_hash_policy();
502 hash_policy_unsupported_1->mutable_cookie()->set_name(
"cookie");
503 auto* hash_policy_unsupported_2 =
route->mutable_route()->add_hash_policy();
504 hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
506 auto* hash_policy_unsupported_3 =
route->mutable_route()->add_hash_policy();
507 hash_policy_unsupported_3->mutable_query_parameter()->set_name(
509 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
511 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
512 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
516 WaitForBackendOptions(),
517 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
519 const int request_count_1 =
backends_[0]->backend_service()->request_count();
520 const int request_count_2 =
backends_[1]->backend_service()->request_count();
529 TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) {
530 CreateAndStartBackends(2);
531 const size_t kWeight1 = 1;
532 const size_t kWeight2 = 2;
533 const size_t kWeightTotal = kWeight1 + kWeight2;
534 const double kWeight33Percent =
static_cast<double>(kWeight1) / kWeightTotal;
535 const double kWeight66Percent =
static_cast<double>(kWeight2) / kWeightTotal;
536 const double kErrorTolerance = 0.05;
537 const uint32_t kRpcTimeoutMs = 10000;
539 ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
540 auto cluster = default_cluster_;
542 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
544 cluster.set_lb_policy(Cluster::RING_HASH);
545 balancer_->ads_service()->SetCdsResource(
cluster);
546 EdsResourceArgs
args({{
"locality0",
547 {CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
548 CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
549 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
553 WaitForBackendOptions(),
554 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
556 const int weight_33_request_count =
557 backends_[0]->backend_service()->request_count();
558 const int weight_66_request_count =
559 backends_[1]->backend_service()->request_count();
569 RandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
570 CreateAndStartBackends(2);
571 const size_t kWeight1 = 1 * 1;
572 const size_t kWeight2 = 2 * 2;
573 const size_t kWeightTotal = kWeight1 + kWeight2;
574 const double kWeight20Percent =
static_cast<double>(kWeight1) / kWeightTotal;
575 const double kWeight80Percent =
static_cast<double>(kWeight2) / kWeightTotal;
576 const double kErrorTolerance = 0.05;
577 const uint32_t kRpcTimeoutMs = 10000;
579 ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
580 auto cluster = default_cluster_;
582 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
584 cluster.set_lb_policy(Cluster::RING_HASH);
585 balancer_->ads_service()->SetCdsResource(
cluster);
586 EdsResourceArgs
args(
587 {{
"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
588 {
"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
589 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
593 WaitForBackendOptions(),
594 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
596 const int weight_20_request_count =
597 backends_[0]->backend_service()->request_count();
598 const int weight_80_request_count =
599 backends_[1]->backend_service()->request_count();
609 TEST_P(RingHashTest, FixedHashingTerminalPolicy) {
610 CreateAndStartBackends(2);
611 auto cluster = default_cluster_;
612 cluster.set_lb_policy(Cluster::RING_HASH);
613 balancer_->ads_service()->SetCdsResource(
cluster);
614 auto new_route_config = default_route_config_;
615 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
616 auto* hash_policy =
route->mutable_route()->add_hash_policy();
617 hash_policy->mutable_header()->set_header_name(
"fixed_string");
618 hash_policy->set_terminal(
true);
619 auto* hash_policy_to_be_ignored =
route->mutable_route()->add_hash_policy();
620 hash_policy_to_be_ignored->mutable_header()->set_header_name(
"random_string");
621 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
623 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
624 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
625 std::vector<std::pair<std::string, std::string>>
metadata = {
626 {
"fixed_string",
"fixed_value"},
646 TEST_P(RingHashTest, IdleToReady) {
647 CreateAndStartBackends(1);
648 auto cluster = default_cluster_;
649 cluster.set_lb_policy(Cluster::RING_HASH);
650 balancer_->ads_service()->SetCdsResource(
cluster);
651 auto new_route_config = default_route_config_;
652 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
653 auto* hash_policy =
route->mutable_route()->add_hash_policy();
654 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
655 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
657 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
658 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
666 TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
668 CreateAndStartBackends(1);
669 auto non_existant_endpoint = MakeNonExistantEndpoint();
670 EdsResourceArgs
args(
671 {{
"locality0", {non_existant_endpoint, CreateEndpoint(0)}}});
672 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
674 auto cluster = default_cluster_;
675 cluster.set_lb_policy(Cluster::RING_HASH);
676 balancer_->ads_service()->SetCdsResource(
cluster);
678 auto new_route_config = default_route_config_;
679 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
680 auto* hash_policy =
route->mutable_route()->add_hash_policy();
681 hash_policy->mutable_header()->set_header_name(
"address_hash");
682 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
686 ConnectionHoldInjector injector;
688 auto hold = injector.AddHold(non_existant_endpoint.port);
691 std::vector<std::pair<std::string, std::string>>
metadata = {
693 CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}};
694 rpc.StartRpc(
stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
710 TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
712 CreateAndStartBackends(1);
713 auto non_existant_endpoint0 = MakeNonExistantEndpoint();
714 auto non_existant_endpoint1 = MakeNonExistantEndpoint();
715 auto non_existant_endpoint2 = MakeNonExistantEndpoint();
716 EdsResourceArgs
args({{
"locality0",
717 {non_existant_endpoint0, non_existant_endpoint1,
718 non_existant_endpoint2, CreateEndpoint(0)}}});
719 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
721 auto cluster = default_cluster_;
722 cluster.set_lb_policy(Cluster::RING_HASH);
723 balancer_->ads_service()->SetCdsResource(
cluster);
725 auto new_route_config = default_route_config_;
726 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
727 auto* hash_policy =
route->mutable_route()->add_hash_policy();
728 hash_policy->mutable_header()->set_header_name(
"address_hash");
729 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
732 ConnectionHoldInjector injector;
734 auto hold_non_existant0 = injector.AddHold(non_existant_endpoint0.port);
735 auto hold_non_existant1 = injector.AddHold(non_existant_endpoint1.port);
736 auto hold_non_existant2 = injector.AddHold(non_existant_endpoint2.port);
740 std::vector<std::pair<std::string, std::string>>
metadata = {
741 {
"address_hash", CreateMetadataValueThatHashesToBackendPort(
742 non_existant_endpoint0.port)}};
743 rpc.StartRpc(
stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
747 hold_non_existant0->Wait();
755 auto hold_non_existant0_again = injector.AddHold(non_existant_endpoint0.port);
756 hold_non_existant0->Resume();
757 hold_non_existant1->Wait();
764 auto hold_non_existant1_again = injector.AddHold(non_existant_endpoint1.port);
765 hold_non_existant1->Resume();
766 hold_non_existant2->Wait();
773 auto hold_non_existant2_again = injector.AddHold(non_existant_endpoint2.port);
774 hold_non_existant2->Resume();
795 TEST_P(RingHashTest, TransientFailureCheckNextOne) {
796 CreateAndStartBackends(1);
797 auto cluster = default_cluster_;
798 cluster.set_lb_policy(Cluster::RING_HASH);
799 balancer_->ads_service()->SetCdsResource(
cluster);
800 auto new_route_config = default_route_config_;
801 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
802 auto* hash_policy =
route->mutable_route()->add_hash_policy();
803 hash_policy->mutable_header()->set_header_name(
"address_hash");
804 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
806 std::vector<EdsResourceArgs::Endpoint> endpoints;
808 endpoints.emplace_back(unused_port);
811 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
812 std::vector<std::pair<std::string, std::string>>
metadata = {
814 CreateMetadataValueThatHashesToBackendPort(unused_port)}};
817 WaitForBackendOptions(), rpc_options);
824 TEST_P(RingHashTest, SwitchToLowerPrioirtyAndThenBack) {
825 CreateAndStartBackends(2);
826 auto cluster = default_cluster_;
827 cluster.set_lb_policy(Cluster::RING_HASH);
828 balancer_->ads_service()->SetCdsResource(
cluster);
829 auto new_route_config = default_route_config_;
830 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
831 auto* hash_policy =
route->mutable_route()->add_hash_policy();
832 hash_policy->mutable_header()->set_header_name(
"address_hash");
833 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
835 EdsResourceArgs
args({
836 {
"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
838 {
"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
841 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
842 std::vector<std::pair<std::string, std::string>>
metadata = {
843 {
"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
846 WaitForBackendOptions(), rpc_options);
847 backends_[0]->StopListeningAndSendGoaways();
849 WaitForBackendOptions(), rpc_options);
853 WaitForBackendOptions(), rpc_options);
860 TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
861 CreateAndStartBackends(1);
862 const uint32_t kConnectionTimeoutMilliseconds = 5000;
863 auto cluster = default_cluster_;
864 cluster.set_lb_policy(Cluster::RING_HASH);
865 balancer_->ads_service()->SetCdsResource(
cluster);
866 auto new_route_config = default_route_config_;
867 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
868 auto* hash_policy =
route->mutable_route()->add_hash_policy();
869 hash_policy->mutable_header()->set_header_name(
"address_hash");
870 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
872 EdsResourceArgs
args(
873 {{
"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}});
874 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
875 std::vector<std::pair<std::string, std::string>>
metadata = {
876 {
"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
881 "ring hash cannot find a connected subchannel; first failure: "
882 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
883 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
893 TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
895 const uint32_t kConnectionTimeoutMilliseconds = 5000;
896 auto cluster = default_cluster_;
897 cluster.set_lb_policy(Cluster::RING_HASH);
898 balancer_->ads_service()->SetCdsResource(
cluster);
899 auto new_route_config = default_route_config_;
900 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
901 auto* hash_policy =
route->mutable_route()->add_hash_policy();
902 hash_policy->mutable_header()->set_header_name(
"address_hash");
903 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
906 EdsResourceArgs
args({
908 {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(),
909 MakeNonExistantEndpoint()}},
911 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
912 std::vector<std::pair<std::string, std::string>>
metadata = {
913 {
"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
914 const auto rpc_options = RpcOptions()
916 .set_timeout_ms(kConnectionTimeoutMilliseconds);
921 "ring hash cannot find a connected subchannel; first failure: "
922 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
923 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
938 WaitForBackendOptions(), rpc_options);
958 "ring hash cannot find a connected subchannel; first failure: "
959 "(UNKNOWN: Failed to connect to remote host: Connection refused|"
960 "UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
969 WaitForBackendOptions(), rpc_options);
977 TEST_P(RingHashTest, ReattemptWhenGoingFromTransientFailureToIdle) {
978 CreateAndStartBackends(1);
979 const uint32_t kConnectionTimeoutMilliseconds = 5000;
980 auto cluster = default_cluster_;
981 cluster.set_lb_policy(Cluster::RING_HASH);
982 balancer_->ads_service()->SetCdsResource(
cluster);
983 auto new_route_config = default_route_config_;
984 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
987 EdsResourceArgs
args(
988 {{
"locality0", std::vector<EdsResourceArgs::Endpoint>()}});
989 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
997 "empty address list: ",
998 RpcOptions().set_timeout_ms(kConnectionTimeoutMilliseconds));
1001 args = EdsResourceArgs({{
"locality0", CreateEndpointsForBackends()}});
1002 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
1006 .set_timeout_ms(kConnectionTimeoutMilliseconds)
1007 .set_wait_for_ready(
true));
1013 TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) {
1014 CreateAndStartBackends(2);
1015 auto cluster = default_cluster_;
1016 cluster.set_lb_policy(Cluster::RING_HASH);
1017 balancer_->ads_service()->SetCdsResource(
cluster);
1018 auto new_route_config = default_route_config_;
1019 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1020 auto* hash_policy_unsupported_1 =
route->mutable_route()->add_hash_policy();
1021 hash_policy_unsupported_1->mutable_cookie()->set_name(
"cookie");
1022 auto* hash_policy_unsupported_2 =
route->mutable_route()->add_hash_policy();
1023 hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
1025 auto* hash_policy_unsupported_3 =
route->mutable_route()->add_hash_policy();
1026 hash_policy_unsupported_3->mutable_query_parameter()->set_name(
1028 auto* hash_policy =
route->mutable_route()->add_hash_policy();
1029 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
1030 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1032 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
1033 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
1049 TEST_P(RingHashTest, InvalidHashFunction) {
1050 CreateAndStartBackends(1);
1051 auto cluster = default_cluster_;
1052 cluster.set_lb_policy(Cluster::RING_HASH);
1053 cluster.mutable_ring_hash_lb_config()->set_hash_function(
1054 Cluster::RingHashLbConfig::MURMUR_HASH_2);
1055 balancer_->ads_service()->SetCdsResource(
cluster);
1056 auto new_route_config = default_route_config_;
1057 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1058 auto* hash_policy =
route->mutable_route()->add_hash_policy();
1059 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
1060 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1062 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
1063 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
1065 ASSERT_TRUE(response_state.has_value()) <<
"timed out waiting for NACK";
1067 response_state->error_message,
1072 TEST_P(RingHashTest, InvalidMinimumRingSize) {
1073 CreateAndStartBackends(1);
1074 auto cluster = default_cluster_;
1075 cluster.set_lb_policy(Cluster::RING_HASH);
1076 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
1078 balancer_->ads_service()->SetCdsResource(
cluster);
1079 auto new_route_config = default_route_config_;
1080 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1081 auto* hash_policy =
route->mutable_route()->add_hash_policy();
1082 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
1083 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1085 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
1086 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
1088 ASSERT_TRUE(response_state.has_value()) <<
"timed out waiting for NACK";
1091 "min_ring_size is not in the range of 1 to 8388608."));
1095 TEST_P(RingHashTest, InvalidMaxmumRingSize) {
1096 CreateAndStartBackends(1);
1097 auto cluster = default_cluster_;
1098 cluster.set_lb_policy(Cluster::RING_HASH);
1099 cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
1101 balancer_->ads_service()->SetCdsResource(
cluster);
1102 auto new_route_config = default_route_config_;
1103 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1104 auto* hash_policy =
route->mutable_route()->add_hash_policy();
1105 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
1106 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1108 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
1109 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
1111 ASSERT_TRUE(response_state.has_value()) <<
"timed out waiting for NACK";
1114 "max_ring_size is not in the range of 1 to 8388608."));
1118 TEST_P(RingHashTest, InvalidRingSizeMinGreaterThanMax) {
1119 CreateAndStartBackends(1);
1120 auto cluster = default_cluster_;
1121 cluster.set_lb_policy(Cluster::RING_HASH);
1122 cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
1124 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
1126 balancer_->ads_service()->SetCdsResource(
cluster);
1127 auto new_route_config = default_route_config_;
1128 auto*
route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1129 auto* hash_policy =
route->mutable_route()->add_hash_policy();
1130 hash_policy->mutable_filter_state()->set_key(
"io.grpc.channel_id");
1131 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1133 EdsResourceArgs
args({{
"locality0", CreateEndpointsForBackends()}});
1134 balancer_->ads_service()->SetEdsResource(BuildEdsResource(
args));
1136 ASSERT_TRUE(response_state.has_value()) <<
"timed out waiting for NACK";
1139 "min_ring_size cannot be greater than max_ring_size."));
1152 #if TARGET_OS_IPHONE