29 #include "absl/memory/memory.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
71 using ::grpc_event_engine::experimental::EventEngine;
74 constexpr
char kWeightedTarget[] =
"weighted_target_experimental";
85 RefCountedPtr<LoadBalancingPolicy::Config>
config;
88 using TargetMap = std::map<std::string, ChildConfig>;
90 explicit WeightedTargetLbConfig(TargetMap target_map)
93 const char*
name()
const override {
return kWeightedTarget; }
95 const TargetMap& target_map()
const {
return target_map_; }
102 class WeightedTargetLb :
public LoadBalancingPolicy {
104 explicit WeightedTargetLb(
Args args);
106 const char*
name()
const override {
return kWeightedTarget; }
108 void UpdateLocked(UpdateArgs
args)
override;
109 void ResetBackoffLocked()
override;
113 class ChildPickerWrapper :
public RefCounted<ChildPickerWrapper> {
115 explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)
125 class WeightedPicker :
public SubchannelPicker {
132 std::vector<std::pair<uint32_t, RefCountedPtr<ChildPickerWrapper>>>;
134 explicit WeightedPicker(PickerList pickers)
137 PickResult Pick(PickArgs
args)
override;
144 class WeightedChild :
public InternallyRefCounted<WeightedChild> {
146 WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
148 ~WeightedChild()
override;
150 void Orphan()
override;
152 void UpdateLocked(
const WeightedTargetLbConfig::ChildConfig&
config,
155 void ResetBackoffLocked();
156 void DeactivateLocked();
162 RefCountedPtr<ChildPickerWrapper> picker_wrapper()
const {
167 class Helper :
public ChannelControlHelper {
169 explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
174 RefCountedPtr<SubchannelInterface> CreateSubchannel(
178 std::unique_ptr<SubchannelPicker> picker)
override;
179 void RequestReresolution()
override;
181 void AddTraceEvent(TraceSeverity
severity,
188 class DelayedRemovalTimer
189 :
public InternallyRefCounted<DelayedRemovalTimer> {
191 explicit DelayedRemovalTimer(RefCountedPtr<WeightedChild> weighted_child);
193 void Orphan()
override;
196 void OnTimerLocked();
203 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
206 void OnConnectivityStateUpdateLocked(
208 std::unique_ptr<SubchannelPicker> picker);
225 ~WeightedTargetLb()
override;
227 void ShutdownLocked()
override;
229 void UpdateStateLocked();
232 RefCountedPtr<WeightedTargetLbConfig>
config_;
239 std::map<std::string, OrphanablePtr<WeightedChild>>
targets_;
246 WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
252 size_t start_index = 0;
253 size_t end_index =
pickers_.size() - 1;
255 while (end_index > start_index) {
256 mid = (start_index + end_index) / 2;
260 start_index = mid + 1;
276 WeightedTargetLb::WeightedTargetLb(
Args args)
283 WeightedTargetLb::~WeightedTargetLb() {
286 "[weighted_target_lb %p] destroying weighted_target LB policy",
291 void WeightedTargetLb::ShutdownLocked() {
299 void WeightedTargetLb::ResetBackoffLocked() {
300 for (
auto&
p :
targets_)
p.second->ResetBackoffLocked();
303 void WeightedTargetLb::UpdateLocked(UpdateArgs
args) {
314 WeightedChild*
child =
p.second.get();
316 child->DeactivateLocked();
322 for (
const auto&
p :
config_->target_map()) {
324 const WeightedTargetLbConfig::ChildConfig&
config =
p.second;
328 target = MakeOrphanable<WeightedChild>(
332 if (address_map.
ok()) {
335 addresses = address_map.
status();
340 if (
config_->target_map().empty()) {
342 "no children in weighted_target policy: ",
args.resolution_note));
343 channel_control_helper()->UpdateState(
345 absl::make_unique<TransientFailurePicker>(
status));
351 void WeightedTargetLb::UpdateStateLocked() {
360 "[weighted_target_lb %p] scanning children to determine "
361 "connectivity state",
369 WeightedPicker::PickerList ready_picker_list;
371 WeightedPicker::PickerList tf_picker_list;
375 size_t num_connecting = 0;
379 const WeightedChild*
child =
p.second.get();
381 if (
config_->target_map().find(child_name) ==
config_->target_map().end()) {
386 "[weighted_target_lb %p] child=%s state=%s weight=%d picker=%p",
387 this, child_name.c_str(),
389 child->weight(),
child->picker_wrapper().get());
391 switch (
child->connectivity_state()) {
394 ready_end +=
child->weight();
395 ready_picker_list.emplace_back(ready_end,
child->picker_wrapper());
408 tf_end +=
child->weight();
409 tf_picker_list.emplace_back(tf_end,
child->picker_wrapper());
418 if (!ready_picker_list.empty()) {
420 }
else if (num_connecting > 0) {
422 }
else if (num_idle > 0) {
428 gpr_log(
GPR_INFO,
"[weighted_target_lb %p] connectivity changed to %s",
431 std::unique_ptr<SubchannelPicker> picker;
433 switch (connectivity_state) {
435 picker = absl::make_unique<WeightedPicker>(
std::move(ready_picker_list));
443 picker = absl::make_unique<WeightedPicker>(
std::move(tf_picker_list));
445 channel_control_helper()->UpdateState(connectivity_state,
status,
453 WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
454 RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
457 kChildRetentionInterval, [
self =
Ref()]()
mutable {
458 self->weighted_child_->weighted_target_policy_->work_serializer()->
Run(
459 [
self =
std::move(
self)] {
self->OnTimerLocked(); },
464 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
468 "[weighted_target_lb %p] WeightedChild %p %s: cancelling "
469 "delayed removal timer",
478 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
489 WeightedTargetLb::WeightedChild::WeightedChild(
490 RefCountedPtr<WeightedTargetLb> weighted_target_policy,
494 gpr_log(
GPR_INFO,
"[weighted_target_lb %p] created WeightedChild %p for %s",
499 WeightedTargetLb::WeightedChild::~WeightedChild() {
502 "[weighted_target_lb %p] WeightedChild %p %s: destroying child",
508 void WeightedTargetLb::WeightedChild::Orphan() {
511 "[weighted_target_lb %p] WeightedChild %p %s: shutting down child",
527 OrphanablePtr<LoadBalancingPolicy>
528 WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
532 lb_policy_args.args =
args;
533 lb_policy_args.channel_control_helper =
535 OrphanablePtr<LoadBalancingPolicy> lb_policy =
536 MakeOrphanable<ChildPolicyHandler>(
std::move(lb_policy_args),
540 "[weighted_target_lb %p] WeightedChild %p %s: Created new child "
549 lb_policy->interested_parties(),
554 void WeightedTargetLb::WeightedChild::UpdateLocked(
555 const WeightedTargetLbConfig::ChildConfig&
config,
565 "[weighted_target_lb %p] WeightedChild %p %s: reactivating",
575 UpdateArgs update_args;
576 update_args.config =
config.config;
577 update_args.addresses =
std::move(addresses);
582 "[weighted_target_lb %p] WeightedChild %p %s: Updating child "
590 void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
594 void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
596 std::unique_ptr<SubchannelPicker> picker) {
601 "[weighted_target_lb %p] WeightedChild %p %s: connectivity "
602 "state update: state=%s (%s) picker_wrapper=%p",
620 void WeightedTargetLb::WeightedChild::DeactivateLocked() {
625 "[weighted_target_lb %p] WeightedChild %p %s: deactivating",
639 RefCountedPtr<SubchannelInterface>
640 WeightedTargetLb::WeightedChild::Helper::CreateSubchannel(
642 if (
weighted_child_->weighted_target_policy_->shutting_down_)
return nullptr;
643 return weighted_child_->weighted_target_policy_->channel_control_helper()
647 void WeightedTargetLb::WeightedChild::Helper::UpdateState(
649 std::unique_ptr<SubchannelPicker> picker) {
655 void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() {
658 ->RequestReresolution();
662 return weighted_child_->weighted_target_policy_->channel_control_helper()
666 void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(
677 class WeightedTargetLbFactory :
public LoadBalancingPolicyFactory {
679 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
684 const char*
name()
const override {
return kWeightedTarget; }
686 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
689 if (json.type() == Json::Type::JSON_NULL) {
693 "field:loadBalancingPolicy error:weighted_target policy requires "
694 "configuration. Please use loadBalancingConfig field of service "
698 std::vector<grpc_error_handle> error_list;
700 WeightedTargetLbConfig::TargetMap target_map;
701 auto it = json.object_value().find(
"targets");
702 if (
it == json.object_value().end()) {
704 "field:targets error:required field not present"));
705 }
else if (
it->second.type() != Json::Type::OBJECT) {
707 "field:targets error:type should be object"));
709 for (
const auto&
p :
it->second.object_value()) {
710 WeightedTargetLbConfig::ChildConfig child_config;
711 std::vector<grpc_error_handle> child_errors =
712 ParseChildConfig(
p.second, &child_config);
713 if (!child_errors.empty()) {
715 absl::StrCat(
"field:targets key:",
p.first), &child_errors));
717 target_map[
p.first] =
std::move(child_config);
721 if (!error_list.empty()) {
723 "weighted_target_experimental LB policy config", &error_list);
726 return MakeRefCounted<WeightedTargetLbConfig>(
std::move(target_map));
730 static std::vector<grpc_error_handle> ParseChildConfig(
731 const Json& json, WeightedTargetLbConfig::ChildConfig* child_config) {
732 std::vector<grpc_error_handle> error_list;
733 if (json.type() != Json::Type::OBJECT) {
735 "value should be of type object"));
739 auto it = json.object_value().find(
"weight");
740 if (
it == json.object_value().end()) {
742 "required field \"weight\" not specified"));
743 }
else if (
it->second.type() != Json::Type::NUMBER) {
745 "field:weight error:must be of type number"));
750 "field:weight error:unparseable value"));
753 "field:weight error:value must be greater than zero"));
755 child_config->weight =
weight;
759 it = json.object_value().find(
"childPolicy");
760 if (
it != json.object_value().end()) {
762 child_config->config =
763 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second,
765 if (child_config->config ==
nullptr) {
767 std::vector<grpc_error_handle> child_errors;
769 error_list.push_back(
788 absl::make_unique<grpc_core::WeightedTargetLbFactory>());