30 #include "absl/container/inlined_vector.h"
31 #include "absl/memory/memory.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
75 #define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
85 constexpr
char kXdsClusterResolver[] =
"xds_cluster_resolver_experimental";
90 struct DiscoveryMechanism {
94 enum DiscoveryMechanismType {
98 DiscoveryMechanismType
type;
103 bool operator==(
const DiscoveryMechanism& other)
const {
107 type == other.type &&
114 XdsClusterResolverLbConfig(
115 std::vector<DiscoveryMechanism> discovery_mechanisms,
Json xds_lb_policy)
119 const char*
name()
const override {
return kXdsClusterResolver; }
120 const std::vector<DiscoveryMechanism>& discovery_mechanisms()
const {
132 class XdsClusterResolverLb :
public LoadBalancingPolicy {
134 XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
Args args);
136 const char*
name()
const override {
return kXdsClusterResolver; }
138 void UpdateLocked(UpdateArgs
args)
override;
139 void ResetBackoffLocked()
override;
152 class DiscoveryMechanism :
public InternallyRefCounted<DiscoveryMechanism> {
155 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
159 XdsClusterResolverLb* parent()
const {
return parent_.get(); }
162 virtual void Start() = 0;
164 virtual bool disable_reresolution() = 0;
172 class EdsDiscoveryMechanism :
public DiscoveryMechanism {
174 EdsDiscoveryMechanism(
175 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
177 : DiscoveryMechanism(
std::move(xds_cluster_resolver_lb),
index) {}
178 void Start()
override;
179 void Orphan()
override;
181 bool disable_reresolution()
override {
return true; }
184 class EndpointWatcher :
public XdsEndpointResourceType::WatcherInterface {
186 explicit EndpointWatcher(
189 ~EndpointWatcher()
override {
192 void OnResourceChanged(XdsEndpointResource
update)
override {
197 [
this,
update]()
mutable {
212 void OnResourceDoesNotExist()
override {
216 OnResourceDoesNotExistHelper();
226 void OnResourceChangedHelper(XdsEndpointResource
update) {
234 void OnResourceDoesNotExistHelper() {
243 friend class EndpointWatcher;
246 auto&
config = parent()->config_->discovery_mechanisms()[
index()];
247 if (!
config.eds_service_name.empty())
return config.eds_service_name;
248 return config.cluster_name;
255 class LogicalDNSDiscoveryMechanism :
public DiscoveryMechanism {
257 LogicalDNSDiscoveryMechanism(
258 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
260 : DiscoveryMechanism(
std::move(xds_cluster_resolver_lb),
index) {}
261 void Start()
override;
262 void Orphan()
override;
270 bool disable_reresolution()
override {
return false; };
273 class ResolverResultHandler :
public Resolver::ResultHandler {
275 explicit ResolverResultHandler(
279 ~ResolverResultHandler()
override {}
289 friend class ResolverResultHandler;
294 struct DiscoveryMechanismEntry {
302 const XdsClusterResolverLbConfig::DiscoveryMechanism&
config()
const;
308 class Helper :
public ChannelControlHelper {
311 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy)
319 RefCountedPtr<SubchannelInterface> CreateSubchannel(
322 std::unique_ptr<SubchannelPicker> picker)
override;
325 void RequestReresolution()
override {}
327 void AddTraceEvent(TraceSeverity
severity,
334 ~XdsClusterResolverLb()
override;
336 void ShutdownLocked()
override;
338 void OnEndpointChanged(
size_t index, XdsEndpointResource
update);
340 void OnResourceDoesNotExist(
size_t index);
342 void MaybeDestroyChildPolicyLocked();
344 void UpdateChildPolicyLocked();
345 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
348 RefCountedPtr<Config> CreateChildPolicyConfigLocked();
357 RefCountedPtr<XdsClusterResolverLbConfig>
config_;
372 RefCountedPtr<SubchannelInterface>
373 XdsClusterResolverLb::Helper::CreateSubchannel(ServerAddress address,
380 void XdsClusterResolverLb::Helper::UpdateState(
382 std::unique_ptr<SubchannelPicker> picker) {
389 "[xds_cluster_resolver_lb %p] child policy updated state=%s (%s) "
402 void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity
severity,
413 void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() {
416 "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
417 ":%p starting xds watch for %s",
420 auto watcher = MakeRefCounted<EndpointWatcher>(
427 void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
430 "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
431 ":%p cancelling xds watch for %s",
443 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
445 parent()->config_->discovery_mechanisms()[
index()].dns_hostname;
447 FakeResolverResponseGenerator* fake_resolver_response_generator =
448 grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
451 if (fake_resolver_response_generator !=
nullptr) {
454 fake_resolver_response_generator);
461 target.c_str(),
args, parent()->interested_parties(),
462 parent()->work_serializer(),
463 absl::make_unique<ResolverResultHandler>(
467 parent()->OnResourceDoesNotExist(
index());
473 "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism "
474 "%" PRIuPTR
":%p starting dns resolver %p",
479 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() {
483 "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism %" PRIuPTR
484 ":%p shutting down dns resolver %p",
495 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
497 if (!
result.addresses.ok()) {
499 result.addresses.status());
505 XdsEndpointResource
update;
506 XdsEndpointResource::Priority::Locality locality;
507 locality.name = MakeRefCounted<XdsLocalityName>(
"",
"",
"");
508 locality.lb_weight = 1;
510 XdsEndpointResource::Priority
priority;
521 const XdsClusterResolverLbConfig::DiscoveryMechanism&
527 std::string XdsClusterResolverLb::DiscoveryMechanismEntry::GetChildPolicyName(
537 XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
541 gpr_log(
GPR_INFO,
"[xds_cluster_resolver_lb %p] created -- xds_client=%p",
546 XdsClusterResolverLb::~XdsClusterResolverLb() {
549 "[xds_cluster_resolver_lb %p] destroying xds_cluster_resolver LB "
555 void XdsClusterResolverLb::ShutdownLocked() {
560 MaybeDestroyChildPolicyLocked();
568 void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() {
571 interested_parties());
576 void XdsClusterResolverLb::UpdateLocked(UpdateArgs
args) {
578 gpr_log(
GPR_INFO,
"[xds_cluster_resolver_lb %p] Received update",
this);
580 const bool is_initial_update =
args_ ==
nullptr;
591 if (is_initial_update) {
593 DiscoveryMechanismEntry entry;
594 if (
config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
595 DiscoveryMechanismType::EDS) {
596 entry.discovery_mechanism = MakeOrphanable<EdsDiscoveryMechanism>(
599 }
else if (
config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
600 DiscoveryMechanismType::LOGICAL_DNS) {
601 entry.discovery_mechanism =
602 MakeOrphanable<LogicalDNSDiscoveryMechanism>(
617 void XdsClusterResolverLb::ResetBackoffLocked() {
627 void XdsClusterResolverLb::OnEndpointChanged(
size_t index,
628 XdsEndpointResource
update) {
632 "[xds_cluster_resolver_lb %p] Received update from xds client"
633 " for discovery mechanism %" PRIuPTR
"",
641 if (
update.priorities.empty())
update.priorities.emplace_back();
646 std::map<XdsLocalityName*,
size_t , XdsLocalityName::Less>
648 std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
650 if (discovery_entry.latest_update.has_value()) {
651 const auto& prev_priority_list = discovery_entry.latest_update->priorities;
654 size_t child_number = discovery_entry.priority_child_numbers[
priority];
655 const auto& localities = prev_priority_list[
priority].localities;
656 for (
const auto&
p : localities) {
657 XdsLocalityName* locality_name =
p.first;
658 locality_child_map[locality_name] = child_number;
659 child_locality_map[child_number].insert(locality_name);
670 for (
const auto&
p : localities) {
671 XdsLocalityName* locality_name =
p.first;
673 auto it = locality_child_map.find(locality_name);
674 if (
it != locality_child_map.end()) {
675 child_number =
it->second;
676 locality_child_map.erase(
it);
680 for (XdsLocalityName* old_locality :
681 child_locality_map[*child_number]) {
682 locality_child_map.erase(old_locality);
689 locality_child_map.erase(locality_name);
694 for (child_number = discovery_entry.next_available_child_number;
695 child_locality_map.find(*child_number) != child_locality_map.end();
698 discovery_entry.next_available_child_number = *child_number + 1;
701 child_locality_map[*child_number];
718 if (!mechanism.latest_update.has_value())
return;
721 UpdateChildPolicyLocked();
726 "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
727 " xds watcher reported error: %s",
733 OnEndpointChanged(
index, XdsEndpointResource());
737 void XdsClusterResolverLb::OnResourceDoesNotExist(
size_t index) {
739 "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
740 " resource does not exist",
744 OnEndpointChanged(
index, XdsEndpointResource());
755 priority < discovery_entry.latest_update->priorities.size();
757 const auto& priority_entry =
758 discovery_entry.latest_update->priorities[
priority];
760 discovery_entry.GetChildPolicyName(
priority);
761 for (
const auto&
p : priority_entry.localities) {
762 const auto& locality_name =
p.first;
763 const auto& locality =
p.second;
764 std::vector<std::string> hierarchical_path = {
765 priority_child_name, locality_name->AsHumanReadableString()};
766 for (
const auto& endpoint : locality.endpoints) {
767 const ServerAddressWeightAttribute* weight_attribute =
static_cast<
768 const ServerAddressWeightAttribute*
>(endpoint.GetAttribute(
769 ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
771 if (weight_attribute !=
nullptr) {
772 weight = locality.lb_weight * weight_attribute->weight();
774 addresses.emplace_back(
780 absl::make_unique<XdsLocalityAttribute>(
781 locality_name->Ref()))
783 ServerAddressWeightAttribute::
784 kServerAddressWeightAttributeKey,
785 absl::make_unique<ServerAddressWeightAttribute>(
weight)));
793 RefCountedPtr<LoadBalancingPolicy::Config>
794 XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
795 Json::Object priority_children;
796 Json::Array priority_priorities;
798 const auto& discovery_config = discovery_entry.config();
800 priority < discovery_entry.latest_update->priorities.size();
802 const auto& priority_entry =
803 discovery_entry.latest_update->priorities[
priority];
805 if (!discovery_entry.discovery_mechanism->override_child_policy()
808 discovery_entry.discovery_mechanism->override_child_policy();
810 const auto& xds_lb_policy =
config_->xds_lb_policy().object_value();
811 if (xds_lb_policy.find(
"ROUND_ROBIN") != xds_lb_policy.end()) {
812 const auto& localities = priority_entry.localities;
813 Json::Object weighted_targets;
814 for (
const auto&
p : localities) {
815 XdsLocalityName* locality_name =
p.first;
816 const auto& locality =
p.second;
818 weighted_targets[locality_name->AsHumanReadableString()] =
820 {
"weight", locality.lb_weight},
824 {
"round_robin", Json::Object()},
831 child_policy = Json::Array{
833 {
"weighted_target_experimental",
835 {
"targets", Json::Object()},
840 *(*child_policy.mutable_array())[0].mutable_object();
843 (*
it->second.mutable_object())[
"targets"] =
846 auto it = xds_lb_policy.find(
"RING_HASH");
848 Json::Object ring_hash_experimental_policy =
849 it->second.object_value();
850 child_policy = Json::Array{
852 {
"ring_hash_experimental", ring_hash_experimental_policy},
858 Json::Array drop_categories;
859 if (discovery_entry.latest_update->drop_config !=
nullptr) {
860 for (
const auto& category :
861 discovery_entry.latest_update->drop_config->drop_category_list()) {
862 drop_categories.push_back(Json::Object{
863 {
"category", category.name},
864 {
"requests_per_million", category.parts_per_million},
868 Json::Object xds_cluster_impl_config = {
869 {
"clusterName", discovery_config.cluster_name},
870 {
"childPolicy",
std::move(child_policy)},
871 {
"dropCategories",
std::move(drop_categories)},
872 {
"maxConcurrentRequests", discovery_config.max_concurrent_requests},
874 if (!discovery_config.eds_service_name.empty()) {
875 xds_cluster_impl_config[
"edsServiceName"] =
876 discovery_config.eds_service_name;
878 if (discovery_config.lrs_load_reporting_server.has_value()) {
879 xds_cluster_impl_config[
"lrsLoadReportingServer"] =
880 discovery_config.lrs_load_reporting_server->ToJson();
882 Json locality_picking_policy;
884 Json::Object outlier_detection_config;
885 if (discovery_entry.config().outlier_detection_lb_config.has_value()) {
886 outlier_detection_config =
887 discovery_entry.config().outlier_detection_lb_config.value();
890 outlier_detection_config[
"interval"] =
891 Duration::Infinity().ToJsonString();
893 outlier_detection_config[
"childPolicy"] = Json::Array{Json::Object{
894 {
"xds_cluster_impl_experimental",
897 locality_picking_policy = Json::Array{Json::Object{
898 {
"outlier_detection_experimental",
902 locality_picking_policy = Json::Array{Json::Object{
903 {
"xds_cluster_impl_experimental",
909 priority_priorities.emplace_back(child_name);
910 Json::Object child_config = {
911 {
"config",
std::move(locality_picking_policy)},
913 if (discovery_entry.discovery_mechanism->disable_reresolution()) {
914 child_config[
"ignore_reresolution_requests"] =
true;
916 priority_children[child_name] =
std::move(child_config);
919 Json json = Json::Array{Json::Object{
920 {
"priority_experimental",
922 {
"children",
std::move(priority_children)},
923 {
"priorities",
std::move(priority_priorities)},
930 "[xds_cluster_resolver_lb %p] generated config for child policy: %s",
931 this, json_str.c_str());
934 RefCountedPtr<LoadBalancingPolicy::Config>
config =
935 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &
error);
940 "[xds_cluster_resolver_lb %p] error parsing generated child policy "
942 "will put channel in TRANSIENT_FAILURE: %s",
945 "xds_cluster_resolver LB policy: error parsing generated child policy "
947 channel_control_helper()->UpdateState(
949 absl::make_unique<TransientFailurePicker>(
status));
955 void XdsClusterResolverLb::UpdateChildPolicyLocked() {
957 UpdateArgs update_args;
958 update_args.config = CreateChildPolicyConfigLocked();
959 if (update_args.config ==
nullptr)
return;
960 update_args.addresses = CreateChildPolicyAddressesLocked();
961 update_args.args = CreateChildPolicyArgsLocked(
args_);
966 gpr_log(
GPR_INFO,
"[xds_cluster_resolver_lb %p] Updating child policy %p",
983 OrphanablePtr<LoadBalancingPolicy>
986 lb_policy_args.work_serializer = work_serializer();
987 lb_policy_args.args =
args;
988 lb_policy_args.channel_control_helper =
990 OrphanablePtr<LoadBalancingPolicy> lb_policy =
991 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
992 "priority_experimental",
std::move(lb_policy_args));
995 "[xds_cluster_resolver_lb %p] failure creating child policy",
this);
1000 "[xds_cluster_resolver_lb %p]: Created new child policy %p",
this,
1007 interested_parties());
1015 class XdsClusterResolverLbFactory :
public LoadBalancingPolicyFactory {
1017 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1019 RefCountedPtr<XdsClient> xds_client =
1020 XdsClient::GetFromChannelArgs(*
args.args);
1021 if (xds_client ==
nullptr) {
1023 "XdsClient not present in channel args -- cannot instantiate "
1024 "xds_cluster_resolver LB policy");
1027 return MakeOrphanable<XdsClusterResolverChildHandler>(
std::move(xds_client),
1031 const char*
name()
const override {
return kXdsClusterResolver; }
1033 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1036 if (json.type() == Json::Type::JSON_NULL) {
1040 "field:loadBalancingPolicy error:xds_cluster_resolver policy "
1041 "requires configuration. "
1042 "Please use loadBalancingConfig field of service config instead.");
1045 std::vector<grpc_error_handle> error_list;
1046 std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism>
1047 discovery_mechanisms;
1048 auto it = json.object_value().find(
"discoveryMechanisms");
1049 if (
it == json.object_value().end()) {
1051 "field:discoveryMechanisms error:required field missing"));
1052 }
else if (
it->second.type() != Json::Type::ARRAY) {
1054 "field:discoveryMechanisms error:type should be array"));
1056 const Json::Array&
array =
it->second.array_value();
1057 for (
size_t i = 0;
i <
array.size(); ++
i) {
1059 std::vector<grpc_error_handle> discovery_mechanism_errors =
1061 if (!discovery_mechanism_errors.empty()) {
1063 absl::StrCat(
"field:discovery_mechanism element: ",
i,
" error"));
1065 discovery_mechanism_errors) {
1068 error_list.push_back(
error);
1073 if (discovery_mechanisms.empty()) {
1075 "field:discovery_mechanism error:list is missing or empty"));
1077 Json xds_lb_policy = Json::Object{
1078 {
"ROUND_ROBIN", Json::Object()},
1080 it = json.object_value().find(
"xdsLbPolicy");
1081 if (
it != json.object_value().end()) {
1082 if (
it->second.type() != Json::Type::ARRAY) {
1084 "field:xdsLbPolicy error:type should be array"));
1086 const Json::Array&
array =
it->second.array_value();
1087 for (
size_t i = 0;
i <
array.size(); ++
i) {
1090 "field:xdsLbPolicy error:element should be of type object"));
1093 const Json::Object& policy =
array[
i].object_value();
1094 auto policy_it = policy.find(
"ROUND_ROBIN");
1095 if (policy_it != policy.end()) {
1096 if (policy_it->second.type() != Json::Type::OBJECT) {
1098 "field:ROUND_ROBIN error:type should be object"));
1102 policy_it = policy.find(
"RING_HASH");
1103 if (policy_it != policy.end()) {
1104 xds_lb_policy =
array[
i];
1105 size_t min_ring_size;
1106 size_t max_ring_size;
1108 &max_ring_size, &error_list);
1114 if (error_list.empty()) {
1115 return MakeRefCounted<XdsClusterResolverLbConfig>(
1119 "xds_cluster_resolver_experimental LB policy config", &error_list);
1125 static std::vector<grpc_error_handle> ParseDiscoveryMechanism(
1128 std::vector<grpc_error_handle> error_list;
1129 if (json.type() != Json::Type::OBJECT) {
1131 "value should be of type object"));
1135 auto it = json.object_value().find(
"clusterName");
1136 if (
it == json.object_value().end()) {
1138 "field:clusterName error:required field missing"));
1139 }
else if (
it->second.type() != Json::Type::STRING) {
1141 "field:clusterName error:type should be string"));
1146 it = json.object_value().find(
"lrsLoadReportingServer");
1147 if (
it != json.object_value().end()) {
1148 if (
it->second.type() != Json::Type::OBJECT) {
1150 "field:lrsLoadReportingServer error:type should be object"));
1157 absl::StrCat(
"errors parsing lrs_load_reporting_server")));
1164 it = json.object_value().find(
"max_concurrent_requests");
1165 if (
it != json.object_value().end()) {
1166 if (
it->second.type() != Json::Type::NUMBER) {
1168 "field:max_concurrent_requests error:must be of type number"));
1175 it = json.object_value().find(
"outlierDetection");
1176 if (
it != json.object_value().end()) {
1177 if (
it->second.type() != Json::Type::OBJECT) {
1179 "field:outlierDetection error:type should be object"));
1188 it->second.object_value();
1193 it = json.object_value().find(
"type");
1194 if (
it == json.object_value().end()) {
1196 "field:type error:required field missing"));
1197 }
else if (
it->second.type() != Json::Type::STRING) {
1199 "field:type error:type should be string"));
1201 if (
it->second.string_value() ==
"EDS") {
1203 DiscoveryMechanism::DiscoveryMechanismType::EDS;
1204 it = json.object_value().find(
"edsServiceName");
1205 if (
it != json.object_value().end()) {
1206 if (
it->second.type() != Json::Type::STRING) {
1208 "field:edsServiceName error:type should be string"));
1213 }
else if (
it->second.string_value() ==
"LOGICAL_DNS") {
1215 DiscoveryMechanism::DiscoveryMechanismType::LOGICAL_DNS;
1216 it = json.object_value().find(
"dnsHostname");
1217 if (
it == json.object_value().end()) {
1219 "field:dnsHostname error:required field missing"));
1220 }
else if (
it->second.type() != Json::Type::STRING) {
1222 "field:dnsHostname error:type should be string"));
1228 "field:type error:invalid type"));
1234 class XdsClusterResolverChildHandler :
public ChildPolicyHandler {
1236 XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,
1242 bool ConfigChangeRequiresNewPolicyInstance(
1245 GPR_ASSERT(old_config->name() == kXdsClusterResolver);
1246 GPR_ASSERT(new_config->name() == kXdsClusterResolver);
1247 XdsClusterResolverLbConfig* old_xds_cluster_resolver_config =
1248 static_cast<XdsClusterResolverLbConfig*
>(old_config);
1249 XdsClusterResolverLbConfig* new_xds_cluster_resolver_config =
1250 static_cast<XdsClusterResolverLbConfig*
>(new_config);
1251 return old_xds_cluster_resolver_config->discovery_mechanisms() !=
1252 new_xds_cluster_resolver_config->discovery_mechanisms();
1255 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1276 absl::make_unique<grpc_core::XdsClusterResolverLbFactory>());