28 #include <type_traits>
32 #include "absl/base/attributes.h"
33 #include "absl/base/thread_annotations.h"
34 #include "absl/container/inlined_vector.h"
35 #include "absl/memory/memory.h"
36 #include "absl/status/status.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/numbers.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/optional.h"
43 #define XXH_INLINE_ALL
84 size_t* max_ring_size,
85 std::vector<grpc_error_handle>* error_list) {
86 *min_ring_size = 1024;
87 *max_ring_size = 8388608;
90 "ring_hash_experimental should be of type object"));
94 auto ring_hash_it = ring_hash.find(
"min_ring_size");
95 if (ring_hash_it != ring_hash.end()) {
98 "field:min_ring_size error: should be of type number"));
101 ring_hash_it->second.string_value().c_str());
104 ring_hash_it = ring_hash.find(
"max_ring_size");
105 if (ring_hash_it != ring_hash.end()) {
108 "field:max_ring_size error: should be of type number"));
111 ring_hash_it->second.string_value().c_str());
114 if (*min_ring_size == 0 || *min_ring_size > 8388608 || *max_ring_size == 0 ||
115 *max_ring_size > 8388608 || *min_ring_size > *max_ring_size) {
117 "field:max_ring_size and or min_ring_size error: "
118 "values need to be in the range of 1 to 8388608 "
119 "and max_ring_size cannot be smaller than "
// Name string for this LB policy. Every name() override visible in this file
// returns this constant.
126 constexpr
char kRingHash[] =
"ring_hash_experimental";
130 RingHashLbConfig(
size_t min_ring_size,
size_t max_ring_size)
// Returns the policy name constant kRingHash.
// NOTE(review): the enclosing class header is not visible in this excerpt;
// presumably this belongs to the config class whose constructor precedes it
// -- confirm.
132 const char*
name()
const override {
return kRingHash; }
145 class RingHash :
public LoadBalancingPolicy {
// Returns kRingHash as the name of this policy.
149 const char*
name()
const override {
return kRingHash; }
// Override; the body is defined out of line as RingHash::UpdateLocked later
// in this file, where args.addresses is checked with ok() before use.
151 void UpdateLocked(UpdateArgs
args)
override;
// Override; defined out of line as RingHash::ResetBackoffLocked.
152 void ResetBackoffLocked()
override;
// Destructor override; defined out of line as RingHash::~RingHash.
155 ~RingHash()
override;
158 class RingHashSubchannelList;
165 class RingHashSubchannelData
166 :
public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
168 RingHashSubchannelData(
169 SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
171 const ServerAddress& address,
172 RefCountedPtr<SubchannelInterface>
subchannel)
// Read-only accessor for address_; returns a const reference, so no copy of
// the ServerAddress is made.
176 const ServerAddress& address()
const {
return address_; }
190 void ProcessConnectivityChangeLocked(
209 class RingHashSubchannelList
210 :
public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
214 : SubchannelList(policy,
216 ?
"RingHashSubchannelList"
218 std::
move(addresses), policy->channel_control_helper(),
228 ~RingHashSubchannelList()
override {
230 RingHash*
p =
static_cast<RingHash*
>(policy());
246 void UpdateRingHashConnectivityStateLocked(
size_t index,
247 bool connection_attempt_complete,
251 bool AllSubchannelsSeenInitialState() {
252 for (
size_t i = 0;
i < num_subchannels(); ++
i) {
253 if (!
subchannel(i)->connectivity_state().has_value())
return false;
258 void ShutdownLocked()
override {
281 class Ring :
public RefCounted<Ring> {
// Constructor declaration; defined out of line as RingHash::Ring::Ring.
// The visible part of that definition reads min_ring_size() and
// max_ring_size() from parent->config_.
288 Ring(RingHash* parent,
289 RefCountedPtr<RingHashSubchannelList> subchannel_list);
// Read-only accessor for the ring entries held in ring_; returns a const
// reference rather than a copy.
291 const std::vector<Entry>& ring()
const {
return ring_; }
// Entry storage exposed through ring().
// NOTE(review): the constructor definition contains a comparator ordering
// entries by hash, so this is presumably kept sorted by hash -- confirm.
295 std::vector<Entry>
ring_;
298 class Picker :
public SubchannelPicker {
300 Picker(RefCountedPtr<RingHash> parent, RefCountedPtr<Ring> ring)
// Override; defined out of line as RingHash::Picker::Pick. The return paths
// visible in that definition include PickResult::Complete(...) and
// PickResult::Queue().
303 PickResult Pick(PickArgs
args)
override;
308 class SubchannelConnectionAttempter :
public Orphanable {
310 explicit SubchannelConnectionAttempter(
311 RefCountedPtr<RingHash> ring_hash_lb)
316 void AddSubchannel(RefCountedPtr<SubchannelInterface>
subchannel) {
320 void Orphan()
override {
328 auto*
self =
static_cast<SubchannelConnectionAttempter*
>(
arg);
329 self->ring_hash_lb_->work_serializer()->Run(
331 if (!
self->ring_hash_lb_->shutdown_) {
347 RefCountedPtr<Ring>
ring_;
// Override declaration.
// NOTE(review): presumably the declaration matching RingHash::ShutdownLocked
// defined later in this file; the enclosing scope is not fully visible here
// -- confirm.
350 void ShutdownLocked()
override;
366 RingHash::Ring::Ring(RingHash* parent,
367 RefCountedPtr<RingHashSubchannelList> subchannel_list)
371 struct AddressWeight {
376 double normalized_weight;
378 std::vector<AddressWeight> address_weights;
380 address_weights.reserve(num_subchannels);
381 for (
size_t i = 0;
i < num_subchannels; ++
i) {
383 const ServerAddressWeightAttribute* weight_attribute =
static_cast<
384 const ServerAddressWeightAttribute*
>(sd->address().GetAttribute(
385 ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
386 AddressWeight address_weight;
387 address_weight.address =
391 if (weight_attribute !=
nullptr && weight_attribute->weight() > 0) {
392 address_weight.weight = weight_attribute->weight();
394 sum += address_weight.weight;
395 address_weights.push_back(
std::move(address_weight));
398 double min_normalized_weight = 1.0;
399 double max_normalized_weight = 0.0;
400 for (
auto& address : address_weights) {
401 address.normalized_weight =
static_cast<double>(address.weight) /
sum;
402 min_normalized_weight =
403 std::min(address.normalized_weight, min_normalized_weight);
404 max_normalized_weight =
405 std::max(address.normalized_weight, max_normalized_weight);
414 const size_t min_ring_size = parent->config_->min_ring_size();
415 const size_t max_ring_size = parent->config_->max_ring_size();
417 std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
418 static_cast<double>(max_ring_size));
420 const size_t ring_size = std::ceil(
scale);
421 ring_.reserve(ring_size);
428 double current_hashes = 0.0;
429 double target_hashes = 0.0;
430 uint64_t min_hashes_per_host = ring_size;
432 for (
size_t i = 0;
i < num_subchannels; ++
i) {
433 const std::string& address_string = address_weights[
i].address;
434 hash_key_buffer.
assign(address_string.begin(), address_string.end());
436 auto offset_start = hash_key_buffer.
end();
437 target_hashes +=
scale * address_weights[
i].normalized_weight;
439 while (current_hashes < target_hashes) {
441 hash_key_buffer.
insert(offset_start, count_str.begin(), count_str.end());
443 hash_key_buffer.
size());
448 hash_key_buffer.
erase(offset_start, hash_key_buffer.
end());
450 min_hashes_per_host =
452 max_hashes_per_host =
456 [](
const Entry& lhs,
const Entry& rhs) ->
bool {
457 return lhs.hash < rhs.hash;
461 "[RH %p picker %p] created ring from subchannel_list=%p "
462 "with %" PRIuPTR
" ring entries",
471 RingHash::PickResult RingHash::Picker::Pick(PickArgs
args) {
472 auto*
call_state =
static_cast<ClientChannel::LoadBalancedCall::LbCallState*
>(
480 const std::vector<Ring::Entry>& ring =
ring_->ring();
485 size_t highp = ring.size();
486 size_t first_index = 0;
488 first_index = (lowp + highp) / 2;
489 if (first_index == ring.size()) {
493 uint64_t midval = ring[first_index].hash;
494 uint64_t midval1 = first_index == 0 ? 0 : ring[first_index - 1].hash;
495 if (h <= midval && h > midval1) {
499 lowp = first_index + 1;
501 highp = first_index - 1;
508 OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
509 auto ScheduleSubchannelConnectionAttempt =
510 [&](RefCountedPtr<SubchannelInterface>
subchannel) {
511 if (subchannel_connection_attempter ==
nullptr) {
512 subchannel_connection_attempter =
513 MakeOrphanable<SubchannelConnectionAttempter>(
parent_);
517 switch (ring[first_index].
subchannel->GetConnectivityState()) {
519 return PickResult::Complete(
520 ring[first_index].
subchannel->subchannel()->Ref());
522 ScheduleSubchannelConnectionAttempt(
523 ring[first_index].
subchannel->subchannel()->Ref());
526 return PickResult::Queue();
530 ScheduleSubchannelConnectionAttempt(
531 ring[first_index].
subchannel->subchannel()->Ref());
535 bool found_second_subchannel =
false;
536 bool found_first_non_failed =
false;
537 for (
size_t i = 1;
i < ring.size(); ++
i) {
538 const Ring::Entry& entry = ring[(first_index +
i) % ring.size()];
539 if (entry.subchannel == ring[first_index].subchannel) {
543 entry.subchannel->GetConnectivityState();
545 return PickResult::Complete(entry.subchannel->subchannel()->Ref());
547 if (!found_second_subchannel) {
548 switch (connectivity_state) {
550 ScheduleSubchannelConnectionAttempt(
551 entry.subchannel->subchannel()->Ref());
554 return PickResult::Queue();
558 found_second_subchannel =
true;
560 if (!found_first_non_failed) {
562 ScheduleSubchannelConnectionAttempt(
563 entry.subchannel->subchannel()->Ref());
566 ScheduleSubchannelConnectionAttempt(
567 entry.subchannel->subchannel()->Ref());
569 found_first_non_failed =
true;
574 "ring hash cannot find a connected subchannel; first failure: ",
575 ring[first_index].
subchannel->GetConnectivityStatus().ToString())));
582 void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
609 void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
611 RingHash*
p =
static_cast<RingHash*
>(policy());
615 if (
p->latest_pending_subchannel_list_.get() ==
this &&
616 AllSubchannelsSeenInitialState()) {
619 p->subchannel_list_.get(),
this);
621 p->subchannel_list_ =
std::move(
p->latest_pending_subchannel_list_);
624 if (
p->subchannel_list_.get() !=
this)
return;
638 bool start_connection_attempt =
false;
643 start_connection_attempt =
true;
648 start_connection_attempt =
true;
653 start_connection_attempt =
true;
668 p->channel_control_helper()->UpdateState(
692 connection_attempt_complete) {
695 if (start_connection_attempt &&
697 size_t next_index = (
index + 1) % num_subchannels();
700 "[RH %p] triggering internal connection attempt for subchannel "
701 "%p, subchannel_list %p (index %" PRIuPTR
" of %" PRIuPTR
")",
706 subchannel(next_index)->subchannel()->RequestConnection();
714 void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
717 RingHash*
p =
static_cast<RingHash*
>(subchannel_list()->policy());
722 "[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
723 "(index %" PRIuPTR
" of %" PRIuPTR
"): prev_state=%s new_state=%s",
725 subchannel_list()->num_subchannels(),
738 "[RH %p] Subchannel %p reported %s; requesting re-resolution",
p,
741 p->channel_control_helper()->RequestReresolution();
748 bool update_status =
true;
758 update_status =
false;
761 subchannel_list()->UpdateStateCountersLocked(last_connectivity_state,
772 subchannel_list()->UpdateRingHashConnectivityStateLocked(
786 RingHash::~RingHash() {
794 void RingHash::ShutdownLocked() {
803 void RingHash::ResetBackoffLocked() {
810 void RingHash::UpdateLocked(UpdateArgs
args) {
813 if (
args.addresses.ok()) {
815 gpr_log(
GPR_INFO,
"[RH %p] received update with %" PRIuPTR
" addresses",
816 this,
args.addresses->size());
822 this,
args.addresses.status().ToString().c_str());
830 gpr_log(
GPR_INFO,
"[RH %p] replacing latest pending subchannel list %p",
845 "[RH %p] empty address list, replacing subchannel list %p",
this,
856 channel_control_helper()->UpdateState(
858 absl::make_unique<TransientFailurePicker>(
status));
871 class RingHashFactory :
public LoadBalancingPolicyFactory {
873 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
// Returns kRingHash, the same name constant returned by the other name()
// overrides in this file.
878 const char*
name()
const override {
return kRingHash; }
880 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
882 size_t min_ring_size;
883 size_t max_ring_size;
884 std::vector<grpc_error_handle> error_list;
886 if (error_list.empty()) {
887 return MakeRefCounted<RingHashLbConfig>(min_ring_size, max_ring_size);
890 "ring_hash_experimental LB policy config", &error_list);
899 LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
900 absl::make_unique<RingHashFactory>());