xds_cluster_end2end_test.cc
Go to the documentation of this file.
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <numeric>
17 #include <string>
18 #include <vector>
19 
20 #include <gmock/gmock.h>
21 #include <gtest/gtest.h>
22 
23 #include "absl/strings/match.h"
24 #include "absl/strings/str_cat.h"
25 
28 
29 namespace grpc {
30 namespace testing {
31 namespace {
32 
// Short names for the Envoy xDS v3 proto types referenced by the tests in
// this file.
33 using ::envoy::config::cluster::v3::CircuitBreakers;
34 using ::envoy::config::cluster::v3::RoutingPriority;
35 using ::envoy::config::endpoint::v3::HealthStatus;
36 using ::envoy::type::v3::FractionalPercent;
37 
// Alias for the client-stats type nested in LrsServiceImpl.
38 using ClientStats = LrsServiceImpl::ClientStats;
39 
// String constants. None of them is referenced in the portion of the file
// reviewed here.
// NOTE(review): presumably the first two are EDS drop-category names and the
// third is the prefix of the status message produced for an EDS-configured
// drop -- confirm against the tests that use them.
40 constexpr char kLbDropType[] = "lb";
41 constexpr char kThrottleDropType[] = "throttle";
42 constexpr char kStatusMessageDropPrefix[] = "EDS-configured drop: ";
43 
44 //
45 // CDS tests
46 //
47 
48 using CdsTest = XdsEnd2endTest;
49 
50 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, ::testing::Values(XdsTestType()),
52 
53 // Tests that CDS client should send an ACK upon correct CDS response.
54 TEST_P(CdsTest, Vanilla) {
55  (void)SendRpc();
56  auto response_state = balancer_->ads_service()->cds_response_state();
57  ASSERT_TRUE(response_state.has_value());
58  EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
59 }
60 
61 // Tests that CDS client should send a NACK if the cluster type in CDS
62 // response is unsupported.
63 TEST_P(CdsTest, UnsupportedClusterType) {
64  auto cluster = default_cluster_;
65  cluster.set_type(Cluster::STATIC);
66  balancer_->ads_service()->SetCdsResource(cluster);
67  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
68  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
69  EXPECT_THAT(response_state->error_message,
70  ::testing::HasSubstr("DiscoveryType is not valid."));
71 }
72 
73 // Tests that we don't trigger does-not-exist callbacks for a resource
74 // that was previously valid but is updated to be invalid.
// Flow: bring up one backend and confirm RPCs succeed, replace the Cluster
// with an invalid (STATIC) one, wait for the NACK, then confirm RPCs still
// succeed afterwards.
75 TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
76  CreateAndStartBackends(1);
77  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
78  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
79  // Check that everything works.
80  CheckRpcSendOk(DEBUG_LOCATION);
81  // Now send an update changing the Cluster to be invalid.
82  auto cluster = default_cluster_;
83  cluster.set_type(Cluster::STATIC);
84  balancer_->ads_service()->SetCdsResource(cluster);
// NOTE(review): the StatusCode::OK argument is presumably the RPC status
// expected while waiting for the NACK -- confirm against WaitForCdsNack().
85  const auto response_state = WaitForCdsNack(DEBUG_LOCATION, StatusCode::OK);
86  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
87  EXPECT_THAT(response_state->error_message,
// NOTE(review): one line of the original is missing at this point (the
// listing numbering jumps from 87 to 89), so this EXPECT_THAT has no matcher
// opening and its parentheses do not balance. The remaining arguments and the
// ".*" in the pattern suggest a gMock regex matcher wrapping absl::StrCat(,
// e.g. ::testing::ContainsRegex(absl::StrCat( -- restore from upstream.
89  kDefaultClusterName,
90  ": validation error.*DiscoveryType is not valid")));
// RPCs must keep working after the invalid update has been NACKed.
91  CheckRpcSendOk(DEBUG_LOCATION);
92 }
93 
94 // Tests that CDS client should send a NACK if the eds_config in CDS response
95 // is other than ADS or SELF.
96 TEST_P(CdsTest, EdsConfigSourceDoesNotSpecifyAdsOrSelf) {
97  auto cluster = default_cluster_;
98  cluster.mutable_eds_cluster_config()->mutable_eds_config()->set_path(
99  "/foo/bar");
100  balancer_->ads_service()->SetCdsResource(cluster);
101  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
102  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
103  EXPECT_THAT(response_state->error_message,
104  ::testing::HasSubstr("EDS ConfigSource is not ADS or SELF."));
105 }
106 
107 // Tests that CDS client accepts an eds_config of type ADS.
108 TEST_P(CdsTest, AcceptsEdsConfigSourceOfTypeAds) {
109  CreateAndStartBackends(1);
110  auto cluster = default_cluster_;
111  cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_ads();
112  balancer_->ads_service()->SetCdsResource(cluster);
113  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
114  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
115  WaitForAllBackends(DEBUG_LOCATION);
116  auto response_state = balancer_->ads_service()->cds_response_state();
117  ASSERT_TRUE(response_state.has_value());
118  EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
119 }
120 
121 // Tests that CDS client should send a NACK if the lb_policy in CDS response
122 // is other than ROUND_ROBIN.
123 TEST_P(CdsTest, WrongLbPolicy) {
124  auto cluster = default_cluster_;
125  cluster.set_lb_policy(Cluster::LEAST_REQUEST);
126  balancer_->ads_service()->SetCdsResource(cluster);
127  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
128  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
129  EXPECT_THAT(response_state->error_message,
130  ::testing::HasSubstr("LB policy is not supported."));
131 }
132 
133 // Tests that CDS client should send a NACK if the lrs_server in CDS response
134 // is other than SELF.
135 TEST_P(CdsTest, WrongLrsServer) {
136  auto cluster = default_cluster_;
137  cluster.mutable_lrs_server()->mutable_ads();
138  balancer_->ads_service()->SetCdsResource(cluster);
139  const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
140  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
141  EXPECT_THAT(response_state->error_message,
142  ::testing::HasSubstr("LRS ConfigSource is not self."));
143 }
144 
145 // Tests round robin is not implacted by the endpoint weight, and that the
146 // localities in a locality map are picked according to their weights.
147 TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) {
148  CreateAndStartBackends(2);
149  const int kLocalityWeight0 = 2;
150  const int kLocalityWeight1 = 8;
151  const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
152  const double kLocalityWeightRate0 =
153  static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
154  const double kLocalityWeightRate1 =
155  static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
156  const double kErrorTolerance = 0.05;
157  const size_t kNumRpcs =
158  ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
159  // ADS response contains 2 localities, each of which contains 1 backend.
160  EdsResourceArgs args({
161  {"locality0",
162  {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
163  kLocalityWeight0},
164  {"locality1",
165  {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
166  kLocalityWeight1},
167  });
168  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
169  // Wait for both backends to be ready.
170  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
171  // Send kNumRpcs RPCs.
172  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
173  // The locality picking rates should be roughly equal to the expectation.
174  const double locality_picked_rate_0 =
175  static_cast<double>(backends_[0]->backend_service()->request_count()) /
176  kNumRpcs;
177  const double locality_picked_rate_1 =
178  static_cast<double>(backends_[1]->backend_service()->request_count()) /
179  kNumRpcs;
180  EXPECT_THAT(locality_picked_rate_0,
181  ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
182  EXPECT_THAT(locality_picked_rate_1,
183  ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
184 }
185 
186 // In most of our tests, we use different names for different resource
187 // types, to make sure that there are no cut-and-paste errors in the code
188 // that cause us to look at data for the wrong resource type. So we add
189 // this test to make sure that the EDS resource name defaults to the
190 // cluster name if not specified in the CDS resource.
191 TEST_P(CdsTest, EdsServiceNameDefaultsToClusterName) {
192  CreateAndStartBackends(1);
193  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
194  balancer_->ads_service()->SetEdsResource(
195  BuildEdsResource(args, kDefaultClusterName));
196  Cluster cluster = default_cluster_;
197  cluster.mutable_eds_cluster_config()->clear_service_name();
198  balancer_->ads_service()->SetCdsResource(cluster);
199  CheckRpcSendOk(DEBUG_LOCATION);
200 }
201 
202 // Tests switching over from one cluster to another.
203 TEST_P(CdsTest, ChangeClusters) {
204  CreateAndStartBackends(2);
205  const char* kNewClusterName = "new_cluster_name";
206  const char* kNewEdsServiceName = "new_eds_service_name";
207  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
208  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
209  // We need to wait for all backends to come online.
210  WaitForAllBackends(DEBUG_LOCATION, 0, 1);
211  // Populate new EDS resource.
212  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
213  balancer_->ads_service()->SetEdsResource(
214  BuildEdsResource(args, kNewEdsServiceName));
215  // Populate new CDS resource.
216  Cluster new_cluster = default_cluster_;
217  new_cluster.set_name(kNewClusterName);
218  new_cluster.mutable_eds_cluster_config()->set_service_name(
219  kNewEdsServiceName);
220  balancer_->ads_service()->SetCdsResource(new_cluster);
221  // Change RDS resource to point to new cluster.
222  RouteConfiguration new_route_config = default_route_config_;
223  new_route_config.mutable_virtual_hosts(0)
224  ->mutable_routes(0)
225  ->mutable_route()
226  ->set_cluster(kNewClusterName);
227  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
228  new_route_config);
229  // Wait for all new backends to be used.
230  WaitForAllBackends(DEBUG_LOCATION, 1, 2);
231 }
232 
233 TEST_P(CdsTest, CircuitBreaking) {
234  CreateAndStartBackends(1);
235  constexpr size_t kMaxConcurrentRequests = 10;
236  // Populate new EDS resources.
237  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
238  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
239  // Update CDS resource to set max concurrent request.
240  CircuitBreakers circuit_breaks;
241  Cluster cluster = default_cluster_;
242  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
243  threshold->set_priority(RoutingPriority::DEFAULT);
244  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
245  balancer_->ads_service()->SetCdsResource(cluster);
246  // Send exactly max_concurrent_requests long RPCs.
247  LongRunningRpc rpcs[kMaxConcurrentRequests];
248  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
249  rpcs[i].StartRpc(stub_.get());
250  }
251  // Wait for all RPCs to be in flight.
252  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
253  kMaxConcurrentRequests) {
255  gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
256  }
257  // Sending a RPC now should fail, the error message should tell us
258  // we hit the max concurrent requests limit and got dropped.
259  Status status = SendRpc();
260  EXPECT_FALSE(status.ok());
261  EXPECT_EQ(status.error_message(), "circuit breaker drop");
262  // Cancel one RPC to allow another one through
263  rpcs[0].CancelRpc();
264  status = SendRpc();
265  EXPECT_TRUE(status.ok());
266  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
267  rpcs[i].CancelRpc();
268  }
269 }
270 
271 TEST_P(CdsTest, CircuitBreakingMultipleChannelsShareCallCounter) {
272  CreateAndStartBackends(1);
273  constexpr size_t kMaxConcurrentRequests = 10;
274  // Populate new EDS resources.
275  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
276  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
277  // Update CDS resource to set max concurrent request.
278  CircuitBreakers circuit_breaks;
279  Cluster cluster = default_cluster_;
280  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
281  threshold->set_priority(RoutingPriority::DEFAULT);
282  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
283  balancer_->ads_service()->SetCdsResource(cluster);
284  auto channel2 = CreateChannel();
285  auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
286  // Send exactly max_concurrent_requests long RPCs, alternating between
287  // the two channels.
288  LongRunningRpc rpcs[kMaxConcurrentRequests];
289  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
290  rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
291  }
292  // Wait for all RPCs to be in flight.
293  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
294  kMaxConcurrentRequests) {
296  gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
297  }
298  // Sending a RPC now should fail, the error message should tell us
299  // we hit the max concurrent requests limit and got dropped.
300  Status status = SendRpc();
301  EXPECT_FALSE(status.ok());
302  EXPECT_EQ(status.error_message(), "circuit breaker drop");
303  // Cancel one RPC to allow another one through
304  rpcs[0].CancelRpc();
305  status = SendRpc();
306  EXPECT_TRUE(status.ok());
307  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
308  rpcs[i].CancelRpc();
309  }
310 }
311 
312 TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
313  CreateAndStartBackends(2);
314  const char* kNewEdsResourceName = "new_eds_resource_name";
315  // Populate EDS resources.
316  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
317  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
318  // Check that the channel is working.
319  CheckRpcSendOk(DEBUG_LOCATION);
320  // Stop and restart the balancer.
321  balancer_->Shutdown();
322  balancer_->Start();
323  // Create new EDS resource.
324  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
325  balancer_->ads_service()->SetEdsResource(
326  BuildEdsResource(args, kNewEdsResourceName));
327  // Change CDS resource to point to new EDS resource.
328  auto cluster = default_cluster_;
329  cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
330  balancer_->ads_service()->SetCdsResource(cluster);
331  // Make sure client sees the change.
332  WaitForBackend(DEBUG_LOCATION, 1);
333 }
334 
335 //
336 // CDS deletion tests
337 //
338 
339 class CdsDeletionTest : public XdsEnd2endTest {
340  protected:
341  void SetUp() override {} // Individual tests call InitClient().
342 };
343 
344 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest,
345  ::testing::Values(XdsTestType()), &XdsTestType::Name);
346 
347 // Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted.
348 TEST_P(CdsDeletionTest, ClusterDeleted) {
349  InitClient();
350  CreateAndStartBackends(1);
351  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
352  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
353  // We need to wait for all backends to come online.
354  WaitForAllBackends(DEBUG_LOCATION);
355  // Unset CDS resource.
356  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
357  // Wait for RPCs to start failing.
358  SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) {
359  if (result.status.ok()) return true; // Keep going.
360  EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
361  EXPECT_EQ(absl::StrCat("CDS resource \"", kDefaultClusterName,
362  "\" does not exist"),
363  result.status.error_message());
364  return false;
365  });
366  // Make sure we ACK'ed the update.
367  auto response_state = balancer_->ads_service()->cds_response_state();
368  ASSERT_TRUE(response_state.has_value());
369  EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
370 }
371 
372 // Tests that we ignore Cluster deletions if configured to do so.
373 TEST_P(CdsDeletionTest, ClusterDeletionIgnored) {
374  InitClient(BootstrapBuilder().SetIgnoreResourceDeletion());
375  CreateAndStartBackends(2);
376  // Bring up client pointing to backend 0 and wait for it to connect.
377  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
378  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
379  WaitForAllBackends(DEBUG_LOCATION, 0, 1);
380  // Make sure we ACKed the CDS update.
381  auto response_state = balancer_->ads_service()->cds_response_state();
382  ASSERT_TRUE(response_state.has_value());
383  EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
384  // Unset CDS resource and wait for client to ACK the update.
385  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
386  const auto deadline = absl::Now() + absl::Seconds(30);
387  while (true) {
388  ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK";
389  response_state = balancer_->ads_service()->cds_response_state();
390  if (response_state.has_value()) break;
391  }
392  EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
393  // Make sure we can still send RPCs.
394  CheckRpcSendOk(DEBUG_LOCATION);
395  // Now recreate the CDS resource pointing to a new EDS resource that
396  // specified backend 1, and make sure the client uses it.
397  const char* kNewEdsResourceName = "new_eds_resource_name";
398  auto cluster = default_cluster_;
399  cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
400  balancer_->ads_service()->SetCdsResource(cluster);
401  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
402  balancer_->ads_service()->SetEdsResource(
403  BuildEdsResource(args, kNewEdsResourceName));
404  // Wait for client to start using backend 1.
405  WaitForAllBackends(DEBUG_LOCATION, 1, 2);
406 }
407 
408 //
409 // EDS tests
410 //
411 
412 using EdsTest = XdsEnd2endTest;
413 
415  XdsTest, EdsTest,
416  ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
418 
419 // Tests that the balancer sends the correct response to the client, and the
420 // client sends RPCs to the backends using the default child policy.
421 TEST_P(EdsTest, Vanilla) {
422  CreateAndStartBackends(3);
423  const size_t kNumRpcsPerAddress = 100;
424  EdsResourceArgs args({
425  {"locality0", CreateEndpointsForBackends()},
426  });
427  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
428  // Make sure that trying to connect works without a call.
429  channel_->GetState(true /* try_to_connect */);
430  // We need to wait for all backends to come online.
431  WaitForAllBackends(DEBUG_LOCATION);
432  // Send kNumRpcsPerAddress RPCs per server.
433  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
434  // Each backend should have gotten 100 requests.
435  for (size_t i = 0; i < backends_.size(); ++i) {
436  EXPECT_EQ(kNumRpcsPerAddress,
437  backends_[i]->backend_service()->request_count());
438  }
439  // Check LB policy name for the channel.
440  EXPECT_EQ("xds_cluster_manager_experimental",
441  channel_->GetLoadBalancingPolicyName());
442 }
443 
444 TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
445  CreateAndStartBackends(2);
446  const size_t kNumRpcsPerAddress = 100;
447  auto endpoints = CreateEndpointsForBackends();
448  endpoints.push_back(MakeNonExistantEndpoint());
449  endpoints.back().health_status = HealthStatus::DRAINING;
450  EdsResourceArgs args({
451  {"locality0", std::move(endpoints), kDefaultLocalityWeight,
452  kDefaultLocalityPriority},
453  });
454  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
455  // Make sure that trying to connect works without a call.
456  channel_->GetState(true /* try_to_connect */);
457  // We need to wait for all backends to come online.
458  WaitForAllBackends(DEBUG_LOCATION);
459  // Send kNumRpcsPerAddress RPCs per server.
460  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
461  // Each backend should have gotten 100 requests.
462  for (size_t i = 0; i < backends_.size(); ++i) {
463  EXPECT_EQ(kNumRpcsPerAddress,
464  backends_[i]->backend_service()->request_count());
465  }
466 }
467 
468 // Tests that subchannel sharing works when the same backend is listed
469 // multiple times.
470 TEST_P(EdsTest, SameBackendListedMultipleTimes) {
471  CreateAndStartBackends(1);
472  // Same backend listed twice.
473  auto endpoints = CreateEndpointsForBackends();
474  endpoints.push_back(endpoints.front());
475  EdsResourceArgs args({{"locality0", endpoints}});
476  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
477  // We need to wait for the backend to come online.
478  WaitForAllBackends(DEBUG_LOCATION);
479  // Send kNumRpcsPerAddress RPCs per server.
480  const size_t kNumRpcsPerAddress = 10;
481  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * endpoints.size());
482  // Backend should have gotten 20 requests.
483  EXPECT_EQ(kNumRpcsPerAddress * endpoints.size(),
484  backends_[0]->backend_service()->request_count());
485 }
486 
487 // Tests that RPCs will be blocked until a non-empty serverlist is received.
// NOTE(review): the visible code expects RPCs to fail with UNAVAILABLE and
// kErrorMessage while the serverlist is empty, rather than to block; the
// sentence above may be out of date.
488 TEST_P(EdsTest, InitiallyEmptyServerlist) {
489  CreateAndStartBackends(1);
490  // First response is an empty serverlist.
491  EdsResourceArgs::Locality empty_locality("locality0", {});
492  EdsResourceArgs args({std::move(empty_locality)});
493  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
494  // RPCs should fail.
495  constexpr char kErrorMessage[] =
496  // TODO(roth): Improve this error message as part of
497  // https://github.com/grpc/grpc/issues/22883.
498  "empty address list: ";
// NOTE(review): one line of the original is missing at this point (the
// listing numbering jumps from 498 to 500). Given the "RPCs should fail"
// comment above, it presumably asserted that an RPC fails with kErrorMessage,
// e.g. via CheckRpcSendFailure() -- restore from upstream.
500  // Send non-empty serverlist.
501  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
502  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
503  // RPCs should eventually succeed.
// While waiting, any RPC that still fails must fail with UNAVAILABLE and
// kErrorMessage.
504  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
505  if (!result.status.ok()) {
506  EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
507  EXPECT_EQ(result.status.error_message(), kErrorMessage);
508  }
509  });
510 }
511 
512 // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
513 // all the servers are unreachable.
514 TEST_P(EdsTest, AllServersUnreachableFailFast) {
515  // Set Rpc timeout to 5 seconds to ensure there is enough time
516  // for communication with the xDS server to take place upon test start up.
517  const uint32_t kRpcTimeoutMs = 5000;
518  const size_t kNumUnreachableServers = 5;
519  std::vector<EdsResourceArgs::Endpoint> endpoints;
520  for (size_t i = 0; i < kNumUnreachableServers; ++i) {
521  endpoints.emplace_back(MakeNonExistantEndpoint());
522  }
523  EdsResourceArgs args({{"locality0", std::move(endpoints)}});
524  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
525  const Status status = SendRpc(RpcOptions().set_timeout_ms(kRpcTimeoutMs));
526  // The error shouldn't be DEADLINE_EXCEEDED because timeout is set to 5
527  // seconds, and we should disocver in that time that the target backend is
528  // down.
530 }
531 
532 // Tests that RPCs fail when the backends are down, and will succeed again
533 // after the backends are restarted.
534 TEST_P(EdsTest, BackendsRestart) {
535  CreateAndStartBackends(3);
536  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
537  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
538  WaitForAllBackends(DEBUG_LOCATION);
539  // Stop backends. RPCs should fail.
540  ShutdownAllBackends();
541  // Wait for channel to report TRANSIENT_FAILURE.
542  EXPECT_TRUE(channel_->WaitForStateChange(
// NOTE(review): two lines of the original are missing at this point (the
// listing numbering jumps from 542 to 545), so the WaitForStateChange() call
// above has no arguments and is unterminated. Presumably they supplied the
// last observed state plus a deadline and then checked the channel state --
// restore from upstream.
545  // RPCs should fail.
546  CheckRpcSendFailure(
// NOTE(review): one line of the original is missing at this point (the
// listing numbering jumps from 546 to 548); it held the leading arguments of
// CheckRpcSendFailure(), presumably DEBUG_LOCATION and the expected status
// code -- restore from upstream.
// The string below contains an alternation "(...|...)", so it is presumably
// treated as a regular expression that accepts either connection-failure
// message.
548  "connections to all backends failing; last error: "
549  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
550  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
551  // Restart all backends. RPCs should start succeeding again.
552  StartAllBackends();
// A single wait-for-ready RPC with a 2000 ms timeout.
553  CheckRpcSendOk(DEBUG_LOCATION, 1,
554  RpcOptions().set_timeout_ms(2000).set_wait_for_ready(true));
555 }
556 
557 TEST_P(EdsTest, IgnoresDuplicateUpdates) {
558  CreateAndStartBackends(1);
559  const size_t kNumRpcsPerAddress = 100;
560  EdsResourceArgs args({
561  {"locality0", CreateEndpointsForBackends()},
562  });
563  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
564  // Wait for all backends to come online.
565  WaitForAllBackends(DEBUG_LOCATION);
566  // Send kNumRpcsPerAddress RPCs per server, but send an EDS update in
567  // between. If the update is not ignored, this will cause the
568  // round_robin policy to see an update, which will randomly reset its
569  // position in the address list.
570  for (size_t i = 0; i < kNumRpcsPerAddress; ++i) {
571  CheckRpcSendOk(DEBUG_LOCATION, 2);
572  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
573  CheckRpcSendOk(DEBUG_LOCATION, 2);
574  }
575  // Each backend should have gotten the right number of requests.
576  for (size_t i = 1; i < backends_.size(); ++i) {
577  EXPECT_EQ(kNumRpcsPerAddress,
578  backends_[i]->backend_service()->request_count());
579  }
580 }
581 
582 // Tests that EDS client should send a NACK if the EDS update contains
583 // sparse priorities.
584 TEST_P(EdsTest, NacksSparsePriorityList) {
585  EdsResourceArgs args({
586  {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
587  });
588  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
589  const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
590  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
591  EXPECT_THAT(response_state->error_message,
592  ::testing::HasSubstr("sparse priority list"));
593 }
594 
595 // Tests that EDS client should send a NACK if the EDS update contains
596 // multiple instances of the same locality in the same priority.
597 TEST_P(EdsTest, NacksDuplicateLocalityInSamePriority) {
598  EdsResourceArgs args({
599  {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
600  {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
601  });
602  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
603  const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
604  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
605  EXPECT_THAT(response_state->error_message,
607  "duplicate locality {region=\"xds_default_locality_region\", "
608  "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"} "
609  "found in priority 0"));
610 }
611 
612 TEST_P(EdsTest, NacksEndpointWeightZero) {
613  EdsResourceArgs args({{"locality0", {MakeNonExistantEndpoint()}}});
614  auto eds_resource = BuildEdsResource(args);
615  eds_resource.mutable_endpoints(0)
616  ->mutable_lb_endpoints(0)
617  ->mutable_load_balancing_weight()
618  ->set_value(0);
619  balancer_->ads_service()->SetEdsResource(eds_resource);
620  const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
621  ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
622  EXPECT_THAT(response_state->error_message,
623  ::testing::HasSubstr("Invalid endpoint weight of 0."));
624 }
625 
626 // Tests that if the balancer is down, the RPCs will still be sent to the
627 // backends according to the last balancer response, until a new balancer is
628 // reachable.
629 TEST_P(EdsTest, KeepUsingLastDataIfBalancerGoesDown) {
630  CreateAndStartBackends(2);
631  // Set up EDS resource pointing to backend 0.
632  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
633  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
634  // Start the client and make sure it sees the backend.
635  WaitForBackend(DEBUG_LOCATION, 0);
636  // Stop the balancer, and verify that RPCs continue to flow to backend 0.
637  balancer_->Shutdown();
638  auto deadline = grpc_timeout_seconds_to_deadline(5);
639  do {
640  CheckRpcSendOk(DEBUG_LOCATION);
641  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
642  // Check the EDS resource to point to backend 1 and bring the balancer
643  // back up.
644  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
645  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
646  balancer_->Start();
647  // Wait for client to see backend 1.
648  WaitForBackend(DEBUG_LOCATION, 1);
649 }
650 
651 // Tests that the localities in a locality map are picked according to their
652 // weights.
653 TEST_P(EdsTest, WeightedRoundRobin) {
654  CreateAndStartBackends(2);
655  const int kLocalityWeight0 = 2;
656  const int kLocalityWeight1 = 8;
657  const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
658  const double kLocalityWeightRate0 =
659  static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
660  const double kLocalityWeightRate1 =
661  static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
662  const double kErrorTolerance = 0.05;
663  const size_t kNumRpcs =
664  ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
665  // ADS response contains 2 localities, each of which contains 1 backend.
666  EdsResourceArgs args({
667  {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
668  {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
669  });
670  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
671  // Wait for both backends to be ready.
672  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
673  // Send kNumRpcs RPCs.
674  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
675  // The locality picking rates should be roughly equal to the expectation.
676  const double locality_picked_rate_0 =
677  static_cast<double>(backends_[0]->backend_service()->request_count()) /
678  kNumRpcs;
679  const double locality_picked_rate_1 =
680  static_cast<double>(backends_[1]->backend_service()->request_count()) /
681  kNumRpcs;
682  EXPECT_THAT(locality_picked_rate_0,
683  ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
684  EXPECT_THAT(locality_picked_rate_1,
685  ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
686 }
687 
688 // Tests that we correctly handle a locality containing no endpoints.
689 TEST_P(EdsTest, LocalityContainingNoEndpoints) {
690  CreateAndStartBackends(2);
691  const size_t kNumRpcs = 5000;
692  // EDS response contains 2 localities, one with no endpoints.
693  EdsResourceArgs args({
694  {"locality0", CreateEndpointsForBackends()},
695  {"locality1", {}},
696  });
697  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
698  // Wait for both backends to be ready.
699  WaitForAllBackends(DEBUG_LOCATION);
700  // Send kNumRpcs RPCs.
701  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
702  // All traffic should go to the reachable locality.
703  EXPECT_EQ(backends_[0]->backend_service()->request_count(),
704  kNumRpcs / backends_.size());
705  EXPECT_EQ(backends_[1]->backend_service()->request_count(),
706  kNumRpcs / backends_.size());
707 }
708 
709 // EDS update with no localities.
// Publishes an EDS resource built from empty arguments and checks that an RPC
// sent afterwards does not succeed.
710 TEST_P(EdsTest, NoLocalities) {
711  balancer_->ads_service()->SetEdsResource(BuildEdsResource({}));
712  Status status = SendRpc();
713  EXPECT_FALSE(status.ok());
// NOTE(review): one line of the original is missing at this point (the
// listing numbering jumps from 713 to 715); presumably a further expectation
// on `status`, such as its error code -- restore from upstream.
715 }
716 
717 // Tests that the locality map can work properly even when it contains a large
718 // number of localities.
719 TEST_P(EdsTest, ManyLocalitiesStressTest) {
720  CreateAndStartBackends(2);
721  const size_t kNumLocalities = 100;
722  const uint32_t kRpcTimeoutMs = 5000;
723  // The first ADS response contains kNumLocalities localities, each of which
724  // contains backend 0.
725  EdsResourceArgs args;
726  for (size_t i = 0; i < kNumLocalities; ++i) {
727  std::string name = absl::StrCat("locality", i);
728  EdsResourceArgs::Locality locality(name, CreateEndpointsForBackends(0, 1));
729  args.locality_list.emplace_back(std::move(locality));
730  }
731  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
732  // Wait until backend 0 is ready.
733  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
734  WaitForBackendOptions().set_reset_counters(false),
735  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
736  EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
737  // The second ADS response contains 1 locality, which contains backend 1.
738  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
739  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
740  // Wait until backend 1 is ready.
741  WaitForBackend(DEBUG_LOCATION, 1);
742 }
743 
744 // Tests that the localities in a locality map are picked correctly after
745 // update (addition, modification, deletion).
746 TEST_P(EdsTest, LocalityMapUpdateChurn) {
747  CreateAndStartBackends(4);
748  const size_t kNumRpcs = 3000;
749  // The locality weight for the first 3 localities.
750  const std::vector<int> kLocalityWeights0 = {2, 3, 4};
751  const double kTotalLocalityWeight0 =
752  std::accumulate(kLocalityWeights0.begin(), kLocalityWeights0.end(), 0);
753  std::vector<double> locality_weight_rate_0;
754  locality_weight_rate_0.reserve(kLocalityWeights0.size());
755  for (int weight : kLocalityWeights0) {
756  locality_weight_rate_0.push_back(weight / kTotalLocalityWeight0);
757  }
758  // Delete the first locality, keep the second locality, change the third
759  // locality's weight from 4 to 2, and add a new locality with weight 6.
760  const std::vector<int> kLocalityWeights1 = {3, 2, 6};
761  const double kTotalLocalityWeight1 =
762  std::accumulate(kLocalityWeights1.begin(), kLocalityWeights1.end(), 0);
763  std::vector<double> locality_weight_rate_1 = {
764  0 /* placeholder for locality 0 */};
765  for (int weight : kLocalityWeights1) {
766  locality_weight_rate_1.push_back(weight / kTotalLocalityWeight1);
767  }
768  EdsResourceArgs args({
769  {"locality0", CreateEndpointsForBackends(0, 1), 2},
770  {"locality1", CreateEndpointsForBackends(1, 2), 3},
771  {"locality2", CreateEndpointsForBackends(2, 3), 4},
772  });
773  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
774  // Wait for the first 3 backends to be ready.
775  WaitForAllBackends(DEBUG_LOCATION, 0, 3);
776  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
777  // Send kNumRpcs RPCs.
778  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
779  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
780  // The picking rates of the first 3 backends should be roughly equal to the
781  // expectation.
782  std::vector<double> locality_picked_rates;
783  for (size_t i = 0; i < 3; ++i) {
784  locality_picked_rates.push_back(
785  static_cast<double>(backends_[i]->backend_service()->request_count()) /
786  kNumRpcs);
787  }
788  const double kErrorTolerance = 0.2;
789  for (size_t i = 0; i < 3; ++i) {
790  gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i,
791  locality_picked_rates[i]);
792  EXPECT_THAT(
793  locality_picked_rates[i],
794  ::testing::AllOf(
795  ::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
796  ::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
797  }
798  args = EdsResourceArgs({
799  {"locality1", CreateEndpointsForBackends(1, 2), 3},
800  {"locality2", CreateEndpointsForBackends(2, 3), 2},
801  {"locality3", CreateEndpointsForBackends(3, 4), 6},
802  });
803  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
804  // Backend 3 hasn't received any request.
805  EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
806  // Wait until the locality update has been processed, as signaled by backend
807  // 3 receiving a request.
808  WaitForAllBackends(DEBUG_LOCATION, 3, 4);
809  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
810  // Send kNumRpcs RPCs.
811  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
812  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
813  // Backend 0 no longer receives any request.
814  EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
815  // The picking rates of the last 3 backends should be roughly equal to the
816  // expectation.
817  locality_picked_rates = {0 /* placeholder for backend 0 */};
818  for (size_t i = 1; i < 4; ++i) {
819  locality_picked_rates.push_back(
820  static_cast<double>(backends_[i]->backend_service()->request_count()) /
821  kNumRpcs);
822  }
823  for (size_t i = 1; i < 4; ++i) {
824  gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i,
825  locality_picked_rates[i]);
826  EXPECT_THAT(
827  locality_picked_rates[i],
828  ::testing::AllOf(
829  ::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
830  ::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
831  }
832 }
833 
834 // Tests that we don't fail RPCs when replacing all of the localities in
835 // a given priority.
836 TEST_P(EdsTest, ReplaceAllLocalitiesInPriority) {
837  CreateAndStartBackends(2);
838  // Initial EDS update has backend 0.
839  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
840  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
841  // Wait for the first backend to be ready.
842  WaitForBackend(DEBUG_LOCATION, 0);
843  // Send EDS update that replaces the locality and switches to backend 1.
844  args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}});
845  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
846  // When the client sees the update, RPCs should start going to backend 1.
847  // No RPCs should fail during this change.
848  WaitForBackend(DEBUG_LOCATION, 1);
849 }
850 
851 TEST_P(EdsTest, ConsistentWeightedTargetUpdates) {
852  CreateAndStartBackends(4);
853  // Initial update has two localities.
854  EdsResourceArgs args({
855  {"locality0", CreateEndpointsForBackends(1, 2)},
856  {"locality1", CreateEndpointsForBackends(2, 3)},
857  });
858  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
859  WaitForAllBackends(DEBUG_LOCATION, 1, 3);
860  // Next update removes locality1.
861  // Also add backend 0 to locality0, so that we can tell when the
862  // update has been seen.
863  args = EdsResourceArgs({
864  {"locality0", CreateEndpointsForBackends(0, 2)},
865  });
866  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
867  WaitForBackend(DEBUG_LOCATION, 0);
868  // Next update re-adds locality1.
869  // Also add backend 3 to locality1, so that we can tell when the
870  // update has been seen.
871  args = EdsResourceArgs({
872  {"locality0", CreateEndpointsForBackends(0, 2)},
873  {"locality1", CreateEndpointsForBackends(2, 4)},
874  });
875  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
876  WaitForBackend(DEBUG_LOCATION, 3);
877 }
878 
879 // Tests that RPCs are dropped according to the drop config.
880 TEST_P(EdsTest, Drops) {
881  CreateAndStartBackends(1);
882  const uint32_t kDropPerMillionForLb = 100000;
883  const uint32_t kDropPerMillionForThrottle = 200000;
884  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
885  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
886  const double kDropRateForLbAndThrottle =
887  kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
888  const double kErrorTolerance = 0.05;
889  const size_t kNumRpcs =
890  ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
891  // The ADS response contains two drop categories.
892  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
893  args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
894  {kThrottleDropType, kDropPerMillionForThrottle}};
895  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
896  // Send kNumRpcs RPCs and count the drops.
897  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
899  kStatusMessageDropPrefix);
900  // The drop rate should be roughly equal to the expectation.
901  const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
902  EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
903  kErrorTolerance));
904 }
905 
906 // Tests that drop config is converted correctly from per hundred.
907 TEST_P(EdsTest, DropPerHundred) {
908  CreateAndStartBackends(1);
909  const uint32_t kDropPerHundredForLb = 10;
910  const double kDropRateForLb = kDropPerHundredForLb / 100.0;
911  const double kErrorTolerance = 0.05;
912  const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
913  // The ADS response contains one drop category.
914  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
915  args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
916  args.drop_denominator = FractionalPercent::HUNDRED;
917  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
918  // Send kNumRpcs RPCs and count the drops.
919  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
921  kStatusMessageDropPrefix);
922  // The drop rate should be roughly equal to the expectation.
923  const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
924  EXPECT_THAT(seen_drop_rate,
925  ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
926 }
927 
928 // Tests that drop config is converted correctly from per ten thousand.
929 TEST_P(EdsTest, DropPerTenThousand) {
930  CreateAndStartBackends(1);
931  const uint32_t kDropPerTenThousandForLb = 1000;
932  const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
933  const double kErrorTolerance = 0.05;
934  const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
935  // The ADS response contains one drop category.
936  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
937  args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
938  args.drop_denominator = FractionalPercent::TEN_THOUSAND;
939  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
940  // Send kNumRpcs RPCs and count the drops.
941  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
943  kStatusMessageDropPrefix);
944  // The drop rate should be roughly equal to the expectation.
945  const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
946  EXPECT_THAT(seen_drop_rate,
947  ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
948 }
949 
950 // Tests that drop is working correctly after update.
951 TEST_P(EdsTest, DropConfigUpdate) {
952  CreateAndStartBackends(1);
953  const uint32_t kDropPerMillionForLb = 100000;
954  const uint32_t kDropPerMillionForThrottle = 200000;
955  const double kErrorTolerance = 0.05;
956  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
957  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
958  const double kDropRateForLbAndThrottle =
959  kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
960  const size_t kNumRpcsLbOnly =
961  ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
962  const size_t kNumRpcsBoth =
963  ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
964  // The first ADS response contains one drop category.
965  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
966  args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
967  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
968  // Send kNumRpcsLbOnly RPCs and count the drops.
969  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
970  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
971  DEBUG_LOCATION, kNumRpcsLbOnly, StatusCode::UNAVAILABLE,
972  kStatusMessageDropPrefix);
973  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
974  // The drop rate should be roughly equal to the expectation.
975  double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsLbOnly;
976  gpr_log(GPR_INFO, "First batch drop rate %f", seen_drop_rate);
977  EXPECT_THAT(seen_drop_rate,
978  ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
979  // The second ADS response contains two drop categories, send an update EDS
980  // response.
981  args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
982  {kThrottleDropType, kDropPerMillionForThrottle}};
983  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
984  // Wait until the drop rate increases to the middle of the two configs,
985  // which implies that the update has been in effect.
986  const double kDropRateThreshold =
987  (kDropRateForLb + kDropRateForLbAndThrottle) / 2;
988  size_t num_rpcs = kNumRpcsBoth;
989  SendRpcsUntil(
991  [&](const RpcResult& result) {
992  ++num_rpcs;
993  if (result.status.ok()) {
994  EXPECT_EQ(result.response.message(), kRequestMessage);
995  } else {
996  EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
997  EXPECT_THAT(result.status.error_message(),
998  ::testing::StartsWith(kStatusMessageDropPrefix));
999  ++num_drops;
1000  }
1001  seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
1002  return seen_drop_rate < kDropRateThreshold;
1003  },
1004  /*timeout_ms=*/20000);
1005  // Send kNumRpcsBoth RPCs and count the drops.
1006  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1007  num_drops = SendRpcsAndCountFailuresWithMessage(DEBUG_LOCATION, kNumRpcsBoth,
1009  kStatusMessageDropPrefix);
1010  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1011  // The new drop rate should be roughly equal to the expectation.
1012  seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsBoth;
1013  gpr_log(GPR_INFO, "Second batch drop rate %f", seen_drop_rate);
1014  EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1015  kErrorTolerance));
1016 }
1017 
1018 // Tests that all the RPCs are dropped if any drop category drops 100%.
1019 TEST_P(EdsTest, DropAll) {
1020  const size_t kNumRpcs = 1000;
1021  const uint32_t kDropPerMillionForLb = 100000;
1022  const uint32_t kDropPerMillionForThrottle = 1000000;
1023  // The ADS response contains two drop categories.
1024  EdsResourceArgs args;
1025  args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1026  {kThrottleDropType, kDropPerMillionForThrottle}};
1027  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1028  // Send kNumRpcs RPCs and all of them are dropped.
1029  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1031  kStatusMessageDropPrefix);
1032  EXPECT_EQ(num_drops, kNumRpcs);
1033 }
1034 
1035 //
1036 // EDS failover tests
1037 //
1038 
1039 class FailoverTest : public XdsEnd2endTest {
1040  public:
1041  void SetUp() override {
1043  ResetStub(/*failover_timeout_ms=*/500);
1044  }
1045 };
1046 
1048  XdsTest, FailoverTest,
1049  ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
1051 
1052 // Localities with the highest priority are used when multiple priority exist.
1053 TEST_P(FailoverTest, ChooseHighestPriority) {
1054  CreateAndStartBackends(4);
1055  EdsResourceArgs args({
1056  {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1057  1},
1058  {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1059  2},
1060  {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1061  3},
1062  {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1063  0},
1064  });
1065  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1066  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1067  WaitForBackendOptions().set_reset_counters(false));
1068  for (size_t i = 0; i < 3; ++i) {
1069  EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1070  }
1071 }
1072 
1073 // Does not choose priority with no endpoints.
1074 TEST_P(FailoverTest, DoesNotUsePriorityWithNoEndpoints) {
1075  CreateAndStartBackends(3);
1076  EdsResourceArgs args({
1077  {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1078  1},
1079  {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1080  2},
1081  {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1082  3},
1083  {"locality3", {}, kDefaultLocalityWeight, 0},
1084  });
1085  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1086  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1087  WaitForBackendOptions().set_reset_counters(false));
1088  for (size_t i = 1; i < 3; ++i) {
1089  EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1090  }
1091 }
1092 
1093 // Does not choose locality with no endpoints.
1094 TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) {
1095  CreateAndStartBackends(1);
1096  EdsResourceArgs args({
1097  {"locality0", {}, kDefaultLocalityWeight, 0},
1098  {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 0},
1099  });
1100  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1101  // Wait for all backends to be used.
1102  WaitForAllBackends(DEBUG_LOCATION);
1103 }
1104 
1105 // If the higher priority localities are not reachable, failover to the
1106 // highest priority among the rest.
1107 TEST_P(FailoverTest, Failover) {
1108  CreateAndStartBackends(2);
1109  EdsResourceArgs args({
1110  {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
1111  {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1112  2},
1113  {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1114  3},
1115  {"locality3", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
1116  });
1117  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1118  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1119  WaitForBackendOptions().set_reset_counters(false));
1120  EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1121 }
1122 
1123 // If a locality with higher priority than the current one becomes ready,
1124 // switch to it.
1125 TEST_P(FailoverTest, SwitchBackToHigherPriority) {
1126  CreateAndStartBackends(4);
1127  const size_t kNumRpcs = 100;
1128  EdsResourceArgs args({
1129  {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1130  1},
1131  {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1132  2},
1133  {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1134  3},
1135  {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1136  0},
1137  });
1138  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1139  WaitForBackend(DEBUG_LOCATION, 3);
1140  backends_[3]->StopListeningAndSendGoaways();
1141  backends_[0]->StopListeningAndSendGoaways();
1142  WaitForBackend(DEBUG_LOCATION, 1);
1143  ShutdownBackend(0);
1144  StartBackend(0);
1145  WaitForBackend(DEBUG_LOCATION, 0);
1146  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1147  EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count());
1148 }
1149 
1150 // The first update only contains unavailable priorities. The second update
1151 // contains available priorities.
1152 TEST_P(FailoverTest, UpdateInitialUnavailable) {
1153  CreateAndStartBackends(2);
1154  EdsResourceArgs args({
1155  {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
1156  {"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
1157  });
1158  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1159  constexpr char kErrorMessageRegex[] =
1160  "connections to all backends failing; last error: "
1161  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1162  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)";
1163  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1164  kErrorMessageRegex);
1165  args = EdsResourceArgs({
1166  {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1167  0},
1168  {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1169  1},
1170  });
1171  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1172  WaitForBackend(DEBUG_LOCATION, 0, [&](const RpcResult& result) {
1173  if (!result.status.ok()) {
1174  EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
1175  EXPECT_THAT(result.status.error_message(),
1176  ::testing::MatchesRegex(kErrorMessageRegex));
1177  }
1178  });
1179 }
1180 
1181 // Tests that after the localities' priorities are updated, we still choose
1182 // the highest READY priority with the updated localities.
1183 TEST_P(FailoverTest, UpdatePriority) {
1184  CreateAndStartBackends(4);
1185  const size_t kNumRpcs = 100;
1186  EdsResourceArgs args({
1187  {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1188  1},
1189  {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1190  2},
1191  {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1192  3},
1193  {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1194  0},
1195  });
1196  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1197  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1198  WaitForBackendOptions().set_reset_counters(false));
1199  EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
1200  EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1201  EXPECT_EQ(0U, backends_[2]->backend_service()->request_count());
1202  args = EdsResourceArgs({
1203  {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1204  2},
1205  {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1206  0},
1207  {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1208  1},
1209  {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1210  3},
1211  });
1212  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1213  WaitForBackend(DEBUG_LOCATION, 1);
1214  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1215  EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
1216 }
1217 
1218 // Moves all localities in the current priority to a higher priority.
1219 TEST_P(FailoverTest, MoveAllLocalitiesInCurrentPriorityToHigherPriority) {
1220  CreateAndStartBackends(3);
1221  auto non_existant_endpoint = MakeNonExistantEndpoint();
1222  // First update:
1223  // - Priority 0 is locality 0, containing an unreachable backend.
1224  // - Priority 1 is locality 1, containing backends 0 and 1.
1225  EdsResourceArgs args({
1226  {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1227  {"locality1", CreateEndpointsForBackends(0, 2), kDefaultLocalityWeight,
1228  1},
1229  });
1230  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1231  // When we get the first update, all backends in priority 0 are down,
1232  // so we will create priority 1. Backends 0 and 1 should have traffic,
1233  // but backend 2 should not.
1234  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
1235  WaitForBackendOptions().set_reset_counters(false));
1236  EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1237  // Second update:
1238  // - Priority 0 contains both localities 0 and 1.
1239  // - Priority 1 is not present.
1240  // - We add backend 2 to locality 1, just so we have a way to know
1241  // when the update has been seen by the client.
1242  args = EdsResourceArgs({
1243  {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1244  {"locality1", CreateEndpointsForBackends(0, 3), kDefaultLocalityWeight,
1245  0},
1246  });
1247  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1248  // When backend 2 gets traffic, we know the second update has been seen.
1249  WaitForBackend(DEBUG_LOCATION, 2);
1250  // The xDS server got at least 1 response.
1251  EXPECT_TRUE(balancer_->ads_service()->eds_response_state().has_value());
1252 }
1253 
1254 // This tests a bug triggered by the xds_cluster_resolver policy reusing
1255 // a child name for the priority policy when that child name was still
1256 // present but deactivated.
1257 TEST_P(FailoverTest, PriorityChildNameChurn) {
1258  CreateAndStartBackends(4);
1259  auto non_existant_endpoint = MakeNonExistantEndpoint();
1260  // Initial update:
1261  // - P0:locality0, child number 0 (unreachable)
1262  // - P1:locality1, child number 1
1263  // - P2:locality2, child number 2
1264  EdsResourceArgs args({
1265  {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1266  {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1267  1},
1268  {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1269  2},
1270  });
1271  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1272  WaitForBackend(DEBUG_LOCATION, 0);
1273  // Next update:
1274  // - P0:locality0, child number 0 (still unreachable)
1275  // - P1:locality2, child number 2 (moved from P2 to P1)
1276  // - P2:locality3, child number 3 (new child)
1277  // Child number 1 will be deactivated.
1278  args = EdsResourceArgs({
1279  {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1280  {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1281  1},
1282  {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1283  2},
1284  });
1285  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1286  WaitForBackend(DEBUG_LOCATION, 1);
1287  // Next update:
1288  // - P0:locality0, child number 0 (still unreachable)
1289  // - P1:locality4, child number 4 (new child number -- should not reuse #1)
1290  // - P2:locality3, child number 3
1291  // Child number 1 will be deactivated.
1292  args = EdsResourceArgs({
1293  {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1294  {"locality4", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1295  1},
1296  {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1297  2},
1298  });
1299  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1300  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1301  WaitForBackendOptions().set_reset_counters(false));
1302  // P2 should not have gotten any traffic in this change.
1303  EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1304 }
1305 
1306 //
1307 // EDS client load reporting tests
1308 //
1309 
1310 using ClientLoadReportingTest = XdsEnd2endTest;
1311 
1313  XdsTest, ClientLoadReportingTest,
1314  ::testing::Values(XdsTestType().set_enable_load_reporting()),
1316 
1317 // Tests that the load report received at the balancer is correct.
1318 TEST_P(ClientLoadReportingTest, Vanilla) {
1319  CreateAndStartBackends(4);
1320  const size_t kNumRpcsPerAddress = 10;
1321  const size_t kNumFailuresPerAddress = 3;
1322  EdsResourceArgs args({
1323  {"locality0", CreateEndpointsForBackends(0, 2)},
1324  {"locality1", CreateEndpointsForBackends(2, 4)},
1325  });
1326  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1327  // Wait until all backends are ready.
1328  size_t num_warmup_rpcs =
1329  WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1330  WaitForBackendOptions().set_reset_counters(false));
1331  // Send kNumRpcsPerAddress RPCs per server.
1332  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
1333  for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1334  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1335  RpcOptions().set_server_fail(true));
1336  }
1337  const size_t total_successful_rpcs_sent =
1338  (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1339  const size_t total_failed_rpcs_sent =
1340  kNumFailuresPerAddress * backends_.size();
1341  // Check that the backends got the right number of requests.
1342  size_t total_rpcs_sent = 0;
1343  for (const auto& backend : backends_) {
1344  total_rpcs_sent += backend->backend_service()->request_count();
1345  }
1346  EXPECT_EQ(total_rpcs_sent,
1347  total_successful_rpcs_sent + total_failed_rpcs_sent);
1348  // The load report received at the balancer should be correct.
1349  std::vector<ClientStats> load_report =
1350  balancer_->lrs_service()->WaitForLoadReport();
1351  ASSERT_EQ(load_report.size(), 1UL);
1352  ClientStats& client_stats = load_report.front();
1353  EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1354  EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1355  EXPECT_EQ(total_successful_rpcs_sent,
1356  client_stats.total_successful_requests());
1357  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1358  EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1359  EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1360  EXPECT_EQ(0U, client_stats.total_dropped_requests());
1361  ASSERT_THAT(
1362  client_stats.locality_stats(),
1363  ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1364  ::testing::Pair("locality1", ::testing::_)));
1365  size_t num_successful_rpcs = 0;
1366  size_t num_failed_rpcs = 0;
1367  for (const auto& p : client_stats.locality_stats()) {
1368  EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1369  EXPECT_EQ(
1370  p.second.total_issued_requests,
1371  p.second.total_successful_requests + p.second.total_error_requests);
1372  num_successful_rpcs += p.second.total_successful_requests;
1373  num_failed_rpcs += p.second.total_error_requests;
1374  }
1375  EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1376  EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1377  EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1378  // The LRS service got a single request, and sent a single response.
1379  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1380  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1381 }
1382 
1383 // Tests send_all_clusters.
1384 TEST_P(ClientLoadReportingTest, SendAllClusters) {
1385  CreateAndStartBackends(2);
1386  balancer_->lrs_service()->set_send_all_clusters(true);
1387  const size_t kNumRpcsPerAddress = 10;
1388  const size_t kNumFailuresPerAddress = 3;
1389  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1390  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1391  // Wait until all backends are ready.
1392  size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION);
1393  // Send kNumRpcsPerAddress RPCs per server.
1394  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
1395  for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1396  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1397  RpcOptions().set_server_fail(true));
1398  }
1399  // Check that each backend got the right number of requests.
1400  for (size_t i = 0; i < backends_.size(); ++i) {
1401  EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
1402  backends_[i]->backend_service()->request_count());
1403  }
1404  // The load report received at the balancer should be correct.
1405  std::vector<ClientStats> load_report =
1406  balancer_->lrs_service()->WaitForLoadReport();
1407  ASSERT_EQ(load_report.size(), 1UL);
1408  ClientStats& client_stats = load_report.front();
1409  EXPECT_EQ(kNumRpcsPerAddress * backends_.size() + num_warmup_rpcs,
1410  client_stats.total_successful_requests());
1411  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1412  EXPECT_EQ((kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size() +
1413  num_warmup_rpcs,
1414  client_stats.total_issued_requests());
1415  EXPECT_EQ(kNumFailuresPerAddress * backends_.size(),
1416  client_stats.total_error_requests());
1417  EXPECT_EQ(0U, client_stats.total_dropped_requests());
1418  // The LRS service got a single request, and sent a single response.
1419  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1420  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1421 }
1422 
1423 // Tests that we don't include stats for clusters that are not requested
1424 // by the LRS server.
1425 TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
1426  CreateAndStartBackends(1);
1427  balancer_->lrs_service()->set_cluster_names({"bogus"});
1428  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1429  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1430  // Wait until all backends are ready.
1431  WaitForAllBackends(DEBUG_LOCATION);
1432  // The load report received at the balancer should be correct.
1433  std::vector<ClientStats> load_report =
1434  balancer_->lrs_service()->WaitForLoadReport();
1435  ASSERT_EQ(load_report.size(), 0UL);
1436  // The LRS service got a single request, and sent a single response.
1437  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1438  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1439 }
1440 
1441 // Tests that if the balancer restarts, the client load report contains the
1442 // stats before and after the restart correctly.
1443 TEST_P(ClientLoadReportingTest, BalancerRestart) {
1444  CreateAndStartBackends(4);
1445  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
1446  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1447  // Wait until all backends returned by the balancer are ready.
1448  size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1449  std::vector<ClientStats> load_report =
1450  balancer_->lrs_service()->WaitForLoadReport();
1451  ASSERT_EQ(load_report.size(), 1UL);
1452  ClientStats client_stats = std::move(load_report.front());
1453  EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
1454  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1455  EXPECT_EQ(0U, client_stats.total_error_requests());
1456  EXPECT_EQ(0U, client_stats.total_dropped_requests());
1457  // Shut down the balancer.
1458  balancer_->Shutdown();
1459  // We should continue using the last EDS response we received from the
1460  // balancer before it was shut down.
1461  // Note: We need to use WaitForAllBackends() here instead of just
1462  // CheckRpcSendOk(kNumBackendsFirstPass), because when the balancer
1463  // shuts down, the XdsClient will generate an error to the
1464  // ListenerWatcher, which will cause the xds resolver to send a
1465  // no-op update to the LB policy. When this update gets down to the
1466  // round_robin child policy for the locality, it will generate a new
1467  // subchannel list, which resets the start index randomly. So we need
1468  // to be a little more permissive here to avoid spurious failures.
1469  ResetBackendCounters();
1470  num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1471  // Now restart the balancer, this time pointing to the new backends.
1472  balancer_->Start();
1473  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}});
1474  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1475  // Wait for queries to start going to one of the new backends.
1476  // This tells us that we're now using the new serverlist.
1477  num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4);
1478  // Send one RPC per backend.
1479  CheckRpcSendOk(DEBUG_LOCATION, 2);
1480  num_rpcs += 2;
1481  // Check client stats.
1482  load_report = balancer_->lrs_service()->WaitForLoadReport();
1483  ASSERT_EQ(load_report.size(), 1UL);
1484  client_stats = std::move(load_report.front());
1485  EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
1486  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1487  EXPECT_EQ(0U, client_stats.total_error_requests());
1488  EXPECT_EQ(0U, client_stats.total_dropped_requests());
1489 }
1490 
1491 // Tests load reporting when switching over from one cluster to another.
1492 TEST_P(ClientLoadReportingTest, ChangeClusters) {
1493  CreateAndStartBackends(4);
1494  const char* kNewClusterName = "new_cluster_name";
1495  const char* kNewEdsServiceName = "new_eds_service_name";
1496  balancer_->lrs_service()->set_cluster_names(
1497  {kDefaultClusterName, kNewClusterName});
1498  // cluster kDefaultClusterName -> locality0 -> backends 0 and 1
1499  EdsResourceArgs args({
1500  {"locality0", CreateEndpointsForBackends(0, 2)},
1501  });
1502  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1503  // cluster kNewClusterName -> locality1 -> backends 2 and 3
1504  EdsResourceArgs args2({
1505  {"locality1", CreateEndpointsForBackends(2, 4)},
1506  });
1507  balancer_->ads_service()->SetEdsResource(
1508  BuildEdsResource(args2, kNewEdsServiceName));
1509  // CDS resource for kNewClusterName.
1510  Cluster new_cluster = default_cluster_;
1511  new_cluster.set_name(kNewClusterName);
1512  new_cluster.mutable_eds_cluster_config()->set_service_name(
1513  kNewEdsServiceName);
1514  balancer_->ads_service()->SetCdsResource(new_cluster);
1515  // Wait for all backends to come online.
1516  size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1517  // The load report received at the balancer should be correct.
1518  std::vector<ClientStats> load_report =
1519  balancer_->lrs_service()->WaitForLoadReport();
1520  EXPECT_THAT(
1521  load_report,
1523  ::testing::Property(&ClientStats::cluster_name, kDefaultClusterName),
1525  kDefaultEdsServiceName),
1527  &ClientStats::locality_stats,
1529  "locality0",
1530  ::testing::AllOf(
1531  ::testing::Field(&ClientStats::LocalityStats::
1532  total_successful_requests,
1533  num_rpcs),
1534  ::testing::Field(&ClientStats::LocalityStats::
1535  total_requests_in_progress,
1536  0UL),
1537  ::testing::Field(
1538  &ClientStats::LocalityStats::total_error_requests,
1539  0UL),
1540  ::testing::Field(
1541  &ClientStats::LocalityStats::total_issued_requests,
1542  num_rpcs))))),
1543  ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
1544  // Change RDS resource to point to new cluster.
1545  RouteConfiguration new_route_config = default_route_config_;
1546  new_route_config.mutable_virtual_hosts(0)
1547  ->mutable_routes(0)
1548  ->mutable_route()
1549  ->set_cluster(kNewClusterName);
1550  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1551  new_route_config);
1552  // Wait for all new backends to be used.
1553  num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 2, 4);
1554  // The load report received at the balancer should be correct.
1555  load_report = balancer_->lrs_service()->WaitForLoadReport();
1556  EXPECT_THAT(
1557  load_report,
1559  ::testing::AllOf(
1561  kDefaultClusterName),
1563  kDefaultEdsServiceName),
1565  &ClientStats::locality_stats,
1567  "locality0",
1568  ::testing::AllOf(
1569  ::testing::Field(&ClientStats::LocalityStats::
1570  total_successful_requests,
1571  ::testing::Lt(num_rpcs)),
1572  ::testing::Field(&ClientStats::LocalityStats::
1573  total_requests_in_progress,
1574  0UL),
1575  ::testing::Field(
1576  &ClientStats::LocalityStats::total_error_requests,
1577  0UL),
1578  ::testing::Field(&ClientStats::LocalityStats::
1579  total_issued_requests,
1580  ::testing::Le(num_rpcs)))))),
1581  ::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
1582  ::testing::AllOf(
1583  ::testing::Property(&ClientStats::cluster_name, kNewClusterName),
1585  kNewEdsServiceName),
1587  &ClientStats::locality_stats,
1589  "locality1",
1590  ::testing::AllOf(
1591  ::testing::Field(&ClientStats::LocalityStats::
1592  total_successful_requests,
1593  ::testing::Le(num_rpcs)),
1594  ::testing::Field(&ClientStats::LocalityStats::
1595  total_requests_in_progress,
1596  0UL),
1597  ::testing::Field(
1598  &ClientStats::LocalityStats::total_error_requests,
1599  0UL),
1600  ::testing::Field(&ClientStats::LocalityStats::
1601  total_issued_requests,
1602  ::testing::Le(num_rpcs)))))),
1603  ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
1604  size_t total_ok = 0;
1605  for (const ClientStats& client_stats : load_report) {
1606  total_ok += client_stats.total_successful_requests();
1607  }
1608  EXPECT_EQ(total_ok, num_rpcs);
1609  // The LRS service got a single request, and sent a single response.
1610  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1611  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1612 }
1613 
1614 // Tests that the drop stats are correctly reported by client load reporting.
1615 TEST_P(ClientLoadReportingTest, DropStats) {
1616  CreateAndStartBackends(1);
1617  const uint32_t kDropPerMillionForLb = 100000;
1618  const uint32_t kDropPerMillionForThrottle = 200000;
1619  const double kErrorTolerance = 0.05;
1620  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1621  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
1622  const double kDropRateForLbAndThrottle =
1623  kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
1624  const size_t kNumRpcs =
1625  ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
1626  // The ADS response contains two drop categories.
1627  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1628  args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1629  {kThrottleDropType, kDropPerMillionForThrottle}};
1630  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1631  // Send kNumRpcs RPCs and count the drops.
1632  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1634  kStatusMessageDropPrefix);
1635  // The drop rate should be roughly equal to the expectation.
1636  const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1637  EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1638  kErrorTolerance));
1639  // Check client stats.
1640  ClientStats client_stats;
1641  do {
1642  std::vector<ClientStats> load_reports =
1643  balancer_->lrs_service()->WaitForLoadReport();
1644  for (const auto& load_report : load_reports) {
1645  client_stats += load_report;
1646  }
1647  } while (client_stats.total_issued_requests() +
1648  client_stats.total_dropped_requests() <
1649  kNumRpcs);
1650  EXPECT_EQ(num_drops, client_stats.total_dropped_requests());
1651  EXPECT_THAT(static_cast<double>(client_stats.dropped_requests(kLbDropType)) /
1652  kNumRpcs,
1653  ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1654  EXPECT_THAT(
1655  static_cast<double>(client_stats.dropped_requests(kThrottleDropType)) /
1656  (kNumRpcs * (1 - kDropRateForLb)),
1657  ::testing::DoubleNear(kDropRateForThrottle, kErrorTolerance));
1658 }
1659 
1660 } // namespace
1661 } // namespace testing
1662 } // namespace grpc
1663 
1664 int main(int argc, char** argv) {
1665  grpc::testing::TestEnvironment env(&argc, argv);
1666  ::testing::InitGoogleTest(&argc, argv);
1667  // Make the backup poller poll very frequently in order to pick up
1668  // updates from all the subchannels's FDs.
1669  GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
1670 #if TARGET_OS_IPHONE
1671  // Workaround Apple CFStream bug
1672  gpr_setenv("grpc_cfstream", "0");
1673 #endif
1674  grpc_init();
1675  const auto result = RUN_ALL_TESTS();
1676  grpc_shutdown();
1677  return result;
1678 }
grpc::EXPECT_THAT
EXPECT_THAT(status.error_message(), ::testing::HasSubstr("subject_token_type"))
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
grpc::status
auto status
Definition: cpp/client/credentials_test.cc:200
grpc::testing::XdsEnd2endTest::SetUp
void SetUp() override
Definition: xds_end2end_test_lib.h:494
testing::ContainsRegex
PolymorphicMatcher< internal::MatchesRegexMatcher > ContainsRegex(const internal::RE *regex)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8835
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
stub_
std::unique_ptr< grpc::testing::EchoTestService::Stub > stub_
Definition: client_channel_stress_test.cc:331
testing::Lt
internal::LtMatcher< Rhs > Lt(Rhs x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8603
generate.env
env
Definition: generate.py:37
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc::testing::kCdsTypeUrl
constexpr char kCdsTypeUrl[]
Definition: xds_server.h:55
grpc
Definition: grpcpp/alarm.h:33
testing::DoubleNear
internal::FloatingEqMatcher< double > DoubleNear(double rhs, double max_abs_error)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8647
backends_
std::vector< std::unique_ptr< BackendServiceImpl > > backends_
Definition: client_channel_stress_test.cc:333
cluster_name
std::string cluster_name
Definition: xds_cluster_resolver.cc:91
grpc::ASSERT_EQ
ASSERT_EQ(sizeof(valid_json), fwrite(valid_json, 1, sizeof(valid_json), creds_file))
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc::testing::kErrorMessage
const char * kErrorMessage
Definition: exception_test.cc:38
setup.name
name
Definition: setup.py:542
testing::Ge
internal::GeMatcher< Rhs > Ge(Rhs x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8585
xds_manager.p
p
Definition: xds_manager.py:60
grpc.StatusCode.FAILED_PRECONDITION
tuple FAILED_PRECONDITION
Definition: src/python/grpcio/grpc/__init__.py:272
testing::ElementsAre
internal::ElementsAreMatcher< ::testing::tuple<> > ElementsAre()
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:13040
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
testing::Property
PolymorphicMatcher< internal::PropertyMatcher< Class, PropertyType > > Property(PropertyType(Class::*property)() const, const PropertyMatcher &matcher)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8732
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
ASSERT_LT
#define ASSERT_LT(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2068
grpc.StatusCode.OK
tuple OK
Definition: src/python/grpcio/grpc/__init__.py:260
cluster
absl::string_view cluster
Definition: xds_resolver.cc:331
gpr_time_cmp
GPRAPI int gpr_time_cmp(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:30
ASSERT_THAT
#define ASSERT_THAT(value, matcher)
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
channel_
RefCountedPtr< Channel > channel_
Definition: channel_connectivity.cc:209
xds_end2end_test_lib.h
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
grpc::testing::INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_SUITE_P(HistogramTestCases, HistogramTest, ::testing::Range< int >(0, GRPC_STATS_HISTOGRAM_COUNT))
backup_poller.h
kNumRpcs
const int kNumRpcs
Definition: thread_stress_test.cc:50
grpc::CreateChannel
std::shared_ptr< Channel > CreateChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds)
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
weight
uint32_t weight
Definition: weighted_target.cc:84
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
GPR_GLOBAL_CONFIG_SET
#define GPR_GLOBAL_CONFIG_SET(name, value)
Definition: global_config_generic.h:26
testing::Pair
internal::PairMatcher< FirstMatcher, SecondMatcher > Pair(FirstMatcher first_matcher, SecondMatcher second_matcher)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:9152
absl::Seconds
constexpr Duration Seconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:419
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
grpc::testing::AdsServiceImpl::ResponseState::ACKED
@ ACKED
Definition: xds_server.h:73
testing::_
const internal::AnythingMatcher _
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8548
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
testing::Values
internal::ValueArray< T... > Values(T... v)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest-param-test.h:335
eds_service_name
std::string eds_service_name
Definition: xds_cluster_resolver.cc:99
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
testing::Le
internal::LeMatcher< Rhs > Le(Rhs x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8597
gpr_time_from_micros
GPRAPI gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:115
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
accumulate
static void accumulate(upb_pb_encoder *e)
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/upb.c:7694
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
ASSERT_TRUE
#define ASSERT_TRUE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1973
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
testing::Field
grpc.StatusCode.UNAVAILABLE
tuple UNAVAILABLE
Definition: src/python/grpcio/grpc/__init__.py:278
testing::AllOf
internal::AllOfResult2< M1, M2 >::type AllOf(M1 m1, M2 m2)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:13472
grpc::testing::TEST_P
TEST_P(HistogramTest, IncHistogram)
Definition: stats_test.cc:87
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
run_xds_tests.backend_service
def backend_service
Definition: run_xds_tests.py:3255
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
grpc::testing::SendRpc
static void SendRpc(grpc::testing::EchoTestService::Stub *stub, int num_rpcs, bool allow_exhaustion, gpr_atm *errors)
Definition: thread_stress_test.cc:277
testing::HasSubstr
PolymorphicMatcher< internal::HasSubstrMatcher< internal::string > > HasSubstr(const internal::string &substring)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8803
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
main
int main(int argc, char **argv)
Definition: xds_cluster_end2end_test.cc:1664
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc::testing::XdsTestType::Name
static std::string Name(const ::testing::TestParamInfo< XdsTestType > &info)
Definition: xds_end2end_test_lib.h:143
gpr_setenv
void gpr_setenv(const char *name, const char *value)


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:57