cds.cc
Go to the documentation of this file.
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
19 #include <algorithm>
20 #include <map>
21 #include <memory>
22 #include <set>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/memory/memory.h"
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
33 
34 #include <grpc/grpc.h>
35 #include <grpc/grpc_security.h>
38 #include <grpc/support/log.h>
39 
62 #include "src/core/lib/json/json.h"
70 
71 namespace grpc_core {
72 
// Trace flag registered under the name "cds_lb" and constructed with `false`.
// NOTE(review): presumably it gates the GPR_INFO logging in this file; the
// guarding `if` lines are not present in this listing -- confirm.
73 TraceFlag grpc_cds_lb_trace(false, "cds_lb");
74 
75 namespace {
76 
// Policy name returned by name() on the config, the policy and the factory.
77 constexpr char kCds[] = "cds_experimental";
78 
// Recursion depth at which GenerateDiscoveryMechanismForCluster() gives up
// expanding aggregate clusters.
79 constexpr int kMaxAggregateClusterRecursionDepth = 16;
80 
81 // Config for this LB policy.
// Parsed configuration of the CDS policy. Its only setting is the name of the
// root cluster, which CdsLb::UpdateLocked() starts a watch for.
82 class CdsLbConfig : public LoadBalancingPolicy::Config {
83  public:
 // Stores the cluster name by moving it into cluster_.
84  explicit CdsLbConfig(std::string cluster) : cluster_(std::move(cluster)) {}
 // Returns the configured cluster name.
85  const std::string& cluster() const { return cluster_; }
 // Returns the policy name, kCds.
86  const char* name() const override { return kCds; }
87 
88  private:
 // NOTE(review): the declaration of cluster_ (source line 89) is absent from
 // this listing.
90 };
91 
92 // CDS LB policy.
// LB policy that watches cluster resources through an XdsClient and, once the
// data for the configured cluster (and any clusters reached by expanding
// aggregate clusters) is available, generates a config for and drives a single
// child policy.
93 class CdsLb : public LoadBalancingPolicy {
94  public:
 // Takes the XdsClient used to start and cancel cluster watches.
95  CdsLb(RefCountedPtr<XdsClient> xds_client, Args args);
96 
97  const char* name() const override { return kCds; }
98 
99  void UpdateLocked(UpdateArgs args) override;
100  void ResetBackoffLocked() override;
101  void ExitIdleLocked() override;
102 
103  private:
104  // Watcher for getting cluster data from XdsClient.
 // Each callback takes an extra ref on the watcher (released into the
 // lambda), schedules the real work on the parent's work serializer, and
 // drops that ref at the end of the lambda.
 // NOTE(review): the closing argument lines of each Run(...) call (source
 // lines 119, 128, 137) are absent from this listing.
105  class ClusterWatcher : public XdsClusterResourceType::WatcherInterface {
106  public:
 // `parent` is the owning policy; `name` is the cluster this watcher is for.
107  ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
108  : parent_(std::move(parent)), name_(std::move(name)) {}
109 
 // Forwards a new cluster resource to CdsLb::OnClusterChanged().
110  void OnResourceChanged(XdsClusterResource cluster_data) override {
111  Ref().release(); // Ref held by lambda
112  parent_->work_serializer()->Run(
113  // TODO(roth): When we move to C++14, capture cluster_data with
114  // std::move().
115  [this, cluster_data]() mutable {
116  parent_->OnClusterChanged(name_, std::move(cluster_data));
117  Unref();
118  },
120  }
 // Forwards a watch error to CdsLb::OnError().
121  void OnError(absl::Status status) override {
122  Ref().release(); // Ref held by lambda
123  parent_->work_serializer()->Run(
124  [this, status]() {
125  parent_->OnError(name_, status);
126  Unref();
127  },
129  }
 // Forwards a "resource does not exist" notification to
 // CdsLb::OnResourceDoesNotExist().
130  void OnResourceDoesNotExist() override {
131  Ref().release(); // Ref held by lambda
132  parent_->work_serializer()->Run(
133  [this]() {
134  parent_->OnResourceDoesNotExist(name_);
135  Unref();
136  },
138  }
139 
140  private:
141  RefCountedPtr<CdsLb> parent_;
 // NOTE(review): the declaration of name_ (source line 142) is absent from
 // this listing.
143  };
144 
 // Per-cluster bookkeeping stored in watchers_.
145  struct WatcherState {
146  // Pointer to watcher, to be used when cancelling.
147  // Not owned, so do not dereference.
148  ClusterWatcher* watcher = nullptr;
149  // Most recent update obtained from this watcher.
 // NOTE(review): the `update` member (source line 150) is absent from this
 // listing; the rest of the file uses it via has_value()/value()/operator->.
151  };
152 
153  // Delegating helper to be passed to child policy.
 // Every method forwards to the parent policy's own channel_control_helper().
154  class Helper : public ChannelControlHelper {
155  public:
156  explicit Helper(RefCountedPtr<CdsLb> parent) : parent_(std::move(parent)) {}
157  RefCountedPtr<SubchannelInterface> CreateSubchannel(
158  ServerAddress address, const grpc_channel_args& args) override;
159  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
160  std::unique_ptr<SubchannelPicker> picker) override;
161  void RequestReresolution() override;
162  absl::string_view GetAuthority() override;
163  void AddTraceEvent(TraceSeverity severity,
164  absl::string_view message) override;
165 
166  private:
167  RefCountedPtr<CdsLb> parent_;
168  };
169 
170  ~CdsLb() override;
171 
172  void ShutdownLocked() override;
173 
 // See the comment on the definition for the meaning of the return value.
174  absl::StatusOr<bool> GenerateDiscoveryMechanismForCluster(
175  const std::string& name, int depth, Json::Array* discovery_mechanisms,
176  std::set<std::string>* clusters_added);
 // Handlers invoked (via the work serializer) by ClusterWatcher.
 // NOTE(review): the OnError declaration (source line 179) is absent from this
 // listing.
177  void OnClusterChanged(const std::string& name,
178  XdsClusterResource cluster_data);
180  void OnResourceDoesNotExist(const std::string& name);
181 
 // Applies the TLS-related fields of cluster_data to xds_certificate_provider_.
182  absl::Status UpdateXdsCertificateProvider(
183  const std::string& cluster_name, const XdsClusterResource& cluster_data);
184 
 // Clears the certificate-provider entries for the cluster (when a provider
 // exists) and cancels the XdsClient watch.
185  void CancelClusterDataWatch(absl::string_view cluster_name,
186  ClusterWatcher* watcher,
187  bool delay_unsubscription = false);
188 
 // Destroys child_policy_ if one exists; no-op otherwise.
189  void MaybeDestroyChildPolicyLocked();
190 
 // Most recent config passed to UpdateLocked(); null until the first update.
191  RefCountedPtr<CdsLbConfig> config_;
192 
193  // Current channel args from the resolver.
194  const grpc_channel_args* args_ = nullptr;
195 
196  // The xds client.
197  RefCountedPtr<XdsClient> xds_client_;
198 
199  // Maps from cluster name to the state for that cluster.
200  // The root of the tree is config_->cluster().
201  std::map<std::string, WatcherState> watchers_;
202 
 // Providers currently registered with interested_parties(); tracked so that
 // UpdateXdsCertificateProvider() can detect when they change.
203  RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
204  RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
 // Null unless the channel uses xds credentials (see
 // UpdateXdsCertificateProvider()).
205  RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
206 
207  // Child LB policy.
208  OrphanablePtr<LoadBalancingPolicy> child_policy_;
209 
210  // Internal state.
 // Set once by ShutdownLocked(); the Helper checks it before forwarding calls.
211  bool shutting_down_ = false;
212 };
213 
214 //
215 // CdsLb::Helper
216 //
217 
// Forwards subchannel creation to the parent policy's helper.
// Returns nullptr without forwarding once the parent is shutting down.
218 RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
219  ServerAddress address, const grpc_channel_args& args) {
220  if (parent_->shutting_down_) return nullptr;
221  return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
222  args);
223 }
224 
// Passes the child policy's connectivity state, status and picker up to the
// parent policy's helper. The update is dropped when the parent is shutting
// down or no longer has a child policy.
// NOTE(review): source lines 229 and 231 (the log guard and the remaining
// gpr_log arguments) are absent from this listing.
225 void CdsLb::Helper::UpdateState(grpc_connectivity_state state,
226  const absl::Status& status,
227  std::unique_ptr<SubchannelPicker> picker) {
228  if (parent_->shutting_down_ || parent_->child_policy_ == nullptr) return;
 // NOTE(review): `this` here is the Helper, whereas RequestReresolution()
 // logs parent_.get() under the same "[cdslb %p]" prefix -- confirm which
 // pointer is intended.
230  gpr_log(GPR_INFO, "[cdslb %p] state updated by child: %s (%s)", this,
232  }
233  parent_->channel_control_helper()->UpdateState(state, status,
234  std::move(picker));
235 }
236 
// Forwards a re-resolution request from the child policy to the parent
// policy's helper; ignored once the parent is shutting down.
// NOTE(review): source line 239 (the opening of the block around the log
// call) is absent from this listing.
237 void CdsLb::Helper::RequestReresolution() {
238  if (parent_->shutting_down_) return;
240  gpr_log(GPR_INFO, "[cdslb %p] Re-resolution requested from child policy.",
241  parent_.get());
242  }
243  parent_->channel_control_helper()->RequestReresolution();
244 }
245 
// Returns the authority reported by the parent policy's helper. Unlike the
// other Helper methods, this one does not check shutting_down_.
246 absl::string_view CdsLb::Helper::GetAuthority() {
247  return parent_->channel_control_helper()->GetAuthority();
248 }
249 
// Forwards a trace event to the parent policy's helper; ignored once the
// parent is shutting down.
// NOTE(review): source line 251 (the rest of the parameter list, which per the
// class declaration is `absl::string_view message`) is absent from this
// listing.
250 void CdsLb::Helper::AddTraceEvent(TraceSeverity severity,
252  if (parent_->shutting_down_) return;
253  parent_->channel_control_helper()->AddTraceEvent(severity, message);
254 }
255 
256 //
257 // CdsLb
258 //
259 
// Constructs the policy: passes `args` to the base class and stores the
// XdsClient. No watch is started here; that happens in UpdateLocked().
// NOTE(review): source line 262 (the opening of the block around the log
// call) is absent from this listing.
260 CdsLb::CdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
261  : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
263  gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this,
264  xds_client_.get());
265  }
266 }
267 
// Destructor; the only visible action is a log message.
// NOTE(review): source line 269 (the opening of the block around the log
// call) is absent from this listing.
268 CdsLb::~CdsLb() {
270  gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
271  }
272 }
273 
// Shuts the policy down: marks it as shutting down, destroys the child
// policy, cancels every cluster watch (without delayed unsubscription),
// drops the XdsClient ref and clears args_.
// NOTE(review): source lines 275, 282 and 292 are absent from this listing.
274 void CdsLb::ShutdownLocked() {
276  gpr_log(GPR_INFO, "[cdslb %p] shutting down", this);
277  }
278  shutting_down_ = true;
279  MaybeDestroyChildPolicyLocked();
280  if (xds_client_ != nullptr) {
281  for (auto& watcher : watchers_) {
283  gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
284  watcher.first.c_str());
285  }
286  CancelClusterDataWatch(watcher.first, watcher.second.watcher,
287  /*delay_unsubscription=*/false);
288  }
289  watchers_.clear();
290  xds_client_.reset(DEBUG_LOCATION, "CdsLb");
291  }
 // NOTE(review): presumably the absent line 292 releases the channel args that
 // args_ points to before the pointer is cleared -- confirm against the
 // original file.
