27 #include "absl/container/inlined_vector.h"
28 #include "absl/strings/match.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/str_split.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/strings/strip.h"
36 #include <grpc/byte_buffer.h>
83 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
84 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
85 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
86 #define GRPC_XDS_RECONNECT_JITTER 0.2
87 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
110 template <
typename T>
210 grpc_schedule_on_exec_ctx);
227 auto& authority_state =
230 if (
state.resource !=
nullptr)
return;
267 self->ads_calld_->xds_client()->work_serializer_.DrainQueue();
268 self->ads_calld_.reset();
278 "[xds_client %p] xds server %s: timeout obtaining resource "
279 "{type=%s name=%s} from xds server",
281 ads_calld_->chand()->server_.server_uri.c_str(),
287 auto& authority_state =
291 ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
316 std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
327 bool OnResponseReceivedLocked()
333 bool IsCurrentCallOnChannel()
const;
383 void MaybeStartReportingLocked();
397 grpc_schedule_on_exec_ctx);
399 grpc_schedule_on_exec_ctx);
400 ScheduleNextReportLocked();
406 void ScheduleNextReportLocked()
416 bool IsCurrentReporterOnCall()
const {
417 return this ==
parent_->reporter_.get();
426 bool last_report_counters_were_zero_ =
false;
427 bool next_report_timer_callback_pending_ =
false;
434 void OnInitialRequestSentLocked()
437 bool OnResponseReceivedLocked()
443 bool IsCurrentCallOnChannel()
const;
491 if (!
parent_->shutting_down_ &&
495 "[xds_client %p] xds channel for server %s in "
496 "state TRANSIENT_FAILURE: %s",
499 parent_->xds_client_->NotifyOnErrorLocked(
501 "xds channel in TRANSIENT_FAILURE, connectivity error: ",
505 parent_->xds_client()->work_serializer_.DrainQueue();
521 server.channel_creds_type,
server.channel_creds_config);
542 StartConnectivityWatchLocked();
547 gpr_log(
GPR_INFO,
"[xds_client %p] destroying xds channel %p for server %s",
548 xds_client(),
this,
server_.server_uri.c_str());
560 CancelConnectivityWatchLocked();
571 return ads_calld_->calld();
576 return lrs_calld_->calld();
580 return ads_calld_ !=
nullptr && ads_calld_->calld() !=
nullptr;
584 if (lrs_calld_ !=
nullptr)
return;
589 void XdsClient::ChannelState::StopLrsCallLocked() {
604 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
606 xds_client()->NotifyOnErrorLocked(
610 ClientChannel* client_channel =
614 client_channel->AddConnectivityWatcher(
616 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(
watcher_));
619 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
623 ClientChannel* client_channel =
626 client_channel->RemoveConnectivityWatcher(
watcher_);
629 void XdsClient::ChannelState::SubscribeLocked(
const XdsResourceType*
type,
630 const XdsResourceName&
name) {
631 if (ads_calld_ ==
nullptr) {
633 ads_calld_.reset(
new RetryableCall<AdsCallState>(
642 if (ads_calld() ==
nullptr)
return;
644 ads_calld()->SubscribeLocked(
type,
name,
false);
647 void XdsClient::ChannelState::UnsubscribeLocked(
const XdsResourceType*
type,
648 const XdsResourceName&
name,
649 bool delay_unsubscription) {
650 if (ads_calld_ !=
nullptr) {
651 auto* calld = ads_calld_->calld();
652 if (calld !=
nullptr) {
653 calld->UnsubscribeLocked(
type,
name, delay_unsubscription);
654 if (!calld->HasSubscribedResources()) {
665 template <
typename T>
678 grpc_schedule_on_exec_ctx);
679 StartNewCallLocked();
682 template <
typename T>
690 template <
typename T>
693 if (
calld_->seen_response()) backoff_.Reset();
696 StartRetryTimerLocked();
699 template <
typename T>
707 "[xds_client %p] xds server %s: start new call from retryable call %p",
708 chand()->xds_client(), chand()->
server_.server_uri.c_str(),
this);
710 calld_ = MakeOrphanable<T>(
714 template <
typename T>
717 const Timestamp next_attempt_time = backoff_.NextAttemptTime();
722 "[xds_client %p] xds server %s: call attempt failed; "
723 "retry timer will fire in %" PRId64
"ms.",
724 chand()->xds_client(), chand()->
server_.server_uri.c_str(),
732 template <
typename T>
743 template <
typename T>
750 "[xds_client %p] xds server %s: retry timer fired (retryable "
752 chand()->xds_client(), chand()->
server_.server_uri.c_str(),
this);
754 StartNewCallLocked();
763 absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser::
764 ProcessAdsResponseFields(AdsResponseFields
fields) {
768 "[xds_client %p] xds server %s: received ADS response: type_url=%s, "
769 "version=%s, nonce=%s, num_resources=%" PRIuPTR,
777 if (result_.type ==
nullptr) {
790 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
792 XdsApi::ResourceMetadata resource_metadata;
793 resource_metadata.serialized_proto =
std::move(serialized_proto);
794 resource_metadata.update_time = update_time;
796 resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
797 return resource_metadata;
804 XdsApi::ResourceMetadata* resource_metadata) {
805 resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
806 resource_metadata->failed_version =
version;
807 resource_metadata->failed_details =
details;
808 resource_metadata->failed_update_time = update_time;
813 void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
818 if (!result_.type->IsType(
type_url, &is_v2)) {
819 result_.errors.emplace_back(
821 type_url,
" (should be ", result_.type_url,
")"));
826 result_.type->Decode(
context, serialized_resource, is_v2);
828 result_.errors.emplace_back(
834 xds_client()->ParseXdsResourceName(
result->name, result_.type);
835 if (!resource_name.ok()) {
837 "resource index ",
idx,
": Cannot parse xDS resource name \"",
842 auto timer_it = ads_call_state_->
state_map_.find(result_.type);
843 if (timer_it != ads_call_state_->
state_map_.end()) {
845 timer_it->second.subscribed_resources.find(resource_name->authority);
846 if (
it != timer_it->second.subscribed_resources.end()) {
847 auto res_it =
it->second.find(resource_name->key);
848 if (res_it !=
it->second.end()) {
849 res_it->second->MaybeCancelTimer();
855 xds_client()->authority_state_map_.find(resource_name->authority);
856 if (authority_it == xds_client()->authority_state_map_.end()) {
860 AuthorityState& authority_state = authority_it->second;
861 auto type_it = authority_state.resource_map.find(result_.type);
862 if (type_it == authority_state.resource_map.end()) {
865 auto& type_map = type_it->second;
867 auto it = type_map.find(resource_name->key);
868 if (
it == type_map.end()) {
871 ResourceState& resource_state =
it->second;
873 if (result_.type->AllResourcesRequiredInSotW()) {
874 result_.resources_seen[resource_name->authority].insert(resource_name->key);
878 if (resource_state.ignored_deletion) {
880 "[xds_client %p] xds server %s: server returned new version of "
881 "resource for which we previously ignored a deletion: type %s "
885 resource_state.ignored_deletion =
false;
888 if (!
result->resource.ok()) {
890 "resource index ",
idx,
": ",
result->name,
891 ": validation error: ",
result->resource.status().ToString()));
892 xds_client()->NotifyWatchersOnErrorLocked(
893 resource_state.watchers,
895 "invalid resource: ",
result->resource.status().ToString())));
896 UpdateResourceMetadataNacked(result_.version,
897 result->resource.status().ToString(),
898 update_time_, &resource_state.meta);
902 result_.have_valid_resources =
true;
904 if (resource_state.resource !=
nullptr &&
905 result_.type->ResourcesEqual(resource_state.resource.get(),
906 result->resource->get())) {
909 "[xds_client %p] %s resource %s identical to current, ignoring.",
910 xds_client(), result_.type_url.c_str(),
result->name.c_str());
916 resource_state.meta = CreateResourceMetadataAcked(
917 std::string(serialized_resource), result_.version, update_time_);
919 auto& watchers_list = resource_state.watchers;
921 result_.type->CopyResource(resource_state.resource.get()).release();
922 xds_client()->work_serializer_.Schedule(
923 [watchers_list,
value]()
925 for (
const auto&
p : watchers_list) {
926 p.first->OnGenericResourceChanged(
value);
937 XdsClient::ChannelState::AdsCallState::AdsCallState(
951 ?
"/envoy.service.discovery.v3.AggregatedDiscoveryService/"
952 "StreamAggregatedResources"
953 :
"/envoy.service.discovery.v2.AggregatedDiscoveryService/"
954 "StreamAggregatedResources";
967 "[xds_client %p] xds server %s: starting ADS call "
968 "(calld: %p, call: %p)",
988 grpc_schedule_on_exec_ctx);
989 for (
const auto&
a :
xds_client()->authority_state_map_) {
992 if (
a.second.channel_state !=
chand())
continue;
993 for (
const auto& t :
a.second.resource_map) {
995 for (
const auto&
r : t.second) {
997 SubscribeLocked(
type, {authority, resource_key},
true);
1002 SendMessageLocked(p.first);
1020 grpc_schedule_on_exec_ctx);
1037 grpc_schedule_on_exec_ctx);
1065 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
1070 buffered_requests_.insert(
type);
1080 !sent_initial_message_);
1081 sent_initial_message_ =
true;
1084 "[xds_client %p] xds server %s: sending ADS request: type=%s "
1085 "version=%s nonce=%s error=%s",
1104 grpc_schedule_on_exec_ctx);
1109 "[xds_client %p] xds server %s: error starting ADS send_message "
1110 "batch on calld=%p: call_error=%d",
1117 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
1118 const XdsResourceType*
type,
const XdsResourceName&
name,
bool delay_send) {
1120 if (
state ==
nullptr) {
1122 if (!delay_send) SendMessageLocked(
type);
1126 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
1127 const XdsResourceType*
type,
const XdsResourceName&
name,
1128 bool delay_unsubscription) {
1129 auto& type_state_map = state_map_[
type];
1130 auto& authority_map = type_state_map.subscribed_resources[
name.authority];
1131 authority_map.erase(
name.key);
1132 if (authority_map.empty()) {
1133 type_state_map.subscribed_resources.erase(
name.authority);
1135 if (!delay_unsubscription) SendMessageLocked(
type);
1139 for (
const auto& p : state_map_) {
1140 if (!
p.second.subscribed_resources.empty())
return true;
1147 AdsCallState*
ads_calld =
static_cast<AdsCallState*
>(
arg);
1155 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1170 auto it = buffered_requests_.begin();
1171 if (
it != buffered_requests_.end()) {
1172 SendMessageLocked(*
it);
1173 buffered_requests_.erase(
it);
1179 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1181 AdsCallState*
ads_calld =
static_cast<AdsCallState*
>(
arg);
1191 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1204 AdsResponseParser
parser(
this);
1211 "[xds_client %p] xds server %s: error parsing ADS response (%s) "
1216 seen_response_ =
true;
1222 if (!
result.errors.empty()) {
1226 "[xds_client %p] xds server %s: ADS response invalid for resource "
1227 "type %s version %s, will NACK: nonce=%s error=%s",
1237 if (
result.type->AllResourcesRequiredInSotW()) {
1238 for (
auto& a :
xds_client()->authority_state_map_) {
1240 AuthorityState& authority_state =
a.second;
1242 if (authority_state.channel_state != chand())
continue;
1243 auto seen_authority_it =
result.resources_seen.find(authority);
1245 auto type_it = authority_state.resource_map.find(
result.type);
1246 if (type_it == authority_state.resource_map.end())
continue;
1248 for (
auto&
r : type_it->second) {
1249 const XdsResourceKey& resource_key =
r.first;
1250 ResourceState& resource_state =
r.second;
1251 if (seen_authority_it ==
result.resources_seen.end() ||
1252 seen_authority_it->second.find(resource_key) ==
1253 seen_authority_it->second.end()) {
1261 if (resource_state.resource ==
nullptr)
continue;
1263 if (!resource_state.ignored_deletion) {
1265 "[xds_client %p] xds server %s: ignoring deletion "
1266 "for resource type %s name %s",
1270 authority,
result.type_url.c_str(), resource_key)
1272 resource_state.ignored_deletion =
true;
1275 resource_state.resource.reset();
1277 resource_state.watchers);
1284 if (
result.have_valid_resources) {
1285 chand()->resource_type_version_map_[
result.type] =
1288 auto& lrs_call = chand()->lrs_calld_;
1289 if (lrs_call !=
nullptr) {
1290 LrsCallState*
lrs_calld = lrs_call->calld();
1295 SendMessageLocked(
result.type);
1313 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1315 AdsCallState*
ads_calld =
static_cast<AdsCallState*
>(
arg);
1324 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1329 "[xds_client %p] xds server %s: ADS call status received "
1330 "(chand=%p, ads_calld=%p, call=%p): "
1331 "status=%d, details='%s', error='%s'",
1333 call_, status_code_, status_details,
1338 if (IsCurrentCallOnChannel()) {
1340 parent_->OnCallFinishedLocked();
1343 "xDS call failed: xDS server: %s, ADS call status code=%d, "
1344 "details='%s', error='%s'",
1351 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel()
const {
1354 if (chand()->
ads_calld_ ==
nullptr)
return false;
1355 return this == chand()->ads_calld_->calld();
1358 std::vector<std::string>
1359 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1360 const XdsResourceType*
type) {
1361 std::vector<std::string> resource_names;
1362 auto it = state_map_.find(
type);
1363 if (
it != state_map_.end()) {
1364 for (
auto& a :
it->second.subscribed_resources) {
1366 for (
auto& p :
a.second) {
1367 const XdsResourceKey& resource_key =
p.first;
1369 authority,
type->type_url(), resource_key));
1370 OrphanablePtr<ResourceTimer>& resource_timer =
p.second;
1375 return resource_names;
1383 if (next_report_timer_callback_pending_) {
1388 void XdsClient::ChannelState::LrsCallState::Reporter::
1389 ScheduleNextReportLocked() {
1392 &on_next_report_timer_);
1393 next_report_timer_callback_pending_ =
true;
1396 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1398 Reporter*
self =
static_cast<Reporter*
>(
arg);
1407 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1409 next_report_timer_callback_pending_ =
false;
1414 return SendReportLocked();
1420 for (
const auto& p : snapshot) {
1421 const XdsApi::ClusterLoadReport& cluster_snapshot =
p.second;
1422 if (!cluster_snapshot.dropped_requests.IsZero())
return false;
1423 for (
const auto& q : cluster_snapshot.locality_stats) {
1424 const XdsClusterLocalityStats::Snapshot& locality_snapshot =
q.second;
1425 if (!locality_snapshot.IsZero())
return false;
1433 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1441 const bool old_val = last_report_counters_were_zero_;
1442 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1443 if (old_val && last_report_counters_were_zero_) {
1444 auto it =
xds_client()->xds_load_report_server_map_.find(
1446 if (
it ==
xds_client()->xds_load_report_server_map_.end() ||
1447 it->second.load_report_map.empty()) {
1448 it->second.channel_state->StopLrsCallLocked();
1451 ScheduleNextReportLocked();
1457 parent_->send_message_payload_ =
1466 parent_->call_, &
op, 1, &on_report_done_);
1469 "[xds_client %p] xds server %s: error starting LRS send_message "
1470 "batch on calld=%p: call_error=%d",
1480 Reporter*
self =
static_cast<Reporter*
>(
arg);
1489 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1492 parent_->send_message_payload_ =
nullptr;
1496 if (
it ==
xds_client()->xds_load_report_server_map_.end() ||
1497 it->second.load_report_map.empty()) {
1498 it->second.channel_state->StopLrsCallLocked();
1506 if (!IsCurrentReporterOnCall()) {
1507 parent_->MaybeStartReportingLocked();
1511 ScheduleNextReportLocked();
1532 ?
"/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"
1533 :
"/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats";
1553 "[xds_client %p] xds server %s: starting LRS call (calld=%p, call=%p)",
1577 grpc_schedule_on_exec_ctx);
1597 grpc_schedule_on_exec_ctx);
1614 grpc_schedule_on_exec_ctx);
1644 if (reporter_ !=
nullptr)
return;
1649 if (!seen_response())
return;
1655 !chand()->
ads_calld_->calld()->seen_response()) {
1659 reporter_ = MakeOrphanable<Reporter>(
1668 lrs_calld->OnInitialRequestSentLocked();
1673 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1677 MaybeStartReportingLocked();
1680 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1682 LrsCallState*
lrs_calld =
static_cast<LrsCallState*
>(
arg);
1691 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1706 bool send_all_clusters =
false;
1707 std::set<std::string> new_cluster_names;
1708 Duration new_load_reporting_interval;
1710 response_slice, &send_all_clusters, &new_cluster_names,
1711 &new_load_reporting_interval);
1714 "[xds_client %p] xds server %s: LRS response parsing failed: %s",
1720 seen_response_ =
true;
1724 "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
1725 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1728 new_cluster_names.size(), send_all_clusters,
1729 new_load_reporting_interval.millis());
1731 for (
const auto&
name : new_cluster_names) {
1736 if (new_load_reporting_interval <
1743 "[xds_client %p] xds server %s: increased load_report_interval "
1744 "to minimum value %dms",
1750 if (send_all_clusters == send_all_clusters_ &&
1751 cluster_names_ == new_cluster_names &&
1752 load_reporting_interval_ == new_load_reporting_interval) {
1756 "[xds_client %p] xds server %s: incoming LRS response identical "
1757 "to current, ignoring.",
1765 send_all_clusters_ = send_all_clusters;
1766 cluster_names_ =
std::move(new_cluster_names);
1767 load_reporting_interval_ = new_load_reporting_interval;
1769 MaybeStartReportingLocked();
1788 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1790 LrsCallState*
lrs_calld =
static_cast<LrsCallState*
>(
arg);
1798 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1804 "[xds_client %p] xds server %s: LRS call status received "
1805 "(chand=%p, calld=%p, call=%p): "
1806 "status=%d, details='%s', error='%s'",
1808 call_, status_code_, status_details,
1813 if (IsCurrentCallOnChannel()) {
1815 parent_->OnCallFinishedLocked();
1820 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel()
const {
1823 if (chand()->
lrs_calld_ ==
nullptr)
return false;
1824 return this == chand()->lrs_calld_->calld();
1836 {15000, 0, INT_MAX}));
1846 args_to_add.
size());
1864 &
bootstrap_->certificate_providers(), &symtab_) {
1890 if (g_xds_client ==
this) g_xds_client =
nullptr;
1896 authority_state_map_.clear();
1897 invalid_watchers_.clear();
1903 auto it = xds_server_channel_map_.find(
server);
1904 if (
it != xds_server_channel_map_.end()) {
1908 auto channel_state = MakeRefCounted<ChannelState>(
1910 xds_server_channel_map_[
server] = channel_state.get();
1911 return channel_state;
1923 invalid_watchers_[w] =
watcher;
1934 if (!resource_name.ok()) {
1936 "Unable to parse resource name for listener %s",
name)));
1944 if (authority ==
nullptr) {
1947 "\" not present in bootstrap config")));
1950 if (!authority->xds_servers.empty()) {
1951 xds_server = &authority->xds_servers[0];
1954 if (xds_server ==
nullptr) xds_server = &
bootstrap_->server();
1959 authority_state_map_[resource_name->authority];
1965 if (resource_state.
resource !=
nullptr) {
1968 "[xds_client %p] returning cached listener data for %s",
this,
1993 bool delay_unsubscription) {
1998 invalid_watchers_.erase(
watcher);
2000 if (!resource_name.ok())
return;
2001 auto authority_it = authority_state_map_.find(resource_name->authority);
2002 if (authority_it == authority_state_map_.end())
return;
2006 if (type_it == authority_state.
resource_map.end())
return;
2007 auto& type_map = type_it->second;
2009 auto resource_it = type_map.find(resource_name->key);
2010 if (resource_it == type_map.end())
return;
2015 if (resource_state.
watchers.empty()) {
2018 "[xds_client %p] unsubscribing from a resource for which we "
2019 "previously ignored a deletion: type %s name %s",
2024 delay_unsubscription);
2025 type_map.erase(resource_it);
2026 if (type_map.empty()) {
2037 auto it = resource_types_.find(resource_type->
type_url());
2038 if (
it != resource_types_.end()) {
2042 resource_types_.emplace(resource_type->
type_url(), resource_type);
2043 v2_resource_types_.emplace(resource_type->
v2_type_url(), resource_type);
2049 auto it = resource_types_.find(resource_type);
2050 if (
it != resource_types_.end())
return it->second;
2051 auto it2 = v2_resource_types_.find(resource_type);
2052 if (it2 != v2_resource_types_.end())
return it2->second;
2065 if (!uri.ok())
return uri.status();
2067 std::pair<absl::string_view, absl::string_view> path_parts =
absl::StrSplit(
2069 if (!
type->IsType(path_parts.first,
nullptr)) {
2071 "xdstp URI path must indicate valid xDS resource type");
2074 std::vector<URI::QueryParam> query_params;
2075 for (
const auto& p : uri->query_parameter_map()) {
2076 query_params.emplace_back(
2090 key.query_params,
"");
2092 return uri->ToString();
2101 if (!
bootstrap_->XdsServerExists(xds_server))
return nullptr;
2112 if (server_it->second.channel_state ==
nullptr) {
2115 auto load_report_it = server_it->second.load_report_map
2120 if (load_report_state.
drop_stats !=
nullptr) {
2123 if (cluster_drop_stats ==
nullptr) {
2124 if (load_report_state.
drop_stats !=
nullptr) {
2128 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
2130 load_report_it->first.first ,
2131 load_report_it->first.second );
2134 server_it->second.channel_state->MaybeStartLrsCall();
2135 return cluster_drop_stats;
2143 auto server_it = xds_load_report_server_map_.find(xds_server);
2144 if (server_it == xds_load_report_server_map_.end())
return;
2145 auto load_report_it = server_it->second.load_report_map.find(
2147 if (load_report_it == server_it->second.load_report_map.end())
return;
2149 if (load_report_state.
drop_stats == cluster_drop_stats) {
2162 if (!
bootstrap_->XdsServerExists(xds_server))
return nullptr;
2173 if (server_it->second.channel_state ==
nullptr) {
2176 auto load_report_it = server_it->second.load_report_map
2186 if (cluster_locality_stats ==
nullptr) {
2191 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2193 load_report_it->first.first ,
2194 load_report_it->first.second ,
std::move(locality));
2197 server_it->second.channel_state->MaybeStartLrsCall();
2198 return cluster_locality_stats;
2207 auto server_it = xds_load_report_server_map_.find(xds_server);
2208 if (server_it == xds_load_report_server_map_.end())
return;
2209 auto load_report_it = server_it->second.load_report_map.find(
2211 if (load_report_it == server_it->second.load_report_map.end())
return;
2213 auto locality_it = load_report_state.
locality_stats.find(locality);
2214 if (locality_it == load_report_state.
locality_stats.end())
return;
2227 for (
auto& p : xds_server_channel_map_) {
2234 if (node !=
nullptr) {
2239 std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
2240 for (
const auto&
a : authority_state_map_) {
2241 for (
const auto& t :
a.second.resource_map) {
2242 for (
const auto&
r : t.second) {
2243 for (
const auto& w :
r.second.watchers) {
2244 watchers.insert(w.second);
2253 for (
const auto&
watcher : watchers) {
2265 if (node !=
nullptr) {
2272 for (
const auto& p : watchers) {
2273 p.first->OnError(
status);
2284 for (
const auto& p : watchers) {
2285 p.first->OnResourceDoesNotExist();
2293 const std::set<std::string>& clusters) {
2295 gpr_log(
GPR_INFO,
"[xds_client %p] start building load report",
this);
2298 auto server_it = xds_load_report_server_map_.find(xds_server);
2299 if (server_it == xds_load_report_server_map_.end())
return snapshot_map;
2300 auto& load_report_map = server_it->second.load_report_map;
2301 for (
auto load_report_it = load_report_map.begin();
2302 load_report_it != load_report_map.end();) {
2304 const auto& cluster_key = load_report_it->first;
2313 const bool record_stats =
2314 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2323 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2324 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2332 auto& locality_state =
it->second;
2335 locality_snapshot =
std::move(locality_state.deleted_locality_stats);
2336 if (locality_state.locality_stats !=
nullptr) {
2337 locality_snapshot +=
2338 locality_state.locality_stats->GetSnapshotAndReset();
2341 "[xds_client %p] cluster=%s eds_service_name=%s "
2342 "locality=%s locality_stats=%p",
2343 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2344 locality_name->AsHumanReadableString().c_str(),
2345 locality_state.locality_stats);
2350 if (locality_state.locality_stats ==
nullptr) {
2362 snapshot_map[cluster_key] =
std::move(snapshot);
2368 load_report_it = load_report_map.erase(load_report_it);
2373 return snapshot_map;
2379 for (
const auto&
a : authority_state_map_) {
2381 for (
const auto& t :
a.second.resource_map) {
2383 auto& resource_metadata_map =
2384 resource_type_metadata_map[
type->type_url()];
2385 for (
const auto&
r : t.second) {
2389 authority,
type->type_url(), resource_key)] = &resource_state.
meta;
2410 gpr_free(g_fallback_bootstrap_config);
2411 g_fallback_bootstrap_config =
nullptr;
2420 std::string GetBootstrapContents(
const char* fallback_config,
2424 if (
path !=
nullptr) {
2427 "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
2428 "environment variable: %s",
2437 return contents_str;
2440 UniquePtr<char> env_config(
gpr_getenv(
"GRPC_XDS_BOOTSTRAP_CONFIG"));
2441 if (env_config !=
nullptr) {
2444 "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
2445 "environment variable");
2447 return env_config.get();
2450 if (fallback_config !=
nullptr) {
2454 return fallback_config;
2458 "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
2472 if (bootstrap_config !=
nullptr) {
2473 std::unique_ptr<XdsBootstrap>
bootstrap =
2477 grpc_channel_args_find_pointer<grpc_channel_args>(
2487 if (g_xds_client !=
nullptr) {
2488 auto xds_client = g_xds_client->RefIfNonZero();
2489 if (xds_client !=
nullptr)
return xds_client;
2493 GetBootstrapContents(g_fallback_bootstrap_config,
error);
2497 bootstrap_contents.c_str());
2500 std::unique_ptr<XdsBootstrap>
bootstrap =
2506 g_xds_client = xds_client.
get();
2515 g_channel_args =
args;
2520 g_xds_client =
nullptr;
2525 gpr_free(g_fallback_bootstrap_config);
2535 #define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
2539 void* XdsClientArgCopy(
void* p) {
2540 XdsClient* xds_client =
static_cast<XdsClient*
>(p);
2545 void XdsClientArgDestroy(
void* p) {
2546 XdsClient* xds_client =
static_cast<XdsClient*
>(p);
2550 int XdsClientArgCmp(
void* p,
void* q) {
return QsortCompare(p, q); }
2553 XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
2560 &kXdsClientArgVtable);
2567 if (xds_client ==
nullptr)
return nullptr;