client_lb_end2end_test.cc
Go to the documentation of this file.
1 // Copyright 2016 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <deque>
17 #include <memory>
18 #include <mutex>
19 #include <random>
20 #include <set>
21 #include <string>
22 #include <thread>
23 
24 #include <gmock/gmock.h>
25 #include <gtest/gtest.h>
26 
27 #include "absl/memory/memory.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_format.h"
30 #include "absl/strings/str_join.h"
31 #include "absl/strings/string_view.h"
32 
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/atm.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/time.h>
38 #include <grpcpp/channel.h>
39 #include <grpcpp/client_context.h>
40 #include <grpcpp/create_channel.h>
45 #include <grpcpp/server.h>
46 #include <grpcpp/server_builder.h>
47 
55 #include "src/core/lib/gpr/env.h"
68 #include "src/proto/grpc/testing/echo.grpc.pb.h"
69 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
70 #include "test/core/util/port.h"
76 
77 using grpc::testing::EchoRequest;
78 using grpc::testing::EchoResponse;
79 
80 namespace grpc {
81 namespace testing {
82 namespace {
83 
// Message text that SendRpc() places in every EchoRequest and that
// CheckRpcSendOk() expects to find in the response.
84 constexpr char kRequestMessage[] = "Live long and prosper.";
85 
86 // Subclass of TestServiceImpl that increments a request counter for
87 // every call to the Echo RPC.
// It also remembers the peer string of every caller and, when a request
// carries backend metrics, reports them through the call metric recorder.
// NOTE(review): several numbered listing lines are absent from this view
// (93-94, 110, 114, 119, 124, 130, 134, 136). The statements on them --
// presumably the locking, the counter increment and Echo()'s return --
// cannot be seen here; confirm against the upstream file.
88 class MyTestServiceImpl : public TestServiceImpl {
89 public:
// Handles one Echo call: records the peer and, if requested, backend metrics.
90 Status Echo(ServerContext* context, const EchoRequest* request,
91 EchoResponse* response) override {
// NOTE(review): the body of this scope (listing lines 93-94) is missing.
92 {
95 }
96 AddClient(context->peer());
// Only requests that explicitly carry backend metrics trigger reporting.
97 if (request->has_param() && request->param().has_backend_metrics()) {
// The report is copied into a member first; see the "strings storage"
// note on load_report_ below.
98 load_report_ = request->param().backend_metrics();
99 auto* recorder = context->ExperimentalGetCallMetricRecorder();
100 EXPECT_NE(recorder, nullptr);
101 recorder->RecordCpuUtilizationMetric(load_report_.cpu_utilization())
102 .RecordMemoryUtilizationMetric(load_report_.mem_utilization());
// Forward every named request-cost and utilization entry of the report.
103 for (const auto& p : load_report_.request_cost()) {
104 recorder->RecordRequestCostMetric(p.first, p.second);
105 }
106 for (const auto& p : load_report_.utilization()) {
107 recorder->RecordUtilizationMetric(p.first, p.second);
108 }
109 }
// NOTE(review): listing line 110 (presumably the return statement) is missing.
111 }
112 
// Returns the current value of the request counter.
113 int request_count() {
115 return request_count_;
116 }
117 
// Sets the request counter back to zero.
118 void ResetCounters() {
120 request_count_ = 0;
121 }
122 
// Returns a copy of the set of peer strings recorded by Echo().
123 std::set<std::string> clients() {
125 return clients_;
126 }
127 
128 private:
// Adds one peer string to the recorded set.
129 void AddClient(const std::string& client) {
131 clients_.insert(client);
132 }
133 
135 int request_count_ = 0;
137 std::set<std::string> clients_;
138 // For strings storage.
// NOTE(review): presumably the recorder keeps references to the map keys of
// this report rather than copying them -- confirm.
139 xds::data::orca::v3::OrcaLoadReport load_report_;
140 };
141 
// Test helper that owns a fake-resolver response generator and converts
// lists of local ports into resolver results for it.
// NOTE(review): several numbered listing lines are absent from this view
// (146-147, 160-161, 167-168, 173-174, 177, 188, 191, 209-210, 218-219).
// Among them are the calls into the response generator, the signature of
// Get(), and the declaration of response_generator_; confirm against the
// upstream file.
142 class FakeResolverResponseGeneratorWrapper {
143 public:
// NOTE(review): the rest of this initializer list (listing lines 146-147) is
// missing; presumably it creates response_generator_.
144 explicit FakeResolverResponseGeneratorWrapper(bool ipv6_only)
145 : ipv6_only_(ipv6_only),
148 
// Move constructor: copies the address-family flag and takes over the
// generator from `other`.
149 FakeResolverResponseGeneratorWrapper(
150 FakeResolverResponseGeneratorWrapper&& other) noexcept {
151 ipv6_only_ = other.ipv6_only_;
152 response_generator_ = std::move(other.response_generator_);
153 }
154 
// Builds a resolver result for `ports` (optionally with a service config and
// one per-address attribute) and hands it on.
// NOTE(review): the call receiving the result (listing lines 160-161) is
// missing from this view.
155 void SetNextResolution(
156 const std::vector<int>& ports, const char* service_config_json = nullptr,
157 const char* attribute_key = nullptr,
158 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
159 nullptr) {
162 BuildFakeResults(ipv6_only_, ports, service_config_json, attribute_key,
163 std::move(attribute)));
164 }
165 
// Same as above with addresses only.
// NOTE(review): the receiving call (listing lines 167-168) is missing;
// judging by the name the result is used for re-resolution -- confirm.
166 void SetNextResolutionUponError(const std::vector<int>& ports) {
169 BuildFakeResults(ipv6_only_, ports));
170 }
171 
// NOTE(review): the body (listing lines 173-174) is missing from this view.
172 void SetFailureOnReresolution() {
175 }
176 
// NOTE(review): the signature of this accessor (listing line 177) is missing;
// callers in this file invoke it as Get(). It returns the raw generator
// pointer.
178 return response_generator_.get();
179 }
180 
181 private:
// Creates a resolver result containing one loopback address per port.
//   ipv6_only           - use "ipv6:[::1]:<port>" instead of
//                         "ipv4:127.0.0.1:<port>".
//   service_config_json - if non-null, parsed into result.service_config.
//   attribute_key/attribute - if `attribute` is non-null, a copy of it is
//                         attached to every address under `attribute_key`.
182 static grpc_core::Resolver::Result BuildFakeResults(
183 bool ipv6_only, const std::vector<int>& ports,
184 const char* service_config_json = nullptr,
185 const char* attribute_key = nullptr,
186 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
187 nullptr) {
// NOTE(review): the declaration of `result` (listing line 188) is missing.
189 result.addresses = grpc_core::ServerAddressList();
190 for (const int& port : ports) {
// NOTE(review): listing line 191 is missing; it evidently declares `lb_uri`
// from the URI string built below.
192 absl::StrCat(ipv6_only ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port));
193 GPR_ASSERT(lb_uri.ok());
194 grpc_resolved_address address;
195 GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
// Each address gets its own attribute map, so the attribute is copied
// rather than moved.
196 std::map<const char*,
197 std::unique_ptr<grpc_core::ServerAddress::AttributeInterface>>
198 attributes;
199 if (attribute != nullptr) {
200 attributes[attribute_key] = attribute->Copy();
201 }
202 result.addresses->emplace_back(address.addr, address.len,
203 nullptr /* args */, std::move(attributes));
204 }
// An empty port list still yields a result; only a note is attached.
205 if (result.addresses->empty()) {
206 result.resolution_note = "fake resolver empty address list";
207 }
208 if (service_config_json != nullptr) {
// NOTE(review): listing lines 209-210 are missing; they evidently declare
// `error` and begin the call that assigns result.service_config.
211 nullptr, service_config_json, &error);
212 GPR_ASSERT(*result.service_config != nullptr);
213 }
214 return result;
215 }
216 
217 bool ipv6_only_ = false;
// NOTE(review): the declaration of response_generator_ (listing lines
// 218-219) is missing from this view.
220 };
221 
222 class ClientLbEnd2endTest : public ::testing::Test {
223  protected:
// Fixture constructor: servers are addressed as "localhost" and the channel
// credentials are created once here.
// NOTE(review): the end of this constructor (listing line 227) is missing
// from this view, so the argument passed to SecureChannelCredentials and the
// constructor body cannot be seen.
224 ClientLbEnd2endTest()
225 : server_host_("localhost"),
226 creds_(new SecureChannelCredentials(
228 
229  static void SetUpTestCase() {
230  // Make the backup poller poll very frequently in order to pick up
231  // updates from all the subchannels's FDs.
232  GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
233 #if TARGET_OS_IPHONE
234  // Workaround Apple CFStream bug
235  gpr_setenv("grpc_cfstream", "0");
236 #endif
237  }
238 
239  void SetUp() override {
240  grpc_init();
241  bool localhost_resolves_to_ipv4 = false;
242  bool localhost_resolves_to_ipv6 = false;
243  grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
244  &localhost_resolves_to_ipv6);
245  ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
246  }
247 
248  void TearDown() override {
249  for (size_t i = 0; i < servers_.size(); ++i) {
250  servers_[i]->Shutdown();
251  }
252  servers_.clear();
253  creds_.reset();
254  grpc_shutdown();
255  }
256 
257  void CreateServers(size_t num_servers,
258  std::vector<int> ports = std::vector<int>()) {
259  servers_.clear();
260  for (size_t i = 0; i < num_servers; ++i) {
261  int port = 0;
262  if (ports.size() == num_servers) port = ports[i];
263  servers_.emplace_back(new ServerData(port));
264  }
265  }
266 
267  void StartServer(size_t index) { servers_[index]->Start(server_host_); }
268 
269  void StartServers(size_t num_servers,
270  std::vector<int> ports = std::vector<int>()) {
271  CreateServers(num_servers, std::move(ports));
272  for (size_t i = 0; i < num_servers; ++i) {
273  StartServer(i);
274  }
275  }
276 
277  std::vector<int> GetServersPorts(size_t start_index = 0,
278  size_t stop_index = 0) {
279  if (stop_index == 0) stop_index = servers_.size();
280  std::vector<int> ports;
281  for (size_t i = start_index; i < stop_index; ++i) {
282  ports.push_back(servers_[i]->port_);
283  }
284  return ports;
285  }
286 
287  FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
288  return FakeResolverResponseGeneratorWrapper(ipv6_only_);
289  }
290 
291  std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
292  const std::shared_ptr<Channel>& channel) {
293  return grpc::testing::EchoTestService::NewStub(channel);
294  }
295 
// Creates a channel to the "fake:///" target using the fixture credentials.
//   lb_policy_name     - LB policy to request; an empty name leaves the
//                        channel on its default policy.
//   response_generator - wrapper whose generator is attached to the channel.
//   args               - extra channel arguments (taken by value and
//                        extended locally).
296 std::shared_ptr<Channel> BuildChannel(
297 const std::string& lb_policy_name,
298 const FakeResolverResponseGeneratorWrapper& response_generator,
299 ChannelArguments args = ChannelArguments()) {
300 if (!lb_policy_name.empty()) {
301 args.SetLoadBalancingPolicyName(lb_policy_name);
302 } // else, default to pick first
// NOTE(review): listing line 303 is missing; it evidently starts the call
// that stores response_generator.Get() in `args`.
304 response_generator.Get());
305 return grpc::CreateCustomChannel("fake:///", creds_, args);
306 }
307 
// Sends a single Echo RPC through `stub` and returns its status.
//   response - where to store the reply; a local object is used when null.
//   request  - request to send; a local object is used when null. In both
//              cases the message and echo_metadata fields are overwritten.
//   timeout_ms, wait_for_ready - NOTE(review): the lines that consume these
//              (listing lines 319-320) are missing from this view;
//              presumably they set the deadline and wait-for-ready flag on
//              the client context.
308 Status SendRpc(
309 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
310 EchoResponse* response = nullptr, int timeout_ms = 1000,
311 bool wait_for_ready = false, EchoRequest* request = nullptr) {
312 EchoResponse local_response;
313 if (response == nullptr) response = &local_response;
314 EchoRequest local_request;
315 if (request == nullptr) request = &local_request;
316 request->set_message(kRequestMessage);
317 request->mutable_param()->set_echo_metadata(true);
318 ClientContext context;
// Three fixed metadata entries are attached to every call.
321 context.AddMetadata("foo", "1");
322 context.AddMetadata("bar", "2");
323 context.AddMetadata("baz", "3");
324 return stub->Echo(&context, *request, response);
325 }
326 
327  void CheckRpcSendOk(
328  const grpc_core::DebugLocation& location,
329  const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
330  bool wait_for_ready = false,
331  xds::data::orca::v3::OrcaLoadReport* load_report = nullptr,
332  int timeout_ms = 2000) {
333  EchoResponse response;
334  EchoRequest request;
335  EchoRequest* request_ptr = nullptr;
336  if (load_report != nullptr) {
337  request_ptr = &request;
338  auto params = request.mutable_param();
339  auto backend_metrics = params->mutable_backend_metrics();
340  *backend_metrics = *load_report;
341  }
342  Status status =
343  SendRpc(stub, &response, timeout_ms, wait_for_ready, request_ptr);
344  ASSERT_TRUE(status.ok())
345  << "From " << location.file() << ":" << location.line()
346  << "\nError: " << status.error_message() << " "
347  << status.error_details();
348  ASSERT_EQ(response.message(), kRequestMessage)
349  << "From " << location.file() << ":" << location.line();
350  }
351 
// Expects an RPC to fail with status code `expected_status` and an error
// message matching `expected_message_regex`; `location` identifies the call
// site in failure output.
// NOTE(review): listing line 356, which produces `status` (presumably by
// calling SendRpc(stub)), is missing from this view.
352 void CheckRpcSendFailure(
353 const grpc_core::DebugLocation& location,
354 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
355 StatusCode expected_status, absl::string_view expected_message_regex) {
357 EXPECT_FALSE(status.ok());
358 EXPECT_EQ(expected_status, status.error_code())
359 << location.file() << ":" << location.line();
360 EXPECT_THAT(status.error_message(),
361 ::testing::MatchesRegex(expected_message_regex))
362 << location.file() << ":" << location.line();
363 }
364 
// One backend server: its port, the gRPC server object, the registered
// services and the thread that brings the server up.
// NOTE(review): several numbered listing lines are absent from this view
// (372-373, 378, 383, 398, 400, 405, 411, 413). They presumably declare
// mu_/cond_, initialize port_, take the lock in Start()/Serve()/Shutdown()
// and shut the server down; confirm against the upstream file.
365 struct ServerData {
366 const int port_;
367 std::unique_ptr<Server> server_;
368 MyTestServiceImpl service_;
369 experimental::OrcaService orca_service_;
370 std::unique_ptr<std::thread> thread_;
371 
// Both flags are annotated as guarded by mu_.
374 bool server_ready_ ABSL_GUARDED_BY(mu_) = false;
375 bool started_ ABSL_GUARDED_BY(mu_) = false;
376 
// NOTE(review): the first part of the initializer list (listing line 378),
// which must initialize the const member port_, is missing.
377 explicit ServerData(int port = 0)
379 orca_service_(experimental::OrcaService::Options()) {}
380 
// Launches Serve() on a new thread and blocks until that thread reports the
// server as ready.
381 void Start(const std::string& server_host) {
382 gpr_log(GPR_INFO, "starting server on port %d", port_);
384 started_ = true;
385 thread_ = absl::make_unique<std::thread>(
386 std::bind(&ServerData::Serve, this, server_host));
// Loop guards against wake-ups that arrive before the flag is set.
387 while (!server_ready_) {
388 cond_.Wait(&mu_);
389 }
// Reset so that a later Start() waits again.
390 server_ready_ = false;
391 gpr_log(GPR_INFO, "server startup complete");
392 }
393 
// Thread body: builds and starts the server on "<server_host>:<port_>" with
// both services registered, then signals Start().
394 void Serve(const std::string& server_host) {
395 std::ostringstream server_address;
396 server_address << server_host << ":" << port_;
397 ServerBuilder builder;
// NOTE(review): the argument of SecureServerCredentials (listing line 400)
// is missing from this view.
399 std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
401 builder.AddListeningPort(server_address.str(), std::move(creds));
402 builder.RegisterService(&service_);
403 builder.RegisterService(&orca_service_);
404 server_ = builder.BuildAndStart();
406 server_ready_ = true;
407 cond_.Signal();
408 }
409 
// Joins the serving thread; a no-op if the server was never started.
// NOTE(review): listing line 413 (presumably the server shutdown call) is
// missing from this view.
410 void Shutdown() {
412 if (!started_) return;
414 thread_->join();
415 started_ = false;
416 }
417 
// Sets the health-check serving status of `service` on this server.
418 void SetServingStatus(const std::string& service, bool serving) {
419 server_->GetHealthCheckService()->SetServingStatus(service, serving);
420 }
421 };
422 
423  void ResetCounters() {
424  for (const auto& server : servers_) server->service_.ResetCounters();
425  }
426 
427  bool SeenAllServers(size_t start_index = 0, size_t stop_index = 0) {
428  if (stop_index == 0) stop_index = servers_.size();
429  for (size_t i = start_index; i < stop_index; ++i) {
430  if (servers_[i]->service_.request_count() == 0) return false;
431  }
432  return true;
433  }
434 
// Keeps sending RPCs until every server in [start_index, stop_index) has
// seen a request or the deadline passes, then resets all request counters.
// A stop_index of 0 means "up to the last server".
435 // If status_check is null, all RPCs must succeed.
436 // If status_check is non-null, it will be called for all non-OK RPCs.
// NOTE(review): listing lines 442, 445 and 450 are missing from this view:
// the last parameter (a `timeout` that is scaled by the slowdown factor
// below), the start of the log call, and the statement producing `status`
// (presumably a SendRpc(stub) call).
437 void WaitForServers(
438 const grpc_core::DebugLocation& location,
439 const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
440 size_t start_index = 0, size_t stop_index = 0,
441 std::function<void(const Status&)> status_check = nullptr,
443 if (stop_index == 0) stop_index = servers_.size();
444 auto deadline = absl::Now() + (timeout * grpc_test_slowdown_factor());
446 "========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR
447 ") ==========",
448 start_index, stop_index);
449 while (!SeenAllServers(start_index, stop_index)) {
451 if (status_check != nullptr) {
452 if (!status.ok()) status_check(status);
453 } else {
454 EXPECT_TRUE(status.ok())
455 << " code=" << status.error_code() << " message=\""
456 << status.error_message() << "\" at " << location.file() << ":"
457 << location.line();
458 }
// Record a test failure on timeout, and leave the loop so the test cannot
// hang.
459 EXPECT_LE(absl::Now(), deadline)
460 << " at " << location.file() << ":" << location.line();
461 if (absl::Now() >= deadline) break;
462 }
463 ResetCounters();
464 }
465 
466  void WaitForServer(
467  const grpc_core::DebugLocation& location,
468  const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
469  size_t server_index,
470  std::function<void(const Status&)> status_check = nullptr) {
471  WaitForServers(location, stub, server_index, server_index + 1,
472  status_check);
473  }
474 
// Waits until `predicate` accepts the channel's connectivity state.
// Returns true once the predicate holds, false if waiting for a state change
// fails before that (the wait is bounded by `deadline`).
// NOTE(review): listing lines 480 and 482 are missing from this view: the
// expression computing `deadline` (presumably from timeout_seconds) and the
// statement reading `state` (presumably channel->GetState(try_to_connect)).
475 bool WaitForChannelState(
476 Channel* channel,
477 const std::function<bool(grpc_connectivity_state)>& predicate,
478 bool try_to_connect = false, int timeout_seconds = 5) {
479 const gpr_timespec deadline =
481 while (true) {
483 if (predicate(state)) break;
484 if (!channel->WaitForStateChange(state, deadline)) return false;
485 }
486 return true;
487 }
488 
489  bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
490  auto predicate = [](grpc_connectivity_state state) {
491  return state != GRPC_CHANNEL_READY;
492  };
493  return WaitForChannelState(channel, predicate, false, timeout_seconds);
494  }
495 
496  bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
497  auto predicate = [](grpc_connectivity_state state) {
498  return state == GRPC_CHANNEL_READY;
499  };
500  return WaitForChannelState(channel, predicate, true, timeout_seconds);
501  }
502 
503  // Updates \a connection_order by appending to it the index of the newly
504  // connected server. Must be called after every single RPC.
505  void UpdateConnectionOrder(
506  const std::vector<std::unique_ptr<ServerData>>& servers,
507  std::vector<int>* connection_order) {
508  for (size_t i = 0; i < servers.size(); ++i) {
509  if (servers[i]->service_.request_count() == 1) {
510  // Was the server index known? If not, update connection_order.
511  const auto it =
512  std::find(connection_order->begin(), connection_order->end(), i);
513  if (it == connection_order->end()) {
514  connection_order->push_back(i);
515  return;
516  }
517  }
518  }
519  }
520 
522  std::vector<std::unique_ptr<ServerData>> servers_;
523  std::shared_ptr<ChannelCredentials> creds_;
524  bool ipv6_only_ = false;
525 };
526 
// Checks the channel's connectivity state before, during and after the first
// resolver result is delivered.
527 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
528 const int kNumServers = 3;
529 StartServers(kNumServers);
530 auto response_generator = BuildResolverResponseGenerator();
531 auto channel = BuildChannel("", response_generator);
532 auto stub = BuildStub(channel);
533 // Initial state should be IDLE.
534 EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
535 // Tell the channel to try to connect.
536 // Note that this call also returns IDLE, since the state change has
537 // not yet occurred; it just gets triggered by this call.
538 EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
539 // Now that the channel is trying to connect, we should be in state
540 // CONNECTING.
// NOTE(review): the second argument of this EXPECT_EQ (listing line 542) is
// missing from this view.
541 EXPECT_EQ(channel->GetState(false /* try_to_connect */),
543 // Return a resolver result, which allows the connection attempt to proceed.
544 response_generator.SetNextResolution(GetServersPorts());
545 // We should eventually transition into state READY.
546 EXPECT_TRUE(WaitForChannelReady(channel.get()));
547 }
548 
// Checks that a channel goes READY on use, falls back to IDLE after a period
// without use, and becomes READY again on the next RPC.
549 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
550 // Start server.
551 const int kNumServers = 1;
552 StartServers(kNumServers);
553 // Set max idle time and build the channel.
554 ChannelArguments args;
// NOTE(review): the statement that sets the idle time on `args` (listing
// line 555) is missing from this view.
556 auto response_generator = BuildResolverResponseGenerator();
557 auto channel = BuildChannel("", response_generator, args);
558 auto stub = BuildStub(channel);
559 // The initial channel state should be IDLE.
560 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
561 // After sending RPC, channel state should be READY.
562 gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***");
563 response_generator.SetNextResolution(GetServersPorts());
564 CheckRpcSendOk(DEBUG_LOCATION, stub);
565 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
566 // After a period of time not using the channel, the channel state should
567 // switch to IDLE.
568 gpr_log(GPR_INFO, "*** WAITING FOR CHANNEL TO GO IDLE ***");
// NOTE(review): the wait itself (listing line 569) is missing from this view.
570 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
571 // Sending a new RPC should awake the IDLE channel.
572 gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***");
573 response_generator.SetNextResolution(GetServersPorts());
574 CheckRpcSendOk(DEBUG_LOCATION, stub);
575 EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
576 }
577 
578 //
579 // pick_first tests
580 //
581 
582 using PickFirstTest = ClientLbEnd2endTest;
583 
// With three backends in the resolver result, all RPCs must land on a single
// backend, and the channel must report "pick_first" as its LB policy even
// though no policy name was requested.
584 TEST_F(PickFirstTest, Basic) {
585 // Start servers and send one RPC per server.
586 const int kNumServers = 3;
587 StartServers(kNumServers);
588 auto response_generator = BuildResolverResponseGenerator();
589 auto channel = BuildChannel(
590 "", response_generator); // test that pick first is the default.
591 auto stub = BuildStub(channel);
592 response_generator.SetNextResolution(GetServersPorts());
593 for (size_t i = 0; i < servers_.size(); ++i) {
594 CheckRpcSendOk(DEBUG_LOCATION, stub);
595 }
596 // All requests should have gone to a single server.
597 bool found = false;
598 for (size_t i = 0; i < servers_.size(); ++i) {
599 const int request_count = servers_[i]->service_.request_count();
// A server has either received every request or none at all.
600 if (request_count == kNumServers) {
601 found = true;
602 } else {
603 EXPECT_EQ(0, request_count);
604 }
605 }
// NOTE(review): listing line 606 is missing from this view; presumably it
// asserts on `found`.
607 // Check LB policy name for the channel.
608 EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
609 }
610 
611 TEST_F(PickFirstTest, ProcessPending) {
612  StartServers(1); // Single server
613  auto response_generator = BuildResolverResponseGenerator();
614  auto channel = BuildChannel(
615  "", response_generator); // test that pick first is the default.
616  auto stub = BuildStub(channel);
617  response_generator.SetNextResolution({servers_[0]->port_});
618  WaitForServer(DEBUG_LOCATION, stub, 0);
619  // Create a new channel and its corresponding PF LB policy, which will pick
620  // the subchannels in READY state from the previous RPC against the same
621  // target (even if it happened over a different channel, because subchannels
622  // are globally reused). Progress should happen without any transition from
623  // this READY state.
624  auto second_response_generator = BuildResolverResponseGenerator();
625  auto second_channel = BuildChannel("", second_response_generator);
626  auto second_stub = BuildStub(second_channel);
627  second_response_generator.SetNextResolution({servers_[0]->port_});
628  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
629 }
630 
// A new pick_first channel must pick an address whose subchannel is already
// READY without first waiting out the (long) initial backoff.
631 TEST_F(PickFirstTest, SelectsReadyAtStartup) {
632 ChannelArguments args;
633 constexpr int kInitialBackOffMs = 5000;
634 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
635 // Create 2 servers, but start only the second one.
// NOTE(review): the rest of this initializer (listing line 637) is missing
// from this view; presumably it picks the second port.
636 std::vector<int> ports = {grpc_pick_unused_port_or_die(),
638 CreateServers(2, ports);
639 StartServer(1);
640 auto response_generator1 = BuildResolverResponseGenerator();
641 auto channel1 = BuildChannel("pick_first", response_generator1, args);
642 auto stub1 = BuildStub(channel1);
643 response_generator1.SetNextResolution(ports);
644 // Wait for second server to be ready.
645 WaitForServer(DEBUG_LOCATION, stub1, 1);
646 // Create a second channel with the same addresses. Its PF instance
647 // should immediately pick the second subchannel, since it's already
648 // in READY state.
649 auto response_generator2 = BuildResolverResponseGenerator();
650 auto channel2 = BuildChannel("pick_first", response_generator2, args);
651 response_generator2.SetNextResolution(ports);
652 // Check that the channel reports READY without waiting for the
653 // initial backoff.
654 EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
655 }
656 
// Measures how long a channel takes to connect once a server appears, and
// checks that the time is at least the configured initial reconnect backoff
// but not much more.
// NOTE(review): listing lines 662, 675 and 677 are missing from this view;
// they evidently define the timestamps (`t1` is used below) and compute
// `waited`.
657 TEST_F(PickFirstTest, BackOffInitialReconnect) {
658 ChannelArguments args;
659 constexpr int kInitialBackOffMs = 100;
660 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
661 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
663 auto response_generator = BuildResolverResponseGenerator();
664 auto channel = BuildChannel("pick_first", response_generator, args);
665 auto stub = BuildStub(channel);
666 response_generator.SetNextResolution(ports);
667 // The channel won't become connected (there's no server).
668 ASSERT_FALSE(channel->WaitForConnected(
669 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
670 // Bring up a server on the chosen port.
671 StartServers(1, ports);
672 // Now it will.
673 ASSERT_TRUE(channel->WaitForConnected(
674 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
676 const grpc_core::Duration waited =
678 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
679 // We should have waited at least kInitialBackOffMs. We subtract one to
680 // account for test and precision accuracy drift.
681 EXPECT_GE(waited.millis(), kInitialBackOffMs - 1);
682 // But not much more.
683 EXPECT_GT(
684 gpr_time_cmp(
685 grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1),
686 0);
687 }
688 
// Checks that a connection attempt is given at least the configured minimum
// reconnect backoff before WaitForConnected() returns.
// NOTE(review): listing lines 703, 706 and 708 are missing from this view;
// they evidently take the timestamps and compute `waited`.
689 TEST_F(PickFirstTest, BackOffMinReconnect) {
690 ChannelArguments args;
691 constexpr int kMinReconnectBackOffMs = 1000;
692 args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
693 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
694 auto response_generator = BuildResolverResponseGenerator();
695 auto channel = BuildChannel("pick_first", response_generator, args);
696 auto stub = BuildStub(channel);
697 response_generator.SetNextResolution(ports);
698 // Make the connection delay 10% longer than the min reconnect backoff to
699 // make sure we hit the codepath that waits for the min reconnect backoff.
700 ConnectionDelayInjector delay_injector(
701 grpc_core::Duration::Milliseconds(kMinReconnectBackOffMs * 1.10));
702 delay_injector.Start();
// The result of this wait is deliberately not asserted on; only the elapsed
// time is checked.
704 channel->WaitForConnected(
705 grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
707 const grpc_core::Duration waited =
709 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
710 // We should have waited at least kMinReconnectBackOffMs. We subtract one to
711 // account for test and precision accuracy drift.
712 EXPECT_GE(waited.millis(), kMinReconnectBackOffMs - 1);
713 }
714 
// Checks that resetting the connection backoff lets the channel connect to a
// newly started server in less time than the initial backoff.
// NOTE(review): listing lines 729, 735, 741 and 743 are missing from this
// view; they evidently take the timestamps, perform the backoff reset
// announced by the comment below, and compute `waited`.
715 TEST_F(PickFirstTest, ResetConnectionBackoff) {
716 ChannelArguments args;
717 constexpr int kInitialBackOffMs = 1000;
718 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
719 const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
720 auto response_generator = BuildResolverResponseGenerator();
721 auto channel = BuildChannel("pick_first", response_generator, args);
722 auto stub = BuildStub(channel);
723 response_generator.SetNextResolution(ports);
724 // The channel won't become connected (there's no server).
725 EXPECT_FALSE(
726 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
727 // Bring up a server on the chosen port.
728 StartServers(1, ports);
730 // Wait for connect, but not long enough. This proves that we're
731 // being throttled by initial backoff.
732 EXPECT_FALSE(
733 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
734 // Reset connection backoff.
736 // Wait for connect. Should happen as soon as the client connects to
737 // the newly started server, which should be before the initial
738 // backoff timeout elapses.
739 EXPECT_TRUE(
740 channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(20)));
742 const grpc_core::Duration waited =
744 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
745 // We should have waited less than kInitialBackOffMs.
746 EXPECT_LT(waited.millis(), kInitialBackOffMs);
747 }
748 
// Uses held connection attempts to check that, after a backoff reset, the
// attempt following a failed one starts well before the initial backoff
// would have elapsed.
// NOTE(review): listing lines 768, 772, 778, 783, 785 and 790 are missing
// from this view; they evidently include the start of a state expectation,
// the backoff reset itself, the timestamps, the deadline argument of
// WaitForStateChange() and the computation of `waited`.
749 TEST_F(ClientLbEnd2endTest,
750 ResetConnectionBackoffNextAttemptStartsImmediately) {
751 // Start connection injector.
752 ConnectionHoldInjector injector;
753 injector.Start();
754 // Create client.
755 const int port = grpc_pick_unused_port_or_die();
756 ChannelArguments args;
757 const int kInitialBackOffMs = 5000 * grpc_test_slowdown_factor();
758 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
759 auto response_generator = BuildResolverResponseGenerator();
760 auto channel = BuildChannel("pick_first", response_generator, args);
761 auto stub = BuildStub(channel);
762 response_generator.SetNextResolution({port});
763 // Intercept initial connection attempt.
764 auto hold1 = injector.AddHold(port);
765 gpr_log(GPR_INFO, "=== TRIGGERING INITIAL CONNECTION ATTEMPT");
766 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(/*try_to_connect=*/true));
767 hold1->Wait();
769 channel->GetState(/*try_to_connect=*/false));
770 // Reset backoff.
771 gpr_log(GPR_INFO, "=== RESETTING BACKOFF");
773 // Intercept next attempt. Do this before resuming the first attempt,
774 // just in case the client makes progress faster than this thread.
775 auto hold2 = injector.AddHold(port);
776 // Fail current attempt and wait for next one to start.
777 gpr_log(GPR_INFO, "=== RESUMING INITIAL ATTEMPT");
779 hold1->Resume();
780 gpr_log(GPR_INFO, "=== WAITING FOR SECOND ATTEMPT");
781 // This WaitForStateChange() call just makes sure we're doing some polling.
782 EXPECT_TRUE(channel->WaitForStateChange(GRPC_CHANNEL_CONNECTING,
784 hold2->Wait();
786 gpr_log(GPR_INFO, "=== RESUMING SECOND ATTEMPT");
787 hold2->Resume();
788 // Elapsed time should be very short, much less than kInitialBackOffMs.
789 const grpc_core::Duration waited =
791 gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited.millis());
792 EXPECT_LT(waited.millis(), 1000 * grpc_test_slowdown_factor());
793 }
794 
// Two pick_first channels share their last address. The test steps both
// through their connection attempts with holds and checks that each channel
// reports TRANSIENT_FAILURE only after it has itself tried all of its
// addresses, even though the failure of the shared address is delivered to
// both channels at the same time.
795 TEST_F(
796 PickFirstTest,
797 TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) {
798 // Start connection injector.
799 ConnectionHoldInjector injector;
800 injector.Start();
801 // Get 5 unused ports. Each channel will have 2 unique ports followed
802 // by a common port.
// NOTE(review): the rest of the ports1 initializer (listing lines 804-805)
// is missing from this view; ports1[2] is the port shared with ports2.
803 std::vector<int> ports1 = {grpc_pick_unused_port_or_die(),
806 std::vector<int> ports2 = {grpc_pick_unused_port_or_die(),
807 grpc_pick_unused_port_or_die(), ports1[2]};
808 // Create channel 1.
809 auto response_generator1 = BuildResolverResponseGenerator();
810 auto channel1 = BuildChannel("pick_first", response_generator1);
811 auto stub1 = BuildStub(channel1);
812 response_generator1.SetNextResolution(ports1);
813 // Allow the connection attempts for ports 0 and 1 to fail normally.
814 // Inject a hold for the connection attempt to port 2.
815 auto hold_channel1_port2 = injector.AddHold(ports1[2]);
816 // Trigger connection attempt.
817 gpr_log(GPR_INFO, "=== START CONNECTING CHANNEL 1 ===");
818 channel1->GetState(/*try_to_connect=*/true);
819 // Wait for connection attempt to port 2.
820 gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 1 PORT 2 TO START ===");
821 hold_channel1_port2->Wait();
822 gpr_log(GPR_INFO, "=== CHANNEL 1 PORT 2 STARTED ===");
823 // Now create channel 2.
824 auto response_generator2 = BuildResolverResponseGenerator();
825 auto channel2 = BuildChannel("pick_first", response_generator2);
826 response_generator2.SetNextResolution(ports2);
827 // Inject a hold for port 0.
828 auto hold_channel2_port0 = injector.AddHold(ports2[0]);
829 // Trigger connection attempt.
830 gpr_log(GPR_INFO, "=== START CONNECTING CHANNEL 2 ===");
831 channel2->GetState(/*try_to_connect=*/true);
832 // Wait for connection attempt to port 0.
833 gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 0 TO START ===");
834 hold_channel2_port0->Wait();
835 gpr_log(GPR_INFO, "=== CHANNEL 2 PORT 0 STARTED ===");
836 // Inject a hold for port 0, which will be retried by channel 1.
837 auto hold_channel1_port0 = injector.AddHold(ports1[0]);
838 // Now allow the connection attempt to port 2 to complete. The subchannel
839 // will deliver a TRANSIENT_FAILURE notification to both channels.
840 gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 2 ===");
841 hold_channel1_port2->Resume();
842 // Wait for channel 1 to retry port 0, so that we know it's seen the
843 // connectivity state notification for port 2.
844 gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 1 PORT 0 ===");
845 hold_channel1_port0->Wait();
846 gpr_log(GPR_INFO, "=== CHANNEL 1 PORT 0 STARTED ===");
847 // Channel 1 should now report TRANSIENT_FAILURE.
848 // Channel 2 should continue to report CONNECTING.
849 EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel1->GetState(false));
850 EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel2->GetState(false));
851 // Inject a hold for port 2, which will eventually be tried by channel 2.
852 auto hold_channel2_port2 = injector.AddHold(ports2[2]);
853 // Allow channel 2 to resume port 0. Port 0 will fail, as will port 1.
854 gpr_log(GPR_INFO, "=== RESUMING CHANNEL 2 PORT 0 ===");
855 hold_channel2_port0->Resume();
856 // Wait for channel 2 to try port 2.
857 gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 2 ===");
858 hold_channel2_port2->Wait();
859 gpr_log(GPR_INFO, "=== CHANNEL 2 PORT 2 STARTED ===");
860 // Channel 2 should still be CONNECTING here.
861 EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel2->GetState(false));
862 // Add a hold for channel 2 port 0.
863 hold_channel2_port0 = injector.AddHold(ports2[0]);
864 gpr_log(GPR_INFO, "=== RESUMING CHANNEL 2 PORT 2 ===");
865 hold_channel2_port2->Resume();
866 // Wait for channel 2 to retry port 0.
867 gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 0 ===");
868 hold_channel2_port0->Wait();
869 // Now channel 2 should be reporting TRANSIENT_FAILURE.
870 EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel2->GetState(false));
871 // Clean up.
872 gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 0 AND CHANNEL 2 PORT 0 ===");
873 hold_channel1_port0->Resume();
874 hold_channel2_port0->Resume();
875 }
876 
877 TEST_F(PickFirstTest, Updates) {
878  // Start servers and send one RPC per server.
879  const int kNumServers = 3;
880  StartServers(kNumServers);
881  auto response_generator = BuildResolverResponseGenerator();
882  auto channel = BuildChannel("pick_first", response_generator);
883  auto stub = BuildStub(channel);
884 
885  std::vector<int> ports;
886 
887  // Perform one RPC against the first server.
888  ports.emplace_back(servers_[0]->port_);
889  response_generator.SetNextResolution(ports);
890  gpr_log(GPR_INFO, "****** SET [0] *******");
891  CheckRpcSendOk(DEBUG_LOCATION, stub);
892  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
893 
894  // An empty update will result in the channel going into TRANSIENT_FAILURE.
895  ports.clear();
896  response_generator.SetNextResolution(ports);
897  gpr_log(GPR_INFO, "****** SET none *******");
898  grpc_connectivity_state channel_state;
899  do {
900  channel_state = channel->GetState(true /* try to connect */);
901  } while (channel_state == GRPC_CHANNEL_READY);
902  ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
903  servers_[0]->service_.ResetCounters();
904 
905  // Next update introduces servers_[1], making the channel recover.
906  ports.clear();
907  ports.emplace_back(servers_[1]->port_);
908  response_generator.SetNextResolution(ports);
909  gpr_log(GPR_INFO, "****** SET [1] *******");
910  WaitForServer(DEBUG_LOCATION, stub, 1);
911  EXPECT_EQ(servers_[0]->service_.request_count(), 0);
912 
913  // And again for servers_[2]
914  ports.clear();
915  ports.emplace_back(servers_[2]->port_);
916  response_generator.SetNextResolution(ports);
917  gpr_log(GPR_INFO, "****** SET [2] *******");
918  WaitForServer(DEBUG_LOCATION, stub, 2);
919  EXPECT_EQ(servers_[0]->service_.request_count(), 0);
920  EXPECT_EQ(servers_[1]->service_.request_count(), 0);
921 
922  // Check LB policy name for the channel.
923  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
924 }
925 
926 TEST_F(PickFirstTest, UpdateSuperset) {
927  // Start servers and send one RPC per server.
928  const int kNumServers = 3;
929  StartServers(kNumServers);
930  auto response_generator = BuildResolverResponseGenerator();
931  auto channel = BuildChannel("pick_first", response_generator);
932  auto stub = BuildStub(channel);
933 
934  std::vector<int> ports;
935 
936  // Perform one RPC against the first server.
937  ports.emplace_back(servers_[0]->port_);
938  response_generator.SetNextResolution(ports);
939  gpr_log(GPR_INFO, "****** SET [0] *******");
940  CheckRpcSendOk(DEBUG_LOCATION, stub);
941  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
942  servers_[0]->service_.ResetCounters();
943 
944  // Send and superset update
945  ports.clear();
946  ports.emplace_back(servers_[1]->port_);
947  ports.emplace_back(servers_[0]->port_);
948  response_generator.SetNextResolution(ports);
949  gpr_log(GPR_INFO, "****** SET superset *******");
950  CheckRpcSendOk(DEBUG_LOCATION, stub);
951  // We stick to the previously connected server.
952  WaitForServer(DEBUG_LOCATION, stub, 0);
953  EXPECT_EQ(0, servers_[1]->service_.request_count());
954 
955  // Check LB policy name for the channel.
956  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
957 }
958 
959 TEST_F(PickFirstTest, UpdateToUnconnected) {
960  const int kNumServers = 2;
961  CreateServers(kNumServers);
962  StartServer(0);
963  auto response_generator = BuildResolverResponseGenerator();
964  auto channel = BuildChannel("pick_first", response_generator);
965  auto stub = BuildStub(channel);
966 
967  std::vector<int> ports;
968 
969  // Try to send rpcs against a list where the server is available.
970  ports.emplace_back(servers_[0]->port_);
971  response_generator.SetNextResolution(ports);
972  gpr_log(GPR_INFO, "****** SET [0] *******");
973  CheckRpcSendOk(DEBUG_LOCATION, stub);
974 
975  // Send resolution for which all servers are currently unavailable. Eventually
976  // this triggers replacing the existing working subchannel_list with the new
977  // currently unresponsive list.
978  ports.clear();
979  ports.emplace_back(grpc_pick_unused_port_or_die());
980  ports.emplace_back(servers_[1]->port_);
981  response_generator.SetNextResolution(ports);
982  gpr_log(GPR_INFO, "****** SET [unavailable] *******");
983  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
984 
985  // Ensure that the last resolution was installed correctly by verifying that
986  // the channel becomes ready once one of if its endpoints becomes available.
987  gpr_log(GPR_INFO, "****** StartServer(1) *******");
988  StartServer(1);
989  EXPECT_TRUE(WaitForChannelReady(channel.get()));
990 }
991 
992 TEST_F(PickFirstTest, GlobalSubchannelPool) {
993  // Start one server.
994  const int kNumServers = 1;
995  StartServers(kNumServers);
996  std::vector<int> ports = GetServersPorts();
997  // Create two channels that (by default) use the global subchannel pool.
998  auto response_generator1 = BuildResolverResponseGenerator();
999  auto channel1 = BuildChannel("pick_first", response_generator1);
1000  auto stub1 = BuildStub(channel1);
1001  response_generator1.SetNextResolution(ports);
1002  auto response_generator2 = BuildResolverResponseGenerator();
1003  auto channel2 = BuildChannel("pick_first", response_generator2);
1004  auto stub2 = BuildStub(channel2);
1005  response_generator2.SetNextResolution(ports);
1006  WaitForServer(DEBUG_LOCATION, stub1, 0);
1007  // Send one RPC on each channel.
1008  CheckRpcSendOk(DEBUG_LOCATION, stub1);
1009  CheckRpcSendOk(DEBUG_LOCATION, stub2);
1010  // The server receives two requests.
1011  EXPECT_EQ(2, servers_[0]->service_.request_count());
1012  // The two requests are from the same client port, because the two channels
1013  // share subchannels via the global subchannel pool.
1014  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1015 }
1016 
// Counterpart to GlobalSubchannelPool: two pick_first channels are built from
// the same ChannelArguments object, each sends one RPC to a single server,
// and the server is expected to observe two distinct clients.
1017 TEST_F(PickFirstTest, LocalSubchannelPool) {
1018  // Start one server.
1019  const int kNumServers = 1;
1020  StartServers(kNumServers);
1021  std::vector<int> ports = GetServersPorts();
1022  // Create two channels that use local subchannel pool.
// NOTE(review): no option is visibly set on `args` in this view; the
// statement that enables the local subchannel pool is presumably on the
// source line missing between the next two lines -- confirm.
1023  ChannelArguments args;
1025  auto response_generator1 = BuildResolverResponseGenerator();
1026  auto channel1 = BuildChannel("pick_first", response_generator1, args);
1027  auto stub1 = BuildStub(channel1);
1028  response_generator1.SetNextResolution(ports);
1029  auto response_generator2 = BuildResolverResponseGenerator();
1030  auto channel2 = BuildChannel("pick_first", response_generator2, args);
1031  auto stub2 = BuildStub(channel2);
1032  response_generator2.SetNextResolution(ports);
1033  WaitForServer(DEBUG_LOCATION, stub1, 0);
1034  // Send one RPC on each channel.
1035  CheckRpcSendOk(DEBUG_LOCATION, stub1);
1036  CheckRpcSendOk(DEBUG_LOCATION, stub2);
1037  // The server receives two requests.
1038  EXPECT_EQ(2, servers_[0]->service_.request_count());
1039  // The two requests are from two client ports, because the two channels didn't
1040  // share subchannels with each other.
1041  EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
1042 }
1043 
1044 TEST_F(PickFirstTest, ManyUpdates) {
1045  const int kNumUpdates = 1000;
1046  const int kNumServers = 3;
1047  StartServers(kNumServers);
1048  auto response_generator = BuildResolverResponseGenerator();
1049  auto channel = BuildChannel("pick_first", response_generator);
1050  auto stub = BuildStub(channel);
1051  std::vector<int> ports = GetServersPorts();
1052  for (size_t i = 0; i < kNumUpdates; ++i) {
1053  std::shuffle(ports.begin(), ports.end(),
1054  std::mt19937(std::random_device()()));
1055  response_generator.SetNextResolution(ports);
1056  // We should re-enter core at the end of the loop to give the resolution
1057  // setting closure a chance to run.
1058  if ((i + 1) % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
1059  }
1060  // Check LB policy name for the channel.
1061  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
1062 }
1063 
// Starts a pick_first channel on a resolution that contains only unused
// ports, checks that RPCs fail, then queues a resolution with the live
// server's port via SetNextResolutionUponError() and waits for RPCs to reach
// that server.
1064 TEST_F(PickFirstTest, ReresolutionNoSelected) {
1065  // Prepare the ports for up servers and down servers.
1066  const int kNumServers = 3;
1067  const int kNumAliveServers = 1;
1068  StartServers(kNumAliveServers);
1069  std::vector<int> alive_ports, dead_ports;
// The first kNumAliveServers entries use real server ports; the rest are
// freshly picked unused ports.
1070  for (size_t i = 0; i < kNumServers; ++i) {
1071  if (i < kNumAliveServers) {
1072  alive_ports.emplace_back(servers_[i]->port_);
1073  } else {
1074  dead_ports.emplace_back(grpc_pick_unused_port_or_die());
1075  }
1076  }
1077  auto response_generator = BuildResolverResponseGenerator();
1078  auto channel = BuildChannel("pick_first", response_generator);
1079  auto stub = BuildStub(channel);
1080  // The initial resolution only contains dead ports. There won't be any
1081  // selected subchannel. Re-resolution will return the same result.
1082  response_generator.SetNextResolution(dead_ports);
1083  gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
// NOTE(review): the leading arguments of this call are on a source line that
// is not shown in this view; only the expected-message pattern is visible.
1084  for (size_t i = 0; i < 10; ++i) {
1085  CheckRpcSendFailure(
1087  "failed to connect to all addresses; last error: "
1088  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1089  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1090  }
1091  // Set a re-resolution result that contains reachable ports, so that the
1092  // pick_first LB policy can recover soon.
1093  response_generator.SetNextResolutionUponError(alive_ports);
1094  gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
// The callback is handed a Status and checks that it is UNAVAILABLE with
// the same connection-failure text as above.
1095  WaitForServer(DEBUG_LOCATION, stub, 0, [](const Status& status) {
1096  EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
1097  EXPECT_THAT(
1098  status.error_message(),
1100  "failed to connect to all addresses; last error: "
1101  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1102  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)"));
1103  });
1104  CheckRpcSendOk(DEBUG_LOCATION, stub);
1105  EXPECT_EQ(servers_[0]->service_.request_count(), 1);
1106  // Check LB policy name for the channel.
1107  EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
1108 }
1109 
1110 TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) {
1111  std::vector<int> ports = {grpc_pick_unused_port_or_die()};
1112  StartServers(1, ports);
1113  auto response_generator = BuildResolverResponseGenerator();
1114  auto channel = BuildChannel("pick_first", response_generator);
1115  auto stub = BuildStub(channel);
1116  response_generator.SetNextResolution(ports);
1117  gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
1118  WaitForServer(DEBUG_LOCATION, stub, 0);
1119  gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
1120  servers_[0]->Shutdown();
1121  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1122  gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
1123  StartServers(1, ports);
1124  WaitForServer(DEBUG_LOCATION, stub, 0);
1125 }
1126 
// Connects a pick_first channel to the second of two listed addresses (only
// server 1 is running), shuts that server down, starts both servers, and
// then waits for RPCs to reach server 0.
1127 TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
// NOTE(review): the end of this initializer is on a source line not shown in
// this view; CreateServers(2, ports) below suggests two ports -- confirm.
1128  std::vector<int> ports = {grpc_pick_unused_port_or_die(),
1130  CreateServers(2, ports);
// Only server 1 is started for the initial connection.
1131  StartServer(1);
1132  auto response_generator = BuildResolverResponseGenerator();
1133  auto channel = BuildChannel("pick_first", response_generator);
1134  auto stub = BuildStub(channel);
1135  response_generator.SetNextResolution(ports);
1136  gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
1137  WaitForServer(DEBUG_LOCATION, stub, 1);
1138  gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
1139  servers_[1]->Shutdown();
1140  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1141  gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
1142  StartServers(2, ports);
// With both servers up, traffic is expected at index 0.
1143  WaitForServer(DEBUG_LOCATION, stub, 0);
1144 }
1145 
1146 TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
1147  std::vector<int> ports = {grpc_pick_unused_port_or_die()};
1148  StartServers(1, ports);
1149  auto response_generator = BuildResolverResponseGenerator();
1150  auto channel_1 = BuildChannel("pick_first", response_generator);
1151  auto stub_1 = BuildStub(channel_1);
1152  response_generator.SetNextResolution(ports);
1153  gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
1154  WaitForServer(DEBUG_LOCATION, stub_1, 0);
1155  gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
1156  servers_[0]->Shutdown();
1157  // Channel 1 will receive a re-resolution containing the same server. It will
1158  // create a new subchannel and hold a ref to it.
1159  StartServers(1, ports);
1160  gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
1161  auto response_generator_2 = BuildResolverResponseGenerator();
1162  auto channel_2 = BuildChannel("pick_first", response_generator_2);
1163  auto stub_2 = BuildStub(channel_2);
1164  response_generator_2.SetNextResolution(ports);
1165  gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
1166  WaitForServer(DEBUG_LOCATION, stub_2, 0, [](const Status& status) {
1167  EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
1168  EXPECT_EQ("failed to connect to all addresses", status.error_message());
1169  });
1170  gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
1171  servers_[0]->Shutdown();
1172  // Wait until the disconnection has triggered the connectivity notification.
1173  // Otherwise, the subchannel may be picked for next call but will fail soon.
1174  EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
1175  // Channel 2 will also receive a re-resolution containing the same server.
1176  // Both channels will ref the same subchannel that failed.
1177  StartServers(1, ports);
1178  gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
1179  gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
1180  // The first call after the server restart will succeed.
1181  CheckRpcSendOk(DEBUG_LOCATION, stub_2);
1182  gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
1183  // Check LB policy name for the channel.
1184  EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
1185  // Check LB policy name for the channel.
1186  EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
1187 }
1188 
1189 TEST_F(PickFirstTest, IdleOnDisconnect) {
1190  // Start server, send RPC, and make sure channel is READY.
1191  const int kNumServers = 1;
1192  StartServers(kNumServers);
1193  auto response_generator = BuildResolverResponseGenerator();
1194  auto channel =
1195  BuildChannel("", response_generator); // pick_first is the default.
1196  auto stub = BuildStub(channel);
1197  response_generator.SetNextResolution(GetServersPorts());
1198  CheckRpcSendOk(DEBUG_LOCATION, stub);
1199  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1200  // Stop server. Channel should go into state IDLE.
1201  response_generator.SetFailureOnReresolution();
1202  servers_[0]->Shutdown();
1203  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1204  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1205  servers_.clear();
1206 }
1207 
// Four-phase scenario: connect to server 0, send a resolver update listing
// only servers that are not running, shut server 0 down while that update is
// still pending, then start server 1 and expect traffic to move to it.
1208 TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) {
1209  auto response_generator = BuildResolverResponseGenerator();
1210  auto channel =
1211  BuildChannel("", response_generator); // pick_first is the default.
1212  auto stub = BuildStub(channel);
1213  // Create a number of servers, but only start 1 of them.
1214  CreateServers(10);
1215  StartServer(0);
1216  // Initially resolve to first server and make sure it connects.
1217  gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
1218  response_generator.SetNextResolution({servers_[0]->port_});
1219  CheckRpcSendOk(DEBUG_LOCATION, stub, true /* wait_for_ready */);
1220  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1221  // Send a resolution update with the remaining servers, none of which are
1222  // running yet, so the update will stay pending. Note that it's important
1223  // to have multiple servers here, or else the test will be flaky; with only
1224  // one server, the pending subchannel list has already gone into
1225  // TRANSIENT_FAILURE due to hitting the end of the list by the time we
1226  // check the state.
1227  gpr_log(GPR_INFO,
1228  "Phase 2: Resolver update pointing to remaining "
1229  "(not started) servers.");
1230  response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
1231  // RPCs will continue to be sent to the first server.
1232  CheckRpcSendOk(DEBUG_LOCATION, stub);
1233  // Now stop the first server, so that the current subchannel list
1234  // fails. This should cause us to immediately swap over to the
1235  // pending list, even though it's not yet connected. The state should
1236  // be set to CONNECTING, since that's what the pending subchannel list
1237  // was doing when we swapped over.
1238  gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
1239  servers_[0]->Shutdown();
// NOTE(review): the helper's result is discarded here, whereas other tests
// in this file wrap the same call in EXPECT_TRUE -- confirm this is intended.
1240  WaitForChannelNotReady(channel.get());
1241  // TODO(roth): This should always return CONNECTING, but it's flaky
1242  // between that and TRANSIENT_FAILURE. I suspect that this problem
1243  // will go away once we move the backoff code out of the subchannel
1244  // and into the LB policies.
// NOTE(review): the matcher argument of this EXPECT_THAT is on source lines
// not shown in this view.
1245  EXPECT_THAT(channel->GetState(false),
1248  // Now start the second server.
1249  gpr_log(GPR_INFO, "Phase 4: Starting second server.");
1250  StartServer(1);
1251  // The channel should go to READY state and RPCs should go to the
1252  // second server.
1253  WaitForChannelReady(channel.get());
// The status callback checks that the Status it receives is UNAVAILABLE
// with the generic connection-failure message.
1254  WaitForServer(DEBUG_LOCATION, stub, 1, [](const Status& status) {
1255  EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
1256  EXPECT_EQ("failed to connect to all addresses", status.error_message());
1257  });
1258 }
1259 
// Gets a channel to IDLE by shutting down its only backend, sends an empty
// resolver update and expects no state change, then restores the backend and
// expects the channel to return to READY.
1260 TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
1261  // Start server, send RPC, and make sure channel is READY.
1262  const int kNumServers = 1;
1263  StartServers(kNumServers);
1264  auto response_generator = BuildResolverResponseGenerator();
1265  auto channel =
1266  BuildChannel("", response_generator); // pick_first is the default.
1267  auto stub = BuildStub(channel);
1268  response_generator.SetNextResolution(GetServersPorts());
1269  CheckRpcSendOk(DEBUG_LOCATION, stub);
1270  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1271  // Stop server. Channel should go into state IDLE.
1272  servers_[0]->Shutdown();
1273  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1274  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
1275  // Now send resolver update that includes no addresses. Channel
1276  // should stay in state IDLE.
1277  response_generator.SetNextResolution({});
// NOTE(review): the arguments to WaitForStateChange (presumably the state to
// watch and a deadline) are on a source line not shown in this view. The
// EXPECT_FALSE requires the call to return false.
1278  EXPECT_FALSE(channel->WaitForStateChange(
1280  // Now bring the backend back up and send a non-empty resolver update,
1281  // and then try to send an RPC. Channel should go back into state READY.
1282  StartServer(0);
1283  response_generator.SetNextResolution(GetServersPorts());
1284  CheckRpcSendOk(DEBUG_LOCATION, stub);
1285  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
1286 }
1287 
// Points a channel at ports with no server behind them, checks that an RPC
// fails, then starts a server on the last port and expects the channel to
// end up READY and serve an RPC.
// NOTE(review): several source lines of this test (the rest of the port
// list, the backoff channel-arg call, and the TRANSIENT_FAILURE checks) are
// not shown in this view; the comments below describe visible lines only.
1288 TEST_F(PickFirstTest,
1289  StaysTransientFailureOnFailedConnectionAttemptUntilReady) {
1290  // Allocate 3 ports, with no servers running.
1291  std::vector<int> ports = {grpc_pick_unused_port_or_die(),
1294  // Create channel with a 1-second backoff.
1295  ChannelArguments args;
// The value below is 1000 scaled by the test slowdown factor; presumably
// milliseconds, given the "1-second" comment above -- confirm.
1297  1000 * grpc_test_slowdown_factor());
1298  auto response_generator = BuildResolverResponseGenerator();
1299  auto channel = BuildChannel("", response_generator, args);
1300  auto stub = BuildStub(channel);
1301  response_generator.SetNextResolution(ports);
// The channel is expected to report IDLE before any RPC has been sent.
1302  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(false));
1303  // Send an RPC, which should fail.
1304  CheckRpcSendFailure(
1306  "failed to connect to all addresses; last error: "
1307  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1308  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1309  // Channel should be in TRANSIENT_FAILURE.
1311  // Now start a server on the last port.
1312  StartServers(1, {ports.back()});
1313  // Channel should remain in TRANSIENT_FAILURE until it transitions to READY.
1316  EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
1317  CheckRpcSendOk(DEBUG_LOCATION, stub);
1318 }
1319 
1320 //
1321 // round_robin tests
1322 //
1323 
// Alias of the shared fixture, so the round_robin tests below are registered
// under their own suite name.
1324 using RoundRobinTest = ClientLbEnd2endTest;
1325 
1326 TEST_F(RoundRobinTest, Basic) {
1327  // Start servers and send one RPC per server.
1328  const int kNumServers = 3;
1329  StartServers(kNumServers);
1330  auto response_generator = BuildResolverResponseGenerator();
1331  auto channel = BuildChannel("round_robin", response_generator);
1332  auto stub = BuildStub(channel);
1333  response_generator.SetNextResolution(GetServersPorts());
1334  // Wait until all backends are ready.
1335  do {
1336  CheckRpcSendOk(DEBUG_LOCATION, stub);
1337  } while (!SeenAllServers());
1338  ResetCounters();
1339  // "Sync" to the end of the list. Next sequence of picks will start at the
1340  // first server (index 0).
1341  WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1);
1342  std::vector<int> connection_order;
1343  for (size_t i = 0; i < servers_.size(); ++i) {
1344  CheckRpcSendOk(DEBUG_LOCATION, stub);
1345  UpdateConnectionOrder(servers_, &connection_order);
1346  }
1347  // Backends should be iterated over in the order in which the addresses were
1348  // given.
1349  const auto expected = std::vector<int>{0, 1, 2};
1350  EXPECT_EQ(expected, connection_order);
1351  // Check LB policy name for the channel.
1352  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1353 }
1354 
1355 TEST_F(RoundRobinTest, ProcessPending) {
1356  StartServers(1); // Single server
1357  auto response_generator = BuildResolverResponseGenerator();
1358  auto channel = BuildChannel("round_robin", response_generator);
1359  auto stub = BuildStub(channel);
1360  response_generator.SetNextResolution({servers_[0]->port_});
1361  WaitForServer(DEBUG_LOCATION, stub, 0);
1362  // Create a new channel and its corresponding RR LB policy, which will pick
1363  // the subchannels in READY state from the previous RPC against the same
1364  // target (even if it happened over a different channel, because subchannels
1365  // are globally reused). Progress should happen without any transition from
1366  // this READY state.
1367  auto second_response_generator = BuildResolverResponseGenerator();
1368  auto second_channel = BuildChannel("round_robin", second_response_generator);
1369  auto second_stub = BuildStub(second_channel);
1370  second_response_generator.SetNextResolution({servers_[0]->port_});
1371  CheckRpcSendOk(DEBUG_LOCATION, second_stub);
1372 }
1373 
1374 TEST_F(RoundRobinTest, Updates) {
1375  // Start servers.
1376  const int kNumServers = 3;
1377  StartServers(kNumServers);
1378  auto response_generator = BuildResolverResponseGenerator();
1379  auto channel = BuildChannel("round_robin", response_generator);
1380  auto stub = BuildStub(channel);
1381  // Start with a single server.
1382  gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
1383  std::vector<int> ports = {servers_[0]->port_};
1384  response_generator.SetNextResolution(ports);
1385  WaitForServer(DEBUG_LOCATION, stub, 0);
1386  // Send RPCs. They should all go servers_[0]
1387  for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1388  EXPECT_EQ(10, servers_[0]->service_.request_count());
1389  EXPECT_EQ(0, servers_[1]->service_.request_count());
1390  EXPECT_EQ(0, servers_[2]->service_.request_count());
1391  ResetCounters();
1392  // And now for the second server.
1393  gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
1394  ports.clear();
1395  ports.emplace_back(servers_[1]->port_);
1396  response_generator.SetNextResolution(ports);
1397  // Wait until update has been processed, as signaled by the second backend
1398  // receiving a request.
1399  EXPECT_EQ(0, servers_[1]->service_.request_count());
1400  WaitForServer(DEBUG_LOCATION, stub, 1);
1401  for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1402  EXPECT_EQ(0, servers_[0]->service_.request_count());
1403  EXPECT_EQ(10, servers_[1]->service_.request_count());
1404  EXPECT_EQ(0, servers_[2]->service_.request_count());
1405  ResetCounters();
1406  // ... and for the last server.
1407  gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
1408  ports.clear();
1409  ports.emplace_back(servers_[2]->port_);
1410  response_generator.SetNextResolution(ports);
1411  WaitForServer(DEBUG_LOCATION, stub, 2);
1412  for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1413  EXPECT_EQ(0, servers_[0]->service_.request_count());
1414  EXPECT_EQ(0, servers_[1]->service_.request_count());
1415  EXPECT_EQ(10, servers_[2]->service_.request_count());
1416  ResetCounters();
1417  // Back to all servers.
1418  gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
1419  ports.clear();
1420  ports.emplace_back(servers_[0]->port_);
1421  ports.emplace_back(servers_[1]->port_);
1422  ports.emplace_back(servers_[2]->port_);
1423  response_generator.SetNextResolution(ports);
1424  WaitForServers(DEBUG_LOCATION, stub);
1425  // Send three RPCs, one per server.
1426  for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(DEBUG_LOCATION, stub);
1427  EXPECT_EQ(1, servers_[0]->service_.request_count());
1428  EXPECT_EQ(1, servers_[1]->service_.request_count());
1429  EXPECT_EQ(1, servers_[2]->service_.request_count());
1430  ResetCounters();
1431  // An empty update will result in the channel going into TRANSIENT_FAILURE.
1432  gpr_log(GPR_INFO, "*** NO BACKENDS ***");
1433  ports.clear();
1434  response_generator.SetNextResolution(ports);
1435  WaitForChannelNotReady(channel.get());
1436  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
1437  "empty address list: fake resolver empty address list");
1438  servers_[0]->service_.ResetCounters();
1439  // Next update introduces servers_[1], making the channel recover.
1440  gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
1441  ports.clear();
1442  ports.emplace_back(servers_[1]->port_);
1443  response_generator.SetNextResolution(ports);
1444  WaitForServer(DEBUG_LOCATION, stub, 1);
1445  EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
1446  // Check LB policy name for the channel.
1447  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1448 }
1449 
1450 TEST_F(RoundRobinTest, UpdateInError) {
1451  StartServers(2);
1452  auto response_generator = BuildResolverResponseGenerator();
1453  auto channel = BuildChannel("round_robin", response_generator);
1454  auto stub = BuildStub(channel);
1455  // Start with a single server.
1456  response_generator.SetNextResolution(GetServersPorts(0, 1));
1457  // Send RPCs. They should all go to server 0.
1458  for (size_t i = 0; i < 10; ++i) {
1459  CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/false,
1460  /*load_report=*/nullptr, /*timeout_ms=*/4000);
1461  }
1462  EXPECT_EQ(10, servers_[0]->service_.request_count());
1463  EXPECT_EQ(0, servers_[1]->service_.request_count());
1464  servers_[0]->service_.ResetCounters();
1465  // Send an update adding an unreachable server and server 1.
1466  std::vector<int> ports = {servers_[0]->port_, grpc_pick_unused_port_or_die(),
1467  servers_[1]->port_};
1468  response_generator.SetNextResolution(ports);
1469  WaitForServers(DEBUG_LOCATION, stub, 0, 2, /*status_check=*/nullptr,
1470  /*timeout=*/absl::Seconds(60));
1471  // Send a bunch more RPCs. They should all succeed and should be
1472  // split evenly between the two servers.
1473  // Note: The split may be slightly uneven because of an extra picker
1474  // update that can happen if the subchannels for servers 0 and 1
1475  // report READY before the subchannel for the unreachable server
1476  // transitions from CONNECTING to TRANSIENT_FAILURE.
1477  for (size_t i = 0; i < 10; ++i) {
1478  CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/false,
1479  /*load_report=*/nullptr, /*timeout_ms=*/4000);
1480  }
1481  EXPECT_THAT(servers_[0]->service_.request_count(),
1482  ::testing::AllOf(::testing::Ge(4), ::testing::Le(6)));
1483  EXPECT_THAT(servers_[1]->service_.request_count(),
1484  ::testing::AllOf(::testing::Ge(4), ::testing::Le(6)));
1485  EXPECT_EQ(10, servers_[0]->service_.request_count() +
1486  servers_[1]->service_.request_count());
1487 }
1488 
1489 TEST_F(RoundRobinTest, ManyUpdates) {
1490  // Start servers and send one RPC per server.
1491  const int kNumServers = 3;
1492  StartServers(kNumServers);
1493  auto response_generator = BuildResolverResponseGenerator();
1494  auto channel = BuildChannel("round_robin", response_generator);
1495  auto stub = BuildStub(channel);
1496  std::vector<int> ports = GetServersPorts();
1497  for (size_t i = 0; i < 1000; ++i) {
1498  std::shuffle(ports.begin(), ports.end(),
1499  std::mt19937(std::random_device()()));
1500  response_generator.SetNextResolution(ports);
1501  if (i % 10 == 0) CheckRpcSendOk(DEBUG_LOCATION, stub);
1502  }
1503  // Check LB policy name for the channel.
1504  EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
1505 }
1506 
// Resolves a round_robin channel to two of three servers, queues a
// resolution containing all three via SetNextResolutionUponError(), has
// server 0 send GOAWAYs, and then waits for RPCs to reach server 2.
1507 TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
1508  // Start 3 servers.
1509  StartServers(3);
1510  // Create channel.
1511  auto response_generator = BuildResolverResponseGenerator();
1512  auto channel = BuildChannel("round_robin", response_generator);
1513  auto stub = BuildStub(channel);
1514  // Initially, tell the channel about only the first two servers.
1515  std::vector<int> ports = {servers_[0]->port_, servers_[1]->port_};
1516  response_generator.SetNextResolution(ports);
1517  // Wait for both servers to be seen.
1518  WaitForServers(DEBUG_LOCATION, stub, 0, 2);
1519  // Tell the fake resolver to send an update that adds the last server, but
1520  // only when the LB policy requests re-resolution.
1521  ports.push_back(servers_[2]->port_);
1522  response_generator.SetNextResolutionUponError(ports);
1523  // Have server 0 send a GOAWAY. This should trigger a re-resolution.
1524  gpr_log(GPR_INFO, "****** SENDING GOAWAY FROM SERVER 0 *******");
// NOTE(review): the first statement of this scoped block is on a source line
// not shown in this view. The visible statement reaches into the core server
// object behind servers_[0] and calls SendGoaways() on it.
1525  {
1527  grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
1528  }
1529  // Wait for the client to see server 2.
1530  WaitForServer(DEBUG_LOCATION, stub, 2);
1531 }
1532 
// Gets a round_robin channel to READY against three servers, shuts all of
// them down, waits for the channel state to satisfy `predicate`, and checks
// that an RPC then fails with the expected message.
1533 TEST_F(RoundRobinTest, TransientFailure) {
1534  // Start servers and create channel. Channel should go to READY state.
1535  const int kNumServers = 3;
1536  StartServers(kNumServers);
1537  auto response_generator = BuildResolverResponseGenerator();
1538  auto channel = BuildChannel("round_robin", response_generator);
1539  auto stub = BuildStub(channel);
1540  response_generator.SetNextResolution(GetServersPorts());
1541  EXPECT_TRUE(WaitForChannelReady(channel.get()));
1542  // Now kill the servers. The channel should transition to TRANSIENT_FAILURE.
1543  for (size_t i = 0; i < servers_.size(); ++i) {
1544  servers_[i]->Shutdown();
1545  }
// NOTE(review): the predicate body is on a source line not shown in this
// view; per the comment above it presumably matches TRANSIENT_FAILURE.
1546  auto predicate = [](grpc_connectivity_state state) {
1548  };
1549  EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
// NOTE(review): the leading arguments of this call are on a source line not
// shown in this view; only the expected-message pattern is visible.
1550  CheckRpcSendFailure(
1552  "connections to all backends failing; last error: "
1553  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1554  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1555 }
1556 
// Builds a round_robin channel whose first resolution names servers that do
// not exist, waits for the channel state to satisfy `predicate`, and checks
// that an RPC fails with the expected message.
1557 TEST_F(RoundRobinTest, TransientFailureAtStartup) {
1558  // Create channel and return servers that don't exist. Channel should
1559  // quickly transition into TRANSIENT_FAILURE.
1560  auto response_generator = BuildResolverResponseGenerator();
1561  auto channel = BuildChannel("round_robin", response_generator);
1562  auto stub = BuildStub(channel);
// NOTE(review): the address list passed here is on source lines not shown in
// this view.
1563  response_generator.SetNextResolution({
1567  });
// NOTE(review): this test never visibly starts or creates a server, so
// servers_ is presumably empty and this loop a no-op -- confirm against the
// fixture's SetUp.
1568  for (size_t i = 0; i < servers_.size(); ++i) {
1569  servers_[i]->Shutdown();
1570  }
// NOTE(review): the predicate body is on a source line not shown in this
// view; per the comment above it presumably matches TRANSIENT_FAILURE.
1571  auto predicate = [](grpc_connectivity_state state) {
1573  };
1574  EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
1575  CheckRpcSendFailure(
1577  "connections to all backends failing; last error: "
1578  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1579  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1580 }
1581 
// Lets the first connection attempt to an unused port fail, places a hold on
// the next attempt to that port, and checks that RPCs sent while the hold is
// in place still fail with the expected message.
1582 TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
1583  // Start connection injector.
1584  ConnectionHoldInjector injector;
1585  injector.Start();
1586  // Get port.
1587  const int port = grpc_pick_unused_port_or_die();
1588  // Create channel.
1589  auto response_generator = BuildResolverResponseGenerator();
1590  auto channel = BuildChannel("round_robin", response_generator);
1591  auto stub = BuildStub(channel);
1592  response_generator.SetNextResolution({port});
1593  // Allow first connection attempt to fail normally, and wait for
1594  // channel to report TRANSIENT_FAILURE.
1595  gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO REPORT TF ===");
// NOTE(review): the predicate body is on a source line not shown in this
// view; per the comment above it presumably matches TRANSIENT_FAILURE.
1596  auto predicate = [](grpc_connectivity_state state) {
1598  };
1599  EXPECT_TRUE(
1600  WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true));
1601  // Wait for next connection attempt to start.
// The hold is added only after the first attempt has already failed.
1602  auto hold = injector.AddHold(port);
1603  hold->Wait();
1604  // Now the subchannel should be reporting CONNECTING. Make sure the
1605  // channel is still in TRANSIENT_FAILURE and is still reporting the
1606  // right status.
// NOTE(review): the state check described above is on a source line not
// shown in this view.
1608  // Send a few RPCs, just to give the channel a chance to propagate a
1609  // new picker, in case it was going to incorrectly do so.
1610  gpr_log(GPR_INFO, "=== EXPECTING RPCs TO FAIL ===");
1611  for (size_t i = 0; i < 5; ++i) {
1612  CheckRpcSendFailure(
1614  "connections to all backends failing; last error: "
1615  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1616  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1617  }
1618  // Clean up.
1619  hold->Resume();
1620 }
1621 
// Checks that the status round_robin attaches to failed RPCs tracks the most
// recent connection failure. After the initial attempts fail normally, the
// next attempt to each port is held; the first one is failed with a custom
// error, and RPCs are retried until the failure message carries that custom
// text (or the deadline passes).
1622 TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
1623  // Start connection injector.
1624  ConnectionHoldInjector injector;
1625  injector.Start();
1626  // Get port.
1627  const std::vector<int> ports = {grpc_pick_unused_port_or_die(),
      // NOTE(review): the remainder of this initializer is not present in
      // this listing; ports[0] and ports[1] are used below, so at least two
      // entries are expected.
1629  // Create channel.
1630  auto response_generator = BuildResolverResponseGenerator();
1631  auto channel = BuildChannel("round_robin", response_generator);
1632  auto stub = BuildStub(channel);
1633  response_generator.SetNextResolution(ports);
1634  // Allow first connection attempts to fail normally, and check that
1635  // the RPC fails with the right status message.
1636  CheckRpcSendFailure(
      // NOTE(review): leading arguments of this call are not present in this
      // listing; only the expected-message pattern is visible.
1638  "connections to all backends failing; last error: "
1639  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1640  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
1641  // Now intercept the next connection attempt for each port.
1642  auto hold1 = injector.AddHold(ports[0]);
1643  auto hold2 = injector.AddHold(ports[1]);
1644  hold1->Wait();
1645  hold2->Wait();
1646  // Inject a custom failure message.
      // NOTE(review): hold1->Wait() was already called above; this second
      // call looks redundant -- confirm it is intentional.
1647  hold1->Wait();
1648  hold1->Fail(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Survey says... Bzzzzt!"));
1649  // Wait until RPC fails with the right message.
1650  absl::Time deadline =
      // NOTE(review): the deadline expression and the statement declaring
      // `status` inside the loop are not present in this listing.
1652  while (true) {
1654  EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
      // Done once the injected failure text is what the RPC reports.
1655  if (status.error_message() ==
1656  "connections to all backends failing; last error: "
1657  "UNKNOWN: Survey says... Bzzzzt!") {
1658  break;
1659  }
      // Until then, only the ordinary connection-failure messages are
      // acceptable.
1660  EXPECT_THAT(
1661  status.error_message(),
1663  "connections to all backends failing; last error: "
1664  "(UNKNOWN: Failed to connect to remote host: Connection refused|"
1665  "UNAVAILABLE: Failed to connect to remote host: FD shutdown)"));
      // Record a test failure and stop looping if the deadline has passed.
1666  EXPECT_LT(absl::Now(), deadline);
1667  if (absl::Now() >= deadline) break;
1668  }
1669  // Clean up.
1670  hold2->Resume();
1671 }
1672 
// Checks that RPCs keep succeeding while the only backend is shut down and
// restarted. A background thread sends RPCs in a loop (each one asserted OK
// by CheckRpcSendOk) while the main thread kills the server, holds the
// reconnection attempt, restarts the server, and lets the attempt complete.
1673 TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
1674  // Start connection injector.
1675  ConnectionHoldInjector injector;
1676  injector.Start();
1677  // Start server.
1678  StartServers(1);
1679  // Create channel.
1680  auto response_generator = BuildResolverResponseGenerator();
1681  auto channel = BuildChannel("round_robin", response_generator);
1682  auto stub = BuildStub(channel);
1683  response_generator.SetNextResolution(GetServersPorts());
1684  // Start a thread constantly sending RPCs in a loop.
1685  gpr_log(GPR_ERROR, "=== STARTING CLIENT THREAD ===");
1686  std::atomic<bool> shutdown{false};
1687  gpr_event ev;
1688  gpr_event_init(&ev);
      // The lambda captures locals by reference; the thread is joined at the
      // end of the test, before those locals go out of scope.
1689  std::thread thd([&]() {
1690  gpr_log(GPR_INFO, "sending first RPC");
1691  CheckRpcSendOk(DEBUG_LOCATION, stub);
      // Signals the main thread that the first RPC has completed.
1692  gpr_event_set(&ev, reinterpret_cast<void*>(1));
1693  while (!shutdown.load()) {
1694  gpr_log(GPR_INFO, "sending RPC");
1695  CheckRpcSendOk(DEBUG_LOCATION, stub);
1696  }
1697  });
1698  // Wait for first RPC to complete.
1699  gpr_log(GPR_ERROR, "=== WAITING FOR FIRST RPC TO COMPLETE ===");
1700  ASSERT_EQ(reinterpret_cast<void*>(1),
      // NOTE(review): the second ASSERT_EQ operand (presumably a wait on
      // `ev`) is not present in this listing.
1702  // Channel should now be READY.
1703  ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
1704  // Tell injector to intercept the next connection attempt.
1705  auto hold1 =
1706  injector.AddHold(servers_[0]->port_, /*intercept_completion=*/true);
1707  // Now kill the server. The subchannel should report IDLE and be
1708  // immediately reconnected to, but this should not cause any test
1709  // failures.
1710  gpr_log(GPR_ERROR, "=== SHUTTING DOWN SERVER ===");
1711  {
      // NOTE(review): the first statement of this scope is not present in
      // this listing.
1713  grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
1714  }
      // NOTE(review): a statement between the scope above and Shutdown() is
      // not present in this listing.
1716  servers_[0]->Shutdown();
1717  // Wait for next attempt to start.
1718  gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT ===");
1719  hold1->Wait();
1720  // Start server and allow attempt to continue.
1721  gpr_log(GPR_ERROR, "=== RESTARTING SERVER ===");
1722  StartServer(0);
1723  hold1->Resume();
1724  // Wait for next attempt to complete.
1725  gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===");
1726  hold1->WaitForCompletion();
1727  // Now shut down the thread.
1728  gpr_log(GPR_ERROR, "=== SHUTTING DOWN CLIENT THREAD ===");
1729  shutdown.store(true);
1730  thd.join();
1731 }
1732 
// Checks that round_robin stops sending to a backend that was shut down and
// starts sending to it again once it is restarted. Uses three servers; server
// 0 is killed, 10 * kNumServers RPCs must succeed without reaching it, and
// after StartServer(0) the test waits until server 0 sees traffic again.
1733 TEST_F(RoundRobinTest, SingleReconnect) {
1734  const int kNumServers = 3;
1735  StartServers(kNumServers);
1736  const auto ports = GetServersPorts();
1737  auto response_generator = BuildResolverResponseGenerator();
1738  auto channel = BuildChannel("round_robin", response_generator);
1739  auto stub = BuildStub(channel);
1740  response_generator.SetNextResolution(ports);
1741  WaitForServers(DEBUG_LOCATION, stub);
1742  // Sync to end of list.
1743  WaitForServer(DEBUG_LOCATION, stub, servers_.size() - 1);
      // Each RPC is expected to land on the next server in order, so after the
      // i-th RPC server i must have seen exactly one request.
1744  for (size_t i = 0; i < servers_.size(); ++i) {
1745  CheckRpcSendOk(DEBUG_LOCATION, stub);
1746  EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
1747  }
1748  // One request should have gone to each server.
1749  for (size_t i = 0; i < servers_.size(); ++i) {
1750  EXPECT_EQ(1, servers_[i]->service_.request_count());
1751  }
1752  const auto pre_death = servers_[0]->service_.request_count();
1753  // Kill the first server.
1754  servers_[0]->Shutdown();
1755  // Client request still succeed. May need retrying if RR had returned a pick
1756  // before noticing the change in the server's connectivity.
1757  while (true) {
      // NOTE(review): the statement that sends the RPC and declares `status`
      // is not present in this listing.
1759  if (status.ok()) break;
1760  EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
1761  EXPECT_EQ("", status.error_message());
1762  }
1763  // Send a bunch of RPCs that should succeed.
1764  for (int i = 0; i < 10 * kNumServers; ++i) {
1765  CheckRpcSendOk(DEBUG_LOCATION, stub);
1766  }
1767  const auto post_death = servers_[0]->service_.request_count();
1768  // No requests have gone to the deceased server.
1769  EXPECT_EQ(pre_death, post_death);
1770  // Bring the first server back up.
1771  StartServer(0);
1772  // Requests should start arriving at the first server either right away (if
1773  // the server managed to start before the RR policy retried the subchannel) or
1774  // after the subchannel retry delay otherwise (RR's subchannel retried before
1775  // the server was fully back up).
1776  WaitForServer(DEBUG_LOCATION, stub, 0);
1777 }
1778 
1779 // If health checking is required by client but health checking service
1780 // is not running on the server, the channel should be treated as healthy.
1781 TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
1782  StartServers(1); // Single server
1783  ChannelArguments args;
1784  args.SetServiceConfigJSON(
1785  "{\"healthCheckConfig\": "
1786  "{\"serviceName\": \"health_check_service_name\"}}");
1787  auto response_generator = BuildResolverResponseGenerator();
1788  auto channel = BuildChannel("round_robin", response_generator, args);
1789  auto stub = BuildStub(channel);
1790  response_generator.SetNextResolution({servers_[0]->port_});
1791  EXPECT_TRUE(WaitForChannelReady(channel.get()));
1792  CheckRpcSendOk(DEBUG_LOCATION, stub);
1793 }
1794 
// End-to-end check of round_robin with client-side health checking enabled
// via the service config. Servers are marked healthy one at a time (0, then
// 2, then 1) and the per-server request counts are checked after each step;
// then server 0 is marked unhealthy again, and finally all servers are marked
// unhealthy, after which the channel must leave READY and RPCs must fail with
// "backend unhealthy".
1795 TEST_F(RoundRobinTest, HealthChecking) {
      // NOTE(review): the first statement of this test is not present in this
      // listing.
1797  // Start servers.
1798  const int kNumServers = 3;
1799  StartServers(kNumServers);
1800  ChannelArguments args;
1801  args.SetServiceConfigJSON(
1802  "{\"healthCheckConfig\": "
1803  "{\"serviceName\": \"health_check_service_name\"}}");
1804  auto response_generator = BuildResolverResponseGenerator();
1805  auto channel = BuildChannel("round_robin", response_generator, args);
1806  auto stub = BuildStub(channel);
1807  response_generator.SetNextResolution(GetServersPorts());
1808  // Channel should not become READY, because health checks should be failing.
1809  gpr_log(GPR_INFO,
1810  "*** initial state: unknown health check service name for "
1811  "all servers");
1812  EXPECT_FALSE(WaitForChannelReady(channel.get(), 1));
1813  // Now set one of the servers to be healthy.
1814  // The channel should become healthy and all requests should go to
1815  // the healthy server.
1816  gpr_log(GPR_INFO, "*** server 0 healthy");
1817  servers_[0]->SetServingStatus("health_check_service_name", true);
1818  EXPECT_TRUE(WaitForChannelReady(channel.get()));
1819  for (int i = 0; i < 10; ++i) {
1820  CheckRpcSendOk(DEBUG_LOCATION, stub);
1821  }
1822  EXPECT_EQ(10, servers_[0]->service_.request_count());
1823  EXPECT_EQ(0, servers_[1]->service_.request_count());
1824  EXPECT_EQ(0, servers_[2]->service_.request_count());
1825  // Now set a second server to be healthy.
1826  gpr_log(GPR_INFO, "*** server 2 healthy");
1827  servers_[2]->SetServingStatus("health_check_service_name", true);
      // The counts asserted below total 10, so WaitForServer() presumably
      // resets the request counters before returning -- TODO confirm.
1828  WaitForServer(DEBUG_LOCATION, stub, 2);
1829  for (int i = 0; i < 10; ++i) {
1830  CheckRpcSendOk(DEBUG_LOCATION, stub);
1831  }
1832  EXPECT_EQ(5, servers_[0]->service_.request_count());
1833  EXPECT_EQ(0, servers_[1]->service_.request_count());
1834  EXPECT_EQ(5, servers_[2]->service_.request_count());
1835  // Now set the remaining server to be healthy.
1836  gpr_log(GPR_INFO, "*** server 1 healthy");
1837  servers_[1]->SetServingStatus("health_check_service_name", true);
1838  WaitForServer(DEBUG_LOCATION, stub, 1);
      // Nine RPCs spread evenly over three healthy servers: three each.
1839  for (int i = 0; i < 9; ++i) {
1840  CheckRpcSendOk(DEBUG_LOCATION, stub);
1841  }
1842  EXPECT_EQ(3, servers_[0]->service_.request_count());
1843  EXPECT_EQ(3, servers_[1]->service_.request_count());
1844  EXPECT_EQ(3, servers_[2]->service_.request_count());
1845  // Now set one server to be unhealthy again. Then wait until the
1846  // unhealthiness has hit the client. We know that the client will see
1847  // this when we send kNumServers requests and one of the remaining servers
1848  // sees two of the requests.
1849  gpr_log(GPR_INFO, "*** server 0 unhealthy");
1850  servers_[0]->SetServingStatus("health_check_service_name", false);
      // Repeat batches of kNumServers RPCs until server 1 or server 2 has
      // received two requests within a single batch.
1851  do {
1852  ResetCounters();
1853  for (int i = 0; i < kNumServers; ++i) {
1854  CheckRpcSendOk(DEBUG_LOCATION, stub);
1855  }
1856  } while (servers_[1]->service_.request_count() != 2 &&
1857  servers_[2]->service_.request_count() != 2);
1858  // Now set the remaining two servers to be unhealthy. Make sure the
1859  // channel leaves READY state and that RPCs fail.
1860  gpr_log(GPR_INFO, "*** all servers unhealthy");
1861  servers_[1]->SetServingStatus("health_check_service_name", false);
1862  servers_[2]->SetServingStatus("health_check_service_name", false);
1863  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
1864  CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
1865  "connections to all backends failing; last error: "
1866  "UNAVAILABLE: backend unhealthy");
1867  // Clean up.
      // NOTE(review): the clean-up statement is not present in this listing.
1869 }
1870 
// Checks that round_robin with health checking keeps serving RPCs after one
// of three healthy backends is shut down and the resolver re-sends the same
// address list: 100 subsequent RPCs must all succeed.
1871 TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
      // NOTE(review): the first statement of this test is not present in this
      // listing.