293  args_ = nullptr;
294 }
295 
// Destroys the child policy if one exists: first removes this policy's
// interested_parties() from the child's pollset set (undoing the add done when
// the child was created in OnClusterChanged()), then resets the pointer.
296 void CdsLb::MaybeDestroyChildPolicyLocked() {
297  if (child_policy_ != nullptr) {
298  grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
299  interested_parties());
300  child_policy_.reset();
301  }
302 }
303 
// Delegates to the child policy, if there is one.
304 void CdsLb::ResetBackoffLocked() {
305  if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
306 }
307 
// Delegates to the child policy, if there is one.
308 void CdsLb::ExitIdleLocked() {
309  if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
310 }
311 
// Handles a new config and channel args from the parent.
// If this is the first update, or the configured cluster name changed, all
// existing watches are cancelled (with delayed unsubscription) and a watch is
// started for the new root cluster. If the cluster name is unchanged, only
// config_ and args_ are replaced.
// NOTE(review): source lines 316, 321 and 328 are absent from this listing.
312 void CdsLb::UpdateLocked(UpdateArgs args) {
313  // Update config.
314  auto old_config = std::move(config_);
315  config_ = std::move(args.config);
317  gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s", this,
318  config_->cluster().c_str());
319  }
320  // Update args.
 // args_ takes over the pointer held by `args`; args.args is then nulled.
 // NOTE(review): presumably the absent line 321 releases the previous args_ and
 // the nulling keeps UpdateArgs from releasing the new one -- confirm.
