xds_cluster_resolver.cc
Go to the documentation of this file.
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
19 #include <inttypes.h>
20 #include <stddef.h>
21 
22 #include <algorithm>
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/container/inlined_vector.h"
31 #include "absl/memory/memory.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 
40 #include <grpc/support/log.h>
41 
69 #include "src/core/lib/json/json.h"
74 
// Default EDS fallback timeout. Not referenced anywhere in the visible part of
// this file; the unit is presumably milliseconds -- TODO confirm.
75 #define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
76 
77 namespace grpc_core {
78 
// Trace flag for this policy, named "xds_cluster_resolver_lb". It is
// constructed with `false`, presumably meaning tracing is off by default --
// TODO confirm against TraceFlag.
79 TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb");
80 
// Attribute key under which CreateChildPolicyAddressesLocked() attaches an
// XdsLocalityAttribute to every address it emits.
// NOTE(review): the pointer itself is not const, so this global can be
// reassigned; consider `const char* const` if the header declaration allows.
81 const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
82 
83 namespace {
84 
// Name of this LB policy; returned by name() on both the policy class and its
// config class.
85 constexpr char kXdsClusterResolver[] = "xds_cluster_resolver_experimental";
86 
87 // Config for the xds_cluster_resolver LB policy.
// Parsed configuration for the policy: an ordered list of discovery
// mechanisms plus the JSON description of the xDS LB policy. name() reports
// kXdsClusterResolver.
// NOTE(review): this listing omits source lines 91-93, 99-101 and 128, so the
// declarations of the DiscoveryMechanism fields compared in operator== and of
// xds_lb_policy_ are not visible here.
88 class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
89  public:
  // Settings for one discovery mechanism (one cluster).
90  struct DiscoveryMechanism {
  // How endpoints for the cluster are discovered.
94  enum DiscoveryMechanismType {
95  EDS,
96  LOGICAL_DNS,
97  };
98  DiscoveryMechanismType type;
102 
  // Member-wise equality over the seven fields listed below.
103  bool operator==(const DiscoveryMechanism& other) const {
104  return (cluster_name == other.cluster_name &&
105  lrs_load_reporting_server == other.lrs_load_reporting_server &&
106  max_concurrent_requests == other.max_concurrent_requests &&
107  type == other.type &&
108  eds_service_name == other.eds_service_name &&
109  dns_hostname == other.dns_hostname &&
110  outlier_detection_lb_config == other.outlier_detection_lb_config);
111  }
112  };
113 
  // Takes ownership of both arguments by moving them into the members.
114  XdsClusterResolverLbConfig(
115  std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy)
116  : discovery_mechanisms_(std::move(discovery_mechanisms)),
117  xds_lb_policy_(std::move(xds_lb_policy)) {}
118 
119  const char* name() const override { return kXdsClusterResolver; }
  // Discovery mechanisms in the order they were supplied to the constructor.
120  const std::vector<DiscoveryMechanism>& discovery_mechanisms() const {
121  return discovery_mechanisms_;
122  }
123 
  // JSON object consulted by CreateChildPolicyConfigLocked(), which looks up
  // the keys "ROUND_ROBIN" and "RING_HASH" in it.
124  const Json& xds_lb_policy() const { return xds_lb_policy_; }
125 
126  private:
127  std::vector<DiscoveryMechanism> discovery_mechanisms_;
129 };
130 
131 // Xds Cluster Resolver LB policy.
132 class XdsClusterResolverLb : public LoadBalancingPolicy {
133  public:
134  XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args);
135 
136  const char* name() const override { return kXdsClusterResolver; }
137 
138  void UpdateLocked(UpdateArgs args) override;
139  void ResetBackoffLocked() override;
140  void ExitIdleLocked() override;
141 
142  private:
143  // Discovery Mechanism Base class
144  //
145  // Implemented by EDS and LOGICAL_DNS.
146  //
147  // Implementations are responsible for calling the LB policy's
148  // OnEndpointChanged(), OnError(), and OnResourceDoesNotExist()
149  // methods when the corresponding events occur.
150  //
151  // Must implement Orphan() method to cancel the watchers.
152  class DiscoveryMechanism : public InternallyRefCounted<DiscoveryMechanism> {
153  public:
154  DiscoveryMechanism(
155  RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
156  size_t index)
157  : parent_(std::move(xds_cluster_resolver_lb)), index_(index) {}
158 
159  XdsClusterResolverLb* parent() const { return parent_.get(); }
160  size_t index() const { return index_; }
161 
162  virtual void Start() = 0;
163  virtual Json::Array override_child_policy() = 0;
164  virtual bool disable_reresolution() = 0;
165 
166  private:
167  RefCountedPtr<XdsClusterResolverLb> parent_;
168  // Stores its own index in the vector of DiscoveryMechanism.
169  size_t index_;
170  };
171 
172  class EdsDiscoveryMechanism : public DiscoveryMechanism {
173  public:
174  EdsDiscoveryMechanism(
175  RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
176  size_t index)
177  : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
178  void Start() override;
179  void Orphan() override;
180  Json::Array override_child_policy() override { return Json::Array{}; }
181  bool disable_reresolution() override { return true; }
182 
183  private:
184  class EndpointWatcher : public XdsEndpointResourceType::WatcherInterface {
185  public:
186  explicit EndpointWatcher(
187  RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism)
189  ~EndpointWatcher() override {
190  discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
191  }
192  void OnResourceChanged(XdsEndpointResource update) override {
193  Ref().release(); // ref held by callback
194  discovery_mechanism_->parent()->work_serializer()->Run(
195  // TODO(yashykt): When we move to C++14, capture update with
196  // std::move
197  [this, update]() mutable {
198  OnResourceChangedHelper(std::move(update));
199  Unref();
200  },
202  }
203  void OnError(absl::Status status) override {
204  Ref().release(); // ref held by callback
205  discovery_mechanism_->parent()->work_serializer()->Run(
206  [this, status]() {
207  OnErrorHelper(status);
208  Unref();
209  },
211  }
212  void OnResourceDoesNotExist() override {
213  Ref().release(); // ref held by callback
214  discovery_mechanism_->parent()->work_serializer()->Run(
215  [this]() {
216  OnResourceDoesNotExistHelper();
217  Unref();
218  },
220  }
221 
222  private:
223  // Code accessing protected methods of `DiscoveryMechanism` need to be
224  // in methods of this class rather than in lambdas to work around an MSVC
225  // bug.
226  void OnResourceChangedHelper(XdsEndpointResource update) {
227  discovery_mechanism_->parent()->OnEndpointChanged(
229  }
230  void OnErrorHelper(absl::Status status) {
231  discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(),
232  status);
233  }
234  void OnResourceDoesNotExistHelper() {
235  discovery_mechanism_->parent()->OnResourceDoesNotExist(
236  discovery_mechanism_->index());
237  }
238  RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
239  };
240 
241  // This is necessary only because of a bug in msvc where nested class
242  // cannot access protected member in base class.
243  friend class EndpointWatcher;
244 
245  absl::string_view GetEdsResourceName() const {
246  auto& config = parent()->config_->discovery_mechanisms()[index()];
247  if (!config.eds_service_name.empty()) return config.eds_service_name;
248  return config.cluster_name;
249  }
250 
251  // Note that this is not owned, so this pointer must never be dereferenced.
252  EndpointWatcher* watcher_ = nullptr;
253  };
254 
255  class LogicalDNSDiscoveryMechanism : public DiscoveryMechanism {
256  public:
257  LogicalDNSDiscoveryMechanism(
258  RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
259  size_t index)
260  : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
261  void Start() override;
262  void Orphan() override;
263  Json::Array override_child_policy() override {
264  return Json::Array{
265  Json::Object{
266  {"pick_first", Json::Object()},
267  },
268  };
269  }
270  bool disable_reresolution() override { return false; };
271 
272  private:
273  class ResolverResultHandler : public Resolver::ResultHandler {
274  public:
275  explicit ResolverResultHandler(
276  RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism)
278 
279  ~ResolverResultHandler() override {}
280 
281  void ReportResult(Resolver::Result result) override;
282 
283  private:
284  RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism_;
285  };
286 
287  // This is necessary only because of a bug in msvc where nested class cannot
288  // access protected member in base class.
289  friend class ResolverResultHandler;
290 
291  OrphanablePtr<Resolver> resolver_;
292  };
293 
294  struct DiscoveryMechanismEntry {
295  OrphanablePtr<DiscoveryMechanism> discovery_mechanism;
296  // Most recent update reported by the discovery mechanism.
298  // State used to retain child policy names for priority policy.
299  std::vector<size_t /*child_number*/> priority_child_numbers;
301 
302  const XdsClusterResolverLbConfig::DiscoveryMechanism& config() const;
303 
304  // Returns the child policy name for a given priority.
305  std::string GetChildPolicyName(size_t priority) const;
306  };
307 
308  class Helper : public ChannelControlHelper {
309  public:
310  explicit Helper(
311  RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy)
312  : xds_cluster_resolver_policy_(std::move(xds_cluster_resolver_policy)) {
313  }
314 
315  ~Helper() override {
317  }
318 
319  RefCountedPtr<SubchannelInterface> CreateSubchannel(
320  ServerAddress address, const grpc_channel_args& args) override;
321  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
322  std::unique_ptr<SubchannelPicker> picker) override;
323  // This is a no-op, because we get the addresses from the xds
324  // client, which is a watch-based API.
325  void RequestReresolution() override {}
326  absl::string_view GetAuthority() override;
327  void AddTraceEvent(TraceSeverity severity,
328  absl::string_view message) override;
329 
330  private:
331  RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy_;
332  };
333 
334  ~XdsClusterResolverLb() override;
335 
336  void ShutdownLocked() override;
337 
338  void OnEndpointChanged(size_t index, XdsEndpointResource update);
339  void OnError(size_t index, absl::Status status);
340  void OnResourceDoesNotExist(size_t index);
341 
342  void MaybeDestroyChildPolicyLocked();
343 
344  void UpdateChildPolicyLocked();
345  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
346  const grpc_channel_args* args);
347  ServerAddressList CreateChildPolicyAddressesLocked();
348  RefCountedPtr<Config> CreateChildPolicyConfigLocked();
349  grpc_channel_args* CreateChildPolicyArgsLocked(
350  const grpc_channel_args* args_in);
351 
352  // The xds client.
353  RefCountedPtr<XdsClient> xds_client_;
354 
355  // Current channel args and config from the resolver.
356  const grpc_channel_args* args_ = nullptr;
357  RefCountedPtr<XdsClusterResolverLbConfig> config_;
358 
359  // Internal state.
360  bool shutting_down_ = false;
361 
362  // Vector of discovery mechanism entries in priority order.
363  std::vector<DiscoveryMechanismEntry> discovery_mechanisms_;
364 
365  OrphanablePtr<LoadBalancingPolicy> child_policy_;
366 };
367 
368 //
369 // XdsClusterResolverLb::Helper
370 //
371 
// Forwards subchannel creation to the parent policy's channel control helper.
// Returns nullptr instead once the parent policy is shutting down.
372 RefCountedPtr<SubchannelInterface>
373 XdsClusterResolverLb::Helper::CreateSubchannel(ServerAddress address,
374  const grpc_channel_args& args) {
375  if (xds_cluster_resolver_policy_->shutting_down_) return nullptr;
376  return xds_cluster_resolver_policy_->channel_control_helper()
377  ->CreateSubchannel(std::move(address), args);
378 }
379 
// Passes the child policy's connectivity state, status and picker up to the
// parent policy's channel control helper. The update is dropped when the
// parent is shutting down or no longer has a child policy.
// NOTE(review): this listing omits source lines 381, 387-388 and 391 (the
// first parameters and the head of the logging statement).
380 void XdsClusterResolverLb::Helper::UpdateState(
382  std::unique_ptr<SubchannelPicker> picker) {
383  if (xds_cluster_resolver_policy_->shutting_down_ ||
384  xds_cluster_resolver_policy_->child_policy_ == nullptr) {
385  return;
386  }
389  "[xds_cluster_resolver_lb %p] child policy updated state=%s (%s) "
390  "picker=%p",
392  status.ToString().c_str(), picker.get());
393  }
394  xds_cluster_resolver_policy_->channel_control_helper()->UpdateState(
395  state, status, std::move(picker));
396 }
397 
// Returns the authority reported by the parent policy's channel control
// helper. Unlike the other forwarding methods of this helper, there is no
// shutting_down_ check here.
398 absl::string_view XdsClusterResolverLb::Helper::GetAuthority() {
399  return xds_cluster_resolver_policy_->channel_control_helper()->GetAuthority();
400 }
401 
// Forwards a trace event to the parent policy's channel control helper; the
// event is dropped once the parent is shutting down.
// NOTE(review): this listing omits source line 403 (the `message` parameter).
402 void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity,
404  if (xds_cluster_resolver_policy_->shutting_down_) return;
405  xds_cluster_resolver_policy_->channel_control_helper()->AddTraceEvent(
406  severity, message);
407 }
408 
409 //
410 // XdsClusterResolverLb::EdsDiscoveryMechanism
411 //
412 
// Creates an EndpointWatcher that holds a ref to this mechanism, remembers a
// raw (non-owning) pointer to it in watcher_, and hands the watcher over
// together with the EDS resource name.
// NOTE(review): this listing omits source lines 414-415 and 423; the callee
// that receives (resource name, watcher) is presumably the xds client's
// start-watch call -- TODO confirm.
413 void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() {
416  "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
417  ":%p starting xds watch for %s",
418  parent(), index(), this, std::string(GetEdsResourceName()).c_str());
419  }
420  auto watcher = MakeRefCounted<EndpointWatcher>(
421  Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"));
422  watcher_ = watcher.get();
424  GetEdsResourceName(), std::move(watcher));
425 }
426 
// Passes the EDS resource name and the stored watcher_ pointer to a cancel
// call, then drops the ref on this mechanism.
// NOTE(review): this listing omits source lines 428-429 and 434; the callee
// is presumably the xds client's cancel-watch call -- TODO confirm.
427 void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
430  "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
431  ":%p cancelling xds watch for %s",
432  parent(), index(), this, std::string(GetEdsResourceName()).c_str());
433  }
435  GetEdsResourceName(), watcher_);
436  Unref();
437 }
438 
439 //
440 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism
441 //
442 
// Starts resolution of this mechanism's dns_hostname.
// If the parent's channel args carry a FakeResolverResponseGenerator pointer,
// the target gets a "fake:" prefix and one extra arg is appended to a copy of
// the parent's args; otherwise the target gets a "dns:" prefix and the args
// are copied unchanged. The resolver is given a ResolverResultHandler that
// holds a ref to this mechanism. If no resolver could be created, the parent
// is told the resource does not exist; otherwise the resolver is started.
// NOTE(review): this listing omits source lines 444, 450, 453, 460, 465 and
// 471-472. `args` is a freshly allocated copy; its release is presumably on
// the missing line 465 -- TODO confirm.
443 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
445  parent()->config_->discovery_mechanisms()[index()].dns_hostname;
446  grpc_channel_args* args = nullptr;
447  FakeResolverResponseGenerator* fake_resolver_response_generator =
448  grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
449  parent()->args_,
451  if (fake_resolver_response_generator != nullptr) {
452  target = absl::StrCat("fake:", target);
454  fake_resolver_response_generator);
455  args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1);
456  } else {
457  target = absl::StrCat("dns:", target);
458  args = grpc_channel_args_copy(parent()->args_);
459  }
461  target.c_str(), args, parent()->interested_parties(),
462  parent()->work_serializer(),
463  absl::make_unique<ResolverResultHandler>(
464  Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism")));
466  if (resolver_ == nullptr) {
467  parent()->OnResourceDoesNotExist(index());
468  return;
469  }
470  resolver_->StartLocked();
473  "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism "
474  "%" PRIuPTR ":%p starting dns resolver %p",
475  parent(), index(), this, resolver_.get());
476  }
477 }
478 
// Releases the resolver and then drops the ref on this mechanism.
// NOTE(review): this listing omits source line 480 (the condition guarding
// the log statement).
479 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() {
481  gpr_log(
482  GPR_INFO,
483  "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism %" PRIuPTR
484  ":%p shutting down dns resolver %p",
485  parent(), index(), this, resolver_.get());
486  }
487  resolver_.reset();
488  Unref();
489 }
490 
491 //
492 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler
493 //
494 
// Translates a resolver result into the form used for EDS updates.
// A failed address lookup is reported to the parent through OnError().
// Otherwise the addresses are wrapped in an XdsEndpointResource that has a
// single priority containing a single locality (name built from three empty
// strings, lb_weight 1), and that resource is passed to the parent's
// OnEndpointChanged().
// NOTE(review): this listing omits source line 514 (the arguments of the
// final call).
495 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
496  ReportResult(Resolver::Result result) {
497  if (!result.addresses.ok()) {
498  discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(),
499  result.addresses.status());
500  return;
501  }
502  // Convert resolver result to EDS update.
503  // TODO(roth): Figure out a way to pass resolution_note through to the
504  // child policy.
505  XdsEndpointResource update;
506  XdsEndpointResource::Priority::Locality locality;
507  locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
508  locality.lb_weight = 1;
509  locality.endpoints = std::move(*result.addresses);
510  XdsEndpointResource::Priority priority;
511  priority.localities.emplace(locality.name.get(), std::move(locality));
512  update.priorities.emplace_back(std::move(priority));
513  discovery_mechanism_->parent()->OnEndpointChanged(
515 }
516 
517 //
518 // XdsClusterResolverLb::DiscoveryMechanismEntry
519 //
520 
// Returns the config entry that belongs to this discovery mechanism, i.e. the
// element of the parent policy's config at the mechanism's own index.
// NOTE(review): this listing omits source line 522 (the qualified function
// name and the opening brace).
521 const XdsClusterResolverLbConfig::DiscoveryMechanism&
523  return discovery_mechanism->parent()
524  ->config_->discovery_mechanisms()[discovery_mechanism->index()];
525 }
526 
// Builds the child policy name for `priority` in the form
// "{cluster=<cluster_name>, child_number=<n>}", where <n> is
// priority_child_numbers[priority].
// NOTE(review): `priority` is used as an unchecked subscript; callers must
// pass a value below priority_child_numbers.size().
527 std::string XdsClusterResolverLb::DiscoveryMechanismEntry::GetChildPolicyName(
528  size_t priority) const {
529  return absl::StrCat("{cluster=", config().cluster_name,
530  ", child_number=", priority_child_numbers[priority], "}");
531 }
532 
533 //
534 // XdsClusterResolverLb public methods
535 //
536 
// Constructs the policy: `args` is moved into the LoadBalancingPolicy base
// and `xds_client` into xds_client_. The body only logs the creation.
// NOTE(review): this listing omits source line 540 (the condition guarding
// the log statement).
537 XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
538  Args args)
539  : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
541  gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] created -- xds_client=%p",
542  this, xds_client_.get());
543  }
544 }
545 
// Destructor; the visible body only logs that the policy is being destroyed.
// NOTE(review): this listing omits source lines 547-548 (the head of the log
// statement and its guard).
546 XdsClusterResolverLb::~XdsClusterResolverLb() {
549  "[xds_cluster_resolver_lb %p] destroying xds_cluster_resolver LB "
550  "policy",
551  this);
552  }
553 }
554 
// Shuts the policy down: sets shutting_down_, destroys the child policy (if
// any), clears all discovery mechanism entries, releases the xds client ref
// and resets args_ to nullptr.
// NOTE(review): this listing omits source lines 556 and 564; per the
// "Destroy channel args" comment, line 564 presumably frees args_ before it
// is nulled -- TODO confirm.
555 void XdsClusterResolverLb::ShutdownLocked() {
557  gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] shutting down", this);
558  }
559  shutting_down_ = true;
560  MaybeDestroyChildPolicyLocked();
561  discovery_mechanisms_.clear();
562  xds_client_.reset(DEBUG_LOCATION, "XdsClusterResolverLb");
563  // Destroy channel args.
565  args_ = nullptr;
566 }
567 
// If a child policy exists, removes its interested_parties pollset_set from
// this policy's interested_parties and then destroys the child. Does nothing
// when there is no child policy.
568 void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() {
569  if (child_policy_ != nullptr) {
570  grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
571  interested_parties());
572  child_policy_.reset();
573  }
574 }
575 
// Applies a new config and channel args from the resolver.
// The update counts as the initial one when args_ is still null. The new
// config and args replace the stored ones (args.args is nulled after being
// taken over). If a child policy already exists it is updated right away.
// Discovery mechanisms are created and started only on the initial update,
// one per entry in the config and in config order; later updates do not add
// or remove mechanisms.
// NOTE(review): this listing omits source lines 577 and 585; line 585
// presumably frees the previous args_ -- TODO confirm. `old_config` is not
// read in the visible code; it only keeps the previous config alive until
// this function returns.
576 void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) {
578  gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this);
579  }
580  const bool is_initial_update = args_ == nullptr;
581  // Update config.
582  auto old_config = std::move(config_);
583  config_ = std::move(args.config);
584  // Update args.
586  args_ = args.args;
587  args.args = nullptr;
588  // Update child policy if needed.
589  if (child_policy_ != nullptr) UpdateChildPolicyLocked();
590  // Create endpoint watcher if needed.
591  if (is_initial_update) {
592  for (const auto& config : config_->discovery_mechanisms()) {
593  DiscoveryMechanismEntry entry;
594  if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
595  DiscoveryMechanismType::EDS) {
596  entry.discovery_mechanism = MakeOrphanable<EdsDiscoveryMechanism>(
597  Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"),
598  discovery_mechanisms_.size());
599  } else if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
600  DiscoveryMechanismType::LOGICAL_DNS) {
601  entry.discovery_mechanism =
602  MakeOrphanable<LogicalDNSDiscoveryMechanism>(
603  Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"),
604  discovery_mechanisms_.size());
605  } else {
606  GPR_ASSERT(0);
607  }
608  discovery_mechanisms_.push_back(std::move(entry));
609  }
610  // Call start() on all discovery mechanisms after creation.
611  for (const auto& discovery_mechanism : discovery_mechanisms_) {
612  discovery_mechanism.discovery_mechanism->Start();
613  }
614  }
615 }
616 
// Forwards the backoff reset to the child policy, if one exists.
617 void XdsClusterResolverLb::ResetBackoffLocked() {
618  if (child_policy_ != nullptr) {
619  child_policy_->ResetBackoffLocked();
620  }
621 }
622 
// Body of ExitIdleLocked(): forwards the request to the child policy, if one
// exists.
// NOTE(review): this listing omits source line 623 (the function signature).
624  if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
625 }
626 
// Handles a new endpoint update for the discovery mechanism at `index`.
// Returns immediately when shutting down. Otherwise it:
//  1. makes sure the update has at least one priority;
//  2. computes a child number for every priority in the update, reusing the
//     number of an existing child when the priority contains a locality that
//     the previous update had placed in that child, and allocating a fresh
//     number otherwise;
//  3. stores the update and the new child numbers in the entry;
//  4. calls UpdateChildPolicyLocked(), but only once every discovery
//     mechanism has delivered an update.
// NOTE(review): `index` subscripts discovery_mechanisms_ without a range
// check. This listing omits source lines 630-631 (the head of the log
// statement); the declarations of latest_update and
// next_available_child_number are also not visible in the listing.
627 void XdsClusterResolverLb::OnEndpointChanged(size_t index,
628  XdsEndpointResource update) {
629  if (shutting_down_) return;
632  "[xds_cluster_resolver_lb %p] Received update from xds client"
633  " for discovery mechanism %" PRIuPTR "",
634  this, index);
635  }
636  DiscoveryMechanismEntry& discovery_entry = discovery_mechanisms_[index];
637  // We need at least one priority for each discovery mechanism, just so that we
638  // have a child in which to create the xds_cluster_impl policy. This ensures
639  // that we properly handle the case of a discovery mechanism dropping 100% of
640  // calls, the OnError() case, and the OnResourceDoesNotExist() case.
641  if (update.priorities.empty()) update.priorities.emplace_back();
642  // Update priority_child_numbers, reusing old child numbers in an
643  // intelligent way to avoid unnecessary churn.
644  // First, build some maps from locality to child number and the reverse
645  // from the old data in the entry's update and priority_child_numbers.
646  std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
647  locality_child_map;
648  std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
649  child_locality_map;
650  if (discovery_entry.latest_update.has_value()) {
651  const auto& prev_priority_list = discovery_entry.latest_update->priorities;
652  for (size_t priority = 0; priority < prev_priority_list.size();
653  ++priority) {
654  size_t child_number = discovery_entry.priority_child_numbers[priority];
655  const auto& localities = prev_priority_list[priority].localities;
656  for (const auto& p : localities) {
657  XdsLocalityName* locality_name = p.first;
658  locality_child_map[locality_name] = child_number;
659  child_locality_map[child_number].insert(locality_name);
660  }
661  }
662  }
663  // Construct new list of children.
664  std::vector<size_t> priority_child_numbers;
665  for (size_t priority = 0; priority < update.priorities.size(); ++priority) {
666  const auto& localities = update.priorities[priority].localities;
667  absl::optional<size_t> child_number;
668  // If one of the localities in this priority already existed, reuse its
669  // child number.
670  for (const auto& p : localities) {
671  XdsLocalityName* locality_name = p.first;
672  if (!child_number.has_value()) {
673  auto it = locality_child_map.find(locality_name);
674  if (it != locality_child_map.end()) {
675  child_number = it->second;
676  locality_child_map.erase(it);
677  // Remove localities that *used* to be in this child number, so
678  // that we don't incorrectly reuse this child number for a
679  // subsequent priority.
680  for (XdsLocalityName* old_locality :
681  child_locality_map[*child_number]) {
682  locality_child_map.erase(old_locality);
683  }
684  }
685  } else {
686  // Remove all localities that are now in this child number, so
687  // that we don't accidentally reuse this child number for a
688  // subsequent priority.
689  locality_child_map.erase(locality_name);
690  }
691  }
692  // If we didn't find an existing child number, assign a new one.
693  if (!child_number.has_value()) {
694  for (child_number = discovery_entry.next_available_child_number;
695  child_locality_map.find(*child_number) != child_locality_map.end();
696  ++(*child_number)) {
697  }
698  discovery_entry.next_available_child_number = *child_number + 1;
699  // Add entry so we know that the child number is in use.
700  // (Don't need to add the list of localities, since we won't use them.)
701  child_locality_map[*child_number];
702  }
703  priority_child_numbers.push_back(*child_number);
704  }
705  // Save update.
706  discovery_entry.latest_update = std::move(update);
707  discovery_entry.priority_child_numbers = std::move(priority_child_numbers);
708  // If any discovery mechanism has not received its first update,
709  // wait until that happens before creating the child policy.
710  // TODO(roth): If this becomes problematic in the future (e.g., a
711  // secondary discovery mechanism delaying us from starting up at all),
712  // we can consider some sort of optimization whereby we can create the
713  // priority policy with only a subset of its children. But we need to
714  // make sure not to get into a situation where the priority policy
715  // will put the channel into TRANSIENT_FAILURE instead of CONNECTING
716  // while we're still waiting for the other discovery mechanism(s).
717  for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
718  if (!mechanism.latest_update.has_value()) return;
719  }
720  // Update child policy.
721  UpdateChildPolicyLocked();
722 }
723 
// Body of OnError(index, status): logs the error and returns when shutting
// down. If the mechanism at `index` has never delivered an update, an empty
// XdsEndpointResource is fed to OnEndpointChanged() so that the mechanism
// counts as having reported; an error that arrives after a successful update
// leaves the stored update untouched.
// NOTE(review): this listing omits source lines 724-725 (the function
// signature and the head of the log statement).
726  "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
727  " xds watcher reported error: %s",
728  this, index, status.ToString().c_str());
729  if (shutting_down_) return;
730  if (!discovery_mechanisms_[index].latest_update.has_value()) {
731  // Call OnEndpointChanged with an empty update just like
732  // OnResourceDoesNotExist.
733  OnEndpointChanged(index, XdsEndpointResource());
734  }
735 }
736 
// Reports that the resource for the mechanism at `index` does not exist:
// logs, returns when shutting down, and otherwise replaces the mechanism's
// data with an empty XdsEndpointResource through OnEndpointChanged().
// NOTE(review): this listing omits source line 738 (the head of the log
// statement).
737 void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index) {
739  "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
740  " resource does not exist",
741  this, index);
742  if (shutting_down_) return;
743  // Call OnEndpointChanged with an empty update.
744  OnEndpointChanged(index, XdsEndpointResource());
745 }
746 
747 //
748 // child policy-related methods
749 //
750 
// Flattens the latest update of every discovery mechanism into one address
// list for the child policy. Each endpoint is copied with three attributes:
//  - a hierarchical path of {child policy name, locality name};
//  - the locality, under kXdsLocalityNameAttributeKey;
//  - a weight: the locality's lb_weight, multiplied by the endpoint's own
//    weight attribute when it has one.
// latest_update is dereferenced without a has_value() check, so every
// mechanism must already have delivered an update when this runs.
// NOTE(review): the weight product is computed in uint32_t and could wrap for
// large inputs -- confirm the ranges upstream. This listing omits source line
// 777 (the key of the first attribute).
751 ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
752  ServerAddressList addresses;
753  for (const auto& discovery_entry : discovery_mechanisms_) {
754  for (size_t priority = 0;
755  priority < discovery_entry.latest_update->priorities.size();
756  ++priority) {
757  const auto& priority_entry =
758  discovery_entry.latest_update->priorities[priority];
759  std::string priority_child_name =
760  discovery_entry.GetChildPolicyName(priority);
761  for (const auto& p : priority_entry.localities) {
762  const auto& locality_name = p.first;
763  const auto& locality = p.second;
764  std::vector<std::string> hierarchical_path = {
765  priority_child_name, locality_name->AsHumanReadableString()};
766  for (const auto& endpoint : locality.endpoints) {
767  const ServerAddressWeightAttribute* weight_attribute = static_cast<
768  const ServerAddressWeightAttribute*>(endpoint.GetAttribute(
769  ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
770  uint32_t weight = locality.lb_weight;
771  if (weight_attribute != nullptr) {
772  weight = locality.lb_weight * weight_attribute->weight();
773  }
774  addresses.emplace_back(
775  endpoint
776  .WithAttribute(
778  MakeHierarchicalPathAttribute(hierarchical_path))
779  .WithAttribute(kXdsLocalityNameAttributeKey,
780  absl::make_unique<XdsLocalityAttribute>(
781  locality_name->Ref()))
782  .WithAttribute(
783  ServerAddressWeightAttribute::
784  kServerAddressWeightAttributeKey,
785  absl::make_unique<ServerAddressWeightAttribute>(weight)));
786  }
787  }
788  }
789  }
790  return addresses;
791 }
792 
// Generates the JSON config for the "priority_experimental" child policy and
// parses it through the LB policy registry.
// One child is emitted per (discovery mechanism, priority) pair, named by
// GetChildPolicyName(). Its innermost policy is chosen as follows:
//  - the mechanism's override_child_policy(), when that is non-empty;
//  - otherwise weighted_target_experimental over round_robin (one target per
//    locality, weighted by lb_weight) when xds_lb_policy has a "ROUND_ROBIN"
//    key;
//  - otherwise ring_hash_experimental built from the "RING_HASH" entry, which
//    is asserted to be present.
// That policy is wrapped in xds_cluster_impl_experimental (cluster name, drop
// categories, max concurrent requests, and optionally the EDS service name
// and LRS server) and, in one of the two visible branches, additionally in
// outlier_detection_experimental. Children of mechanisms whose
// disable_reresolution() is true get "ignore_reresolution_requests".
// Returns the parsed config, or nullptr after reporting the failure through
// the channel control helper when parsing fails.
// NOTE(review): this listing omits source lines 883, 926, 933, 939, 943-944
// and 948, including the condition that selects the outlier-detection branch.
// override_child_policy() is called twice per priority and builds a new
// Json::Array each time.
793 RefCountedPtr<LoadBalancingPolicy::Config>
794 XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
795  Json::Object priority_children;
796  Json::Array priority_priorities;
797  for (const auto& discovery_entry : discovery_mechanisms_) {
798  const auto& discovery_config = discovery_entry.config();
799  for (size_t priority = 0;
800  priority < discovery_entry.latest_update->priorities.size();
801  ++priority) {
802  const auto& priority_entry =
803  discovery_entry.latest_update->priorities[priority];
804  Json child_policy;
805  if (!discovery_entry.discovery_mechanism->override_child_policy()
806  .empty()) {
807  child_policy =
808  discovery_entry.discovery_mechanism->override_child_policy();
809  } else {
810  const auto& xds_lb_policy = config_->xds_lb_policy().object_value();
811  if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) {
812  const auto& localities = priority_entry.localities;
813  Json::Object weighted_targets;
814  for (const auto& p : localities) {
815  XdsLocalityName* locality_name = p.first;
816  const auto& locality = p.second;
817  // Add weighted target entry.
818  weighted_targets[locality_name->AsHumanReadableString()] =
819  Json::Object{
820  {"weight", locality.lb_weight},
821  {"childPolicy",
822  Json::Array{
823  Json::Object{
824  {"round_robin", Json::Object()},
825  },
826  }},
827  };
828  }
829  // Construct locality-picking policy.
830  // Start with field from our config and add the "targets" field.
831  child_policy = Json::Array{
832  Json::Object{
833  {"weighted_target_experimental",
834  Json::Object{
835  {"targets", Json::Object()},
836  }},
837  },
838  };
839  Json::Object& config =
840  *(*child_policy.mutable_array())[0].mutable_object();
841  auto it = config.begin();
842  GPR_ASSERT(it != config.end());
843  (*it->second.mutable_object())["targets"] =
844  std::move(weighted_targets);
845  } else {
846  auto it = xds_lb_policy.find("RING_HASH");
847  GPR_ASSERT(it != xds_lb_policy.end());
848  Json::Object ring_hash_experimental_policy =
849  it->second.object_value();
850  child_policy = Json::Array{
851  Json::Object{
852  {"ring_hash_experimental", ring_hash_experimental_policy},
853  },
854  };
855  }
856  }
857  // Wrap it in the drop policy.
858  Json::Array drop_categories;
859  if (discovery_entry.latest_update->drop_config != nullptr) {
860  for (const auto& category :
861  discovery_entry.latest_update->drop_config->drop_category_list()) {
862  drop_categories.push_back(Json::Object{
863  {"category", category.name},
864  {"requests_per_million", category.parts_per_million},
865  });
866  }
867  }
868  Json::Object xds_cluster_impl_config = {
869  {"clusterName", discovery_config.cluster_name},
870  {"childPolicy", std::move(child_policy)},
871  {"dropCategories", std::move(drop_categories)},
872  {"maxConcurrentRequests", discovery_config.max_concurrent_requests},
873  };
874  if (!discovery_config.eds_service_name.empty()) {
875  xds_cluster_impl_config["edsServiceName"] =
876  discovery_config.eds_service_name;
877  }
878  if (discovery_config.lrs_load_reporting_server.has_value()) {
879  xds_cluster_impl_config["lrsLoadReportingServer"] =
880  discovery_config.lrs_load_reporting_server->ToJson();
881  }
882  Json locality_picking_policy;
884  Json::Object outlier_detection_config;
885  if (discovery_entry.config().outlier_detection_lb_config.has_value()) {
886  outlier_detection_config =
887  discovery_entry.config().outlier_detection_lb_config.value();
888  } else {
889  // outlier detection will be a no-op
890  outlier_detection_config["interval"] =
891  Duration::Infinity().ToJsonString();
892  }
893  outlier_detection_config["childPolicy"] = Json::Array{Json::Object{
894  {"xds_cluster_impl_experimental",
895  std::move(xds_cluster_impl_config)},
896  }};
897  locality_picking_policy = Json::Array{Json::Object{
898  {"outlier_detection_experimental",
899  std::move(outlier_detection_config)},
900  }};
901  } else {
902  locality_picking_policy = Json::Array{Json::Object{
903  {"xds_cluster_impl_experimental",
904  std::move(xds_cluster_impl_config)},
905  }};
906  }
907  // Add priority entry, with the appropriate child name.
908  std::string child_name = discovery_entry.GetChildPolicyName(priority);
909  priority_priorities.emplace_back(child_name);
910  Json::Object child_config = {
911  {"config", std::move(locality_picking_policy)},
912  };
913  if (discovery_entry.discovery_mechanism->disable_reresolution()) {
914  child_config["ignore_reresolution_requests"] = true;
915  }
916  priority_children[child_name] = std::move(child_config);
917  }
918  }
919  Json json = Json::Array{Json::Object{
920  {"priority_experimental",
921  Json::Object{
922  {"children", std::move(priority_children)},
923  {"priorities", std::move(priority_priorities)},
924  }},
925  }};
927  std::string json_str = json.Dump(/*indent=*/1);
928  gpr_log(
929  GPR_INFO,
930  "[xds_cluster_resolver_lb %p] generated config for child policy: %s",
931  this, json_str.c_str());
932  }
934  RefCountedPtr<LoadBalancingPolicy::Config> config =
935  LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
936  if (!GRPC_ERROR_IS_NONE(error)) {
937  // This should never happen, but if it does, we basically have no
938  // way to fix it, so we put the channel in TRANSIENT_FAILURE.
940  "[xds_cluster_resolver_lb %p] error parsing generated child policy "
941  "config -- "
942  "will put channel in TRANSIENT_FAILURE: %s",
945  "xds_cluster_resolver LB policy: error parsing generated child policy "
946  "config");
947  channel_control_helper()->UpdateState(
949  absl::make_unique<TransientFailurePicker>(status));
950  return nullptr;
951  }
952  return config;
953 }
954 
// Pushes a freshly generated config, address list and channel args to the
// child policy, creating the child policy first if there is none yet.
// Does nothing when shutting_down_ is set or when
// CreateChildPolicyConfigLocked() yields no config (nullptr).
955 void XdsClusterResolverLb::UpdateChildPolicyLocked() {
956  if (shutting_down_) return;
957  UpdateArgs update_args;
958  update_args.config = CreateChildPolicyConfigLocked();
// A null config means config generation failed; skip the update entirely.
959  if (update_args.config == nullptr) return;
960  update_args.addresses = CreateChildPolicyAddressesLocked();
961  update_args.args = CreateChildPolicyArgsLocked(args_);
// Lazily create the child policy on the first update.
// NOTE(review): CreateChildPolicyLocked() has a path that returns nullptr,
// and the result is not checked before the UpdateLocked() call below.
962  if (child_policy_ == nullptr) {
963  child_policy_ = CreateChildPolicyLocked(update_args.args);
964  }
// NOTE(review): listing line 965 is absent from this extract; the brace on
// line 968 presumably closes a trace-flag guard around this log call --
// confirm against the upstream file.
966  gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p",
967  this, child_policy_.get());
968  }
// Hand the whole update (config, addresses, args) over to the child.
969  child_policy_->UpdateLocked(std::move(update_args));
970 }
971 
// Builds the channel args handed to the child policy: the result of
// grpc_channel_args_copy_and_add() over `args` plus the entries of new_args.
// NOTE(review): listing lines 974 and 977 (presumably the new_args
// declaration and the opening of its first element) are absent from this
// extract -- confirm against the upstream file.
// NOTE(review): the returned pointer is presumably owned by the caller --
// confirm who destroys it.
972 grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked(
973  const grpc_channel_args* args) {
975  // Inhibit client-side health checking, since the balancer does this
976  // for us.
// The visible element sets GRPC_ARG_INHIBIT_HEALTH_CHECKING to 1.
978  const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
979  };
980  return grpc_channel_args_copy_and_add(args, new_args.data(), new_args.size());
981 }
982 
// Creates the child policy by asking the LB policy registry for a
// "priority_experimental" policy.
// The child is given this policy's work serializer, the supplied `args`,
// and a Helper that holds a ref to this policy.
// Returns nullptr if the registry could not create the policy.
983 OrphanablePtr<LoadBalancingPolicy>
984 XdsClusterResolverLb::CreateChildPolicyLocked(const grpc_channel_args* args) {
985  LoadBalancingPolicy::Args lb_policy_args;
986  lb_policy_args.work_serializer = work_serializer();
987  lb_policy_args.args = args;
// The Helper takes its own ref on this policy (tagged "Helper").
988  lb_policy_args.channel_control_helper =
989  absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
990  OrphanablePtr<LoadBalancingPolicy> lb_policy =
991  LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
992  "priority_experimental", std::move(lb_policy_args));
// NOTE(review): listing line 994 (presumably the start of the log call that
// owns the message below) is absent from this extract.
993  if (GPR_UNLIKELY(lb_policy == nullptr)) {
995  "[xds_cluster_resolver_lb %p] failure creating child policy", this);
996  return nullptr;
997  }
// NOTE(review): listing lines 998-999 are absent; the brace on line 1002
// presumably closes a trace-flag guard around this log call -- confirm.
1000  "[xds_cluster_resolver_lb %p]: Created new child policy %p", this,
1001  lb_policy.get());
1002  }
1003  // Add our interested_parties pollset_set to that of the newly created
1004  // child policy. This will make the child policy progress upon activity on
1005  // this policy, which in turn is tied to the application's call.
1006  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1007  interested_parties());
1008  return lb_policy;
1009 }
1010 
1011 //
1012 // factory
1013 //
1014 
1015 class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
1016  public:
// Instantiates the policy as an XdsClusterResolverChildHandler.
// The XdsClient is looked up in the channel args carried by `args`; if none
// is present the function returns nullptr instead of creating a policy.
1017  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1018  LoadBalancingPolicy::Args args) const override {
1019  RefCountedPtr<XdsClient> xds_client =
1020  XdsClient::GetFromChannelArgs(*args.args);
// NOTE(review): listing line 1022 (presumably the start of the log call that
// owns the message below) is absent from this extract.
1021  if (xds_client == nullptr) {
1023  "XdsClient not present in channel args -- cannot instantiate "
1024  "xds_cluster_resolver LB policy");
1025  return nullptr;
1026  }
// Both the client ref and the policy args are moved into the handler.
1027  return MakeOrphanable<XdsClusterResolverChildHandler>(std::move(xds_client),
1028  std::move(args));
1029  }
1030 
// Returns the policy name constant kXdsClusterResolver
// ("xds_cluster_resolver_experimental").
1031  const char* name() const override { return kXdsClusterResolver; }
1032 
// Parses the JSON config for this policy into an XdsClusterResolverLbConfig.
//
// Recognized keys:
//   "discoveryMechanisms" - required array; each element is parsed by
//                           ParseDiscoveryMechanism().
//   "xdsLbPolicy"         - optional array of objects keyed by "ROUND_ROBIN"
//                           or "RING_HASH"; defaults to {"ROUND_ROBIN": {}}.
//
// Returns the config when no errors were collected, otherwise nullptr.
// NOTE(review): listing lines 1035, 1039, 1062 and 1118 are absent from this
// extract; they presumably hold the statements that populate *error and
// declare the local `error` used in the element loop -- confirm against the
// upstream file.
1033  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1034  const Json& json, grpc_error_handle* error) const override {
1036  if (json.type() == Json::Type::JSON_NULL) {
1037  // xds_cluster_resolver was mentioned as a policy in the deprecated
1038  // loadBalancingPolicy field or in the client API.
1040  "field:loadBalancingPolicy error:xds_cluster_resolver policy "
1041  "requires configuration. "
1042  "Please use loadBalancingConfig field of service config instead.");
1043  return nullptr;
1044  }
// NOTE(review): only JSON_NULL is rejected above; object_value() is then
// called without checking that `json` is an object -- confirm what
// Json::object_value() does for other types.
1045  std::vector<grpc_error_handle> error_list;
1046  std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism>
1047  discovery_mechanisms;
1048  auto it = json.object_value().find("discoveryMechanisms");
1049  if (it == json.object_value().end()) {
1050  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1051  "field:discoveryMechanisms error:required field missing"));
1052  } else if (it->second.type() != Json::Type::ARRAY) {
1053  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1054  "field:discoveryMechanisms error:type should be array"));
1055  } else {
1056  const Json::Array& array = it->second.array_value();
1057  for (size_t i = 0; i < array.size(); ++i) {
1058  XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism;
1059  std::vector<grpc_error_handle> discovery_mechanism_errors =
1060  ParseDiscoveryMechanism(array[i], &discovery_mechanism);
// Errors for one element are attached as children of a single per-element
// error, which is then appended to error_list.
1061  if (!discovery_mechanism_errors.empty()) {
1063  absl::StrCat("field:discovery_mechanism element: ", i, " error"));
1064  for (const grpc_error_handle& discovery_mechanism_error :
1065  discovery_mechanism_errors) {
1066  error = grpc_error_add_child(error, discovery_mechanism_error);
1067  }
1068  error_list.push_back(error);
1069  }
// The element is appended even when it had errors; in that case error_list
// is non-empty, so the function ends up returning nullptr anyway.
1070  discovery_mechanisms.emplace_back(std::move(discovery_mechanism));
1071  }
1072  }
// Covers both a missing/ill-typed field and an empty array.
// NOTE(review): the label here ("discovery_mechanism") differs from the JSON
// key ("discoveryMechanisms") used in the messages above.
1073  if (discovery_mechanisms.empty()) {
1074  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1075  "field:discovery_mechanism error:list is missing or empty"));
1076  }
// Default LB policy, used when "xdsLbPolicy" is absent or names none of the
// recognized policies.
1077  Json xds_lb_policy = Json::Object{
1078  {"ROUND_ROBIN", Json::Object()},
1079  };
1080  it = json.object_value().find("xdsLbPolicy");
1081  if (it != json.object_value().end()) {
1082  if (it->second.type() != Json::Type::ARRAY) {
1083  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1084  "field:xdsLbPolicy error:type should be array"));
1085  } else {
1086  const Json::Array& array = it->second.array_value();
1087  for (size_t i = 0; i < array.size(); ++i) {
1088  if (array[i].type() != Json::Type::OBJECT) {
1089  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1090  "field:xdsLbPolicy error:element should be of type object"));
1091  continue;
1092  }
1093  const Json::Object& policy = array[i].object_value();
// A ROUND_ROBIN entry ends the scan; xds_lb_policy is not reassigned here,
// so it keeps whatever value it had before this element.
1094  auto policy_it = policy.find("ROUND_ROBIN");
1095  if (policy_it != policy.end()) {
1096  if (policy_it->second.type() != Json::Type::OBJECT) {
1097  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1098  "field:ROUND_ROBIN error:type should be object"));
1099  }
1100  break;
1101  }
// A RING_HASH entry replaces xds_lb_policy with the whole element. The ring
// sizes are parsed only so that ParseRingHashLbConfig() can report errors;
// the parsed values are not used afterwards in this function.
// NOTE(review): unlike the ROUND_ROBIN branch there is no break, so the scan
// continues and a later RING_HASH element would overwrite this one --
// confirm that this is intended.
1102  policy_it = policy.find("RING_HASH");
1103  if (policy_it != policy.end()) {
1104  xds_lb_policy = array[i];
1105  size_t min_ring_size;
1106  size_t max_ring_size;
1107  ParseRingHashLbConfig(policy_it->second, &min_ring_size,
1108  &max_ring_size, &error_list);
1109  }
1110  }
1111  }
1112  }
1113  // Construct config.
1114  if (error_list.empty()) {
1115  return MakeRefCounted<XdsClusterResolverLbConfig>(
1116  std::move(discovery_mechanisms), std::move(xds_lb_policy));
1117  } else {
1119  "xds_cluster_resolver_experimental LB policy config", &error_list);
1120  return nullptr;
1121  }
1122  }
1123 
1124  private:
// Parses one element of the "discoveryMechanisms" array into
// *discovery_mechanism and returns the list of errors found (empty on
// success).
//
// Keys read from the element:
//   "clusterName"             - required string.
//   "lrsLoadReportingServer"  - optional object.
//   "max_concurrent_requests" - optional number; defaults to 1024.
//   "outlierDetection"        - optional object, stored without validation.
//   "type"                    - required string, "EDS" or "LOGICAL_DNS".
//   "edsServiceName"          - optional string, read only for type "EDS".
//   "dnsHostname"             - required string for type "LOGICAL_DNS".
//
// If `json` is not an object, a single error is returned and
// *discovery_mechanism is left untouched.
1125  static std::vector<grpc_error_handle> ParseDiscoveryMechanism(
1126  const Json& json,
1127  XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) {
1128  std::vector<grpc_error_handle> error_list;
1129  if (json.type() != Json::Type::OBJECT) {
1130  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1131  "value should be of type object"));
1132  return error_list;
1133  }
1134  // Cluster name.
1135  auto it = json.object_value().find("clusterName");
1136  if (it == json.object_value().end()) {
1137  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1138  "field:clusterName error:required field missing"));
1139  } else if (it->second.type() != Json::Type::STRING) {
1140  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1141  "field:clusterName error:type should be string"));
1142  } else {
1143  discovery_mechanism->cluster_name = it->second.string_value();
1144  }
1145  // LRS load reporting server name.
1146  it = json.object_value().find("lrsLoadReportingServer");
1147  if (it != json.object_value().end()) {
1148  if (it->second.type() != Json::Type::OBJECT) {
1149  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1150  "field:lrsLoadReportingServer error:type should be object"));
1151  } else {
// NOTE(review): listing lines 1152 and 1154-1155 are absent from this
// extract; they presumably declare parse_error, supply the argument to
// emplace() and test the parse result -- confirm against the upstream file.
1153  discovery_mechanism->lrs_load_reporting_server.emplace(
1156  error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
1157  absl::StrCat("errors parsing lrs_load_reporting_server")));
1158  error_list.push_back(parse_error);
1159  }
1160  }
1161  }
1162  // Max concurrent requests.
// The default of 1024 applies when the key is absent or not a number.
// Note that this key is snake_case, unlike the camelCase keys around it.
1163  discovery_mechanism->max_concurrent_requests = 1024;
1164  it = json.object_value().find("max_concurrent_requests");
1165  if (it != json.object_value().end()) {
1166  if (it->second.type() != Json::Type::NUMBER) {
1167  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1168  "field:max_concurrent_requests error:must be of type number"));
1169  } else {
// NOTE(review): gpr_parse_nonnegative_int() returns int and the result is
// stored unchecked. If it signals failure with a negative value -- TODO
// confirm -- the assignment would wrap, assuming max_concurrent_requests is
// a uint32_t as the surrounding declarations suggest.
1170  discovery_mechanism->max_concurrent_requests =
1171  gpr_parse_nonnegative_int(it->second.string_value().c_str());
1172  }
1173  }
// NOTE(review): listing line 1174 is absent; the extra closing brace on line
// 1191 suggests this section sits inside a conditional guard -- confirm.
1175  it = json.object_value().find("outlierDetection");
1176  if (it != json.object_value().end()) {
1177  if (it->second.type() != Json::Type::OBJECT) {
1178  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1179  "field:outlierDetection error:type should be object"));
1180  } else {
1181  // No need to validate the contents of the outlier detection config,
1182  // because in this particular case, the JSON is generated by the CDS
1183  // policy instead of coming from service config, so it's not actually
1184  // any better to catch the problem here than it is to catch it in the
1185  // outlier_detection policy itself, so here we just act as a
1186  // pass-through.
1187  discovery_mechanism->outlier_detection_lb_config =
1188  it->second.object_value();
1189  }
1190  }
1191  }
1192  // Discovery Mechanism type
1193  it = json.object_value().find("type");
1194  if (it == json.object_value().end()) {
1195  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1196  "field:type error:required field missing"));
1197  } else if (it->second.type() != Json::Type::STRING) {
1198  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1199  "field:type error:type should be string"));
1200  } else {
// `it` is reused below for the type-specific lookups, so the value of
// "type" is no longer reachable through it once a branch has been entered.
1201  if (it->second.string_value() == "EDS") {
1202  discovery_mechanism->type = XdsClusterResolverLbConfig::
1203  DiscoveryMechanism::DiscoveryMechanismType::EDS;
// edsServiceName is optional: its absence is not an error.
1204  it = json.object_value().find("edsServiceName");
1205  if (it != json.object_value().end()) {
1206  if (it->second.type() != Json::Type::STRING) {
1207  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1208  "field:edsServiceName error:type should be string"));
1209  } else {
1210  discovery_mechanism->eds_service_name = it->second.string_value();
1211  }
1212  }
1213  } else if (it->second.string_value() == "LOGICAL_DNS") {
1214  discovery_mechanism->type = XdsClusterResolverLbConfig::
1215  DiscoveryMechanism::DiscoveryMechanismType::LOGICAL_DNS;
// dnsHostname is mandatory for LOGICAL_DNS mechanisms.
1216  it = json.object_value().find("dnsHostname");
1217  if (it == json.object_value().end()) {
1218  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1219  "field:dnsHostname error:required field missing"));
1220  } else if (it->second.type() != Json::Type::STRING) {
1221  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1222  "field:dnsHostname error:type should be string"));
1223  } else {
1224  discovery_mechanism->dns_hostname = it->second.string_value();
1225  }
1226  } else {
1227  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1228  "field:type error:invalid type"));
1229  }
1230  }
1231  return error_list;
1232  }
1233 
// ChildPolicyHandler specialization returned by the factory. It keeps the
// XdsClient ref and passes it to every XdsClusterResolverLb it creates.
1234  class XdsClusterResolverChildHandler : public ChildPolicyHandler {
1235  public:
// NOTE(review): listing line 1239 (presumably the second argument to the
// ChildPolicyHandler constructor) is absent from this extract.
1236  XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,
1237  Args args)
1238  : ChildPolicyHandler(std::move(args),
1240  xds_client_(std::move(xds_client)) {}
1241 
// Reports whether the config change needs a new policy instance: true
// exactly when the discovery mechanism lists of the two configs differ.
// Both configs are asserted to carry this policy's name before being
// downcast.
// NOTE(review): if Config::name() returns const char*, the == in the
// assertions compares pointers rather than string contents -- confirm.
1242  bool ConfigChangeRequiresNewPolicyInstance(
1243  LoadBalancingPolicy::Config* old_config,
1244  LoadBalancingPolicy::Config* new_config) const override {
1245  GPR_ASSERT(old_config->name() == kXdsClusterResolver);
1246  GPR_ASSERT(new_config->name() == kXdsClusterResolver);
1247  XdsClusterResolverLbConfig* old_xds_cluster_resolver_config =
1248  static_cast<XdsClusterResolverLbConfig*>(old_config);
1249  XdsClusterResolverLbConfig* new_xds_cluster_resolver_config =
1250  static_cast<XdsClusterResolverLbConfig*>(new_config);
1251  return old_xds_cluster_resolver_config->discovery_mechanisms() !=
1252  new_xds_cluster_resolver_config->discovery_mechanisms();
1253  }
1254 
// Creates the XdsClusterResolverLb policy. The requested name is ignored;
// xds_client_ is passed by copy, so the handler keeps its own ref.
1255  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1256  const char* /*name*/, LoadBalancingPolicy::Args args) const override {
1257  return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args));
1258  }
1259 
1260  private:
// XdsClient ref handed to each policy instance this handler creates.
1261  RefCountedPtr<XdsClient> xds_client_;
1262  };
1263 };
1264 
1265 } // namespace
1266 
1267 } // namespace grpc_core
1268 
1269 //
1270 // Plugin registration
1271 //
1272 
1276  absl::make_unique<grpc_core::XdsClusterResolverLbFactory>());
1277 }
1278 
grpc_core::Json::Array
std::vector< Json > Array
Definition: src/core/lib/json/json.h:55
grpc_arg
Definition: grpc_types.h:103
trace.h
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::XdsResourceTypeImpl< XdsEndpointResourceType, XdsEndpointResource >::CancelWatch
static void CancelWatch(XdsClient *xds_client, absl::string_view resource_name, WatcherInterface *watcher, bool delay_unsubscription=false)
Definition: xds_resource_type_impl.h:66
address_filtering.h
regen-readme.it
it
Definition: regen-readme.py:15
grpc_core::LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory
static void RegisterLoadBalancingPolicyFactory(std::unique_ptr< LoadBalancingPolicyFactory > factory)
Definition: lb_policy_registry.cc:87
orphanable.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
core_configuration.h
bloat_diff.severity
def severity
Definition: bloat_diff.py:143
grpc_event_engine::experimental::slice_detail::operator==
bool operator==(const BaseSlice &a, const BaseSlice &b)
Definition: include/grpc/event_engine/slice.h:117
priority
int priority
Definition: abseil-cpp/absl/synchronization/internal/graphcycles.cc:286
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
connectivity_state.h
xds_client_stats.h
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
grpc_lb_policy_xds_cluster_resolver_init
void grpc_lb_policy_xds_cluster_resolver_init()
Definition: xds_cluster_resolver.cc:1273
xds_client_
RefCountedPtr< XdsClient > xds_client_
Definition: xds_cluster_resolver.cc:353
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR
#define GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR
Definition: filters/client_channel/lb_policy/xds/xds_channel_args.h:26
grpc_core
Definition: call_metric_recorder.h:31
cluster_name
std::string cluster_name
Definition: xds_cluster_resolver.cc:91
string.h
xds_endpoint.h
ExitIdleLocked
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb void ExitIdleLocked()
Definition: rls.cc:299
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_core::kHierarchicalPathAttributeKey
const char * kHierarchicalPathAttributeKey
Definition: address_filtering.cc:34
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
lb_policy.h
fake_resolver.h
lb_policy_factory.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc_core::LoadBalancingPolicy::Config
Definition: lb_policy.h:305
status
absl::Status status
Definition: rls.cc:251
grpc_core::XdsOutlierDetectionEnabled
bool XdsOutlierDetectionEnabled()
Definition: outlier_detection.cc:77
xds_resource_type_impl.h
setup.name
name
Definition: setup.py:542
absl::InternalError
Status InternalError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:347
args_
const grpc_channel_args * args_
Definition: xds_cluster_resolver.cc:356
xds_manager.p
p
Definition: xds_manager.py:60
grpc_core::XdsResourceTypeImpl< XdsEndpointResourceType, XdsEndpointResource >::StartWatch
static void StartWatch(XdsClient *xds_client, absl::string_view resource_name, RefCountedPtr< WatcherInterface > watcher)
Definition: xds_resource_type_impl.h:62
latest_update
absl::optional< XdsEndpointResource > latest_update
Definition: xds_cluster_resolver.cc:297
watcher_
EndpointWatcher * watcher_
Definition: xds_cluster_resolver.cc:252
grpc_pollset_set_del_pollset_set
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:52
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
subchannel_interface.h
grpc_core::grpc_lb_xds_cluster_resolver_trace
TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb")
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
GRPC_ERROR_CREATE_FROM_VECTOR
#define GRPC_ERROR_CREATE_FROM_VECTOR(desc, error_list)
Definition: error.h:314
grpc_core::MakeHierarchicalPathAttribute
std::unique_ptr< ServerAddress::AttributeInterface > MakeHierarchicalPathAttribute(std::vector< std::string > path)
Definition: address_filtering.cc:72
grpc_types.h
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
discovery_mechanism_
RefCountedPtr< EdsDiscoveryMechanism > discovery_mechanism_
Definition: xds_cluster_resolver.cc:238
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_error_add_child
grpc_error_handle grpc_error_add_child(grpc_error_handle src, grpc_error_handle child)
Definition: error.cc:678
discovery_mechanisms_
std::vector< DiscoveryMechanism > discovery_mechanisms_
Definition: xds_cluster_resolver.cc:127
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
outlier_detection_lb_config
absl::optional< Json::Object > outlier_detection_lb_config
Definition: xds_cluster_resolver.cc:101
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
hpack_encoder_fixtures::Args
Args({0, 16384})
array
Definition: undname.c:101
discovery_mechanism
OrphanablePtr< DiscoveryMechanism > discovery_mechanism
Definition: xds_cluster_resolver.cc:295
grpc_core::CoreConfiguration::Get
static const CoreConfiguration & Get()
Definition: core_configuration.h:82
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
re2::Result
TestInstance::Result Result
Definition: bloaty/third_party/re2/re2/testing/tester.cc:96
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
config
struct config_s config
Json
JSON (JavaScript Object Notation).
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:227
xds_client.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
work_serializer.h
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
xds_bootstrap.h
connectivity_state.h
lrs_load_reporting_server
absl::optional< XdsBootstrap::XdsServer > lrs_load_reporting_server
Definition: xds_cluster_resolver.cc:92
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
pollset_set.h
xds_cluster_resolver_policy_
RefCountedPtr< XdsClusterResolverLb > xds_cluster_resolver_policy_
Definition: xds_cluster_resolver.cc:331
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
gen_settings_ids.OnError
OnError
Definition: gen_settings_ids.py:27
server_address.h
time.h
xds_channel_args.h
grpc_channel_args_copy
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
Definition: channel_args.cc:285
absl::InlinedVector::data
pointer data() noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:302
index_
size_t index_
Definition: xds_cluster_resolver.cc:169
absl::flags_internal::Parse
bool Parse(FlagOpFn op, absl::string_view text, void *dst, std::string *error)
Definition: abseil-cpp/absl/flags/internal/flag.h:125
error.h
type
DiscoveryMechanismType type
Definition: xds_cluster_resolver.cc:98
ring_hash.h
absl::InlinedVector::size
size_type size() const noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:270
json.h
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
child_policy_
OrphanablePtr< LoadBalancingPolicy > child_policy_
Definition: xds_cluster_resolver.cc:365
weight
uint32_t weight
Definition: weighted_target.cc:84
grpc_core::ServerAddressList
std::vector< ServerAddress > ServerAddressList
Definition: server_address.h:120
resolver_registry.h
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
dns_hostname
std::string dns_hostname
Definition: xds_cluster_resolver.cc:100
resolver.h
child_policy_handler.h
grpc_core::ResolverRegistry::CreateResolver
OrphanablePtr< Resolver > CreateResolver(absl::string_view target, const grpc_channel_args *args, grpc_pollset_set *pollset_set, std::shared_ptr< WorkSerializer > work_serializer, std::unique_ptr< Resolver::ResultHandler > result_handler) const
Definition: resolver_registry.cc:73
grpc_core::Json::Object
std::map< std::string, Json > Object
Definition: src/core/lib/json/json.h:54
shutting_down_
bool shutting_down_
Definition: xds_cluster_resolver.cc:360
grpc_core::kXdsLocalityNameAttributeKey
const char * kXdsLocalityNameAttributeKey
Definition: xds_cluster_resolver.cc:81
GRPC_ARG_INHIBIT_HEALTH_CHECKING
#define GRPC_ARG_INHIBIT_HEALTH_CHECKING
Definition: grpc_types.h:424
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
parent_
RefCountedPtr< XdsClusterResolverLb > parent_
Definition: xds_cluster_resolver.cc:167
debug_location.h
grpc_channel_arg_integer_create
grpc_arg grpc_channel_arg_integer_create(char *name, int value)
Definition: channel_args.cc:484
eds_service_name
std::string eds_service_name
Definition: xds_cluster_resolver.cc:99
lb_policy_registry.h
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
index
int index
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:1184
GRPC_ERROR_CREATE_FROM_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
Definition: error.h:297
config_
RefCountedPtr< XdsClusterResolverLbConfig > config_
Definition: xds_cluster_resolver.cc:357
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
xds_lb_policy_
Json xds_lb_policy_
Definition: xds_cluster_resolver.cc:128
grpc_pollset_set_add_pollset_set
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:47
priority_child_numbers
std::vector< size_t > priority_child_numbers
Definition: xds_cluster_resolver.cc:299
grpc_core::ParseRingHashLbConfig
void ParseRingHashLbConfig(const Json &json, size_t *min_ring_size, size_t *max_ring_size, std::vector< grpc_error_handle > *error_list)
Definition: ring_hash.cc:83
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_core::CoreConfiguration::resolver_registry
const ResolverRegistry & resolver_registry() const
Definition: core_configuration.h:157
resolver_
OrphanablePtr< Resolver > resolver_
Definition: xds_cluster_resolver.cc:291
ref_counted_ptr.h
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
watcher
ClusterWatcher * watcher
Definition: cds.cc:148
channel_args.h
check_redundant_namespace_qualifiers.Config
Config
Definition: check_redundant_namespace_qualifiers.py:142
gpr_parse_nonnegative_int
int gpr_parse_nonnegative_int(const char *value)
Definition: string.cc:218
grpc_lb_policy_xds_cluster_resolver_shutdown
void grpc_lb_policy_xds_cluster_resolver_shutdown()
Definition: xds_cluster_resolver.cc:1279
xds.h
grpc_core::FakeResolverResponseGenerator::MakeChannelArg
static grpc_arg MakeChannelArg(FakeResolverResponseGenerator *generator)
Definition: fake_resolver.cc:338
absl::InlinedVector
Definition: abseil-cpp/absl/container/inlined_vector.h:69
grpc_error
Definition: error_internal.h:42
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
update
absl::optional< XdsClusterResource > update
Definition: cds.cc:150
setup.target
target
Definition: third_party/bloaty/third_party/protobuf/python/setup.py:179
grpc_channel_args_copy_and_add
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:224
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
outlier_detection.h
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
parse_error
@ parse_error
Definition: pem_info.c:88
max_concurrent_requests
uint32_t max_concurrent_requests
Definition: xds_cluster_resolver.cc:93
next_available_child_number
size_t next_available_child_number
Definition: xds_cluster_resolver.cc:300
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:57