1873  // Start servers.
1874  const int kNumServers = 3;
1875  StartServers(kNumServers);
      // Mark every backend healthy for the service name used in the config.
1876  servers_[0]->SetServingStatus("health_check_service_name", true);
1877  servers_[1]->SetServingStatus("health_check_service_name", true);
1878  servers_[2]->SetServingStatus("health_check_service_name", true);
1879  ChannelArguments args;
1880  args.SetServiceConfigJSON(
1881  "{\"healthCheckConfig\": "
1882  "{\"serviceName\": \"health_check_service_name\"}}");
1883  auto response_generator = BuildResolverResponseGenerator();
1884  auto channel = BuildChannel("round_robin", response_generator, args);
1885  auto stub = BuildStub(channel);
1886  response_generator.SetNextResolution(GetServersPorts());
1887  WaitForServer(DEBUG_LOCATION, stub, 0);
1888  // Stop server 0 and send a new resolver result to ensure that RR
1889  // checks each subchannel's state.
1890  servers_[0]->Shutdown();
1891  response_generator.SetNextResolution(GetServersPorts());
1892  // Send a bunch more RPCs.
1893  for (size_t i = 0; i < 100; i++) {
1894  CheckRpcSendOk(DEBUG_LOCATION, stub);
1895  }
1896 }
1897 
// Uses two channels to the same single backend: channel 1 has health checking
// enabled, channel 2 has it enabled but inhibited. Channel 1 must stay
// not-READY (and fail RPCs with "backend unhealthy") until the backend
// reports healthy, while channel 2 must be READY right away. Finally checks
// that the backend saw exactly one client.
1898 TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
      // NOTE(review): the first statement of this test is not present in this
      // listing.