322  args_ = args.args;
323  args.args = nullptr;
324  // If cluster name changed, cancel watcher and restart.
325  if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
326  if (old_config != nullptr) {
327  for (auto& watcher : watchers_) {
329  gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
330  watcher.first.c_str());
331  }
332  CancelClusterDataWatch(watcher.first, watcher.second.watcher,
333  /*delay_unsubscription=*/true);
334  }
335  watchers_.clear();
336  }
 // Record the raw watcher pointer before ownership moves into StartWatch();
 // the raw pointer is what CancelClusterDataWatch() is later given.
337  auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), config_->cluster());
338  watchers_[config_->cluster()].watcher = watcher.get();
339  XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(),
340  std::move(watcher));
341  }
342 }
343 
344 // Generates the discovery mechanism config for the specified cluster name.
345 //
346 // If no CdsUpdate has been received for the cluster, starts the watcher
347 // if needed, and returns false. Otherwise, generates the discovery
348 // mechanism config, adds it to *discovery_mechanisms, and returns true.
349 //
350 // For aggregate clusters, may call itself recursively. Returns an
351 // error if depth exceeds kMaxAggregateClusterRecursionDepth.
// NOTE(review): source lines 356 and 366 are absent from this listing.
352 absl::StatusOr<bool> CdsLb::GenerateDiscoveryMechanismForCluster(
353  const std::string& name, int depth, Json::Array* discovery_mechanisms,
354  std::set<std::string>* clusters_added) {
355  if (depth == kMaxAggregateClusterRecursionDepth) {
 // (absent line 356 begins the error return that this message belongs to)
357  "aggregate cluster graph exceeds max depth");
358  }
 // The name is recorded before checking whether data is available, so a
 // cluster that is still waiting for its first update also ends up in
 // *clusters_added (OnClusterChanged() keeps watches for every name in it).
