xds_cluster_impl.cc
Go to the documentation of this file.
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
19 #include <stddef.h>
20 #include <stdint.h>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/base/thread_annotations.h"
31 #include "absl/memory/memory.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 #include "absl/types/variant.h"
38 
41 #include <grpc/support/log.h>
42 
64 #include "src/core/lib/json/json.h"
67 
68 namespace grpc_core {
69 
70 TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
71 
72 namespace {
73 
74 //
75 // global circuit breaker atomic map
76 //
77 
78 class CircuitBreakerCallCounterMap {
79  public:
80  using Key =
81  std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
82 
83  class CallCounter : public RefCounted<CallCounter> {
84  public:
85  explicit CallCounter(Key key) : key_(std::move(key)) {}
86  ~CallCounter() override;
87 
88  uint32_t Load() {
89  return concurrent_requests_.load(std::memory_order_seq_cst);
90  }
91  uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
92  void Decrement() { concurrent_requests_.fetch_sub(1); }
93 
94  private:
96  std::atomic<uint32_t> concurrent_requests_{0};
97  };
98 
99  RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
101 
102  private:
104  std::map<Key, CallCounter*> map_ ABSL_GUARDED_BY(mu_);
105 };
106 
107 CircuitBreakerCallCounterMap* g_call_counter_map = nullptr;
108 
109 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
110 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
111  const std::string& eds_service_name) {
113  RefCountedPtr<CallCounter> result;
114  MutexLock lock(&mu_);
115  auto it = map_.find(key);
116  if (it == map_.end()) {
117  it = map_.insert({key, nullptr}).first;
118  } else {
119  result = it->second->RefIfNonZero();
120  }
121  if (result == nullptr) {
122  result = MakeRefCounted<CallCounter>(std::move(key));
123  it->second = result.get();
124  }
125  return result;
126 }
127 
128 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
129  MutexLock lock(&g_call_counter_map->mu_);
130  auto it = g_call_counter_map->map_.find(key_);
131  if (it != g_call_counter_map->map_.end() && it->second == this) {
132  g_call_counter_map->map_.erase(it);
133  }
134 }
135 
136 //
137 // LB policy
138 //
139 
140 constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental";
141 
142 // Config for xDS Cluster Impl LB policy.
143 class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
144  public:
145  XdsClusterImplLbConfig(
146  RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
150  RefCountedPtr<XdsEndpointResource::DropConfig> drop_config)
151  : child_policy_(std::move(child_policy)),
156  drop_config_(std::move(drop_config)) {}
157 
158  const char* name() const override { return kXdsClusterImpl; }
159 
160  RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
161  return child_policy_;
162  }
163  const std::string& cluster_name() const { return cluster_name_; }
164  const std::string& eds_service_name() const { return eds_service_name_; }
166  const {
168  };
170  RefCountedPtr<XdsEndpointResource::DropConfig> drop_config() const {
171  return drop_config_;
172  }
173 
174  private:
175  RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
180  RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
181 };
182 
183 // xDS Cluster Impl LB policy.
184 class XdsClusterImplLb : public LoadBalancingPolicy {
185  public:
186  XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args);
187 
188  const char* name() const override { return kXdsClusterImpl; }
189 
190  void UpdateLocked(UpdateArgs args) override;
191  void ExitIdleLocked() override;
192  void ResetBackoffLocked() override;
193 
194  private:
195  class StatsSubchannelWrapper : public DelegatingSubchannel {
196  public:
197  StatsSubchannelWrapper(
198  RefCountedPtr<SubchannelInterface> wrapped_subchannel,
199  RefCountedPtr<XdsClusterLocalityStats> locality_stats)
200  : DelegatingSubchannel(std::move(wrapped_subchannel)),
201  locality_stats_(std::move(locality_stats)) {}
202 
203  XdsClusterLocalityStats* locality_stats() const {
204  return locality_stats_.get();
205  }
206 
207  private:
208  RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
209  };
210 
211  // A simple wrapper for ref-counting a picker from the child policy.
212  class RefCountedPicker : public RefCounted<RefCountedPicker> {
213  public:
214  explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
215  : picker_(std::move(picker)) {}
216  PickResult Pick(PickArgs args) { return picker_->Pick(args); }
217 
218  private:
219  std::unique_ptr<SubchannelPicker> picker_;
220  };
221 
222  // A picker that wraps the picker from the child to perform drops.
223  class Picker : public SubchannelPicker {
224  public:
225  Picker(XdsClusterImplLb* xds_cluster_impl_lb,
226  RefCountedPtr<RefCountedPicker> picker);
227 
228  PickResult Pick(PickArgs args) override;
229 
230  private:
231  class SubchannelCallTracker;
232 
233  RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
235  RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
236  RefCountedPtr<XdsClusterDropStats> drop_stats_;
237  RefCountedPtr<RefCountedPicker> picker_;
238  };
239 
240  class Helper : public ChannelControlHelper {
241  public:
242  explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
243  : xds_cluster_impl_policy_(std::move(xds_cluster_impl_policy)) {}
244 
245  ~Helper() override {
246  xds_cluster_impl_policy_.reset(DEBUG_LOCATION, "Helper");
247  }
248 
249  RefCountedPtr<SubchannelInterface> CreateSubchannel(
250  ServerAddress address, const grpc_channel_args& args) override;
251  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
252  std::unique_ptr<SubchannelPicker> picker) override;
253  void RequestReresolution() override;
254  absl::string_view GetAuthority() override;
255  void AddTraceEvent(TraceSeverity severity,
256  absl::string_view message) override;
257 
258  private:
259  RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy_;
260  };
261 
262  ~XdsClusterImplLb() override;
263 
264  void ShutdownLocked() override;
265 
266  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
267  const grpc_channel_args* args);
268  void UpdateChildPolicyLocked(absl::StatusOr<ServerAddressList> addresses,
269  const grpc_channel_args* args);
270 
271  void MaybeUpdatePickerLocked();
272 
273  // Current config from the resolver.
274  RefCountedPtr<XdsClusterImplLbConfig> config_;
275 
276  // Current concurrent number of requests.
277  RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
278 
279  // Internal state.
280  bool shutting_down_ = false;
281 
282  // The xds client.
283  RefCountedPtr<XdsClient> xds_client_;
284 
285  // The stats for client-side load reporting.
286  RefCountedPtr<XdsClusterDropStats> drop_stats_;
287 
288  OrphanablePtr<LoadBalancingPolicy> child_policy_;
289 
290  // Latest state and picker reported by the child policy.
293  RefCountedPtr<RefCountedPicker> picker_;
294 };
295 
296 //
297 // XdsClusterImplLb::Picker::SubchannelCallTracker
298 //
299 
300 class XdsClusterImplLb::Picker::SubchannelCallTracker
301  : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
302  public:
303  SubchannelCallTracker(
304  std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
305  original_subchannel_call_tracker,
306  RefCountedPtr<XdsClusterLocalityStats> locality_stats,
307  RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
309  std::move(original_subchannel_call_tracker)),
310  locality_stats_(std::move(locality_stats)),
311  call_counter_(std::move(call_counter)) {}
312 
313  ~SubchannelCallTracker() override {
314  locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
315  call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
317  }
318 
319  void Start() override {
320  // Increment number of calls in flight.
321  call_counter_->Increment();
322  // Record a call started.
323  if (locality_stats_ != nullptr) {
324  locality_stats_->AddCallStarted();
325  }
326  // Delegate if needed.
327  if (original_subchannel_call_tracker_ != nullptr) {
329  }
330 #ifndef NDEBUG
331  started_ = true;
332 #endif
333  }
334 
335  void Finish(FinishArgs args) override {
336  // Delegate if needed.
337  if (original_subchannel_call_tracker_ != nullptr) {
339  }
340  // Record call completion for load reporting.
341  if (locality_stats_ != nullptr) {
342  locality_stats_->AddCallFinished(!args.status.ok());
343  }
344  // Decrement number of calls in flight.
345  call_counter_->Decrement();
346 #ifndef NDEBUG
347  started_ = false;
348 #endif
349  }
350 
351  private:
352  std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
354  RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
355  RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
356 #ifndef NDEBUG
357  bool started_ = false;
358 #endif
359 };
360 
361 //
362 // XdsClusterImplLb::Picker
363 //
364 
365 XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
366  RefCountedPtr<RefCountedPicker> picker)
367  : call_counter_(xds_cluster_impl_lb->call_counter_),
369  xds_cluster_impl_lb->config_->max_concurrent_requests()),
370  drop_config_(xds_cluster_impl_lb->config_->drop_config()),
371  drop_stats_(xds_cluster_impl_lb->drop_stats_),
372  picker_(std::move(picker)) {
374  gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
375  xds_cluster_impl_lb, this);
376  }
377 }
378 
379 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
380  LoadBalancingPolicy::PickArgs args) {
381  // Handle EDS drops.
382  const std::string* drop_category;
383  if (drop_config_->ShouldDrop(&drop_category)) {
384  if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
385  return PickResult::Drop(absl::UnavailableError(
386  absl::StrCat("EDS-configured drop: ", *drop_category)));
387  }
388  // Check if we exceeded the max concurrent requests circuit breaking limit.
389  // Note: We check the value here, but we don't actually increment the
390  // counter for the current request until the channel calls the subchannel
391  // call tracker's Start() method. This means that we may wind up
392  // allowing more concurrent requests than the configured limit.
393  if (call_counter_->Load() >= max_concurrent_requests_) {
394  if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
395  return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
396  }
397  // If we're not dropping the call, we should always have a child picker.
398  if (picker_ == nullptr) { // Should never happen.
400  "xds_cluster_impl picker not given any child picker"));
401  }
402  // Not dropping, so delegate to child picker.
403  PickResult result = picker_->Pick(args);
404  auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
405  if (complete_pick != nullptr) {
406  RefCountedPtr<XdsClusterLocalityStats> locality_stats;
407  if (drop_stats_ != nullptr) { // If load reporting is enabled.
408  auto* subchannel_wrapper =
409  static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
410  // Handle load reporting.
411  locality_stats = subchannel_wrapper->locality_stats()->Ref(
412  DEBUG_LOCATION, "SubchannelCallTracker");
413  // Unwrap subchannel to pass back up the stack.
414  complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
415  }
416  // Inject subchannel call tracker to record call completion.
417  complete_pick->subchannel_call_tracker =
418  absl::make_unique<SubchannelCallTracker>(
419  std::move(complete_pick->subchannel_call_tracker),
420  std::move(locality_stats),
421  call_counter_->Ref(DEBUG_LOCATION, "SubchannelCallTracker"));
422  } else {
423  // TODO(roth): We should ideally also record call failures here in the case
424  // where a pick fails. This is challenging, because we don't know which
425  // picks are for wait_for_ready RPCs or how many times we'll return a
426  // failure for the same wait_for_ready RPC.
427  }
428  return result;
429 }
430 
431 //
432 // XdsClusterImplLb
433 //
434 
435 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
436  Args args)
437  : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
439  gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
440  this, xds_client_.get());
441  }
442 }
443 
444 XdsClusterImplLb::~XdsClusterImplLb() {
447  "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
448  this);
449  }
450 }
451 
452 void XdsClusterImplLb::ShutdownLocked() {
454  gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
455  }
456  shutting_down_ = true;
457  // Remove the child policy's interested_parties pollset_set from the
458  // xDS policy.
459  if (child_policy_ != nullptr) {
460  grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
461  interested_parties());
462  child_policy_.reset();
463  }
464  // Drop our ref to the child's picker, in case it's holding a ref to
465  // the child.
466  picker_.reset();
467  drop_stats_.reset();
468  xds_client_.reset();
469 }
470 
472  if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
473 }
474 
475 void XdsClusterImplLb::ResetBackoffLocked() {
476  // The XdsClient will have its backoff reset by the xds resolver, so we
477  // don't need to do it here.
478  if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
479 }
480 
481 void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
483  gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
484  }
485  // Update config.
486  const bool is_initial_update = config_ == nullptr;
487  auto old_config = std::move(config_);
488  config_ = std::move(args.config);
489  // On initial update, create drop stats.
490  if (is_initial_update) {
491  if (config_->lrs_load_reporting_server().has_value()) {
492  drop_stats_ = xds_client_->AddClusterDropStats(
493  config_->lrs_load_reporting_server().value(), config_->cluster_name(),
494  config_->eds_service_name());
495  if (drop_stats_ == nullptr) {
497  "[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
498  "LRS server %s, cluster %s, EDS service name %s, load "
499  "reporting for drops will not be done.",
500  this, config_->lrs_load_reporting_server()->server_uri.c_str(),
501  config_->cluster_name().c_str(),
502  config_->eds_service_name().c_str());
503  }
504  }
505  call_counter_ = g_call_counter_map->GetOrCreate(
506  config_->cluster_name(), config_->eds_service_name());
507  } else {
508  // Cluster name, EDS service name, and LRS server name should never
509  // change, because the xds_cluster_resolver policy above us should be
510  // swapped out if that happens.
511  GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
512  GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
513  GPR_ASSERT(config_->lrs_load_reporting_server() ==
514  old_config->lrs_load_reporting_server());
515  }
516  // Update picker if max_concurrent_requests has changed.
517  if (is_initial_update || config_->max_concurrent_requests() !=
518  old_config->max_concurrent_requests()) {
519  MaybeUpdatePickerLocked();
520  }
521  // Update child policy.
522  UpdateChildPolicyLocked(std::move(args.addresses), args.args);
523 }
524 
525 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
526  // If we're dropping all calls, report READY, regardless of what (or
527  // whether) the child has reported.
528  if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
529  auto drop_picker = absl::make_unique<Picker>(this, picker_);
532  "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
533  "state=READY picker=%p",
534  this, drop_picker.get());
535  }
536  channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
537  std::move(drop_picker));
538  return;
539  }
540  // Otherwise, update only if we have a child picker.
541  if (picker_ != nullptr) {
542  auto drop_picker = absl::make_unique<Picker>(this, picker_);
545  "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
546  "status=(%s) picker=%p",
547  this, ConnectivityStateName(state_), status_.ToString().c_str(),
548  drop_picker.get());
549  }
550  channel_control_helper()->UpdateState(state_, status_,
551  std::move(drop_picker));
552  }
553 }
554 
555 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
556  const grpc_channel_args* args) {
557  LoadBalancingPolicy::Args lb_policy_args;
558  lb_policy_args.work_serializer = work_serializer();
559  lb_policy_args.args = args;
560  lb_policy_args.channel_control_helper =
561  absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
562  OrphanablePtr<LoadBalancingPolicy> lb_policy =
563  MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
567  "[xds_cluster_impl_lb %p] Created new child policy handler %p",
568  this, lb_policy.get());
569  }
570  // Add our interested_parties pollset_set to that of the newly created
571  // child policy. This will make the child policy progress upon activity on
572  // this policy, which in turn is tied to the application's call.
573  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
574  interested_parties());
575  return lb_policy;
576 }
577 
578 void XdsClusterImplLb::UpdateChildPolicyLocked(
580  const grpc_channel_args* args) {
581  // Create policy if needed.
582  if (child_policy_ == nullptr) {
583  child_policy_ = CreateChildPolicyLocked(args);
584  }
585  // Construct update args.
586  UpdateArgs update_args;
587  update_args.addresses = std::move(addresses);
588  update_args.config = config_->child_policy();
590  const_cast<char*>(GRPC_ARG_XDS_CLUSTER_NAME),
591  const_cast<char*>(config_->cluster_name().c_str()));
592  update_args.args = grpc_channel_args_copy_and_add(args, &cluster_arg, 1);
593  // Update the policy.
596  "[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
597  child_policy_.get());
598  }
599  child_policy_->UpdateLocked(std::move(update_args));
600 }
601 
602 //
603 // XdsClusterImplLb::Helper
604 //
605 
606 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
607  ServerAddress address, const grpc_channel_args& args) {
608  if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
609  // If load reporting is enabled, wrap the subchannel such that it
610  // includes the locality stats object, which will be used by the EdsPicker.
611  if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
612  .has_value()) {
613  RefCountedPtr<XdsLocalityName> locality_name;
614  auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
615  if (attribute != nullptr) {
616  const auto* locality_attr =
617  static_cast<const XdsLocalityAttribute*>(attribute);
618  locality_name = locality_attr->locality_name();
619  }
620  RefCountedPtr<XdsClusterLocalityStats> locality_stats =
621  xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats(
622  xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
623  .value(),
624  xds_cluster_impl_policy_->config_->cluster_name(),
625  xds_cluster_impl_policy_->config_->eds_service_name(),
626  std::move(locality_name));
627  if (locality_stats != nullptr) {
628  return MakeRefCounted<StatsSubchannelWrapper>(
629  xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
630  std::move(address), args),
631  std::move(locality_stats));
632  }
634  "[xds_cluster_impl_lb %p] Failed to get locality stats object for "
635  "LRS server %s, cluster %s, EDS service name %s; load reports will "
636  "not be generated (not wrapping subchannel)",
637  this,
638  xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
639  ->server_uri.c_str(),
640  xds_cluster_impl_policy_->config_->cluster_name().c_str(),
641  xds_cluster_impl_policy_->config_->eds_service_name().c_str());
642  }
643  // Load reporting not enabled, so don't wrap the subchannel.
644  return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
645  std::move(address), args);
646 }
647 
648 void XdsClusterImplLb::Helper::UpdateState(
650  std::unique_ptr<SubchannelPicker> picker) {
651  if (xds_cluster_impl_policy_->shutting_down_) return;
654  "[xds_cluster_impl_lb %p] child connectivity state update: "
655  "state=%s (%s) "
656  "picker=%p",
658  status.ToString().c_str(), picker.get());
659  }
660  // Save the state and picker.
662  xds_cluster_impl_policy_->status_ = status;
663  xds_cluster_impl_policy_->picker_ =
664  MakeRefCounted<RefCountedPicker>(std::move(picker));
665  // Wrap the picker and return it to the channel.
666  xds_cluster_impl_policy_->MaybeUpdatePickerLocked();
667 }
668 
669 void XdsClusterImplLb::Helper::RequestReresolution() {
670  if (xds_cluster_impl_policy_->shutting_down_) return;
671  xds_cluster_impl_policy_->channel_control_helper()->RequestReresolution();
672 }
673 
674 absl::string_view XdsClusterImplLb::Helper::GetAuthority() {
675  return xds_cluster_impl_policy_->channel_control_helper()->GetAuthority();
676 }
677 
678 void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
680  if (xds_cluster_impl_policy_->shutting_down_) return;
681  xds_cluster_impl_policy_->channel_control_helper()->AddTraceEvent(severity,
682  message);
683 }
684 
685 //
686 // factory
687 //
688 
689 class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
690  public:
691  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
692  LoadBalancingPolicy::Args args) const override {
693  RefCountedPtr<XdsClient> xds_client =
694  XdsClient::GetFromChannelArgs(*args.args);
695  if (xds_client == nullptr) {
697  "XdsClient not present in channel args -- cannot instantiate "
698  "xds_cluster_impl LB policy");
699  return nullptr;
700  }
701  return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
702  std::move(args));
703  }
704 
705  const char* name() const override { return kXdsClusterImpl; }
706 
707  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
708  const Json& json, grpc_error_handle* error) const override {
710  if (json.type() == Json::Type::JSON_NULL) {
711  // This policy was configured in the deprecated loadBalancingPolicy
712  // field or in the client API.
714  "field:loadBalancingPolicy error:xds_cluster_impl policy requires "
715  "configuration. Please use loadBalancingConfig field of service "
716  "config instead.");
717  return nullptr;
718  }
719  std::vector<grpc_error_handle> error_list;
720  // Child policy.
721  RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
722  auto it = json.object_value().find("childPolicy");
723  if (it == json.object_value().end()) {
724  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
725  "field:childPolicy error:required field missing"));
726  } else {
728  child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
729  it->second, &parse_error);
730  if (child_policy == nullptr) {
732  std::vector<grpc_error_handle> child_errors;
733  child_errors.push_back(parse_error);
734  error_list.push_back(
735  GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
736  }
737  }
738  // Cluster name.
740  it = json.object_value().find("clusterName");
741  if (it == json.object_value().end()) {
742  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
743  "field:clusterName error:required field missing"));
744  } else if (it->second.type() != Json::Type::STRING) {
745  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
746  "field:clusterName error:type should be string"));
747  } else {
748  cluster_name = it->second.string_value();
749  }
750  // EDS service name.
752  it = json.object_value().find("edsServiceName");
753  if (it != json.object_value().end()) {
754  if (it->second.type() != Json::Type::STRING) {
755  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
756  "field:edsServiceName error:type should be string"));
757  } else {
758  eds_service_name = it->second.string_value();
759  }
760  }
761  // LRS load reporting server name.
763  it = json.object_value().find("lrsLoadReportingServer");
764  if (it != json.object_value().end()) {
765  if (it->second.type() != Json::Type::OBJECT) {
766  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
767  "field:lrsLoadReportingServer error:type should be object"));
768  } else {
769  grpc_error_handle parser_error;
771  it->second.object_value(), &parser_error);
772  if (!GRPC_ERROR_IS_NONE(parser_error)) {
773  error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
774  absl::StrCat("errors parsing lrs_load_reporting_server")));
775  error_list.push_back(parser_error);
776  }
777  }
778  }
779  // Max concurrent requests.
781  it = json.object_value().find("maxConcurrentRequests");
782  if (it != json.object_value().end()) {
783  if (it->second.type() != Json::Type::NUMBER) {
784  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
785  "field:max_concurrent_requests error:must be of type number"));
786  } else {
788  gpr_parse_nonnegative_int(it->second.string_value().c_str());
789  }
790  }
791  // Drop config.
792  auto drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
793  it = json.object_value().find("dropCategories");
794  if (it == json.object_value().end()) {
795  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
796  "field:dropCategories error:required field missing"));
797  } else {
798  std::vector<grpc_error_handle> child_errors =
799  ParseDropCategories(it->second, drop_config.get());
800  if (!child_errors.empty()) {
801  error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
802  "field:dropCategories", &child_errors));
803  }
804  }
805  if (!error_list.empty()) {
807  "xds_cluster_impl_experimental LB policy config", &error_list);
808  return nullptr;
809  }
810  return MakeRefCounted<XdsClusterImplLbConfig>(
811  std::move(child_policy), std::move(cluster_name),
813  max_concurrent_requests, std::move(drop_config));
814  }
815 
816  private:
817  static std::vector<grpc_error_handle> ParseDropCategories(
818  const Json& json, XdsEndpointResource::DropConfig* drop_config) {
819  std::vector<grpc_error_handle> error_list;
820  if (json.type() != Json::Type::ARRAY) {
821  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
822  "dropCategories field is not an array"));
823  return error_list;
824  }
825  for (size_t i = 0; i < json.array_value().size(); ++i) {
826  const Json& entry = json.array_value()[i];
827  std::vector<grpc_error_handle> child_errors =
828  ParseDropCategory(entry, drop_config);
829  if (!child_errors.empty()) {
831  absl::StrCat("errors parsing index ", i));
832  for (size_t i = 0; i < child_errors.size(); ++i) {
833  error = grpc_error_add_child(error, child_errors[i]);
834  }
835  error_list.push_back(error);
836  }
837  }
838  return error_list;
839  }
840 
841  static std::vector<grpc_error_handle> ParseDropCategory(
842  const Json& json, XdsEndpointResource::DropConfig* drop_config) {
843  std::vector<grpc_error_handle> error_list;
844  if (json.type() != Json::Type::OBJECT) {
845  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
846  "dropCategories entry is not an object"));
847  return error_list;
848  }
849  std::string category;
850  auto it = json.object_value().find("category");
851  if (it == json.object_value().end()) {
852  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
853  "\"category\" field not present"));
854  } else if (it->second.type() != Json::Type::STRING) {
855  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
856  "\"category\" field is not a string"));
857  } else {
858  category = it->second.string_value();
859  }
860  uint32_t requests_per_million = 0;
861  it = json.object_value().find("requests_per_million");
862  if (it == json.object_value().end()) {
863  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
864  "\"requests_per_million\" field is not present"));
865  } else if (it->second.type() != Json::Type::NUMBER) {
866  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
867  "\"requests_per_million\" field is not a number"));
868  } else {
869  requests_per_million =
870  gpr_parse_nonnegative_int(it->second.string_value().c_str());
871  }
872  if (error_list.empty()) {
873  drop_config->AddCategory(std::move(category), requests_per_million);
874  }
875  return error_list;
876  }
877 };
878 
879 } // namespace
880 
881 } // namespace grpc_core
882 
883 //
884 // Plugin registration
885 //
886 
888  grpc_core::g_call_counter_map = new grpc_core::CircuitBreakerCallCounterMap();
891  absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
892 }
893 
895  delete grpc_core::g_call_counter_map;
896 }
grpc_arg
Definition: grpc_types.h:103
trace.h
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
regen-readme.it
it
Definition: regen-readme.py:15
grpc_core::LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory
static void RegisterLoadBalancingPolicyFactory(std::unique_ptr< LoadBalancingPolicyFactory > factory)
Definition: lb_policy_registry.cc:87
orphanable.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
bloat_diff.severity
def severity
Definition: bloat_diff.py:143
MutexLock
#define MutexLock(x)
Definition: bloaty/third_party/re2/util/mutex.h:125
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
connectivity_state.h
xds_client_stats.h
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
key_
Key key_
Definition: xds_cluster_impl.cc:95
status_
absl::Status status_
Definition: xds_cluster_impl.cc:292
grpc_core
Definition: call_metric_recorder.h:31
cluster_name
std::string cluster_name
Definition: xds_cluster_resolver.cc:91
grpc_lb_policy_xds_cluster_impl_init
void grpc_lb_policy_xds_cluster_impl_init()
Definition: xds_cluster_impl.cc:887
string.h
xds_endpoint.h
locality_stats_
RefCountedPtr< XdsClusterLocalityStats > locality_stats_
Definition: xds_cluster_impl.cc:208
shutting_down_
bool shutting_down_
Definition: xds_cluster_impl.cc:280
ExitIdleLocked
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb void ExitIdleLocked()
Definition: rls.cc:299
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
lb_policy.h
grpc_lb_policy_xds_cluster_impl_shutdown
void grpc_lb_policy_xds_cluster_impl_shutdown()
Definition: xds_cluster_impl.cc:894
picker_
std::unique_ptr< SubchannelPicker > picker_
Definition: xds_cluster_impl.cc:219
cluster_name_
std::string cluster_name_
Definition: xds_cluster_impl.cc:176
lb_policy_factory.h
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
status
absl::Status status
Definition: rls.cc:251
setup.name
name
Definition: setup.py:542
grpc_channel_arg_string_create
grpc_arg grpc_channel_arg_string_create(char *name, char *value)
Definition: channel_args.cc:476
absl::InternalError
Status InternalError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:347
grpc_pollset_set_del_pollset_set
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:52
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
subchannel_interface.h
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
child_policy_
RefCountedPtr< LoadBalancingPolicy::Config > child_policy_
Definition: xds_cluster_impl.cc:175
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
GRPC_ERROR_CREATE_FROM_VECTOR
#define GRPC_ERROR_CREATE_FROM_VECTOR(desc, error_list)
Definition: error.h:314
call_counter_
RefCountedPtr< CircuitBreakerCallCounterMap::CallCounter > call_counter_
Definition: xds_cluster_impl.cc:231
grpc_types.h
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_error_add_child
grpc_error_handle grpc_error_add_child(grpc_error_handle src, grpc_error_handle child)
Definition: error.cc:678
benchmark::internal::Increment
void Increment(UserCounters *l, UserCounters const &r)
Definition: benchmark/src/counter.cc:49
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
hpack_encoder_fixtures::Args
Args({0, 16384})
cluster
absl::string_view cluster
Definition: xds_resolver.cc:331
Json
JSON (JavaScript Object Notation).
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:227
config_
RefCountedPtr< XdsClusterImplLbConfig > config_
Definition: xds_cluster_impl.cc:274
xds_client.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
max_concurrent_requests_
uint32_t max_concurrent_requests_
Definition: xds_cluster_impl.cc:179
xds_bootstrap.h
connectivity_state.h
lrs_load_reporting_server
absl::optional< XdsBootstrap::XdsServer > lrs_load_reporting_server
Definition: xds_cluster_resolver.cc:92
drop_config_
RefCountedPtr< XdsEndpointResource::DropConfig > drop_config_
Definition: xds_cluster_impl.cc:180
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
grpc_core::grpc_xds_cluster_impl_lb_trace
TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb")
pollset_set.h
started_
bool started_
Definition: xds_cluster_impl.cc:357
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
server_address.h
xds_channel_args.h
absl::flags_internal::Parse
bool Parse(FlagOpFn op, absl::string_view text, void *dst, std::string *error)
Definition: abseil-cpp/absl/flags/internal/flag.h:125
error.h
original_subchannel_call_tracker_
std::unique_ptr< LoadBalancingPolicy::SubchannelCallTrackerInterface > original_subchannel_call_tracker_
Definition: xds_cluster_impl.cc:353
json.h
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
eds_service_name_
std::string eds_service_name_
Definition: xds_cluster_impl.cc:177
stdint.h
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
child_policy_handler.h
state_
grpc_connectivity_state state_
Definition: xds_cluster_impl.cc:291
grpc_core::kXdsLocalityNameAttributeKey
const char * kXdsLocalityNameAttributeKey
Definition: xds_cluster_resolver.cc:81
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
debug_location.h
key
const char * key
Definition: hpack_parser_table.cc:164
drop_stats_
RefCountedPtr< XdsClusterDropStats > drop_stats_
Definition: xds_cluster_impl.cc:236
eds_service_name
std::string eds_service_name
Definition: xds_cluster_resolver.cc:99
lb_policy_registry.h
ref_counted.h
benchmark::internal::Finish
double Finish(Counter const &c, IterationCount iterations, double cpu_time, double num_threads)
Definition: benchmark/src/counter.cc:20
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
GRPC_ERROR_CREATE_FROM_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
Definition: error.h:297
testing::Key
internal::KeyMatcher< M > Key(M inner_matcher)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:9141
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_pollset_set_add_pollset_set
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:47
first
StrT first
Definition: cxa_demangle.cpp:4884
lrs_load_reporting_server_
absl::optional< XdsBootstrap::XdsServer > lrs_load_reporting_server_
Definition: xds_cluster_impl.cc:178
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
GRPC_ARG_XDS_CLUSTER_NAME
#define GRPC_ARG_XDS_CLUSTER_NAME
Definition: filters/client_channel/lb_policy/xds/xds_channel_args.h:22
absl::UnavailableError
Status UnavailableError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:375
ref_counted_ptr.h
concurrent_requests_
std::atomic< uint32_t > concurrent_requests_
Definition: xds_cluster_impl.cc:96
channel_args.h
check_redundant_namespace_qualifiers.Config
Config
Definition: check_redundant_namespace_qualifiers.py:142
gpr_parse_nonnegative_int
int gpr_parse_nonnegative_int(const char *value)
Definition: string.cc:218
Fail
void Fail(const char *msg)
Definition: bloaty/third_party/googletest/googletest/test/gtest_assert_by_exception_test.cc:52
xds.h
absl::StatusOr< ServerAddressList >
xds_client_
RefCountedPtr< XdsClient > xds_client_
Definition: xds_cluster_impl.cc:283
grpc_error
Definition: error_internal.h:42
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
pair
std::pair< std::string, std::string > pair
Definition: abseil-cpp/absl/container/internal/raw_hash_set_benchmark.cc:78
sync.h
grpc_channel_args_copy_and_add
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:224
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
state
static struct rpc_state state
Definition: bad_server_response_test.cc:87
mu_
Mutex mu_
Definition: xds_cluster_impl.cc:103
xds_cluster_impl_policy_
RefCountedPtr< XdsClusterImplLb > xds_cluster_impl_policy_
Definition: xds_cluster_impl.cc:259
Decrement
static Validator * Decrement(int *counter)
Definition: api_fuzzer.cc:297
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
parse_error
@ parse_error
Definition: pem_info.c:88
max_concurrent_requests
uint32_t max_concurrent_requests
Definition: xds_cluster_resolver.cc:93
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:55