1900  // Start server.
1901  const int kNumServers = 1;
1902  StartServers(kNumServers);
1903  // Create a channel with health-checking enabled.
1904  ChannelArguments args;
1905  args.SetServiceConfigJSON(
1906  "{\"healthCheckConfig\": "
1907  "{\"serviceName\": \"health_check_service_name\"}}");
1908  auto response_generator1 = BuildResolverResponseGenerator();
1909  auto channel1 = BuildChannel("round_robin", response_generator1, args);
1910  auto stub1 = BuildStub(channel1);
1911  std::vector<int> ports = GetServersPorts();
1912  response_generator1.SetNextResolution(ports);
1913  // Create a channel with health checking enabled but inhibited.
      // NOTE(review): the statement that applies the inhibit setting is not
      // present in this listing; as shown, channel2 is built from the same
      // `args` object as channel1.
1915  auto response_generator2 = BuildResolverResponseGenerator();
1916  auto channel2 = BuildChannel("round_robin", response_generator2, args);
1917  auto stub2 = BuildStub(channel2);
1918  response_generator2.SetNextResolution(ports);
1919  // First channel should not become READY, because health checks should be
1920  // failing.
1921  EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1922  CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
1923  "connections to all backends failing; last error: "
1924  "UNAVAILABLE: backend unhealthy");
1925  // Second channel should be READY.
1926  EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1927  CheckRpcSendOk(DEBUG_LOCATION, stub2);
1928  // Enable health checks on the backend and wait for channel 1 to succeed.
1929  servers_[0]->SetServingStatus("health_check_service_name", true);
1930  CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */);
1931  // Check that we created only one subchannel to the backend.
1932  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1933  // Clean up.
      // NOTE(review): the clean-up statement is not present in this listing.