359  if (!clusters_added->insert(name).second) {
360  return true; // Discovery mechanism already added from some other branch.
361  }
 // operator[] creates an empty WatcherState if this cluster is new.
362  auto& state = watchers_[name];
363  // Create a new watcher if needed.
364  if (state.watcher == nullptr) {
365  auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
367  gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
368  name.c_str());
369  }
370  state.watcher = watcher.get();
371  XdsClusterResourceType::StartWatch(xds_client_.get(), name,
372  std::move(watcher));
373  return false;
374  }
375  // Don't have the update we need yet.
376  if (!state.update.has_value()) return false;
377  // For AGGREGATE clusters, recursively expand to child clusters.
378  if (state.update->cluster_type ==
379  XdsClusterResource::ClusterType::AGGREGATE) {
 // Keep iterating after a child reports missing data so that every child is
 // visited (and gets a watch started); only errors abort the loop.
380  bool missing_cluster = false;
381  for (const std::string& child_name :
382  state.update->prioritized_cluster_names) {
383  auto result = GenerateDiscoveryMechanismForCluster(
384  child_name, depth + 1, discovery_mechanisms, clusters_added);
385  if (!result.ok()) return result;
386  if (!*result) missing_cluster = true;
387  }
388  return !missing_cluster;
389  }
 // Non-aggregate cluster: build one discovery mechanism entry for it.
390  Json::Object mechanism = {
391  {"clusterName", name},
392  {"max_concurrent_requests", state.update->max_concurrent_requests},
393  };
 // Optional outlier detection settings, copied field by field.
394  if (state.update->outlier_detection.has_value()) {
395  auto& outlier_detection_update = state.update->outlier_detection.value();
396  Json::Object outlier_detection;
397  outlier_detection["interval"] =
398  outlier_detection_update.interval.ToJsonString();
399  outlier_detection["baseEjectionTime"] =
400  outlier_detection_update.base_ejection_time.ToJsonString();
401  outlier_detection["maxEjectionTime"] =
402  outlier_detection_update.max_ejection_time.ToJsonString();
403  outlier_detection["maxEjectionPercent"] =
404  outlier_detection_update.max_ejection_percent;
405  if (outlier_detection_update.success_rate_ejection.has_value()) {
406  outlier_detection["successRateEjection"] = Json::Object{
407  {"stdevFactor",
408  outlier_detection_update.success_rate_ejection->stdev_factor},
409  {"enforcementPercentage",
410  outlier_detection_update.success_rate_ejection
411  ->enforcement_percentage},
412  {"minimumHosts",
413  outlier_detection_update.success_rate_ejection->minimum_hosts},
414  {"requestVolume",
415  outlier_detection_update.success_rate_ejection->request_volume},
416  };
417  }
418  if (outlier_detection_update.failure_percentage_ejection.has_value()) {
419  outlier_detection["failurePercentageEjection"] = Json::Object{
420  {"threshold",
421  outlier_detection_update.failure_percentage_ejection->threshold},
422  {"enforcementPercentage",
423  outlier_detection_update.failure_percentage_ejection
424  ->enforcement_percentage},
425  {"minimumHosts",
426  outlier_detection_update.failure_percentage_ejection->minimum_hosts},
427  {"requestVolume", outlier_detection_update
428  .failure_percentage_ejection->request_volume},
429  };
430  }
431  mechanism["outlierDetection"] = std::move(outlier_detection);
432  }
 // Type-specific fields. AGGREGATE was handled above, so any other value
 // reaching the default branch trips the assertion.
