29 #include "absl/memory/memory.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
63 #define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
71 constexpr
char kXdsClusterManager[] =
"xds_cluster_manager_experimental";
77 std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
79 explicit XdsClusterManagerLbConfig(ClusterMap cluster_map)
82 const char*
name()
const override {
return kXdsClusterManager; }
84 const ClusterMap& cluster_map()
const {
return cluster_map_; }
91 class XdsClusterManagerLb :
public LoadBalancingPolicy {
93 explicit XdsClusterManagerLb(
Args args);
95 const char*
name()
const override {
return kXdsClusterManager; }
97 void UpdateLocked(UpdateArgs
args)
override;
99 void ResetBackoffLocked()
override;
103 class ChildPickerWrapper :
public RefCounted<ChildPickerWrapper> {
106 std::unique_ptr<SubchannelPicker> picker)
119 class ClusterPicker :
public SubchannelPicker {
123 RefCountedPtr<ChildPickerWrapper>>;
127 explicit ClusterPicker(ClusterMap cluster_map)
130 PickResult Pick(PickArgs
args)
override;
137 class ClusterChild :
public InternallyRefCounted<ClusterChild> {
139 ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
141 ~ClusterChild()
override;
143 void Orphan()
override;
145 void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config>
config,
149 void ResetBackoffLocked();
150 void DeactivateLocked();
155 RefCountedPtr<ChildPickerWrapper> picker_wrapper()
const {
160 class Helper :
public ChannelControlHelper {
162 explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
169 RefCountedPtr<SubchannelInterface> CreateSubchannel(
173 std::unique_ptr<SubchannelPicker> picker)
override;
174 void RequestReresolution()
override;
176 void AddTraceEvent(TraceSeverity
severity,
184 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
208 ~XdsClusterManagerLb()
override;
210 void ShutdownLocked()
override;
212 void UpdateStateLocked();
215 RefCountedPtr<XdsClusterManagerLbConfig>
config_;
222 std::map<std::string, OrphanablePtr<ClusterChild>>
children_;
229 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
231 auto*
call_state =
static_cast<ClientChannel::LoadBalancedCall::LbCallState*
>(
237 return it->second->Pick(
args);
240 "xds cluster manager picker: unknown cluster \"",
cluster_name,
"\"")));
247 XdsClusterManagerLb::XdsClusterManagerLb(
Args args)
250 XdsClusterManagerLb::~XdsClusterManagerLb() {
254 "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
259 void XdsClusterManagerLb::ShutdownLocked() {
268 for (
auto&
p :
children_)
p.second->ExitIdleLocked();
271 void XdsClusterManagerLb::ResetBackoffLocked() {
272 for (
auto&
p :
children_)
p.second->ResetBackoffLocked();
275 void XdsClusterManagerLb::UpdateLocked(UpdateArgs
args) {
278 gpr_log(
GPR_INFO,
"[xds_cluster_manager_lb %p] Received update",
this);
286 ClusterChild*
child =
p.second.get();
288 child->DeactivateLocked();
292 for (
const auto&
p :
config_->cluster_map()) {
294 const RefCountedPtr<LoadBalancingPolicy::Config>&
config =
p.second;
296 if (
child ==
nullptr) {
306 void XdsClusterManagerLb::UpdateStateLocked() {
315 size_t num_ready = 0;
316 size_t num_connecting = 0;
318 size_t num_transient_failures = 0;
320 const auto& child_name =
p.first;
321 const ClusterChild*
child =
p.second.get();
323 if (
config_->cluster_map().find(child_name) ==
324 config_->cluster_map().end()) {
327 switch (
child->connectivity_state()) {
341 ++num_transient_failures;
352 }
else if (num_connecting > 0) {
354 }
else if (num_idle > 0) {
360 gpr_log(
GPR_INFO,
"[xds_cluster_manager_lb %p] connectivity changed to %s",
363 ClusterPicker::ClusterMap cluster_map;
364 for (
const auto&
p :
config_->cluster_map()) {
366 RefCountedPtr<ChildPickerWrapper>& child_picker = cluster_map[
cluster_name];
368 if (child_picker ==
nullptr) {
371 "[xds_cluster_manager_lb %p] child %s has not yet returned a "
372 "picker; creating a QueuePicker.",
375 child_picker = MakeRefCounted<ChildPickerWrapper>(
380 std::unique_ptr<SubchannelPicker> picker =
381 absl::make_unique<ClusterPicker>(
std::move(cluster_map));
385 "TRANSIENT_FAILURE from XdsClusterManagerLb");
387 channel_control_helper()->UpdateState(connectivity_state,
status,
395 XdsClusterManagerLb::ClusterChild::ClusterChild(
396 RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
402 "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
406 grpc_schedule_on_exec_ctx);
409 XdsClusterManagerLb::ClusterChild::~ClusterChild() {
412 "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
419 void XdsClusterManagerLb::ClusterChild::Orphan() {
422 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
423 "shutting down child",
442 OrphanablePtr<LoadBalancingPolicy>
443 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
446 lb_policy_args.work_serializer =
448 lb_policy_args.args =
args;
449 lb_policy_args.channel_control_helper =
451 OrphanablePtr<LoadBalancingPolicy> lb_policy =
452 MakeOrphanable<ChildPolicyHandler>(
std::move(lb_policy_args),
456 "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
466 lb_policy->interested_parties(),
471 void XdsClusterManagerLb::ClusterChild::UpdateLocked(
472 RefCountedPtr<LoadBalancingPolicy::Config>
config,
487 UpdateArgs update_args;
489 update_args.addresses = addresses;
494 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
507 void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
511 void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
525 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
527 ClusterChild*
self =
static_cast<ClusterChild*
>(
arg);
529 self->xds_cluster_manager_policy_->work_serializer()->Run(
530 [
self,
error]() {
self->OnDelayedRemovalTimerLocked(
error); },
534 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
548 RefCountedPtr<SubchannelInterface>
549 XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
555 ->channel_control_helper()
559 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
561 std::unique_ptr<SubchannelPicker> picker) {
565 "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
590 void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() {
595 ->channel_control_helper()
596 ->RequestReresolution();
601 ->channel_control_helper()
605 void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
611 ->channel_control_helper()
619 class XdsClusterManagerLbFactory :
public LoadBalancingPolicyFactory {
621 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
626 const char*
name()
const override {
return kXdsClusterManager; }
628 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
631 if (json.type() == Json::Type::JSON_NULL) {
635 "field:loadBalancingPolicy error:xds_cluster_manager policy requires "
636 "configuration. Please use loadBalancingConfig field of service "
640 std::vector<grpc_error_handle> error_list;
641 XdsClusterManagerLbConfig::ClusterMap cluster_map;
643 auto it = json.object_value().find(
"children");
644 if (
it == json.object_value().end()) {
646 "field:children error:required field not present"));
647 }
else if (
it->second.type() != Json::Type::OBJECT) {
649 "field:children error:type should be object"));
651 for (
const auto&
p :
it->second.object_value()) {
653 if (child_name.empty()) {
655 "field:children element error: name cannot be empty"));
658 RefCountedPtr<LoadBalancingPolicy::Config> child_config;
659 std::vector<grpc_error_handle> child_errors =
660 ParseChildConfig(
p.second, &child_config);
661 if (!child_errors.empty()) {
663 absl::StrCat(
"field:children name:", child_name), &child_errors));
665 cluster_map[child_name] =
std::move(child_config);
666 clusters_to_be_used.insert(child_name);
670 if (cluster_map.empty()) {
671 error_list.push_back(
674 if (!error_list.empty()) {
676 "xds_cluster_manager_experimental LB policy config", &error_list);
679 return MakeRefCounted<XdsClusterManagerLbConfig>(
std::move(cluster_map));
683 static std::vector<grpc_error_handle> ParseChildConfig(
685 RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
686 std::vector<grpc_error_handle> error_list;
687 if (json.type() != Json::Type::OBJECT) {
689 "value should be of type object"));
692 auto it = json.object_value().find(
"childPolicy");
693 if (
it == json.object_value().end()) {
694 error_list.push_back(
698 *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
700 if (*child_config ==
nullptr) {
702 std::vector<grpc_error_handle> child_errors;
704 error_list.push_back(
723 absl::make_unique<grpc_core::XdsClusterManagerLbFactory>());