1935 }
1936 
// Uses two channels to the same single backend, each configured with a
// different health-check service name. Only the second name is initially
// reported as serving, so channel 2 must become READY while channel 1 stays
// not-READY and fails RPCs with "backend unhealthy"; once the first name is
// marked serving, channel 1 must succeed too. Finally checks that the backend
// saw exactly one client.
1937 TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
      // NOTE(review): the first statement of this test is not present in this
      // listing.
1939  // Start server.
1940  const int kNumServers = 1;
1941  StartServers(kNumServers);
1942  // Create a channel with health-checking enabled.
1943  ChannelArguments args;
1944  args.SetServiceConfigJSON(
1945  "{\"healthCheckConfig\": "
1946  "{\"serviceName\": \"health_check_service_name\"}}");
1947  auto response_generator1 = BuildResolverResponseGenerator();
1948  auto channel1 = BuildChannel("round_robin", response_generator1, args);
1949  auto stub1 = BuildStub(channel1);
1950  std::vector<int> ports = GetServersPorts();
1951  response_generator1.SetNextResolution(ports);
1952  // Create a channel with health-checking enabled with a different
1953  // service name.
1954  ChannelArguments args2;
1955  args2.SetServiceConfigJSON(
1956  "{\"healthCheckConfig\": "
1957  "{\"serviceName\": \"health_check_service_name2\"}}");
1958  auto response_generator2 = BuildResolverResponseGenerator();
1959  auto channel2 = BuildChannel("round_robin", response_generator2, args2);
1960  auto stub2 = BuildStub(channel2);
1961  response_generator2.SetNextResolution(ports);
1962  // Allow health checks from channel 2 to succeed.
1963  servers_[0]->SetServingStatus("health_check_service_name2", true);
1964  // First channel should not become READY, because health checks should be
1965  // failing.
1966  EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
1967  CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
1968  "connections to all backends failing; last error: "
1969  "UNAVAILABLE: backend unhealthy");
1970  // Second channel should be READY.
1971  EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
1972  CheckRpcSendOk(DEBUG_LOCATION, stub2);
1973  // Enable health checks for channel 1 and wait for it to succeed.
1974  servers_[0]->SetServingStatus("health_check_service_name", true);
1975  CheckRpcSendOk(DEBUG_LOCATION, stub1, true /* wait_for_ready */);
1976  // Check that we created only one subchannel to the backend.
1977  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
1978  // Clean up.
      // NOTE(review): the clean-up statement is not present in this listing.
