xds_client.cc
Go to the documentation of this file.
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24 
25 #include <algorithm>
26 
27 #include "absl/container/inlined_vector.h"
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/str_split.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/strings/strip.h"
35 
36 #include <grpc/byte_buffer.h>
38 #include <grpc/grpc.h>
41 #include <grpc/slice.h>
42 #include <grpc/status.h>
43 #include <grpc/support/alloc.h>
44 #include <grpc/support/log.h>
46 #include <grpc/support/time.h>
47 
61 #include "src/core/lib/gpr/env.h"
82 
83 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
84 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
85 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
86 #define GRPC_XDS_RECONNECT_JITTER 0.2
87 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
88 
89 namespace grpc_core {
90 
91 TraceFlag grpc_xds_client_trace(false, "xds_client");
92 TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
93 
94 namespace {
95 
96 Mutex* g_mu = nullptr;
97 
98 const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
99 XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
100 char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
101 
102 } // namespace
103 
104 //
105 // Internal class declarations
106 //
107 
108 // An xds call wrapper that can restart a call upon failure. Holds a ref to
109 // the xds channel. The template parameter is the kind of wrapped xds call.
110 template <typename T>
112  : public InternallyRefCounted<RetryableCall<T>> {
113  public:
115 
116  void Orphan() override;
117 
118  void OnCallFinishedLocked();
119 
120  T* calld() const { return calld_.get(); }
121  ChannelState* chand() const { return chand_.get(); }
122 
123  bool IsCurrentCallOnChannel() const;
124 
125  private:
126  void StartNewCallLocked();
127  void StartRetryTimerLocked();
128  static void OnRetryTimer(void* arg, grpc_error_handle error);
130 
131  // The wrapped xds call that talks to the xds server. It's instantiated
132  // every time we start a new call. It's null during call retry backoff.
134  // The owning xds channel.
136 
137  // Retry state.
142 
143  bool shutting_down_ = false;
144 };
145 
146 // Contains an ADS call to the xds server.
148  : public InternallyRefCounted<AdsCallState> {
149  public:
150  // The ctor and dtor should not be used directly.
152  ~AdsCallState() override;
153 
154  void Orphan() override;
155 
156  RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
157  ChannelState* chand() const { return parent_->chand(); }
158  XdsClient* xds_client() const { return chand()->xds_client(); }
159  bool seen_response() const { return seen_response_; }
160 
161  void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name,
162  bool delay_send)
164  void UnsubscribeLocked(const XdsResourceType* type,
165  const XdsResourceName& name, bool delay_unsubscription)
167 
169 
170  private:
172  public:
173  struct Result {
178  std::vector<std::string> errors;
179  std::map<std::string /*authority*/, std::set<XdsResourceKey>>
181  bool have_valid_resources = false;
182  };
183 
184  explicit AdsResponseParser(AdsCallState* ads_call_state)
185  : ads_call_state_(ads_call_state) {}
186 
187  absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override
189 
190  void ParseResource(const XdsEncodingContext& context, size_t idx,
192  absl::string_view serialized_resource) override
194 
196 
197  private:
199 
203  };
204 
205  class ResourceTimer : public InternallyRefCounted<ResourceTimer> {
206  public:
208  : type_(type), name_(name) {
210  grpc_schedule_on_exec_ctx);
211  }
212 
213  void Orphan() override {
215  Unref(DEBUG_LOCATION, "Orphan");
216  }
217 
220  if (!timer_start_needed_) return;
221  timer_start_needed_ = false;
222  // Check if we already have a cached version of this resource
223  // (i.e., if this is the initial request for the resource after an
224  // ADS stream restart). If so, we don't start the timer, because
225  // (a) we already have the resource and (b) the server may
226  // optimize by not resending the resource that we already have.
227  auto& authority_state =
228  ads_calld->xds_client()->authority_state_map_[name_.authority];
229  ResourceState& state = authority_state.resource_map[type_][name_.key];
230  if (state.resource != nullptr) return;
231  // Start timer.
233  Ref(DEBUG_LOCATION, "timer").release();
234  timer_pending_ = true;
236  &timer_,
237  ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
238  &timer_callback_);
239  }
240 
242  // If the timer hasn't been started yet, make sure we don't start
243  // it later. This can happen if the last watch for an LDS or CDS
244  // resource is cancelled and then restarted, both while an ADS
245  // request for a different resource type is being sent (causing
246  // the unsubscription and then resubscription requests to be
247  // queued), and then we get a response for the LDS or CDS resource.
248  // In that case, we would call MaybeCancelTimer() when we receive the
249  // response and then MaybeStartTimer() when we finally send the new
250  // LDS or CDS request, thus causing the timer to fire when it shouldn't.
251  // For details, see https://github.com/grpc/grpc/issues/29583.
252  // TODO(roth): Find a way to write a test for this case.
253  timer_start_needed_ = false;
254  if (timer_pending_) {
256  timer_pending_ = false;
257  }
258  }
259 
260  private:
261  static void OnTimer(void* arg, grpc_error_handle error) {
262  ResourceTimer* self = static_cast<ResourceTimer*>(arg);
263  {
264  MutexLock lock(&self->ads_calld_->xds_client()->mu_);
265  self->OnTimerLocked(GRPC_ERROR_REF(error));
266  }
267  self->ads_calld_->xds_client()->work_serializer_.DrainQueue();
268  self->ads_calld_.reset();
269  self->Unref(DEBUG_LOCATION, "timer");
270  }
271 
275  timer_pending_ = false;
278  "[xds_client %p] xds server %s: timeout obtaining resource "
279  "{type=%s name=%s} from xds server",
280  ads_calld_->xds_client(),
281  ads_calld_->chand()->server_.server_uri.c_str(),
282  std::string(type_->type_url()).c_str(),
285  .c_str());
286  }
287  auto& authority_state =
288  ads_calld_->xds_client()->authority_state_map_[name_.authority];
289  ResourceState& state = authority_state.resource_map[type_][name_.key];
290  state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
291  ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
292  state.watchers);
293  }
295  }
296 
299 
301  bool timer_start_needed_ = true;
302  bool timer_pending_ = false;
305  };
306 
309 
310  // Nonce and error for this resource type.
313 
314  // Subscribed resources of this type.
315  std::map<std::string /*authority*/,
316  std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
318  };
319 
320  void SendMessageLocked(const XdsResourceType* type)
322 
324  void OnRequestSentLocked(grpc_error_handle error)
326  static void OnResponseReceived(void* arg, grpc_error_handle error);
327  bool OnResponseReceivedLocked()
329  static void OnStatusReceived(void* arg, grpc_error_handle error);
330  void OnStatusReceivedLocked(grpc_error_handle error)
332 
333  bool IsCurrentCallOnChannel() const;
334 
335  // Constructs a list of resource names of a given type for an ADS
336  // request. Also starts the timer for each resource if needed.
337  std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
339 
340  // The owning RetryableCall<>.
342 
345 
346  // Always non-NULL.
348 
349  // recv_initial_metadata
351 
352  // send_message
355 
356  // recv_message
359 
360  // recv_trailing_metadata
365 
366  // Resource types for which requests need to be sent.
368 
369  // State for each resource type.
371 };
372 
373 // Contains an LRS call to the xds server.
376  public:
377  // The ctor and dtor should not be used directly.
379  ~LrsCallState() override;
380 
381  void Orphan() override;
382 
383  void MaybeStartReportingLocked();
384 
386  ChannelState* chand() const { return parent_->chand(); }
387  XdsClient* xds_client() const { return chand()->xds_client(); }
388  bool seen_response() const { return seen_response_; }
389 
390  private:
391  // Reports client-side load stats according to a fixed interval.
392  class Reporter : public InternallyRefCounted<Reporter> {
393  public:
395  : parent_(std::move(parent)), report_interval_(report_interval) {
396  GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
397  grpc_schedule_on_exec_ctx);
398  GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
399  grpc_schedule_on_exec_ctx);
400  ScheduleNextReportLocked();
401  }
402 
403  void Orphan() override;
404 
405  private:
406  void ScheduleNextReportLocked()
408  static void OnNextReportTimer(void* arg, grpc_error_handle error);
409  bool OnNextReportTimerLocked(grpc_error_handle error)
411  bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
412  static void OnReportDone(void* arg, grpc_error_handle error);
413  bool OnReportDoneLocked(grpc_error_handle error)
415 
416  bool IsCurrentReporterOnCall() const {
417  return this == parent_->reporter_.get();
418  }
419  XdsClient* xds_client() const { return parent_->xds_client(); }
420 
421  // The owning LRS call.
423 
424  // The load reporting state.
426  bool last_report_counters_were_zero_ = false;
427  bool next_report_timer_callback_pending_ = false;
431  };
432 
433  static void OnInitialRequestSent(void* arg, grpc_error_handle error);
434  void OnInitialRequestSentLocked()
436  static void OnResponseReceived(void* arg, grpc_error_handle error);
437  bool OnResponseReceivedLocked()
439  static void OnStatusReceived(void* arg, grpc_error_handle error);
440  void OnStatusReceivedLocked(grpc_error_handle error)
442 
443  bool IsCurrentCallOnChannel() const;
444 
445  // The owning RetryableCall<>.
448 
449  // Always non-NULL.
451 
452  // recv_initial_metadata
454 
455  // send_message
457  grpc_closure on_initial_request_sent_;
458 
459  // recv_message
462 
463  // recv_trailing_metadata
468 
469  // Load reporting state.
470  bool send_all_clusters_ = false;
471  std::set<std::string> cluster_names_; // Asked for by the LRS server.
472  Duration load_reporting_interval_;
474 };
475 
476 //
477 // XdsClient::ChannelState::StateWatcher
478 //
479 
482  public:
484  : parent_(std::move(parent)) {}
485 
486  private:
488  const absl::Status& status) override {
489  {
490  MutexLock lock(&parent_->xds_client_->mu_);
491  if (!parent_->shutting_down_ &&
492  new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
493  // In TRANSIENT_FAILURE. Notify all watchers of error.
495  "[xds_client %p] xds channel for server %s in "
496  "state TRANSIENT_FAILURE: %s",
497  parent_->xds_client(), parent_->server_.server_uri.c_str(),
498  status.ToString().c_str());
499  parent_->xds_client_->NotifyOnErrorLocked(
501  "xds channel in TRANSIENT_FAILURE, connectivity error: ",
502  status.ToString())));
503  }
504  }
505  parent_->xds_client()->work_serializer_.DrainQueue();
506  }
507 
509 };
510 
511 //
512 // XdsClient::ChannelState
513 //
514 
515 namespace {
516 
517 grpc_channel* CreateXdsChannel(grpc_channel_args* args,
521  server.channel_creds_type, server.channel_creds_config);
522  return grpc_channel_create(server.server_uri.c_str(), channel_creds.get(),
523  args);
524 }
525 
526 } // namespace
527 
532  ? "ChannelState"
533  : nullptr),
535  server_(server) {
537  gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
538  xds_client_.get(), server.server_uri.c_str());
539  }
540  channel_ = CreateXdsChannel(xds_client_->args_, server);
541  GPR_ASSERT(channel_ != nullptr);
542  StartConnectivityWatchLocked();
543 }
544 
547  gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
548  xds_client(), this, server_.server_uri.c_str());
549  }
551  xds_client_.reset(DEBUG_LOCATION, "ChannelState");
552 }
553 
554 // This method should only ever be called when holding the lock, but we can't
555 // use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
556 // called from DualRefCounted::Unref, which cannot have a lock annotation for a
557 // lock in this subclass.
559  shutting_down_ = true;
560  CancelConnectivityWatchLocked();
561  // At this time, all strong refs are removed, remove from channel map to
562  // prevent subsequent subscription from trying to use this ChannelState as it
563  // is shutting down.
564  xds_client_->xds_server_channel_map_.erase(server_);
565  ads_calld_.reset();
566  lrs_calld_.reset();
567 }
568 
570  const {
571  return ads_calld_->calld();
572 }
573 
575  const {
576  return lrs_calld_->calld();
577 }
578 
580  return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
581 }
582 
584  if (lrs_calld_ != nullptr) return;
585  lrs_calld_.reset(new RetryableCall<LrsCallState>(
586  WeakRef(DEBUG_LOCATION, "ChannelState+lrs")));
587 }
588 
589 void XdsClient::ChannelState::StopLrsCallLocked() {
590  xds_client_->xds_load_report_server_map_.erase(server_);
591  lrs_calld_.reset();
592 }
593 
594 namespace {
595 
596 bool IsLameChannel(grpc_channel* channel) {
599  return elem->filter == &LameClientFilter::kFilter;
600 }
601 
602 } // namespace
603 
604 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
605  if (IsLameChannel(channel_)) {
606  xds_client()->NotifyOnErrorLocked(
607  absl::UnavailableError("xds client has a lame channel"));
608  return;
609  }
610  ClientChannel* client_channel =
612  GPR_ASSERT(client_channel != nullptr);
613  watcher_ = new StateWatcher(WeakRef(DEBUG_LOCATION, "ChannelState+watch"));
614  client_channel->AddConnectivityWatcher(
616  OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
617 }
618 
619 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
620  if (IsLameChannel(channel_)) {
621  return;
622  }
623  ClientChannel* client_channel =
625  GPR_ASSERT(client_channel != nullptr);
626  client_channel->RemoveConnectivityWatcher(watcher_);
627 }
628 
629 void XdsClient::ChannelState::SubscribeLocked(const XdsResourceType* type,
630  const XdsResourceName& name) {
631  if (ads_calld_ == nullptr) {
632  // Start the ADS call if this is the first request.
633  ads_calld_.reset(new RetryableCall<AdsCallState>(
634  WeakRef(DEBUG_LOCATION, "ChannelState+ads")));
635  // Note: AdsCallState's ctor will automatically subscribe to all
636  // resources that the XdsClient already has watchers for, so we can
637  // return here.
638  return;
639  }
640  // If the ADS call is in backoff state, we don't need to do anything now
641  // because when the call is restarted it will resend all necessary requests.
642  if (ads_calld() == nullptr) return;
643  // Subscribe to this resource if the ADS call is active.
644  ads_calld()->SubscribeLocked(type, name, /*delay_send=*/false);
645 }
646 
647 void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type,
648  const XdsResourceName& name,
649  bool delay_unsubscription) {
650  if (ads_calld_ != nullptr) {
651  auto* calld = ads_calld_->calld();
652  if (calld != nullptr) {
653  calld->UnsubscribeLocked(type, name, delay_unsubscription);
654  if (!calld->HasSubscribedResources()) {
655  ads_calld_.reset();
656  }
657  }
658  }
659 }
660 
661 //
662 // XdsClient::ChannelState::RetryableCall<>
663 //
664 
665 template <typename T>
668  : chand_(std::move(chand)),
669  backoff_(BackOff::Options()
670  .set_initial_backoff(Duration::Seconds(
673  .set_jitter(GRPC_XDS_RECONNECT_JITTER)
674  .set_max_backoff(Duration::Seconds(
676  // Closure Initialization
677  GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
678  grpc_schedule_on_exec_ctx);
679  StartNewCallLocked();
680 }
681 
682 template <typename T>
684  shutting_down_ = true;
685  calld_.reset();
687  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
688 }
689 
690 template <typename T>
692  // If we saw a response on the current stream, reset backoff.
693  if (calld_->seen_response()) backoff_.Reset();
694  calld_.reset();
695  // Start retry timer.
696  StartRetryTimerLocked();
697 }
698 
699 template <typename T>
701  if (shutting_down_) return;
702  GPR_ASSERT(chand_->channel_ != nullptr);
703  GPR_ASSERT(calld_ == nullptr);
705  gpr_log(
706  GPR_INFO,
707  "[xds_client %p] xds server %s: start new call from retryable call %p",
708  chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
709  }
710  calld_ = MakeOrphanable<T>(
711  this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
712 }
713 
714 template <typename T>
716  if (shutting_down_) return;
717  const Timestamp next_attempt_time = backoff_.NextAttemptTime();
719  Duration timeout =
720  std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero());
722  "[xds_client %p] xds server %s: call attempt failed; "
723  "retry timer will fire in %" PRId64 "ms.",
724  chand()->xds_client(), chand()->server_.server_uri.c_str(),
725  timeout.millis());
726  }
727  this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
728  grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
730 }
731 
732 template <typename T>
734  void* arg, grpc_error_handle error) {
735  RetryableCall* calld = static_cast<RetryableCall*>(arg);
736  {
737  MutexLock lock(&calld->chand_->xds_client()->mu_);
739  }
740  calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
741 }
742 
743 template <typename T>
750  "[xds_client %p] xds server %s: retry timer fired (retryable "
751  "call: %p)",
752  chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
753  }
754  StartNewCallLocked();
755  }
757 }
758 
759 //
760 // XdsClient::ChannelState::AdsCallState::AdsResponseParser
761 //
762 
763 absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser::
764  ProcessAdsResponseFields(AdsResponseFields fields) {
766  gpr_log(
767  GPR_INFO,
768  "[xds_client %p] xds server %s: received ADS response: type_url=%s, "
769  "version=%s, nonce=%s, num_resources=%" PRIuPTR,
770  ads_call_state_->xds_client(),
771  ads_call_state_->chand()->server_.server_uri.c_str(),
772  fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
773  fields.num_resources);
774  }
775  result_.type =
776  ads_call_state_->xds_client()->GetResourceTypeLocked(fields.type_url);
777  if (result_.type == nullptr) {
779  absl::StrCat("unknown resource type ", fields.type_url));
780  }
781  result_.type_url = std::move(fields.type_url);
782  result_.version = std::move(fields.version);
783  result_.nonce = std::move(fields.nonce);
784  return absl::OkStatus();
785 }
786 
787 namespace {
788 
789 // Build a resource metadata struct for ADS result accepting methods and CSDS.
790 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
791  std::string serialized_proto, std::string version, Timestamp update_time) {
792  XdsApi::ResourceMetadata resource_metadata;
793  resource_metadata.serialized_proto = std::move(serialized_proto);
794  resource_metadata.update_time = update_time;
795  resource_metadata.version = std::move(version);
796  resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
797  return resource_metadata;
798 }
799 
800 // Update resource_metadata for NACK.
801 void UpdateResourceMetadataNacked(const std::string& version,
802  const std::string& details,
803  Timestamp update_time,
804  XdsApi::ResourceMetadata* resource_metadata) {
805  resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
806  resource_metadata->failed_version = version;
807  resource_metadata->failed_details = details;
808  resource_metadata->failed_update_time = update_time;
809 }
810 
811 } // namespace
812 
813 void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
814  const XdsEncodingContext& context, size_t idx, absl::string_view type_url,
815  absl::string_view serialized_resource) {
816  // Check the type_url of the resource.
817  bool is_v2 = false;
818  if (!result_.type->IsType(type_url, &is_v2)) {
819  result_.errors.emplace_back(
820  absl::StrCat("resource index ", idx, ": incorrect resource type ",
821  type_url, " (should be ", result_.type_url, ")"));
822  return;
823  }
824  // Parse the resource.
826  result_.type->Decode(context, serialized_resource, is_v2);
827  if (!result.ok()) {
828  result_.errors.emplace_back(
829  absl::StrCat("resource index ", idx, ": ", result.status().ToString()));
830  return;
831  }
832  // Check the resource name.
833  auto resource_name =
834  xds_client()->ParseXdsResourceName(result->name, result_.type);
835  if (!resource_name.ok()) {
836  result_.errors.emplace_back(absl::StrCat(
837  "resource index ", idx, ": Cannot parse xDS resource name \"",
838  result->name, "\""));
839  return;
840  }
841  // Cancel resource-does-not-exist timer, if needed.
842  auto timer_it = ads_call_state_->state_map_.find(result_.type);
843  if (timer_it != ads_call_state_->state_map_.end()) {
844  auto it =
845  timer_it->second.subscribed_resources.find(resource_name->authority);
846  if (it != timer_it->second.subscribed_resources.end()) {
847  auto res_it = it->second.find(resource_name->key);
848  if (res_it != it->second.end()) {
849  res_it->second->MaybeCancelTimer();
850  }
851  }
852  }
853  // Lookup the authority in the cache.
854  auto authority_it =
855  xds_client()->authority_state_map_.find(resource_name->authority);
856  if (authority_it == xds_client()->authority_state_map_.end()) {
857  return; // Skip resource -- we don't have a subscription for it.
858  }
859  // Found authority, so look up type.
860  AuthorityState& authority_state = authority_it->second;
861  auto type_it = authority_state.resource_map.find(result_.type);
862  if (type_it == authority_state.resource_map.end()) {
863  return; // Skip resource -- we don't have a subscription for it.
864  }
865  auto& type_map = type_it->second;
866  // Found type, so look up resource key.
867  auto it = type_map.find(resource_name->key);
868  if (it == type_map.end()) {
869  return; // Skip resource -- we don't have a subscription for it.
870  }
871  ResourceState& resource_state = it->second;
872  // If needed, record that we've seen this resource.
873  if (result_.type->AllResourcesRequiredInSotW()) {
874  result_.resources_seen[resource_name->authority].insert(resource_name->key);
875  }
876  // If we previously ignored the resource's deletion, log that we're
877  // now re-adding it.
878  if (resource_state.ignored_deletion) {
880  "[xds_client %p] xds server %s: server returned new version of "
881  "resource for which we previously ignored a deletion: type %s "
882  "name %s",
883  xds_client(), ads_call_state_->chand()->server_.server_uri.c_str(),
884  std::string(type_url).c_str(), result->name.c_str());
885  resource_state.ignored_deletion = false;
886  }
887  // Update resource state based on whether the resource is valid.
888  if (!result->resource.ok()) {
889  result_.errors.emplace_back(absl::StrCat(
890  "resource index ", idx, ": ", result->name,
891  ": validation error: ", result->resource.status().ToString()));
892  xds_client()->NotifyWatchersOnErrorLocked(
893  resource_state.watchers,
895  "invalid resource: ", result->resource.status().ToString())));
896  UpdateResourceMetadataNacked(result_.version,
897  result->resource.status().ToString(),
898  update_time_, &resource_state.meta);
899  return;
900  }
901  // Resource is valid.
902  result_.have_valid_resources = true;
903  // If it didn't change, ignore it.
904  if (resource_state.resource != nullptr &&
905  result_.type->ResourcesEqual(resource_state.resource.get(),
906  result->resource->get())) {
909  "[xds_client %p] %s resource %s identical to current, ignoring.",
910  xds_client(), result_.type_url.c_str(), result->name.c_str());
911  }
912  return;
913  }
914  // Update the resource state.
915  resource_state.resource = std::move(*result->resource);
916  resource_state.meta = CreateResourceMetadataAcked(
917  std::string(serialized_resource), result_.version, update_time_);
918  // Notify watchers.
919  auto& watchers_list = resource_state.watchers;
920  auto* value =
921  result_.type->CopyResource(resource_state.resource.get()).release();
922  xds_client()->work_serializer_.Schedule(
923  [watchers_list, value]()
925  for (const auto& p : watchers_list) {
926  p.first->OnGenericResourceChanged(value);
927  }
928  delete value;
929  },
931 }
932 
933 //
934 // XdsClient::ChannelState::AdsCallState
935 //
936 
937 XdsClient::ChannelState::AdsCallState::AdsCallState(
941  ? "AdsCallState"
942  : nullptr),
943  parent_(std::move(parent)) {
944  // Init the ADS call. Note that the call will progress every time there's
945  // activity in xds_client()->interested_parties_, which is comprised of
946  // the polling entities from client_channel.
947  GPR_ASSERT(xds_client() != nullptr);
948  // Create a call with the specified method name.
949  const char* method =
951  ? "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
952  "StreamAggregatedResources"
953  : "/envoy.service.discovery.v2.AggregatedDiscoveryService/"
954  "StreamAggregatedResources";
958  StaticSlice::FromStaticString(method).c_slice(), nullptr,
959  Timestamp::InfFuture(), nullptr);
960  GPR_ASSERT(call_ != nullptr);
961  // Init data associated with the call.
964  // Start the call.
967  "[xds_client %p] xds server %s: starting ADS call "
968  "(calld: %p, call: %p)",
969  xds_client(), chand()->server_.server_uri.c_str(), this, call_);
970  }
971  // Create the ops.
972  grpc_call_error call_error;
973  grpc_op ops[3];
974  memset(ops, 0, sizeof(ops));
975  // Op: send initial metadata.
976  grpc_op* op = ops;
981  op->reserved = nullptr;
982  op++;
984  call_, ops, static_cast<size_t>(op - ops), nullptr);
985  GPR_ASSERT(GRPC_CALL_OK == call_error);
986  // Op: send request message.
988  grpc_schedule_on_exec_ctx);
989  for (const auto& a : xds_client()->authority_state_map_) {
990  const std::string& authority = a.first;
991  // Skip authorities that are not using this xDS channel.
992  if (a.second.channel_state != chand()) continue;
993  for (const auto& t : a.second.resource_map) {
994  const XdsResourceType* type = t.first;
995  for (const auto& r : t.second) {
996  const XdsResourceKey& resource_key = r.first;
997  SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
998  }
999  }
1000  }
1001  for (const auto& p : state_map_) {
1002  SendMessageLocked(p.first);
1003  }
1004  // Op: recv initial metadata.
1005  op = ops;
1009  op->flags = 0;
1010  op->reserved = nullptr;
1011  op++;
1012  // Op: recv response.
1015  op->flags = 0;
1016  op->reserved = nullptr;
1017  op++;
1018  Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
1019  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1020  grpc_schedule_on_exec_ctx);
1021  call_error = grpc_call_start_batch_and_execute(
1022  call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1023  GPR_ASSERT(GRPC_CALL_OK == call_error);
1024  // Op: recv server status.
1025  op = ops;
1030  op->flags = 0;
1031  op->reserved = nullptr;
1032  op++;
1033  // This callback signals the end of the call, so it relies on the initial
1034  // ref instead of a new ref. When it's invoked, it's the initial ref that is
1035  // unreffed.
1036  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1037  grpc_schedule_on_exec_ctx);
1038  call_error = grpc_call_start_batch_and_execute(
1039  call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1040  GPR_ASSERT(GRPC_CALL_OK == call_error);
1041 }
1042 
1044  grpc_metadata_array_destroy(&initial_metadata_recv_);
1045  grpc_metadata_array_destroy(&trailing_metadata_recv_);
1048  grpc_slice_unref_internal(status_details_);
1049  GPR_ASSERT(call_ != nullptr);
1051 }
1052 
1054  GPR_ASSERT(call_ != nullptr);
1055  // If we are here because xds_client wants to cancel the call,
1056  // on_status_received_ will complete the cancellation and clean up. Otherwise,
1057  // we are here because xds_client has to orphan a failed call, then the
1058  // following cancellation will be a no-op.
1060  state_map_.clear();
1061  // Note that the initial ref is hold by on_status_received_. So the
1062  // corresponding unref happens in on_status_received_ instead of here.
1063 }
1064 
1065 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
1066  const XdsResourceType* type)
1068  // Buffer message sending if an existing message is in flight.
1069  if (send_message_payload_ != nullptr) {
1070  buffered_requests_.insert(type);
1071  return;
1072  }
1073  auto& state = state_map_[type];
1074  grpc_slice request_payload_slice;
1075  request_payload_slice = xds_client()->api_.CreateAdsRequest(
1076  chand()->server_,
1077  chand()->server_.ShouldUseV3() ? type->type_url() : type->v2_type_url(),
1078  chand()->resource_type_version_map_[type], state.nonce,
1079  ResourceNamesForRequest(type), GRPC_ERROR_REF(state.error),
1080  !sent_initial_message_);
1081  sent_initial_message_ = true;
1083  gpr_log(GPR_INFO,
1084  "[xds_client %p] xds server %s: sending ADS request: type=%s "
1085  "version=%s nonce=%s error=%s",
1086  xds_client(), chand()->server_.server_uri.c_str(),
1087  std::string(type->type_url()).c_str(),
1089  state.nonce.c_str(), grpc_error_std_string(state.error).c_str());
1090  }
1091  GRPC_ERROR_UNREF(state.error);
1092  state.error = GRPC_ERROR_NONE;
1093  // Create message payload.
1095  grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1096  grpc_slice_unref_internal(request_payload_slice);
1097  // Send the message.
1098  grpc_op op;
1099  memset(&op, 0, sizeof(op));
1102  Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
1103  GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
1104  grpc_schedule_on_exec_ctx);
1105  grpc_call_error call_error =
1106  grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
1107  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1109  "[xds_client %p] xds server %s: error starting ADS send_message "
1110  "batch on calld=%p: call_error=%d",
1111  xds_client(), chand()->server_.server_uri.c_str(), this,
1112  call_error);
1113  GPR_ASSERT(GRPC_CALL_OK == call_error);
1114  }
1115 }
1116 
1117 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
1118  const XdsResourceType* type, const XdsResourceName& name, bool delay_send) {
1119  auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
1120  if (state == nullptr) {
1121  state = MakeOrphanable<ResourceTimer>(type, name);
1122  if (!delay_send) SendMessageLocked(type);
1123  }
1124 }
1125 
1126 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
1127  const XdsResourceType* type, const XdsResourceName& name,
1128  bool delay_unsubscription) {
1129  auto& type_state_map = state_map_[type];
1130  auto& authority_map = type_state_map.subscribed_resources[name.authority];
1131  authority_map.erase(name.key);
1132  if (authority_map.empty()) {
1133  type_state_map.subscribed_resources.erase(name.authority);
1134  }
1135  if (!delay_unsubscription) SendMessageLocked(type);
1136 }
1137 
1139  for (const auto& p : state_map_) {
1140  if (!p.second.subscribed_resources.empty()) return true;
1141  }
1142  return false;
1143 }
1144 
1146  void* arg, grpc_error_handle error) {
1147  AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1148  {
1149  MutexLock lock(&ads_calld->xds_client()->mu_);
1150  ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1151  }
1152  ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1153 }
1154 
1155 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1157  if (IsCurrentCallOnChannel() && GRPC_ERROR_IS_NONE(error)) {
1158  // Clean up the sent message.
1160  send_message_payload_ = nullptr;
1161  // Continue to send another pending message if any.
1162  // TODO(roth): The current code to handle buffered messages has the
1163  // advantage of sending only the most recent list of resource names for
1164  // each resource type (no matter how many times that resource type has
1165  // been requested to send while the current message sending is still
1166  // pending). But its disadvantage is that we send the requests in fixed
1167  // order of resource types. We need to fix this if we are seeing some
1168  // resource type(s) starved due to frequent requests of other resource
1169  // type(s).
1170  auto it = buffered_requests_.begin();
1171  if (it != buffered_requests_.end()) {
1172  SendMessageLocked(*it);
1173  buffered_requests_.erase(it);
1174  }
1175  }
1177 }
1178 
1179 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1180  void* arg, grpc_error_handle /* error */) {
1181  AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1182  bool done;
1183  {
1184  MutexLock lock(&ads_calld->xds_client()->mu_);
1185  done = ads_calld->OnResponseReceivedLocked();
1186  }
1188  if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1189 }
1190 
1191 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1192  // Empty payload means the call was cancelled.
1193  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1194  return true;
1195  }
1196  // Read the response.
1199  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1202  recv_message_payload_ = nullptr;
1203  // Parse and validate the response.
1204  AdsResponseParser parser(this);
1206  chand()->server_, response_slice, &parser);
1207  grpc_slice_unref_internal(response_slice);
1208  if (!status.ok()) {
1209  // Ignore unparsable response.
1211  "[xds_client %p] xds server %s: error parsing ADS response (%s) "
1212  "-- ignoring",
1213  xds_client(), chand()->server_.server_uri.c_str(),
1214  status.ToString().c_str());
1215  } else {
1216  seen_response_ = true;
1217  AdsResponseParser::Result result = parser.TakeResult();
1218  // Update nonce.
1219  auto& state = state_map_[result.type];
1220  state.nonce = result.nonce;
1221  // If we got an error, set state.error so that we'll NACK the update.
1222  if (!result.errors.empty()) {
1223  std::string error = absl::StrJoin(result.errors, "; ");
1224  gpr_log(
1225  GPR_ERROR,
1226  "[xds_client %p] xds server %s: ADS response invalid for resource "
1227  "type %s version %s, will NACK: nonce=%s error=%s",
1228  xds_client(), chand()->server_.server_uri.c_str(),
1229  result.type_url.c_str(), result.version.c_str(), state.nonce.c_str(),
1230  error.c_str());
1231  GRPC_ERROR_UNREF(state.error);
1235  }
1236  // Delete resources not seen in update if needed.
1237  if (result.type->AllResourcesRequiredInSotW()) {
1238  for (auto& a : xds_client()->authority_state_map_) {
1239  const std::string& authority = a.first;
1240  AuthorityState& authority_state = a.second;
1241  // Skip authorities that are not using this xDS channel.
1242  if (authority_state.channel_state != chand()) continue;
1243  auto seen_authority_it = result.resources_seen.find(authority);
1244  // Find this resource type.
1245  auto type_it = authority_state.resource_map.find(result.type);
1246  if (type_it == authority_state.resource_map.end()) continue;
1247  // Iterate over resource ids.
1248  for (auto& r : type_it->second) {
1249  const XdsResourceKey& resource_key = r.first;
1250  ResourceState& resource_state = r.second;
1251  if (seen_authority_it == result.resources_seen.end() ||
1252  seen_authority_it->second.find(resource_key) ==
1253  seen_authority_it->second.end()) {
1254  // If the resource was newly requested but has not yet been
1255  // received, we don't want to generate an error for the watchers,
1256  // because this ADS response may be in reaction to an earlier
1257  // request that did not yet request the new resource, so its absence
1258  // from the response does not necessarily indicate that the resource
1259  // does not exist. For that case, we rely on the request timeout
1260  // instead.
1261  if (resource_state.resource == nullptr) continue;
1262  if (chand()->server_.IgnoreResourceDeletion()) {
1263  if (!resource_state.ignored_deletion) {
1265  "[xds_client %p] xds server %s: ignoring deletion "
1266  "for resource type %s name %s",
1267  xds_client(), chand()->server_.server_uri.c_str(),
1268  result.type_url.c_str(),
1270  authority, result.type_url.c_str(), resource_key)
1271  .c_str());
1272  resource_state.ignored_deletion = true;
1273  }
1274  } else {
1275  resource_state.resource.reset();
1277  resource_state.watchers);
1278  }
1279  }
1280  }
1281  }
1282  }
1283  // If we had valid resources, update the version.
1284  if (result.have_valid_resources) {
1285  chand()->resource_type_version_map_[result.type] =
1286  std::move(result.version);
1287  // Start load reporting if needed.
1288  auto& lrs_call = chand()->lrs_calld_;
1289  if (lrs_call != nullptr) {
1290  LrsCallState* lrs_calld = lrs_call->calld();
1291  if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1292  }
1293  }
1294  // Send ACK or NACK.
1295  SendMessageLocked(result.type);
1296  }
1297  if (xds_client()->shutting_down_) return true;
1298  // Keep listening for updates.
1299  grpc_op op;
1300  memset(&op, 0, sizeof(op));
1303  op.flags = 0;
1304  op.reserved = nullptr;
1305  GPR_ASSERT(call_ != nullptr);
1306  // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1307  const grpc_call_error call_error =
1308  grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1309  GPR_ASSERT(GRPC_CALL_OK == call_error);
1310  return false;
1311 }
1312 
1313 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1314  void* arg, grpc_error_handle error) {
1315  AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1316  {
1317  MutexLock lock(&ads_calld->xds_client()->mu_);
1318  ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1319  }
1321  ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1322 }
1323 
1324 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1327  char* status_details = grpc_slice_to_c_string(status_details_);
1328  gpr_log(GPR_INFO,
1329  "[xds_client %p] xds server %s: ADS call status received "
1330  "(chand=%p, ads_calld=%p, call=%p): "
1331  "status=%d, details='%s', error='%s'",
1332  xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
1333  call_, status_code_, status_details,
1335  gpr_free(status_details);
1336  }
1337  // Ignore status from a stale call.
1338  if (IsCurrentCallOnChannel()) {
1339  // Try to restart the call.
1340  parent_->OnCallFinishedLocked();
1341  // Send error to all watchers.
1343  "xDS call failed: xDS server: %s, ADS call status code=%d, "
1344  "details='%s', error='%s'",
1345  chand()->server_.server_uri, status_code_,
1346  StringViewFromSlice(status_details_), grpc_error_std_string(error))));
1347  }
1349 }
1350 
1351 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1352  // If the retryable ADS call is null (which only happens when the xds channel
1353  // is shutting down), all the ADS calls are stale.
1354  if (chand()->ads_calld_ == nullptr) return false;
1355  return this == chand()->ads_calld_->calld();
1356 }
1357 
1358 std::vector<std::string>
1359 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1360  const XdsResourceType* type) {
1361  std::vector<std::string> resource_names;
1362  auto it = state_map_.find(type);
1363  if (it != state_map_.end()) {
1364  for (auto& a : it->second.subscribed_resources) {
1365  const std::string& authority = a.first;
1366  for (auto& p : a.second) {
1367  const XdsResourceKey& resource_key = p.first;
1368  resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
1369  authority, type->type_url(), resource_key));
1370  OrphanablePtr<ResourceTimer>& resource_timer = p.second;
1371  resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer"));
1372  }
1373  }
1374  }
1375  return resource_names;
1376 }
1377 
1378 //
1379 // XdsClient::ChannelState::LrsCallState::Reporter
1380 //
1381 
1383  if (next_report_timer_callback_pending_) {
1384  grpc_timer_cancel(&next_report_timer_);
1385  }
1386 }
1387 
1388 void XdsClient::ChannelState::LrsCallState::Reporter::
1389  ScheduleNextReportLocked() {
1390  const Timestamp next_report_time = ExecCtx::Get()->Now() + report_interval_;
1391  grpc_timer_init(&next_report_timer_, next_report_time,
1392  &on_next_report_timer_);
1393  next_report_timer_callback_pending_ = true;
1394 }
1395 
1396 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1397  void* arg, grpc_error_handle error) {
1398  Reporter* self = static_cast<Reporter*>(arg);
1399  bool done;
1400  {
1401  MutexLock lock(&self->xds_client()->mu_);
1402  done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1403  }
1404  if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1405 }
1406 
1407 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1409  next_report_timer_callback_pending_ = false;
1410  if (!GRPC_ERROR_IS_NONE(error) || !IsCurrentReporterOnCall()) {
1412  return true;
1413  }
1414  return SendReportLocked();
1415 }
1416 
1417 namespace {
1418 
1419 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1420  for (const auto& p : snapshot) {
1421  const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1422  if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1423  for (const auto& q : cluster_snapshot.locality_stats) {
1424  const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1425  if (!locality_snapshot.IsZero()) return false;
1426  }
1427  }
1428  return true;
1429 }
1430 
1431 } // namespace
1432 
1433 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1434  // Construct snapshot from all reported stats.
1435  XdsApi::ClusterLoadReportMap snapshot =
1436  xds_client()->BuildLoadReportSnapshotLocked(parent_->chand()->server_,
1437  parent_->send_all_clusters_,
1438  parent_->cluster_names_);
1439  // Skip client load report if the counters were all zero in the last
1440  // report and they are still zero in this one.
1441  const bool old_val = last_report_counters_were_zero_;
1442  last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1443  if (old_val && last_report_counters_were_zero_) {
1444  auto it = xds_client()->xds_load_report_server_map_.find(
1445  parent_->chand()->server_);
1446  if (it == xds_client()->xds_load_report_server_map_.end() ||
1447  it->second.load_report_map.empty()) {
1448  it->second.channel_state->StopLrsCallLocked();
1449  return true;
1450  }
1451  ScheduleNextReportLocked();
1452  return false;
1453  }
1454  // Create a request that contains the snapshot.
1455  grpc_slice request_payload_slice =
1456  xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1457  parent_->send_message_payload_ =
1458  grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1459  grpc_slice_unref_internal(request_payload_slice);
1460  // Send the report.
1461  grpc_op op;
1462  memset(&op, 0, sizeof(op));
1464  op.data.send_message.send_message = parent_->send_message_payload_;
1466  parent_->call_, &op, 1, &on_report_done_);
1467  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1469  "[xds_client %p] xds server %s: error starting LRS send_message "
1470  "batch on calld=%p: call_error=%d",
1471  xds_client(), parent_->chand()->server_.server_uri.c_str(), this,
1472  call_error);
1473  GPR_ASSERT(GRPC_CALL_OK == call_error);
1474  }
1475  return false;
1476 }
1477 
1479  void* arg, grpc_error_handle error) {
1480  Reporter* self = static_cast<Reporter*>(arg);
1481  bool done;
1482  {
1483  MutexLock lock(&self->xds_client()->mu_);
1484  done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1485  }
1486  if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1487 }
1488 
1489 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1491  grpc_byte_buffer_destroy(parent_->send_message_payload_);
1492  parent_->send_message_payload_ = nullptr;
1493  // If there are no more registered stats to report, cancel the call.
1494  auto it =
1495  xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_);
1496  if (it == xds_client()->xds_load_report_server_map_.end() ||
1497  it->second.load_report_map.empty()) {
1498  it->second.channel_state->StopLrsCallLocked();
1500  return true;
1501  }
1502  if (!GRPC_ERROR_IS_NONE(error) || !IsCurrentReporterOnCall()) {
1504  // If this reporter is no longer the current one on the call, the reason
1505  // might be that it was orphaned for a new one due to config update.
1506  if (!IsCurrentReporterOnCall()) {
1507  parent_->MaybeStartReportingLocked();
1508  }
1509  return true;
1510  }
1511  ScheduleNextReportLocked();
1512  return false;
1513 }
1514 
1515 //
1516 // XdsClient::ChannelState::LrsCallState
1517 //
1518 
1523  ? "LrsCallState"
1524  : nullptr),
1525  parent_(std::move(parent)) {
1526  // Init the LRS call. Note that the call will progress every time there's
1527  // activity in xds_client()->interested_parties_, which is comprised of
1528  // the polling entities from client_channel.
1529  GPR_ASSERT(xds_client() != nullptr);
1530  const char* method =
1532  ? "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"
1533  : "/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats";
1535  chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1537  Slice::FromStaticString(method).c_slice(), nullptr,
1538  Timestamp::InfFuture(), nullptr);
1539  GPR_ASSERT(call_ != nullptr);
1540  // Init the request payload.
1541  grpc_slice request_payload_slice =
1544  grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1545  grpc_slice_unref_internal(request_payload_slice);
1546  // Init other data associated with the LRS call.
1549  // Start the call.
1551  gpr_log(
1552  GPR_INFO,
1553  "[xds_client %p] xds server %s: starting LRS call (calld=%p, call=%p)",
1554  xds_client(), chand()->server_.server_uri.c_str(), this, call_);
1555  }
1556  // Create the ops.
1557  grpc_call_error call_error;
1558  grpc_op ops[3];
1559  memset(ops, 0, sizeof(ops));
1560  // Op: send initial metadata.
1561  grpc_op* op = ops;
1566  op->reserved = nullptr;
1567  op++;
1568  // Op: send request message.
1569  GPR_ASSERT(send_message_payload_ != nullptr);
1572  op->flags = 0;
1573  op->reserved = nullptr;
1574  op++;
1575  Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1577  grpc_schedule_on_exec_ctx);
1578  call_error = grpc_call_start_batch_and_execute(
1579  call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
1580  GPR_ASSERT(GRPC_CALL_OK == call_error);
1581  // Op: recv initial metadata.
1582  op = ops;
1586  op->flags = 0;
1587  op->reserved = nullptr;
1588  op++;
1589  // Op: recv response.
1592  op->flags = 0;
1593  op->reserved = nullptr;
1594  op++;
1595  Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1596  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1597  grpc_schedule_on_exec_ctx);
1598  call_error = grpc_call_start_batch_and_execute(
1599  call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1600  GPR_ASSERT(GRPC_CALL_OK == call_error);
1601  // Op: recv server status.
1602  op = ops;
1607  op->flags = 0;
1608  op->reserved = nullptr;
1609  op++;
1610  // This callback signals the end of the call, so it relies on the initial
1611  // ref instead of a new ref. When it's invoked, it's the initial ref that is
1612  // unreffed.
1613  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1614  grpc_schedule_on_exec_ctx);
1615  call_error = grpc_call_start_batch_and_execute(
1616  call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1617  GPR_ASSERT(GRPC_CALL_OK == call_error);
1618 }
1619 
1621  grpc_metadata_array_destroy(&initial_metadata_recv_);
1622  grpc_metadata_array_destroy(&trailing_metadata_recv_);
1625  grpc_slice_unref_internal(status_details_);
1626  GPR_ASSERT(call_ != nullptr);
1628 }
1629 
1631  reporter_.reset();
1632  GPR_ASSERT(call_ != nullptr);
1633  // If we are here because xds_client wants to cancel the call,
1634  // on_status_received_ will complete the cancellation and clean up. Otherwise,
1635  // we are here because xds_client has to orphan a failed call, then the
1636  // following cancellation will be a no-op.
1638  // Note that the initial ref is hold by on_status_received_. So the
1639  // corresponding unref happens in on_status_received_ instead of here.
1640 }
1641 
1643  // Don't start again if already started.
1644  if (reporter_ != nullptr) return;
1645  // Don't start if the previous send_message op (of the initial request or the
1646  // last report of the previous reporter) hasn't completed.
1647  if (send_message_payload_ != nullptr) return;
1648  // Don't start if no LRS response has arrived.
1649  if (!seen_response()) return;
1650  // Don't start if the ADS call hasn't received any valid response. Note that
1651  // this must be the first channel because it is the current channel but its
1652  // ADS call hasn't seen any response.
1653  if (chand()->ads_calld_ == nullptr ||
1654  chand()->ads_calld_->calld() == nullptr ||
1655  !chand()->ads_calld_->calld()->seen_response()) {
1656  return;
1657  }
1658  // Start reporting.
1659  reporter_ = MakeOrphanable<Reporter>(
1660  Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1661 }
1662 
1664  void* arg, grpc_error_handle /*error*/) {
1665  LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1666  {
1667  MutexLock lock(&lrs_calld->xds_client()->mu_);
1668  lrs_calld->OnInitialRequestSentLocked();
1669  }
1670  lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1671 }
1672 
1673 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1674  // Clear the send_message_payload_.
1676  send_message_payload_ = nullptr;
1677  MaybeStartReportingLocked();
1678 }
1679 
1680 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1681  void* arg, grpc_error_handle /*error*/) {
1682  LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1683  bool done;
1684  {
1685  MutexLock lock(&lrs_calld->xds_client()->mu_);
1686  done = lrs_calld->OnResponseReceivedLocked();
1687  }
1688  if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1689 }
1690 
1691 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1692  // Empty payload means the call was cancelled.
1693  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1694  return true;
1695  }
1696  // Read the response.
1699  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1702  recv_message_payload_ = nullptr;
1703  // This anonymous lambda is a hack to avoid the usage of goto.
1704  [&]() {
1705  // Parse the response.
1706  bool send_all_clusters = false;
1707  std::set<std::string> new_cluster_names;
1708  Duration new_load_reporting_interval;
1710  response_slice, &send_all_clusters, &new_cluster_names,
1711  &new_load_reporting_interval);
1714  "[xds_client %p] xds server %s: LRS response parsing failed: %s",
1715  xds_client(), chand()->server_.server_uri.c_str(),
1718  return;
1719  }
1720  seen_response_ = true;
1722  gpr_log(
1723  GPR_INFO,
1724  "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
1725  " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1726  "ms",
1727  xds_client(), chand()->server_.server_uri.c_str(),
1728  new_cluster_names.size(), send_all_clusters,
1729  new_load_reporting_interval.millis());
1730  size_t i = 0;
1731  for (const auto& name : new_cluster_names) {
1732  gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1733  xds_client(), i++, name.c_str());
1734  }
1735  }
1736  if (new_load_reporting_interval <
1739  new_load_reporting_interval = Duration::Milliseconds(
1742  gpr_log(GPR_INFO,
1743  "[xds_client %p] xds server %s: increased load_report_interval "
1744  "to minimum value %dms",
1745  xds_client(), chand()->server_.server_uri.c_str(),
1747  }
1748  }
1749  // Ignore identical update.
1750  if (send_all_clusters == send_all_clusters_ &&
1751  cluster_names_ == new_cluster_names &&
1752  load_reporting_interval_ == new_load_reporting_interval) {
1754  gpr_log(
1755  GPR_INFO,
1756  "[xds_client %p] xds server %s: incoming LRS response identical "
1757  "to current, ignoring.",
1758  xds_client(), chand()->server_.server_uri.c_str());
1759  }
1760  return;
1761  }
1762  // Stop current load reporting (if any) to adopt the new config.
1763  reporter_.reset();
1764  // Record the new config.
1765  send_all_clusters_ = send_all_clusters;
1766  cluster_names_ = std::move(new_cluster_names);
1767  load_reporting_interval_ = new_load_reporting_interval;
1768  // Try starting sending load report.
1769  MaybeStartReportingLocked();
1770  }();
1771  grpc_slice_unref_internal(response_slice);
1772  if (xds_client()->shutting_down_) return true;
1773  // Keep listening for LRS config updates.
1774  grpc_op op;
1775  memset(&op, 0, sizeof(op));
1778  op.flags = 0;
1779  op.reserved = nullptr;
1780  GPR_ASSERT(call_ != nullptr);
1781  // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1782  const grpc_call_error call_error =
1783  grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1784  GPR_ASSERT(GRPC_CALL_OK == call_error);
1785  return false;
1786 }
1787 
1788 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1789  void* arg, grpc_error_handle error) {
1790  LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1791  {
1792  MutexLock lock(&lrs_calld->xds_client()->mu_);
1793  lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1794  }
1795  lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1796 }
1797 
1798 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1800  GPR_ASSERT(call_ != nullptr);
1802  char* status_details = grpc_slice_to_c_string(status_details_);
1803  gpr_log(GPR_INFO,
1804  "[xds_client %p] xds server %s: LRS call status received "
1805  "(chand=%p, calld=%p, call=%p): "
1806  "status=%d, details='%s', error='%s'",
1807  xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
1808  call_, status_code_, status_details,
1810  gpr_free(status_details);
1811  }
1812  // Ignore status from a stale call.
1813  if (IsCurrentCallOnChannel()) {
1814  // Try to restart the call.
1815  parent_->OnCallFinishedLocked();
1816  }
1818 }
1819 
1820 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1821  // If the retryable LRS call is null (which only happens when the xds channel
1822  // is shutting down), all the LRS calls are stale.
1823  if (chand()->lrs_calld_ == nullptr) return false;
1824  return this == chand()->lrs_calld_->calld();
1825 }
1826 
1827 //
1828 // XdsClient
1829 //
1830 
1831 namespace {
1832 
1833 Duration GetRequestTimeout(const grpc_channel_args* args) {
1836  {15000, 0, INT_MAX}));
1837 }
1838 
1839 grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) {
1840  absl::InlinedVector<grpc_arg, 1> args_to_add = {
1842  const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
1843  5 * 60 * GPR_MS_PER_SEC),
1844  };
1845  return grpc_channel_args_copy_and_add(args, args_to_add.data(),
1846  args_to_add.size());
1847 }
1848 
1849 } // namespace
1850 
1851 XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
1852  const grpc_channel_args* args)
1855  : nullptr),
1857  args_(ModifyChannelArgs(args)),
1858  request_timeout_(GetRequestTimeout(args)),
1862  bootstrap_->certificate_providers())),
1863  api_(this, &grpc_xds_client_trace, bootstrap_->node(),
1864  &bootstrap_->certificate_providers(), &symtab_) {
1866  gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1867  }
1868  // Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
1869  // destroyed.
1870  grpc_init();
1871 }
1872 
1875  gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1876  }
1879  // Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient
1880  // is destroyed.
1881  grpc_shutdown();
1882 }
1883 
1886  gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1887  }
1888  {
1889  MutexLock lock(g_mu);
1890  if (g_xds_client == this) g_xds_client = nullptr;
1891  }
1892  {
1893  MutexLock lock(&mu_);
1894  shutting_down_ = true;
1895  // Clear cache and any remaining watchers that may not have been cancelled.
1896  authority_state_map_.clear();
1897  invalid_watchers_.clear();
1898  }
1899 }
1900 
1903  auto it = xds_server_channel_map_.find(server);
1904  if (it != xds_server_channel_map_.end()) {
1905  return it->second->Ref(DEBUG_LOCATION, "Authority");
1906  }
1907  // Channel not found, so create a new one.
1908  auto channel_state = MakeRefCounted<ChannelState>(
1909  WeakRef(DEBUG_LOCATION, "ChannelState"), server);
1910  xds_server_channel_map_[server] = channel_state.get();
1911  return channel_state;
1912 }
1913 
1917  ResourceWatcherInterface* w = watcher.get();
1918  // Lambda for handling failure cases.
1919  auto fail = [&](absl::Status status) mutable {
1920  {
1921  MutexLock lock(&mu_);
1923  invalid_watchers_[w] = watcher;
1924  }
1926  // TODO(yashykt): When we move to C++14, capture watcher using
1927  // std::move()
1929  watcher->OnError(status);
1930  },
1931  DEBUG_LOCATION);
1932  };
1933  auto resource_name = ParseXdsResourceName(name, type);
1934  if (!resource_name.ok()) {
1936  "Unable to parse resource name for listener %s", name)));
1937  return;
1938  }
1939  // Find server to use.
1940  const XdsBootstrap::XdsServer* xds_server = nullptr;
1941  absl::string_view authority_name = resource_name->authority;
1942  if (absl::ConsumePrefix(&authority_name, "xdstp:")) {
1943  auto* authority = bootstrap_->LookupAuthority(std::string(authority_name));
1944  if (authority == nullptr) {
1946  absl::StrCat("authority \"", authority_name,
1947  "\" not present in bootstrap config")));
1948  return;
1949  }
1950  if (!authority->xds_servers.empty()) {
1951  xds_server = &authority->xds_servers[0];
1952  }
1953  }
1954  if (xds_server == nullptr) xds_server = &bootstrap_->server();
1955  {
1956  MutexLock lock(&mu_);
1958  AuthorityState& authority_state =
1959  authority_state_map_[resource_name->authority];
1960  ResourceState& resource_state =
1961  authority_state.resource_map[type][resource_name->key];
1962  resource_state.watchers[w] = watcher;
1963  // If we already have a cached value for the resource, notify the new
1964  // watcher immediately.
1965  if (resource_state.resource != nullptr) {
1967  gpr_log(GPR_INFO,
1968  "[xds_client %p] returning cached listener data for %s", this,
1969  std::string(name).c_str());
1970  }
1971  auto* value = type->CopyResource(resource_state.resource.get()).release();
1974  watcher->OnGenericResourceChanged(value);
1975  delete value;
1976  },
1977  DEBUG_LOCATION);
1978  }
1979  // If the authority doesn't yet have a channel, set it, creating it if
1980  // needed.
1981  if (authority_state.channel_state == nullptr) {
1982  authority_state.channel_state =
1983  GetOrCreateChannelStateLocked(*xds_server);
1984  }
1985  authority_state.channel_state->SubscribeLocked(type, *resource_name);
1986  }
1988 }
1989 
1993  bool delay_unsubscription) {
1994  auto resource_name = ParseXdsResourceName(name, type);
1995  MutexLock lock(&mu_);
1996  // We cannot be sure whether the watcher is in invalid_watchers_ or in
1997  // authority_state_map_, so we check both, just to be safe.
1998  invalid_watchers_.erase(watcher);
1999  // Find authority.
2000  if (!resource_name.ok()) return;
2001  auto authority_it = authority_state_map_.find(resource_name->authority);
2002  if (authority_it == authority_state_map_.end()) return;
2003  AuthorityState& authority_state = authority_it->second;
2004  // Find type map.
2005  auto type_it = authority_state.resource_map.find(type);
2006  if (type_it == authority_state.resource_map.end()) return;
2007  auto& type_map = type_it->second;
2008  // Find resource key.
2009  auto resource_it = type_map.find(resource_name->key);
2010  if (resource_it == type_map.end()) return;
2011  ResourceState& resource_state = resource_it->second;
2012  // Remove watcher.
2013  resource_state.watchers.erase(watcher);
2014  // Clean up empty map entries, if any.
2015  if (resource_state.watchers.empty()) {
2016  if (resource_state.ignored_deletion) {
2017  gpr_log(GPR_INFO,
2018  "[xds_client %p] unsubscribing from a resource for which we "
2019  "previously ignored a deletion: type %s name %s",
2020  this, std::string(type->type_url()).c_str(),
2021  std::string(name).c_str());
2022  }
2023  authority_state.channel_state->UnsubscribeLocked(type, *resource_name,
2024  delay_unsubscription);
2025  type_map.erase(resource_it);
2026  if (type_map.empty()) {
2027  authority_state.resource_map.erase(type_it);
2028  if (authority_state.resource_map.empty()) {
2029  authority_state.channel_state.reset();
2030  }
2031  }
2032  }
2033 }
2034 
2036  const XdsResourceType* resource_type) {
2037  auto it = resource_types_.find(resource_type->type_url());
2038  if (it != resource_types_.end()) {
2039  GPR_ASSERT(it->second == resource_type);
2040  return;
2041  }
2042  resource_types_.emplace(resource_type->type_url(), resource_type);
2043  v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type);
2044  resource_type->InitUpbSymtab(symtab_.ptr());
2045 }
2046 
2048  absl::string_view resource_type) {
2049  auto it = resource_types_.find(resource_type);
2050  if (it != resource_types_.end()) return it->second;
2051  auto it2 = v2_resource_types_.find(resource_type);
2052  if (it2 != v2_resource_types_.end()) return it2->second;
2053  return nullptr;
2054 }
2055 
2058  // Old-style names use the empty string for authority.
2059  // authority is prefixed with "old:" to indicate that it's an old-style name.
2060  if (!xds_federation_enabled_ || !absl::StartsWith(name, "xdstp:")) {
2061  return XdsResourceName{"old:", {std::string(name), {}}};
2062  }
2063  // New style name. Parse URI.
2064  auto uri = URI::Parse(name);
2065  if (!uri.ok()) return uri.status();
2066  // Split the resource type off of the path to get the id.
2067  std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
2068  absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
2069  if (!type->IsType(path_parts.first, nullptr)) {
2071  "xdstp URI path must indicate valid xDS resource type");
2072  }
2073  // Canonicalize order of query params.
2074  std::vector<URI::QueryParam> query_params;
2075  for (const auto& p : uri->query_parameter_map()) {
2076  query_params.emplace_back(
2077  URI::QueryParam{std::string(p.first), std::string(p.second)});
2078  }
2079  return XdsResourceName{
2080  absl::StrCat("xdstp:", uri->authority()),
2081  {std::string(path_parts.second), std::move(query_params)}};
2082 }
2083 
2085  absl::string_view authority, absl::string_view resource_type,
2086  const XdsResourceKey& key) {
2087  if (absl::ConsumePrefix(&authority, "xdstp:")) {
2088  auto uri = URI::Create("xdstp", std::string(authority),
2089  absl::StrCat("/", resource_type, "/", key.id),
2090  key.query_params, /*fragment=*/"");
2091  GPR_ASSERT(uri.ok());
2092  return uri->ToString();
2093  }
2094  // Old-style name.
2095  return key.id;
2096 }
2097 
2101  if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
2102  auto key =
2104  MutexLock lock(&mu_);
2105  // We jump through some hoops here to make sure that the const
2106  // XdsBootstrap::XdsServer& and absl::string_views
2107  // stored in the XdsClusterDropStats object point to the
2108  // XdsBootstrap::XdsServer and strings
2109  // in the load_report_map_ key, so that they have the same lifetime.
2110  auto server_it =
2111  xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first;
2112  if (server_it->second.channel_state == nullptr) {
2113  server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server);
2114  }
2115  auto load_report_it = server_it->second.load_report_map
2116  .emplace(std::move(key), LoadReportState())
2117  .first;
2118  LoadReportState& load_report_state = load_report_it->second;
2119  RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
2120  if (load_report_state.drop_stats != nullptr) {
2121  cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
2122  }
2123  if (cluster_drop_stats == nullptr) {
2124  if (load_report_state.drop_stats != nullptr) {
2125  load_report_state.deleted_drop_stats +=
2126  load_report_state.drop_stats->GetSnapshotAndReset();
2127  }
2128  cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
2129  Ref(DEBUG_LOCATION, "DropStats"), server_it->first,
2130  load_report_it->first.first /*cluster_name*/,
2131  load_report_it->first.second /*eds_service_name*/);
2132  load_report_state.drop_stats = cluster_drop_stats.get();
2133  }
2134  server_it->second.channel_state->MaybeStartLrsCall();
2135  return cluster_drop_stats;
2136 }
2137 
2141  XdsClusterDropStats* cluster_drop_stats) {
2142  MutexLock lock(&mu_);
2143  auto server_it = xds_load_report_server_map_.find(xds_server);
2144  if (server_it == xds_load_report_server_map_.end()) return;
2145  auto load_report_it = server_it->second.load_report_map.find(
2147  if (load_report_it == server_it->second.load_report_map.end()) return;
2148  LoadReportState& load_report_state = load_report_it->second;
2149  if (load_report_state.drop_stats == cluster_drop_stats) {
2150  // Record final snapshot in deleted_drop_stats, which will be
2151  // added to the next load report.
2152  load_report_state.deleted_drop_stats +=
2153  load_report_state.drop_stats->GetSnapshotAndReset();
2154  load_report_state.drop_stats = nullptr;
2155  }
2156 }
2157 
2161  RefCountedPtr<XdsLocalityName> locality) {
2162  if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
2163  auto key =
2165  MutexLock lock(&mu_);
2166  // We jump through some hoops here to make sure that the const
2167  // XdsBootstrap::XdsServer& and absl::string_views
2168  // stored in the XdsClusterDropStats object point to the
2169  // XdsBootstrap::XdsServer and strings
2170  // in the load_report_map_ key, so that they have the same lifetime.
2171  auto server_it =
2172  xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first;
2173  if (server_it->second.channel_state == nullptr) {
2174  server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server);
2175  }
2176  auto load_report_it = server_it->second.load_report_map
2177  .emplace(std::move(key), LoadReportState())
2178  .first;
2179  LoadReportState& load_report_state = load_report_it->second;
2180  LoadReportState::LocalityState& locality_state =
2181  load_report_state.locality_stats[locality];
2182  RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2183  if (locality_state.locality_stats != nullptr) {
2184  cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2185  }
2186  if (cluster_locality_stats == nullptr) {
2187  if (locality_state.locality_stats != nullptr) {
2188  locality_state.deleted_locality_stats +=
2189  locality_state.locality_stats->GetSnapshotAndReset();
2190  }
2191  cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2192  Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first,
2193  load_report_it->first.first /*cluster_name*/,
2194  load_report_it->first.second /*eds_service_name*/, std::move(locality));
2195  locality_state.locality_stats = cluster_locality_stats.get();
2196  }
2197  server_it->second.channel_state->MaybeStartLrsCall();
2198  return cluster_locality_stats;
2199 }
2200 
2204  const RefCountedPtr<XdsLocalityName>& locality,
2205  XdsClusterLocalityStats* cluster_locality_stats) {
2206  MutexLock lock(&mu_);
2207  auto server_it = xds_load_report_server_map_.find(xds_server);
2208  if (server_it == xds_load_report_server_map_.end()) return;
2209  auto load_report_it = server_it->second.load_report_map.find(
2211  if (load_report_it == server_it->second.load_report_map.end()) return;
2212  LoadReportState& load_report_state = load_report_it->second;
2213  auto locality_it = load_report_state.locality_stats.find(locality);
2214  if (locality_it == load_report_state.locality_stats.end()) return;
2215  LoadReportState::LocalityState& locality_state = locality_it->second;
2216  if (locality_state.locality_stats == cluster_locality_stats) {
2217  // Record final snapshot in deleted_locality_stats, which will be
2218  // added to the next load report.
2219  locality_state.deleted_locality_stats +=
2220  locality_state.locality_stats->GetSnapshotAndReset();
2221  locality_state.locality_stats = nullptr;
2222  }
2223 }
2224 
2226  MutexLock lock(&mu_);
2227  for (auto& p : xds_server_channel_map_) {
2228  grpc_channel_reset_connect_backoff(p.second->channel());
2229  }
2230 }
2231 
2233  const auto* node = bootstrap_->node();
2234  if (node != nullptr) {
2235  status = absl::Status(
2237  " (node ID:", bootstrap_->node()->id, ")"));
2238  }
2239  std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
2240  for (const auto& a : authority_state_map_) { // authority
2241  for (const auto& t : a.second.resource_map) { // type
2242  for (const auto& r : t.second) { // resource id
2243  for (const auto& w : r.second.watchers) { // watchers
2244  watchers.insert(w.second);
2245  }
2246  }
2247  }
2248  }
2250  // TODO(yashykt): When we move to C++14, capture watchers using
2251  // std::move()
2253  for (const auto& watcher : watchers) {
2254  watcher->OnError(status);
2255  }
2256  },
2257  DEBUG_LOCATION);
2258 }
2259 
2263  absl::Status status) {
2264  const auto* node = bootstrap_->node();
2265  if (node != nullptr) {
2266  status = absl::Status(
2268  " (node ID:", bootstrap_->node()->id, ")"));
2269  }
2272  for (const auto& p : watchers) {
2273  p.first->OnError(status);
2274  }
2275  },
2276  DEBUG_LOCATION);
2277 }
2278 
2284  for (const auto& p : watchers) {
2285  p.first->OnResourceDoesNotExist();
2286  }
2287  },
2288  DEBUG_LOCATION);
2289 }
2290 
2292  const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
2293  const std::set<std::string>& clusters) {
2295  gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2296  }
2297  XdsApi::ClusterLoadReportMap snapshot_map;
2298  auto server_it = xds_load_report_server_map_.find(xds_server);
2299  if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
2300  auto& load_report_map = server_it->second.load_report_map;
2301  for (auto load_report_it = load_report_map.begin();
2302  load_report_it != load_report_map.end();) {
2303  // Cluster key is cluster and EDS service name.
2304  const auto& cluster_key = load_report_it->first;
2305  LoadReportState& load_report = load_report_it->second;
2306  // If the CDS response for a cluster indicates to use LRS but the
2307  // LRS server does not say that it wants reports for this cluster,
2308  // then we'll have stats objects here whose data we're not going to
2309  // include in the load report. However, we still need to clear out
2310  // the data from the stats objects, so that if the LRS server starts
2311  // asking for the data in the future, we don't incorrectly include
2312  // data from previous reporting intervals in that future report.
2313  const bool record_stats =
2314  send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2315  XdsApi::ClusterLoadReport snapshot;
2316  // Aggregate drop stats.
2317  snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2318  if (load_report.drop_stats != nullptr) {
2319  snapshot.dropped_requests +=
2320  load_report.drop_stats->GetSnapshotAndReset();
2322  gpr_log(GPR_INFO,
2323  "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2324  this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2325  load_report.drop_stats);
2326  }
2327  }
2328  // Aggregate locality stats.
2329  for (auto it = load_report.locality_stats.begin();
2330  it != load_report.locality_stats.end();) {
2331  const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2332  auto& locality_state = it->second;
2333  XdsClusterLocalityStats::Snapshot& locality_snapshot =
2334  snapshot.locality_stats[locality_name];
2335  locality_snapshot = std::move(locality_state.deleted_locality_stats);
2336  if (locality_state.locality_stats != nullptr) {
2337  locality_snapshot +=
2338  locality_state.locality_stats->GetSnapshotAndReset();
2340  gpr_log(GPR_INFO,
2341  "[xds_client %p] cluster=%s eds_service_name=%s "
2342  "locality=%s locality_stats=%p",
2343  this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2344  locality_name->AsHumanReadableString().c_str(),
2345  locality_state.locality_stats);
2346  }
2347  }
2348  // If the only thing left in this entry was final snapshots from
2349  // deleted locality stats objects, remove the entry.
2350  if (locality_state.locality_stats == nullptr) {
2351  it = load_report.locality_stats.erase(it);
2352  } else {
2353  ++it;
2354  }
2355  }
2356  // Compute load report interval.
2357  const Timestamp now = ExecCtx::Get()->Now();
2358  snapshot.load_report_interval = now - load_report.last_report_time;
2359  load_report.last_report_time = now;
2360  // Record snapshot.
2361  if (record_stats) {
2362  snapshot_map[cluster_key] = std::move(snapshot);
2363  }
2364  // If the only thing left in this entry was final snapshots from
2365  // deleted stats objects, remove the entry.
2366  if (load_report.locality_stats.empty() &&
2367  load_report.drop_stats == nullptr) {
2368  load_report_it = load_report_map.erase(load_report_it);
2369  } else {
2370  ++load_report_it;
2371  }
2372  }
2373  return snapshot_map;
2374 }
2375 
2377  MutexLock lock(&mu_);
2378  XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
2379  for (const auto& a : authority_state_map_) { // authority
2380  const std::string& authority = a.first;
2381  for (const auto& t : a.second.resource_map) { // type
2382  const XdsResourceType* type = t.first;
2383  auto& resource_metadata_map =
2384  resource_type_metadata_map[type->type_url()];
2385  for (const auto& r : t.second) { // resource id
2386  const XdsResourceKey& resource_key = r.first;
2387  const ResourceState& resource_state = r.second;
2388  resource_metadata_map[ConstructFullXdsResourceName(
2389  authority, type->type_url(), resource_key)] = &resource_state.meta;
2390  }
2391  }
2392  }
2393  // Assemble config dump messages
2394  return api_.AssembleClientConfig(resource_type_metadata_map);
2395 }
2396 
2397 //
2398 // accessors for global state
2399 //
2400 
2402  g_mu = new Mutex;
2405 }
2406 
2407 // TODO(roth): Find a better way to clear the fallback config that does
2408 // not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
2410  gpr_free(g_fallback_bootstrap_config);
2411  g_fallback_bootstrap_config = nullptr;
2412  delete g_mu;
2413  g_mu = nullptr;
2416 }
2417 
2418 namespace {
2419 
2420 std::string GetBootstrapContents(const char* fallback_config,
2422  // First, try GRPC_XDS_BOOTSTRAP env var.
2423  UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
2424  if (path != nullptr) {
2426  gpr_log(GPR_INFO,
2427  "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
2428  "environment variable: %s",
2429  path.get());
2430  }
2432  *error =
2433  grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
2434  if (!GRPC_ERROR_IS_NONE(*error)) return "";
2435  std::string contents_str(StringViewFromSlice(contents));
2437  return contents_str;
2438  }
2439  // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
2440  UniquePtr<char> env_config(gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
2441  if (env_config != nullptr) {
2443  gpr_log(GPR_INFO,
2444  "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
2445  "environment variable");
2446  }
2447  return env_config.get();
2448  }
2449  // Finally, try fallback config.
2450  if (fallback_config != nullptr) {
2452  gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
2453  }
2454  return fallback_config;
2455  }
2456  // No bootstrap config found.
2458  "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
2459  "not defined");
2460  return "";
2461 }
2462 
2463 } // namespace
2464 
2467  RefCountedPtr<XdsClient> xds_client;
2468  // If getting bootstrap from channel args, create a local XdsClient
2469  // instance for the channel or server instead of using the global instance.
2470  const char* bootstrap_config = grpc_channel_args_find_string(
2472  if (bootstrap_config != nullptr) {
2473  std::unique_ptr<XdsBootstrap> bootstrap =
2474  XdsBootstrap::Create(bootstrap_config, error);
2475  if (GRPC_ERROR_IS_NONE(*error)) {
2476  grpc_channel_args* xds_channel_args =
2477  grpc_channel_args_find_pointer<grpc_channel_args>(
2478  args,
2480  return MakeRefCounted<XdsClient>(std::move(bootstrap), xds_channel_args);
2481  }
2482  return nullptr;
2483  }
2484  // Otherwise, use the global instance.
2485  {
2486  MutexLock lock(g_mu);
2487  if (g_xds_client != nullptr) {
2488  auto xds_client = g_xds_client->RefIfNonZero();
2489  if (xds_client != nullptr) return xds_client;
2490  }
2491  // Find bootstrap contents.
2492  std::string bootstrap_contents =
2493  GetBootstrapContents(g_fallback_bootstrap_config, error);
2494  if (!GRPC_ERROR_IS_NONE(*error)) return nullptr;
2496  gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
2497  bootstrap_contents.c_str());
2498  }
2499  // Parse bootstrap.
2500  std::unique_ptr<XdsBootstrap> bootstrap =
2501  XdsBootstrap::Create(bootstrap_contents, error);
2502  if (!GRPC_ERROR_IS_NONE(*error)) return nullptr;
2503  // Instantiate XdsClient.
2504  xds_client =
2505  MakeRefCounted<XdsClient>(std::move(bootstrap), g_channel_args);
2506  g_xds_client = xds_client.get();
2507  }
2508  return xds_client;
2509 }
2510 
2511 namespace internal {
2512 
2514  MutexLock lock(g_mu);
2515  g_channel_args = args;
2516 }
2517 
2519  MutexLock lock(g_mu);
2520  g_xds_client = nullptr;
2521 }
2522 
2524  MutexLock lock(g_mu);
2525  gpr_free(g_fallback_bootstrap_config);
2526  g_fallback_bootstrap_config = gpr_strdup(config);
2527 }
2528 
2529 } // namespace internal
2530 
2531 //
2532 // embedding XdsClient in channel args
2533 //
2534 
2535 #define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
2536 
2537 namespace {
2538 
2539 void* XdsClientArgCopy(void* p) {
2540  XdsClient* xds_client = static_cast<XdsClient*>(p);
2541  xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
2542  return p;
2543 }
2544 
2545 void XdsClientArgDestroy(void* p) {
2546  XdsClient* xds_client = static_cast<XdsClient*>(p);
2547  xds_client->Unref(DEBUG_LOCATION, "channel arg");
2548 }
2549 
2550 int XdsClientArgCmp(void* p, void* q) { return QsortCompare(p, q); }
2551 
2552 const grpc_arg_pointer_vtable kXdsClientArgVtable = {
2553  XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
2554 
2555 } // namespace
2556 
2558  return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
2559  const_cast<XdsClient*>(this),
2560  &kXdsClientArgVtable);
2561 }
2562 
2564  const grpc_channel_args& args) {
2565  XdsClient* xds_client =
2566  grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
2567  if (xds_client == nullptr) return nullptr;
2568  return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
2569 }
2570 
2571 } // namespace grpc_core
2572 
2573 // The returned bytes may contain NULL(0), so we can't use c-string.
2575  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2578  auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error);
2579  if (!GRPC_ERROR_IS_NONE(error)) {
2580  // If we isn't using xDS, just return an empty string.
2582  return grpc_empty_slice();
2583  }
2584  return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary());
2585 }
absl::StrSplit
strings_internal::Splitter< typename strings_internal::SelectDelimiter< Delimiter >::type, AllowEmpty, absl::string_view > StrSplit(strings_internal::ConvertibleToStringView text, Delimiter d)
Definition: abseil-cpp/absl/strings/str_split.h:499
grpc_core::ClientChannel::GetFromChannel
static ClientChannel * GetFromChannel(Channel *channel)
Definition: client_channel.cc:1009
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_arg
Definition: grpc_types.h:103
grpc_core::CoreConfiguration::channel_creds_registry
const ChannelCredsRegistry & channel_creds_registry() const
Definition: core_configuration.h:149
absl::InvalidArgumentError
Status InvalidArgumentError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:351
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG
#define GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG
Definition: grpc_types.h:365
grpc_core::XdsClient::args_
grpc_channel_args * args_
Definition: xds_client.h:317
grpc_core::XdsClient::ChannelState::LrsCallState::on_response_received_
grpc_closure on_response_received_
Definition: xds_client.cc:461
grpc_channel_args_find_string
char * grpc_channel_args_find_string(const grpc_channel_args *args, const char *name)
Definition: channel_args.cc:441
slice.h
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
absl::MaxSplits
strings_internal::MaxSplitsImpl< typename strings_internal::SelectDelimiter< Delimiter >::type > MaxSplits(Delimiter delimiter, int limit)
Definition: abseil-cpp/absl/strings/str_split.h:294
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_op::grpc_op_data::grpc_op_send_message::send_message
struct grpc_byte_buffer * send_message
Definition: grpc_types.h:668
grpc_core::ChannelCredsRegistry::CreateChannelCreds
RefCountedPtr< T > CreateChannelCreds(const std::string &creds_type, const Json &config) const
Definition: channel_creds_registry.h:88
grpc_op::flags
uint32_t flags
Definition: grpc_types.h:644
grpc_call_error
grpc_call_error
Definition: grpc_types.h:464
grpc_core::XdsClient::ChannelState::Orphan
void Orphan() override
Definition: xds_client.cc:558
grpc_core::XdsClient::XdsResourceName::key
XdsResourceKey key
Definition: xds_client.h:180
grpc_core::XdsClient::ChannelState::AdsCallState::Orphan
void Orphan() override
Definition: xds_client.cc:1053
grpc_core::XdsApi::ClusterLoadReport::dropped_requests
XdsClusterDropStats::Snapshot dropped_requests
Definition: xds_api.h:81
now
static double now(void)
Definition: test/core/fling/client.cc:130
grpc_core::XdsClusterSpecifierPluginRegistry::Shutdown
static void Shutdown()
Definition: xds_cluster_specifier_plugin.cc:149
grpc_core::XdsBootstrap::Create
static std::unique_ptr< XdsBootstrap > Create(absl::string_view json_string, grpc_error_handle *error)
Definition: xds_bootstrap.cc:180
regen-readme.it
it
Definition: regen-readme.py:15
grpc_core::XdsClient::LoadReportState::LocalityState::deleted_locality_stats
XdsClusterLocalityStats::Snapshot deleted_locality_stats
Definition: xds_client.h:262
orphanable.h
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::trailing_metadata
grpc_metadata_array * trailing_metadata
Definition: grpc_types.h:701
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc_core::XdsClient::ChannelState::AdsCallState::OnRequestSent
void static SendMessageLocked(const XdsResourceType *type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient voi OnRequestSent)(void *arg, grpc_error_handle error)
Definition: xds_client.cc:323
log.h
core_configuration.h
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::status
grpc_status_code * status
Definition: grpc_types.h:702
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState::~ResourceTypeState
~ResourceTypeState()
Definition: xds_client.cc:308
grpc_core::XdsResourceType::type_url
virtual absl::string_view type_url() const =0
backoff.h
GRPC_STATUS_UNAVAILABLE
@ GRPC_STATUS_UNAVAILABLE
Definition: include/grpc/impl/codegen/status.h:143
grpc_raw_byte_buffer_create
GRPCAPI grpc_byte_buffer * grpc_raw_byte_buffer_create(grpc_slice *slices, size_t nslices)
Definition: byte_buffer.cc:34
MutexLock
#define MutexLock(x)
Definition: bloaty/third_party/re2/util/mutex.h:125
grpc_core::XdsClient::ChannelState::StateWatcher::OnConnectivityStateChange
void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status &status) override
Definition: xds_client.cc:487
lame_client.h
const
#define const
Definition: bloaty/third_party/zlib/zconf.h:230
grpc_load_file
grpc_error_handle grpc_load_file(const char *filename, int add_null_terminator, grpc_slice *output)
Definition: load_file.cc:33
grpc_core::WorkSerializer::DrainQueue
void DrainQueue()
Definition: work_serializer.cc:229
grpc_core::XdsClient::ChannelState::AdsCallState::parent_
std::vector< std::string > ResourceNamesForRequest(const XdsResourceType *type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient RefCountedPtr< RetryableCall< AdsCallState > > parent_
Definition: xds_client.cc:337
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc_core::XdsClient::ChannelState::AdsCallState::buffered_requests_
std::set< const XdsResourceType * > buffered_requests_
Definition: xds_client.cc:367
chand_
ClientChannel * chand_
Definition: client_channel.cc:321
call_
grpc_call * call_
Definition: rls.cc:669
channel_fwd.h
memset
return memset(p, 0, total)
connectivity_state.h
load_file.h
xds_client_stats.h
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
grpc_core::LameClientFilter::kFilter
static const grpc_channel_filter kFilter
Definition: lame_client.h:49
grpc_core::XdsClient::LoadReportState::last_report_time
Timestamp last_report_time
Definition: xds_client.h:270
grpc_core::XdsClientGlobalShutdown
void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS
Definition: xds_client.cc:2409
GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS
Definition: xds_client.cc:85
grpc_core::XdsClient::AddClusterDropStats
RefCountedPtr< XdsClusterDropStats > AddClusterDropStats(const XdsBootstrap::XdsServer &xds_server, absl::string_view cluster_name, absl::string_view eds_service_name)
Definition: xds_client.cc:2098
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::xds_client
XdsClient * xds_client() const
Definition: xds_client.cc:419
Timestamp
Definition: bloaty/third_party/protobuf/src/google/protobuf/timestamp.pb.h:69
grpc_channel_reset_connect_backoff
GRPCAPI void grpc_channel_reset_connect_backoff(grpc_channel *channel)
Definition: channel.cc:273
grpc_core::XdsClient::ChannelState::LrsCallState::call_
grpc_call * call_
Definition: xds_client.cc:450
grpc_core::RefCountedPtr::get
T * get() const
Definition: ref_counted_ptr.h:146
grpc_op::grpc_op_data::send_initial_metadata
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
slice.h
false
#define false
Definition: setup_once.h:323
grpc_core::XdsApi::CreateLrsRequest
grpc_slice CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map)
Definition: xds_api.cc:529
grpc_core::RefCounted::RefIfNonZero
RefCountedPtr< Child > RefIfNonZero() GRPC_MUST_USE_RESULT
Definition: ref_counted.h:313
grpc_core::XdsClient::ChannelState::AdsCallState::seen_response_
bool seen_response_
Definition: xds_client.cc:344
grpc_core::XdsEncodingContext
Definition: upb_utils.h:39
report_interval_
const Duration report_interval_
Definition: oob_backend_metric.cc:138
grpc_core::XdsClient::ChannelState::LrsCallState
Definition: xds_client.cc:374
grpc_core::InternallyRefCounted< ResourceTimer >::Unref
void Unref()
Definition: orphanable.h:100
grpc_core
Definition: call_metric_recorder.h:31
cluster_name
std::string cluster_name
Definition: xds_cluster_resolver.cc:91
grpc_metadata_array
Definition: grpc_types.h:579
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
calld_
CallData * calld_
Definition: retry_filter.cc:447
grpc_op::reserved
void * reserved
Definition: grpc_types.h:646
grpc_core::XdsClient::ResourceWatcherInterface
Definition: xds_client.h:66
grpc_core::XdsApi::AdsResponseParserInterface::ParseResource
virtual void ParseResource(const XdsEncodingContext &context, size_t idx, absl::string_view type_url, absl::string_view serialized_resource)=0
grpc_byte_buffer_reader_readall
GRPCAPI grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader)
Definition: byte_buffer_reader.cc:84
string.h
grpc_core::XdsClient
Definition: xds_client.h:60
grpc_core::slice_detail::StaticConstructors< StaticSlice >::FromStaticString
static StaticSlice FromStaticString(const char *s)
Definition: src/core/lib/slice/slice.h:201
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::result_
Result result_
Definition: xds_client.cc:202
grpc_core::XdsClient::ResetBackoff
void ResetBackoff()
Definition: xds_client.cc:2225
absl::StartsWith
bool StartsWith(absl::string_view text, absl::string_view prefix) noexcept
Definition: third_party/abseil-cpp/absl/strings/match.h:58
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_core::DualRefCounted
Definition: dual_ref_counted.h:48
grpc_pollset_set_create
grpc_pollset_set * grpc_pollset_set_create()
Definition: pollset_set.cc:29
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc_core::StringViewFromSlice
absl::string_view StringViewFromSlice(const grpc_slice &slice)
Definition: slice_internal.h:93
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
useful.h
grpc_core::XdsClient::ChannelState::xds_client
XdsClient * xds_client() const
Definition: xds_client.h:200
grpc_core::XdsClusterSpecifierPluginRegistry::Init
static void Init()
Definition: xds_cluster_specifier_plugin.cc:143
grpc_channel_element
Definition: channel_stack.h:186
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
propagation_bits.h
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
client_channel.h
grpc_core::XdsBootstrap::XdsServer::IgnoreResourceDeletion
bool IgnoreResourceDeletion() const
Definition: xds_bootstrap.cc:171
grpc_core::XdsResourceType::InitUpbSymtab
virtual void InitUpbSymtab(upb_DefPool *symtab) const =0
grpc_core::XdsClient::mu_
Mutex mu_
Definition: xds_client.h:325
xds_http_filters.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc_core::XdsClient::ChannelState::RetryableCall::backoff_
BackOff backoff_
Definition: xds_client.cc:138
grpc_channel_get_channel_stack
grpc_channel_stack * grpc_channel_get_channel_stack(grpc_channel *channel)
Definition: src/core/lib/surface/channel.h:178
grpc_core::XdsApi::ResourceMetadata::DOES_NOT_EXIST
@ DOES_NOT_EXIST
Definition: xds_api.h:104
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
grpc_core::XdsClient::ChannelState::LrsCallState::~LrsCallState
~LrsCallState() override
Definition: xds_client.cc:1620
closure.h
grpc_core::XdsClient::GetResourceTypeLocked
const XdsResourceType * GetResourceTypeLocked(absl::string_view resource_type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: xds_client.cc:2047
GRPC_CALL_OK
@ GRPC_CALL_OK
Definition: grpc_types.h:466
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
status
absl::Status status
Definition: rls.cc:251
grpc_core::XdsClient::ChannelState::RetryableCall::StartNewCallLocked
void StartNewCallLocked()
Definition: xds_client.cc:700
grpc_core::WorkSerializer::Run
void Run(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:219
grpc_core::ApplicationCallbackExecCtx
Definition: exec_ctx.h:283
grpc_core::DualRefCounted< XdsClient >::WeakRef
WeakRefCountedPtr< XdsClient > WeakRef() GRPC_MUST_USE_RESULT
Definition: dual_ref_counted.h:149
setup.name
name
Definition: setup.py:542
grpc_core::XdsClient::ChannelState::AdsCallState::HasSubscribedResources
void SubscribeLocked(const XdsResourceType *type, const XdsResourceName &name, bool delay_send) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient void UnsubscribeLocked(const XdsResourceType *type, const XdsResourceName &name, bool delay_unsubscription) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient boo HasSubscribedResources)() const
Definition: xds_client.cc:168
env.h
grpc_core::XdsClient::BuildLoadReportSnapshotLocked
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(const XdsBootstrap::XdsServer &xds_server, bool send_all_clusters, const std::set< std::string > &clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: xds_client.cc:2291
version
Definition: version.py:1
grpc_core::XdsClient::MakeChannelArg
grpc_arg MakeChannelArg() const
Definition: xds_client.cc:2557
time.h
check_documentation.path
path
Definition: check_documentation.py:57
grpc_core::XdsClient::ChannelState::LrsCallState::on_initial_request_sent_
grpc_closure on_initial_request_sent_
Definition: xds_client.cc:457
a
int a
Definition: abseil-cpp/absl/container/internal/hash_policy_traits_test.cc:88
absl::StripPrefix
ABSL_MUST_USE_RESULT absl::string_view StripPrefix(absl::string_view str, absl::string_view prefix)
Definition: abseil-cpp/absl/strings/strip.h:73
grpc_core::XdsClient::AuthorityState::resource_map
std::map< const XdsResourceType *, std::map< XdsResourceKey, ResourceState > > resource_map
Definition: xds_client.h:256
grpc_core::XdsClient::Orphan
void Orphan() override
Definition: xds_client.cc:1884
grpc_core::XdsClient::ChannelState::LrsCallState::LrsCallState
LrsCallState(RefCountedPtr< RetryableCall< LrsCallState >> parent)
Definition: xds_client.cc:1519
xds_manager.p
p
Definition: xds_manager.py:60
grpc_core::XdsClient::LoadReportState::LocalityState
Definition: xds_client.h:260
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::name_
const XdsResourceName name_
Definition: xds_client.cc:298
channel_creds_registry.h
grpc_core::XdsClient::ChannelState::AdsCallState::on_request_sent_
grpc_closure on_request_sent_
Definition: xds_client.cc:354
grpc_core::XdsClient::bootstrap_
std::unique_ptr< XdsBootstrap > bootstrap_
Definition: xds_client.h:316
grpc_core::URI::Parse
static absl::StatusOr< URI > Parse(absl::string_view uri_text)
Definition: uri_parser.cc:209
grpc_timer
Definition: iomgr/timer.h:33
grpc_core::XdsClient::ChannelState::RetryableCall::OnRetryTimer
static void OnRetryTimer(void *arg, grpc_error_handle error)
Definition: xds_client.cc:733
grpc_core::XdsClient::XdsResourceKey
Definition: xds_client.h:167
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result::errors
std::vector< std::string > errors
Definition: xds_client.cc:178
credentials.h
grpc_core::XdsClient::ConstructFullXdsResourceName
static std::string ConstructFullXdsResourceName(absl::string_view authority, absl::string_view resource_type, const XdsResourceKey &key)
Definition: xds_client.cc:2084
grpc_arg_pointer_vtable
Definition: grpc_types.h:85
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
GRPC_ARG_XDS_CLIENT
#define GRPC_ARG_XDS_CLIENT
Definition: xds_client.cc:2535
grpc_core::grpc_xds_client_trace
TraceFlag grpc_xds_client_trace(false, "xds_client")
Definition: xds_client.h:57
grpc_core::XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked
void StartRetryTimerLocked()
Definition: xds_client.cc:715
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState::subscribed_resources
std::map< std::string, std::map< XdsResourceKey, OrphanablePtr< ResourceTimer > > > subscribed_resources
Definition: xds_client.cc:317
T
#define T(upbtypeconst, upbtype, ctype, default_value)
grpc_core::WorkSerializer::Schedule
void Schedule(std::function< void()> callback, const DebugLocation &location)
Definition: work_serializer.cc:224
grpc_op::grpc_op_data::recv_message
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
server_
Server *const server_
Definition: chttp2_server.cc:260
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
grpc_op::data
union grpc_op::grpc_op_data data
grpc_core::XdsClient::ChannelState::~ChannelState
~ChannelState() override
Definition: xds_client.cc:545
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::timer_
grpc_timer timer_
Definition: xds_client.cc:303
status.h
byte_buffer_reader.h
grpc_core::XdsClient::ChannelState::ads_calld_
OrphanablePtr< RetryableCall< AdsCallState > > ads_calld_
Definition: xds_client.h:236
absl::synchronization_internal::Get
static GraphId Get(const IdMap &id, int num)
Definition: abseil-cpp/absl/synchronization/internal/graphcycles_test.cc:44
grpc_core::XdsClient::RemoveClusterDropStats
void RemoveClusterDropStats(const XdsBootstrap::XdsServer &xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, XdsClusterDropStats *cluster_drop_stats)
Definition: xds_client.cc:2138
grpc_core::XdsClient::ChannelState::AdsCallState::sent_initial_message_
bool sent_initial_message_
Definition: xds_client.cc:343
memory.h
grpc_core::URI::QueryParam
Definition: uri_parser.h:33
grpc_core::XdsClient::LoadReportState::LocalityState::locality_stats
XdsClusterLocalityStats * locality_stats
Definition: xds_client.h:261
grpc_core::XdsApi::ResourceTypeMetadataMap
std::map< absl::string_view, ResourceMetadataMap > ResourceTypeMetadataMap
Definition: xds_api.h:129
grpc_core::XdsClient::ChannelState::AdsCallState::initial_metadata_recv_
grpc_metadata_array initial_metadata_recv_
Definition: xds_client.cc:350
grpc_metadata_array_destroy
GRPCAPI void grpc_metadata_array_destroy(grpc_metadata_array *array)
Definition: metadata_array.cc:35
grpc_core::XdsClient::ChannelState::channel_
grpc_channel * channel_
Definition: xds_client.h:231
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::XdsClient::ChannelState::AdsCallState::status_code_
grpc_status_code status_code_
Definition: xds_client.cc:362
grpc_op::grpc_op_data::grpc_op_recv_message::recv_message
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:693
grpc_core::XdsApi::AdsResponseParserInterface::ProcessAdsResponseFields
virtual absl::Status ProcessAdsResponseFields(AdsResponseFields fields)=0
string_util.h
grpc_core::XdsClient::ChannelState::RetryableCall::calld
T * calld() const
Definition: xds_client.cc:120
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result::version
std::string version
Definition: xds_client.cc:176
grpc_core::XdsClient::ChannelState::AdsCallState::send_message_payload_
grpc_byte_buffer * send_message_payload_
Definition: xds_client.cc:353
server
std::unique_ptr< Server > server
Definition: channelz_service_test.cc:330
grpc_core::XdsClient::XdsResourceName::authority
std::string authority
Definition: xds_client.h:179
grpc_core::XdsClient::ChannelState::LrsCallState::recv_message_payload_
grpc_byte_buffer * recv_message_payload_
Definition: xds_client.cc:460
grpc_channel_stack_last_element
grpc_channel_element * grpc_channel_stack_last_element(grpc_channel_stack *channel_stack)
Definition: channel_stack.cc:83
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
grpc_core::XdsApi::AssembleClientConfig
std::string AssembleClientConfig(const ResourceTypeMetadataMap &resource_type_metadata_map)
Definition: xds_api.cc:652
asyncio_get_stats.parser
parser
Definition: asyncio_get_stats.py:34
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result::type
const XdsResourceType * type
Definition: xds_client.cc:174
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc_core::XdsClient::ChannelState::AdsCallState::seen_response
bool seen_response() const
Definition: xds_client.cc:159
GRPC_INITIAL_METADATA_WAIT_FOR_READY
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY
Definition: grpc_types.h:523
gpr_getenv
char * gpr_getenv(const char *name)
GRPC_OP_RECV_INITIAL_METADATA
@ GRPC_OP_RECV_INITIAL_METADATA
Definition: grpc_types.h:617
grpc_core::XdsClient::NotifyWatchersOnErrorLocked
void NotifyWatchersOnErrorLocked(const std::map< ResourceWatcherInterface *, RefCountedPtr< ResourceWatcherInterface >> &watchers, absl::Status status)
Definition: xds_client.cc:2260
xds_channel_args.h
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::XdsResourceType::v2_type_url
virtual absl::string_view v2_type_url() const =0
absl::StrJoin
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
Definition: abseil-cpp/absl/strings/str_join.h:239
grpc_core::CoreConfiguration::Get
static const CoreConfiguration & Get()
Definition: core_configuration.h:82
re2::Result
TestInstance::Result Result
Definition: bloaty/third_party/re2/re2/testing/tester.cc:96
grpc_core::XdsApi::AdsResponseParserInterface
Definition: xds_api.h:58
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
grpc_core::XdsClient::ChannelState::AdsCallState::~AdsCallState
~AdsCallState() override
Definition: xds_client.cc:1043
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::XdsClusterLocalityStats::Snapshot
Definition: xds_client_stats.h:176
grpc_core::XdsApi::CreateAdsRequest
grpc_slice CreateAdsRequest(const XdsBootstrap::XdsServer &server, absl::string_view type_url, absl::string_view version, absl::string_view nonce, const std::vector< std::string > &resource_names, grpc_error_handle error, bool populate_node)
Definition: xds_api.cc:272
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Definition: call.cc:1770
grpc_core::XdsClient::ChannelState::RetryableCall
Definition: xds_client.cc:111
channel_stack.h
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
Definition: grpc_types.h:526
grpc_core::XdsClient::ChannelState::StateWatcher::StateWatcher
StateWatcher(WeakRefCountedPtr< ChannelState > parent)
Definition: xds_client.cc:483
grpc_core::InternallyRefCounted< ResourceTimer >::Ref
RefCountedPtr< ResourceTimer > Ref() GRPC_MUST_USE_RESULT
Definition: orphanable.h:90
xds_client.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result::resources_seen
std::map< std::string, std::set< XdsResourceKey > > resources_seen
Definition: xds_client.cc:180
grpc_core::XdsClient::ChannelState::RetryableCall::retry_timer_callback_pending_
bool retry_timer_callback_pending_
Definition: xds_client.cc:141
grpc_core::XdsClient::ChannelState::LrsCallState::parent
RetryableCall< LrsCallState > * parent()
Definition: xds_client.cc:385
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone
bool OnNextReportTimerLocked(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient bool static SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient void OnReportDone(void *arg, grpc_error_handle error)
Definition: xds_client.cc:412
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc_core::grpc_xds_client_refcount_trace
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount")
Definition: xds_client.h:58
grpc_core::XdsClient::ChannelState::lrs_calld
LrsCallState * lrs_calld() const
Definition: xds_client.cc:574
grpc_byte_buffer_reader_init
GRPCAPI int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer)
Definition: byte_buffer_reader.cc:33
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
grpc_core::XdsApi::CreateLrsInitialRequest
grpc_slice CreateLrsInitialRequest(const XdsBootstrap::XdsServer &server)
Definition: xds_api.cc:452
grpc_core::XdsBootstrap::XdsServer::ShouldUseV3
bool ShouldUseV3() const
Definition: xds_bootstrap.cc:166
conf.version
string version
Definition: doc/python/sphinx/conf.py:36
xds_bootstrap.h
grpc_core::XdsClient::ChannelState::StateWatcher::parent_
WeakRefCountedPtr< ChannelState > parent_
Definition: xds_client.cc:508
grpc.h
grpc_call
struct grpc_call grpc_call
Definition: grpc_types.h:70
grpc_pollset_set_destroy
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set)
Definition: pollset_set.cc:33
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::timer_callback_
grpc_closure timer_callback_
Definition: xds_client.cc:304
grpc_core::fail
Poll< absl::StatusOr< std::tuple< T... > > > fail()
Definition: try_join_test.cc:45
connectivity_state.h
grpc_byte_buffer
Definition: grpc_types.h:43
absl::Status::message
absl::string_view message() const
Definition: third_party/abseil-cpp/absl/status/status.h:806
grpc_core::XdsClient::ChannelState::LrsCallState::xds_client
XdsClient * xds_client() const
Definition: xds_client.cc:387
grpc_core::XdsClient::ChannelState::RetryableCall::IsCurrentCallOnChannel
bool IsCurrentCallOnChannel() const
grpc_core::XdsClient::ChannelState::HasActiveAdsCall
bool HasActiveAdsCall() const
Definition: xds_client.cc:579
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result::have_valid_resources
bool have_valid_resources
Definition: xds_client.cc:181
grpc_core::DualRefCounted< ChannelState >::Ref
RefCountedPtr< ChannelState > Ref() GRPC_MUST_USE_RESULT
Definition: dual_ref_counted.h:52
pollset_set.h
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
grpc_core::XdsClient::ChannelState::LrsCallState::trailing_metadata_recv_
grpc_metadata_array trailing_metadata_recv_
Definition: xds_client.cc:464
grpc_op
Definition: grpc_types.h:640
GRPC_OP_SEND_MESSAGE
@ GRPC_OP_SEND_MESSAGE
Definition: grpc_types.h:602
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
grpc_core::XdsClient::ChannelState::RetryableCall::calld_
OrphanablePtr< T > calld_
Definition: xds_client.cc:133
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
grpc_core::XdsClient::ChannelState::MaybeStartLrsCall
void MaybeStartLrsCall()
Definition: xds_client.cc:583
channel_
RefCountedPtr< Channel > channel_
Definition: channel_connectivity.cc:209
arg
Definition: cmdline.cc:40
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState::nonce
std::string nonce
Definition: xds_client.cc:311
grpc_core::XdsClient::ChannelState::RetryableCall::OnRetryTimerLocked
void OnRetryTimerLocked(grpc_error_handle error)
Definition: xds_client.cc:744
grpc_empty_slice
GPRAPI grpc_slice grpc_empty_slice(void)
Definition: slice/slice.cc:42
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
absl::InlinedVector::data
pointer data() noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:302
grpc_core::XdsClient::ChannelState::LrsCallState::send_message_payload_
grpc_byte_buffer * send_message_payload_
Definition: xds_client.cc:456
grpc_core::XdsClient::ChannelState::AdsCallState::AdsCallState
AdsCallState(RefCountedPtr< RetryableCall< AdsCallState >> parent)
Definition: xds_client.cc:937
grpc_core::XdsClusterLocalityStats
Definition: xds_client_stats.h:158
grpc_dump_xds_configs
grpc_slice grpc_dump_xds_configs(void)
Definition: xds_client.cc:2574
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS
#define GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS
Definition: xds/xds_channel_args.h:23
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS
Definition: xds_client.cc:87
grpc_core::XdsClient::LoadReportState::drop_stats
XdsClusterDropStats * drop_stats
Definition: xds_client.h:265
grpc_core::XdsClient::ChannelState::LrsCallState::initial_metadata_recv_
grpc_metadata_array initial_metadata_recv_
Definition: xds_client.cc:453
grpc_core::XdsClient::GetOrCreate
static RefCountedPtr< XdsClient > GetOrCreate(const grpc_channel_args *args, grpc_error_handle *error)
Definition: xds_client.cc:2465
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::parent_
RefCountedPtr< LrsCallState > parent_
Definition: xds_client.cc:422
grpc_core::InternallyRefCounted
Definition: orphanable.h:73
absl::InlinedVector::size
size_type size() const noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:270
grpc_core::XdsFederationEnabled
bool XdsFederationEnabled()
Definition: xds_bootstrap.cc:48
grpc_core::XdsClient::ChannelState::AdsCallState::parent
RetryableCall< AdsCallState > * parent() const
Definition: xds_client.cc:156
slice_internal.h
watcher_
RefCountedPtr< ConnectivityStateWatcherInterface > watcher_
Definition: health_check_client.cc:155
grpc_core::XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked
void MaybeStartReportingLocked()
Definition: xds_client.cc:1642
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_core::XdsClient::ResourceState::resource
std::unique_ptr< XdsResourceType::ResourceData > resource
Definition: xds_client.h:248
grpc_core::internal::UnsetGlobalXdsClientForTest
void UnsetGlobalXdsClientForTest()
Definition: xds_client.cc:2518
retry_timer_callback_pending_
bool retry_timer_callback_pending_
Definition: grpclb.cc:536
grpc_core::XdsClient::LoadReportState
Definition: xds_client.h:259
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_core::BackOff
Definition: backoff.h:32
grpc_core::XdsClient::ChannelState::RetryableCall::chand
ChannelState * chand() const
Definition: xds_client.cc:121
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::Reporter
Reporter(RefCountedPtr< LrsCallState > parent, Duration report_interval)
Definition: xds_client.cc:394
xds_cluster_specifier_plugin.h
setup.idx
idx
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:197
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState
Definition: xds_client.cc:307
grpc_op::op
grpc_op_type op
Definition: grpc_types.h:642
grpc_core::XdsHttpFilterRegistry::Init
static void Init()
Definition: xds_http_filters.cc:111
grpc_core::XdsClient::ChannelState::StateWatcher
Definition: xds_client.cc:480
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
grpc_core::XdsClusterDropStats
Definition: xds_client_stats.h:104
grpc_core::XdsClient::ChannelState::AdsCallState::xds_client
XdsClient * xds_client() const
Definition: xds_client.cc:158
grpc_op::grpc_op_data::grpc_op_send_initial_metadata::count
size_t count
Definition: grpc_types.h:653
grpc_core::XdsClusterDropStats::GetSnapshotAndReset
Snapshot GetSnapshotAndReset()
Definition: xds_client_stats.cc:75
grpc_core::XdsClient::GetOrCreateChannelStateLocked
RefCountedPtr< ChannelState > GetOrCreateChannelStateLocked(const XdsBootstrap::XdsServer &server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: xds_client.cc:1901
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::status_details
grpc_slice * status_details
Definition: grpc_types.h:703
grpc_core::internal::SetXdsFallbackBootstrapConfig
void SetXdsFallbackBootstrapConfig(const char *config)
Definition: xds_client.cc:2523
grpc_core::XdsClient::ResourceState::meta
XdsApi::ResourceMetadata meta
Definition: xds_client.h:249
details
static grpc_slice details
Definition: test/core/fling/client.cc:46
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_core::XdsClient::ResourceState::ignored_deletion
bool ignored_deletion
Definition: xds_client.h:250
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::xds_client
XdsClient * xds_client() const
Definition: xds_client.cc:198
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::timer_start_needed_
bool timer_start_needed_
Definition: xds_client.cc:301
grpc_core::XdsApi::ClusterLoadReport
Definition: xds_api.h:80
absl::Status
ABSL_NAMESPACE_BEGIN class ABSL_MUST_USE_RESULT Status
Definition: abseil-cpp/absl/status/internal/status_internal.h:36
grpc_slice_to_c_string
GPRAPI char * grpc_slice_to_c_string(grpc_slice s)
Definition: slice/slice.cc:35
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::ads_calld_
RefCountedPtr< AdsCallState > ads_calld_
Definition: xds_client.cc:300
grpc_core::XdsClient::LoadReportState::deleted_drop_stats
XdsClusterDropStats::Snapshot deleted_drop_stats
Definition: xds_client.h:266
GRPC_OP_RECV_MESSAGE
@ GRPC_OP_RECV_MESSAGE
Definition: grpc_types.h:621
absl::Seconds
constexpr Duration Seconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:419
g_mu
static gpr_mu g_mu
Definition: iomgr.cc:55
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::TakeResult
absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient void ParseResource(const XdsEncodingContext &context, size_t idx, absl::string_view type_url, absl::string_view serialized_resource) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient Result TakeResult()
Definition: xds_client.cc:195
GRPC_XDS_RECONNECT_JITTER
#define GRPC_XDS_RECONNECT_JITTER
Definition: xds_client.cc:86
grpc_timer_cancel
void grpc_timer_cancel(grpc_timer *timer)
Definition: iomgr/timer.cc:36
grpc_core::Duration::Milliseconds
static constexpr Duration Milliseconds(int64_t millis)
Definition: src/core/lib/gprpp/time.h:155
contents
string_view contents
Definition: elf.cc:597
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
shutting_down_
bool shutting_down_
Definition: grpclb.cc:516
grpc_core::XdsHttpFilterRegistry::Shutdown
static void Shutdown()
Definition: xds_http_filters.cc:124
grpc_core::XdsClient::XdsClient
XdsClient(std::unique_ptr< XdsBootstrap > bootstrap, const grpc_channel_args *args)
Definition: xds_client.cc:1851
grpc_core::XdsClient::RemoveClusterLocalityStats
void RemoveClusterLocalityStats(const XdsBootstrap::XdsServer &xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, const RefCountedPtr< XdsLocalityName > &locality, XdsClusterLocalityStats *cluster_locality_stats)
Definition: xds_client.cc:2201
grpc_core::XdsClient::ChannelState::LrsCallState::Orphan
void Orphan() override
Definition: xds_client.cc:1630
grpc_core::XdsClient::xds_federation_enabled_
const bool xds_federation_enabled_
Definition: xds_client.h:319
grpc_channel_create
GRPCAPI grpc_channel * grpc_channel_create(const char *target, grpc_channel_credentials *creds, const grpc_channel_args *args)
Definition: chttp2_connector.cc:366
grpc_core::XdsClient::NotifyOnErrorLocked
void NotifyOnErrorLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: xds_client.cc:2232
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
debug_location.h
grpc_core::XdsBootstrap::XdsServer
Definition: xds_bootstrap.h:52
GRPC_PROPAGATE_DEFAULTS
#define GRPC_PROPAGATE_DEFAULTS
Definition: propagation_bits.h:45
grpc_core::XdsClient::ChannelState::AdsCallState::chand
ChannelState * chand() const
Definition: xds_client.cc:157
grpc_core::XdsClient::ChannelState::xds_client_
WeakRefCountedPtr< XdsClient > xds_client_
Definition: xds_client.h:223
key
const char * key
Definition: hpack_parser_table.cc:164
grpc_core::XdsClient::CancelResourceWatch
void CancelResourceWatch(const XdsResourceType *type, absl::string_view listener_name, ResourceWatcherInterface *watcher, bool delay_unsubscription=false)
Definition: xds_client.cc:1990
grpc_core::XdsClient::certificate_provider_store_
OrphanablePtr< CertificateProviderStore > certificate_provider_store_
Definition: xds_client.h:321
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::Orphan
void Orphan() override
Definition: xds_client.cc:213
grpc_channel_arg_integer_create
grpc_arg grpc_channel_arg_integer_create(char *name, int value)
Definition: channel_args.cc:484
eds_service_name
std::string eds_service_name
Definition: xds_cluster_resolver.cc:99
grpc_error_set_int
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
Definition: error.cc:613
grpc_core::XdsClient::ChannelState::LrsCallState::status_details_
grpc_slice status_details_
Definition: xds_client.cc:466
grpc_core::XdsClient::LoadReportServer
Definition: xds_client.h:278
grpc_core::QsortCompare
int QsortCompare(const T &a, const T &b)
Definition: useful.h:95
server
Definition: examples/python/async_streaming/server.py:1
grpc_core::XdsClient::ChannelState::AdsCallState::status_details_
grpc_slice status_details_
Definition: xds_client.cc:363
grpc_core::XdsClient::ChannelState::AdsCallState::call_
grpc_call * call_
Definition: xds_client.cc:347
grpc_core::XdsClient::ChannelState::LrsCallState::seen_response
bool seen_response() const
Definition: xds_client.cc:388
GRPC_OP_SEND_INITIAL_METADATA
@ GRPC_OP_SEND_INITIAL_METADATA
Definition: grpc_types.h:598
grpc_core::XdsClient::ChannelState::LrsCallState::status_code_
grpc_status_code status_code_
Definition: xds_client.cc:465
grpc_core::XdsClient::LoadReportState::locality_stats
std::map< RefCountedPtr< XdsLocalityName >, LocalityState, XdsLocalityName::Less > locality_stats
Definition: xds_client.h:269
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState::error
grpc_error_handle error
Definition: xds_client.cc:312
grpc_op::grpc_op_data::send_message
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
grpc_core::XdsApi::ParseLrsResponse
grpc_error_handle ParseLrsResponse(const grpc_slice &encoded_response, bool *send_all_clusters, std::set< std::string > *cluster_names, Duration *load_reporting_interval)
Definition: xds_api.cc:600
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser
Definition: xds_client.cc:171
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result::nonce
std::string nonce
Definition: xds_client.cc:177
grpc_core::XdsClient::ChannelState::ChannelState
ChannelState(WeakRefCountedPtr< XdsClient > xds_client, const XdsBootstrap::XdsServer &server)
Definition: xds_client.cc:528
grpc_core::XdsBootstrap::XdsServer::server_uri
std::string server_uri
Definition: xds_bootstrap.h:53
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::AdsResponseParser
AdsResponseParser(AdsCallState *ads_call_state)
Definition: xds_client.cc:184
grpc_core::URI::Create
static absl::StatusOr< URI > Create(std::string scheme, std::string authority, std::string path, std::vector< QueryParam > query_parameter_pairs, std::string fragment)
Definition: uri_parser.cc:289
GRPC_ERROR_CREATE_FROM_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
Definition: error.h:297
grpc_channel_create_pollset_set_call
grpc_call * grpc_channel_create_pollset_set_call(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, const grpc_slice &method, const grpc_slice *host, grpc_core::Timestamp deadline, void *reserved)
Definition: channel.cc:331
alloc.h
GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS
Definition: xds_client.cc:83
xds_client_
RefCountedPtr< XdsClient > xds_client_
Definition: cds.cc:197
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::MaybeStartTimer
void MaybeStartTimer(RefCountedPtr< AdsCallState > ads_calld) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient
Definition: xds_client.cc:218
grpc_core::XdsClient::ChannelState::AdsCallState::state_map_
std::map< const XdsResourceType *, ResourceTypeState > state_map_
Definition: xds_client.cc:370
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
grpc_core::OrphanablePtr
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:64
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::type_
const XdsResourceType * type_
Definition: xds_client.cc:297
profile_analyzer.fields
list fields
Definition: profile_analyzer.py:266
send_message_payload_
grpc_byte_buffer * send_message_payload_
Definition: grpclb.cc:245
grpc_op::grpc_op_data::recv_status_on_client
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
fix_build_deps.r
r
Definition: fix_build_deps.py:491
xds_api.h
grpc_core::XdsClient::ParseXdsResourceName
absl::StatusOr< XdsResourceName > ParseXdsResourceName(absl::string_view name, const XdsResourceType *type)
Definition: xds_client.cc:2056
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_core::WeakRefCountedPtr
Definition: ref_counted_ptr.h:185
grpc_core::XdsResourceType
Definition: xds_resource_type.h:34
grpc_core::XdsClient::~XdsClient
~XdsClient() override
Definition: xds_client.cc:1873
grpc_core::XdsClient::ChannelState::RetryableCall::Orphan
void Orphan() override
Definition: xds_client.cc:683
grpc_core::XdsClient::WatchResource
void WatchResource(const XdsResourceType *type, absl::string_view name, RefCountedPtr< ResourceWatcherInterface > watcher)
Definition: xds_client.cc:1914
grpc_core::XdsClient::ChannelState::AdsCallState::trailing_metadata_recv_
grpc_metadata_array trailing_metadata_recv_
Definition: xds_client.cc:361
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::report_interval_
const Duration report_interval_
Definition: xds_client.cc:425
grpc_byte_buffer_destroy
GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)
Definition: byte_buffer.cc:81
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
grpc_core::XdsApi::ClusterLoadReport::load_report_interval
Duration load_report_interval
Definition: xds_api.h:85
grpc_core::XdsClient::ChannelState::RetryableCall::on_retry_timer_
grpc_closure on_retry_timer_
Definition: xds_client.cc:140
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::on_next_report_timer_
grpc_closure on_next_report_timer_
Definition: xds_client.cc:429
type_url
string * type_url
Definition: bloaty/third_party/protobuf/conformance/conformance_cpp.cc:72
GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS
#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS
Definition: xds/xds_channel_args.h:29
grpc_core::XdsClient::GetFromChannelArgs
static RefCountedPtr< XdsClient > GetFromChannelArgs(const grpc_channel_args &args)
Definition: xds_client.cc:2563
arg
struct arg arg
grpc_core::MakeOrphanable
OrphanablePtr< T > MakeOrphanable(Args &&... args)
Definition: orphanable.h:67
grpc_channel_destroy
GRPCAPI void grpc_channel_destroy(grpc_channel *channel)
Definition: channel.cc:437
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_core::XdsClient::ChannelState::AdsCallState::on_response_received_
grpc_closure on_response_received_
Definition: xds_client.cc:358
grpc_slice_from_cpp_string
grpc_slice grpc_slice_from_cpp_string(std::string str)
Definition: slice/slice.cc:202
grpc_core::XdsClient::interested_parties_
grpc_pollset_set * interested_parties_
Definition: xds_client.h:320
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::timer_pending_
bool timer_pending_
Definition: xds_client.cc:302
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
slice_refcount.h
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
absl::UnavailableError
Status UnavailableError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:375
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer
Definition: xds_client.cc:205
grpc_core::XdsClient::work_serializer_
WorkSerializer work_serializer_
Definition: xds_client.h:323
grpc_core::XdsClient::AddClusterLocalityStats
RefCountedPtr< XdsClusterLocalityStats > AddClusterLocalityStats(const XdsBootstrap::XdsServer &xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr< XdsLocalityName > locality)
Definition: xds_client.cc:2158
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::XdsClient::api_
XdsApi api_
Definition: xds_client.h:322
ref_counted_ptr.h
grpc_core::XdsClientGlobalInit
void XdsClientGlobalInit()
Definition: xds_client.cc:2401
grpc_channel
struct grpc_channel grpc_channel
Definition: grpc_types.h:62
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
watcher
ClusterWatcher * watcher
Definition: cds.cc:148
GPR_MS_PER_SEC
#define GPR_MS_PER_SEC
Definition: include/grpc/support/time.h:39
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::next_report_timer_
grpc_timer next_report_timer_
Definition: xds_client.cc:428
grpc_core::XdsClient::ChannelState::resource_type_version_map_
std::map< const XdsResourceType *, std::string > resource_type_version_map_
Definition: xds_client.h:241
channel_args.h
timer.h
GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER
Definition: xds_client.cc:84
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::on_report_done_
grpc_closure on_report_done_
Definition: xds_client.cc:430
grpc_core::XdsClient::ChannelState::AdsCallState
Definition: xds_client.cc:147
absl::str_format_internal::LengthMod::q
@ q
grpc_core::XdsClient::ChannelState::LrsCallState::chand
ChannelState * chand() const
Definition: xds_client.cc:386
grpc_core::XdsClient::ChannelState::server_
const XdsBootstrap::XdsServer & server_
Definition: xds_client.h:228
gpr_strdup
GPRAPI char * gpr_strdup(const char *src)
Definition: string.cc:39
grpc_core::XdsApi::ParseAdsResponse
absl::Status ParseAdsResponse(const XdsBootstrap::XdsServer &server, const grpc_slice &encoded_response, AdsResponseParserInterface *parser)
Definition: xds_api.cc:358
internal
Definition: benchmark/test/output_test_helper.cc:20
grpc_core::XdsClient::ResourceState::watchers
std::map< ResourceWatcherInterface *, RefCountedPtr< ResourceWatcherInterface > > watchers
Definition: xds_client.h:246
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc_call_start_batch_and_execute
grpc_call_error grpc_call_start_batch_and_execute(grpc_call *call, const grpc_op *ops, size_t nops, grpc_closure *closure)
Definition: call.cc:1847
recv_message_payload_
grpc_byte_buffer * recv_message_payload_
Definition: grpclb.cc:249
Duration
Definition: bloaty/third_party/protobuf/src/google/protobuf/duration.pb.h:69
work_serializer_
std::shared_ptr< WorkSerializer > work_serializer_
Definition: google_c2p_resolver.cc:134
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_op::grpc_op_data::recv_initial_metadata
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
grpc_core::XdsClient::AuthorityState
Definition: xds_client.h:253
grpc_core::CertificateProviderStore
Definition: certificate_provider_store.h:46
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
uri_parser.h
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result
Definition: xds_client.cc:173
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
absl::InlinedVector
Definition: abseil-cpp/absl/container/inlined_vector.h:69
GRPC_ARG_KEEPALIVE_TIME_MS
#define GRPC_ARG_KEEPALIVE_TIME_MS
Definition: grpc_types.h:240
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter::Orphan
void Orphan() override
Definition: xds_client.cc:1382
grpc_core::XdsClient::ChannelState::LrsCallState::Reporter
Definition: xds_client.cc:392
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_error
Definition: error_internal.h:42
parent_
RefCountedPtr< GrpcLb > parent_
Definition: grpclb.cc:438
upb_utils.h
GRPC_OP_RECV_STATUS_ON_CLIENT
@ GRPC_OP_RECV_STATUS_ON_CLIENT
Definition: grpc_types.h:627
grpc_op::grpc_op_data::grpc_op_recv_initial_metadata::recv_initial_metadata
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:685
grpc_core::XdsClient::ChannelState::RetryableCall::shutting_down_
bool shutting_down_
Definition: xds_client.cc:143
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
grpc_core::XdsClient::ChannelState::LrsCallState::OnInitialRequestSent
static void OnInitialRequestSent(void *arg, grpc_error_handle error)
Definition: xds_client.cc:1663
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
grpc_channel_args_find_integer
int grpc_channel_args_find_integer(const grpc_channel_args *args, const char *name, const grpc_integer_options options)
Definition: channel_args.cc:425
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
grpc_core::XdsClient::ChannelState::AdsCallState::on_status_received_
grpc_closure on_status_received_
Definition: xds_client.cc:364
method
NSString * method
Definition: ProtoMethod.h:28
grpc_core::CppImplOf< Channel, grpc_channel >::FromC
static Channel * FromC(grpc_channel *c_type)
Definition: cpp_impl_of.h:30
absl::Status::code
absl::StatusCode code() const
Definition: third_party/abseil-cpp/absl/status/status.cc:233
grpc_core::XdsClient::XdsResourceName
Definition: xds_client.h:178
grpc_core::XdsClient::ChannelState::RetryableCall::RetryableCall
RetryableCall(WeakRefCountedPtr< ChannelState > chand)
Definition: xds_client.cc:666
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::Result::type_url
std::string type_url
Definition: xds_client.cc:175
grpc_core::XdsClient::ChannelState::ads_calld
AdsCallState * ads_calld() const
Definition: xds_client.cc:569
grpc_core::XdsClient::MaybeRegisterResourceTypeLocked
void MaybeRegisterResourceTypeLocked(const XdsResourceType *resource_type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: xds_client.cc:2035
grpc_core::AsyncConnectivityStateWatcherInterface
Definition: src/core/lib/transport/connectivity_state.h:64
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::OnTimerLocked
void OnTimerLocked(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient
Definition: xds_client.cc:272
grpc_core::XdsClient::DumpClientConfigBinary
std::string DumpClientConfigBinary()
Definition: xds_client.cc:2376
grpc_closure
Definition: closure.h:56
grpc_channel_arg_pointer_create
grpc_arg grpc_channel_arg_pointer_create(char *name, void *value, const grpc_arg_pointer_vtable *vtable)
Definition: channel_args.cc:492
grpc_core::XdsClient::ChannelState::lrs_calld_
OrphanablePtr< RetryableCall< LrsCallState > > lrs_calld_
Definition: xds_client.h:237
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::update_time_
const Timestamp update_time_
Definition: xds_client.cc:201
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
grpc_core::XdsClient::ChannelState::shutting_down_
bool shutting_down_
Definition: xds_client.h:232
grpc_core::XdsClient::NotifyWatchersOnResourceDoesNotExist
void NotifyWatchersOnResourceDoesNotExist(const std::map< ResourceWatcherInterface *, RefCountedPtr< ResourceWatcherInterface >> &watchers)
Definition: xds_client.cc:2279
grpc_byte_buffer_reader
Definition: impl/codegen/byte_buffer_reader.h:30
ops
static grpc_op ops[6]
Definition: test/core/fling/client.cc:39
grpc_core::XdsClient::request_timeout_
const Duration request_timeout_
Definition: xds_client.h:318
grpc_core::XdsClient::ChannelState::AdsCallState::AdsResponseParser::ads_call_state_
AdsCallState * ads_call_state_
Definition: xds_client.cc:200
grpc_core::XdsApi::ClusterLoadReport::locality_stats
std::map< RefCountedPtr< XdsLocalityName >, XdsClusterLocalityStats::Snapshot, XdsLocalityName::Less > locality_stats
Definition: xds_api.h:84
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::ResourceTimer
ResourceTimer(const XdsResourceType *type, const XdsResourceName &name)
Definition: xds_client.cc:207
grpc_core::internal::SetXdsChannelArgsForTest
void SetXdsChannelArgsForTest(grpc_channel_args *args)
Definition: xds_client.cc:2513
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
grpc_core::XdsClient::ChannelState::RetryableCall::chand_
WeakRefCountedPtr< ChannelState > chand_
Definition: xds_client.cc:135
grpc_core::XdsClient::ResourceState
Definition: xds_client.h:244
grpc_core::XdsClient::AuthorityState::channel_state
RefCountedPtr< ChannelState > channel_state
Definition: xds_client.h:254
grpc_core::XdsClusterLocalityStats::GetSnapshotAndReset
Snapshot GetSnapshotAndReset()
Definition: xds_client_stats.cc:133
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
sync.h
grpc_core::XdsClient::ChannelState::LrsCallState::on_status_received_
grpc_closure on_status_received_
Definition: xds_client.cc:467
grpc_core::XdsClient::ChannelState::AdsCallState::recv_message_payload_
grpc_byte_buffer * recv_message_payload_
Definition: xds_client.cc:357
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::MaybeCancelTimer
void MaybeCancelTimer()
Definition: xds_client.cc:241
grpc_core::XdsClient::bootstrap
const XdsBootstrap & bootstrap() const
Definition: xds_client.h:88
grpc_channel_args_copy_and_add
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:224
grpc_core::XdsApi::ClusterLoadReportMap
std::map< std::pair< std::string, std::string >, ClusterLoadReport > ClusterLoadReportMap
Definition: xds_api.h:89
call.h
grpc_byte_buffer_reader_destroy
GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
Definition: byte_buffer_reader.cc:45
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
GRPC_ERROR_INT_GRPC_STATUS
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
Definition: error.h:66
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
grpc_metadata_array_init
GRPCAPI void grpc_metadata_array_init(grpc_metadata_array *array)
Definition: metadata_array.cc:30
grpc_core::XdsClient::ChannelState::RetryableCall::retry_timer_
grpc_timer retry_timer_
Definition: xds_client.cc:139
retry_timer_
grpc_timer retry_timer_
Definition: retry_filter.cc:606
grpc_core::XdsClient::ChannelState
Definition: xds_client.h:185
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
parse_error
@ parse_error
Definition: pem_info.c:88
grpc_core::XdsClient::ChannelState::AdsCallState::ResourceTimer::OnTimer
static void OnTimer(void *arg, grpc_error_handle error)
Definition: xds_client.cc:261
channel.h
ABSL_NO_THREAD_SAFETY_ANALYSIS
#define ABSL_NO_THREAD_SAFETY_ANALYSIS
Definition: abseil-cpp/absl/base/thread_annotations.h:280
port_platform.h
grpc_call_cancel_internal
void grpc_call_cancel_internal(grpc_call *call)
Definition: call.cc:1806
grpc_core::XdsClient::ChannelState::RetryableCall::OnCallFinishedLocked
void OnCallFinishedLocked()
Definition: xds_client.cc:691
absl::ConsumePrefix
ABSL_NAMESPACE_BEGIN bool ConsumePrefix(absl::string_view *str, absl::string_view expected)
Definition: abseil-cpp/absl/strings/strip.h:46


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:57