30 #include "absl/memory/memory.h"
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
69 constexpr
char kPriority[] =
"priority_experimental";
83 struct PriorityLbChild {
84 RefCountedPtr<LoadBalancingPolicy::Config>
config;
88 PriorityLbConfig(std::map<std::string, PriorityLbChild>
children,
89 std::vector<std::string> priorities)
92 const char*
name()
const override {
return kPriority; }
94 const std::map<std::string, PriorityLbChild>&
children()
const {
97 const std::vector<std::string>& priorities()
const {
return priorities_; }
105 class PriorityLb :
public LoadBalancingPolicy {
109 const char*
name()
const override {
return kPriority; }
111 void UpdateLocked(UpdateArgs
args)
override;
113 void ResetBackoffLocked()
override;
117 class ChildPriority :
public InternallyRefCounted<ChildPriority> {
119 ChildPriority(RefCountedPtr<PriorityLb> priority_policy,
std::string name);
121 ~ChildPriority()
override {
127 void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config>
config,
130 void ResetBackoffLocked();
131 void MaybeDeactivateLocked();
132 void MaybeReactivateLocked();
134 void Orphan()
override;
136 std::unique_ptr<SubchannelPicker> GetPicker();
146 bool FailoverTimerPending()
const {
return failover_timer_ !=
nullptr; }
150 class RefCountedPicker :
public RefCounted<RefCountedPicker> {
152 explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
161 class RefCountedPickerWrapper :
public SubchannelPicker {
163 explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
165 PickResult Pick(PickArgs
args)
override {
return picker_->Pick(
args); }
168 RefCountedPtr<RefCountedPicker>
picker_;
171 class Helper :
public ChannelControlHelper {
173 explicit Helper(RefCountedPtr<ChildPriority>
priority)
178 RefCountedPtr<SubchannelInterface> CreateSubchannel(
182 std::unique_ptr<SubchannelPicker> picker)
override;
183 void RequestReresolution()
override;
185 void AddTraceEvent(TraceSeverity
severity,
192 class DeactivationTimer :
public InternallyRefCounted<DeactivationTimer> {
194 explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);
196 void Orphan()
override;
208 class FailoverTimer :
public InternallyRefCounted<FailoverTimer> {
210 explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);
212 void Orphan()
override;
225 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
228 void OnConnectivityStateUpdateLocked(
230 std::unique_ptr<SubchannelPicker> picker);
248 ~PriorityLb()
override;
250 void ShutdownLocked()
override;
259 void HandleChildConnectivityStateChangeLocked(ChildPriority*
child);
262 void DeleteChild(ChildPriority*
child);
275 void ChoosePriorityLocked();
296 std::map<std::string, OrphanablePtr<ChildPriority>>
children_;
314 {
static_cast<int>(kDefaultChildFailoverTimeout.millis()), 0,
321 PriorityLb::~PriorityLb() {
323 gpr_log(
GPR_INFO,
"[priority_lb %p] destroying priority LB policy",
this);
328 void PriorityLb::ShutdownLocked() {
341 "[priority_lb %p] exiting IDLE for current priority %d child %s",
348 void PriorityLb::ResetBackoffLocked() {
349 for (
const auto&
p :
children_)
p.second->ResetBackoffLocked();
352 void PriorityLb::UpdateLocked(UpdateArgs
args) {
378 auto config_it =
config_->children().find(child_name);
379 if (config_it ==
config_->children().end()) {
381 child->MaybeDeactivateLocked();
384 child->UpdateLocked(config_it->second.config,
385 config_it->second.ignore_reresolution_requests);
390 ChoosePriorityLocked();
393 uint32_t PriorityLb::GetChildPriorityLocked(
402 void PriorityLb::HandleChildConnectivityStateChangeLocked(
403 ChildPriority*
child) {
415 "[priority_lb %p] state update for current child from before "
423 channel_control_helper()->UpdateState(
child->connectivity_state(),
424 child->connectivity_status(),
433 ChoosePriorityLocked();
438 uint32_t child_priority = GetChildPriorityLocked(
child->name());
441 "[priority_lb %p] state update for priority %u, child %s, current "
447 ChoosePriorityLocked();
450 void PriorityLb::DeleteChild(ChildPriority*
child) {
458 ChoosePriorityLocked();
463 void PriorityLb::ChoosePriorityLocked() {
465 if (
config_->priorities().empty()) {
469 channel_control_helper()->UpdateState(
471 absl::make_unique<TransientFailurePicker>(
status));
482 gpr_log(
GPR_INFO,
"[priority_lb %p] trying priority %u, child %s",
this,
486 if (
child ==
nullptr) {
494 channel_control_helper()->UpdateState(
499 child = MakeOrphanable<ChildPriority>(
501 auto child_config =
config_->children().find(child_name);
503 child->UpdateLocked(child_config->second.config,
504 child_config->second.ignore_reresolution_requests);
508 child->MaybeReactivateLocked();
517 if (
child->FailoverTimerPending()) {
520 "[priority_lb %p] priority %u, child %s: child still "
521 "attempting to connect, will wait",
522 this,
priority, child_name.c_str());
528 channel_control_helper()->UpdateState(
child->connectivity_state(),
529 child->connectivity_status(),
537 "[priority_lb %p] skipping priority %u, child %s: state=%s, "
538 "failover timer not pending",
547 "[priority_lb %p] no priority reachable, checking for CONNECTING "
548 "priority to delegate to",
556 gpr_log(
GPR_INFO,
"[priority_lb %p] trying priority %u, child %s",
this,
562 channel_control_helper()->UpdateState(
child->connectivity_state(),
563 child->connectivity_status(),
572 "[priority_lb %p] no priority in CONNECTING, delegating to "
573 "lowest priority child %s",
574 this, child_name.c_str());
578 channel_control_helper()->UpdateState(
child->connectivity_state(),
579 child->connectivity_status(),
585 gpr_log(
GPR_INFO,
"[priority_lb %p] selected priority %u, child %s",
this,
594 if (
it !=
children_.end())
it->second->MaybeDeactivateLocked();
598 channel_control_helper()->UpdateState(
child->connectivity_state(),
599 child->connectivity_status(),
607 PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
608 RefCountedPtr<PriorityLb::ChildPriority> child_priority)
612 "[priority_lb %p] child %s (%p): deactivating -- will remove in "
616 kChildRetentionInterval.millis());
624 void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
637 void PriorityLb::ChildPriority::DeactivationTimer::OnTimer(
639 auto*
self =
static_cast<DeactivationTimer*
>(
arg);
641 self->child_priority_->priority_policy_->work_serializer()->Run(
645 void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked(
650 "[priority_lb %p] child %s (%p): deactivation timer fired, "
666 PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
667 RefCountedPtr<PriorityLb::ChildPriority> child_priority)
672 "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64
687 void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
691 "[priority_lb %p] child %s (%p): cancelling failover timer",
701 void PriorityLb::ChildPriority::FailoverTimer::OnTimer(
703 auto*
self =
static_cast<FailoverTimer*
>(
arg);
705 self->child_priority_->priority_policy_->work_serializer()->Run(
709 void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked(
714 "[priority_lb %p] child %s (%p): failover timer fired, "
715 "reporting TRANSIENT_FAILURE",
733 PriorityLb::ChildPriority::ChildPriority(
744 void PriorityLb::ChildPriority::Orphan() {
762 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>
763 PriorityLb::ChildPriority::GetPicker() {
765 return absl::make_unique<QueuePicker>(
771 void PriorityLb::ChildPriority::UpdateLocked(
772 RefCountedPtr<LoadBalancingPolicy::Config>
config,
785 UpdateArgs update_args;
796 "[priority_lb %p] child %s (%p): updating child policy handler %p",
802 OrphanablePtr<LoadBalancingPolicy>
803 PriorityLb::ChildPriority::CreateChildPolicyLocked(
807 lb_policy_args.args =
args;
808 lb_policy_args.channel_control_helper =
810 OrphanablePtr<LoadBalancingPolicy> lb_policy =
811 MakeOrphanable<ChildPolicyHandler>(
std::move(lb_policy_args),
815 "[priority_lb %p] child %s (%p): created new child policy "
831 void PriorityLb::ChildPriority::ResetBackoffLocked() {
835 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
837 std::unique_ptr<SubchannelPicker> picker) {
840 "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
870 void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
876 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
884 RefCountedPtr<SubchannelInterface>
885 PriorityLb::ChildPriority::Helper::CreateSubchannel(
887 if (
priority_->priority_policy_->shutting_down_)
return nullptr;
888 return priority_->priority_policy_->channel_control_helper()
892 void PriorityLb::ChildPriority::Helper::UpdateState(
894 std::unique_ptr<SubchannelPicker> picker) {
895 if (
priority_->priority_policy_->shutting_down_)
return;
900 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
901 if (
priority_->priority_policy_->shutting_down_)
return;
902 if (
priority_->ignore_reresolution_requests_) {
905 priority_->priority_policy_->channel_control_helper()->RequestReresolution();
909 return priority_->priority_policy_->channel_control_helper()->GetAuthority();
912 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
914 if (
priority_->priority_policy_->shutting_down_)
return;
923 class PriorityLbFactory :
public LoadBalancingPolicyFactory {
925 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
930 const char*
name()
const override {
return kPriority; }
932 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
935 if (json.type() == Json::Type::JSON_NULL) {
939 "field:loadBalancingPolicy error:priority policy requires "
940 "configuration. Please use loadBalancingConfig field of service "
944 std::vector<grpc_error_handle> error_list;
946 std::map<std::string, PriorityLbConfig::PriorityLbChild>
children;
947 auto it = json.object_value().find(
"children");
948 if (
it == json.object_value().end()) {
950 "field:children error:required field missing"));
951 }
else if (
it->second.type() != Json::Type::OBJECT) {
953 "field:children error:type should be object"));
955 const Json::Object&
object =
it->second.object_value();
956 for (
const auto&
p :
object) {
959 if (
element.type() != Json::Type::OBJECT) {
962 " error:should be type object")));
964 auto it2 =
element.object_value().find(
"config");
965 if (it2 ==
element.object_value().end()) {
968 " error:missing 'config' field")));
971 auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
973 bool ignore_resolution_requests =
false;
977 element.object_value().find(
"ignore_reresolution_requests");
978 if (it3 !=
element.object_value().end()) {
979 if (it3->second.type() == Json::Type::JSON_TRUE) {
980 ignore_resolution_requests =
true;
981 }
else if (it3->second.type() != Json::Type::JSON_FALSE) {
984 " field:ignore_reresolution_requests:should "
985 "be type boolean")));
990 error_list.push_back(
997 children[child_name].ignore_reresolution_requests =
998 ignore_resolution_requests;
1004 std::vector<std::string> priorities;
1005 it = json.object_value().find(
"priorities");
1006 if (
it == json.object_value().end()) {
1008 "field:priorities error:required field missing"));
1009 }
else if (
it->second.type() != Json::Type::ARRAY) {
1011 "field:priorities error:type should be array"));
1013 const Json::Array&
array =
it->second.array_value();
1014 for (
size_t i = 0;
i <
array.size(); ++
i) {
1016 if (
element.type() != Json::Type::STRING) {
1018 "field:priorities element:",
i,
" error:should be type string")));
1021 "field:priorities element:",
i,
" error:unknown child '",
1022 element.string_value(),
"'")));
1024 priorities.emplace_back(
element.string_value());
1027 if (priorities.size() !=
children.size()) {
1029 "field:priorities error:priorities size (", priorities.size(),
1030 ") != children size (",
children.size(),
")")));
1033 if (error_list.empty()) {
1038 "priority_experimental LB policy config", &error_list);
1055 absl::make_unique<grpc_core::PriorityLbFactory>());