1980 }
1981 
// Checks that changing the health-check service name through a later resolver
// update takes effect on an already-connected channel: the channel is READY
// with a service name that is reported as serving, and must leave READY after
// the service config switches to a name that is not.
1982 TEST_F(RoundRobinTest,
1983  HealthCheckingServiceNameChangesAfterSubchannelsCreated) {
      // NOTE(review): the first statement of this test is not present in this
      // listing.
1985  // Start server.
1986  const int kNumServers = 1;
1987  StartServers(kNumServers);
1988  // Create a channel with health-checking enabled.
1989  const char* kServiceConfigJson =
1990  "{\"healthCheckConfig\": "
1991  "{\"serviceName\": \"health_check_service_name\"}}";
1992  auto response_generator = BuildResolverResponseGenerator();
1993  auto channel = BuildChannel("round_robin", response_generator);
1994  auto stub = BuildStub(channel);
1995  std::vector<int> ports = GetServersPorts();
      // Here the service config is delivered with the resolver result rather
      // than through channel arguments.
1996  response_generator.SetNextResolution(ports, kServiceConfigJson);
1997  servers_[0]->SetServingStatus("health_check_service_name", true);
1998  EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */));
1999  // Send an update on the channel to change it to use a health checking
2000  // service name that is not being reported as healthy.
2001  const char* kServiceConfigJson2 =
2002  "{\"healthCheckConfig\": "
2003  "{\"serviceName\": \"health_check_service_name2\"}}";
2004  response_generator.SetNextResolution(ports, kServiceConfigJson2);
2005  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
2006  // Clean up.
      // NOTE(review): the clean-up statement is not present in this listing.