433  switch (state.update->cluster_type) {
434  case XdsClusterResource::ClusterType::EDS:
435  mechanism["type"] = "EDS";
436  if (!state.update->eds_service_name.empty()) {
437  mechanism["edsServiceName"] = state.update->eds_service_name;
438  }
439  break;
440  case XdsClusterResource::ClusterType::LOGICAL_DNS:
441  mechanism["type"] = "LOGICAL_DNS";
442  mechanism["dnsHostname"] = state.update->dns_hostname;
443  break;
444  default:
445  GPR_ASSERT(0);
446  break;
447  }
448  if (state.update->lrs_load_reporting_server.has_value()) {
449  mechanism["lrsLoadReportingServer"] =
450  state.update->lrs_load_reporting_server->ToJson();
451  }
452  discovery_mechanisms->emplace_back(std::move(mechanism));
453  return true;
454 }
455 
// Handles a new resource for cluster `name`:
//  1. stores it in watchers_ (ignored if the watch was already removed),
//  2. updates the certificate provider from it,
//  3. regenerates the discovery mechanism list starting at the root cluster,
//  4. if data for every cluster is present, builds the child policy config,
//     creates the child policy if needed, and pushes the update to it,
//  5. cancels watches for clusters no longer reachable from the root.
// Failures in steps 2-4 are reported through OnError().
// NOTE(review): source lines 458, 472, 516, 521, 525-526, 531, 543, 555 and
// 566 are absent from this listing.
456 void CdsLb::OnClusterChanged(const std::string& name,
457  XdsClusterResource cluster_data) {
459  gpr_log(
460  GPR_INFO,
461  "[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
462  this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
463  }
464  // Store the update in the map if we are still interested in watching this
465  // cluster (i.e., it is not cancelled already).
466  // If we've already deleted this entry, then this is an update notification
467  // that was scheduled before the deletion, so we can just ignore it.
468  auto it = watchers_.find(name);
469  if (it == watchers_.end()) return;
 // Copied rather than moved: cluster_data is read again below when the
 // RING_HASH config is built.
470  it->second.update = cluster_data;
471  // Take care of integration with new certificate code.
 // (absent line 472 begins the declaration of `status` that this call
 // initializes)
473  UpdateXdsCertificateProvider(name, it->second.update.value());
474  if (!status.ok()) {
475  return OnError(name, status);
476  }
477  // Scan the map starting from the root cluster to generate the list of
478  // discovery mechanisms. If we don't have some of the data we need (i.e., we
479  // just started up and not all watchers have returned data yet), then don't
480  // update the child policy at all.
481  Json::Array discovery_mechanisms;
482  std::set<std::string> clusters_added;
483  auto result = GenerateDiscoveryMechanismForCluster(
484  config_->cluster(), /*depth=*/0, &discovery_mechanisms, &clusters_added);
485  if (!result.ok()) {
486  return OnError(name, result.status());
487  }
488  if (*result) {
489  // LB policy is configured by aggregate cluster, not by the individual
490  // underlying cluster that we may be processing an update for.
 // Note: this `it` shadows the one declared above for `name`.
491  auto it = watchers_.find(config_->cluster());
492  GPR_ASSERT(it != watchers_.end());
493  const std::string& lb_policy = it->second.update->lb_policy;
494  // Construct config for child policy.
495  Json::Object xds_lb_policy;
496  if (lb_policy == "RING_HASH") {
 // NOTE(review): lb_policy is taken from the root cluster's stored update,
 // but the ring sizes are taken from cluster_data, i.e. from whichever
 // cluster this notification is for. When `name` is not the root cluster
 // these may disagree -- confirm whether
 // it->second.update->{min,max}_ring_size was intended.
497  xds_lb_policy["RING_HASH"] = Json::Object{
498  {"min_ring_size", cluster_data.min_ring_size},
499  {"max_ring_size", cluster_data.max_ring_size},
500  };
501  } else {
 // Any lb_policy value other than "RING_HASH" results in ROUND_ROBIN.
502  xds_lb_policy["ROUND_ROBIN"] = Json::Object();
503  }
504  Json::Object child_config = {
505  {"xdsLbPolicy",
506  Json::Array{
507  xds_lb_policy,
508  }},
509  {"discoveryMechanisms", std::move(discovery_mechanisms)},
510  };
511  Json json = Json::Array{
512  Json::Object{
513  {"xds_cluster_resolver_experimental", std::move(child_config)},
514  },
515  };
517  std::string json_str = json.Dump(/*indent=*/1);
518  gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s",
519  this, json_str.c_str());
520  }
 // Parse the generated JSON into a config object; bail out on parse failure.
 // (absent line 521 declares `error`; absent lines 525-526 hold the rest of
 // the failure handling)
522  RefCountedPtr<LoadBalancingPolicy::Config> config =
523  LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
524  if (!GRPC_ERROR_IS_NONE(error)) {
527  return;
528  }
529  // Create child policy if not already present.
530  if (child_policy_ == nullptr) {
 // (absent line 531 declares the `args` used to construct the child)
532  args.work_serializer = work_serializer();
533  args.args = args_;
534  args.channel_control_helper = absl::make_unique<Helper>(Ref());
535  child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
536  config->name(), std::move(args));
537  if (child_policy_ == nullptr) {
538  OnError(name, absl::UnavailableError("failed to create child policy"));
539  return;
540  }
541  grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
542  interested_parties());
544  gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
545  config->name(), child_policy_.get());
546  }
547  }
548  // Update child policy.
549  UpdateArgs args;
550  args.config = std::move(config);
 // When xds security is in use, the certificate provider is handed to the
 // child as an extra channel arg on top of a copy of args_.
 // (the else branch body, source line 555, is absent from this listing)
551  if (xds_certificate_provider_ != nullptr) {
552  grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
553  args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
554  } else {
556  }
557  child_policy_->UpdateLocked(std::move(args));
558  }
559  // Remove entries in watchers_ for any clusters not in clusters_added
 // This runs even when *result is false, i.e. when some cluster data is
 // still missing.
