28 #include "absl/memory/memory.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/types/optional.h"
62 constexpr
char kRoundRobin[] =
"round_robin";
64 class RoundRobin :
public LoadBalancingPolicy {
68 const char*
name()
const override {
return kRoundRobin; }
70 void UpdateLocked(UpdateArgs
args)
override;
71 void ResetBackoffLocked()
override;
74 ~RoundRobin()
override;
77 class RoundRobinSubchannelList;
83 class RoundRobinSubchannelData
84 :
public SubchannelData<RoundRobinSubchannelList,
85 RoundRobinSubchannelData> {
87 RoundRobinSubchannelData(
88 SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
90 const ServerAddress& address,
101 void ProcessConnectivityChangeLocked(
106 void UpdateLogicalConnectivityStateLocked(
118 class RoundRobinSubchannelList
119 :
public SubchannelList<RoundRobinSubchannelList,
120 RoundRobinSubchannelData> {
124 : SubchannelList(policy,
126 ?
"RoundRobinSubchannelList"
128 std::move(addresses), policy->channel_control_helper(),
136 ~RoundRobinSubchannelList()
override {
137 RoundRobin*
p =
static_cast<RoundRobin*
>(policy());
143 void UpdateStateCountersLocked(
150 void MaybeUpdateRoundRobinConnectivityStateLocked(
155 return absl::StrCat(
"num_subchannels=", num_subchannels(),
168 class Picker :
public SubchannelPicker {
170 Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list);
172 PickResult Pick(PickArgs
args)
override;
182 void ShutdownLocked()
override;
199 RoundRobin::Picker::Picker(RoundRobin* parent,
200 RoundRobinSubchannelList* subchannel_list)
202 for (
size_t i = 0;
i < subchannel_list->num_subchannels(); ++
i) {
203 RoundRobinSubchannelData* sd = subchannel_list->subchannel(
i);
216 "[RR %p picker %p] created picker from subchannel_list=%p "
217 "with %" PRIuPTR
" READY subchannels; last_picked_index_=%" PRIuPTR,
223 RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs ) {
227 "[RR %p picker %p] returning index %" PRIuPTR
", subchannel=%p",
244 RoundRobin::~RoundRobin() {
252 void RoundRobin::ShutdownLocked() {
261 void RoundRobin::ResetBackoffLocked() {
268 void RoundRobin::UpdateLocked(UpdateArgs
args) {
270 if (
args.addresses.ok()) {
272 gpr_log(
GPR_INFO,
"[RR %p] received update with %" PRIuPTR
" addresses",
273 this,
args.addresses->size());
278 gpr_log(
GPR_INFO,
"[RR %p] received update with address error: %s",
this,
279 args.addresses.status().ToString().c_str());
288 gpr_log(
GPR_INFO,
"[RR %p] replacing previous pending subchannel list %p",
299 gpr_log(
GPR_INFO,
"[RR %p] replacing previous subchannel list %p",
this,
305 "empty address list: ",
args.resolution_note))
307 channel_control_helper()->UpdateState(
309 absl::make_unique<TransientFailurePicker>(
status));
315 channel_control_helper()->UpdateState(
325 void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
351 void RoundRobin::RoundRobinSubchannelList::
352 MaybeUpdateRoundRobinConnectivityStateLocked(
absl::Status status_for_tf) {
353 RoundRobin*
p =
static_cast<RoundRobin*
>(policy());
361 if (
p->latest_pending_subchannel_list_.get() ==
this &&
362 (
p->subchannel_list_->num_ready_ == 0 ||
num_ready_ > 0 ||
366 p->subchannel_list_ !=
nullptr ?
p->subchannel_list_->CountersString()
370 "[RR %p] swapping out subchannel list %p (%s) in favor of %p (%s)",
p,
371 p->subchannel_list_.get(), old_counters_string.c_str(),
this,
372 CountersString().
c_str());
374 p->subchannel_list_ =
std::move(
p->latest_pending_subchannel_list_);
377 if (
p->subchannel_list_.get() !=
this)
return;
387 p->channel_control_helper()->UpdateState(
391 gpr_log(
GPR_INFO,
"[RR %p] reporting CONNECTING with subchannel list %p",
394 p->channel_control_helper()->UpdateState(
396 absl::make_unique<QueuePicker>(
p->Ref(
DEBUG_LOCATION,
"QueuePicker")));
400 "[RR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s",
401 p,
this, status_for_tf.
ToString().c_str());
403 if (!status_for_tf.
ok()) {
405 absl::StrCat(
"connections to all backends failing; last error: ",
408 p->channel_control_helper()->UpdateState(
418 void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
421 RoundRobin*
p =
static_cast<RoundRobin*
>(subchannel_list()->policy());
431 "[RR %p] Subchannel %p reported %s; requesting re-resolution",
p,
434 p->channel_control_helper()->RequestReresolution();
439 "[RR %p] Subchannel %p reported IDLE; requesting connection",
p,
445 UpdateLogicalConnectivityStateLocked(new_state);
447 subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked(
448 connectivity_status());
451 void RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
453 RoundRobin*
p =
static_cast<RoundRobin*
>(subchannel_list()->policy());
457 "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
458 "(index %" PRIuPTR
" of %" PRIuPTR
"): prev_state=%s new_state=%s",
460 subchannel_list()->num_subchannels(),
479 "[RR %p] subchannel %p, subchannel_list %p (index %" PRIuPTR
480 " of %" PRIuPTR
"): treating IDLE as CONNECTING",
482 subchannel_list()->num_subchannels());
503 const char*
name()
const override {
return kRoundRobin; }
506 class RoundRobinFactory :
public LoadBalancingPolicyFactory {
508 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
513 const char*
name()
const override {
return kRoundRobin; }
515 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
517 return MakeRefCounted<RoundRobinConfig>();
528 absl::make_unique<grpc_core::RoundRobinFactory>());