27 #include "absl/memory/memory.h"
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
// Name string of the CDS policy; it is the value returned by the name()
// overrides in this file.
constexpr char kCds[] = "cds_experimental";
// Depth limit for walking the aggregate cluster graph: when the recursion
// depth reaches this value, discovery-mechanism generation stops with an
// "aggregate cluster graph exceeds max depth" error.
constexpr int kMaxAggregateClusterRecursionDepth = 16;
86 const char*
name()
const override {
return kCds; }
93 class CdsLb :
public LoadBalancingPolicy {
95 CdsLb(RefCountedPtr<XdsClient> xds_client,
Args args);
97 const char*
name()
const override {
return kCds; }
99 void UpdateLocked(UpdateArgs
args)
override;
100 void ResetBackoffLocked()
override;
105 class ClusterWatcher :
public XdsClusterResourceType::WatcherInterface {
110 void OnResourceChanged(XdsClusterResource cluster_data)
override {
112 parent_->work_serializer()->Run(
115 [
this, cluster_data]()
mutable {
123 parent_->work_serializer()->Run(
130 void OnResourceDoesNotExist()
override {
132 parent_->work_serializer()->Run(
145 struct WatcherState {
154 class Helper :
public ChannelControlHelper {
156 explicit Helper(RefCountedPtr<CdsLb> parent) :
parent_(
std::move(parent)) {}
157 RefCountedPtr<SubchannelInterface> CreateSubchannel(
160 std::unique_ptr<SubchannelPicker> picker)
override;
161 void RequestReresolution()
override;
163 void AddTraceEvent(TraceSeverity
severity,
172 void ShutdownLocked()
override;
176 std::set<std::string>* clusters_added);
178 XdsClusterResource cluster_data);
187 bool delay_unsubscription =
false);
189 void MaybeDestroyChildPolicyLocked();
218 RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
220 if (
parent_->shutting_down_)
return nullptr;
221 return parent_->channel_control_helper()->CreateSubchannel(
std::move(address),
227 std::unique_ptr<SubchannelPicker> picker) {
228 if (
parent_->shutting_down_ ||
parent_->child_policy_ ==
nullptr)
return;
237 void CdsLb::Helper::RequestReresolution() {
238 if (
parent_->shutting_down_)
return;
240 gpr_log(
GPR_INFO,
"[cdslb %p] Re-resolution requested from child policy.",
243 parent_->channel_control_helper()->RequestReresolution();
247 return parent_->channel_control_helper()->GetAuthority();
250 void CdsLb::Helper::AddTraceEvent(TraceSeverity
severity,
252 if (
parent_->shutting_down_)
return;
260 CdsLb::CdsLb(RefCountedPtr<XdsClient> xds_client,
Args args)
274 void CdsLb::ShutdownLocked() {
279 MaybeDestroyChildPolicyLocked();
296 void CdsLb::MaybeDestroyChildPolicyLocked() {
299 interested_parties());
304 void CdsLb::ResetBackoffLocked() {
312 void CdsLb::UpdateLocked(UpdateArgs
args) {
325 if (old_config ==
nullptr || old_config->cluster() !=
config_->cluster()) {
326 if (old_config !=
nullptr) {
354 std::set<std::string>* clusters_added) {
355 if (
depth == kMaxAggregateClusterRecursionDepth) {
357 "aggregate cluster graph exceeds max depth");
359 if (!clusters_added->insert(
name).second) {
364 if (
state.watcher ==
nullptr) {
376 if (!
state.update.has_value())
return false;
378 if (
state.update->cluster_type ==
379 XdsClusterResource::ClusterType::AGGREGATE) {
380 bool missing_cluster =
false;
382 state.update->prioritized_cluster_names) {
383 auto result = GenerateDiscoveryMechanismForCluster(
384 child_name,
depth + 1, discovery_mechanisms, clusters_added);
386 if (!*
result) missing_cluster =
true;
388 return !missing_cluster;
390 Json::Object mechanism = {
391 {
"clusterName",
name},
392 {
"max_concurrent_requests",
state.update->max_concurrent_requests},
394 if (
state.update->outlier_detection.has_value()) {
395 auto& outlier_detection_update =
state.update->outlier_detection.value();
396 Json::Object outlier_detection;
397 outlier_detection[
"interval"] =
398 outlier_detection_update.interval.ToJsonString();
399 outlier_detection[
"baseEjectionTime"] =
400 outlier_detection_update.base_ejection_time.ToJsonString();
401 outlier_detection[
"maxEjectionTime"] =
402 outlier_detection_update.max_ejection_time.ToJsonString();
403 outlier_detection[
"maxEjectionPercent"] =
404 outlier_detection_update.max_ejection_percent;
405 if (outlier_detection_update.success_rate_ejection.has_value()) {
406 outlier_detection[
"successRateEjection"] = Json::Object{
408 outlier_detection_update.success_rate_ejection->stdev_factor},
409 {
"enforcementPercentage",
410 outlier_detection_update.success_rate_ejection
411 ->enforcement_percentage},
413 outlier_detection_update.success_rate_ejection->minimum_hosts},
415 outlier_detection_update.success_rate_ejection->request_volume},
418 if (outlier_detection_update.failure_percentage_ejection.has_value()) {
419 outlier_detection[
"failurePercentageEjection"] = Json::Object{
421 outlier_detection_update.failure_percentage_ejection->threshold},
422 {
"enforcementPercentage",
423 outlier_detection_update.failure_percentage_ejection
424 ->enforcement_percentage},
426 outlier_detection_update.failure_percentage_ejection->minimum_hosts},
427 {
"requestVolume", outlier_detection_update
428 .failure_percentage_ejection->request_volume},
431 mechanism[
"outlierDetection"] =
std::move(outlier_detection);
433 switch (
state.update->cluster_type) {
434 case XdsClusterResource::ClusterType::EDS:
435 mechanism[
"type"] =
"EDS";
436 if (!
state.update->eds_service_name.empty()) {
437 mechanism[
"edsServiceName"] =
state.update->eds_service_name;
440 case XdsClusterResource::ClusterType::LOGICAL_DNS:
441 mechanism[
"type"] =
"LOGICAL_DNS";
442 mechanism[
"dnsHostname"] =
state.update->dns_hostname;
448 if (
state.update->lrs_load_reporting_server.has_value()) {
449 mechanism[
"lrsLoadReportingServer"] =
450 state.update->lrs_load_reporting_server->ToJson();
452 discovery_mechanisms->emplace_back(
std::move(mechanism));
457 XdsClusterResource cluster_data) {
461 "[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
462 this,
name.c_str(),
xds_client_.get(), cluster_data.ToString().c_str());
470 it->second.update = cluster_data;
473 UpdateXdsCertificateProvider(
name,
it->second.update.value());
481 Json::Array discovery_mechanisms;
482 std::set<std::string> clusters_added;
483 auto result = GenerateDiscoveryMechanismForCluster(
484 config_->cluster(), 0, &discovery_mechanisms, &clusters_added);
495 Json::Object xds_lb_policy;
496 if (lb_policy ==
"RING_HASH") {
497 xds_lb_policy[
"RING_HASH"] = Json::Object{
498 {
"min_ring_size", cluster_data.min_ring_size},
499 {
"max_ring_size", cluster_data.max_ring_size},
502 xds_lb_policy[
"ROUND_ROBIN"] = Json::Object();
504 Json::Object child_config = {
509 {
"discoveryMechanisms",
std::move(discovery_mechanisms)},
511 Json json = Json::Array{
513 {
"xds_cluster_resolver_experimental",
std::move(child_config)},
519 this, json_str.c_str());
522 RefCountedPtr<LoadBalancingPolicy::Config>
config =
523 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &
error);
532 args.work_serializer = work_serializer();
534 args.channel_control_helper = absl::make_unique<Helper>(
Ref());
535 child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
542 interested_parties());
562 if (clusters_added.find(
cluster_name) != clusters_added.end()) {
577 gpr_log(
GPR_ERROR,
"[cdslb %p] xds error obtaining data for cluster %s: %s",
583 channel_control_helper()->UpdateState(
592 "[cdslb %p] CDS resource for %s does not exist -- reporting "
597 channel_control_helper()->UpdateState(
599 absl::make_unique<TransientFailurePicker>(
status));
600 MaybeDestroyChildPolicyLocked();
608 if (channel_credentials ==
nullptr ||
618 cluster_data.common_tls_context.certificate_validation_context
619 .ca_certificate_provider_instance.instance_name;
621 cluster_data.common_tls_context.certificate_validation_context
622 .ca_certificate_provider_instance.certificate_name;
623 RefCountedPtr<XdsCertificateProvider> new_root_provider;
624 if (!root_provider_instance_name.
empty()) {
627 .CreateOrGetCertificateProvider(root_provider_instance_name);
628 if (new_root_provider ==
nullptr) {
631 root_provider_instance_name,
"\" not recognized."));
638 interested_parties(),
641 if (new_root_provider !=
nullptr &&
642 new_root_provider->interested_parties() !=
nullptr) {
644 new_root_provider->interested_parties());
655 cluster_data.common_tls_context.tls_certificate_provider_instance
658 cluster_data.common_tls_context.tls_certificate_provider_instance
660 RefCountedPtr<XdsCertificateProvider> new_identity_provider;
661 if (!identity_provider_instance_name.
empty()) {
662 new_identity_provider =
664 .CreateOrGetCertificateProvider(identity_provider_instance_name);
665 if (new_identity_provider ==
nullptr) {
668 identity_provider_instance_name,
"\" not recognized."));
675 interested_parties(),
678 if (new_identity_provider !=
nullptr &&
679 new_identity_provider->interested_parties() !=
nullptr) {
681 interested_parties(), new_identity_provider->interested_parties());
691 const std::vector<StringMatcher>& match_subject_alt_names =
692 cluster_data.common_tls_context.certificate_validation_context
693 .match_subject_alt_names;
701 bool delay_unsubscription) {
711 delay_unsubscription);
717 class CdsLbFactory :
public LoadBalancingPolicyFactory {
719 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
721 RefCountedPtr<XdsClient> xds_client =
722 XdsClient::GetFromChannelArgs(*
args.args);
723 if (xds_client ==
nullptr) {
725 "XdsClient not present in channel args -- cannot instantiate "
732 const char*
name()
const override {
return kCds; }
734 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
737 if (json.type() == Json::Type::JSON_NULL) {
741 "field:loadBalancingPolicy error:cds policy requires configuration. "
742 "Please use loadBalancingConfig field of service config instead.");
745 std::vector<grpc_error_handle> error_list;
748 auto it = json.object_value().find(
"cluster");
749 if (
it == json.object_value().end()) {
751 "required field 'cluster' not present"));
752 }
else if (
it->second.type() != Json::Type::STRING) {
754 "field:cluster error:type should be string"));
758 if (!error_list.empty()) {
777 absl::make_unique<grpc_core::CdsLbFactory>());