560  for (auto it = watchers_.begin(); it != watchers_.end();) {
561  const std::string& cluster_name = it->first;
562  if (clusters_added.find(cluster_name) != clusters_added.end()) {
563  ++it;
564  continue;
565  }
567  gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
568  cluster_name.c_str());
569  }
570  CancelClusterDataWatch(cluster_name, it->second.watcher,
571  /*delay_unsubscription=*/false);
 // erase() returns the next valid iterator, so the loop has no increment.
572  it = watchers_.erase(it);
573  }
574 }
575 
 // Body of CdsLb::OnError. Logs the error for cluster `name`, and reports a
 // TransientFailurePicker carrying "<name>: <status>" only when no child
 // policy exists yet.
 // NOTE(review): the signature (source line 576) and the leading UpdateState
 // arguments (source line 584) are absent from this listing; `name` and
 // `status` are the function's parameters.
577  gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s",
578  this, name.c_str(), status.ToString().c_str());
579  // Go into TRANSIENT_FAILURE if we have not yet created the child
580  // policy (i.e., we have not yet received data from xds). Otherwise,
581  // we keep running with the data we had previously.
582  if (child_policy_ == nullptr) {
583  channel_control_helper()->UpdateState(
585  absl::make_unique<TransientFailurePicker>(absl::UnavailableError(
586  absl::StrCat(name, ": ", status.ToString()))));
587  }
588 }
589 
// Handles the XdsClient reporting that the resource for cluster `name` does
// not exist: logs it, reports a TransientFailurePicker to the parent helper,
// and destroys the child policy (unlike OnError(), which keeps it).
// NOTE(review): source lines 591, 595 and 598 are absent from this listing.
590 void CdsLb::OnResourceDoesNotExist(const std::string& name) {
592  "[cdslb %p] CDS resource for %s does not exist -- reporting "
593  "TRANSIENT_FAILURE",
594  this, name.c_str());
 // NOTE(review): the log above names `name`, but the status message names
 // config_->cluster() (the root cluster). For a child of an aggregate
 // cluster these differ -- confirm which one the message should carry.
