27 #include <type_traits>
31 #include "absl/memory/memory.h"
32 #include "absl/meta/type_traits.h"
33 #include "absl/random/random.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/match.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_format.h"
39 #include "absl/strings/str_join.h"
40 #include "absl/strings/str_replace.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/strings/strip.h"
43 #include "absl/types/optional.h"
44 #include "absl/types/variant.h"
49 #define XXH_INLINE_ALL
105 std::string GetDefaultAuthorityInternal(
const URI& uri) {
110 size_t pos = uri.path().find_last_of(
'/');
111 if (
pos == uri.path().npos)
return uri.path();
112 return uri.path().substr(
pos + 1);
117 const char* authority =
119 if (authority !=
nullptr)
return authority;
120 return GetDefaultAuthorityInternal(uri);
127 class XdsResolver :
public Resolver {
129 explicit XdsResolver(ResolverArgs
args)
140 "[xds_resolver %p] created for URI %s; data plane authority is %s",
145 ~XdsResolver()
override {
152 void StartLocked()
override;
154 void ShutdownLocked()
override;
156 void ResetBackoffLocked()
override {
161 class ListenerWatcher :
public XdsListenerResourceType::WatcherInterface {
163 explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
165 void OnResourceChanged(XdsListenerResource listener)
override {
170 [
this, listener]()
mutable {
185 void OnResourceDoesNotExist()
override {
191 ": xDS listener resource does not exist"));
201 class RouteConfigWatcher
202 :
public XdsRouteConfigResourceType::WatcherInterface {
204 explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
206 void OnResourceChanged(XdsRouteConfigResource route_config)
override {
211 [
this, route_config]()
mutable {
226 void OnResourceDoesNotExist()
override {
232 ": xDS route configuration resource does not exist"));
248 class ClusterState :
public DualRefCounted<ClusterState> {
250 using ClusterStateMap =
251 std::map<std::string, WeakRefCountedPtr<ClusterState>>;
253 ClusterState(RefCountedPtr<XdsResolver> resolver,
259 void Orphan()
override {
261 resolver->work_serializer_->Run(
263 resolver->MaybeRemoveUnusedClusters();
279 class XdsCallDispatchController
280 :
public ConfigSelector::CallDispatchController {
282 explicit XdsCallDispatchController(
283 RefCountedPtr<ClusterState> cluster_state)
286 bool ShouldRetry()
override {
291 void Commit()
override {
304 class XdsConfigSelector :
public ConfigSelector {
306 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
308 ~XdsConfigSelector()
override;
310 const char*
name()
const override {
return "XdsConfigSelector"; }
312 bool Equals(
const ConfigSelector* other)
const override {
313 const auto* other_xds =
static_cast<const XdsConfigSelector*
>(other);
319 CallConfig GetCallConfig(GetCallConfigArgs
args)
override;
321 std::vector<const grpc_channel_filter*> GetFilters()
override {
329 struct ClusterWeightState {
334 bool operator==(
const ClusterWeightState& other)
const;
337 XdsRouteConfigResource::Route
route;
343 using RouteTable = std::vector<Route>;
345 class RouteListIterator;
349 const XdsRouteConfigResource::Route&
route,
350 const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
356 std::map<absl::string_view, RefCountedPtr<ClusterState>>
clusters_;
360 void OnListenerUpdate(XdsListenerResource listener);
361 void OnRouteConfigUpdate(XdsRouteConfigResource
rds_update);
366 void GenerateResult();
367 void MaybeRemoveUnusedClusters();
400 bool MethodConfigsEqual(
const ServiceConfig* sc1,
const ServiceConfig* sc2) {
401 if (sc1 ==
nullptr)
return sc2 ==
nullptr;
402 if (sc2 ==
nullptr)
return false;
403 return sc1->json_string() == sc2->json_string();
407 const ClusterWeightState& other)
const {
409 MethodConfigsEqual(
method_config.get(), other.method_config.get());
413 const Route& other)
const {
414 return route == other.route &&
416 MethodConfigsEqual(
method_config.get(), other.method_config.get());
421 class XdsResolver::XdsConfigSelector::RouteListIterator
422 :
public XdsRouting::RouteListIterator {
424 explicit RouteListIterator(
const RouteTable* route_table)
427 size_t Size()
const override {
return route_table_->size(); }
429 const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute(
430 size_t index)
const override {
442 XdsResolver::XdsConfigSelector::XdsConfigSelector(
466 route_entry.route =
route;
468 absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
469 &route_entry.route.action);
470 if (route_action !=
nullptr) {
473 if (!route_action->max_stream_duration.has_value()) {
474 route_action->max_stream_duration =
475 resolver_->current_listener_.http_connection_manager
476 .http_max_stream_duration;
478 if (route_action->action.index() ==
479 XdsRouteConfigResource::Route::RouteAction::kClusterIndex) {
480 *
error = CreateMethodConfig(route_entry.route,
nullptr,
481 &route_entry.method_config);
485 XdsRouteConfigResource::Route::RouteAction::kClusterIndex>(
486 route_action->action)));
487 }
else if (route_action->action.index() ==
488 XdsRouteConfigResource::Route::RouteAction::
489 kWeightedClustersIndex) {
490 auto& action_weighted_clusters =
absl::get<
491 XdsRouteConfigResource::Route::RouteAction::kWeightedClustersIndex>(
492 route_action->action);
494 for (
const auto& weighted_cluster : action_weighted_clusters) {
495 Route::ClusterWeightState cluster_weight_state;
496 *
error = CreateMethodConfig(route_entry.route, &weighted_cluster,
497 &cluster_weight_state.method_config);
499 end += weighted_cluster.weight;
500 cluster_weight_state.range_end =
end;
501 cluster_weight_state.cluster = weighted_cluster.name;
502 route_entry.weighted_cluster_state.push_back(
504 MaybeAddCluster(
absl::StrCat(
"cluster:", weighted_cluster.name));
506 }
else if (route_action->action.index() ==
507 XdsRouteConfigResource::Route::RouteAction::
508 kClusterSpecifierPluginIndex) {
510 *
error = CreateMethodConfig(route_entry.route,
nullptr,
511 &route_entry.method_config);
513 "cluster_specifier_plugin:",
514 absl::get<XdsRouteConfigResource::Route::RouteAction::
515 kClusterSpecifierPluginIndex>(route_action->action)));
520 for (
const auto& http_filter :
521 resolver_->current_listener_.http_connection_manager.http_filters) {
524 const XdsHttpFilterImpl* filter_impl =
525 XdsHttpFilterRegistry::GetFilterForType(
526 http_filter.config.config_proto_type_name);
529 if (filter_impl->channel_filter() !=
nullptr) {
530 filters_.push_back(filter_impl->channel_filter());
535 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
545 const XdsRouteConfigResource::Route&
route,
546 const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
549 std::vector<std::string>
fields;
550 const auto& route_action =
551 absl::get<XdsRouteConfigResource::Route::RouteAction>(
route.action);
553 if (route_action.retry_policy.has_value() &&
554 !route_action.retry_policy->retry_on.Empty()) {
555 std::vector<std::string> retry_parts;
557 "\"retryPolicy\": {\n"
558 " \"maxAttempts\": %d,\n"
559 " \"initialBackoff\": \"%s\",\n"
560 " \"maxBackoff\": \"%s\",\n"
561 " \"backoffMultiplier\": 2,\n",
562 route_action.retry_policy->num_retries + 1,
563 route_action.retry_policy->retry_back_off.base_interval.ToJsonString(),
564 route_action.retry_policy->retry_back_off.max_interval.ToJsonString()));
565 std::vector<std::string> code_parts;
567 code_parts.push_back(
" \"CANCELLED\"");
569 if (route_action.retry_policy->retry_on.Contains(
571 code_parts.push_back(
" \"DEADLINE_EXCEEDED\"");
574 code_parts.push_back(
" \"INTERNAL\"");
576 if (route_action.retry_policy->retry_on.Contains(
578 code_parts.push_back(
" \"RESOURCE_EXHAUSTED\"");
581 code_parts.push_back(
" \"UNAVAILABLE\"");
583 retry_parts.push_back(
590 if (route_action.max_stream_duration.has_value() &&
591 (route_action.max_stream_duration != Duration::Zero())) {
594 route_action.max_stream_duration->ToJsonString()));
597 XdsRouting::GeneratePerHttpFilterConfigsResult
result =
598 XdsRouting::GeneratePerHTTPFilterConfigs(
599 resolver_->current_listener_.http_connection_manager.http_filters,
605 for (
const auto&
p :
result.per_filter_configs) {
615 " \"methodConfig\": [ {\n"
624 ServiceConfigImpl::Create(
result.args, json.c_str(), &
error);
635 void XdsResolver::XdsConfigSelector::MaybeAddCluster(
const std::string&
name) {
639 auto new_cluster_state = MakeRefCounted<ClusterState>(
resolver_,
name);
648 const XdsRouteConfigResource::Route::RouteAction::HashPolicy& policy,
654 initial_metadata, policy.header_name, &value_buffer);
656 return absl::nullopt;
658 if (policy.regex !=
nullptr) {
661 if (header_value->
data() != value_buffer.data()) {
664 RE2::GlobalReplace(&value_buffer, *policy.regex, policy.regex_substitution);
665 header_value = value_buffer;
670 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
671 GetCallConfigArgs
args) {
672 auto route_index = XdsRouting::GetRouteForRequest(
674 args.initial_metadata);
675 if (!route_index.has_value()) {
680 const auto* route_action =
681 absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
682 &entry.route.action);
683 if (route_action ==
nullptr) {
684 CallConfig call_config;
687 "Matching route has inappropriate action"),
693 if (route_action->action.index() ==
694 XdsRouteConfigResource::Route::RouteAction::kClusterIndex) {
697 absl::get<XdsRouteConfigResource::Route::RouteAction::kClusterIndex>(
698 route_action->action));
700 }
else if (route_action->action.index() ==
701 XdsRouteConfigResource::Route::RouteAction::
702 kWeightedClustersIndex) {
705 entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
709 size_t start_index = 0;
710 size_t end_index = entry.weighted_cluster_state.size() - 1;
712 while (end_index > start_index) {
713 mid = (start_index + end_index) / 2;
714 if (entry.weighted_cluster_state[mid].range_end >
key) {
716 }
else if (entry.weighted_cluster_state[mid].range_end <
key) {
717 start_index = mid + 1;
728 }
else if (route_action->action.index() ==
729 XdsRouteConfigResource::Route::RouteAction::
730 kClusterSpecifierPluginIndex) {
732 "cluster_specifier_plugin:",
733 absl::get<XdsRouteConfigResource::Route::RouteAction::
734 kClusterSpecifierPluginIndex>(route_action->action));
741 for (
const auto& hash_policy : route_action->hash_policies) {
743 switch (hash_policy.type) {
745 new_hash = HeaderHashHelper(hash_policy,
args.initial_metadata);
747 case XdsRouteConfigResource::Route::RouteAction::HashPolicy::CHANNEL_ID:
757 hash.has_value() ? ((
hash.value() << 1) | (
hash.value() >> 63)) : 0;
762 if (hash_policy.terminal &&
hash.has_value()) {
766 if (!
hash.has_value()) {
769 CallConfig call_config;
771 call_config.method_configs =
778 static_cast<char*
>(
args.arena->Alloc(hash_string.size() + 1));
779 memcpy(hash_value, hash_string.c_str(), hash_string.size());
780 hash_value[hash_string.size()] =
'\0';
782 call_config.call_dispatch_controller =
783 args.arena->New<XdsCallDispatchController>(
it->second->Ref());
791 void XdsResolver::StartLocked() {
796 "Failed to create xds client -- channel will remain in "
797 "TRANSIENT_FAILURE: %s",
802 absl::StrCat(
"Failed to create XdsClient: ", error_message));
812 if (!
uri_.authority().empty()) {
814 const auto* authority_config =
816 if (authority_config ==
nullptr) {
818 absl::StrCat(
"Invalid target URI -- authority not found for ",
819 uri_.authority().c_str()));
828 authority_config->client_listener_resource_name_template;
829 if (name_template.empty()) {
831 "xdstp://", URI::PercentEncodeAuthority(
uri_.authority()),
832 "/envoy.config.listener.v3.Listener/%s");
836 {{
"%s", URI::PercentEncodePath(resource_name_fragment)}});
841 .client_default_listener_resource_name_template();
842 if (name_template.
empty()) {
843 name_template =
"%s";
846 resource_name_fragment = URI::PercentEncodePath(resource_name_fragment);
852 gpr_log(
GPR_INFO,
"[xds_resolver %p] Started with lds_resource_name %s.",
857 auto watcher = MakeRefCounted<ListenerWatcher>(
Ref());
863 void XdsResolver::ShutdownLocked() {
869 XdsListenerResourceType::CancelWatch(
874 XdsRouteConfigResourceType::CancelWatch(
884 void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
886 gpr_log(
GPR_INFO,
"[xds_resolver %p] received updated listener data",
this);
891 if (listener.http_connection_manager.route_config_name !=
894 XdsRouteConfigResourceType::CancelWatch(
897 !listener.http_connection_manager.route_config_name.empty());
901 std::move(listener.http_connection_manager.route_config_name);
904 auto watcher = MakeRefCounted<RouteConfigWatcher>(
Ref());
906 XdsRouteConfigResourceType::StartWatch(
924 class VirtualHostListIterator :
public XdsRouting::VirtualHostListIterator {
926 explicit VirtualHostListIterator(
927 const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts)
932 const std::vector<std::string>& GetDomainsForVirtualHost(
933 size_t index)
const override {
942 void XdsResolver::OnRouteConfigUpdate(XdsRouteConfigResource
rds_update) {
944 gpr_log(
GPR_INFO,
"[xds_resolver %p] received updated route config",
this);
950 auto vhost_index = XdsRouting::FindVirtualHostForDomain(
951 VirtualHostListIterator(&
rds_update.virtual_hosts),
953 if (!vhost_index.has_value()) {
958 " in RouteConfiguration")));
970 gpr_log(
GPR_ERROR,
"[xds_resolver %p] received error from XdsClient: %s: %s",
985 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
986 "update and returning empty service config",
993 result.addresses.emplace();
1003 XdsResolver::CreateServiceConfig() {
1004 std::vector<std::string> clusters;
1010 " \"childPolicy\": %s\n"
1018 " \"childPolicy\":[ {\n"
1019 " \"cds_experimental\":{\n"
1020 " \"cluster\": \"%s\"\n"
1027 std::vector<std::string> config_parts;
1028 config_parts.push_back(
1030 " \"loadBalancingConfig\":[\n"
1031 " { \"xds_cluster_manager_experimental\":{\n"
1032 " \"children\":{\n");
1034 config_parts.push_back(
1042 ServiceConfigImpl::Create(
args_, json.c_str(), &
error);
1050 void XdsResolver::GenerateResult() {
1055 auto config_selector = MakeRefCounted<XdsConfigSelector>(
Ref(), &
error);
1057 OnError(
"could not create ConfigSelector",
1063 result.addresses.emplace();
1064 result.service_config = CreateServiceConfig();
1066 gpr_log(
GPR_INFO,
"[xds_resolver %p] generated service config: %s",
this,
1067 result.service_config.ok()
1069 :
result.service_config.status().ToString().c_str());
1073 config_selector->MakeChannelArg(),
1080 void XdsResolver::MaybeRemoveUnusedClusters() {
1081 bool update_needed =
false;
1083 RefCountedPtr<ClusterState> cluster_state =
it->second->RefIfNonZero();
1084 if (cluster_state !=
nullptr) {
1087 update_needed =
true;
1101 class XdsResolverFactory :
public ResolverFactory {
1105 bool IsValidUri(
const URI& uri)
const override {
1106 if (uri.path().empty() || uri.path().back() ==
'/') {
1108 "URI path does not contain valid data plane authority");
1114 std::string GetDefaultAuthority(
const URI& uri)
const override {
1115 return GetDefaultAuthorityInternal(uri);
1118 OrphanablePtr<Resolver> CreateResolver(ResolverArgs
args)
const override {
1119 if (!IsValidUri(
args.uri))
return nullptr;
1127 builder->resolver_registry()->RegisterResolverFactory(
1128 absl::make_unique<XdsResolverFactory>());