xds_ring_hash_end2end_test.cc
Go to the documentation of this file.
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
#include <chrono>
#include <string>
#include <vector>
18 
19 #include <gmock/gmock.h>
20 #include <gtest/gtest.h>
21 
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24 
29 #include "src/core/lib/gpr/env.h"
30 #include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
31 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
34 
35 namespace grpc {
36 namespace testing {
37 namespace {
38 
39 using ::envoy::config::cluster::v3::CustomClusterType;
40 using ::envoy::config::endpoint::v3::HealthStatus;
41 using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;
42 
// Test fixture for the ring_hash LB policy xDS end-to-end tests.
// NOTE(review): this listing is missing several original lines of the class
// (embedded line numbers 46, 51-53, 59, 61 and 79-80). Presumably they hold
// the assignment target for the generator created in SetUp(), the remaining
// SetPointerWithVtable() arguments, the `addresses` declaration, the `lb_uri`
// parse, and a data member. Restore them from upstream before relying on
// this class.
43 class RingHashTest : public XdsEnd2endTest {
44  protected:
// Initializes the client, then rebuilds the stub with custom channel args
// (a pointer-with-vtable arg) and a failover timeout of 0 ms.
45  void SetUp() override {
47  grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
48  InitClient();
49  ChannelArguments args;
50  args.SetPointerWithVtable(
54  ResetStub(/*failover_timeout_ms=*/0, &args);
55  }
56 
// Builds one loopback ServerAddress per entry in `ports`. Uses "[::1]" when
// ipv6_only_ is set, otherwise "127.0.0.1". GPR_ASSERTs that the URI is valid
// and that it converts to a grpc_resolved_address.
57  grpc_core::ServerAddressList CreateAddressListFromPortList(
58  const std::vector<int>& ports) {
60  for (int port : ports) {
62  absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port));
63  GPR_ASSERT(lb_uri.ok());
64  grpc_resolved_address address;
65  GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
66  addresses.emplace_back(address.addr, address.len, nullptr);
67  }
68  return addresses;
69  }
70 
// Returns "<loopback-host>:<port>_0". The tests use this as a hash-header
// value equal to the string from which the ring entry for that address is
// created, so an RPC carrying it hashes to the backend on `port`.
71  std::string CreateMetadataValueThatHashesToBackendPort(int port) {
72  return absl::StrCat(ipv6_only_ ? "[::1]" : "127.0.0.1", ":", port, "_0");
73  }
74 
// Same as above, keyed by the backend's index in backends_.
75  std::string CreateMetadataValueThatHashesToBackend(int index) {
76  return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
77  }
78 
81 };
82 
// NOTE(review): original lines 84 and 87 are absent from this listing.
// Presumably they are the INSTANTIATE_TEST_SUITE_P( opener and the closing
// test-name-generator argument. Restore them from upstream.
83 // Run both with and without load reporting, just for test coverage.
85  XdsTest, RingHashTest,
86  ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
88 
89 TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
90  CreateAndStartBackends(2);
91  const char* kNewCluster1Name = "new_cluster_1";
92  const char* kNewEdsService1Name = "new_eds_service_name_1";
93  const char* kNewCluster2Name = "new_cluster_2";
94  const char* kNewEdsService2Name = "new_eds_service_name_2";
95  // Populate new EDS resources.
96  EdsResourceArgs args1({
97  {"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}},
98  });
99  EdsResourceArgs args2({
100  {"locality0", CreateEndpointsForBackends()},
101  });
102  balancer_->ads_service()->SetEdsResource(
103  BuildEdsResource(args1, kNewEdsService1Name));
104  balancer_->ads_service()->SetEdsResource(
105  BuildEdsResource(args2, kNewEdsService2Name));
106  // Populate new CDS resources.
107  Cluster new_cluster1 = default_cluster_;
108  new_cluster1.set_name(kNewCluster1Name);
109  new_cluster1.mutable_eds_cluster_config()->set_service_name(
110  kNewEdsService1Name);
111  balancer_->ads_service()->SetCdsResource(new_cluster1);
112  Cluster new_cluster2 = default_cluster_;
113  new_cluster2.set_name(kNewCluster2Name);
114  new_cluster2.mutable_eds_cluster_config()->set_service_name(
115  kNewEdsService2Name);
116  balancer_->ads_service()->SetCdsResource(new_cluster2);
117  // Create Aggregate Cluster
118  auto cluster = default_cluster_;
119  cluster.set_lb_policy(Cluster::RING_HASH);
120  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
121  custom_cluster->set_name("envoy.clusters.aggregate");
122  ClusterConfig cluster_config;
123  cluster_config.add_clusters(kNewCluster1Name);
124  cluster_config.add_clusters(kNewCluster2Name);
125  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
126  balancer_->ads_service()->SetCdsResource(cluster);
127  // Set up route with channel id hashing
128  auto new_route_config = default_route_config_;
129  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
130  auto* hash_policy = route->mutable_route()->add_hash_policy();
131  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
132  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
133  new_route_config);
134  // Verifying that we are using ring hash as only 1 endpoint is receiving all
135  // the traffic.
136  CheckRpcSendOk(DEBUG_LOCATION, 100);
137  bool found = false;
138  for (size_t i = 0; i < backends_.size(); ++i) {
139  if (backends_[i]->backend_service()->request_count() > 0) {
140  EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
141  << "backend " << i;
142  EXPECT_FALSE(found) << "backend " << i;
143  found = true;
144  }
145  }
147 }
148 
// Builds an aggregate cluster (ring hash LB policy) from two children:
// 1. an EDS cluster whose two localities (trailing 0/1 fields, presumably
//    priorities) contain only unreachable endpoints;
// 2. a LOGICAL_DNS cluster pointing at kServerName:443.
// The DNS result is set to the running backend's port, and then a single RPC
// must succeed, i.e. ring hash falls back to the LOGICAL_DNS child at startup.
// NOTE(review): original lines 202-203, 205 and 210 are missing from this
// listing. Presumably they declare `result` and hand it to the fake DNS
// resolver response generator, and supply the ConnectionDelayInjector delay.
// Restore them from upstream.
149 TEST_P(RingHashTest,
150  AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
151  CreateAndStartBackends(1);
152  const char* kEdsClusterName = "eds_cluster";
153  const char* kLogicalDNSClusterName = "logical_dns_cluster";
154  // Populate EDS resource.
155  EdsResourceArgs args({
156  {"locality0",
157  {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
158  kDefaultLocalityWeight,
159  0},
160  {"locality1",
161  {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
162  kDefaultLocalityWeight,
163  1},
164  });
165  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
166  // Populate new CDS resources.
167  Cluster eds_cluster = default_cluster_;
168  eds_cluster.set_name(kEdsClusterName);
169  balancer_->ads_service()->SetCdsResource(eds_cluster);
170  // Populate LOGICAL_DNS cluster.
171  auto logical_dns_cluster = default_cluster_;
172  logical_dns_cluster.set_name(kLogicalDNSClusterName);
173  logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
174  auto* address = logical_dns_cluster.mutable_load_assignment()
175  ->add_endpoints()
176  ->add_lb_endpoints()
177  ->mutable_endpoint()
178  ->mutable_address()
179  ->mutable_socket_address();
180  address->set_address(kServerName);
181  address->set_port_value(443);
182  balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
183  // Create Aggregate Cluster
184  auto cluster = default_cluster_;
185  cluster.set_lb_policy(Cluster::RING_HASH);
186  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
187  custom_cluster->set_name("envoy.clusters.aggregate");
188  ClusterConfig cluster_config;
189  cluster_config.add_clusters(kEdsClusterName);
190  cluster_config.add_clusters(kLogicalDNSClusterName);
191  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
192  balancer_->ads_service()->SetCdsResource(cluster);
193  // Set up route with channel id hashing
194  auto new_route_config = default_route_config_;
195  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
196  auto* hash_policy = route->mutable_route()->add_hash_policy();
197  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
198  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
199  new_route_config);
200  // Set Logical DNS result
201  {
  // The DNS result's addresses are the loopback addresses of the backends.
204  result.addresses = CreateAddressListFromPortList(GetBackendPorts());
206  std::move(result));
207  }
208  // Inject connection delay to make this act more realistically.
209  ConnectionDelayInjector delay_injector(
211  delay_injector.Start();
212  // Send RPC. Need the timeout to be long enough to account for the
213  // subchannel connection delays.
214  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
215 }
216 
// Tests that a ring hash policy that hashes on the channel ID sends all RPCs
// to one particular backend.
219 TEST_P(RingHashTest, ChannelIdHashing) {
220  CreateAndStartBackends(4);
221  auto cluster = default_cluster_;
222  cluster.set_lb_policy(Cluster::RING_HASH);
223  balancer_->ads_service()->SetCdsResource(cluster);
224  auto new_route_config = default_route_config_;
225  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
226  auto* hash_policy = route->mutable_route()->add_hash_policy();
227  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
228  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
229  new_route_config);
230  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
231  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
232  CheckRpcSendOk(DEBUG_LOCATION, 100);
233  bool found = false;
234  for (size_t i = 0; i < backends_.size(); ++i) {
235  if (backends_[i]->backend_service()->request_count() > 0) {
236  EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
237  << "backend " << i;
238  EXPECT_FALSE(found) << "backend " << i;
239  found = true;
240  }
241  }
243 }
244 
245 // Tests that ring hash policy that hashes using a header value can spread
246 // RPCs across all the backends.
247 TEST_P(RingHashTest, HeaderHashing) {
248  CreateAndStartBackends(4);
249  auto cluster = default_cluster_;
250  cluster.set_lb_policy(Cluster::RING_HASH);
251  balancer_->ads_service()->SetCdsResource(cluster);
252  auto new_route_config = default_route_config_;
253  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
254  auto* hash_policy = route->mutable_route()->add_hash_policy();
255  hash_policy->mutable_header()->set_header_name("address_hash");
256  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
257  new_route_config);
258  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
259  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
260  // Note each type of RPC will contains a header value that will always be
261  // hashed to a specific backend as the header value matches the value used
262  // to create the entry in the ring.
263  std::vector<std::pair<std::string, std::string>> metadata = {
264  {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
265  std::vector<std::pair<std::string, std::string>> metadata1 = {
266  {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
267  std::vector<std::pair<std::string, std::string>> metadata2 = {
268  {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
269  std::vector<std::pair<std::string, std::string>> metadata3 = {
270  {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
271  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
272  const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
273  const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
274  const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
275  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
276  WaitForBackendOptions(), rpc_options);
277  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
278  WaitForBackendOptions(), rpc_options1);
279  WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
280  WaitForBackendOptions(), rpc_options2);
281  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
282  WaitForBackendOptions(), rpc_options3);
283  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
284  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
285  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
286  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
287  for (size_t i = 0; i < backends_.size(); ++i) {
288  EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
289  }
290 }
291 
// Tests that a ring hash policy that hashes on a header value with a regex
// rewrite can collapse RPCs with different header values onto one backend.
294 TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) {
295  CreateAndStartBackends(4);
296  auto cluster = default_cluster_;
297  cluster.set_lb_policy(Cluster::RING_HASH);
298  balancer_->ads_service()->SetCdsResource(cluster);
299  auto new_route_config = default_route_config_;
300  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
301  auto* hash_policy = route->mutable_route()->add_hash_policy();
302  hash_policy->mutable_header()->set_header_name("address_hash");
303  hash_policy->mutable_header()
304  ->mutable_regex_rewrite()
305  ->mutable_pattern()
306  ->set_regex("[0-9]+");
307  hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
308  "foo");
309  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
310  new_route_config);
311  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
312  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
313  std::vector<std::pair<std::string, std::string>> metadata = {
314  {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
315  std::vector<std::pair<std::string, std::string>> metadata1 = {
316  {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
317  std::vector<std::pair<std::string, std::string>> metadata2 = {
318  {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
319  std::vector<std::pair<std::string, std::string>> metadata3 = {
320  {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
321  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
322  const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
323  const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
324  const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
325  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
326  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
327  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
328  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
329  bool found = false;
330  for (size_t i = 0; i < backends_.size(); ++i) {
331  if (backends_[i]->backend_service()->request_count() > 0) {
332  EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
333  << "backend " << i;
334  EXPECT_FALSE(found) << "backend " << i;
335  found = true;
336  }
337  }
339 }
340 
// Tests that, with no hash policy configured, ring hash picks backends using
// a random hash value, spreading RPCs evenly across the backends.
342 TEST_P(RingHashTest, NoHashPolicy) {
343  CreateAndStartBackends(2);
344  const double kDistribution50Percent = 0.5;
345  const double kErrorTolerance = 0.05;
346  const uint32_t kRpcTimeoutMs = 10000;
347  const size_t kNumRpcs =
348  ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
349  auto cluster = default_cluster_;
350  // Increasing min ring size for random distribution.
351  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
352  100000);
353  cluster.set_lb_policy(Cluster::RING_HASH);
354  balancer_->ads_service()->SetCdsResource(cluster);
355  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
356  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
357  // TODO(donnadionne): remove extended timeout after ring creation
358  // optimization.
359  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
360  WaitForBackendOptions(),
361  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
362  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
363  const int request_count_1 = backends_[0]->backend_service()->request_count();
364  const int request_count_2 = backends_[1]->backend_service()->request_count();
365  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
366  ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
367  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
368  ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
369 }
370 
371 // Tests that we observe endpoint weights.
372 TEST_P(RingHashTest, EndpointWeights) {
373  CreateAndStartBackends(3);
374  const double kDistribution50Percent = 0.5;
375  const double kDistribution25Percent = 0.25;
376  const double kErrorTolerance = 0.05;
377  const uint32_t kRpcTimeoutMs = 10000;
378  const size_t kNumRpcs =
379  ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
380  auto cluster = default_cluster_;
381  // Increasing min ring size for random distribution.
382  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
383  100000);
384  cluster.set_lb_policy(Cluster::RING_HASH);
385  balancer_->ads_service()->SetCdsResource(cluster);
386  // Endpoint 0 has weight 0, will be treated as weight 1.
387  // Endpoint 1 has weight 1.
388  // Endpoint 2 has weight 2.
389  EdsResourceArgs args(
390  {{"locality0",
391  {CreateEndpoint(0, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
392  0),
393  CreateEndpoint(1, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
394  1),
395  CreateEndpoint(2, ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
396  2)}}});
397  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
398  // TODO(donnadionne): remove extended timeout after ring creation
399  // optimization.
400  WaitForAllBackends(DEBUG_LOCATION, 0, 3, /*check_status=*/nullptr,
401  WaitForBackendOptions(),
402  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
403  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
404  // Endpoint 2 should see 50% of traffic, and endpoints 0 and 1 should
405  // each see 25% of traffic.
406  const int request_count_0 = backends_[0]->backend_service()->request_count();
407  const int request_count_1 = backends_[1]->backend_service()->request_count();
408  const int request_count_2 = backends_[2]->backend_service()->request_count();
409  EXPECT_THAT(static_cast<double>(request_count_0) / kNumRpcs,
410  ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
411  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
412  ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
413  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
414  ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
415 }
416 
417 // Test that ring hash policy evaluation will continue past the terminal
418 // policy if no results are produced yet.
419 TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {
420  CreateAndStartBackends(2);
421  auto cluster = default_cluster_;
422  cluster.set_lb_policy(Cluster::RING_HASH);
423  balancer_->ads_service()->SetCdsResource(cluster);
424  auto new_route_config = default_route_config_;
425  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
426  auto* hash_policy = route->mutable_route()->add_hash_policy();
427  hash_policy->mutable_header()->set_header_name("header_not_present");
428  hash_policy->set_terminal(true);
429  auto* hash_policy2 = route->mutable_route()->add_hash_policy();
430  hash_policy2->mutable_header()->set_header_name("address_hash");
431  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
432  new_route_config);
433  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
434  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
435  std::vector<std::pair<std::string, std::string>> metadata = {
436  {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
437  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
438  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
439  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
440  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
441 }
442 
443 // Test random hash is used when header hashing specified a header field that
444 // the RPC did not have.
445 TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) {
446  CreateAndStartBackends(2);
447  const double kDistribution50Percent = 0.5;
448  const double kErrorTolerance = 0.05;
449  const uint32_t kRpcTimeoutMs = 10000;
450  const size_t kNumRpcs =
451  ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
452  auto cluster = default_cluster_;
453  // Increasing min ring size for random distribution.
454  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
455  100000);
456  cluster.set_lb_policy(Cluster::RING_HASH);
457  balancer_->ads_service()->SetCdsResource(cluster);
458  auto new_route_config = default_route_config_;
459  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
460  auto* hash_policy = route->mutable_route()->add_hash_policy();
461  hash_policy->mutable_header()->set_header_name("header_not_present");
462  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
463  new_route_config);
464  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
465  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
466  std::vector<std::pair<std::string, std::string>> metadata = {
467  {"unmatched_header", absl::StrFormat("%" PRIu32, rand())},
468  };
469  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
470  // TODO(donnadionne): remove extended timeout after ring creation
471  // optimization.
472  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
473  WaitForBackendOptions(),
474  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
475  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs, rpc_options);
476  const int request_count_1 = backends_[0]->backend_service()->request_count();
477  const int request_count_2 = backends_[1]->backend_service()->request_count();
478  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
479  ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
480  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
481  ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
482 }
483 
484 // Test random hash is used when only unsupported hash policies are
485 // configured.
486 TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) {
487  CreateAndStartBackends(2);
488  const double kDistribution50Percent = 0.5;
489  const double kErrorTolerance = 0.05;
490  const uint32_t kRpcTimeoutMs = 10000;
491  const size_t kNumRpcs =
492  ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
493  auto cluster = default_cluster_;
494  // Increasing min ring size for random distribution.
495  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
496  100000);
497  cluster.set_lb_policy(Cluster::RING_HASH);
498  balancer_->ads_service()->SetCdsResource(cluster);
499  auto new_route_config = default_route_config_;
500  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
501  auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
502  hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
503  auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
504  hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
505  true);
506  auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
507  hash_policy_unsupported_3->mutable_query_parameter()->set_name(
508  "query_parameter");
509  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
510  new_route_config);
511  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
512  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
513  // TODO(donnadionne): remove extended timeout after ring creation
514  // optimization.
515  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
516  WaitForBackendOptions(),
517  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
518  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
519  const int request_count_1 = backends_[0]->backend_service()->request_count();
520  const int request_count_2 = backends_[1]->backend_service()->request_count();
521  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
522  ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
523  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
524  ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
525 }
526 
// Tests that a ring hash policy that hashes using a random value spreads
// RPCs across all the backends according to endpoint weight.
529 TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) {
530  CreateAndStartBackends(2);
531  const size_t kWeight1 = 1;
532  const size_t kWeight2 = 2;
533  const size_t kWeightTotal = kWeight1 + kWeight2;
534  const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
535  const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
536  const double kErrorTolerance = 0.05;
537  const uint32_t kRpcTimeoutMs = 10000;
538  const size_t kNumRpcs =
539  ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
540  auto cluster = default_cluster_;
541  // Increasing min ring size for random distribution.
542  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
543  100000);
544  cluster.set_lb_policy(Cluster::RING_HASH);
545  balancer_->ads_service()->SetCdsResource(cluster);
546  EdsResourceArgs args({{"locality0",
547  {CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
548  CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
549  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
550  // TODO(donnadionne): remove extended timeout after ring creation
551  // optimization.
552  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
553  WaitForBackendOptions(),
554  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
555  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
556  const int weight_33_request_count =
557  backends_[0]->backend_service()->request_count();
558  const int weight_66_request_count =
559  backends_[1]->backend_service()->request_count();
560  EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
561  ::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
562  EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
563  ::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
564 }
565 
// Tests that a ring hash policy that hashes using a random value spreads
// RPCs across all the backends according to both locality and endpoint
// weights.
568 TEST_P(RingHashTest,
569  RandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
570  CreateAndStartBackends(2);
571  const size_t kWeight1 = 1 * 1;
572  const size_t kWeight2 = 2 * 2;
573  const size_t kWeightTotal = kWeight1 + kWeight2;
574  const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
575  const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
576  const double kErrorTolerance = 0.05;
577  const uint32_t kRpcTimeoutMs = 10000;
578  const size_t kNumRpcs =
579  ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
580  auto cluster = default_cluster_;
581  // Increasing min ring size for random distribution.
582  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
583  100000);
584  cluster.set_lb_policy(Cluster::RING_HASH);
585  balancer_->ads_service()->SetCdsResource(cluster);
586  EdsResourceArgs args(
587  {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
588  {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
589  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
590  // TODO(donnadionne): remove extended timeout after ring creation
591  // optimization.
592  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
593  WaitForBackendOptions(),
594  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
595  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
596  const int weight_20_request_count =
597  backends_[0]->backend_service()->request_count();
598  const int weight_80_request_count =
599  backends_[1]->backend_service()->request_count();
600  EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
601  ::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
602  EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
603  ::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
604 }
605 
// Tests that a ring hash policy that hashes on a fixed string sends all RPCs
// to one particular backend, and that subsequent hash policies are ignored
// because the first policy is terminal.
609 TEST_P(RingHashTest, FixedHashingTerminalPolicy) {
610  CreateAndStartBackends(2);
611  auto cluster = default_cluster_;
612  cluster.set_lb_policy(Cluster::RING_HASH);
613  balancer_->ads_service()->SetCdsResource(cluster);
614  auto new_route_config = default_route_config_;
615  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
616  auto* hash_policy = route->mutable_route()->add_hash_policy();
617  hash_policy->mutable_header()->set_header_name("fixed_string");
618  hash_policy->set_terminal(true);
619  auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
620  hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
621  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
622  new_route_config);
623  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
624  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
625  std::vector<std::pair<std::string, std::string>> metadata = {
626  {"fixed_string", "fixed_value"},
627  {"random_string", absl::StrFormat("%" PRIu32, rand())},
628  };
629  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
630  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
631  bool found = false;
632  for (size_t i = 0; i < backends_.size(); ++i) {
633  if (backends_[i]->backend_service()->request_count() > 0) {
634  EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
635  << "backend " << i;
636  EXPECT_FALSE(found) << "backend " << i;
637  found = true;
638  }
639  }
641 }
642 
// Test that the channel will go from idle to ready via connecting
// (though it is not possible to catch the connecting state before it moves
// to ready).
646 TEST_P(RingHashTest, IdleToReady) {
647  CreateAndStartBackends(1);
648  auto cluster = default_cluster_;
649  cluster.set_lb_policy(Cluster::RING_HASH);
650  balancer_->ads_service()->SetCdsResource(cluster);
651  auto new_route_config = default_route_config_;
652  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
653  auto* hash_policy = route->mutable_route()->add_hash_policy();
654  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
655  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
656  new_route_config);
657  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
658  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
659  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
660  CheckRpcSendOk(DEBUG_LOCATION);
661  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
662 }
663 
664 // Test that the channel will transition to READY once it starts
665 // connecting even if there are no RPCs being sent to the picker.
666 TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
667  // Create EDS resource.
668  CreateAndStartBackends(1);
669  auto non_existant_endpoint = MakeNonExistantEndpoint();
670  EdsResourceArgs args(
671  {{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}});
672  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
673  // Change CDS resource to use RING_HASH.
674  auto cluster = default_cluster_;
675  cluster.set_lb_policy(Cluster::RING_HASH);
676  balancer_->ads_service()->SetCdsResource(cluster);
677  // Add hash policy to RDS resource.
678  auto new_route_config = default_route_config_;
679  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
680  auto* hash_policy = route->mutable_route()->add_hash_policy();
681  hash_policy->mutable_header()->set_header_name("address_hash");
682  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
683  new_route_config);
684  // Start connection attempt injector and add a hold for the P0
685  // connection attempt.
686  ConnectionHoldInjector injector;
687  injector.Start();
688  auto hold = injector.AddHold(non_existant_endpoint.port);
689  // A long-running RPC, just used to send the RPC in another thread.
690  LongRunningRpc rpc;
691  std::vector<std::pair<std::string, std::string>> metadata = {
692  {"address_hash",
693  CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}};
694  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
695  std::move(metadata)));
696  // Wait for the RPC to trigger the P0 connection attempt, then cancel it,
697  // and then allow the connection attempt to complete.
698  hold->Wait();
699  rpc.CancelRpc();
700  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
701  hold->Resume();
702  // Wait for channel to become connected without any pending RPC.
704  // Make sure the backend did not get any requests.
705  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
706 }
707 
708 // Tests that when we trigger internal connection attempts without
709 // picks, we do so for only one subchannel at a time.
710 TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
711  // Create EDS resource.
712  CreateAndStartBackends(1);
713  auto non_existant_endpoint0 = MakeNonExistantEndpoint();
714  auto non_existant_endpoint1 = MakeNonExistantEndpoint();
715  auto non_existant_endpoint2 = MakeNonExistantEndpoint();
716  EdsResourceArgs args({{"locality0",
717  {non_existant_endpoint0, non_existant_endpoint1,
718  non_existant_endpoint2, CreateEndpoint(0)}}});
719  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
720  // Change CDS resource to use RING_HASH.
721  auto cluster = default_cluster_;
722  cluster.set_lb_policy(Cluster::RING_HASH);
723  balancer_->ads_service()->SetCdsResource(cluster);
724  // Add hash policy to RDS resource.
725  auto new_route_config = default_route_config_;
726  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
727  auto* hash_policy = route->mutable_route()->add_hash_policy();
728  hash_policy->mutable_header()->set_header_name("address_hash");
729  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
730  new_route_config);
731  // Start connection attempt injector.
732  ConnectionHoldInjector injector;
733  injector.Start();
734  auto hold_non_existant0 = injector.AddHold(non_existant_endpoint0.port);
735  auto hold_non_existant1 = injector.AddHold(non_existant_endpoint1.port);
736  auto hold_non_existant2 = injector.AddHold(non_existant_endpoint2.port);
737  auto hold_good = injector.AddHold(backends_[0]->port());
738  // A long-running RPC, just used to send the RPC in another thread.
739  LongRunningRpc rpc;
740  std::vector<std::pair<std::string, std::string>> metadata = {
741  {"address_hash", CreateMetadataValueThatHashesToBackendPort(
742  non_existant_endpoint0.port)}};
743  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
744  std::move(metadata)));
745  // Wait for the RPC to trigger a connection attempt to the first address,
746  // then cancel the RPC. No other connection attempts should be started yet.
747  hold_non_existant0->Wait();
748  rpc.CancelRpc();
749  EXPECT_FALSE(hold_non_existant1->IsStarted());
750  EXPECT_FALSE(hold_non_existant2->IsStarted());
751  EXPECT_FALSE(hold_good->IsStarted());
752  // Allow the connection attempt to the first address to resume and wait
753  // for the attempt for the second address. No other connection
754  // attempts should be started yet.
755  auto hold_non_existant0_again = injector.AddHold(non_existant_endpoint0.port);
756  hold_non_existant0->Resume();
757  hold_non_existant1->Wait();
758  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
759  EXPECT_FALSE(hold_non_existant2->IsStarted());
760  EXPECT_FALSE(hold_good->IsStarted());
761  // Allow the connection attempt to the second address to resume and wait
762  // for the attempt for the third address. No other connection
763  // attempts should be started yet.
764  auto hold_non_existant1_again = injector.AddHold(non_existant_endpoint1.port);
765  hold_non_existant1->Resume();
766  hold_non_existant2->Wait();
767  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
768  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
769  EXPECT_FALSE(hold_good->IsStarted());
770  // Allow the connection attempt to the third address to resume and wait
771  // for the attempt for the final address. No other connection
772  // attempts should be started yet.
773  auto hold_non_existant2_again = injector.AddHold(non_existant_endpoint2.port);
774  hold_non_existant2->Resume();
775  hold_good->Wait();
776  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
777  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
778  EXPECT_FALSE(hold_non_existant2_again->IsStarted());
779  // Allow the final attempt to resume.
780  hold_good->Resume();
781  // Wait for channel to become connected without any pending RPC.
782  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10)));
783  // No other connection attempts should have been started.
784  EXPECT_FALSE(hold_non_existant0_again->IsStarted());
785  EXPECT_FALSE(hold_non_existant1_again->IsStarted());
786  EXPECT_FALSE(hold_non_existant2_again->IsStarted());
787  // RPC should have been cancelled.
788  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
789  // Make sure the backend did not get any requests.
790  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
791 }
792 
793 // Test that when the first pick is down leading to a transient failure, we
794 // will move on to the next ring hash entry.
795 TEST_P(RingHashTest, TransientFailureCheckNextOne) {
796  CreateAndStartBackends(1);
797  auto cluster = default_cluster_;
798  cluster.set_lb_policy(Cluster::RING_HASH);
799  balancer_->ads_service()->SetCdsResource(cluster);
800  auto new_route_config = default_route_config_;
801  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
802  auto* hash_policy = route->mutable_route()->add_hash_policy();
803  hash_policy->mutable_header()->set_header_name("address_hash");
804  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
805  new_route_config);
806  std::vector<EdsResourceArgs::Endpoint> endpoints;
807  const int unused_port = grpc_pick_unused_port_or_die();
808  endpoints.emplace_back(unused_port);
809  endpoints.emplace_back(backends_[0]->port());
810  EdsResourceArgs args({{"locality0", std::move(endpoints)}});
811  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
812  std::vector<std::pair<std::string, std::string>> metadata = {
813  {"address_hash",
814  CreateMetadataValueThatHashesToBackendPort(unused_port)}};
815  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
816  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
817  WaitForBackendOptions(), rpc_options);
818  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
819 }
820 
821 // Test that when a backend goes down, we will move on to the next subchannel
822 // (with a lower priority). When the backend comes back up, traffic will move
823 // back.
824 TEST_P(RingHashTest, SwitchToLowerPrioirtyAndThenBack) {
825  CreateAndStartBackends(2);
826  auto cluster = default_cluster_;
827  cluster.set_lb_policy(Cluster::RING_HASH);
828  balancer_->ads_service()->SetCdsResource(cluster);
829  auto new_route_config = default_route_config_;
830  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
831  auto* hash_policy = route->mutable_route()->add_hash_policy();
832  hash_policy->mutable_header()->set_header_name("address_hash");
833  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
834  new_route_config);
835  EdsResourceArgs args({
836  {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
837  0},
838  {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
839  1},
840  });
841  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
842  std::vector<std::pair<std::string, std::string>> metadata = {
843  {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
844  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
845  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
846  WaitForBackendOptions(), rpc_options);
847  backends_[0]->StopListeningAndSendGoaways();
848  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
849  WaitForBackendOptions(), rpc_options);
850  ShutdownBackend(0);
851  StartBackend(0);
852  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
853  WaitForBackendOptions(), rpc_options);
854  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
855  EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
856  EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
857 }
858 
859 // Test that when all backends are down, we will keep reattempting.
860 TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
861  CreateAndStartBackends(1);
862  const uint32_t kConnectionTimeoutMilliseconds = 5000;
863  auto cluster = default_cluster_;
864  cluster.set_lb_policy(Cluster::RING_HASH);
865  balancer_->ads_service()->SetCdsResource(cluster);
866  auto new_route_config = default_route_config_;
867  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
868  auto* hash_policy = route->mutable_route()->add_hash_policy();
869  hash_policy->mutable_header()->set_header_name("address_hash");
870  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
871  new_route_config);
872  EdsResourceArgs args(
873  {{"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}});
874  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
875  std::vector<std::pair<std::string, std::string>> metadata = {
876  {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
877  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
878  ShutdownBackend(0);
879  CheckRpcSendFailure(
881  "ring hash cannot find a connected subchannel; first failure: "
882  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
883  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
884  RpcOptions().set_metadata(std::move(metadata)));
885  StartBackend(0);
886  // Ensure we are actively connecting without any traffic.
887  EXPECT_TRUE(channel_->WaitForConnected(
888  grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
889 }
890 
891 // Test that when all backends are down and then up, we may pick a TF backend
892 // and we will then jump to ready backend.
893 TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
894  CreateBackends(2);
895  const uint32_t kConnectionTimeoutMilliseconds = 5000;
896  auto cluster = default_cluster_;
897  cluster.set_lb_policy(Cluster::RING_HASH);
898  balancer_->ads_service()->SetCdsResource(cluster);
899  auto new_route_config = default_route_config_;
900  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
901  auto* hash_policy = route->mutable_route()->add_hash_policy();
902  hash_policy->mutable_header()->set_header_name("address_hash");
903  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
904  new_route_config);
905  // Make sure we include some unused ports to fill the ring.
906  EdsResourceArgs args({
907  {"locality0",
908  {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(),
909  MakeNonExistantEndpoint()}},
910  });
911  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
912  std::vector<std::pair<std::string, std::string>> metadata = {
913  {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
914  const auto rpc_options = RpcOptions()
915  .set_metadata(std::move(metadata))
916  .set_timeout_ms(kConnectionTimeoutMilliseconds);
917  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
918  gpr_log(GPR_INFO, "=== SENDING FIRST RPC ===");
919  CheckRpcSendFailure(
921  "ring hash cannot find a connected subchannel; first failure: "
922  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
923  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
924  rpc_options);
925  gpr_log(GPR_INFO, "=== DONE WITH FIRST RPC ===");
927  // Bring up backend 0. The channel should become connected without
928  // any picks, because in TF, we are always trying to connect to at
929  // least one backend at all times.
930  gpr_log(GPR_INFO, "=== STARTING BACKEND 0 ===");
931  StartBackend(0);
932  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
933  EXPECT_TRUE(channel_->WaitForConnected(
934  grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
935  // RPCs should go to backend 0.
936  gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 0 ===");
937  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
938  WaitForBackendOptions(), rpc_options);
939  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
940  // Bring down backend 0 and bring up backend 1.
941  // Note the RPC contains a header value that will always be hashed to
942  // backend 0. So by purposely bringing down backend 0 and bringing up another
943  // backend, this will ensure Picker's first choice of backend 0 will fail
944  // and it will go through the remaining subchannels to find one in READY.
945  // Since the the entries in the ring are pretty distributed and we have
946  // unused ports to fill the ring, it is almost guaranteed that the Picker
947  // will go through some non-READY entries and skip them as per design.
948  gpr_log(GPR_INFO, "=== SHUTTING DOWN BACKEND 0 ===");
949  ShutdownBackend(0);
950  gpr_log(GPR_INFO, "=== WAITING FOR STATE CHANGE ===");
951  EXPECT_TRUE(channel_->WaitForStateChange(
953  grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
955  gpr_log(GPR_INFO, "=== SENDING SECOND RPC ===");
956  CheckRpcSendFailure(
958  "ring hash cannot find a connected subchannel; first failure: "
959  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
960  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
961  rpc_options);
962  gpr_log(GPR_INFO, "=== STARTING BACKEND 1 ===");
963  StartBackend(1);
964  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
965  EXPECT_TRUE(channel_->WaitForConnected(
966  grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
967  gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 1 ===");
968  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
969  WaitForBackendOptions(), rpc_options);
970  gpr_log(GPR_INFO, "=== DONE ===");
971 }
972 
973 // This tests a bug seen in the wild where ring_hash started with no
974 // endpoints and reported TRANSIENT_FAILURE, then got an update with
975 // endpoints and reported IDLE, but the picker update was squelched, so
976 // it failed to ever get reconnected.
977 TEST_P(RingHashTest, ReattemptWhenGoingFromTransientFailureToIdle) {
978  CreateAndStartBackends(1);
979  const uint32_t kConnectionTimeoutMilliseconds = 5000;
980  auto cluster = default_cluster_;
981  cluster.set_lb_policy(Cluster::RING_HASH);
982  balancer_->ads_service()->SetCdsResource(cluster);
983  auto new_route_config = default_route_config_;
984  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
985  new_route_config);
986  // Send empty EDS update.
987  EdsResourceArgs args(
988  {{"locality0", std::vector<EdsResourceArgs::Endpoint>()}});
989  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
990  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
991  // Channel should fail RPCs and go into TRANSIENT_FAILURE.
992  CheckRpcSendFailure(
994  // TODO(roth): As part of https://github.com/grpc/grpc/issues/22883,
995  // figure out how to get a useful resolution note plumbed down to
996  // improve this message.
997  "empty address list: ",
998  RpcOptions().set_timeout_ms(kConnectionTimeoutMilliseconds));
1000  // Send EDS update with 1 backend.
1001  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
1002  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1003  // A wait_for_ready RPC should succeed, and the channel should report READY.
1004  CheckRpcSendOk(DEBUG_LOCATION, 1,
1005  RpcOptions()
1006  .set_timeout_ms(kConnectionTimeoutMilliseconds)
1007  .set_wait_for_ready(true));
1008  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
1009 }
1010 
1011 // Test unspported hash policy types are all ignored before a supported
1012 // policy.
1013 TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) {
1014  CreateAndStartBackends(2);
1015  auto cluster = default_cluster_;
1016  cluster.set_lb_policy(Cluster::RING_HASH);
1017  balancer_->ads_service()->SetCdsResource(cluster);
1018  auto new_route_config = default_route_config_;
1019  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1020  auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
1021  hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
1022  auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
1023  hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
1024  true);
1025  auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
1026  hash_policy_unsupported_3->mutable_query_parameter()->set_name(
1027  "query_parameter");
1028  auto* hash_policy = route->mutable_route()->add_hash_policy();
1029  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
1030  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1031  new_route_config);
1032  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1033  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1034  CheckRpcSendOk(DEBUG_LOCATION, 100);
1035  bool found = false;
1036  for (size_t i = 0; i < backends_.size(); ++i) {
1037  if (backends_[i]->backend_service()->request_count() > 0) {
1038  EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
1039  << "backend " << i;
1040  EXPECT_FALSE(found) << "backend " << i;
1041  found = true;
1042  }
1043  }
1044  EXPECT_TRUE(found);
1045 }
1046 
1047 // Test we nack when ring hash policy has invalid hash function (something
1048 // other than XX_HASH.
1049 TEST_P(RingHashTest, InvalidHashFunction) {
1050  CreateAndStartBackends(1);
1051  auto cluster = default_cluster_;
1052  cluster.set_lb_policy(Cluster::RING_HASH);
1053  cluster.mutable_ring_hash_lb_config()->set_hash_function(
1054  Cluster::RingHashLbConfig::MURMUR_HASH_2);
1055  balancer_->ads_service()->SetCdsResource(cluster);
1056  auto new_route_config = default_route_config_;
1057  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1058  auto* hash_policy = route->mutable_route()->add_hash_policy();
1059  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
1060  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1061  new_route_config);
1062  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1063  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1064  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
1065  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
1066  EXPECT_THAT(
1067  response_state->error_message,
1068  ::testing::HasSubstr("ring hash lb config has invalid hash function."));
1069 }
1070 
1071 // Test we nack when ring hash policy has invalid ring size.
1072 TEST_P(RingHashTest, InvalidMinimumRingSize) {
1073  CreateAndStartBackends(1);
1074  auto cluster = default_cluster_;
1075  cluster.set_lb_policy(Cluster::RING_HASH);
1076  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
1077  0);
1078  balancer_->ads_service()->SetCdsResource(cluster);
1079  auto new_route_config = default_route_config_;
1080  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1081  auto* hash_policy = route->mutable_route()->add_hash_policy();
1082  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
1083  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1084  new_route_config);
1085  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1086  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1087  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
1088  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
1089  EXPECT_THAT(response_state->error_message,
1091  "min_ring_size is not in the range of 1 to 8388608."));
1092 }
1093 
1094 // Test we nack when ring hash policy has invalid ring size.
1095 TEST_P(RingHashTest, InvalidMaxmumRingSize) {
1096  CreateAndStartBackends(1);
1097  auto cluster = default_cluster_;
1098  cluster.set_lb_policy(Cluster::RING_HASH);
1099  cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
1100  8388609);
1101  balancer_->ads_service()->SetCdsResource(cluster);
1102  auto new_route_config = default_route_config_;
1103  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1104  auto* hash_policy = route->mutable_route()->add_hash_policy();
1105  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
1106  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1107  new_route_config);
1108  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1109  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1110  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
1111  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
1112  EXPECT_THAT(response_state->error_message,
1114  "max_ring_size is not in the range of 1 to 8388608."));
1115 }
1116 
1117 // Test we nack when ring hash policy has invalid ring size.
1118 TEST_P(RingHashTest, InvalidRingSizeMinGreaterThanMax) {
1119  CreateAndStartBackends(1);
1120  auto cluster = default_cluster_;
1121  cluster.set_lb_policy(Cluster::RING_HASH);
1122  cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
1123  5000);
1124  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
1125  5001);
1126  balancer_->ads_service()->SetCdsResource(cluster);
1127  auto new_route_config = default_route_config_;
1128  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1129  auto* hash_policy = route->mutable_route()->add_hash_policy();
1130  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
1131  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1132  new_route_config);
1133  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1134  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1135  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
1136  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
1137  EXPECT_THAT(response_state->error_message,
1139  "min_ring_size cannot be greater than max_ring_size."));
1140 }
1141 
1142 } // namespace
1143 } // namespace testing
1144 } // namespace grpc
1145 
1146 int main(int argc, char** argv) {
1147  grpc::testing::TestEnvironment env(&argc, argv);
1148  ::testing::InitGoogleTest(&argc, argv);
1149  // Make the backup poller poll very frequently in order to pick up
1150  // updates from all the subchannels's FDs.
1151  GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
1152 #if TARGET_OS_IPHONE
1153  // Workaround Apple CFStream bug
1154  gpr_setenv("grpc_cfstream", "0");
1155 #endif
1156  grpc_init();
1158  const auto result = RUN_ALL_TESTS();
1159  grpc_shutdown();
1160  return result;
1161 }
grpc::EXPECT_THAT
EXPECT_THAT(status.error_message(), ::testing::HasSubstr("subject_token_type"))
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
connection_delay_injector.h
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
ipv6_only_
bool ipv6_only_
Definition: client_lb_end2end_test.cc:217
stub_
std::unique_ptr< grpc::testing::EchoTestService::Stub > stub_
Definition: client_channel_stress_test.cc:331
sockaddr_utils.h
generate.env
env
Definition: generate.py:37
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
metadata
Definition: cq_verifier.cc:48
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
grpc
Definition: grpcpp/alarm.h:33
route
XdsRouteConfigResource::Route route
Definition: xds_resolver.cc:337
grpc_core::RefCountedPtr::get
T * get() const
Definition: ref_counted_ptr.h:146
testing::DoubleNear
internal::FloatingEqMatcher< double > DoubleNear(double rhs, double max_abs_error)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8647
grpc_core::FakeResolverResponseGenerator::SetResponse
void SetResponse(Resolver::Result result)
Definition: fake_resolver.cc:229
backends_
std::vector< std::unique_ptr< BackendServiceImpl > > backends_
Definition: client_channel_stress_test.cc:333
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR
#define GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR
Definition: filters/client_channel/lb_policy/xds/xds_channel_args.h:26
grpc::testing::ConnectionAttemptInjector::Init
static void Init()
Definition: connection_delay_injector.cc:70
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
fake_resolver.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc_resolved_address
Definition: resolved_address.h:34
env.h
grpc_core::URI::Parse
static absl::StatusOr< URI > Parse(absl::string_view uri_text)
Definition: uri_parser.cc:209
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc_parse_uri
bool grpc_parse_uri(const grpc_core::URI &uri, grpc_resolved_address *resolved_addr)
Definition: parse_address.cc:293
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable
static const grpc_arg_pointer_vtable kChannelArgPointerVtable
Definition: fake_resolver.h:49
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc_core::RefCountedPtr< grpc_core::FakeResolverResponseGenerator >
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
gen_stats_data.found
bool found
Definition: gen_stats_data.py:61
cluster
absl::string_view cluster
Definition: xds_resolver.cc:331
grpc_timeout_milliseconds_to_deadline
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
Definition: test/core/util/test_config.cc:89
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_core::Resolver::Result
Results returned by the resolver.
Definition: resolver/resolver.h:56
grpc_test_slowdown_factor
int64_t grpc_test_slowdown_factor()
Definition: test/core/util/test_config.cc:76
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
channel_
RefCountedPtr< Channel > channel_
Definition: channel_connectivity.cc:209
xds_end2end_test_lib.h
xds_channel_args.h
grpc::testing::INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_SUITE_P(HistogramTestCases, HistogramTest, ::testing::Range< int >(0, GRPC_STATS_HISTOGRAM_COUNT))
backup_poller.h
grpc_resolved_address::len
socklen_t len
Definition: resolved_address.h:36
logical_dns_cluster_resolver_response_generator_
grpc_core::RefCountedPtr< grpc_core::FakeResolverResponseGenerator > logical_dns_cluster_resolver_response_generator_
Definition: xds_ring_hash_end2end_test.cc:80
kNumRpcs
const int kNumRpcs
Definition: thread_stress_test.cc:50
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
grpc_core::ServerAddressList
std::vector< ServerAddress > ServerAddressList
Definition: server_address.h:120
grpc_core::ExecCtx
Definition: exec_ctx.h:97
GPR_GLOBAL_CONFIG_SET
#define GPR_GLOBAL_CONFIG_SET(name, value)
Definition: global_config_generic.h:26
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
main
int main(int argc, char **argv)
Definition: xds_ring_hash_end2end_test.cc:1146
grpc_core::Duration::Milliseconds
static constexpr Duration Milliseconds(int64_t millis)
Definition: src/core/lib/gprpp/time.h:155
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
testing::Values
internal::ValueArray< T... > Values(T... v)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest-param-test.h:335
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
index
int index
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:1184
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
ASSERT_TRUE
#define ASSERT_TRUE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1973
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
grpc.StatusCode.UNAVAILABLE
tuple UNAVAILABLE
Definition: src/python/grpcio/grpc/__init__.py:278
grpc.StatusCode.CANCELLED
tuple CANCELLED
Definition: src/python/grpcio/grpc/__init__.py:261
grpc::testing::TEST_P
TEST_P(HistogramTest, IncHistogram)
Definition: stats_test.cc:87
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
run_xds_tests.backend_service
def backend_service
Definition: run_xds_tests.py:3255
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
testing::HasSubstr
PolymorphicMatcher< internal::HasSubstrMatcher< internal::string > > HasSubstr(const internal::string &substring)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8803
grpc_resolved_address::addr
char addr[GRPC_MAX_SOCKADDR_SIZE]
Definition: resolved_address.h:35
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc::testing::XdsTestType::Name
static std::string Name(const ::testing::TestParamInfo< XdsTestType > &info)
Definition: xds_end2end_test_lib.h:143
gpr_setenv
void gpr_setenv(const char *name, const char *value)


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:59