2008 }
2009 
2010 //
2011 // LB policy pick args
2012 //
2013 
// Fixture for tests that inspect the arguments an LB policy sees when picking.
// A static callback (SavePickArgs) appends each PickArgsSeen to the fixture
// instance recorded in current_test_instance_, which SetUp() points at the
// running test.
2014 class ClientLbPickArgsTest : public ClientLbEnd2endTest {
2015  protected:
      // Runs the base-class setup and registers this instance as the target
      // of the static callback.
2016  void SetUp() override {
2017  ClientLbEnd2endTest::SetUp();
2018  current_test_instance_ = this;
2019  }
2020 
2021  static void SetUpTestCase() {
2022  grpc_init();
      // NOTE(review): a statement is not present in this listing here;
      // presumably it registers the test LB policy with SavePickArgs as its
      // callback -- confirm against the full file.
2024  }
2025 
2026  static void TearDownTestCase() { grpc_shutdown(); }
2027 
      // Returns a copy of the pick arguments recorded so far.
2028  std::vector<grpc_core::PickArgsSeen> args_seen_list() {
      // NOTE(review): a statement is not present in this listing here
      // (presumably a lock on mu_, as in SavePickArgs -- confirm).
2030  return args_seen_list_;
2031  }
2032 
      // Formats a list of recorded pick arguments as
      // "[{path=\"...\", metadata=[k=v, ...]}, ...]" for use in failure
      // messages.
2033  static std::string ArgsSeenListString(
2034  const std::vector<grpc_core::PickArgsSeen>& args_seen_list) {
2035  std::vector<std::string> entries;
2036  for (const auto& args_seen : args_seen_list) {
2037  std::vector<std::string> metadata;
2038  for (const auto& p : args_seen.metadata) {
2039  metadata.push_back(absl::StrCat(p.first, "=", p.second));
2040  }
2041  entries.push_back(absl::StrFormat("{path=\"%s\", metadata=[%s]}",
2042  args_seen.path,
2043  absl::StrJoin(metadata, ", ")));
2044  }
2045  return absl::StrCat("[", absl::StrJoin(entries, ", "), "]");
2046  }
2047 
2048  private:
      // Callback: records args_seen on the current test instance while
      // holding that instance's mu_.
2049  static void SavePickArgs(const grpc_core::PickArgsSeen& args_seen) {
2050  ClientLbPickArgsTest* self = current_test_instance_;
2051  grpc::internal::MutexLock lock(&self->mu_);
2052  self->args_seen_list_.emplace_back(args_seen);
2053  }
2054 
      // Instance that SavePickArgs writes to; set in SetUp().
2055  static ClientLbPickArgsTest* current_test_instance_;
      // NOTE(review): the declaration of mu_ (used in SavePickArgs) is not
      // present in this listing.
2057  std::vector<grpc_core::PickArgsSeen> args_seen_list_;
2058 };
2059 
2060 ClientLbPickArgsTest* ClientLbPickArgsTest::current_test_instance_ = nullptr;
2061 
// Sends one RPC through the "test_pick_args_lb" policy and checks the pick
// arguments recorded by the fixture: the visible matcher fragments expect the
// path "/grpc.testing.EchoTestService/Echo" and metadata pairs foo=1, bar=2,
// baz=3. On mismatch the recorded list is printed via ArgsSeenListString().
2062 TEST_F(ClientLbPickArgsTest, Basic) {
2063  const int kNumServers = 1;
2064  StartServers(kNumServers);
2065  auto response_generator = BuildResolverResponseGenerator();
2066  auto channel = BuildChannel("test_pick_args_lb", response_generator);
2067  auto stub = BuildStub(channel);
2068  response_generator.SetNextResolution(GetServersPorts());
2069  // Proactively connect the channel, so that the LB policy will always
2070  // be connected before it sees the pick. Otherwise, the test would be
2071  // flaky because sometimes the pick would be seen twice (once in
2072  // CONNECTING and again in READY) and other times only once (in READY).
      // NOTE(review): the statement that performs this proactive connect is
      // not present in this listing.
2074  // Check LB policy name for the channel.
2075  EXPECT_EQ("test_pick_args_lb", channel->GetLoadBalancingPolicyName());
2076  // Now send an RPC and check that the picker sees the expected data.
2077  CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
2078  auto pick_args_seen_list = args_seen_list();
2079  EXPECT_THAT(pick_args_seen_list,
      // NOTE(review): several lines of the matcher expression are not present
      // in this listing; only the path literal and metadata pairs are visible.
2082  "/grpc.testing.EchoTestService/Echo"),
2085  ::testing::Pair("foo", "1"),
2086  ::testing::Pair("bar", "2"),
2087  ::testing::Pair("baz", "3"))))))
2088  << ArgsSeenListString(pick_args_seen_list);
2089 }
2090 
2091 //
2092 // tests that LB policies can get the call's trailing metadata
2093 //
2094 
2095 xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport(
2096  const grpc_core::BackendMetricData& backend_metric_data) {
2097  xds::data::orca::v3::OrcaLoadReport load_report;
2098  load_report.set_cpu_utilization(backend_metric_data.cpu_utilization);
2099  load_report.set_mem_utilization(backend_metric_data.mem_utilization);
2100  for (const auto& p : backend_metric_data.request_cost) {
2101  std::string name(p.first);
2102  (*load_report.mutable_request_cost())[name] = p.second;
2103  }
2104  for (const auto& p : backend_metric_data.utilization) {
2105  std::string name(p.first);
2106  (*load_report.mutable_utilization())[name] = p.second;
2107  }
2108  return load_report;
2109 }
2110 
// Fixture for tests that check what an LB policy sees when a call's trailing
// metadata is intercepted. A static callback (ReportTrailerIntercepted)
// records the status, metadata and (when present) backend metric data of each
// call on the fixture instance recorded in current_test_instance_.
// NOTE(review): many lines of this class (accessor signatures, lock
// statements, member declarations) are not present in this listing.
2111 class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
2112  protected:
      // Runs the base-class setup and registers this instance as the target
      // of the static callback.
2113  void SetUp() override {
2114  ClientLbEnd2endTest::SetUp();
2115  current_test_instance_ = this;
2116  }
2117 
2118  static void SetUpTestCase() {
2119  grpc_init();
      // NOTE(review): the start of the following call is not present in this
      // listing; only its final argument, the callback, is visible.
2121  ReportTrailerIntercepted);
2122  }
2123 
2124  static void TearDownTestCase() { grpc_shutdown(); }
2125 
      // Number of times ReportTrailerIntercepted has run for this instance.
2126  int trailers_intercepted() {
2128  return trailers_intercepted_;
2129  }
2130 
      // Status recorded by the most recent ReportTrailerIntercepted call.
2131  absl::Status last_status() {
2133  return last_status_;
2134  }
2135 
      // NOTE(review): the signature of this accessor is not present in this
      // listing. It moves trailing_metadata_ out to the caller.
2138  return std::move(trailing_metadata_);
2139  }
2140 
      // NOTE(review): the signature of this accessor is not present in this
      // listing (tests call backend_load_report(), presumably this one). It
      // moves load_report_ out to the caller.
2143  return std::move(load_report_);
2144  }
2145 
2146  private:
      // Callback: under the instance's mu_, stores the call status, bumps the
      // interception counter, stores the metadata, and -- only when backend
      // metric data is present -- stores it converted to an OrcaLoadReport.
2147  static void ReportTrailerIntercepted(
2148  const grpc_core::TrailingMetadataArgsSeen& args_seen) {
2149  const auto* backend_metric_data = args_seen.backend_metric_data;
2150  ClientLbInterceptTrailingMetadataTest* self = current_test_instance_;
2151  grpc::internal::MutexLock lock(&self->mu_);
2152  self->last_status_ = args_seen.status;
2153  self->trailers_intercepted_++;
2154  self->trailing_metadata_ = args_seen.metadata;
2155  if (backend_metric_data != nullptr) {
2156  self->load_report_ =
2157  BackendMetricDataToOrcaLoadReport(*backend_metric_data);
2158  }
2159  }
2160 
      // Instance that ReportTrailerIntercepted writes to; set in SetUp().
2161  static ClientLbInterceptTrailingMetadataTest* current_test_instance_;
      // NOTE(review): the remaining data-member declarations are not present
      // in this listing.
2167 };
2168 
      // NOTE(review): the rest of this static-member definition is not present
      // in this listing.
2169 ClientLbInterceptTrailingMetadataTest*
2171 
2172 TEST_F(ClientLbInterceptTrailingMetadataTest, StatusOk) {
2173  StartServers(1);
2174  auto response_generator = BuildResolverResponseGenerator();
2175  auto channel =
2176  BuildChannel("intercept_trailing_metadata_lb", response_generator);
2177  auto stub = BuildStub(channel);
2178  response_generator.SetNextResolution(GetServersPorts());
2179  // Send an OK RPC.
2180  CheckRpcSendOk(DEBUG_LOCATION, stub);
2181  // Check LB policy name for the channel.
2182  EXPECT_EQ("intercept_trailing_metadata_lb",
2183  channel->GetLoadBalancingPolicyName());
2184  EXPECT_EQ(1, trailers_intercepted());
2185  EXPECT_EQ(absl::OkStatus(), last_status());
2186 }
2187 
// Sends an RPC whose request asks for a PERMISSION_DENIED error with message
// "bummer, man", and checks that the same code and message are seen both by
// the caller and by the LB policy.
2188 TEST_F(ClientLbInterceptTrailingMetadataTest, StatusFailed) {
2189  StartServers(1);
2190  auto response_generator = BuildResolverResponseGenerator();
2191  auto channel =
2192  BuildChannel("intercept_trailing_metadata_lb", response_generator);
2193  auto stub = BuildStub(channel);
2194  response_generator.SetNextResolution(GetServersPorts());
      // Fill in the error the request asks for.
2195  EchoRequest request;
2196  auto* expected_error = request.mutable_param()->mutable_expected_error();
2197  expected_error->set_code(GRPC_STATUS_PERMISSION_DENIED);
2198  expected_error->set_error_message("bummer, man");
2199  Status status = SendRpc(stub, /*response=*/nullptr, /*timeout_ms=*/1000,
2200  /*wait_for_ready=*/false, &request);
      // NOTE(review): one statement is not present in this listing here
      // (presumably the check of status.error_code() -- confirm).
2202  EXPECT_EQ(status.error_message(), "bummer, man");
2203  absl::Status status_seen_by_lb = last_status();
2204  EXPECT_EQ(status_seen_by_lb.code(), absl::StatusCode::kPermissionDenied);
2205  EXPECT_EQ(status_seen_by_lb.message(), "bummer, man");
2206 }
2207 
2208 TEST_F(ClientLbInterceptTrailingMetadataTest,
2209  StatusCancelledWithoutStartingRecvTrailingMetadata) {
2210  StartServers(1);
2211  auto response_generator = BuildResolverResponseGenerator();
2212  auto channel =
2213  BuildChannel("intercept_trailing_metadata_lb", response_generator);
2214  response_generator.SetNextResolution(GetServersPorts());
2215  auto stub = BuildStub(channel);
2216  {
2217  // Start a stream (sends initial metadata) and then cancel without
2218  // calling Finish().
2219  ClientContext ctx;
2220  auto stream = stub->BidiStream(&ctx);
2221  ctx.TryCancel();
2222  }
2223  // Check status seen by LB policy.
2224  EXPECT_EQ(1, trailers_intercepted());
2225  absl::Status status_seen_by_lb = last_status();
2226  EXPECT_EQ(status_seen_by_lb.code(), absl::StatusCode::kCancelled);
2227  EXPECT_EQ(status_seen_by_lb.message(), "call cancelled");
2228 }
2229 
// With retries disabled through GRPC_ARG_ENABLE_RETRIES=0, sends kNumRpcs
// successful RPCs and checks that the LB policy intercepted trailing metadata
// once per RPC, saw the metadata pairs matched below, and received no backend
// load report.
2230 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
2231  const int kNumServers = 1;
2232  const int kNumRpcs = 10;
2233  StartServers(kNumServers);
2234  auto response_generator = BuildResolverResponseGenerator();
2235  ChannelArguments channel_args;
2236  channel_args.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
2237  auto channel = BuildChannel("intercept_trailing_metadata_lb",
2238  response_generator, channel_args);
2239  auto stub = BuildStub(channel);
2240  response_generator.SetNextResolution(GetServersPorts());
2241  for (size_t i = 0; i < kNumRpcs; ++i) {
2242  CheckRpcSendOk(DEBUG_LOCATION, stub);
2243  }
2244  // Check LB policy name for the channel.
2245  EXPECT_EQ("intercept_trailing_metadata_lb",
2246  channel->GetLoadBalancingPolicyName());
2247  EXPECT_EQ(kNumRpcs, trailers_intercepted());
      // NOTE(review): the opening lines of the following metadata expectation
      // are not present in this listing; only the expected pairs are visible.