596  absl::StrCat("CDS resource \"", config_->cluster(), "\" does not exist"));
597  channel_control_helper()->UpdateState(
599  absl::make_unique<TransientFailurePicker>(status));
600  MaybeDestroyChildPolicyLocked();
601 }
602 
// Applies the TLS settings in `cluster_data` to xds_certificate_provider_
// under the key `cluster_name`.
//
// Returns OkStatus() on success, including the case where the channel is not
// using xds credentials (the provider is then reset to null). Returns an
// UnavailableError if a certificate provider instance name in the resource is
// not known to the XdsClient's certificate provider store.
// NOTE(review): source lines 607, 636-637, 639, 652, 673-674, 676, 680, 687
// and 689 are absent from this listing.
603 absl::Status CdsLb::UpdateXdsCertificateProvider(
604  const std::string& cluster_name, const XdsClusterResource& cluster_data) {
605  // Early out if channel is not configured to use xds security.
606  grpc_channel_credentials* channel_credentials =
608  if (channel_credentials == nullptr ||
609  channel_credentials->type() != XdsCredentials::Type()) {
610  xds_certificate_provider_ = nullptr;
611  return absl::OkStatus();
612  }
 // Created lazily on the first update that needs it.
613  if (xds_certificate_provider_ == nullptr) {
614  xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
615  }
616  // Configure root cert.
617  absl::string_view root_provider_instance_name =
618  cluster_data.common_tls_context.certificate_validation_context
619  .ca_certificate_provider_instance.instance_name;
620  absl::string_view root_provider_cert_name =
621  cluster_data.common_tls_context.certificate_validation_context
622  .ca_certificate_provider_instance.certificate_name;
 // Stays null when the resource names no root provider instance.
623  RefCountedPtr<XdsCertificateProvider> new_root_provider;
624  if (!root_provider_instance_name.empty()) {
625  new_root_provider =
626  xds_client_->certificate_provider_store()
627  .CreateOrGetCertificateProvider(root_provider_instance_name);
628  if (new_root_provider == nullptr) {
629  return absl::UnavailableError(
630  absl::StrCat("Certificate provider instance name: \"",
631  root_provider_instance_name, "\" not recognized."));
632  }
633  }
 // Only touch the pollset-set links when the provider actually changed.
 // (the removal of the old provider's link, source lines 636-639, is only
 // partially present in this listing)
634  if (root_certificate_provider_ != new_root_provider) {
635  if (root_certificate_provider_ != nullptr &&
638  interested_parties(),
640  }
641  if (new_root_provider != nullptr &&
642  new_root_provider->interested_parties() != nullptr) {
643  grpc_pollset_set_add_pollset_set(interested_parties(),
644  new_root_provider->interested_parties());
645  }
646  root_certificate_provider_ = std::move(new_root_provider);
647  }
 // A null provider results in a null distributor being passed.
648  xds_certificate_provider_->UpdateRootCertNameAndDistributor(
649  cluster_name, root_provider_cert_name,
650  root_certificate_provider_ == nullptr
651  ? nullptr
653  // Configure identity cert.
 // Same sequence as for the root cert above, using the identity provider
 // fields.
654  absl::string_view identity_provider_instance_name =
655  cluster_data.common_tls_context.tls_certificate_provider_instance
656  .instance_name;
657  absl::string_view identity_provider_cert_name =
658  cluster_data.common_tls_context.tls_certificate_provider_instance
659  .certificate_name;
660  RefCountedPtr<XdsCertificateProvider> new_identity_provider;
661  if (!identity_provider_instance_name.empty()) {
662  new_identity_provider =
663  xds_client_->certificate_provider_store()
664  .CreateOrGetCertificateProvider(identity_provider_instance_name);
665  if (new_identity_provider == nullptr) {
666  return absl::UnavailableError(
667  absl::StrCat("Certificate provider instance name: \"",
668  identity_provider_instance_name, "\" not recognized."));
669  }
670  }
671  if (identity_certificate_provider_ != new_identity_provider) {
672  if (identity_certificate_provider_ != nullptr &&
675  interested_parties(),
677  }
678  if (new_identity_provider != nullptr &&
679  new_identity_provider->interested_parties() != nullptr) {
681  interested_parties(), new_identity_provider->interested_parties());
682  }
683  identity_certificate_provider_ = std::move(new_identity_provider);
684  }
685  xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
686  cluster_name, identity_provider_cert_name,
688  ? nullptr
690  // Configure SAN matchers.
691  const std::vector<StringMatcher>& match_subject_alt_names =
692  cluster_data.common_tls_context.certificate_validation_context
693  .match_subject_alt_names;
694  xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
695  cluster_name, match_subject_alt_names);
696  return absl::OkStatus();
697 }
698 
// Stops watching `cluster_name`. If a certificate provider exists, the
// cluster's root cert, identity cert and SAN matcher entries are first reset
// to empty/null values; then the XdsClient watch is cancelled, forwarding
// `delay_unsubscription` unchanged.
// NOTE(review): source line 703 is absent from this listing; it is where the
// local `name` used below is declared (presumably a std::string copy of
// cluster_name -- confirm).
699 void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
700  ClusterWatcher* watcher,
701  bool delay_unsubscription) {
702  if (xds_certificate_provider_ != nullptr) {
704  xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "",
705  nullptr);
706  xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "",
707  nullptr);
708  xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
709  }
710  XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name, watcher,
711  delay_unsubscription);
712 }
713 //
714 // factory
715 //
716 
// Factory that creates CdsLb instances and parses their JSON config.
// NOTE(review): source lines 724, 736, 740 and 747 are absent from this
// listing.
717 class CdsLbFactory : public LoadBalancingPolicyFactory {
718  public:
 // Creates the policy using the XdsClient found in the channel args.
 // Returns nullptr if the channel args carry no XdsClient.
719  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
720  LoadBalancingPolicy::Args args) const override {
721  RefCountedPtr<XdsClient> xds_client =
722  XdsClient::GetFromChannelArgs(*args.args);
723  if (xds_client == nullptr) {
725  "XdsClient not present in channel args -- cannot instantiate "
726  "cds LB policy");
727  return nullptr;
728  }
729  return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));
730  }
731 
732  const char* name() const override { return kCds; }
733 
 // Parses `json` into a CdsLbConfig.
 // Returns nullptr for a JSON null, and also (with *error set to a "Cds
 // Parser" error built from the collected messages) when the "cluster" field
 // is missing or is not a string.
734  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
735  const Json& json, grpc_error_handle* error) const override {
737  if (json.type() == Json::Type::JSON_NULL) {
738  // xds was mentioned as a policy in the deprecated loadBalancingPolicy
739  // field or in the client API.
741  "field:loadBalancingPolicy error:cds policy requires configuration. "
742  "Please use loadBalancingConfig field of service config instead.");
743  return nullptr;
744  }
745  std::vector<grpc_error_handle> error_list;
746  // cluster name.
 // (absent line 747 declares the local `cluster` assigned below)
