37 #include <unordered_map>
41 #include "absl/base/thread_annotations.h"
42 #include "absl/container/inlined_vector.h"
43 #include "absl/hash/hash.h"
44 #include "absl/memory/memory.h"
45 #include "absl/status/status.h"
46 #include "absl/status/statusor.h"
47 #include "absl/strings/str_cat.h"
48 #include "absl/strings/str_format.h"
49 #include "absl/strings/str_join.h"
50 #include "absl/strings/string_view.h"
51 #include "absl/strings/strip.h"
52 #include "absl/types/optional.h"
56 #include <grpc/byte_buffer.h>
111 const char* kRls =
"rls_experimental";
112 const char kGrpc[] =
"grpc";
113 const char* kRlsRequestPath =
"/grpc.lookup.v1.RouteLookupService/RouteLookup";
114 const char* kFakeTargetFieldValue =
"fake_target_field_value";
115 const char* kRlsHeaderKey =
"X-Google-RLS-Data";
121 const double kCacheBackoffMultiplier = 1.6;
122 const double kCacheBackoffJitter = 0.2;
125 const double kDefaultThrottleRatioForSuccesses = 2.0;
126 const int kDefaultThrottlePadding = 8;
128 const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
141 using KeyBuilderMap = std::unordered_map<
std::string , KeyBuilder>;
143 struct RouteLookupConfig {
153 RlsLbConfig(RouteLookupConfig route_lookup_config,
156 RefCountedPtr<LoadBalancingPolicy::Config>
157 default_child_policy_parsed_config)
162 std::
move(child_policy_config_target_field_name)),
164 std::
move(default_child_policy_parsed_config)) {}
166 const char*
name()
const override {
return kRls; }
185 const std::string& rls_channel_service_config()
const {
189 const std::string& child_policy_config_target_field_name()
const {
192 RefCountedPtr<LoadBalancingPolicy::Config>
193 default_child_policy_parsed_config()
const {
202 RefCountedPtr<LoadBalancingPolicy::Config>
207 class RlsLb :
public LoadBalancingPolicy {
211 const char*
name()
const override {
return kRls; }
212 void UpdateLocked(UpdateArgs
args)
override;
214 void ResetBackoffLocked()
override;
221 bool operator==(
const RequestKey& rhs)
const {
225 template <
typename H>
227 std::hash<std::string> string_hasher;
228 for (
auto& kv :
key.key_map) {
229 h = H::combine(
std::move(h), string_hasher(kv.first),
230 string_hasher(kv.second));
235 size_t Size()
const {
236 size_t size =
sizeof(RequestKey);
238 size += kv.first.length() + kv.second.length();
250 struct ResponseInfo {
256 return absl::StrFormat(
"{status=%s, targets=[%s], header_data=\"%s\"}",
263 class ChildPolicyWrapper :
public DualRefCounted<ChildPolicyWrapper> {
303 void ResetBackoffLocked() {
319 class ChildPolicyHelper :
public LoadBalancingPolicy::ChannelControlHelper {
321 explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper>
wrapper)
323 ~ChildPolicyHelper()
override {
327 RefCountedPtr<SubchannelInterface> CreateSubchannel(
331 std::unique_ptr<SubchannelPicker> picker)
override;
332 void RequestReresolution()
override;
334 void AddTraceEvent(TraceSeverity
severity,
351 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>
picker_
357 class Picker :
public LoadBalancingPolicy::SubchannelPicker {
359 explicit Picker(RefCountedPtr<RlsLb> lb_policy);
362 PickResult Pick(PickArgs
args)
override;
375 class Entry :
public InternallyRefCounted<Entry> {
377 Entry(RefCountedPtr<RlsLb> lb_policy,
const RequestKey&
key);
391 return backoff_time_;
393 Timestamp backoff_expiration_time()
const
395 return backoff_expiration_time_;
399 return data_expiration_time_;
410 return min_expiration_time_;
413 std::unique_ptr<BackOff> TakeBackoffState()
439 std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
440 ResponseInfo
response, std::unique_ptr<BackOff> backoff_state)
447 class BackoffTimer :
public InternallyRefCounted<BackoffTimer> {
449 BackoffTimer(RefCountedPtr<Entry> entry,
Timestamp backoff_time);
479 std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
490 explicit Cache(RlsLb* lb_policy);
495 Entry*
Find(
const RequestKey&
key)
503 Entry* FindOrInsert(
const RequestKey&
key)
521 static
size_t EntrySizeForKey(
const RequestKey&
key);
525 void MaybeShrinkSize(
size_t bytes)
542 class RlsChannel : public InternallyRefCounted<RlsChannel> {
544 explicit RlsChannel(RefCountedPtr<RlsLb> lb_policy);
547 void Orphan()
override;
552 void StartRlsCall(
const RequestKey&
key, Cache::Entry* stale_entry)
556 void ReportResponseLocked(
bool response_succeeded)
561 return throttle_.ShouldThrottle();
572 class StateWatcher :
public AsyncConnectivityStateWatcherInterface {
574 explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
575 : AsyncConnectivityStateWatcherInterface(
591 Duration window_size = kDefaultThrottleWindowSize,
592 float ratio_for_successes = kDefaultThrottleRatioForSuccesses,
593 int padding = kDefaultThrottlePadding)
600 void RegisterResponse(
bool success)
626 class RlsRequest :
public InternallyRefCounted<RlsRequest> {
631 RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey
key,
632 RefCountedPtr<RlsChannel> rls_channel,
633 std::unique_ptr<BackOff> backoff_state,
636 ~RlsRequest()
override;
640 void Orphan()
override;
647 void StartCallLocked();
656 ResponseInfo ParseResponseProto();
678 void ShutdownLocked()
override;
683 void UpdatePickerAsync();
711 RefCountedPtr<RlsLbConfig>
config_;
720 RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
722 : DualRefCounted<ChildPolicyWrapper>(
731 void RlsLb::ChildPolicyWrapper::Orphan() {
751 "child policy configuration is not an array");
753 std::vector<grpc_error_handle> error_list;
754 for (
Json& child_json : *
config->mutable_array()) {
757 "child policy item is not an object"));
760 if (
child.size() != 1) {
762 "child policy item contains more than one field"));
764 Json& child_config_json =
child.begin()->second;
767 "child policy item config is not an object"));
769 Json::Object& child_config = *child_config_json.mutable_object();
777 "\" for child policy"),
781 void RlsLb::ChildPolicyWrapper::StartUpdate() {
782 Json child_policy_config =
lb_policy_->config_->child_policy_config();
785 &child_policy_config);
790 "[rlslb %p] ChildPolicyWrapper=%p [%s]: validating update, config: %s",
792 child_policy_config.Dump().c_str());
795 child_policy_config, &
error);
800 "[rlslb %p] ChildPolicyWrapper=%p [%s]: config failed to parse: "
804 child_policy_config.Dump().c_str());
807 picker_ = absl::make_unique<TransientFailurePicker>(
814 void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
821 create_args.work_serializer =
lb_policy_->work_serializer();
822 create_args.channel_control_helper = absl::make_unique<ChildPolicyHelper>(
829 "[rlslb %p] ChildPolicyWrapper=%p [%s], created new child policy "
839 "[rlslb %p] ChildPolicyWrapper=%p [%s], updating child policy "
843 UpdateArgs update_args;
845 update_args.addresses =
lb_policy_->addresses_;
854 RefCountedPtr<SubchannelInterface>
855 RlsLb::ChildPolicyWrapper::ChildPolicyHelper::CreateSubchannel(
859 "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
860 "CreateSubchannel() for %s",
862 wrapper_->target_.c_str(),
this, address.ToString().c_str());
864 if (
wrapper_->is_shutdown_)
return nullptr;
865 return wrapper_->lb_policy_->channel_control_helper()->CreateSubchannel(
869 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
871 std::unique_ptr<SubchannelPicker> picker) {
874 "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
875 "UpdateState(state=%s, status=%s, picker=%p)",
889 if (picker !=
nullptr) {
893 wrapper_->lb_policy_->UpdatePickerLocked();
896 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::RequestReresolution() {
899 "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
900 "RequestReresolution",
905 wrapper_->lb_policy_->channel_control_helper()->RequestReresolution();
909 return wrapper_->lb_policy_->channel_control_helper()->GetAuthority();
912 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::AddTraceEvent(
924 std::map<std::string, std::string> BuildKeyMap(
927 const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
928 size_t last_slash_pos =
path.npos;
933 last_slash_pos =
path.rfind(
"/");
940 const RlsLbConfig::KeyBuilder* key_builder = &
it->second;
942 std::map<std::string, std::string>
key_map;
944 for (
const auto& p : key_builder->header_keys) {
946 const std::vector<std::string>& header_names =
p.second;
947 for (
const std::string& header_name : header_names) {
950 initial_metadata->Lookup(header_name, &
buffer);
951 if (
value.has_value()) {
958 key_map.insert(key_builder->constant_keys.begin(),
959 key_builder->constant_keys.end());
961 if (!key_builder->host_key.empty()) {
962 key_map[key_builder->host_key] = host;
965 if (!key_builder->service_key.empty()) {
966 if (last_slash_pos ==
path.npos) {
967 last_slash_pos =
path.rfind(
"/");
971 key_map[key_builder->service_key] =
975 if (!key_builder->method_key.empty()) {
976 if (last_slash_pos ==
path.npos) {
977 last_slash_pos =
path.rfind(
"/");
981 key_map[key_builder->method_key] =
987 RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
989 if (
lb_policy_->default_child_policy_ !=
nullptr) {
995 RlsLb::Picker::~Picker() {
1001 [default_child_policy]() {
1008 LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs
args) {
1010 RequestKey
key = {BuildKeyMap(
config_->key_builder_map(),
args.path,
1012 args.initial_metadata)};
1028 if ((entry ==
nullptr ||
1029 (entry->stale_time() <
now && entry->backoff_time() <
now)) &&
1032 if (
lb_policy_->rls_channel_->ShouldThrottle()) {
1036 if (entry ==
nullptr || entry->data_expiration_time() <
now) {
1040 "[rlslb %p] picker=%p: RLS call throttled; "
1041 "using default target",
1048 "[rlslb %p] picker=%p: RLS call throttled; failing pick",
1057 key, (entry ==
nullptr || entry->data_expiration_time() <
now) ?
nullptr
1061 if (entry !=
nullptr) {
1063 if (entry->data_expiration_time() >=
now) {
1068 return entry->Pick(
args);
1072 if (entry->backoff_time() >=
now) {
1077 "[rlslb %p] picker=%p: RLS call in backoff; using default target",
1084 "[rlslb %p] picker=%p: RLS call in backoff; failing pick",
1088 absl::StrCat(
"RLS request failed: ", entry->status().ToString())));
1093 gpr_log(
GPR_INFO,
"[rlslb %p] picker=%p: RLS request pending; queuing pick",
1096 return PickResult::Queue();
1103 RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
1111 void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
1119 void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimer(
1121 auto*
self =
static_cast<BackoffTimer*
>(
arg);
1122 self->entry_->lb_policy_->work_serializer()->Run(
1124 RefCountedPtr<BackoffTimer> backoff_timer(
self);
1129 "[rlslb %p] cache entry=%p %s, armed_=%d: "
1130 "backoff timer fired",
1131 self->entry_->lb_policy_.get(),
self->entry_.get(),
1132 self->entry_->is_shutdown_
1134 :
self->entry_->lru_iterator_->ToString().c_str(),
1137 bool cancelled = !
self->armed_;
1138 self->armed_ =
false;
1139 if (cancelled)
return;
1143 self->entry_->lb_policy_->UpdatePickerLocked();
1152 std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
1153 return absl::make_unique<BackOff>(
1155 .set_initial_backoff(kCacheBackoffInitial)
1156 .set_multiplier(kCacheBackoffMultiplier)
1157 .set_jitter(kCacheBackoffJitter)
1158 .set_max_backoff(kCacheBackoffMax));
1161 RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
1162 const RequestKey&
key)
1163 : InternallyRefCounted<Entry>(
1167 min_expiration_time_(ExecCtx::
Get()->
Now() + kMinExpirationTime),
1171 void RlsLb::Cache::Entry::Orphan() {
1173 gpr_log(
GPR_INFO,
"[rlslb %p] cache entry=%p %s: cache entry evicted",
1174 lb_policy_.get(),
this, lru_iterator_->ToString().c_str());
1177 lb_policy_->cache_.lru_list_.erase(lru_iterator_);
1178 lru_iterator_ =
lb_policy_->cache_.lru_list_.end();
1184 child_policy_wrappers_.clear();
1188 size_t RlsLb::Cache::Entry::Size()
const {
1191 return lb_policy_->cache_.EntrySizeForKey(*lru_iterator_);
1194 LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs
args) {
1196 ChildPolicyWrapper* child_policy_wrapper =
nullptr;
1198 for (;
i < child_policy_wrappers_.size(); ++
i) {
1199 child_policy_wrapper = child_policy_wrappers_[
i].get();
1200 if (child_policy_wrapper->connectivity_state() ==
1202 i < child_policy_wrappers_.size() - 1) {
1205 "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR
1206 " of %" PRIuPTR
") in state TRANSIENT_FAILURE; skipping",
1207 lb_policy_.get(),
this, lru_iterator_->ToString().c_str(),
1208 child_policy_wrapper->target().c_str(),
i,
1209 child_policy_wrappers_.size());
1219 "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR
" of %" PRIuPTR
1220 ") in state %s; delegating",
1221 lb_policy_.get(),
this, lru_iterator_->ToString().c_str(),
1222 child_policy_wrapper->target().c_str(),
i,
1223 child_policy_wrappers_.size(),
1230 if (!header_data_.empty()) {
1231 char* copied_header_data =
1232 static_cast<char*
>(
args.call_state->Alloc(header_data_.length() + 1));
1233 strcpy(copied_header_data, header_data_.c_str());
1234 args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
1236 return child_policy_wrapper->Pick(
args);
1239 void RlsLb::Cache::Entry::ResetBackoff() {
1240 backoff_time_ = Timestamp::InfPast();
1244 bool RlsLb::Cache::Entry::ShouldRemove()
const {
1246 return data_expiration_time_ <
now && backoff_expiration_time_ <
now;
1249 bool RlsLb::Cache::Entry::CanEvict()
const {
1251 return min_expiration_time_ <
now;
1254 void RlsLb::Cache::Entry::MarkUsed() {
1255 auto& lru_list =
lb_policy_->cache_.lru_list_;
1256 auto new_it = lru_list.insert(lru_list.end(), *lru_iterator_);
1257 lru_list.erase(lru_iterator_);
1258 lru_iterator_ = new_it;
1261 std::vector<RlsLb::ChildPolicyWrapper*>
1262 RlsLb::Cache::Entry::OnRlsResponseLocked(
1263 ResponseInfo
response, std::unique_ptr<BackOff> backoff_state) {
1270 if (backoff_state !=
nullptr) {
1277 backoff_expiration_time_ =
now + (backoff_time_ -
now) * 2;
1286 data_expiration_time_ =
now +
lb_policy_->config_->max_age();
1290 backoff_time_ = Timestamp::InfPast();
1291 backoff_expiration_time_ = Timestamp::InfPast();
1294 if (child_policy_wrappers_.size() !=
response.targets.size())
return true;
1295 for (
size_t i = 0;
i <
response.targets.size(); ++
i) {
1302 if (!targets_changed) {
1310 std::set<absl::string_view> old_targets;
1311 for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
1312 child_policy_wrappers_) {
1313 old_targets.emplace(child_policy_wrapper->target());
1315 bool update_picker =
false;
1316 std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1317 std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
1318 new_child_policy_wrappers.reserve(
response.targets.size());
1322 auto new_child = MakeRefCounted<ChildPolicyWrapper>(
1324 new_child->StartUpdate();
1325 child_policies_to_finish_update.push_back(new_child.get());
1326 new_child_policy_wrappers.emplace_back(
std::move(new_child));
1328 new_child_policy_wrappers.emplace_back(
1334 if (old_targets.find(
target) == old_targets.end()) {
1335 update_picker =
true;
1339 child_policy_wrappers_ =
std::move(new_child_policy_wrappers);
1340 if (update_picker) {
1343 return child_policies_to_finish_update;
1350 RlsLb::Cache::Cache(RlsLb* lb_policy) :
lb_policy_(lb_policy) {
1359 auto it = map_.find(
key);
1360 if (
it == map_.end())
return nullptr;
1361 it->second->MarkUsed();
1362 return it->second.get();
1365 RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(
const RequestKey&
key) {
1366 auto it = map_.find(
key);
1368 if (
it == map_.end()) {
1369 size_t entry_size = EntrySizeForKey(
key);
1370 MaybeShrinkSize(size_limit_ -
std::min(size_limit_, entry_size));
1373 map_.emplace(
key, OrphanablePtr<Entry>(entry));
1374 size_ += entry_size;
1384 key.ToString().c_str(),
it->second.get());
1386 it->second->MarkUsed();
1387 return it->second.get();
1390 void RlsLb::Cache::Resize(
size_t bytes) {
1395 size_limit_ =
bytes;
1396 MaybeShrinkSize(size_limit_);
1399 void RlsLb::Cache::ResetAllBackoff() {
1400 for (
auto&
p : map_) {
1401 p.second->ResetBackoff();
1413 Cache* cache =
static_cast<Cache*
>(
arg);
1415 cache->lb_policy_->work_serializer()->Run(
1417 RefCountedPtr<RlsLb> lb_policy(cache->lb_policy_);
1424 if (lb_policy->is_shutdown_)
return;
1425 for (
auto it = cache->map_.begin();
it != cache->map_.end();) {
1427 it->second->CanEvict())) {
1428 cache->size_ -=
it->second->Size();
1429 it = cache->map_.erase(
it);
1435 lb_policy.release();
1437 now + kCacheCleanupTimerInterval,
1438 &cache->timer_callback_);
1443 size_t RlsLb::Cache::EntrySizeForKey(
const RequestKey&
key) {
1445 return (
key.Size() * 2) +
sizeof(Entry);
1448 void RlsLb::Cache::MaybeShrinkSize(
size_t bytes) {
1450 auto lru_it = lru_list_.begin();
1452 auto map_it = map_.find(*lru_it);
1454 if (!map_it->second->CanEvict())
break;
1457 lb_policy_, map_it->second.get(), lru_it->ToString().c_str());
1459 size_ -= map_it->second->Size();
1464 "[rlslb %p] LRU pass complete: desired size=%" PRIuPTR
1474 void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
1479 "[rlslb %p] RlsChannel=%p StateWatcher=%p: "
1480 "state changed to %s (%s)",
1492 lb_policy->cache_.ResetAllBackoff();
1504 while (!requests_.empty() &&
now - requests_.front() >
window_size_) {
1505 requests_.pop_front();
1507 while (!failures_.empty() &&
now - failures_.front() >
window_size_) {
1508 failures_.pop_front();
1511 float num_requests = requests_.size();
1512 float num_successes = num_requests - failures_.size();
1515 float throttle_probability =
1519 std::uniform_real_distribution<float> dist(0, 1.0);
1521 bool throttle = dist(
rng_) < throttle_probability;
1524 requests_.push_back(
now);
1525 failures_.push_back(
now);
1530 void RlsLb::RlsChannel::Throttle::RegisterResponse(
bool success) {
1532 requests_.push_back(
now);
1533 if (!success) failures_.push_back(
now);
1540 RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
1541 : InternallyRefCounted<RlsChannel>(
1554 const_cast<char*
>(authority.c_str())),
1564 if (fake_security_expected_targets !=
nullptr) {
1567 const_cast<char*
>(fake_security_expected_targets)));
1571 lb_policy_->config_->rls_channel_service_config();
1572 if (!service_config.empty()) {
1575 const_cast<char*
>(service_config.c_str())));
1581 creds, &rls_channel_args);
1583 gpr_log(
GPR_INFO,
"[rlslb %p] RlsChannel=%p: created channel %p for %s",
1585 lb_policy_->config_->lookup_service().c_str());
1589 channelz::ChannelNode* child_channelz_node =
1591 channelz::ChannelNode* parent_channelz_node =
1592 grpc_channel_args_find_pointer<channelz::ChannelNode>(
1594 if (child_channelz_node !=
nullptr && parent_channelz_node !=
nullptr) {
1595 parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1599 ClientChannel* client_channel =
1600 ClientChannel::GetFromChannel(Channel::FromC(
channel_));
1603 client_channel->AddConnectivityWatcher(
1605 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(
watcher_));
1609 void RlsLb::RlsChannel::Orphan() {
1618 channelz::ChannelNode* child_channelz_node =
1625 ClientChannel* client_channel =
1626 ClientChannel::GetFromChannel(Channel::FromC(
channel_));
1628 client_channel->RemoveConnectivityWatcher(
watcher_);
1636 void RlsLb::RlsChannel::StartRlsCall(
const RequestKey&
key,
1637 Cache::Entry* stale_entry) {
1638 std::unique_ptr<BackOff> backoff_state;
1642 if (stale_entry !=
nullptr) {
1643 backoff_state = stale_entry->TakeBackoffState();
1645 stale_header_data = stale_entry->header_data();
1648 key, MakeOrphanable<RlsRequest>(
1654 void RlsLb::RlsChannel::ReportResponseLocked(
bool response_succeeded) {
1655 throttle_.RegisterResponse(response_succeeded);
1658 void RlsLb::RlsChannel::ResetBackoff() {
1667 RlsLb::RlsRequest::RlsRequest(RefCountedPtr<RlsLb> lb_policy, RequestKey
key,
1668 RefCountedPtr<RlsChannel> rls_channel,
1669 std::unique_ptr<BackOff> backoff_state,
1672 : InternallyRefCounted<RlsRequest>(
1682 "[rlslb %p] rls_request=%p: RLS request created for key %s",
1695 void RlsLb::RlsRequest::Orphan() {
1696 if (
call_ !=
nullptr) {
1698 gpr_log(
GPR_INFO,
"[rlslb %p] rls_request=%p %s: cancelling RLS call",
1707 auto*
request =
static_cast<RlsRequest*
>(
arg);
1708 request->lb_policy_->work_serializer()->Run(
1716 void RlsLb::RlsRequest::StartCallLocked() {
1760 auto*
request =
static_cast<RlsRequest*
>(
arg);
1762 request->lb_policy_->work_serializer()->Run(
1774 "[rlslb %p] rls_request=%p %s, error=%s, status={%d, %s} RLS call "
1775 "response received",
1778 status_message.c_str());
1809 std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1814 Cache::Entry* cache_entry =
lb_policy_->cache_.FindOrInsert(
key_);
1815 child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
1821 for (ChildPolicyWrapper*
child : child_policies_to_finish_update) {
1822 child->MaybeFinishUpdate();
1832 for (
const auto& kv :
key_.key_map) {
1853 RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
1854 ResponseInfo response_info;
1867 return response_info;
1872 if (num_targets == 0) {
1873 response_info.status =
1875 return response_info;
1877 response_info.targets.reserve(num_targets);
1878 for (
size_t i = 0;
i < num_targets; ++
i) {
1879 response_info.targets.emplace_back(targets_strview[
i].
data,
1880 targets_strview[
i].
size);
1884 response_info.header_data =
1886 return response_info;
1894 const char* server_uri_str =
1911 void RlsLb::UpdateLocked(UpdateArgs
args) {
1920 (old_config ==
nullptr ||
1921 old_config->child_policy_config() !=
config_->child_policy_config())) {
1923 config_->child_policy_config().Dump().c_str());
1929 if (
args.addresses.ok()) {
1939 bool update_child_policies =
1940 old_config ==
nullptr ||
1941 old_config->child_policy_config() !=
config_->child_policy_config() ||
1945 bool created_default_child =
false;
1946 if (old_config ==
nullptr ||
1947 config_->default_target() != old_config->default_target()) {
1948 if (
config_->default_target().empty()) {
1962 created_default_child =
true;
1966 "[rlslb %p] using existing child for default target",
this);
1977 if (old_config ==
nullptr ||
1978 config_->lookup_service() != old_config->lookup_service()) {
1983 if (old_config ==
nullptr ||
1984 config_->cache_size_bytes() != old_config->cache_size_bytes()) {
1985 cache_.Resize(
static_cast<size_t>(
config_->cache_size_bytes()));
1988 if (update_child_policies) {
1993 p.second->StartUpdate();
1995 }
else if (created_default_child) {
2004 if (update_child_policies) {
2009 p.second->MaybeFinishUpdate();
2011 }
else if (created_default_child) {
2025 UpdatePickerLocked();
2031 child_entry.second->ExitIdleLocked();
2035 void RlsLb::ResetBackoffLocked() {
2039 cache_.ResetAllBackoff();
2042 child.second->ResetBackoffLocked();
2046 void RlsLb::ShutdownLocked() {
2057 request_map_.clear();
2062 void RlsLb::UpdatePickerAsync() {
2070 grpc_schedule_on_exec_ctx),
2075 auto* rls_lb =
static_cast<RlsLb*
>(
arg);
2076 rls_lb->work_serializer()->Run(
2078 RefCountedPtr<RlsLb> lb_policy(rls_lb);
2079 lb_policy->UpdatePickerLocked();
2085 void RlsLb::UpdatePickerLocked() {
2099 int num_connecting = 0;
2107 p.second->target().c_str(),
2120 if (num_connecting > 0) {
2122 }
else if (num_idle > 0) {
2136 channel_control_helper()->UpdateState(
2146 std::vector<std::string>* headers) {
2147 if (json.type() != Json::Type::OBJECT) {
2149 "field:headers index:",
idx,
" error:type should be OBJECT"));
2151 std::vector<grpc_error_handle> error_list;
2153 if (json.object_value().find(
"requiredMatch") != json.object_value().end()) {
2155 "field:requiredMatch error:must not be present"));
2161 "field:key error:must be non-empty"));
2164 const Json::Array* headers_json =
nullptr;
2167 if (headers_json !=
nullptr) {
2168 if (headers_json->empty()) {
2170 "field:names error:list is empty"));
2172 size_t name_idx = 0;
2173 for (
const Json& name_json : *headers_json) {
2174 if (name_json.type() != Json::Type::STRING) {
2176 "field:names index:", name_idx,
" error:type should be STRING")));
2177 }
else if (name_json.string_value().empty()) {
2180 " error:header name must be non-empty")));
2182 headers->push_back(name_json.string_value());
2194 if (json.type() != Json::Type::OBJECT) {
2196 "field:names index:",
idx,
" error:type should be OBJECT"));
2199 std::vector<grpc_error_handle> error_list;
2217 if (json.type() != Json::Type::OBJECT) {
2219 "field:grpc_keybuilders index:",
idx,
" error:type should be OBJECT"));
2221 std::vector<grpc_error_handle> error_list;
2223 std::set<std::string>
names;
2224 const Json::Array* names_array =
nullptr;
2227 if (names_array->empty()) {
2229 "field:names error:list is empty"));
2231 size_t name_idx = 0;
2232 for (
const Json& name_json : *names_array) {
2235 ParseJsonMethodName(name_idx++, name_json, &child_error);
2237 error_list.push_back(child_error);
2249 std::set<std::string> all_keys;
2250 auto duplicate_key_check_func = [&all_keys,
2252 auto it = all_keys.find(
key);
2253 if (
it != all_keys.end()) {
2257 all_keys.insert(
key);
2261 RlsLbConfig::KeyBuilder key_builder;
2262 const Json::Array* headers_array =
nullptr;
2264 &error_list,
false);
2265 if (headers_array !=
nullptr) {
2266 size_t header_idx = 0;
2267 for (
const Json& header_json : *headers_array) {
2269 std::vector<std::string> headers;
2271 ParseJsonHeaders(header_idx++, header_json, &
key, &headers);
2273 error_list.push_back(child_error);
2275 duplicate_key_check_func(
key);
2276 key_builder.header_keys.emplace(
key,
std::move(headers));
2281 const Json::Object* extra_keys =
nullptr;
2283 &error_list,
false);
2284 if (extra_keys !=
nullptr) {
2285 std::vector<grpc_error_handle> extra_keys_errors;
2287 &extra_keys_errors,
false) &&
2288 key_builder.host_key.empty()) {
2290 "field:host error:must be non-empty"));
2292 if (!key_builder.host_key.empty()) {
2293 duplicate_key_check_func(key_builder.host_key);
2296 &extra_keys_errors,
false) &&
2297 key_builder.service_key.empty()) {
2299 "field:service error:must be non-empty"));
2301 if (!key_builder.service_key.empty()) {
2302 duplicate_key_check_func(key_builder.service_key);
2305 &extra_keys_errors,
false) &&
2306 key_builder.method_key.empty()) {
2308 "field:method error:must be non-empty"));
2310 if (!key_builder.method_key.empty()) {
2311 duplicate_key_check_func(key_builder.method_key);
2313 if (!extra_keys_errors.empty()) {
2314 error_list.push_back(
2321 &error_list,
false);
2323 std::vector<grpc_error_handle> constant_keys_errors;
2329 "error:keys must be non-empty"));
2331 duplicate_key_check_func(
key);
2333 &constant_keys_errors);
2335 if (!constant_keys_errors.empty()) {
2337 "field:constantKeys", &constant_keys_errors));
2352 RlsLbConfig::KeyBuilderMap ParseGrpcKeybuilders(
2355 if (key_builder_list.empty()) {
2357 "field:grpcKeybuilders error:list is empty");
2360 std::vector<grpc_error_handle> error_list;
2362 for (
const Json& key_builder : key_builder_list) {
2371 RlsLbConfig::RouteLookupConfig ParseRouteLookupConfig(
2373 std::vector<grpc_error_handle> error_list;
2374 RlsLbConfig::RouteLookupConfig route_lookup_config;
2376 const Json::Array* keybuilder_list =
nullptr;
2378 if (keybuilder_list !=
nullptr) {
2380 route_lookup_config.key_builder_map =
2381 ParseGrpcKeybuilders(*keybuilder_list, &child_error);
2386 &route_lookup_config.lookup_service, &error_list)) {
2388 route_lookup_config.lookup_service)) {
2390 "field:lookupService error:must be valid gRPC target URI"));
2394 route_lookup_config.lookup_service_timeout = kDefaultLookupServiceTimeout;
2396 &route_lookup_config.lookup_service_timeout,
2397 &error_list,
false);
2399 route_lookup_config.max_age = kMaxMaxAge;
2401 json,
"maxAge", &route_lookup_config.max_age, &error_list,
2404 if (route_lookup_config.max_age > kMaxMaxAge) {
2405 route_lookup_config.max_age = kMaxMaxAge;
2408 route_lookup_config.stale_age = kMaxMaxAge;
2410 json,
"staleAge", &route_lookup_config.stale_age, &error_list,
2413 if (stale_age_set && !max_age_set) {
2415 "field:maxAge error:must be set if staleAge is set"));
2418 if (route_lookup_config.stale_age >= route_lookup_config.max_age) {
2419 route_lookup_config.stale_age = route_lookup_config.max_age;
2423 &route_lookup_config.cache_size_bytes, &error_list);
2424 if (route_lookup_config.cache_size_bytes <= 0) {
2426 "field:cacheSizeBytes error:must be greater than 0"));
2429 if (route_lookup_config.cache_size_bytes > kMaxCacheSizeBytes) {
2430 route_lookup_config.cache_size_bytes = kMaxCacheSizeBytes;
2434 &route_lookup_config.default_target, &error_list,
2436 if (route_lookup_config.default_target.empty()) {
2438 "field:defaultTarget error:must be non-empty if set"));
2443 return route_lookup_config;
2447 const Json& child_policy_list,
2448 const std::string& child_policy_config_target_field_name,
2450 RefCountedPtr<LoadBalancingPolicy::Config>*
2451 default_child_policy_parsed_config) {
2453 *child_policy_config = child_policy_list;
2457 child_policy_config_target_field_name,
target, child_policy_config);
2460 RefCountedPtr<LoadBalancingPolicy::Config> parsed_config =
2461 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
2462 *child_policy_config, &
error);
2469 if (parsed_config !=
nullptr) {
2470 for (
Json&
config : *(child_policy_config->mutable_array())) {
2471 if (
config.object_value().begin()->first == parsed_config->name()) {
2473 child_policy_config->mutable_array()->clear();
2474 child_policy_config->mutable_array()->push_back(
std::move(save_config));
2481 *default_child_policy_parsed_config =
std::move(parsed_config);
2486 class RlsLbFactory :
public LoadBalancingPolicyFactory {
2488 const char*
name()
const override {
return kRls; }
2490 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2495 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
2497 std::vector<grpc_error_handle> error_list;
2499 RlsLbConfig::RouteLookupConfig route_lookup_config;
2500 const Json::Object* route_lookup_config_json =
nullptr;
2502 &route_lookup_config_json, &error_list)) {
2504 route_lookup_config =
2505 ParseRouteLookupConfig(*route_lookup_config_json, &child_error);
2510 const Json::Object* rls_channel_service_config_json_obj =
nullptr;
2512 "routeLookupChannelServiceConfig",
2513 &rls_channel_service_config_json_obj, &error_list,
2516 Json rls_channel_service_config_json(
2517 *rls_channel_service_config_json_obj);
2518 rls_channel_service_config = rls_channel_service_config_json.Dump();
2519 auto service_config = MakeRefCounted<ServiceConfigImpl>(
2520 nullptr, rls_channel_service_config,
2521 std::move(rls_channel_service_config_json), &child_error);
2524 "field:routeLookupChannelServiceConfig", &child_error, 1));
2529 std::string child_policy_config_target_field_name;
2531 config.object_value(),
"childPolicyConfigTargetFieldName",
2532 &child_policy_config_target_field_name, &error_list)) {
2533 if (child_policy_config_target_field_name.empty()) {
2535 "field:childPolicyConfigTargetFieldName error:must be non-empty"));
2539 Json child_policy_config;
2540 RefCountedPtr<LoadBalancingPolicy::Config>
2541 default_child_policy_parsed_config;
2542 auto it =
config.object_value().find(
"childPolicy");
2543 if (
it ==
config.object_value().end()) {
2545 "field:childPolicy error:does not exist."));
2546 }
else if (
it->second.type() != Json::Type::ARRAY) {
2548 "field:childPolicy error:type should be ARRAY"));
2551 it->second, child_policy_config_target_field_name,
2552 route_lookup_config.default_target, &child_policy_config,
2553 &default_child_policy_parsed_config);
2556 "field:childPolicy", &child_error, 1));
2562 "errors parsing RLS LB policy config", &error_list);
2563 return MakeRefCounted<RlsLbConfig>(
2566 std::move(child_policy_config_target_field_name),
2567 std::move(default_child_policy_parsed_config));
2574 LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
2575 absl::make_unique<RlsLbFactory>());