2250  // TODO(roth): Should grpc-status be visible here?
2251  ::testing::Pair("grpc-status", "0"),
2252  ::testing::Pair("user-agent", ::testing::_),
2253  ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
2254  ::testing::Pair("baz", "3")));
2255  EXPECT_FALSE(backend_load_report().has_value());
2256 }
2257 
// Same scenario as the retries-disabled variant, but with a service config
// that sets a retry policy (maxAttempts 3, retryable on ABORTED) for
// grpc.testing.EchoTestService. Sends kNumRpcs successful RPCs and checks that
// trailing metadata was intercepted once per RPC, with the metadata pairs
// matched below and no backend load report.
2258 TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
2259  const int kNumServers = 1;
2260  const int kNumRpcs = 10;
2261  StartServers(kNumServers);
2262  ChannelArguments args;
2263  args.SetServiceConfigJSON(
2264  "{\n"
2265  " \"methodConfig\": [ {\n"
2266  " \"name\": [\n"
2267  " { \"service\": \"grpc.testing.EchoTestService\" }\n"
2268  " ],\n"
2269  " \"retryPolicy\": {\n"
2270  " \"maxAttempts\": 3,\n"
2271  " \"initialBackoff\": \"1s\",\n"
2272  " \"maxBackoff\": \"120s\",\n"
2273  " \"backoffMultiplier\": 1.6,\n"
2274  " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
2275  " }\n"
2276  " } ]\n"
2277  "}");
2278  auto response_generator = BuildResolverResponseGenerator();
2279  auto channel =
2280  BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
2281  auto stub = BuildStub(channel);
2282  response_generator.SetNextResolution(GetServersPorts());
2283  for (size_t i = 0; i < kNumRpcs; ++i) {
2284  CheckRpcSendOk(DEBUG_LOCATION, stub);
2285  }
2286  // Check LB policy name for the channel.
2287  EXPECT_EQ("intercept_trailing_metadata_lb",
2288  channel->GetLoadBalancingPolicyName());
2289  EXPECT_EQ(kNumRpcs, trailers_intercepted());
      // NOTE(review): the opening lines of the following metadata expectation
      // are not present in this listing; only the expected pairs are visible.
