30 #include "absl/base/thread_annotations.h"
31 #include "absl/memory/memory.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 #include "absl/types/variant.h"
78 class CircuitBreakerCallCounterMap {
83 class CallCounter :
public RefCounted<CallCounter> {
86 ~CallCounter()
override;
// Global pointer to the circuit-breaker call counter map. Starts out null;
// elsewhere in this file it is assigned a heap-allocated
// CircuitBreakerCallCounterMap and later deleted. The CallCounter destructor
// dereferences it to lock the map's mutex and remove its own entry.
107 CircuitBreakerCallCounterMap* g_call_counter_map =
nullptr;
109 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
113 RefCountedPtr<CallCounter>
result;
115 auto it = map_.find(
key);
116 if (
it == map_.end()) {
119 result =
it->second->RefIfNonZero();
// Destructor: removes this counter's entry from the global call counter map.
128 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
// The lookup and the erase below are both done while holding the map's mutex.
129 MutexLock lock(&g_call_counter_map->mu_);
130 auto it = g_call_counter_map->map_.find(
key_);
// Erase only if the entry stored under key_ still points at this object.
// NOTE(review): presumably the slot can already hold a newer counter for the
// same key by the time this destructor runs -- confirm against the lookup code.
131 if (
it != g_call_counter_map->map_.end() &&
it->second ==
this) {
132 g_call_counter_map->map_.erase(
it);
// Name string for this LB policy. It is the value returned by the name()
// overrides of the config, policy and factory classes in this file.
140 constexpr
char kXdsClusterImpl[] =
"xds_cluster_impl_experimental";
145 XdsClusterImplLbConfig(
146 RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
150 RefCountedPtr<XdsEndpointResource::DropConfig> drop_config)
// Returns the policy name this config belongs to (kXdsClusterImpl).
158 const char*
name()
const override {
return kXdsClusterImpl; }
160 RefCountedPtr<LoadBalancingPolicy::Config> child_policy()
const {
170 RefCountedPtr<XdsEndpointResource::DropConfig> drop_config()
const {
184 class XdsClusterImplLb :
public LoadBalancingPolicy {
186 XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
Args args);
// Returns this LB policy's name (kXdsClusterImpl).
188 const char*
name()
const override {
return kXdsClusterImpl; }
190 void UpdateLocked(UpdateArgs
args)
override;
192 void ResetBackoffLocked()
override;
195 class StatsSubchannelWrapper :
public DelegatingSubchannel {
197 StatsSubchannelWrapper(
198 RefCountedPtr<SubchannelInterface> wrapped_subchannel,
199 RefCountedPtr<XdsClusterLocalityStats> locality_stats)
200 : DelegatingSubchannel(
std::
move(wrapped_subchannel)),
203 XdsClusterLocalityStats* locality_stats()
const {
212 class RefCountedPicker :
public RefCounted<RefCountedPicker> {
214 explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
223 class Picker :
public SubchannelPicker {
225 Picker(XdsClusterImplLb* xds_cluster_impl_lb,
226 RefCountedPtr<RefCountedPicker> picker);
228 PickResult Pick(PickArgs
args)
override;
231 class SubchannelCallTracker;
233 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
call_counter_;
235 RefCountedPtr<XdsEndpointResource::DropConfig>
drop_config_;
237 RefCountedPtr<RefCountedPicker>
picker_;
240 class Helper :
public ChannelControlHelper {
242 explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
249 RefCountedPtr<SubchannelInterface> CreateSubchannel(
252 std::unique_ptr<SubchannelPicker> picker)
override;
253 void RequestReresolution()
override;
255 void AddTraceEvent(TraceSeverity
severity,
262 ~XdsClusterImplLb()
override;
264 void ShutdownLocked()
override;
266 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
271 void MaybeUpdatePickerLocked();
274 RefCountedPtr<XdsClusterImplLbConfig>
config_;
277 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
call_counter_;
293 RefCountedPtr<RefCountedPicker>
picker_;
300 class XdsClusterImplLb::Picker::SubchannelCallTracker
301 :
public LoadBalancingPolicy::SubchannelCallTrackerInterface {
303 SubchannelCallTracker(
304 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
305 original_subchannel_call_tracker,
306 RefCountedPtr<XdsClusterLocalityStats> locality_stats,
307 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
309 std::move(original_subchannel_call_tracker)),
313 ~SubchannelCallTracker()
override {
319 void Start()
override {
352 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
355 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
call_counter_;
365 XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
366 RefCountedPtr<RefCountedPicker> picker)
374 gpr_log(
GPR_INFO,
"[xds_cluster_impl_lb %p] constructed new picker %p",
375 xds_cluster_impl_lb,
this);
379 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
380 LoadBalancingPolicy::PickArgs
args) {
386 absl::StrCat(
"EDS-configured drop: ", *drop_category)));
400 "xds_cluster_impl picker not given any child picker"));
404 auto* complete_pick = absl::get_if<PickResult::Complete>(&
result.result);
405 if (complete_pick !=
nullptr) {
406 RefCountedPtr<XdsClusterLocalityStats> locality_stats;
408 auto* subchannel_wrapper =
409 static_cast<StatsSubchannelWrapper*
>(complete_pick->subchannel.get());
411 locality_stats = subchannel_wrapper->locality_stats()->Ref(
414 complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
417 complete_pick->subchannel_call_tracker =
418 absl::make_unique<SubchannelCallTracker>(
419 std::move(complete_pick->subchannel_call_tracker),
435 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
439 gpr_log(
GPR_INFO,
"[xds_cluster_impl_lb %p] created -- using xds client %p",
444 XdsClusterImplLb::~XdsClusterImplLb() {
447 "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
452 void XdsClusterImplLb::ShutdownLocked() {
461 interested_parties());
475 void XdsClusterImplLb::ResetBackoffLocked() {
481 void XdsClusterImplLb::UpdateLocked(UpdateArgs
args) {
486 const bool is_initial_update =
config_ ==
nullptr;
490 if (is_initial_update) {
491 if (
config_->lrs_load_reporting_server().has_value()) {
493 config_->lrs_load_reporting_server().value(),
config_->cluster_name(),
497 "[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
498 "LRS server %s, cluster %s, EDS service name %s, load "
499 "reporting for drops will not be done.",
500 this,
config_->lrs_load_reporting_server()->server_uri.c_str(),
501 config_->cluster_name().c_str(),
502 config_->eds_service_name().c_str());
514 old_config->lrs_load_reporting_server());
517 if (is_initial_update ||
config_->max_concurrent_requests() !=
518 old_config->max_concurrent_requests()) {
519 MaybeUpdatePickerLocked();
525 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
528 if (
config_->drop_config() !=
nullptr &&
config_->drop_config()->drop_all()) {
529 auto drop_picker = absl::make_unique<Picker>(
this,
picker_);
532 "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
533 "state=READY picker=%p",
534 this, drop_picker.get());
542 auto drop_picker = absl::make_unique<Picker>(
this,
picker_);
545 "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
546 "status=(%s) picker=%p",
555 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
558 lb_policy_args.work_serializer = work_serializer();
559 lb_policy_args.args =
args;
560 lb_policy_args.channel_control_helper =
562 OrphanablePtr<LoadBalancingPolicy> lb_policy =
563 MakeOrphanable<ChildPolicyHandler>(
std::move(lb_policy_args),
567 "[xds_cluster_impl_lb %p] Created new child policy handler %p",
568 this, lb_policy.get());
574 interested_parties());
578 void XdsClusterImplLb::UpdateChildPolicyLocked(
586 UpdateArgs update_args;
587 update_args.addresses =
std::move(addresses);
588 update_args.config =
config_->child_policy();
591 const_cast<char*
>(
config_->cluster_name().c_str()));
596 "[xds_cluster_impl_lb %p] Updating child policy handler %p",
this,
606 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
613 RefCountedPtr<XdsLocalityName> locality_name;
615 if (attribute !=
nullptr) {
616 const auto* locality_attr =
617 static_cast<const XdsLocalityAttribute*
>(attribute);
618 locality_name = locality_attr->locality_name();
620 RefCountedPtr<XdsClusterLocalityStats> locality_stats =
627 if (locality_stats !=
nullptr) {
628 return MakeRefCounted<StatsSubchannelWrapper>(
634 "[xds_cluster_impl_lb %p] Failed to get locality stats object for "
635 "LRS server %s, cluster %s, EDS service name %s; load reports will "
636 "not be generated (not wrapping subchannel)",
639 ->server_uri.c_str(),
648 void XdsClusterImplLb::Helper::UpdateState(
650 std::unique_ptr<SubchannelPicker> picker) {
654 "[xds_cluster_impl_lb %p] child connectivity state update: "
664 MakeRefCounted<RefCountedPicker>(
std::move(picker));
669 void XdsClusterImplLb::Helper::RequestReresolution() {
678 void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity
severity,
689 class XdsClusterImplLbFactory :
public LoadBalancingPolicyFactory {
691 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
693 RefCountedPtr<XdsClient> xds_client =
694 XdsClient::GetFromChannelArgs(*
args.args);
695 if (xds_client ==
nullptr) {
697 "XdsClient not present in channel args -- cannot instantiate "
698 "xds_cluster_impl LB policy");
701 return MakeOrphanable<XdsClusterImplLb>(
std::move(xds_client),
// Returns the name of the policy this factory handles (kXdsClusterImpl).
705 const char*
name()
const override {
return kXdsClusterImpl; }
707 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
710 if (json.type() == Json::Type::JSON_NULL) {
714 "field:loadBalancingPolicy error:xds_cluster_impl policy requires "
715 "configuration. Please use loadBalancingConfig field of service "
719 std::vector<grpc_error_handle> error_list;
721 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
722 auto it = json.object_value().find(
"childPolicy");
723 if (
it == json.object_value().end()) {
725 "field:childPolicy error:required field missing"));
728 child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
730 if (child_policy ==
nullptr) {
732 std::vector<grpc_error_handle> child_errors;
734 error_list.push_back(
740 it = json.object_value().find(
"clusterName");
741 if (
it == json.object_value().end()) {
743 "field:clusterName error:required field missing"));
744 }
else if (
it->second.type() != Json::Type::STRING) {
746 "field:clusterName error:type should be string"));
752 it = json.object_value().find(
"edsServiceName");
753 if (
it != json.object_value().end()) {
754 if (
it->second.type() != Json::Type::STRING) {
756 "field:edsServiceName error:type should be string"));
763 it = json.object_value().find(
"lrsLoadReportingServer");
764 if (
it != json.object_value().end()) {
765 if (
it->second.type() != Json::Type::OBJECT) {
767 "field:lrsLoadReportingServer error:type should be object"));
771 it->second.object_value(), &parser_error);
774 absl::StrCat(
"errors parsing lrs_load_reporting_server")));
775 error_list.push_back(parser_error);
781 it = json.object_value().find(
"maxConcurrentRequests");
782 if (
it != json.object_value().end()) {
783 if (
it->second.type() != Json::Type::NUMBER) {
785 "field:max_concurrent_requests error:must be of type number"));
792 auto drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
793 it = json.object_value().find(
"dropCategories");
794 if (
it == json.object_value().end()) {
796 "field:dropCategories error:required field missing"));
798 std::vector<grpc_error_handle> child_errors =
799 ParseDropCategories(
it->second, drop_config.get());
800 if (!child_errors.empty()) {
802 "field:dropCategories", &child_errors));
805 if (!error_list.empty()) {
807 "xds_cluster_impl_experimental LB policy config", &error_list);
810 return MakeRefCounted<XdsClusterImplLbConfig>(
817 static std::vector<grpc_error_handle> ParseDropCategories(
818 const Json& json, XdsEndpointResource::DropConfig* drop_config) {
819 std::vector<grpc_error_handle> error_list;
820 if (json.type() != Json::Type::ARRAY) {
822 "dropCategories field is not an array"));
825 for (
size_t i = 0;
i < json.array_value().
size(); ++
i) {
826 const Json& entry = json.array_value()[
i];
827 std::vector<grpc_error_handle> child_errors =
828 ParseDropCategory(entry, drop_config);
829 if (!child_errors.empty()) {
832 for (
size_t i = 0;
i < child_errors.size(); ++
i) {
835 error_list.push_back(
error);
841 static std::vector<grpc_error_handle> ParseDropCategory(
842 const Json& json, XdsEndpointResource::DropConfig* drop_config) {
843 std::vector<grpc_error_handle> error_list;
844 if (json.type() != Json::Type::OBJECT) {
846 "dropCategories entry is not an object"));
850 auto it = json.object_value().find(
"category");
851 if (
it == json.object_value().end()) {
853 "\"category\" field not present"));
854 }
else if (
it->second.type() != Json::Type::STRING) {
856 "\"category\" field is not a string"));
858 category =
it->second.string_value();
861 it = json.object_value().find(
"requests_per_million");
862 if (
it == json.object_value().end()) {
864 "\"requests_per_million\" field is not present"));
865 }
else if (
it->second.type() != Json::Type::NUMBER) {
867 "\"requests_per_million\" field is not a number"));
869 requests_per_million =
872 if (error_list.empty()) {
873 drop_config->AddCategory(
std::move(category), requests_per_million);
888 grpc_core::g_call_counter_map =
new grpc_core::CircuitBreakerCallCounterMap();
891 absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
895 delete grpc_core::g_call_counter_map;