748  auto it = json.object_value().find("cluster");
749  if (it == json.object_value().end()) {
750  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
751  "required field 'cluster' not present"));
752  } else if (it->second.type() != Json::Type::STRING) {
753  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
754  "field:cluster error:type should be string"));
755  } else {
756  cluster = it->second.string_value();
757  }
758  if (!error_list.empty()) {
759  *error = GRPC_ERROR_CREATE_FROM_VECTOR("Cds Parser", &error_list);
760  return nullptr;
761  }
762  return MakeRefCounted<CdsLbConfig>(std::move(cluster));
763  }
764 };
765 
766 } // namespace
767 
768 } // namespace grpc_core
769 
770 //
771 // Plugin registration
772 //
773 
777  absl::make_unique<grpc_core::CdsLbFactory>());
778 }
779 
grpc_core::Json::Array
std::vector< Json > Array
Definition: src/core/lib/json/json.h:55
grpc_arg
Definition: grpc_types.h:103
trace.h
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
Type
struct Type Type
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:673
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
regen-readme.it
it
Definition: regen-readme.py:15
grpc_core::LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory
static void RegisterLoadBalancingPolicyFactory(std::unique_ptr< LoadBalancingPolicyFactory > factory)
Definition: lb_policy_registry.cc:87
config_
RefCountedPtr< CdsLbConfig > config_
Definition: cds.cc:191
orphanable.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
parent_
RefCountedPtr< CdsLb > parent_
Definition: cds.cc:141
bloat_diff.severity
def severity
Definition: bloat_diff.py:143
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc_tls_certificate_distributor.h
connectivity_state.h
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
certificate_provider_store.h
grpc_channel_credentials::type
virtual grpc_core::UniqueTypeName type() const =0
grpc_core
Definition: call_metric_recorder.h:31
cluster_name
std::string cluster_name
Definition: xds_cluster_resolver.cc:91
ExitIdleLocked
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb void ExitIdleLocked()
Definition: rls.cc:299
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
child_policy_
OrphanablePtr< LoadBalancingPolicy > child_policy_
Definition: cds.cc:208
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
lb_policy.h
lb_policy_factory.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
status
absl::Status status
Definition: rls.cc:251
xds_resource_type_impl.h
setup.name
name
Definition: setup.py:542
grpc_security.h
grpc_pollset_set_del_pollset_set
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:52
credentials.h
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
subchannel_interface.h
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
GRPC_ERROR_CREATE_FROM_VECTOR
#define GRPC_ERROR_CREATE_FROM_VECTOR(desc, error_list)
Definition: error.h:314
grpc_types.h
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::grpc_cds_lb_trace
TraceFlag grpc_cds_lb_trace(false, "cds_lb")
grpc_tls_certificate_provider.h
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
identity_certificate_provider_
RefCountedPtr< grpc_tls_certificate_provider > identity_certificate_provider_
Definition: cds.cc:204
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
hpack_encoder_fixtures::Args
Args({0, 16384})
cluster
absl::string_view cluster
Definition: xds_resolver.cc:331
Json
JSON (JavaScript Object Notation).
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:227
absl::FailedPreconditionError
Status FailedPreconditionError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:343
xds_common_types.h
name_
std::string name_
Definition: cds.cc:142
xds_client.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
work_serializer.h
xds_bootstrap.h
grpc.h
grpc_channel_credentials_find_in_args
grpc_channel_credentials * grpc_channel_credentials_find_in_args(const grpc_channel_args *args)
Definition: credentials.cc:83
connectivity_state.h
matchers.h
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
shutting_down_
bool shutting_down_
Definition: cds.cc:211
pollset_set.h
xds_certificate_provider_
RefCountedPtr< XdsCertificateProvider > xds_certificate_provider_
Definition: cds.cc:205
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
gen_settings_ids.OnError
OnError
Definition: gen_settings_ids.py:27
server_address.h
time.h
grpc_channel_args_copy
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
Definition: channel_args.cc:285
watchers_
std::map< std::string, WatcherState > watchers_
Definition: cds.cc:201
grpc_lb_policy_cds_shutdown
void grpc_lb_policy_cds_shutdown()
Definition: cds.cc:780
error.h
xds_cluster.h
cluster_
std::string cluster_
Definition: cds.cc:89
json.h
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
xds_credentials.h
grpc_lb_policy_cds_init
void grpc_lb_policy_cds_init()
Definition: cds.cc:774
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
debug_location.h
lb_policy_registry.h
grpc_tls_certificate_provider::distributor
virtual grpc_core::RefCountedPtr< grpc_tls_certificate_distributor > distributor() const =0
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
xds_client_
RefCountedPtr< XdsClient > xds_client_
Definition: cds.cc:197
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_pollset_set_add_pollset_set
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:47
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
absl::UnavailableError
Status UnavailableError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:375
unique_type_name.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
ref_counted_ptr.h
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
watcher
ClusterWatcher * watcher
Definition: cds.cc:148
channel_args.h
xds_certificate_provider.h
check_redundant_namespace_qualifiers.Config
Config
Definition: check_redundant_namespace_qualifiers.py:142
absl::string_view::empty
constexpr bool empty() const noexcept
Definition: abseil-cpp/absl/strings/string_view.h:292
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
grpc_error
Definition: error_internal.h:42
grpc_tls_certificate_provider::interested_parties
virtual grpc_pollset_set * interested_parties() const
Definition: grpc_tls_certificate_provider.h:56
root_certificate_provider_
RefCountedPtr< grpc_tls_certificate_provider > root_certificate_provider_
Definition: cds.cc:203
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
mkowners.depth
depth
Definition: mkowners.py:114
update
absl::optional< XdsClusterResource > update
Definition: cds.cc:150
grpc_channel_credentials
Definition: src/core/lib/security/credentials/credentials.h:96
args_
const grpc_channel_args * args_
Definition: cds.cc:194
grpc_channel_args_copy_and_add
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:224
outlier_detection.h
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:52