2292  // TODO(roth): Should grpc-status be visible here?
2293  ::testing::Pair("grpc-status", "0"),
2294  ::testing::Pair("user-agent", ::testing::_),
2295  ::testing::Pair("foo", "1"), ::testing::Pair("bar", "2"),
2296  ::testing::Pair("baz", "3")));
2297  EXPECT_FALSE(backend_load_report().has_value());
2298 }
2299 
2300 TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
2301  const int kNumServers = 1;
2302  const int kNumRpcs = 10;
2303  StartServers(kNumServers);
2304  xds::data::orca::v3::OrcaLoadReport load_report;
2305  load_report.set_cpu_utilization(0.5);
2306  load_report.set_mem_utilization(0.75);
2307  auto* request_cost = load_report.mutable_request_cost();
2308  (*request_cost)["foo"] = 0.8;
2309  (*request_cost)["bar"] = 1.4;
2310  auto* utilization = load_report.mutable_utilization();
2311  (*utilization)["baz"] = 1.1;
2312  (*utilization)["quux"] = 0.9;
2313  auto response_generator = BuildResolverResponseGenerator();
2314  auto channel =
2315  BuildChannel("intercept_trailing_metadata_lb", response_generator);
2316  auto stub = BuildStub(channel);
2317  response_generator.SetNextResolution(GetServersPorts());
2318  for (size_t i = 0; i < kNumRpcs; ++i) {
2319  CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
2320  auto actual = backend_load_report();
2321  ASSERT_TRUE(actual.has_value());
2322  // TODO(roth): Change this to use EqualsProto() once that becomes
2323  // available in OSS.
2324  EXPECT_EQ(actual->cpu_utilization(), load_report.cpu_utilization());
2325  EXPECT_EQ(actual->mem_utilization(), load_report.mem_utilization());
2326  EXPECT_EQ(actual->request_cost().size(), load_report.request_cost().size());
2327  for (const auto& p : actual->request_cost()) {
2328  auto it = load_report.request_cost().find(p.first);
2329  ASSERT_NE(it, load_report.request_cost().end());
2330  EXPECT_EQ(it->second, p.second);
2331  }
2332  EXPECT_EQ(actual->utilization().size(), load_report.utilization().size());
2333  for (const auto& p : actual->utilization()) {
2334  auto it = load_report.utilization().find(p.first);
2335  ASSERT_NE(it, load_report.utilization().end());
2336  EXPECT_EQ(it->second, p.second);
2337  }
2338  }
2339  // Check LB policy name for the channel.
2340  EXPECT_EQ("intercept_trailing_metadata_lb",
2341  channel->GetLoadBalancingPolicyName());
2342  EXPECT_EQ(kNumRpcs, trailers_intercepted());
2343 }
2344 
2345 //
2346 // tests that address attributes from the resolver are visible to the LB policy
2347 //
2348 
// Fixture for tests that check that address attributes supplied by the
// resolver are visible to the LB policy. A static callback (SaveAddress)
// appends the string form of each address it is given to the fixture instance
// recorded in current_test_instance_.
2349 class ClientLbAddressTest : public ClientLbEnd2endTest {
2350  protected:
      // Key under which the test attribute is attached to addresses.
2351  static const char* kAttributeKey;
2352 
      // Minimal string-valued address attribute used by the test.
2353  class Attribute : public grpc_core::ServerAddress::AttributeInterface {
2354  public:
2355  explicit Attribute(const std::string& str) : str_(str) {}
2356 
2357  std::unique_ptr<AttributeInterface> Copy() const override {
2358  return absl::make_unique<Attribute>(str_);
2359  }
2360 
      // Compares the wrapped strings. Assumes `other` is also an Attribute
      // (the cast is unchecked) -- TODO confirm callers guarantee this.
2361  int Cmp(const AttributeInterface* other) const override {
2362  return str_.compare(static_cast<const Attribute*>(other)->str_);
2363  }
2364 
2365  std::string ToString() const override { return str_; }
2366 
2367  private:
      // NOTE(review): the declaration of str_ is not present in this listing.
2369  };
2370 
      // Runs the base-class setup and registers this instance as the target
      // of the static callback.
2371  void SetUp() override {
2372  ClientLbEnd2endTest::SetUp();
2373  current_test_instance_ = this;
2374  }
2375 
2376  static void SetUpTestCase() {
2377  grpc_init();
      // NOTE(review): a statement is not present in this listing here;
      // presumably it registers the test LB policy with SaveAddress as its
      // callback -- confirm against the full file.
2379  }
2380 
2381  static void TearDownTestCase() { grpc_shutdown(); }
2382 
      // Returns the addresses recorded so far.
      // NOTE(review): this returns a reference to a vector that SaveAddress
      // modifies under mu_; the caller reads it after this function returns.
      // Confirm it is only used once no more addresses can be reported.
2383  const std::vector<std::string>& addresses_seen() {
      // NOTE(review): a statement is not present in this listing here
      // (presumably a lock on mu_, as in SaveAddress -- confirm).
2385  return addresses_seen_;
2386  }
2387 
2388  private:
      // Callback: records address.ToString() on the current test instance
      // while holding that instance's mu_.
2389  static void SaveAddress(const grpc_core::ServerAddress& address) {
2390  ClientLbAddressTest* self = current_test_instance_;
2391  grpc::internal::MutexLock lock(&self->mu_);
2392  self->addresses_seen_.emplace_back(address.ToString());
2393  }
2394 
      // Instance that SaveAddress writes to; set in SetUp().
2395  static ClientLbAddressTest* current_test_instance_;
      // NOTE(review): the declaration of mu_ (used in SaveAddress) is not
      // present in this listing.
2397  std::vector<std::string> addresses_seen_;
2398 };
2399 
2400 const char* ClientLbAddressTest::kAttributeKey = "attribute_key";
2401 
2402 ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
2403 
2404 TEST_F(ClientLbAddressTest, Basic) {
2405  const int kNumServers = 1;
2406  StartServers(kNumServers);
2407  auto response_generator = BuildResolverResponseGenerator();
2408  auto channel = BuildChannel("address_test_lb", response_generator);
2409  auto stub = BuildStub(channel);
2410  // Addresses returned by the resolver will have attached attributes.
2411  response_generator.SetNextResolution(GetServersPorts(), nullptr,
2412  kAttributeKey,
2413  absl::make_unique<Attribute>("foo"));
2414  CheckRpcSendOk(DEBUG_LOCATION, stub);
2415  // Check LB policy name for the channel.
2416  EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
2417  // Make sure that the attributes wind up on the subchannels.
2418  std::vector<std::string> expected;
2419  for (const int port : GetServersPorts()) {
2420  expected.emplace_back(
2421  absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", port,
2422  " args={} attributes={", kAttributeKey, "=foo}"));
2423  }
2424  EXPECT_EQ(addresses_seen(), expected);
2425 }
2426 
2427 //
2428 // tests OOB backend metric API
2429 //
2430 
2431 class OobBackendMetricTest : public ClientLbEnd2endTest {
2432  protected:
2433  using BackendMetricReport =
2434  std::pair<int /*port*/, xds::data::orca::v3::OrcaLoadReport>;
2435 
2436  void SetUp() override {
2437  ClientLbEnd2endTest::SetUp();
2438  current_test_instance_ = this;
2439  }
2440 
2441  static void SetUpTestCase() {
2442  grpc_init();
2444  BackendMetricCallback);
2445  }
2446 
2447  static void TearDownTestCase() { grpc_shutdown(); }
2448 
2449  absl::optional<BackendMetricReport> GetBackendMetricReport() {
2451  if (backend_metric_reports_.empty()) return absl::nullopt;
2452  auto result = std::move(backend_metric_reports_.front());
2453  backend_metric_reports_.pop_front();
2454  return result;
2455  }
2456 
2457  private:
2458  static void BackendMetricCallback(
2459  grpc_core::ServerAddress address,
2460  const grpc_core::BackendMetricData& backend_metric_data) {
2461  auto load_report = BackendMetricDataToOrcaLoadReport(backend_metric_data);
2462  int port = grpc_sockaddr_get_port(&address.address());
2464  current_test_instance_->backend_metric_reports_.push_back(
2465  {port, std::move(load_report)});
2466  }
2467 
2468  static OobBackendMetricTest* current_test_instance_;
2470  std::deque<BackendMetricReport> backend_metric_reports_ ABSL_GUARDED_BY(&mu_);
2471 };
2472 
2473 OobBackendMetricTest* OobBackendMetricTest::current_test_instance_ = nullptr;
2474 
2475 TEST_F(OobBackendMetricTest, Basic) {
2476  StartServers(1);
2477  // Set initial backend metric data on server.
2478  constexpr char kMetricName[] = "foo";
2479  servers_[0]->orca_service_.SetCpuUtilization(0.1);
2480  servers_[0]->orca_service_.SetMemoryUtilization(0.2);
2481  servers_[0]->orca_service_.SetNamedUtilization(kMetricName, 0.3);
2482  // Start client.
2483  auto response_generator = BuildResolverResponseGenerator();
2484  auto channel = BuildChannel("oob_backend_metric_test_lb", response_generator);
2485  auto stub = BuildStub(channel);
2486  response_generator.SetNextResolution(GetServersPorts());
2487  // Send an OK RPC.
2488  CheckRpcSendOk(DEBUG_LOCATION, stub);
2489  // Check LB policy name for the channel.
2490  EXPECT_EQ("oob_backend_metric_test_lb",
2491  channel->GetLoadBalancingPolicyName());
2492  // Check report seen by client.
2493  for (size_t i = 0; i < 5; ++i) {
2494  auto report = GetBackendMetricReport();
2495  if (report.has_value()) {
2496  EXPECT_EQ(report->first, servers_[0]->port_);
2497  EXPECT_EQ(report->second.cpu_utilization(), 0.1);
2498  EXPECT_EQ(report->second.mem_utilization(), 0.2);
2499  EXPECT_THAT(
2500  report->second.utilization(),
2501  ::testing::UnorderedElementsAre(::testing::Pair(kMetricName, 0.3)));
2502  break;
2503  }
2505  }
2506  // Now update the utilization data on the server.
2507  // Note that the server may send a new report while we're updating these,
2508  // so we set them in reverse order, so that we know we'll get all new
2509  // data once we see a report with the new CPU utilization value.
2510  servers_[0]->orca_service_.SetNamedUtilization(kMetricName, 0.6);
2511  servers_[0]->orca_service_.SetMemoryUtilization(0.5);
2512  servers_[0]->orca_service_.SetCpuUtilization(0.4);
2513  // Wait for client to see new report.
2514  for (size_t i = 0; i < 5; ++i) {
2515  auto report = GetBackendMetricReport();
2516  if (report.has_value()) {
2517  EXPECT_EQ(report->first, servers_[0]->port_);
2518  if (report->second.cpu_utilization() != 0.1) {
2519  EXPECT_EQ(report->second.cpu_utilization(), 0.4);
2520  EXPECT_EQ(report->second.mem_utilization(), 0.5);
2521  EXPECT_THAT(
2522  report->second.utilization(),
2523  ::testing::UnorderedElementsAre(::testing::Pair(kMetricName, 0.6)));
2524  break;
2525  }
2526  }
2528  }
2529 }
2530 
2531 } // namespace
2532 } // namespace testing
2533 } // namespace grpc
2534 
2535 int main(int argc, char** argv) {
2536  ::testing::InitGoogleTest(&argc, argv);
2537  grpc::testing::TestEnvironment env(&argc, argv);
2538  grpc_init();
2540  const auto result = RUN_ALL_TESTS();
2541  grpc_shutdown();
2542  return result;
2543 }
grpc::EXPECT_THAT
EXPECT_THAT(status.error_message(), ::testing::HasSubstr("subject_token_type"))
xds_interop_client.str
str
Definition: xds_interop_client.py:487
EXPECT_FALSE
#define EXPECT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1970
response_generator_
grpc_core::RefCountedPtr< grpc_core::FakeResolverResponseGenerator > response_generator_
Definition: client_lb_end2end_test.cc:219
connection_delay_injector.h
grpc::gpr_setenv
gpr_setenv("STS_CREDENTIALS", creds_file_name)
grpc::ClientContext::peer
std::string peer() const
Definition: client_context.cc:174
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
cond_
grpc::internal::CondVar cond_
Definition: client_lb_end2end_test.cc:373
main
int main(int argc, char **argv)
Definition: client_lb_end2end_test.cc:2535
grpc::status
auto status
Definition: cpp/client/credentials_test.cc:200
testing::ContainsRegex
PolymorphicMatcher< internal::MatchesRegexMatcher > ContainsRegex(const internal::RE *regex)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8835
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
grpc_core::MakeRefCounted
RefCountedPtr< T > MakeRefCounted(Args &&... args)
Definition: ref_counted_ptr.h:335
grpc_core::LocalhostResolves
void LocalhostResolves(bool *ipv4, bool *ipv6)
Definition: resolve_localhost_ip46.cc:50
regen-readme.it
it
Definition: regen-readme.py:15
fake_credentials.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc_timeout_seconds_to_deadline
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
Definition: test/core/util/test_config.cc:81
ipv6_only_
bool ipv6_only_
Definition: client_lb_end2end_test.cc:217
log.h
port.h
trailers_intercepted_
int trailers_intercepted_
Definition: client_lb_end2end_test.cc:2163
backoff.h
sockaddr_utils.h
ctx
Definition: benchmark-async.c:30
grpc_core::DebugLocation
Definition: debug_location.h:31
trailing_metadata_
grpc_core::MetadataVector trailing_metadata_
Definition: client_lb_end2end_test.cc:2165
resolve_localhost_ip46.h
generate.env
env
Definition: generate.py:37
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
metadata
Definition: cq_verifier.cc:48
find
static void ** find(grpc_chttp2_stream_map *map, uint32_t key)
Definition: stream_map.cc:99
grpc_core::BackendMetricData::mem_utilization
double mem_utilization
Definition: backend_metric_data.h:36
grpc::experimental::ChannelResetConnectionBackoff
void ChannelResetConnectionBackoff(Channel *channel)
Definition: channel_cc.cc:111
connectivity_state.h
grpc::internal::Mutex
Definition: include/grpcpp/impl/codegen/sync.h:59
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
absl::Time
Definition: third_party/abseil-cpp/absl/time/time.h:642
grpc
Definition: grpcpp/alarm.h:33
clients_
std::set< std::string > clients_
Definition: client_lb_end2end_test.cc:137
grpc_core::RefCountedPtr::get
T * get() const
Definition: ref_counted_ptr.h:146
grpc_core::FakeResolverResponseGenerator::SetResponse
void SetResponse(Resolver::Result result)
Definition: fake_resolver.cc:229
grpc::EnableDefaultHealthCheckService
void EnableDefaultHealthCheckService(bool enable)
Definition: health_check_service.cc:30
grpc::testing::ConnectionAttemptInjector::Init
static void Init()
Definition: connection_delay_injector.cc:70
run_interop_tests.timeout_seconds
timeout_seconds
Definition: run_interop_tests.py:1553
grpc_core::BackendMetricData
Definition: backend_metric_data.h:31
client
Definition: examples/python/async_streaming/client.py:1
TestServiceImpl::Echo
grpc::Status Echo(grpc::ServerContext *context, const grpc::testing::EchoRequest *request, grpc::testing::EchoResponse *response)
grpc::ASSERT_EQ
ASSERT_EQ(sizeof(valid_json), fwrite(valid_json, 1, sizeof(valid_json), creds_file))
grpc::ClientContext::set_wait_for_ready
void set_wait_for_ready(bool wait_for_ready)
Definition: grpcpp/impl/codegen/client_context.h:285
benchmark.request
request
Definition: benchmark.py:77
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
GRPC_STATUS_PERMISSION_DENIED
@ GRPC_STATUS_PERMISSION_DENIED
Definition: include/grpc/impl/codegen/status.h:68
EXPECT_GT
#define EXPECT_GT(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2036
gpr_event_set
GPRAPI void gpr_event_set(gpr_event *ev, void *value)
Definition: sync.cc:59
metadata
struct metadata metadata
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
fake_resolver.h
grpc_core::ServerAddress
Definition: server_address.h:49
secure_credentials.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc::ClientContext::set_deadline
void set_deadline(const T &deadline)
Definition: grpcpp/impl/codegen/client_context.h:274
grpc_resolved_address
Definition: resolved_address.h:34
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
health_check_service_interface.h
grpc_core::RegisterOobBackendMetricTestLoadBalancingPolicy
void RegisterOobBackendMetricTestLoadBalancingPolicy(OobBackendMetricCallback cb)
Definition: test_lb_policies.cc:670
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:55
grpc_core::RegisterAddressTestLoadBalancingPolicy
void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb)
Definition: test_lb_policies.cc:660
ctx
static struct test_ctx ctx
Definition: test-ipc-send-recv.c:65
GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL
#define GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL
Definition: grpc_types.h:443
setup.name
name
Definition: setup.py:542
env.h
orca_service_
experimental::OrcaService orca_service_
Definition: client_lb_end2end_test.cc:369
time.h
grpc_core::MetadataVector
std::vector< std::pair< std::string, std::string > > MetadataVector
Definition: test_lb_policies.h:24
testing::Ge
internal::GeMatcher< Rhs > Ge(Rhs x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8585
EXPECT_LE
#define EXPECT_LE(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2030
xds_manager.p
p
Definition: xds_manager.py:60
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
testing::AnyOf
internal::AnyOfResult2< M1, M2 >::type AnyOf(M1 m1, M2 m2)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:13555
grpc_core::PickArgsSeen
Definition: test_lb_policies.h:26
grpc_core::URI::Parse
static absl::StatusOr< URI > Parse(absl::string_view uri_text)
Definition: uri_parser.cc:209
grpc_core::DebugLocation::file
const char * file() const
Definition: debug_location.h:34
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
grpc::ClientContext::AddMetadata
void AddMetadata(const std::string &meta_key, const std::string &meta_value)
Definition: client_context.cc:121
grpc::internal::MutexLock
Definition: include/grpcpp/impl/codegen/sync.h:86
server_address
std::string server_address("0.0.0.0:10000")
absl::StatusCode::kPermissionDenied
@ kPermissionDenied
testing::MatchesRegex
PolymorphicMatcher< internal::MatchesRegexMatcher > MatchesRegex(const internal::RE *regex)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8824
testing::Test
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:402
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
test_service_impl.h
test_lb_policies.h
grpc_core::FakeResolverResponseGenerator::SetFailureOnReresolution
void SetFailureOnReresolution()
Definition: fake_resolver.cc:286
testing::ElementsAre
internal::ElementsAreMatcher< ::testing::tuple<> > ElementsAre()
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:13040
global_subchannel_pool.h
absl::synchronization_internal::Get
static GraphId Get(const IdMap &id, int num)
Definition: abseil-cpp/absl/synchronization/internal/graphcycles_test.cc:44
grpc_parse_uri
bool grpc_parse_uri(const grpc_core::URI &uri, grpc_resolved_address *resolved_addr)
Definition: parse_address.cc:293
GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS
#define GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS
Definition: grpc_types.h:263
GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS
#define GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS
Definition: grpc_types.h:173
GRPC_ARG_ENABLE_RETRIES
#define GRPC_ARG_ENABLE_RETRIES
Definition: grpc_types.h:396
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::ServerAddress::address
const grpc_resolved_address & address() const
Definition: server_address.h:95
framework.rpc.grpc_channelz.Channel
Channel
Definition: grpc_channelz.py:32
clients
static client_t clients[NUM_CLIENTS]
Definition: test-pipe-connect-multiple.c:40
parse_address.h
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
grpc_status._async.trailing_metadata
trailing_metadata
Definition: grpcio_status/grpc_status/_async.py:36
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
sync.h
grpc_core::RefCountedPtr< grpc_core::FakeResolverResponseGenerator >
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
absl::StrJoin
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
Definition: abseil-cpp/absl/strings/str_join.h:239
gen_stats_data.found
bool found
Definition: gen_stats_data.py:61
grpc_core::PickArgsSeen::metadata
MetadataVector metadata
Definition: test_lb_policies.h:28
gpr_time_cmp
GPRAPI int gpr_time_cmp(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:30
grpc_core::DebugLocation::line
int line() const
Definition: debug_location.h:35
creds_
std::shared_ptr< ChannelCredentials > creds_
Definition: client_lb_end2end_test.cc:523
t0
static int64_t t0
Definition: bloaty/third_party/re2/util/benchmark.cc:44
grpc::internal::CondVar::Wait
void Wait(Mutex *mu)
Definition: include/grpcpp/impl/codegen/sync.h:135
EXPECT_NE
#define EXPECT_NE(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2028
current_test_instance_
static ClientLbPickArgsTest * current_test_instance_
Definition: client_lb_end2end_test.cc:2055
grpc.StatusCode.PERMISSION_DENIED
tuple PERMISSION_DENIED
Definition: src/python/grpcio/grpc/__init__.py:268
gpr_time_sub
GPRAPI gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:168
grpc_timeout_milliseconds_to_deadline
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
Definition: test/core/util/test_config.cc:89
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_core::Resolver::Result
Results returned by the resolver.
Definition: resolver/resolver.h:56
server_host_
const std::string server_host_
Definition: client_lb_end2end_test.cc:521
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR
#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR
Definition: fake_resolver.h:31
grpc.h
absl::Status::message
absl::string_view message() const
Definition: third_party/abseil-cpp/absl/status/status.h:806
grpc_core::TrailingMetadataArgsSeen::metadata
MetadataVector metadata
Definition: test_lb_policies.h:41
grpc_fake_transport_security_credentials_create
grpc_channel_credentials * grpc_fake_transport_security_credentials_create()
Definition: fake_credentials.cc:79
args_seen_list_
std::vector< grpc_core::PickArgsSeen > args_seen_list_
Definition: client_lb_end2end_test.cc:2057
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
grpc_test_slowdown_factor
int64_t grpc_test_slowdown_factor()
Definition: test/core/util/test_config.cc:76
started_
bool started_
Definition: xds_cluster_impl.cc:357
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
kAttributeKey
static const char * kAttributeKey
Definition: client_lb_end2end_test.cc:2351
port_
const int port_
Definition: client_lb_end2end_test.cc:366
channel.h
server_address.h
time.h
grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(InterceptRecvTrailingMetadataCallback cb)
Definition: test_lb_policies.cc:654
absl::Duration
Definition: third_party/abseil-cpp/absl/time/time.h:159
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
grpc_core::Duration::FromTimespec
static Duration FromTimespec(gpr_timespec t)
Definition: src/core/lib/gprpp/time.cc:175
backup_poller.h
addresses_seen_
std::vector< std::string > addresses_seen_
Definition: client_lb_end2end_test.cc:2397
grpc_resolved_address::len
socklen_t len
Definition: resolved_address.h:36
grpc_core::RegisterTestPickArgsLoadBalancingPolicy
void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb, const char *delegate_policy_name)
Definition: test_lb_policies.cc:647
grpc_core::BackendMetricData::cpu_utilization
double cpu_utilization
CPU utilization expressed as a fraction of available CPU resources.
Definition: backend_metric_data.h:33
gpr_event_init
GPRAPI void gpr_event_init(gpr_event *ev)
Definition: sync.cc:54
grpc_fake_transport_security_server_credentials_create
grpc_server_credentials * grpc_fake_transport_security_server_credentials_create()
Definition: fake_credentials.cc:84
grpc::StatusCode
StatusCode
Definition: grpcpp/impl/codegen/status_code_enum.h:26
kNumRpcs
const int kNumRpcs
Definition: thread_stress_test.cc:50
benchmark::Shutdown
void Shutdown()
Definition: benchmark/src/benchmark.cc:607
grpc_core::FakeResolverResponseGenerator::SetReresolutionResponse
void SetReresolutionResponse(Resolver::Result result)
Definition: fake_resolver.cc:246
grpc_core::BackendMetricData::request_cost
std::map< absl::string_view, double > request_cost
Definition: backend_metric_data.h:40
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
call_metric_recorder.h
load_report_
xds::data::orca::v3::OrcaLoadReport load_report_
Definition: client_lb_end2end_test.cc:139
grpc_pick_unused_port_or_die
int grpc_pick_unused_port_or_die(void)
grpc_core::ServerAddressList
std::vector< ServerAddress > ServerAddressList
Definition: server_address.h:120
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc::ASSERT_NE
ASSERT_NE(creds_file_name, nullptr)
GPR_GLOBAL_CONFIG_SET
#define GPR_GLOBAL_CONFIG_SET(name, value)
Definition: global_config_generic.h:26
tcp_client.h
grpc_core::TrailingMetadataArgsSeen::backend_metric_data
const BackendMetricData * backend_metric_data
Definition: test_lb_policies.h:40
grpc_core::BackendMetricData::utilization
std::map< absl::string_view, double > utilization
Definition: backend_metric_data.h:44
GRPC_ARG_MIN_RECONNECT_BACKOFF_MS
#define GRPC_ARG_MIN_RECONNECT_BACKOFF_MS
Definition: grpc_types.h:259
gpr_event_wait
GPRAPI void * gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline)
Definition: sync.cc:73
grpc_core::ServiceConfigImpl::Create
static RefCountedPtr< ServiceConfig > Create(const grpc_channel_args *args, absl::string_view json_string, grpc_error_handle *error)
Definition: service_config_impl.cc:41
testing::Pair
internal::PairMatcher< FirstMatcher, SecondMatcher > Pair(FirstMatcher first_matcher, SecondMatcher second_matcher)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:9152
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
GRPC_CHANNEL_CONNECTING
@ GRPC_CHANNEL_CONNECTING
Definition: include/grpc/impl/codegen/connectivity_state.h:34
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
mu_
grpc::internal::Mutex mu_
Definition: client_lb_end2end_test.cc:134
test_config.h
thread_
std::unique_ptr< std::thread > thread_
Definition: client_lb_end2end_test.cc:370
absl::Seconds
constexpr Duration Seconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:419
grpc_core::Duration::Milliseconds
static constexpr Duration Milliseconds(int64_t millis)
Definition: src/core/lib/gprpp/time.h:155
grpc_sockaddr_get_port
int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr)
Definition: sockaddr_utils.cc:303
secure_server_credentials.h
wait_for_ready
bool wait_for_ready
Definition: rls_end2end_test.cc:240
GRPC_ARG_INHIBIT_HEALTH_CHECKING
#define GRPC_ARG_INHIBIT_HEALTH_CHECKING
Definition: grpc_types.h:424
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
gpr_event
Definition: impl/codegen/sync_generic.h:31
client_context.h
testing::_
const internal::AnythingMatcher _
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8548
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
grpc_core::Duration::millis
constexpr int64_t millis() const
Definition: src/core/lib/gprpp/time.h:208
absl::StatusCode::kCancelled
@ kCancelled
debug_location.h
http2_server_health_check.server_host
server_host
Definition: http2_server_health_check.py:27
request_count_
int request_count_
Definition: client_lb_end2end_test.cc:135
grpc::experimental::EnableCallMetricRecording
void EnableCallMetricRecording(ServerBuilder *)
Definition: orca_interceptor.cc:74
grpc::testing::TEST_F
TEST_F(ChannelArgumentsTest, SetInt)
Definition: channel_arguments_test.cc:134
server
Definition: examples/python/async_streaming/server.py:1
grpc_core::TrailingMetadataArgsSeen
Definition: test_lb_policies.h:38
grpc_core::ServerAddress::AttributeInterface
Definition: server_address.h:58
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
orca_service.h
testing::Le
internal::LeMatcher< Rhs > Le(Rhs x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8597
StartServer
void StartServer(JNIEnv *env, jobject obj, jmethodID is_cancelled_mid, int port)
Definition: grpc-helloworld.cc:48
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_core::PickArgsSeen::path
std::string path
Definition: test_lb_policies.h:27
index
int index
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:1184
grpc::testing::ToString
std::string ToString(const grpc::string_ref &r)
Definition: string_ref_helper.cc:24
alloc.h
grpc_core::ServerAddress::ToString
std::string ToString() const
Definition: server_address.cc:161
Copy
@ Copy
Definition: upb/benchmarks/benchmark.cc:200
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
EXPECT_LT
#define EXPECT_LT(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2032
grpc::testing::TestEnvironment
Definition: test/core/util/test_config.h:54
service_config_impl.h
grpc::CreateCustomChannel
std::shared_ptr< Channel > CreateCustomChannel(const grpc::string &target, const std::shared_ptr< ChannelCredentials > &creds, const ChannelArguments &args)
timeout_ms
int timeout_ms
Definition: rls_end2end_test.cc:239
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
ASSERT_TRUE
#define ASSERT_TRUE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1973
testing::UnorderedElementsAre
internal::UnorderedElementsAreMatcher< ::testing::tuple<> > UnorderedElementsAre()
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:13255
ASSERT_FALSE
#define ASSERT_FALSE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1976
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
server.h
grpc::testing::EXPECT_EQ
EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange")
ref_counted_ptr.h
EXPECT_GE
#define EXPECT_GE(val1, val2)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2034
testing::Field
grpc.StatusCode.UNAVAILABLE
tuple UNAVAILABLE
Definition: src/python/grpcio/grpc/__init__.py:278
channel_args.h
grpc::internal::CondVar
Definition: include/grpcpp/impl/codegen/sync.h:124
testing::AllOf
internal::AllOfResult2< M1, M2 >::type AllOf(M1 m1, M2 m2)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:13472
service_config.h
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
clients_mu_
grpc::internal::Mutex clients_mu_
Definition: client_lb_end2end_test.cc:136
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
atm.h
grpc::testing::EXPECT_TRUE
EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(minimum_valid_json, &options) .ok())
profile_analyzer.thd
thd
Definition: profile_analyzer.py:168
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
server.h
grpc::internal::CondVar::Signal
void Signal()
Definition: include/grpcpp/impl/codegen/sync.h:132
gpr_timespec
Definition: gpr_types.h:50
tests.unit._exit_scenarios.try_to_connect
try_to_connect
Definition: _exit_scenarios.py:189
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc_error
Definition: error_internal.h:42
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
grpc_core::FakeResolverResponseGenerator
Definition: fake_resolver.h:46
str_
std::string str_
Definition: client_lb_end2end_test.cc:2368
service_
MyTestServiceImpl service_
Definition: client_lb_end2end_test.cc:368
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
grpc_core::CppImplOf< Server, grpc_server >::FromC
static Server * FromC(grpc_server *c_type)
Definition: cpp_impl_of.h:30
absl::Status::code
absl::StatusCode code() const
Definition: third_party/abseil-cpp/absl/status/status.cc:233
server_
std::unique_ptr< Server > server_
Definition: client_lb_end2end_test.cc:367
t1
Table t1
Definition: abseil-cpp/absl/container/internal/raw_hash_set_allocator_test.cc:185
TestServiceImpl
Definition: interop_server.cc:139
grpc::testing::SendRpc
static void SendRpc(grpc::testing::EchoTestService::Stub *stub, int num_rpcs, bool allow_exhaustion, gpr_atm *errors)
Definition: thread_stress_test.cc:277
grpc_resolved_address::addr
char addr[GRPC_MAX_SOCKADDR_SIZE]
Definition: resolved_address.h:35
pair
std::pair< std::string, std::string > pair
Definition: abseil-cpp/absl/container/internal/raw_hash_set_benchmark.cc:78
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
server_builder.h
last_status_
absl::Status last_status_
Definition: client_lb_end2end_test.cc:2164
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
run_interop_tests.servers
servers
Definition: run_interop_tests.py:1288
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
create_channel.h
state
static struct rpc_state state
Definition: bad_server_response_test.cc:87
servers_
std::vector< std::unique_ptr< ServerData > > servers_
Definition: client_lb_end2end_test.cc:522
grpc_core::TrailingMetadataArgsSeen::status
absl::Status status
Definition: test_lb_policies.h:39
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:55