ring_hash.cc
Go to the documentation of this file.
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
19 #include <inttypes.h>
20 #include <stdlib.h>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <cmath>
25 #include <map>
26 #include <memory>
27 #include <string>
28 #include <type_traits>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/base/attributes.h"
33 #include "absl/base/thread_annotations.h"
34 #include "absl/container/inlined_vector.h"
35 #include "absl/memory/memory.h"
36 #include "absl/status/status.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/numbers.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/optional.h"
42 
43 #define XXH_INLINE_ALL
44 #include "xxhash.h"
45 
48 #include <grpc/support/log.h>
49 
69 #include "src/core/lib/json/json.h"
72 
73 namespace grpc_core {
74 
75 TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb");
76 
78  static UniqueTypeName::Factory kFactory("request_hash");
79  return kFactory.Create();
80 }
81 
82 // Helper Parser method
83 void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
84  size_t* max_ring_size,
85  std::vector<grpc_error_handle>* error_list) {
86  *min_ring_size = 1024;
87  *max_ring_size = 8388608;
88  if (json.type() != Json::Type::OBJECT) {
89  error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
90  "ring_hash_experimental should be of type object"));
91  return;
92  }
93  const Json::Object& ring_hash = json.object_value();
94  auto ring_hash_it = ring_hash.find("min_ring_size");
95  if (ring_hash_it != ring_hash.end()) {
96  if (ring_hash_it->second.type() != Json::Type::NUMBER) {
97  error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
98  "field:min_ring_size error: should be of type number"));
99  } else {
100  *min_ring_size = gpr_parse_nonnegative_int(
101  ring_hash_it->second.string_value().c_str());
102  }
103  }
104  ring_hash_it = ring_hash.find("max_ring_size");
105  if (ring_hash_it != ring_hash.end()) {
106  if (ring_hash_it->second.type() != Json::Type::NUMBER) {
107  error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
108  "field:max_ring_size error: should be of type number"));
109  } else {
110  *max_ring_size = gpr_parse_nonnegative_int(
111  ring_hash_it->second.string_value().c_str());
112  }
113  }
114  if (*min_ring_size == 0 || *min_ring_size > 8388608 || *max_ring_size == 0 ||
115  *max_ring_size > 8388608 || *min_ring_size > *max_ring_size) {
116  error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
117  "field:max_ring_size and or min_ring_size error: "
118  "values need to be in the range of 1 to 8388608 "
119  "and max_ring_size cannot be smaller than "
120  "min_ring_size"));
121  }
122 }
123 
124 namespace {
125 
// Policy name reported by RingHashLbConfig::name(), RingHash::name() and
// RingHashFactory::name().
constexpr char kRingHash[]{"ring_hash_experimental"};
127 
128 class RingHashLbConfig : public LoadBalancingPolicy::Config {
129  public:
130  RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
131  : min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}
132  const char* name() const override { return kRingHash; }
133  size_t min_ring_size() const { return min_ring_size_; }
134  size_t max_ring_size() const { return max_ring_size_; }
135 
136  private:
139 };
140 
141 //
142 // ring_hash LB policy
143 //
144 
// ring_hash LB policy. Each pick reads a request hash from a call attribute
// (see Picker::Pick) and maps it onto a Ring built from the current
// RingHashSubchannelList.
// NOTE(review): this listing is a documentation scrape. Source lines 191,
// 205, 215, 248, 260, 266, 272, 278, 284, 323, 338 and 342 are missing below
// and must be restored from the real ring_hash.cc.
145 class RingHash : public LoadBalancingPolicy {
146  public:
147  explicit RingHash(Args args);
148 
149  const char* name() const override { return kRingHash; }
150 
151  void UpdateLocked(UpdateArgs args) override;
152  void ResetBackoffLocked() override;
153 
154  private:
155  ~RingHash() override;
156 
157  // Forward declarations.
158  class RingHashSubchannelList;
159  class Ring;
160 
161  // Data for a particular subchannel in a subchannel list.
162  // This subclass adds the following functionality:
163  // - Tracks the previous connectivity state of the subchannel, so that
164  // we know how many subchannels are in each state.
165  class RingHashSubchannelData
166  : public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
167  public:
168  RingHashSubchannelData(
169  SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
170  subchannel_list,
171  const ServerAddress& address,
172  RefCountedPtr<SubchannelInterface> subchannel)
173  : SubchannelData(subchannel_list, address, std::move(subchannel)),
174  address_(address) {}
175 
176  const ServerAddress& address() const { return address_; }
177 
// Last logical state recorded for this subchannel. A relaxed atomic load,
// so the picker can call it without the WorkSerializer.
178  grpc_connectivity_state GetConnectivityState() const {
179  return connectivity_state_.load(std::memory_order_relaxed);
180  }
181 
// Status paired with the state above. Read under mu_ because the picker
// calls it (Picker::Pick uses it in its failure message).
182  absl::Status GetConnectivityStatus() const {
183  MutexLock lock(&mu_);
184  return connectivity_status_;
185  }
186 
187  private:
188  // Performs connectivity state updates that need to be done only
189  // after we have started watching.
// NOTE(review): the first parameter (source line 191) is missing here. The
// definition below reads old_state.has_value(), so it is presumably
// absl::optional<grpc_connectivity_state> old_state.
190  void ProcessConnectivityChangeLocked(
192  grpc_connectivity_state new_state) override;
193 
194  ServerAddress address_;
195 
196  // Last logical connectivity state seen.
197  // Note that this may differ from the state actually reported by the
198  // subchannel in some cases; for example, once this is set to
199  // TRANSIENT_FAILURE, we do not change it again until we get READY,
200  // so we skip any interim stops in CONNECTING.
201  // Uses an atomic so that it can be accessed outside of the WorkSerializer.
202  std::atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE};
203 
// Guards connectivity_status_, which is declared on the missing source line
// 205.
204  mutable Mutex mu_;
206  };
207 
208  // A list of subchannels.
209  class RingHashSubchannelList
210  : public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
211  public:
// Every subchannel starts counted as IDLE (num_idle_ = num_subchannels()).
// The Ring built here takes a ref back to this list, which creates a cycle;
// ShutdownLocked() and the destructor break it by resetting ring_.
212  RingHashSubchannelList(RingHash* policy, ServerAddressList addresses,
213  const grpc_channel_args& args)
214  : SubchannelList(policy,
216  ? "RingHashSubchannelList"
217  : nullptr),
218  std::move(addresses), policy->channel_control_helper(),
219  args),
220  num_idle_(num_subchannels()),
221  ring_(MakeRefCounted<Ring>(policy, Ref(DEBUG_LOCATION, "Ring"))) {
222  // Need to maintain a ref to the LB policy as long as we maintain
223  // any references to subchannels, since the subchannels'
224  // pollset_sets will include the LB policy's pollset_set.
225  policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
226  }
227 
// Drops the ring and releases the policy ref taken in the constructor.
228  ~RingHashSubchannelList() override {
229  ring_.reset(DEBUG_LOCATION, "~RingHashSubchannelList");
230  RingHash* p = static_cast<RingHash*>(policy());
231  p->Unref(DEBUG_LOCATION, "subchannel_list");
232  }
233 
234  // Updates the counters of subchannels in each state when a
235  // subchannel transitions from old_state to new_state.
236  void UpdateStateCountersLocked(grpc_connectivity_state old_state,
237  grpc_connectivity_state new_state);
238 
239  // Updates the RH policy's connectivity state based on the
240  // subchannel list's state counters, creating new picker and new ring.
241  // The index parameter indicates the index into the list of the subchannel
242  // whose status report triggered the call to
243  // UpdateRingHashConnectivityStateLocked().
244  // connection_attempt_complete is true if the subchannel just
245  // finished a connection attempt.
246  void UpdateRingHashConnectivityStateLocked(size_t index,
247  bool connection_attempt_complete,
249 
250  private:
// True once every subchannel has delivered its first connectivity report.
251  bool AllSubchannelsSeenInitialState() {
252  for (size_t i = 0; i < num_subchannels(); ++i) {
253  if (!subchannel(i)->connectivity_state().has_value()) return false;
254  }
255  return true;
256  }
257 
// Releases the ring here as well, which breaks the list <-> Ring reference
// cycle.
258  void ShutdownLocked() override {
259  ring_.reset(DEBUG_LOCATION, "RingHashSubchannelList::ShutdownLocked()");
261  }
262 
// Per-state subchannel counts maintained by UpdateStateCountersLocked().
// num_transient_failure_ is declared on the missing source line 266.
263  size_t num_idle_;
264  size_t num_ready_ = 0;
265  size_t num_connecting_ = 0;
267 
268  RefCountedPtr<Ring> ring_;
269 
270  // The index of the subchannel currently doing an internally
271  // triggered connection attempt, if any.
273 
274  // TODO(roth): If we ever change the helper UpdateState() API to not
275  // need the status reported for TRANSIENT_FAILURE state (because
276  // it's not currently actually used for anything outside of the picker),
277  // then we will no longer need this data member.
279  };
280 
// Sorted hash ring built once in the constructor. It keeps a ref to its
// subchannel list, presumably so that the RingHashSubchannelData pointers
// stored in each Entry stay valid for as long as the ring lives.
281  class Ring : public RefCounted<Ring> {
282  public:
// NOTE(review): the `hash` member (source line 284) is missing here.
// Ring::Ring() fills it with XXH64 values, so it is presumably uint64_t.
283  struct Entry {
285  RingHashSubchannelData* subchannel;
286  };
287 
288  Ring(RingHash* parent,
289  RefCountedPtr<RingHashSubchannelList> subchannel_list);
290 
291  const std::vector<Entry>& ring() const { return ring_; }
292 
293  private:
294  RefCountedPtr<RingHashSubchannelList> subchannel_list_;
295  std::vector<Entry> ring_;
296  };
297 
// Data-plane picker. It holds refs to the policy and to one Ring snapshot.
// Connection attempts it wants are handed to SubchannelConnectionAttempter
// rather than being made directly.
298  class Picker : public SubchannelPicker {
299  public:
300  Picker(RefCountedPtr<RingHash> parent, RefCountedPtr<Ring> ring)
301  : parent_(std::move(parent)), ring_(std::move(ring)) {}
302 
303  PickResult Pick(PickArgs args) override;
304 
305  private:
306  // A fire-and-forget class that schedules subchannel connection attempts
307  // on the control plane WorkSerializer.
308  class SubchannelConnectionAttempter : public Orphanable {
309  public:
310  explicit SubchannelConnectionAttempter(
311  RefCountedPtr<RingHash> ring_hash_lb)
312  : ring_hash_lb_(std::move(ring_hash_lb)) {
313  GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
314  }
315 
316  void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
317  subchannels_.push_back(std::move(subchannel));
318  }
319 
// NOTE(review): the statement that schedules closure_ (source line 323) is
// missing from this listing.
320  void Orphan() override {
321  // Hop into ExecCtx, so that we're not holding the data plane mutex
322  // while we run control-plane code.
324  }
325 
326  private:
// Runs on the WorkSerializer. It requests connections unless the policy has
// shut down, then deletes this object (the object owns itself after
// Orphan()).
327  static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
328  auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
329  self->ring_hash_lb_->work_serializer()->Run(
330  [self]() {
331  if (!self->ring_hash_lb_->shutdown_) {
332  for (auto& subchannel : self->subchannels_) {
333  subchannel->RequestConnection();
334  }
335  }
336  delete self;
337  },
339  }
340 
341  RefCountedPtr<RingHash> ring_hash_lb_;
343  std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
344  };
345 
346  RefCountedPtr<RingHash> parent_;
347  RefCountedPtr<Ring> ring_;
348  };
349 
350  void ShutdownLocked() override;
351 
352  // Current config from resolver.
353  RefCountedPtr<RingHashLbConfig> config_;
354 
355  // list of subchannels.
// latest_pending_subchannel_list_ is promoted into subchannel_list_ once
// all of its subchannels have reported an initial state.
356  OrphanablePtr<RingHashSubchannelList> subchannel_list_;
357  OrphanablePtr<RingHashSubchannelList> latest_pending_subchannel_list_;
358  // indicating if we are shutting down.
359  bool shutdown_ = false;
360 };
361 
362 //
363 // RingHash::Ring
364 //
365 
366 RingHash::Ring::Ring(RingHash* parent,
367  RefCountedPtr<RingHashSubchannelList> subchannel_list)
368  : subchannel_list_(std::move(subchannel_list)) {
369  size_t num_subchannels = subchannel_list_->num_subchannels();
370  // Store the weights while finding the sum.
371  struct AddressWeight {
372  std::string address;
373  // Default weight is 1 for the cases where a weight is not provided,
374  // each occurrence of the address will be counted a weight value of 1.
375  uint32_t weight = 1;
376  double normalized_weight;
377  };
378  std::vector<AddressWeight> address_weights;
379  size_t sum = 0;
380  address_weights.reserve(num_subchannels);
381  for (size_t i = 0; i < num_subchannels; ++i) {
382  RingHashSubchannelData* sd = subchannel_list_->subchannel(i);
383  const ServerAddressWeightAttribute* weight_attribute = static_cast<
384  const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
385  ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
386  AddressWeight address_weight;
387  address_weight.address =
388  grpc_sockaddr_to_string(&sd->address().address(), false).value();
389  // Weight should never be zero, but ignore it just in case, since
390  // that value would screw up the ring-building algorithm.
391  if (weight_attribute != nullptr && weight_attribute->weight() > 0) {
392  address_weight.weight = weight_attribute->weight();
393  }
394  sum += address_weight.weight;
395  address_weights.push_back(std::move(address_weight));
396  }
397  // Calculating normalized weights and find min and max.
398  double min_normalized_weight = 1.0;
399  double max_normalized_weight = 0.0;
400  for (auto& address : address_weights) {
401  address.normalized_weight = static_cast<double>(address.weight) / sum;
402  min_normalized_weight =
403  std::min(address.normalized_weight, min_normalized_weight);
404  max_normalized_weight =
405  std::max(address.normalized_weight, max_normalized_weight);
406  }
407  // Scale up the number of hashes per host such that the least-weighted host
408  // gets a whole number of hashes on the ring. Other hosts might not end up
409  // with whole numbers, and that's fine (the ring-building algorithm below can
410  // handle this). This preserves the original implementation's behavior: when
411  // weights aren't provided, all hosts should get an equal number of hashes. In
412  // the case where this number exceeds the max_ring_size, it's scaled back down
413  // to fit.
414  const size_t min_ring_size = parent->config_->min_ring_size();
415  const size_t max_ring_size = parent->config_->max_ring_size();
416  const double scale = std::min(
417  std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
418  static_cast<double>(max_ring_size));
419  // Reserve memory for the entire ring up front.
420  const size_t ring_size = std::ceil(scale);
421  ring_.reserve(ring_size);
422  // Populate the hash ring by walking through the (host, weight) pairs in
423  // normalized_host_weights, and generating (scale * weight) hashes for each
424  // host. Since these aren't necessarily whole numbers, we maintain running
425  // sums -- current_hashes and target_hashes -- which allows us to populate the
426  // ring in a mostly stable way.
427  absl::InlinedVector<char, 196> hash_key_buffer;
428  double current_hashes = 0.0;
429  double target_hashes = 0.0;
430  uint64_t min_hashes_per_host = ring_size;
431  uint64_t max_hashes_per_host = 0;
432  for (size_t i = 0; i < num_subchannels; ++i) {
433  const std::string& address_string = address_weights[i].address;
434  hash_key_buffer.assign(address_string.begin(), address_string.end());
435  hash_key_buffer.emplace_back('_');
436  auto offset_start = hash_key_buffer.end();
437  target_hashes += scale * address_weights[i].normalized_weight;
438  size_t count = 0;
439  while (current_hashes < target_hashes) {
440  const std::string count_str = absl::StrCat(count);
441  hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
442  absl::string_view hash_key(hash_key_buffer.data(),
443  hash_key_buffer.size());
444  const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
445  ring_.push_back({hash, subchannel_list_->subchannel(i)});
446  ++count;
447  ++current_hashes;
448  hash_key_buffer.erase(offset_start, hash_key_buffer.end());
449  }
450  min_hashes_per_host =
451  std::min(static_cast<uint64_t>(i), min_hashes_per_host);
452  max_hashes_per_host =
453  std::max(static_cast<uint64_t>(i), max_hashes_per_host);
454  }
455  std::sort(ring_.begin(), ring_.end(),
456  [](const Entry& lhs, const Entry& rhs) -> bool {
457  return lhs.hash < rhs.hash;
458  });
461  "[RH %p picker %p] created ring from subchannel_list=%p "
462  "with %" PRIuPTR " ring entries",
463  parent, this, subchannel_list_.get(), ring_.size());
464  }
465 }
466 
467 //
468 // RingHash::Picker
469 //
470 
471 RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
472  auto* call_state = static_cast<ClientChannel::LoadBalancedCall::LbCallState*>(
473  args.call_state);
474  auto hash = call_state->GetCallAttribute(RequestHashAttributeName());
475  uint64_t h;
476  if (!absl::SimpleAtoi(hash, &h)) {
477  return PickResult::Fail(
478  absl::InternalError("ring hash value is not a number"));
479  }
480  const std::vector<Ring::Entry>& ring = ring_->ring();
481  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
482  // (ketama_get_server) NOTE: The algorithm depends on using signed integers
483  // for lowp, highp, and first_index. Do not change them!
484  size_t lowp = 0;
485  size_t highp = ring.size();
486  size_t first_index = 0;
487  while (true) {
488  first_index = (lowp + highp) / 2;
489  if (first_index == ring.size()) {
490  first_index = 0;
491  break;
492  }
493  uint64_t midval = ring[first_index].hash;
494  uint64_t midval1 = first_index == 0 ? 0 : ring[first_index - 1].hash;
495  if (h <= midval && h > midval1) {
496  break;
497  }
498  if (midval < h) {
499  lowp = first_index + 1;
500  } else {
501  highp = first_index - 1;
502  }
503  if (lowp > highp) {
504  first_index = 0;
505  break;
506  }
507  }
508  OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
509  auto ScheduleSubchannelConnectionAttempt =
510  [&](RefCountedPtr<SubchannelInterface> subchannel) {
511  if (subchannel_connection_attempter == nullptr) {
512  subchannel_connection_attempter =
513  MakeOrphanable<SubchannelConnectionAttempter>(parent_);
514  }
515  subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
516  };
517  switch (ring[first_index].subchannel->GetConnectivityState()) {
518  case GRPC_CHANNEL_READY:
519  return PickResult::Complete(
520  ring[first_index].subchannel->subchannel()->Ref());
521  case GRPC_CHANNEL_IDLE:
522  ScheduleSubchannelConnectionAttempt(
523  ring[first_index].subchannel->subchannel()->Ref());
526  return PickResult::Queue();
527  default: // GRPC_CHANNEL_TRANSIENT_FAILURE
528  break;
529  }
530  ScheduleSubchannelConnectionAttempt(
531  ring[first_index].subchannel->subchannel()->Ref());
532  // Loop through remaining subchannels to find one in READY.
533  // On the way, we make sure the right set of connection attempts
534  // will happen.
535  bool found_second_subchannel = false;
536  bool found_first_non_failed = false;
537  for (size_t i = 1; i < ring.size(); ++i) {
538  const Ring::Entry& entry = ring[(first_index + i) % ring.size()];
539  if (entry.subchannel == ring[first_index].subchannel) {
540  continue;
541  }
542  grpc_connectivity_state connectivity_state =
543  entry.subchannel->GetConnectivityState();
544  if (connectivity_state == GRPC_CHANNEL_READY) {
545  return PickResult::Complete(entry.subchannel->subchannel()->Ref());
546  }
547  if (!found_second_subchannel) {
548  switch (connectivity_state) {
549  case GRPC_CHANNEL_IDLE:
550  ScheduleSubchannelConnectionAttempt(
551  entry.subchannel->subchannel()->Ref());
554  return PickResult::Queue();
555  default:
556  break;
557  }
558  found_second_subchannel = true;
559  }
560  if (!found_first_non_failed) {
561  if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
562  ScheduleSubchannelConnectionAttempt(
563  entry.subchannel->subchannel()->Ref());
564  } else {
565  if (connectivity_state == GRPC_CHANNEL_IDLE) {
566  ScheduleSubchannelConnectionAttempt(
567  entry.subchannel->subchannel()->Ref());
568  }
569  found_first_non_failed = true;
570  }
571  }
572  }
574  "ring hash cannot find a connected subchannel; first failure: ",
575  ring[first_index].subchannel->GetConnectivityStatus().ToString())));
576 }
577 
578 //
579 // RingHash::RingHashSubchannelList
580 //
581 
// Moves one subchannel from the |old_state| counter to the |new_state|
// counter. All subchannels start counted as IDLE: num_idle_ is initialised
// to num_subchannels(), and each RingHashSubchannelData's recorded state
// starts at GRPC_CHANNEL_IDLE. So the first decrement has a matching count.
// NOTE(review): source lines 591, 594-595 and 605 are missing from this
// listing. They presumably assert on and adjust num_connecting_ /
// num_transient_failure_ symmetrically with the visible branches.
582 void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
583  grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
584  if (old_state == GRPC_CHANNEL_IDLE) {
585  GPR_ASSERT(num_idle_ > 0);
586  --num_idle_;
587  } else if (old_state == GRPC_CHANNEL_READY) {
588  GPR_ASSERT(num_ready_ > 0);
589  --num_ready_;
590  } else if (old_state == GRPC_CHANNEL_CONNECTING) {
592  --num_connecting_;
593  } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
596  }
// SHUTDOWN is never counted.
597  GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
598  if (new_state == GRPC_CHANNEL_IDLE) {
599  ++num_idle_;
600  } else if (new_state == GRPC_CHANNEL_READY) {
601  ++num_ready_;
602  } else if (new_state == GRPC_CHANNEL_CONNECTING) {
603  ++num_connecting_;
604  } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
606  }
607 }
608 
// Re-aggregates the list's state counters into a policy-level state.
// It promotes this list to subchannel_list_ if it is the pending list and
// every subchannel has reported. It then pushes a new Picker (over ring_) to
// the channel, and may start an internal connection attempt on the next
// subchannel.
// NOTE(review): many source lines of this function (617, 637, 640, 642, 645,
// 647, 650, 652, 657, 659, 662, 664, 690-691, 693, 696, 698-699, 705) are
// missing from this listing.
609 void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
610  size_t index, bool connection_attempt_complete, absl::Status status) {
611  RingHash* p = static_cast<RingHash*>(policy());
612  // If this is latest_pending_subchannel_list_, then swap it into
613  // subchannel_list_ as soon as we get the initial connectivity state
614  // report for every subchannel in the list.
615  if (p->latest_pending_subchannel_list_.get() == this &&
616  AllSubchannelsSeenInitialState()) {
618  gpr_log(GPR_INFO, "[RH %p] replacing subchannel list %p with %p", p,
619  p->subchannel_list_.get(), this);
620  }
621  p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
622  }
623  // Only set connectivity state if this is the current subchannel list.
624  if (p->subchannel_list_.get() != this) return;
625  // The overall aggregation rules here are:
626  // 1. If there is at least one subchannel in READY state, report READY.
627  // 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
628  // TRANSIENT_FAILURE.
629  // 3. If there is at least one subchannel in CONNECTING state, report
630  // CONNECTING.
631  // 4. If there is one subchannel in TRANSIENT_FAILURE state and there is
632  // more than one subchannel, report CONNECTING.
633  // 5. If there is at least one subchannel in IDLE state, report IDLE.
634  // 6. Otherwise, report TRANSIENT_FAILURE.
635  //
636  // We set start_connection_attempt to true if we match rules 2, 4, or 6.
638  bool start_connection_attempt = false;
639  if (num_ready_ > 0) {
641  } else if (num_transient_failure_ >= 2) {
643  start_connection_attempt = true;
644  } else if (num_connecting_ > 0) {
646  } else if (num_transient_failure_ == 1 && num_subchannels() > 1) {
648  start_connection_attempt = true;
649  } else if (num_idle_ > 0) {
651  } else {
653  start_connection_attempt = true;
654  }
655  // In TRANSIENT_FAILURE, report the last reported failure.
656  // Otherwise, report OK.
658  if (!status.ok()) {
660  "no reachable subchannels; last error: ", status.ToString()));
661  }
663  } else {
665  }
666  // Generate new picker and return it to the channel.
667  // Note that we use our own picker regardless of connectivity state.
668  p->channel_control_helper()->UpdateState(
669  state, status,
670  absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
671  ring_));
672  // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
673  // not be getting any pick requests from the priority policy.
674  // However, because the ring_hash policy does not attempt to
675  // reconnect to subchannels unless it is getting pick requests,
676  // it will need special handling to ensure that it will eventually
677  // recover from TRANSIENT_FAILURE state once the problem is resolved.
678  // Specifically, it will make sure that it is attempting to connect to
679  // at least one subchannel at any given time. After a given subchannel
680  // fails a connection attempt, it will move on to the next subchannel
681  // in the ring. It will keep doing this until one of the subchannels
682  // successfully connects, at which point it will report READY and stop
683  // proactively trying to connect. The policy will remain in
684  // TRANSIENT_FAILURE until at least one subchannel becomes connected,
685  // even if subchannels are in state CONNECTING during that time.
686  //
687  // Note that we do the same thing when the policy is in state
688  // CONNECTING, just to ensure that we don't remain in CONNECTING state
689  // indefinitely if there are no new picks coming in.
//
// NOTE(review): despite "next subchannel in the ring" above, the code below
// advances by subchannel-list index ((index + 1) % num_subchannels()), not
// by ring position.
692  connection_attempt_complete) {
694  }
695  if (start_connection_attempt &&
697  size_t next_index = (index + 1) % num_subchannels();
700  "[RH %p] triggering internal connection attempt for subchannel "
701  "%p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR ")",
702  p, subchannel(next_index)->subchannel(), this, next_index,
703  num_subchannels());
704  }
706  subchannel(next_index)->subchannel()->RequestConnection();
707  }
708 }
709 
710 //
711 // RingHash::RingHashSubchannelData
712 //
713 
// Handles a connectivity report for this subchannel.
// It requests re-resolution on non-initial TRANSIENT_FAILURE/IDLE reports,
// and applies TRANSIENT_FAILURE stickiness. It then updates the list's
// counters and the picker-visible state/status, and re-aggregates the policy
// state.
// NOTE(review): source lines 715 (first parameter, presumably
// absl::optional<grpc_connectivity_state> old_state), 719, 736-737 and 756
// are missing from this listing.
714 void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
716  grpc_connectivity_state new_state) {
717  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
718  grpc_connectivity_state last_connectivity_state = GetConnectivityState();
720  gpr_log(
721  GPR_INFO,
722  "[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
723  "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
724  p, subchannel(), subchannel_list(), Index(),
725  subchannel_list()->num_subchannels(),
726  ConnectivityStateName(last_connectivity_state),
727  ConnectivityStateName(new_state));
728  }
729  GPR_ASSERT(subchannel() != nullptr);
730  // If this is not the initial state notification and the new state is
731  // TRANSIENT_FAILURE or IDLE, re-resolve.
732  // Note that we don't want to do this on the initial state notification,
733  // because that would result in an endless loop of re-resolution.
734  if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
735  new_state == GRPC_CHANNEL_IDLE)) {
738  "[RH %p] Subchannel %p reported %s; requesting re-resolution", p,
739  subchannel(), ConnectivityStateName(new_state));
740  }
741  p->channel_control_helper()->RequestReresolution();
742  }
743  const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING;
744  // Decide what state to report for the purposes of aggregation and
745  // picker behavior.
746  // If the last recorded state was TRANSIENT_FAILURE, ignore the update
747  // unless the new state is READY.
// A fresh TRANSIENT_FAILURE report is also let through, so its status
// replaces the stored one (update_status stays true).
748  bool update_status = true;
749  absl::Status status = connectivity_status();
750  if (last_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE &&
751  new_state != GRPC_CHANNEL_READY &&
752  new_state != GRPC_CHANNEL_TRANSIENT_FAILURE) {
753  new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
// NOTE(review): the statement under this lock (source line 756) is missing.
// It presumably copies the stored connectivity_status_ into `status` so
// the earlier failure keeps being reported.
754  {
755  MutexLock lock(&mu_);
757  }
758  update_status = false;
759  }
760  // Update state counters used for aggregation.
761  subchannel_list()->UpdateStateCountersLocked(last_connectivity_state,
762  new_state);
763  // Update status seen by picker if needed.
764  if (update_status) {
765  MutexLock lock(&mu_);
766  connectivity_status_ = connectivity_status();
767  }
768  // Update last seen state, also used by picker.
769  connectivity_state_.store(new_state, std::memory_order_relaxed);
770  // Update the RH policy's connectivity state, creating new picker and new
771  // ring.
772  subchannel_list()->UpdateRingHashConnectivityStateLocked(
773  Index(), connection_attempt_complete, status);
774 }
775 
776 //
777 // RingHash
778 //
779 
// Constructs the policy. Subchannel lists are only created by UpdateLocked().
// NOTE(review): source line 781 (the trace-flag guard opening the block
// closed below) is missing from this listing.
780 RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
782  gpr_log(GPR_INFO, "[RH %p] Created", this);
783  }
784 }
785 
// Destroys the policy. ShutdownLocked() must already have released the
// subchannel list, which is asserted below.
// NOTE(review): source lines 787 (trace-flag guard) and 791 are missing from
// this listing.
786 RingHash::~RingHash() {
788  gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
789  }
790  GPR_ASSERT(subchannel_list_ == nullptr);
792 }
793 
// Marks the policy as shut down, so pending SubchannelConnectionAttempters
// skip their RequestConnection() calls, and releases the current subchannel
// list.
// NOTE(review): source lines 795 (trace-flag guard) and 800 are missing from
// this listing. Line 800 presumably also releases
// latest_pending_subchannel_list_; confirm.
794 void RingHash::ShutdownLocked() {
796  gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
797  }
798  shutdown_ = true;
799  subchannel_list_.reset();
801 }
802 
803 void RingHash::ResetBackoffLocked() {
804  subchannel_list_->ResetBackoffLocked();
805  if (latest_pending_subchannel_list_ != nullptr) {
806  latest_pending_subchannel_list_->ResetBackoffLocked();
807  }
808 }
809 
// Applies a resolver update. config_ is replaced unconditionally, before the
// addresses are inspected. On an address error the existing subchannel list
// (if any) is kept. Otherwise a new pending subchannel list is created and
// starts watching. It is promoted immediately when there is no current list
// or the new list is empty.
// NOTE(review): source lines 814, 820, 828, 842, 844, 848, 851, 853 and 857
// are missing from this listing. Line 848 presumably performs the promotion
// subchannel_list_ = std::move(latest_pending_subchannel_list_); confirm.
810 void RingHash::UpdateLocked(UpdateArgs args) {
811  config_ = std::move(args.config);
812  ServerAddressList addresses;
813  if (args.addresses.ok()) {
815  gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
816  this, args.addresses->size());
817  }
818  addresses = *std::move(args.addresses);
819  } else {
821  gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",
822  this, args.addresses.status().ToString().c_str());
823  }
824  // If we already have a subchannel list, then ignore the resolver
825  // failure and keep using the existing list.
826  if (subchannel_list_ != nullptr) return;
827  }
829  latest_pending_subchannel_list_ != nullptr) {
830  gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p",
831  this, latest_pending_subchannel_list_.get());
832  }
833  latest_pending_subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
834  this, std::move(addresses), *args.args);
835  latest_pending_subchannel_list_->StartWatchingLocked();
836  // If we have no existing list or the new list is empty, immediately
837  // promote the new list.
838  // Otherwise, do nothing; the new list will be promoted when the
839  // initial subchannel states are reported.
840  if (subchannel_list_ == nullptr ||
841  latest_pending_subchannel_list_->num_subchannels() == 0) {
843  subchannel_list_ != nullptr) {
845  "[RH %p] empty address list, replacing subchannel list %p", this,
846  subchannel_list_.get());
847  }
849  // If the new list is empty, report TRANSIENT_FAILURE.
850  if (subchannel_list_->num_subchannels() == 0) {
852  args.addresses.ok()
854  absl::StrCat("empty address list: ", args.resolution_note))
855  : args.addresses.status();
856  channel_control_helper()->UpdateState(
858  absl::make_unique<TransientFailurePicker>(status));
859  } else {
860  // Otherwise, report IDLE.
// The state is actually computed from the list's counters, which start
// with every subchannel counted as IDLE.
861  subchannel_list_->UpdateRingHashConnectivityStateLocked(
862  /*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus());
863  }
864  }
865 }
866 
867 //
868 // factory
869 //
870 
// Factory registered for "ring_hash_experimental". It creates RingHash
// instances and parses their JSON config via ParseRingHashLbConfig().
871 class RingHashFactory : public LoadBalancingPolicyFactory {
872  public:
873  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
874  LoadBalancingPolicy::Args args) const override {
875  return MakeOrphanable<RingHash>(std::move(args));
876  }
877 
878  const char* name() const override { return kRingHash; }
879 
// Returns a RingHashLbConfig on success. On failure it returns nullptr and
// sets *error from the collected error list.
// NOTE(review): source line 889 (the statement that builds *error from
// error_list) is missing from this listing.
880  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
881  const Json& json, grpc_error_handle* error) const override {
882  size_t min_ring_size;
883  size_t max_ring_size;
884  std::vector<grpc_error_handle> error_list;
885  ParseRingHashLbConfig(json, &min_ring_size, &max_ring_size, &error_list);
886  if (error_list.empty()) {
887  return MakeRefCounted<RingHashLbConfig>(min_ring_size, max_ring_size);
888  } else {
890  "ring_hash_experimental LB policy config", &error_list);
891  return nullptr;
892  }
893  }
894 };
895 
896 } // namespace
897 
// Registers RingHashFactory with the LB policy registry.
// NOTE(review): the enclosing function's header (source line 898) is
// missing from this listing.
899  LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
900  absl::make_unique<RingHashFactory>());
901 }
902 
904 
905 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
trace.h
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::UniqueTypeName::Factory::Create
UniqueTypeName Create()
Definition: unique_type_name.h:67
subchannels_
std::vector< RefCountedPtr< SubchannelInterface > > subchannels_
Definition: ring_hash.cc:343
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
grpc_core::MakeRefCounted
RefCountedPtr< T > MakeRefCounted(Args &&... args)
Definition: ref_counted_ptr.h:335
last_failure_
absl::Status last_failure_
Definition: ring_hash.cc:278
orphanable.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
call_state
Definition: test/core/fling/server.cc:74
sockaddr_utils.h
MutexLock
#define MutexLock(x)
Definition: bloaty/third_party/re2/util/mutex.h:125
grpc_core::Json::type
Type type() const
Definition: src/core/lib/json/json.h:174
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
absl::InlinedVector::emplace_back
reference emplace_back(Args &&... args)
Definition: abseil-cpp/absl/container/inlined_vector.h:675
connectivity_state.h
grpc_core::RequestHashAttributeName
UniqueTypeName RequestHashAttributeName()
Definition: ring_hash.cc:77
grpc_core::Json::Type::OBJECT
@ OBJECT
grpc_core
Definition: call_metric_recorder.h:31
num_transient_failure_
size_t num_transient_failure_
Definition: ring_hash.cc:266
grpc::testing::sum
double sum(const T &container, F functor)
Definition: test/cpp/qps/stats.h:30
string.h
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
subchannel
RingHashSubchannelData * subchannel
Definition: ring_hash.cc:285
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
lb_policy.h
address_
ServerAddress address_
Definition: ring_hash.cc:194
client_channel.h
lb_policy_factory.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc_core::Json::object_value
const Object & object_value() const
Definition: src/core/lib/json/json.h:177
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
closure.h
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
status
absl::Status status
Definition: rls.cc:251
grpc_core::GrpcLbPolicyRingHashShutdown
void GrpcLbPolicyRingHashShutdown()
Definition: ring_hash.cc:903
setup.name
name
Definition: setup.py:542
absl::InternalError
Status InternalError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:347
xds_manager.p
p
Definition: xds_manager.py:60
grpc_sockaddr_to_string
absl::StatusOr< std::string > grpc_sockaddr_to_string(const grpc_resolved_address *resolved_addr, bool normalize)
Definition: sockaddr_utils.cc:194
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
subchannel_interface.h
num_ready_
size_t num_ready_
Definition: ring_hash.cc:264
absl::InlinedVector::erase
iterator erase(const_iterator pos)
Definition: abseil-cpp/absl/container/inlined_vector.h:706
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
GRPC_ERROR_CREATE_FROM_VECTOR
#define GRPC_ERROR_CREATE_FROM_VECTOR(desc, error_list)
Definition: error.h:314
parent_
RefCountedPtr< RingHash > parent_
Definition: ring_hash.cc:346
connectivity_state_
std::atomic< grpc_connectivity_state > connectivity_state_
Definition: ring_hash.cc:202
latest_pending_subchannel_list_
OrphanablePtr< RingHashSubchannelList > latest_pending_subchannel_list_
Definition: ring_hash.cc:357
grpc_types.h
min_ring_size_
size_t min_ring_size_
Definition: ring_hash.cc:137
hash
uint64_t hash
Definition: ring_hash.cc:284
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
max_ring_size_
size_t max_ring_size_
Definition: ring_hash.cc:138
absl::SimpleAtoi
ABSL_NAMESPACE_BEGIN ABSL_MUST_USE_RESULT bool SimpleAtoi(absl::string_view str, int_type *out)
Definition: abseil-cpp/absl/strings/numbers.h:271
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
hpack_encoder_fixtures::Args
Args({0, 16384})
subchannel_list_
RefCountedPtr< RingHashSubchannelList > subchannel_list_
Definition: ring_hash.cc:294
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
Json
JSON (JavaScript Object Notation).
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:227
grpc_core::grpc_lb_ring_hash_trace
TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb")
connectivity_status_
absl::Status connectivity_status_
Definition: priority.cc:239
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
work_serializer.h
connectivity_state.h
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
absl::optional< grpc_connectivity_state >
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
arg
Definition: cmdline.cc:40
num_idle_
size_t num_idle_
Definition: ring_hash.cc:263
server_address.h
absl::InlinedVector::data
pointer data() noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:302
grpc_core::Json::Type::NUMBER
@ NUMBER
error.h
XXH64
XXH_PUBLIC_API XXH64_hash_t XXH64(const void *input, size_t length, XXH64_hash_t seed)
Calculates the 64-bit hash of input using xxHash64.
absl::InlinedVector::size
size_type size() const noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:270
json.h
bm_speedup.scale
def scale(a, mul)
Definition: bm_speedup.py:24
min
#define min(a, b)
Definition: qsort.h:83
weight
uint32_t weight
Definition: weighted_target.cc:84
grpc_core::ServerAddressList
std::vector< ServerAddress > ServerAddressList
Definition: server_address.h:120
absl::InlinedVector::insert
iterator insert(const_iterator pos, const_reference v)
Definition: abseil-cpp/absl/container/inlined_vector.h:569
internally_triggered_connection_index_
absl::optional< size_t > internally_triggered_connection_index_
Definition: ring_hash.cc:272
num_connecting_
size_t num_connecting_
Definition: ring_hash.cc:265
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
GRPC_CHANNEL_CONNECTING
@ GRPC_CHANNEL_CONNECTING
Definition: include/grpc/impl/codegen/connectivity_state.h:34
xxhash.h
grpc_core::Json::Object
std::map< std::string, Json > Object
Definition: src/core/lib/json/json.h:54
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
debug_location.h
grpc_core::UniqueTypeName
Definition: unique_type_name.h:56
lb_policy_registry.h
shutdown_
bool shutdown_
Definition: ring_hash.cc:359
ref_counted.h
count
int * count
Definition: bloaty/third_party/googletest/googlemock/test/gmock_stress_test.cc:96
absl::optional::reset
ABSL_ATTRIBUTE_REINITIALIZES void reset() noexcept
Definition: abseil-cpp/absl/types/optional.h:342
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
index
int index
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/protobuf.h:1184
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
subchannel_list.h
grpc_core::ParseRingHashLbConfig
void ParseRingHashLbConfig(const Json &json, size_t *min_ring_size, size_t *max_ring_size, std::vector< grpc_error_handle > *error_list)
Definition: ring_hash.cc:83
arg
struct arg arg
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
exec_ctx.h
ring_
RefCountedPtr< Ring > ring_
Definition: ring_hash.cc:268
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
absl::InlinedVector::assign
void assign(size_type n, const_reference v)
Definition: abseil-cpp/absl/container/inlined_vector.h:507
absl::UnavailableError
Status UnavailableError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:375
unique_type_name.h
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
ref_counted_ptr.h
mu_
Mutex mu_
Definition: ring_hash.cc:204
check_redundant_namespace_qualifiers.Config
Config
Definition: check_redundant_namespace_qualifiers.py:142
gpr_parse_nonnegative_int
int gpr_parse_nonnegative_int(const char *value)
Definition: string.cc:218
grpc_core::GrpcLbPolicyRingHashInit
void GrpcLbPolicyRingHashInit()
Definition: ring_hash.cc:898
Fail
void Fail(const char *msg)
Definition: bloaty/third_party/googletest/googletest/test/gtest_assert_by_exception_test.cc:52
ring_hash_lb_
RefCountedPtr< RingHash > ring_hash_lb_
Definition: ring_hash.cc:341
absl::StatusOr::value
const T & value() const &ABSL_ATTRIBUTE_LIFETIME_BOUND
Definition: abseil-cpp/absl/status/statusor.h:687
GRPC_CHANNEL_SHUTDOWN
@ GRPC_CHANNEL_SHUTDOWN
Definition: include/grpc/impl/codegen/connectivity_state.h:40
absl::InlinedVector
Definition: abseil-cpp/absl/container/inlined_vector.h:69
google::protobuf::python::descriptor::Index
static PyObject * Index(PyContainer *self, PyObject *item)
Definition: bloaty/third_party/protobuf/python/google/protobuf/pyext/descriptor_containers.cc:672
grpc_error
Definition: error_internal.h:42
grpc_core::SubchannelList::ShutdownLocked
virtual void ShutdownLocked()
Definition: subchannel_list.h:423
closure_
grpc_closure closure_
Definition: ring_hash.cc:342
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
config_
RefCountedPtr< RingHashLbConfig > config_
Definition: ring_hash.cc:353
ABSL_FALLTHROUGH_INTENDED
#define ABSL_FALLTHROUGH_INTENDED
Definition: abseil-cpp/absl/base/attributes.h:641
absl::str_format_internal::LengthMod::h
@ h
grpc_closure
Definition: closure.h:56
grpc_core::UniqueTypeName::Factory
Definition: unique_type_name.h:60
absl::InlinedVector::end
iterator end() noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:401
sync.h
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
state
static struct rpc_state state
Definition: bad_server_response_test.cc:87
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:05