28 #include "absl/memory/memory.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/types/optional.h"
63 constexpr
char kPickFirst[] =
"pick_first";
65 class PickFirst :
public LoadBalancingPolicy {
69 const char*
name()
const override {
return kPickFirst; }
71 void UpdateLocked(UpdateArgs
args)
override;
73 void ResetBackoffLocked()
override;
76 ~PickFirst()
override;
78 class PickFirstSubchannelList;
80 class PickFirstSubchannelData
81 :
public SubchannelData<PickFirstSubchannelList,
82 PickFirstSubchannelData> {
84 PickFirstSubchannelData(
85 SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
87 const ServerAddress& address,
91 void ProcessConnectivityChangeLocked(
96 void ProcessUnselectedReadyLocked();
99 class PickFirstSubchannelList
100 :
public SubchannelList<PickFirstSubchannelList,
101 PickFirstSubchannelData> {
105 : SubchannelList(policy,
107 ?
"PickFirstSubchannelList"
109 std::
move(addresses), policy->channel_control_helper(),
120 ~PickFirstSubchannelList()
override {
121 PickFirst*
p =
static_cast<PickFirst*
>(policy());
126 void set_in_transient_failure(
bool in_transient_failure) {
133 bool AllSubchannelsSeenInitialState() {
134 for (
size_t i = 0;
i < num_subchannels(); ++
i) {
135 if (!
subchannel(i)->connectivity_state().has_value())
return false;
145 class Picker :
public SubchannelPicker {
147 explicit Picker(RefCountedPtr<SubchannelInterface>
subchannel)
150 PickResult Pick(PickArgs )
override {
158 void ShutdownLocked()
override;
160 void AttemptToConnectUsingLatestUpdateArgsLocked();
182 PickFirst::~PickFirst() {
190 void PickFirst::ShutdownLocked() {
206 AttemptToConnectUsingLatestUpdateArgsLocked();
210 void PickFirst::ResetBackoffLocked() {
217 void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
227 "[PF %p] Shutting down previous pending subchannel list %p",
this,
241 channel_control_helper()->UpdateState(
243 absl::make_unique<TransientFailurePicker>(
status));
247 channel_control_helper()->UpdateState(
265 void PickFirst::UpdateLocked(UpdateArgs
args) {
267 if (
args.addresses.ok()) {
269 "Pick First %p received update with %" PRIuPTR
" addresses",
this,
270 args.addresses->size());
272 gpr_log(
GPR_INFO,
"Pick First %p received update with address error: %s",
273 this,
args.addresses.status().ToString().c_str());
293 AttemptToConnectUsingLatestUpdateArgsLocked();
297 void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
300 PickFirst*
p =
static_cast<PickFirst*
>(subchannel_list()->policy());
303 GPR_ASSERT(subchannel_list() ==
p->subchannel_list_.get() ||
304 subchannel_list() ==
p->latest_pending_subchannel_list_.get());
307 if (
p->selected_ ==
this) {
308 GPR_ASSERT(subchannel_list() ==
p->subchannel_list_.get());
311 "Pick First %p selected subchannel connectivity changed to %s",
p,
317 if (
p->latest_pending_subchannel_list_ !=
nullptr) {
320 "Pick First %p promoting pending subchannel list %p to "
322 p,
p->latest_pending_subchannel_list_.get(),
323 p->subchannel_list_.get());
325 p->selected_ =
nullptr;
326 p->subchannel_list_ =
std::move(
p->latest_pending_subchannel_list_);
328 if (
p->subchannel_list_->in_transient_failure()) {
330 "selected subchannel failed; switching to pending update; "
333 ->subchannel(
p->subchannel_list_->num_subchannels())
334 ->connectivity_status()
336 p->channel_control_helper()->UpdateState(
338 absl::make_unique<TransientFailurePicker>(
status));
340 p->channel_control_helper()->UpdateState(
342 absl::make_unique<QueuePicker>(
350 p->channel_control_helper()->RequestReresolution();
356 p->selected_ =
nullptr;
357 p->subchannel_list_.reset();
358 p->channel_control_helper()->UpdateState(
360 absl::make_unique<QueuePicker>(
p->Ref(
DEBUG_LOCATION,
"QueuePicker")));
374 subchannel_list()->set_in_transient_failure(
false);
375 ProcessUnselectedReadyLocked();
384 if (subchannel_list()->AllSubchannelsSeenInitialState()) {
385 subchannel_list()->subchannel(0)->subchannel()->RequestConnection();
391 if (
Index() != subchannel_list()->attempting_index())
return;
398 size_t next_index = (
Index() + 1) % subchannel_list()->num_subchannels();
399 subchannel_list()->set_attempting_index(next_index);
400 PickFirstSubchannelData* sd = subchannel_list()->subchannel(next_index);
402 if (sd->Index() == 0) {
405 "Pick First %p subchannel list %p failed to connect to "
407 p, subchannel_list());
409 subchannel_list()->set_in_transient_failure(
true);
413 if (subchannel_list() ==
p->latest_pending_subchannel_list_.get()) {
416 "Pick First %p promoting pending subchannel list %p to "
418 p,
p->latest_pending_subchannel_list_.get(),
419 p->subchannel_list_.get());
421 p->selected_ =
nullptr;
422 p->subchannel_list_ =
std::move(
p->latest_pending_subchannel_list_);
427 if (subchannel_list() ==
p->subchannel_list_.get()) {
428 p->channel_control_helper()->RequestReresolution();
430 absl::StrCat(
"failed to connect to all addresses; last error: ",
432 p->channel_control_helper()->UpdateState(
434 absl::make_unique<TransientFailurePicker>(
status));
443 auto sd_state = sd->connectivity_state();
445 sd->subchannel()->RequestConnection();
456 if (subchannel_list() ==
p->subchannel_list_.get() &&
457 !subchannel_list()->in_transient_failure()) {
458 p->channel_control_helper()->UpdateState(
460 absl::make_unique<QueuePicker>(
470 void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
471 PickFirst*
p =
static_cast<PickFirst*
>(subchannel_list()->policy());
481 GPR_ASSERT(subchannel_list() ==
p->subchannel_list_.get() ||
482 subchannel_list() ==
p->latest_pending_subchannel_list_.get());
484 if (subchannel_list() ==
p->latest_pending_subchannel_list_.get()) {
487 "Pick First %p promoting pending subchannel list %p to "
489 p,
p->latest_pending_subchannel_list_.get(),
490 p->subchannel_list_.get());
492 p->subchannel_list_ =
std::move(
p->latest_pending_subchannel_list_);
499 p->channel_control_helper()->UpdateState(
502 for (
size_t i = 0;
i < subchannel_list()->num_subchannels(); ++
i) {
504 subchannel_list()->subchannel(
i)->ShutdownLocked();
511 const char*
name()
const override {
return kPickFirst; }
518 class PickFirstFactory :
public LoadBalancingPolicyFactory {
520 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
525 const char*
name()
const override {
return kPickFirst; }
527 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
529 return MakeRefCounted<PickFirstConfig>();
540 absl::make_unique<grpc_core::PickFirstFactory>());