rls.cc
Go to the documentation of this file.
1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 // Implementation of the Route Lookup Service (RLS) LB policy
18 //
19 // The policy queries a route lookup service for the name of the actual service
20 // to use. A child policy that recognizes the name as a field of its
21 // configuration will take further load balancing action on the request.
22 
24 
25 #include <inttypes.h>
26 #include <stdlib.h>
27 #include <string.h>
28 
29 #include <algorithm>
30 #include <deque>
31 #include <list>
32 #include <map>
33 #include <memory>
34 #include <random>
35 #include <set>
36 #include <string>
37 #include <unordered_map>
38 #include <utility>
39 #include <vector>
40 
41 #include "absl/base/thread_annotations.h"
42 #include "absl/container/inlined_vector.h"
43 #include "absl/hash/hash.h"
44 #include "absl/memory/memory.h"
45 #include "absl/status/status.h"
46 #include "absl/status/statusor.h"
47 #include "absl/strings/str_cat.h"
48 #include "absl/strings/str_format.h"
49 #include "absl/strings/str_join.h"
50 #include "absl/strings/string_view.h"
51 #include "absl/strings/strip.h"
52 #include "absl/types/optional.h"
53 #include "upb/upb.h"
54 #include "upb/upb.hpp"
55 
56 #include <grpc/byte_buffer.h>
58 #include <grpc/grpc.h>
62 #include <grpc/slice.h>
63 #include <grpc/status.h>
64 #include <grpc/support/log.h>
65 
89 #include "src/core/lib/json/json.h"
104 
105 namespace grpc_core {
106 
// Trace flag for the RLS LB policy, constructed with the name "rls_lb".
// The first argument is false; presumably that means tracing is off by
// default -- confirm against TraceFlag's constructor.
107 TraceFlag grpc_lb_rls_trace(false, "rls_lb");
108 
109 namespace {
110 
// String constants used by the RLS policy.
//
// The pointers themselves are const (not only the characters they point to),
// so these file-local globals cannot be accidentally reseated at runtime.
// They still convert to `const char*` wherever they are used as values.

// Name returned by RlsLbConfig::name() and RlsLb::name().
const char* const kRls = "rls_experimental";
const char kGrpc[] = "grpc";
// Looks like the full method path of the route lookup RPC -- confirm at the
// call site, which is outside this part of the file.
const char* const kRlsRequestPath =
    "/grpc.lookup.v1.RouteLookupService/RouteLookup";
const char* const kFakeTargetFieldValue = "fake_target_field_value";
const char* const kRlsHeaderKey = "X-Google-RLS-Data";
116 
// Tunable constants for the RLS policy. Only the three throttle defaults are
// visibly consumed in this part of the file (as default arguments of
// RlsChannel::Throttle's constructor); the roles of the others are inferred
// from their names and should be confirmed at their use sites.
117 const Duration kDefaultLookupServiceTimeout = Duration::Seconds(10);
118 const Duration kMaxMaxAge = Duration::Minutes(5);
119 const Duration kMinExpirationTime = Duration::Seconds(5);
// Presumably parameters of the per-cache-entry backoff -- TODO confirm.
120 const Duration kCacheBackoffInitial = Duration::Seconds(1);
121 const double kCacheBackoffMultiplier = 1.6;
122 const double kCacheBackoffJitter = 0.2;
123 const Duration kCacheBackoffMax = Duration::Minutes(2);
// Defaults for RlsChannel::Throttle. Note that the ratio is a double here but
// the Throttle constructor takes it as a float, so it is implicitly converted.
124 const Duration kDefaultThrottleWindowSize = Duration::Seconds(30);
125 const double kDefaultThrottleRatioForSuccesses = 2.0;
126 const int kDefaultThrottlePadding = 8;
127 const Duration kCacheCleanupTimerInterval = Duration::Minutes(1);
// 5 MiB.
128 const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
129 
130 // Parsed RLS LB policy configuration.
// Holds the parsed route lookup configuration, the RLS channel service
// config, the child policy config (plus the name of the field into which the
// target is injected), and the parsed config for the default child policy.
// All accessors are const and simply expose the stored values.
//
// NOTE(review): the embedded listing numbers have gaps throughout this class,
// so some declarations are not visible here.
131 class RlsLbConfig : public LoadBalancingPolicy::Config {
132  public:
  // Describes how to build the request key for one path.
  // NOTE(review): the member names are not visible here (listing lines
  // 135-139 are missing); BuildKeyMap() reads header_keys, constant_keys,
  // host_key, service_key and method_key from this struct.
133  struct KeyBuilder {
134  std::map<std::string /*key*/, std::vector<std::string /*header*/>>
140  };
  // Maps a request path to the KeyBuilder to use for it.
141  using KeyBuilderMap = std::unordered_map<std::string /*path*/, KeyBuilder>;
142 
  // NOTE(review): only key_builder_map is visible; the accessors below also
  // read lookup_service, lookup_service_timeout, max_age, stale_age,
  // cache_size_bytes and default_target from this struct.
143  struct RouteLookupConfig {
144  KeyBuilderMap key_builder_map;
151  };
152 
  // Takes every argument by value and moves it into the matching member.
153  RlsLbConfig(RouteLookupConfig route_lookup_config,
154  std::string rls_channel_service_config, Json child_policy_config,
155  std::string child_policy_config_target_field_name,
156  RefCountedPtr<LoadBalancingPolicy::Config>
157  default_child_policy_parsed_config)
158  : route_lookup_config_(std::move(route_lookup_config)),
159  rls_channel_service_config_(std::move(rls_channel_service_config)),
160  child_policy_config_(std::move(child_policy_config)),
162  std::move(child_policy_config_target_field_name)),
164  std::move(default_child_policy_parsed_config)) {}
165 
  // Returns the policy name constant kRls.
166  const char* name() const override { return kRls; }
167 
  // Read-only views into route_lookup_config_.
168  const KeyBuilderMap& key_builder_map() const {
169  return route_lookup_config_.key_builder_map;
170  }
171  const std::string& lookup_service() const {
172  return route_lookup_config_.lookup_service;
173  }
175  return route_lookup_config_.lookup_service_timeout;
176  }
177  Duration max_age() const { return route_lookup_config_.max_age; }
178  Duration stale_age() const { return route_lookup_config_.stale_age; }
179  int64_t cache_size_bytes() const {
180  return route_lookup_config_.cache_size_bytes;
181  }
182  const std::string& default_target() const {
183  return route_lookup_config_.default_target;
184  }
185  const std::string& rls_channel_service_config() const {
187  }
188  const Json& child_policy_config() const { return child_policy_config_; }
189  const std::string& child_policy_config_target_field_name() const {
191  }
192  RefCountedPtr<LoadBalancingPolicy::Config>
193  default_child_policy_parsed_config() const {
195  }
196 
197  private:
198  RouteLookupConfig route_lookup_config_;
202  RefCountedPtr<LoadBalancingPolicy::Config>
204 };
205 
206 // RLS LB policy.
207 class RlsLb : public LoadBalancingPolicy {
208  public:
  // Constructs the policy from the generic LB policy arguments.
209  explicit RlsLb(Args args);
210 
  // Returns the policy name constant kRls (same value as RlsLbConfig::name()).
211  const char* name() const override { return kRls; }
  // LoadBalancingPolicy overrides; their definitions are outside this part
  // of the file.
212  void UpdateLocked(UpdateArgs args) override;
213  void ExitIdleLocked() override;
214  void ResetBackoffLocked() override;
215 
216  private:
217  // Key to access entries in the cache and the request map.
218  struct RequestKey {
219  std::map<std::string, std::string> key_map;
220 
221  bool operator==(const RequestKey& rhs) const {
222  return key_map == rhs.key_map;
223  }
224 
225  template <typename H>
226  friend H AbslHashValue(H h, const RequestKey& key) {
227  std::hash<std::string> string_hasher;
228  for (auto& kv : key.key_map) {
229  h = H::combine(std::move(h), string_hasher(kv.first),
230  string_hasher(kv.second));
231  }
232  return h;
233  }
234 
235  size_t Size() const {
236  size_t size = sizeof(RequestKey);
237  for (auto& kv : key_map) {
238  size += kv.first.length() + kv.second.length();
239  }
240  return size;
241  }
242 
243  std::string ToString() const {
244  return absl::StrCat(
245  "{", absl::StrJoin(key_map, ",", absl::PairFormatter("=")), "}");
246  }
247  };
248 
249  // Data from an RLS response.
  // NOTE(review): listing lines 251, 253 and 257 are missing. The format
  // string and the surviving argument show that the struct also carries a
  // status and header_data in addition to targets.
250  struct ResponseInfo {
  // Targets returned by the RLS server -- presumably in preference order;
  // confirm where the response is parsed.
252  std::vector<std::string> targets;
254 
  // Human-readable rendering of the response.
255  std::string ToString() const {
256  return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
258  header_data);
259  }
260  };
261 
262  // Wraps a child policy for a given RLS target.
  // Owns the child policy handler for one target and the picker most recently
  // reported by that child. It registers itself in the parent's
  // child_policy_map_ on construction and removes itself in Orphan().
  // NOTE(review): some member declarations are missing from this listing
  // (lines 312, 342, 349-350, 352), e.g. target_ and connectivity_state_.
263  class ChildPolicyWrapper : public DualRefCounted<ChildPolicyWrapper> {
264  public:
265  ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);
266 
267  // Note: We are forced to disable lock analysis here because
268  // Orphan() is called by OrphanablePtr<>, which cannot have lock
269  // annotations for this particular caller.
270  void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
271 
272  const std::string& target() const { return target_; }
273 
  // Delegates the pick to the current picker_.
274  PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
275  return picker_->Pick(args);
276  }
277 
278  // Updates for the child policy are handled in two phases:
279  // 1. In StartUpdate(), we parse and validate the new child policy
280  // config and store the parsed config.
281  // 2. In MaybeFinishUpdate(), we actually pass the parsed config to the
282  // child policy's UpdateLocked() method.
283  //
284  // The reason we do this is to avoid deadlocks. In StartUpdate(),
285  // if the new config fails to validate, then we need to set
286  // picker_ to an instance that will fail all requests, which
287  // requires holding the lock. However, we cannot call the child
288  // policy's UpdateLocked() method from MaybeFinishUpdate() while
289  // holding the lock, since that would cause a deadlock: the child's
290  // UpdateLocked() will call the helper's UpdateState() method, which
291  // will try to acquire the lock to set picker_. So StartUpdate() is
292  // called while we are still holding the lock, but MaybeFinishUpdate()
293  // is called after releasing it.
294  //
295  // Both methods grab the data they need from the parent object.
296  void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
297  void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
298 
  // Forward to the child policy if one has been created; otherwise no-ops.
299  void ExitIdleLocked() {
300  if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
301  }
302 
303  void ResetBackoffLocked() {
304  if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
305  }
306 
307  // Gets the connectivity state of the child policy. Once the child policy
308  // reports TRANSIENT_FAILURE, the function will always return
309  // TRANSIENT_FAILURE state instead of the actual state of the child policy
310  // until the child policy reports another READY state.
311  grpc_connectivity_state connectivity_state() const
313  return connectivity_state_;
314  }
315 
316  private:
317  // ChannelControlHelper object that allows the child policy to update state
318  // with the wrapper.
319  class ChildPolicyHelper : public LoadBalancingPolicy::ChannelControlHelper {
320  public:
  // Holds only a weak ref to the wrapper; the ref is dropped explicitly in
  // the destructor with a debug location and reason string.
321  explicit ChildPolicyHelper(WeakRefCountedPtr<ChildPolicyWrapper> wrapper)
322  : wrapper_(std::move(wrapper)) {}
323  ~ChildPolicyHelper() override {
324  wrapper_.reset(DEBUG_LOCATION, "ChildPolicyHelper");
325  }
326 
327  RefCountedPtr<SubchannelInterface> CreateSubchannel(
328  ServerAddress address, const grpc_channel_args& args) override;
329  void UpdateState(grpc_connectivity_state state,
330  const absl::Status& status,
331  std::unique_ptr<SubchannelPicker> picker) override;
332  void RequestReresolution() override;
333  absl::string_view GetAuthority() override;
334  void AddTraceEvent(TraceSeverity severity,
335  absl::string_view message) override;
336 
337  private:
338  WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
339  };
340 
341  RefCountedPtr<RlsLb> lb_policy_;
343 
  // Set in Orphan(); checked by the helper before forwarding calls upward.
344  bool is_shutdown_ = false;
345 
346  OrphanablePtr<ChildPolicyHandler> child_policy_;
  // Parsed config awaiting delivery to the child in MaybeFinishUpdate();
  // reset by StartUpdate() when validation fails.
347  RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
348 
351  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
353  };
354 
355  // A picker that uses the cache and the request map in the LB policy
356  // (synchronized via a mutex) to determine how to route requests.
  // NOTE: member declaration order matters. The constructor initializes
  // config_ from lb_policy_->config_ after lb_policy_ has been moved into
  // place, which relies on lb_policy_ being declared first.
357  class Picker : public LoadBalancingPolicy::SubchannelPicker {
358  public:
359  explicit Picker(RefCountedPtr<RlsLb> lb_policy);
  // Hands the default-child-policy ref to the WorkSerializer for release
  // (see the destructor definition).
360  ~Picker() override;
361 
362  PickResult Pick(PickArgs args) override;
363 
364  private:
365  RefCountedPtr<RlsLb> lb_policy_;
  // Copy of lb_policy_->config_ taken when the picker is constructed.
366  RefCountedPtr<RlsLbConfig> config_;
  // Ref on the LB policy's default child policy; stays null when the policy
  // has none.
367  RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
368  };
369 
370  // An LRU cache with adjustable size.
  // Stores one Entry per RequestKey in map_ and tracks recency in lru_list_.
  // All state is annotated as guarded by RlsLb::mu_.
  // NOTE(review): the embedded listing numbers have gaps throughout this
  // class (e.g. the lock annotations that follow several accessor signatures,
  // and some member declarations), so parts of the interface are not visible.
371  class Cache {
372  public:
374 
  // A single cache entry: the data from the last RLS response for a key plus
  // the backoff state used after failed lookups.
375  class Entry : public InternallyRefCounted<Entry> {
376  public:
377  Entry(RefCountedPtr<RlsLb> lb_policy, const RequestKey& key);
378 
379  // Notify the entry when it's evicted from the cache. Performs shut down.
380  // Note: We are forced to disable lock analysis here because
381  // Orphan() is called by OrphanablePtr<>, which cannot have lock
382  // annotations for this particular caller.
383  void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
384 
  // Plain accessors for the guarded fields declared below.
385  const absl::Status& status() const
387  return status_;
388  }
389  Timestamp backoff_time() const
391  return backoff_time_;
392  }
393  Timestamp backoff_expiration_time() const
395  return backoff_expiration_time_;
396  }
397  Timestamp data_expiration_time() const
399  return data_expiration_time_;
400  }
401  const std::string& header_data() const
403  return header_data_;
404  }
405  Timestamp stale_time() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
406  return stale_time_;
407  }
408  Timestamp min_expiration_time() const
410  return min_expiration_time_;
411  }
412 
  // Transfers ownership of the backoff state to the caller; backoff_state_
  // is left moved-from (null for a unique_ptr).
413  std::unique_ptr<BackOff> TakeBackoffState()
415  return std::move(backoff_state_);
416  }
417 
418  // Cache size of entry.
419  size_t Size() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
420 
421  // Pick subchannel for request based on the entry's state.
422  PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
423 
424  // If the cache entry is in backoff state, resets the backoff and, if
425  // applicable, its backoff timer. The method does not update the LB
426  // policy's picker; the caller is responsible for that if necessary.
427  void ResetBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
428 
429  // Check if the entry should be removed by the clean-up timer.
430  bool ShouldRemove() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
431 
432  // Check if the entry can be evicted from the cache, i.e. the
433  // min_expiration_time_ has passed.
434  bool CanEvict() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
435 
436  // Updates the entry upon reception of a new RLS response.
437  // Returns a list of child policy wrappers on which MaybeFinishUpdate()
438  // needs to be called after releasing the lock.
439  std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
440  ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
442 
443  // Moves entry to the end of the LRU list.
444  void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
445 
446  private:
  // Timer object associated with an entry's backoff period; holds a ref to
  // the entry.
447  class BackoffTimer : public InternallyRefCounted<BackoffTimer> {
448  public:
449  BackoffTimer(RefCountedPtr<Entry> entry, Timestamp backoff_time);
450 
451  // Note: We are forced to disable lock analysis here because
452  // Orphan() is called by OrphanablePtr<>, which cannot have lock
453  // annotations for this particular caller.
454  void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
455 
456  private:
457  static void OnBackoffTimer(void* args, grpc_error_handle error);
458 
459  RefCountedPtr<Entry> entry_;
  // Starts out true; presumably cleared when the timer fires or is
  // cancelled -- confirm in the definitions.
460  bool armed_ ABSL_GUARDED_BY(&RlsLb::mu_) = true;
463  };
464 
465  RefCountedPtr<RlsLb> lb_policy_;
466 
467  bool is_shutdown_ ABSL_GUARDED_BY(&RlsLb::mu_) = false;
468 
469  // Backoff states
471  std::unique_ptr<BackOff> backoff_state_ ABSL_GUARDED_BY(&RlsLb::mu_);
472  Timestamp backoff_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
474  Timestamp backoff_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
476  OrphanablePtr<BackoffTimer> backoff_timer_;
477 
478  // RLS response states
479  std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
481  std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_);
482  Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
485 
486  Timestamp min_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_);
488  };
489 
  // Stores a raw, non-owning pointer to the LB policy (see lb_policy_ below).
490  explicit Cache(RlsLb* lb_policy);
491 
492  // Finds an entry from the cache that corresponds to a key. If an entry is
493  // not found, nullptr is returned. Otherwise, the entry is considered
494  // recently used and its order in the LRU list of the cache is updated.
495  Entry* Find(const RequestKey& key)
497 
498  // Finds an entry from the cache that corresponds to a key. If an entry is
499  // not found, an entry is created, inserted in the cache, and returned to
500  // the caller. Otherwise, the entry found is returned to the caller. The
501  // entry returned to the user is considered recently used and its order in
502  // the LRU list of the cache is updated.
503  Entry* FindOrInsert(const RequestKey& key)
505 
506  // Resizes the cache. If the new cache size is greater than the current size
507  // of the cache, do nothing. Otherwise, evict the oldest entries that
508  // exceed the new size limit of the cache.
509  void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
510 
511  // Resets backoff of all the cache entries.
512  void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
513 
514  // Shutdown the cache; clean-up and orphan all the stored cache entries.
  // NOTE(review): the declaration this comment describes (listing line 515)
  // is missing from this extract.
516 
517  private:
518  static void OnCleanupTimer(void* arg, grpc_error_handle error);
519 
520  // Returns the entry size for a given key.
521  static size_t EntrySizeForKey(const RequestKey& key);
522 
523  // Evicts oversized cache elements when the current size is greater than
524  // the specified limit.
525  void MaybeShrinkSize(size_t bytes)
527 
528  RlsLb* lb_policy_;
529 
  // Configured limit and current total, both in bytes (Resize() takes bytes).
530  size_t size_limit_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
531  size_t size_ ABSL_GUARDED_BY(&RlsLb::mu_) = 0;
532 
  // Recency order of keys, alongside the key -> entry map that owns the
  // entries.
533  std::list<RequestKey> lru_list_ ABSL_GUARDED_BY(&RlsLb::mu_);
534  std::unordered_map<RequestKey, OrphanablePtr<Entry>, absl::Hash<RequestKey>>
535  map_ ABSL_GUARDED_BY(&RlsLb::mu_);
538  };
539 
540  // Channel for communicating with the RLS server.
541  // Contains throttling logic for RLS requests.
  // NOTE(review): the embedded listing numbers have gaps in this class (lock
  // annotations after several signatures, the ShouldThrottle() signature, and
  // some Throttle members), so parts of the interface are not visible.
542  class RlsChannel : public InternallyRefCounted<RlsChannel> {
543  public:
544  explicit RlsChannel(RefCountedPtr<RlsLb> lb_policy);
545 
546  // Shuts down the channel.
547  void Orphan() override;
548 
549  // Starts an RLS call.
550  // If stale_entry is non-null, it points to the entry containing
551  // stale data for the key.
552  void StartRlsCall(const RequestKey& key, Cache::Entry* stale_entry)
554 
555  // Reports the result of an RLS call to the throttle.
556  void ReportResponseLocked(bool response_succeeded)
558 
559  // Checks if a proposed RLS call should be throttled.
  // Delegates to throttle_. The signature line (listing line 560) is missing
  // here; Picker::Pick() calls this as ShouldThrottle().
561  return throttle_.ShouldThrottle();
562  }
563 
564  // Resets the channel's backoff.
565  void ResetBackoff();
566 
567  grpc_channel* channel() const { return channel_; }
568 
569  private:
570  // Watches the state of the RLS channel. Notifies the LB policy when
571  // the channel was previously in TRANSIENT_FAILURE and then becomes READY.
572  class StateWatcher : public AsyncConnectivityStateWatcherInterface {
573  public:
  // Passes the LB policy's work serializer to the base class; it is read
  // from rls_channel before rls_channel is moved into the member.
574  explicit StateWatcher(RefCountedPtr<RlsChannel> rls_channel)
575  : AsyncConnectivityStateWatcherInterface(
576  rls_channel->lb_policy_->work_serializer()),
577  rls_channel_(std::move(rls_channel)) {}
578 
579  private:
580  void OnConnectivityStateChange(grpc_connectivity_state new_state,
581  const absl::Status& status) override;
582 
583  RefCountedPtr<RlsChannel> rls_channel_;
585  };
586 
587  // Throttle state for RLS requests.
588  class Throttle {
589  public:
  // All parameters default to the kDefaultThrottle* constants. The ratio
  // constant is a double and is implicitly converted to float here.
590  explicit Throttle(
591  Duration window_size = kDefaultThrottleWindowSize,
592  float ratio_for_successes = kDefaultThrottleRatioForSuccesses,
593  int padding = kDefaultThrottlePadding)
594  : window_size_(window_size),
595  ratio_for_successes_(ratio_for_successes),
596  padding_(padding) {}
597 
599 
600  void RegisterResponse(bool success)
602 
603  private:
606  int padding_;
  // Mersenne Twister engine seeded once from std::random_device.
607  std::mt19937 rng_{std::random_device()()};
608 
609  // Logged timestamp of requests.
610  std::deque<Timestamp> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);
611 
612  // Logged timestamps of failures.
613  std::deque<Timestamp> failures_ ABSL_GUARDED_BY(&RlsLb::mu_);
614  };
615 
616  RefCountedPtr<RlsLb> lb_policy_;
617  bool is_shutdown_ = false;
618 
619  grpc_channel* channel_ = nullptr;
620  RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
  // Raw pointer to the state watcher; ownership is not visible in this part
  // of the file -- TODO confirm who owns it.
621  StateWatcher* watcher_ = nullptr;
622  Throttle throttle_ ABSL_GUARDED_BY(&RlsLb::mu_);
623  };
624 
625  // A pending RLS request. Instances will be tracked in request_map_.
  // NOTE(review): the embedded listing numbers have gaps in this class (a
  // constructor parameter on line 634 and several call-state members), so
  // parts of the declaration are not visible.
626  class RlsRequest : public InternallyRefCounted<RlsRequest> {
627  public:
628  // Asynchronously starts a call on rls_channel for key.
629  // Stores backoff_state, which will be transferred to the data cache
630  // if the RLS request fails.
631  RlsRequest(RefCountedPtr<RlsLb> lb_policy, RlsLb::RequestKey key,
632  RefCountedPtr<RlsChannel> rls_channel,
633  std::unique_ptr<BackOff> backoff_state,
635  std::string stale_header_data);
636  ~RlsRequest() override;
637 
638  // Shuts down the request. If the request is still in flight, it is
639  // cancelled, in which case no response will be added to the cache.
640  void Orphan() override;
641 
642  private:
643  // Callback to be invoked to start the call.
644  static void StartCall(void* arg, grpc_error_handle error);
645 
646  // Helper for StartCall() that runs within the WorkSerializer.
647  void StartCallLocked();
648 
649  // Callback to be invoked when the call is completed.
650  static void OnRlsCallComplete(void* arg, grpc_error_handle error);
651 
652  // Call completion callback running on LB policy WorkSerializer.
653  void OnRlsCallCompleteLocked(grpc_error_handle error);
654 
  // Build the request message / decode the response message for the call.
  // Their definitions are outside this part of the file.
655  grpc_byte_buffer* MakeRequestProto();
656  ResponseInfo ParseResponseProto();
657 
658  RefCountedPtr<RlsLb> lb_policy_;
659  RlsLb::RequestKey key_;
660  RefCountedPtr<RlsChannel> rls_channel_;
661  std::unique_ptr<BackOff> backoff_state_;
664 
665  // RLS call state.
669  grpc_call* call_ = nullptr;
676  };
677 
  // Private state and helpers of RlsLb.
  // NOTE(review): several declarations are missing from this listing (lines
  // 693-695, 706, 709-710). Other code in this file uses mu_, is_shutdown_,
  // rls_channel_, addresses_ and channel_args_, which are presumably declared
  // on those lines.
678  void ShutdownLocked() override;
679 
680  // Returns a new picker to the channel to trigger reprocessing of
681  // pending picks. Schedules the actual picker update on the ExecCtx
682  // to be run later, so it's safe to invoke this while holding the lock.
683  void UpdatePickerAsync();
684  // Hops into work serializer and calls UpdatePickerLocked().
685  static void UpdatePickerCallback(void* arg, grpc_error_handle error);
686  // Updates the picker in the work serializer.
687  void UpdatePickerLocked() ABSL_LOCKS_EXCLUDED(&mu_);
688 
689  // The name of the server for the channel.
690  std::string server_name_;
691 
692  // Mutex to guard LB policy state that is accessed by the picker.
696  Cache cache_ ABSL_GUARDED_BY(mu_);
697  // Maps an RLS request key to an RlsRequest object that represents a pending
698  // RLS request.
699  std::unordered_map<RequestKey, OrphanablePtr<RlsRequest>,
700  absl::Hash<RequestKey>>
701  request_map_ ABSL_GUARDED_BY(mu_);
702  // The channel on which RLS requests are sent.
703  // Note that this channel may be swapped out when the RLS policy gets
704  // an update. However, when that happens, any existing entries in
705  // request_map_ will continue to use the previous channel.
707 
708  // Accessed only from within WorkSerializer.
711  RefCountedPtr<RlsLbConfig> config_;
712  RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
  // Non-owning index of live wrappers by target. Wrappers add themselves in
  // their constructor and remove themselves in Orphan().
713  std::map<std::string /*target*/, ChildPolicyWrapper*> child_policy_map_;
714 };
715 
716 //
717 // RlsLb::ChildPolicyWrapper
718 //
719 
// Creates a wrapper for `target` and registers it with the LB policy.
//
// The base class receives a trace name only when the RLS trace flag is
// enabled, and nullptr otherwise. The initial picker is a QueuePicker.
// The initializer order is deliberate: lb_policy_ takes a copy of the ref
// first, and only then is the parameter moved into the QueuePicker.
720 RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
721  std::string target)
722  : DualRefCounted<ChildPolicyWrapper>(
723  GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "ChildPolicyWrapper"
724  : nullptr),
725  lb_policy_(lb_policy),
726  target_(std::move(target)),
727  picker_(absl::make_unique<QueuePicker>(std::move(lb_policy))) {
  // Register a raw (non-owning) pointer to this wrapper; Orphan() erases it.
728  lb_policy_->child_policy_map_.emplace(target_, this);
729 }
730 
// Shuts the wrapper down: marks it as shut down, removes it from the LB
// policy's child_policy_map_, detaches and destroys the child policy if one
// exists, and drops the picker.
731 void RlsLb::ChildPolicyWrapper::Orphan() {
  // NOTE(review): listing line 732 is missing; it presumably opens the
  // trace-flag check that guards this log statement.
733  gpr_log(GPR_INFO, "[rlslb %p] ChildPolicyWrapper=%p [%s]: shutdown",
734  lb_policy_.get(), this, target_.c_str());
735  }
736  is_shutdown_ = true;
737  lb_policy_->child_policy_map_.erase(target_);
738  if (child_policy_ != nullptr) {
  // Undo the pollset_set link that MaybeFinishUpdate() set up when the child
  // policy was created.
739  grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
740  lb_policy_->interested_parties());
741  child_policy_.reset();
742  }
743  picker_.reset();
744 }
745 
// Sets `field` to `value` inside every child policy entry of `config`.
//
// `config` must be a JSON array. Each element must be an object with exactly
// one member whose value is itself an object; `field` is inserted into (or
// overwritten in) that inner object. Elements that do not have this shape are
// left untouched and an error is recorded for each of them.
//
// NOTE(review): listing lines 750 and 775 are missing; they presumably hold
// the `return` + error-construction calls whose arguments are still visible.
746 grpc_error_handle InsertOrUpdateChildPolicyField(const std::string& field,
747  const std::string& value,
748  Json* config) {
749  if (config->type() != Json::Type::ARRAY) {
751  "child policy configuration is not an array");
752  }
753  std::vector<grpc_error_handle> error_list;
754  for (Json& child_json : *config->mutable_array()) {
755  if (child_json.type() != Json::Type::OBJECT) {
756  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
757  "child policy item is not an object"));
758  } else {
759  Json::Object& child = *child_json.mutable_object();
  // NOTE(review): this branch is also taken for an empty object (size 0),
  // for which the "more than one field" wording is inaccurate.
760  if (child.size() != 1) {
761  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
762  "child policy item contains more than one field"));
763  } else {
  // The single member's value is the policy's own config object.
764  Json& child_config_json = child.begin()->second;
765  if (child_config_json.type() != Json::Type::OBJECT) {
766  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
767  "child policy item config is not an object"));
768  } else {
769  Json::Object& child_config = *child_config_json.mutable_object();
770  child_config[field] = Json(value);
771  }
772  }
773  }
774  }
  // Combine the collected per-item errors under one message.
776  absl::StrCat("errors when inserting field \"", field,
777  "\" for child policy"),
778  &error_list);
779 }
780 
// First phase of a child policy update (declared as requiring RlsLb::mu_).
//
// Copies the child policy config from the LB policy's current config, injects
// this wrapper's target into the configured target field, and validates the
// result. On failure it clears pending_config_, installs a
// TransientFailurePicker and destroys the child policy.
//
// NOTE(review): several listing lines are missing (786-787, 794, 798-799,
// 803, 808-809), including the statement that parses the config into
// pending_config_ and the arguments of the TransientFailurePicker.
781 void RlsLb::ChildPolicyWrapper::StartUpdate() {
  // Work on a copy so that the shared config is not modified.
782  Json child_policy_config = lb_policy_->config_->child_policy_config();
783  grpc_error_handle error = InsertOrUpdateChildPolicyField(
784  lb_policy_->config_->child_policy_config_target_field_name(), target_,
785  &child_policy_config);
788  gpr_log(
789  GPR_INFO,
790  "[rlslb %p] ChildPolicyWrapper=%p [%s]: validating update, config: %s",
791  lb_policy_.get(), this, target_.c_str(),
792  child_policy_config.Dump().c_str());
793  }
795  child_policy_config, &error);
796  // Returned RLS target fails the validation.
797  if (!GRPC_ERROR_IS_NONE(error)) {
800  "[rlslb %p] ChildPolicyWrapper=%p [%s]: config failed to parse: "
801  "%s; config: %s",
802  lb_policy_.get(), this, target_.c_str(),
804  child_policy_config.Dump().c_str());
805  }
  // With pending_config_ cleared, MaybeFinishUpdate() returns early.
806  pending_config_.reset();
807  picker_ = absl::make_unique<TransientFailurePicker>(
810  child_policy_.reset();
811  }
812 }
813 
// Second phase of a child policy update (declared as excluding RlsLb::mu_).
//
// Returns early if StartUpdate() left no pending config. Otherwise it creates
// the child policy handler on first use, links its pollset_set to the LB
// policy's, and passes the pending config, the LB policy's addresses and a
// copy of its channel args to the child's UpdateLocked().
//
// NOTE(review): listing lines 826-828 and 837-838 are missing (the remaining
// MakeOrphanable arguments and the trace-flag checks around the log calls).
814 void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
815  // If pending_config_ is not set, that means StartUpdate() failed, so
816  // there's nothing to do here.
817  if (pending_config_ == nullptr) return;
818  // If child policy doesn't yet exist, create it.
819  if (child_policy_ == nullptr) {
820  Args create_args;
821  create_args.work_serializer = lb_policy_->work_serializer();
  // The helper only gets a weak ref to this wrapper.
822  create_args.channel_control_helper = absl::make_unique<ChildPolicyHelper>(
823  WeakRef(DEBUG_LOCATION, "ChildPolicyHelper"));
824  create_args.args = lb_policy_->channel_args_;
825  child_policy_ = MakeOrphanable<ChildPolicyHandler>(std::move(create_args),
829  "[rlslb %p] ChildPolicyWrapper=%p [%s], created new child policy "
830  "handler %p",
831  lb_policy_.get(), this, target_.c_str(), child_policy_.get());
832  }
  // Link the child's pollset_set with the LB policy's; Orphan() removes it.
833  grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
834  lb_policy_->interested_parties());
835  }
836  // Send the child the updated config.
839  "[rlslb %p] ChildPolicyWrapper=%p [%s], updating child policy "
840  "handler %p",
841  lb_policy_.get(), this, target_.c_str(), child_policy_.get());
842  }
843  UpdateArgs update_args;
  // Moving out of pending_config_ leaves it null until the next StartUpdate().
844  update_args.config = std::move(pending_config_);
845  update_args.addresses = lb_policy_->addresses_;
846  update_args.args = grpc_channel_args_copy(lb_policy_->channel_args_);
847  child_policy_->UpdateLocked(std::move(update_args));
848 }
849 
850 //
851 // RlsLb::ChildPolicyWrapper::ChildPolicyHelper
852 //
853 
// Forwards subchannel creation to the LB policy's own channel control helper.
// Returns nullptr without forwarding once the wrapper has been shut down.
// NOTE(review): listing lines 857-858 are missing; they presumably open the
// trace-flag check and the gpr_log call whose arguments remain below.
854 RefCountedPtr<SubchannelInterface>
855 RlsLb::ChildPolicyWrapper::ChildPolicyHelper::CreateSubchannel(
856  ServerAddress address, const grpc_channel_args& args) {
859  "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
860  "CreateSubchannel() for %s",
861  wrapper_->lb_policy_.get(), wrapper_.get(),
862  wrapper_->target_.c_str(), this, address.ToString().c_str());
863  }
864  if (wrapper_->is_shutdown_) return nullptr;
865  return wrapper_->lb_policy_->channel_control_helper()->CreateSubchannel(
866  std::move(address), args);
867 }
868 
// Records the child policy's new connectivity state and picker on the wrapper
// while holding the LB policy's mutex, then asks the LB policy to update its
// own picker after the mutex has been released.
//
// Nothing is recorded if the wrapper is shut down. There is also an early
// return while the wrapper is in TRANSIENT_FAILURE; the second half of that
// condition is on a missing listing line (884). Per the comment on
// ChildPolicyWrapper::connectivity_state(), it presumably keeps the state
// sticky until the child reports READY -- confirm against the full source.
//
// NOTE(review): listing lines 870, 872-873 and 884 are missing.
869 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
871  std::unique_ptr<SubchannelPicker> picker) {
874  "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
875  "UpdateState(state=%s, status=%s, picker=%p)",
876  wrapper_->lb_policy_.get(), wrapper_.get(),
877  wrapper_->target_.c_str(), this, ConnectivityStateName(state),
878  status.ToString().c_str(), picker.get());
879  }
880  {
  // Scope the lock so that it is released before UpdatePickerLocked(), which
  // is declared as excluding mu_.
881  MutexLock lock(&wrapper_->lb_policy_->mu_);
882  if (wrapper_->is_shutdown_) return;
883  if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
885  return;
886  }
887  wrapper_->connectivity_state_ = state;
  // A null picker trips the debug assert; the check below additionally keeps
  // the previous picker in place when the assert is compiled out.
888  GPR_DEBUG_ASSERT(picker != nullptr);
889  if (picker != nullptr) {
890  wrapper_->picker_ = std::move(picker);
891  }
892  }
893  wrapper_->lb_policy_->UpdatePickerLocked();
894 }
895 
// Forwards a re-resolution request to the LB policy's own channel control
// helper, unless the wrapper has been shut down.
// NOTE(review): listing lines 897-898 are missing; they presumably open the
// trace-flag check and the gpr_log call whose arguments remain below.
896 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::RequestReresolution() {
899  "[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
900  "RequestReresolution",
901  wrapper_->lb_policy_.get(), wrapper_.get(),
902  wrapper_->target_.c_str(), this);
903  }
904  if (wrapper_->is_shutdown_) return;
905  wrapper_->lb_policy_->channel_control_helper()->RequestReresolution();
906 }
907 
908 absl::string_view RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetAuthority() {
909  return wrapper_->lb_policy_->channel_control_helper()->GetAuthority();
910 }
911 
912 void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::AddTraceEvent(
913  TraceSeverity severity, absl::string_view message) {
914  if (wrapper_->is_shutdown_) return;
915  wrapper_->lb_policy_->channel_control_helper()->AddTraceEvent(severity,
916  message);
917 }
918 
919 //
920 // RlsLb::Picker
921 //
922 
923 // Builds the key to be used for a request based on path and initial_metadata.
// Builds the request key map for one call.
//
// The key builder is looked up by exact path first, then by the path
// truncated just after its last '/'. An empty map is returned if neither
// lookup succeeds or if the path contains no '/'.
//
// Entries are added in this order:
//   1. header keys: the header names for each key are tried in order and the
//      loop stops at the first one with a value;
//   2. constant keys: added with map::insert, so they do not replace an entry
//      already added from a header;
//   3. host, service and method keys: assigned with operator[], so they
//      replace any existing entry with the same name.
//
// NOTE(review): listing lines 948-949 and 952 are missing; they presumably
// declare `buffer`/`value` and store the header value under `key`.
924 std::map<std::string, std::string> BuildKeyMap(
925  const RlsLbConfig::KeyBuilderMap& key_builder_map, absl::string_view path,
926  const std::string& host,
927  const LoadBalancingPolicy::MetadataInterface* initial_metadata) {
928  size_t last_slash_pos = path.npos; // May need this a few times, so cache it.
929  // Find key builder for this path.
930  auto it = key_builder_map.find(std::string(path));
931  if (it == key_builder_map.end()) {
932  // Didn't find exact match, try method wildcard.
933  last_slash_pos = path.rfind("/");
934  GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
935  if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
  // The wildcard key is the path up to and including the last '/'.
936  std::string service(path.substr(0, last_slash_pos + 1));
937  it = key_builder_map.find(service);
938  if (it == key_builder_map.end()) return {};
939  }
940  const RlsLbConfig::KeyBuilder* key_builder = &it->second;
941  // Construct key map using key builder.
942  std::map<std::string, std::string> key_map;
943  // Add header keys.
944  for (const auto& p : key_builder->header_keys) {
945  const std::string& key = p.first;
946  const std::vector<std::string>& header_names = p.second;
947  for (const std::string& header_name : header_names) {
950  initial_metadata->Lookup(header_name, &buffer);
951  if (value.has_value()) {
953  break;
954  }
955  }
956  }
957  // Add constant keys.
958  key_map.insert(key_builder->constant_keys.begin(),
959  key_builder->constant_keys.end());
960  // Add host key.
961  if (!key_builder->host_key.empty()) {
962  key_map[key_builder->host_key] = host;
963  }
964  // Add service key.
965  if (!key_builder->service_key.empty()) {
966  if (last_slash_pos == path.npos) {
967  last_slash_pos = path.rfind("/");
968  GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
969  if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
970  }
  // Takes the text between index 1 and the last '/'; assumes the path starts
  // with '/' -- TODO confirm with callers.
  // NOTE(review): if the only '/' is at index 0, `last_slash_pos - 1` wraps
  // around (size_t), so the service becomes everything after the first
  // character rather than an empty string.
971  key_map[key_builder->service_key] =
972  std::string(path.substr(1, last_slash_pos - 1));
973  }
974  // Add method key.
975  if (!key_builder->method_key.empty()) {
976  if (last_slash_pos == path.npos) {
977  last_slash_pos = path.rfind("/");
978  GPR_DEBUG_ASSERT(last_slash_pos != path.npos);
979  if (GPR_UNLIKELY(last_slash_pos == path.npos)) return {};
980  }
  // Everything after the last '/'.
981  key_map[key_builder->method_key] =
982  std::string(path.substr(last_slash_pos + 1));
983  }
984  return key_map;
985 }
986 
// Captures the LB policy, a ref to its current config and, if the policy has
// a default child policy, a ref to that as well.
// config_ is read through the lb_policy_ member (not the moved-from
// parameter), which relies on lb_policy_ being declared before config_.
// NOTE(review): listing line 990 is missing; it presumably assigns the new
// ref below to default_child_policy_.
987 RlsLb::Picker::Picker(RefCountedPtr<RlsLb> lb_policy)
988  : lb_policy_(std::move(lb_policy)), config_(lb_policy_->config_) {
989  if (lb_policy_->default_child_policy_ != nullptr) {
991  lb_policy_->default_child_policy_->Ref(DEBUG_LOCATION, "Picker");
992  }
993 }
994 
995 RlsLb::Picker::~Picker() {
996  // It's not safe to unref the default child policy in the picker,
997  // since that needs to be done in the WorkSerializer.
998  if (default_child_policy_ != nullptr) {
999  auto* default_child_policy = default_child_policy_.release();
1000  lb_policy_->work_serializer()->Run(
1001  [default_child_policy]() {
1002  default_child_policy->Unref(DEBUG_LOCATION, "Picker");
1003  },
1004  DEBUG_LOCATION);
1005  }
1006 }
1007 
// Picks a target for one call.  Steps, in order:
//  1. Build the request key from the call path, the policy's server name and
//     the call's initial metadata.
//  2. Under lb_policy_->mu_, fail the pick if the policy has shut down.
//  3. Look up the cache entry.  If a new RLS request is warranted (see the
//     condition below) and the RLS channel is not throttling, start one.
//  4. Delegate to the cache entry when it has unexpired data; when the entry
//     is in backoff, use the default target if there is one; otherwise queue
//     the pick until the pending RLS request completes.
// NOTE(review): several lines are absent from this view (the embedded listing
// numbers skip, e.g. 1012 -> 1014 and 1086 -> 1088).  The log calls appear to
// sit inside conditionals whose opening lines are not shown, and the statement
// that consumes the "RLS request failed" message is only partially visible.
1008 LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
1009  // Construct key for request.
1010  RequestKey key = {BuildKeyMap(config_->key_builder_map(), args.path,
1011  lb_policy_->server_name_,
1012  args.initial_metadata)};
1014  gpr_log(GPR_INFO, "[rlslb %p] picker=%p: request keys: %s",
1015  lb_policy_.get(), this, key.ToString().c_str());
1016  }
// "now" is sampled once, before taking the lock, and reused for every
// time comparison below.
1017  Timestamp now = ExecCtx::Get()->Now();
1018  MutexLock lock(&lb_policy_->mu_);
1019  if (lb_policy_->is_shutdown_) {
1020  return PickResult::Fail(
1021  absl::UnavailableError("LB policy already shut down"));
1022  }
1023  // Check if there's a cache entry.
1024  Cache::Entry* entry = lb_policy_->cache_.Find(key);
1025  // If there is no cache entry, or if the cache entry is not in backoff
1026  // and has a stale time in the past, and there is not already a
1027  // pending RLS request for this key, then try to start a new RLS request.
1028  if ((entry == nullptr ||
1029  (entry->stale_time() < now && entry->backoff_time() < now)) &&
1030  lb_policy_->request_map_.find(key) == lb_policy_->request_map_.end()) {
1031  // Check if requests are being throttled.
1032  if (lb_policy_->rls_channel_->ShouldThrottle()) {
1033  // Request is throttled.
1034  // If there is no non-expired data in the cache, then we use the
1035  // default target if set, or else we fail the pick.
1036  if (entry == nullptr || entry->data_expiration_time() < now) {
1037  if (default_child_policy_ != nullptr) {
1039  gpr_log(GPR_INFO,
1040  "[rlslb %p] picker=%p: RLS call throttled; "
1041  "using default target",
1042  lb_policy_.get(), this);
1043  }
1044  return default_child_policy_->Pick(args);
1045  }
1047  gpr_log(GPR_INFO,
1048  "[rlslb %p] picker=%p: RLS call throttled; failing pick",
1049  lb_policy_.get(), this);
1050  }
1051  return PickResult::Fail(
1052  absl::UnavailableError("RLS request throttled"));
1053  }
1054  }
1055  // Start the RLS call.
// The existing entry is handed over as the "stale" entry only while its data
// has not expired; otherwise nullptr is passed.
1056  lb_policy_->rls_channel_->StartRlsCall(
1057  key, (entry == nullptr || entry->data_expiration_time() < now) ? nullptr
1058  : entry);
1059  }
1060  // If the cache entry exists, see if it has usable data.
1061  if (entry != nullptr) {
1062  // If the entry has non-expired data, use it.
1063  if (entry->data_expiration_time() >= now) {
1065  gpr_log(GPR_INFO, "[rlslb %p] picker=%p: using cache entry %p",
1066  lb_policy_.get(), this, entry);
1067  }
1068  return entry->Pick(args);
1069  }
1070  // If the entry is in backoff, then use the default target if set,
1071  // or else fail the pick.
1072  if (entry->backoff_time() >= now) {
1073  if (default_child_policy_ != nullptr) {
1075  gpr_log(
1076  GPR_INFO,
1077  "[rlslb %p] picker=%p: RLS call in backoff; using default target",
1078  lb_policy_.get(), this);
1079  }
1080  return default_child_policy_->Pick(args);
1081  }
1083  gpr_log(GPR_INFO,
1084  "[rlslb %p] picker=%p: RLS call in backoff; failing pick",
1085  lb_policy_.get(), this);
1086  }
1088  absl::StrCat("RLS request failed: ", entry->status().ToString())));
1089  }
1090  }
1091  // RLS call pending. Queue the pick.
1093  gpr_log(GPR_INFO, "[rlslb %p] picker=%p: RLS request pending; queuing pick",
1094  lb_policy_.get(), this);
1095  }
1096  return PickResult::Queue();
1097 }
1098 
1099 //
1100 // RlsLb::Cache::Entry::BackoffTimer
1101 //
1102 
// Stores the owning cache entry, registers OnBackoffTimer as the timer
// callback, and takes an extra self-ref tagged "BackoffTimer" that is released
// to a raw pointer (OnBackoffTimer later re-adopts the raw BackoffTimer* into
// a RefCountedPtr).
// NOTE(review): listing line 1108 is not visible in this view; backoff_time is
// unused in the visible lines, so the timer is presumably armed there —
// confirm against the full source.
1103 RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
1104  Timestamp backoff_time)
1105  : entry_(std::move(entry)) {
1106  GRPC_CLOSURE_INIT(&backoff_timer_callback_, OnBackoffTimer, this, nullptr);
1107  Ref(DEBUG_LOCATION, "BackoffTimer").release();
1109 }
1110 
// Disarms the timer if it is still armed, then drops the ref tagged "Orphan".
// Clearing armed_ is what makes a later OnBackoffTimer() invocation treat the
// timer as cancelled.
// NOTE(review): listing line 1114 is missing from this view; presumably the
// pending timer is cancelled on that line — confirm.
1111 void RlsLb::Cache::Entry::BackoffTimer::Orphan() {
1112  if (armed_) {
1113  armed_ = false;
1115  }
1116  Unref(DEBUG_LOCATION, "Orphan");
1117 }
1118 
// Timer callback; `arg` is the BackoffTimer* registered in the constructor.
// The work is bounced into the LB policy's WorkSerializer, where the raw
// pointer is adopted into a RefCountedPtr so that the ref is dropped when the
// lambda exits.  armed_ is read and cleared while holding lb_policy_->mu_; if
// the timer had already been disarmed, nothing else happens.  Otherwise the
// policy's picker is updated after the lock has been released.
// NOTE(review): listing line 1127 is not visible in this view; the log call
// appears to sit inside a conditional opened on that line.
1119 void RlsLb::Cache::Entry::BackoffTimer::OnBackoffTimer(
1120  void* arg, grpc_error_handle /*error*/) {
1121  auto* self = static_cast<BackoffTimer*>(arg);
1122  self->entry_->lb_policy_->work_serializer()->Run(
1123  [self]() {
1124  RefCountedPtr<BackoffTimer> backoff_timer(self);
1125  {
1126  MutexLock lock(&self->entry_->lb_policy_->mu_);
1128  gpr_log(GPR_INFO,
1129  "[rlslb %p] cache entry=%p %s, armed_=%d: "
1130  "backoff timer fired",
1131  self->entry_->lb_policy_.get(), self->entry_.get(),
1132  self->entry_->is_shutdown_
1133  ? "(shut down)"
1134  : self->entry_->lru_iterator_->ToString().c_str(),
1135  self->armed_);
1136  }
1137  bool cancelled = !self->armed_;
1138  self->armed_ = false;
1139  if (cancelled) return;
1140  }
1141  // The pick was in backoff state and there could be a pick queued if
1142  // wait_for_ready is true. We'll update the picker for that case.
1143  self->entry_->lb_policy_->UpdatePickerLocked();
1144  },
1145  DEBUG_LOCATION);
1146 }
1147 
1148 //
1149 // RlsLb::Cache::Entry
1150 //
1151 
1152 std::unique_ptr<BackOff> MakeCacheEntryBackoff() {
1153  return absl::make_unique<BackOff>(
1154  BackOff::Options()
1155  .set_initial_backoff(kCacheBackoffInitial)
1156  .set_multiplier(kCacheBackoffMultiplier)
1157  .set_jitter(kCacheBackoffJitter)
1158  .set_max_backoff(kCacheBackoffMax));
1159 }
1160 
// Creates a cache entry for `key`.
//  - The ref-count trace name "CacheEntry" is supplied only when the
//    grpc_lb_rls_trace flag is enabled.
//  - The entry starts with a fresh backoff state (MakeCacheEntryBackoff()) and
//    a minimum expiration time of now + kMinExpirationTime.
//  - `key` is appended to the end of the cache's LRU list and the resulting
//    iterator is kept in lru_iterator_.  That initializer reads the member
//    lb_policy_, not the moved-from constructor argument.
1161 RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
1162  const RequestKey& key)
1163  : InternallyRefCounted<Entry>(
1164  GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "CacheEntry" : nullptr),
1165  lb_policy_(std::move(lb_policy)),
1166  backoff_state_(MakeCacheEntryBackoff()),
1167  min_expiration_time_(ExecCtx::Get()->Now() + kMinExpirationTime),
1168  lru_iterator_(lb_policy_->cache_.lru_list_.insert(
1169  lb_policy_->cache_.lru_list_.end(), key)) {}
1170 
// Tears the entry down when it is orphaned: marks it shut down, removes its
// key from the cache's LRU list, drops the backoff state and (if present) the
// backoff timer, releases the child policy wrappers, and finally drops the
// ref tagged "Orphan".
// NOTE(review): listing line 1172 is absent from this view; the log call below
// appears to sit inside a conditional opened on that line.
1171 void RlsLb::Cache::Entry::Orphan() {
1173  gpr_log(GPR_INFO, "[rlslb %p] cache entry=%p %s: cache entry evicted",
1174  lb_policy_.get(), this, lru_iterator_->ToString().c_str());
1175  }
1176  is_shutdown_ = true;
1177  lb_policy_->cache_.lru_list_.erase(lru_iterator_);
1178  lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
1179  backoff_state_.reset();
// A picker update is requested only when a backoff timer was actually pending.
1180  if (backoff_timer_ != nullptr) {
1181  backoff_timer_.reset();
1182  lb_policy_->UpdatePickerAsync();
1183  }
1184  child_policy_wrappers_.clear();
1185  Unref(DEBUG_LOCATION, "Orphan");
1186 }
1187 
// Returns the cache-size accounting for this entry, computed from its key via
// Cache::EntrySizeForKey().  The key is read through lru_iterator_, which
// Orphan() resets, so this must not be called on a shut-down entry.
// NOTE(review): listing line 1190 is not visible in this view; given the
// comment below it presumably checks that the entry is not shut down —
// confirm.
1188 size_t RlsLb::Cache::Entry::Size() const {
1189  // lru_iterator_ is not valid once we're shut down.
1191  return lb_policy_->cache_.EntrySizeForKey(*lru_iterator_);
1192 }
1193 
// Delegates a pick to one of this entry's child policies.  The targets are
// scanned in order and a target is skipped only if it is not the last one in
// the list; after the loop the selected wrapper receives the pick.  If the
// entry carries header data, a copy allocated from the call's arena-style
// allocator (args.call_state->Alloc) is added to the initial metadata under
// kRlsHeaderKey before delegating.
// NOTE(review): listing lines 1201, 1203 and 1217 are absent from this view,
// so the state the loop compares against and the openings of the two log
// conditionals are not shown; the log text indicates TRANSIENT_FAILURE.
// NOTE(review): if child_policy_wrappers_ were empty, child_policy_wrapper
// would still be nullptr at the final Pick() call — confirm that callers only
// reach this with a non-empty target list.
1194 LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
1195  size_t i = 0;
1196  ChildPolicyWrapper* child_policy_wrapper = nullptr;
1197  // Skip targets before the last one that are in state TRANSIENT_FAILURE.
1198  for (; i < child_policy_wrappers_.size(); ++i) {
1199  child_policy_wrapper = child_policy_wrappers_[i].get();
1200  if (child_policy_wrapper->connectivity_state() ==
1202  i < child_policy_wrappers_.size() - 1) {
1204  gpr_log(GPR_INFO,
1205  "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR
1206  " of %" PRIuPTR ") in state TRANSIENT_FAILURE; skipping",
1207  lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
1208  child_policy_wrapper->target().c_str(), i,
1209  child_policy_wrappers_.size());
1210  }
1211  continue;
1212  }
1213  break;
1214  }
1215  // Child policy not in TRANSIENT_FAILURE or is the last target in
1216  // the list, so delegate.
1218  gpr_log(GPR_INFO,
1219  "[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR " of %" PRIuPTR
1220  ") in state %s; delegating",
1221  lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
1222  child_policy_wrapper->target().c_str(), i,
1223  child_policy_wrappers_.size(),
1224  ConnectivityStateName(child_policy_wrapper->connectivity_state()));
1225  }
1226  // Add header data.
1227  // Note that even if the target we're using is in TRANSIENT_FAILURE,
1228  // the pick might still succeed (e.g., if the child is ring_hash), so
1229  // we need to pass the right header info down in all cases.
1230  if (!header_data_.empty()) {
// The buffer is sized length() + 1 so that strcpy's terminating NUL fits.
1231  char* copied_header_data =
1232  static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
1233  strcpy(copied_header_data, header_data_.c_str());
1234  args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
1235  }
1236  return child_policy_wrapper->Pick(args);
1237 }
1238 
1239 void RlsLb::Cache::Entry::ResetBackoff() {
1240  backoff_time_ = Timestamp::InfPast();
1241  backoff_timer_.reset();
1242 }
1243 
1244 bool RlsLb::Cache::Entry::ShouldRemove() const {
1245  Timestamp now = ExecCtx::Get()->Now();
1246  return data_expiration_time_ < now && backoff_expiration_time_ < now;
1247 }
1248 
1249 bool RlsLb::Cache::Entry::CanEvict() const {
1250  Timestamp now = ExecCtx::Get()->Now();
1251  return min_expiration_time_ < now;
1252 }
1253 
1254 void RlsLb::Cache::Entry::MarkUsed() {
1255  auto& lru_list = lb_policy_->cache_.lru_list_;
1256  auto new_it = lru_list.insert(lru_list.end(), *lru_iterator_);
1257  lru_list.erase(lru_iterator_);
1258  lru_iterator_ = new_it;
1259 }
1260 
1261 std::vector<RlsLb::ChildPolicyWrapper*>
1262 RlsLb::Cache::Entry::OnRlsResponseLocked(
1263  ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
1264  // Move the entry to the end of the LRU list.
1265  MarkUsed();
1266  // If the request failed, store the failed status and update the
1267  // backoff state.
1268  if (!response.status.ok()) {
1269  status_ = response.status;
1270  if (backoff_state != nullptr) {
1271  backoff_state_ = std::move(backoff_state);
1272  } else {
1273  backoff_state_ = MakeCacheEntryBackoff();
1274  }
1275  backoff_time_ = backoff_state_->NextAttemptTime();
1276  Timestamp now = ExecCtx::Get()->Now();
1277  backoff_expiration_time_ = now + (backoff_time_ - now) * 2;
1278  backoff_timer_ = MakeOrphanable<BackoffTimer>(
1279  Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_);
1280  lb_policy_->UpdatePickerAsync();
1281  return {};
1282  }
1283  // Request succeeded, so store the result.
1284  header_data_ = std::move(response.header_data);
1285  Timestamp now = ExecCtx::Get()->Now();
1286  data_expiration_time_ = now + lb_policy_->config_->max_age();
1287  stale_time_ = now + lb_policy_->config_->stale_age();
1288  status_ = absl::OkStatus();
1289  backoff_state_.reset();
1290  backoff_time_ = Timestamp::InfPast();
1291  backoff_expiration_time_ = Timestamp::InfPast();
1292  // Check if we need to update this list of targets.
1293  bool targets_changed = [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
1294  if (child_policy_wrappers_.size() != response.targets.size()) return true;
1295  for (size_t i = 0; i < response.targets.size(); ++i) {
1296  if (child_policy_wrappers_[i]->target() != response.targets[i]) {
1297  return true;
1298  }
1299  }
1300  return false;
1301  }();
1302  if (!targets_changed) {
1303  // Targets didn't change, so we're not updating the list of child
1304  // policies. Return a new picker so that any queued requests can be
1305  // re-processed.
1306  lb_policy_->UpdatePickerAsync();
1307  return {};
1308  }
1309  // Target list changed, so update it.
1310  std::set<absl::string_view> old_targets;
1311  for (RefCountedPtr<ChildPolicyWrapper>& child_policy_wrapper :
1312  child_policy_wrappers_) {
1313  old_targets.emplace(child_policy_wrapper->target());
1314  }
1315  bool update_picker = false;
1316  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1317  std::vector<RefCountedPtr<ChildPolicyWrapper>> new_child_policy_wrappers;
1318  new_child_policy_wrappers.reserve(response.targets.size());
1319  for (std::string& target : response.targets) {
1320  auto it = lb_policy_->child_policy_map_.find(target);
1321  if (it == lb_policy_->child_policy_map_.end()) {
1322  auto new_child = MakeRefCounted<ChildPolicyWrapper>(
1323  lb_policy_->Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
1324  new_child->StartUpdate();
1325  child_policies_to_finish_update.push_back(new_child.get());
1326  new_child_policy_wrappers.emplace_back(std::move(new_child));
1327  } else {
1328  new_child_policy_wrappers.emplace_back(
1329  it->second->Ref(DEBUG_LOCATION, "CacheEntry"));
1330  // If the target already existed but was not previously used for
1331  // this key, then we'll need to update the picker, since we
1332  // didn't actually create a new child policy, which would have
1333  // triggered an RLS picker update when it returned its first picker.
1334  if (old_targets.find(target) == old_targets.end()) {
1335  update_picker = true;
1336  }
1337  }
1338  }
1339  child_policy_wrappers_ = std::move(new_child_policy_wrappers);
1340  if (update_picker) {
1341  lb_policy_->UpdatePickerAsync();
1342  }
1343  return child_policies_to_finish_update;
1344 }
1345 
1346 //
1347 // RlsLb::Cache
1348 //
1349 
1350 RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
1351  Timestamp now = ExecCtx::Get()->Now();
1352  lb_policy_->Ref(DEBUG_LOCATION, "CacheCleanupTimer").release();
1353  GRPC_CLOSURE_INIT(&timer_callback_, OnCleanupTimer, this, nullptr);
1354  grpc_timer_init(&cleanup_timer_, now + kCacheCleanupTimerInterval,
1355  &timer_callback_);
1356 }
1357 
1358 RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
1359  auto it = map_.find(key);
1360  if (it == map_.end()) return nullptr;
1361  it->second->MarkUsed();
1362  return it->second.get();
1363 }
1364 
// Returns the cache entry for `key`, creating it if necessary.
// For a new entry, room is made first by shrinking the cache to size_limit_
// minus the new entry's size (std::min keeps the subtraction from wrapping
// below zero); then the entry is inserted and size_ grows by the entry size.
// An existing entry is marked as most recently used before being returned.
// NOTE(review): listing lines 1375 and 1382 are absent from this view; the two
// log calls appear to sit inside conditionals opened on those lines.
1365 RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
1366  auto it = map_.find(key);
1367  // If not found, create new entry.
1368  if (it == map_.end()) {
1369  size_t entry_size = EntrySizeForKey(key);
1370  MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
// Ownership of the new entry passes to the OrphanablePtr stored in map_.
1371  Entry* entry =
1372  new Entry(lb_policy_->Ref(DEBUG_LOCATION, "CacheEntry"), key);
1373  map_.emplace(key, OrphanablePtr<Entry>(entry));
1374  size_ += entry_size;
1376  gpr_log(GPR_INFO, "[rlslb %p] key=%s: cache entry added, entry=%p",
1377  lb_policy_, key.ToString().c_str(), entry);
1378  }
1379  return entry;
1380  }
1381  // Entry found, so use it.
1383  gpr_log(GPR_INFO, "[rlslb %p] key=%s: found cache entry %p", lb_policy_,
1384  key.ToString().c_str(), it->second.get());
1385  }
1386  it->second->MarkUsed();
1387  return it->second.get();
1388 }
1389 
// Sets the cache size limit to `bytes` and immediately tries to evict entries
// until the cache fits within the new limit.
// NOTE(review): listing line 1391 is absent from this view; the log call
// appears to sit inside a conditional opened on that line.
1390 void RlsLb::Cache::Resize(size_t bytes) {
1392  gpr_log(GPR_INFO, "[rlslb %p] resizing cache to %" PRIuPTR " bytes",
1393  lb_policy_, bytes);
1394  }
1395  size_limit_ = bytes;
1396  MaybeShrinkSize(size_limit_);
1397 }
1398 
1399 void RlsLb::Cache::ResetAllBackoff() {
1400  for (auto& p : map_) {
1401  p.second->ResetBackoff();
1402  }
1403  lb_policy_->UpdatePickerAsync();
1404 }
1405 
// Drops all cache entries and empties the LRU list.
// NOTE(review): listing line 1409 is absent from this view; presumably the
// cleanup timer is cancelled there — confirm against the full source.
1406 void RlsLb::Cache::Shutdown() {
1407  map_.clear();
1408  lru_list_.clear();
1410 }
1411 
// Cleanup-timer callback; `arg` is the Cache*.  The work is bounced into the
// LB policy's WorkSerializer.  There, the ref that was taken for the timer is
// adopted into a RefCountedPtr, so it is dropped automatically on the early
// returns (cancelled timer, policy shut down).  Otherwise every entry for
// which both ShouldRemove() and CanEvict() hold is erased, and the timer is
// re-armed; the adopted ref is released again so the new timer keeps it.
// NOTE(review): listing line 1418 is absent from this view; the log call
// appears to sit inside a conditional opened on that line.
// NOTE(review): an extra ref on `error` is taken before the hop; no matching
// unref is visible in the lines shown here — confirm.
1412 void RlsLb::Cache::OnCleanupTimer(void* arg, grpc_error_handle error) {
1413  Cache* cache = static_cast<Cache*>(arg);
1414  (void)GRPC_ERROR_REF(error);
1415  cache->lb_policy_->work_serializer()->Run(
1416  [cache, error]() {
1417  RefCountedPtr<RlsLb> lb_policy(cache->lb_policy_);
1419  gpr_log(GPR_INFO, "[rlslb %p] cache cleanup timer fired (%s)",
1420  cache->lb_policy_, grpc_error_std_string(error).c_str());
1421  }
1422  if (error == GRPC_ERROR_CANCELLED) return;
1423  MutexLock lock(&lb_policy->mu_);
1424  if (lb_policy->is_shutdown_) return;
// erase() returns the iterator following the removed element, so the loop
// advances manually only when nothing was erased.
1425  for (auto it = cache->map_.begin(); it != cache->map_.end();) {
1426  if (GPR_UNLIKELY(it->second->ShouldRemove() &&
1427  it->second->CanEvict())) {
1428  cache->size_ -= it->second->Size();
1429  it = cache->map_.erase(it);
1430  } else {
1431  ++it;
1432  }
1433  }
1434  Timestamp now = ExecCtx::Get()->Now();
1435  lb_policy.release();
1436  grpc_timer_init(&cache->cleanup_timer_,
1437  now + kCacheCleanupTimerInterval,
1438  &cache->timer_callback_);
1439  },
1440  DEBUG_LOCATION);
1441 }
1442 
1443 size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
1444  // Key is stored twice, once in LRU list and again in the cache map.
1445  return (key.Size() * 2) + sizeof(Entry);
1446 }
1447 
// Evicts entries from the front of the LRU list until size_ is at most
// `bytes`.  Eviction stops early when the list is empty or when the entry at
// the front may not be evicted yet (CanEvict() is false), so the cache can
// remain larger than `bytes` after this returns.
// NOTE(review): listing lines 1455 and 1462 are absent from this view; the two
// log calls appear to sit inside conditionals opened on those lines.
1448 void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
1449  while (size_ > bytes) {
1450  auto lru_it = lru_list_.begin();
1451  if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
1452  auto map_it = map_.find(*lru_it);
1453  GPR_ASSERT(map_it != map_.end());
1454  if (!map_it->second->CanEvict()) break;
1456  gpr_log(GPR_INFO, "[rlslb %p] LRU eviction: removing entry %p %s",
1457  lb_policy_, map_it->second.get(), lru_it->ToString().c_str());
1458  }
// Size() is read before the erase because it goes through the entry's LRU
// iterator, which Entry::Orphan() resets.
1459  size_ -= map_it->second->Size();
1460  map_.erase(map_it);
1461  }
1463  gpr_log(GPR_INFO,
1464  "[rlslb %p] LRU pass complete: desired size=%" PRIuPTR
1465  " size=%" PRIuPTR,
1466  lb_policy_, bytes, size_);
1467  }
1468 }
1469 
1470 //
1471 // RlsLb::RlsChannel::StateWatcher
1472 //
1473 
// Reacts to connectivity changes of the RLS channel.  Nothing is done once the
// RLS channel has been shut down.  Otherwise, under the LB policy's mu_:
//  - TRANSIENT_FAILURE is remembered in was_transient_failure_;
//  - READY after a remembered TRANSIENT_FAILURE clears that flag and resets
//    the backoff of all cache entries.
// NOTE(review): listing line 1477 is absent from this view; the log call
// appears to sit inside a conditional opened on that line.
1474 void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
1475  grpc_connectivity_state new_state, const absl::Status& status) {
1476  auto* lb_policy = rls_channel_->lb_policy_.get();
1478  gpr_log(GPR_INFO,
1479  "[rlslb %p] RlsChannel=%p StateWatcher=%p: "
1480  "state changed to %s (%s)",
1481  lb_policy, rls_channel_.get(), this,
1482  ConnectivityStateName(new_state), status.ToString().c_str());
1483  }
1484  if (rls_channel_->is_shutdown_) return;
1485  MutexLock lock(&lb_policy->mu_);
1486  if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
1487  was_transient_failure_ = false;
1488  // Reset the backoff of all cache entries, so that we don't
1489  // double-penalize if an RLS request fails while the channel is
1490  // down, since the throttling for the channel being down is handled
1491  // at the channel level instead of in the individual cache entries.
1492  lb_policy->cache_.ResetAllBackoff();
1493  } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1494  was_transient_failure_ = true;
1495  }
1496 }
1497 
1498 //
1499 // RlsLb::RlsChannel::Throttle
1500 //
1501 
// NOTE(review): the signature line (listing line 1502) is absent from this
// view.  Judging by the section banner and the returned flag this is
// presumably the body of Throttle::ShouldThrottle() — confirm.
//
// Decides probabilistically whether the next RLS request should be throttled.
// Records older than window_size_ are dropped from both histories first.  The
// throttle probability is then
//   (requests - successes * ratio_for_successes_) / (requests + padding_)
// where successes = requests - failures, and the request is throttled when a
// uniform random draw falls below that probability.  A throttled request is
// itself recorded as both a request and a failure.
1503  Timestamp now = ExecCtx::Get()->Now();
1504  while (!requests_.empty() && now - requests_.front() > window_size_) {
1505  requests_.pop_front();
1506  }
1507  while (!failures_.empty() && now - failures_.front() > window_size_) {
1508  failures_.pop_front();
1509  }
1510  // Compute probability of throttling.
1511  float num_requests = requests_.size();
1512  float num_successes = num_requests - failures_.size();
1513  // Note: it's possible that this ratio will be negative, in which case
1514  // no throttling will be done.
1515  float throttle_probability =
1516  (num_requests - (num_successes * ratio_for_successes_)) /
1517  (num_requests + padding_);
1518  // Generate a random number for the request.
1519  std::uniform_real_distribution<float> dist(0, 1.0);
1520  // Check if we should throttle the request.
1521  bool throttle = dist(rng_) < throttle_probability;
1522  // If we're throttling, record the request and the failure.
1523  if (throttle) {
1524  requests_.push_back(now);
1525  failures_.push_back(now);
1526  }
1527  return throttle;
1528 }
1529 
1530 void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
1531  Timestamp now = ExecCtx::Get()->Now();
1532  requests_.push_back(now);
1533  if (!success) failures_.push_back(now);
1534 }
1535 
1536 //
1537 // RlsLb::RlsChannel
1538 //
1539 
// Creates the channel used to talk to the lookup service named in the
// policy's config.  Visible steps: obtain credentials from the parent channel,
// use the parent channel's authority as the default authority, optionally
// forward the fake-security expected targets and the configured RLS channel
// service config, create the channel, and — if creation succeeded — link the
// new channel under the parent's channelz node and start a connectivity watch.
// NOTE(review): many lines are absent from this view (the embedded listing
// numbers skip repeatedly, e.g. 1547 -> 1549, 1550 -> 1553, 1562 -> 1564), so
// the declaration of `args`, the calls that append to it and several
// right-hand sides are not shown.
1540 RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
1541  : InternallyRefCounted<RlsChannel>(
1542  GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsChannel" : nullptr),
1543  lb_policy_(std::move(lb_policy)) {
1544  // Get channel creds from parent channel.
1545  // TODO(roth): Once we eliminate insecure builds, get this via a
1546  // method on the helper instead of digging through channel args.
1547  grpc_channel_credentials* creds =
1549  // Use the parent channel's authority.
// `authority` is a local copy; its c_str() is referenced by the channel arg
// below, so it has to stay alive until the channel has been created.
1550  std::string authority(lb_policy_->channel_control_helper()->GetAuthority());
1553  const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
1554  const_cast<char*>(authority.c_str())),
1556  const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
1557  };
1558  // Propagate fake security connector expected targets, if any.
1559  // (This is ugly, but it seems better than propagating all channel args
1560  // from the parent channel by default and then having a giant
1561  // exclude list of args to strip out, like we do in grpclb.)
1562  const char* fake_security_expected_targets = grpc_channel_args_find_string(
1564  if (fake_security_expected_targets != nullptr) {
1566  const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
1567  const_cast<char*>(fake_security_expected_targets)));
1568  }
1569  // Add service config args if needed.
1570  const std::string& service_config =
1571  lb_policy_->config_->rls_channel_service_config();
1572  if (!service_config.empty()) {
1574  const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
1575  const_cast<char*>(service_config.c_str())));
1577  const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 1));
1578  }
1579  grpc_channel_args rls_channel_args = {args.size(), args.data()};
1580  channel_ = grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
1581  creds, &rls_channel_args);
1583  gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p: created channel %p for %s",
1584  lb_policy_.get(), this, channel_,
1585  lb_policy_->config_->lookup_service().c_str());
1586  }
1587  if (channel_ != nullptr) {
1588  // Set up channelz linkage.
1589  channelz::ChannelNode* child_channelz_node =
1591  channelz::ChannelNode* parent_channelz_node =
1592  grpc_channel_args_find_pointer<channelz::ChannelNode>(
1593  lb_policy_->channel_args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
// The parent node is remembered only when the linkage was actually made;
// Orphan() uses that to decide whether there is linkage to remove.
1594  if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
1595  parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1596  parent_channelz_node_ = parent_channelz_node->Ref();
1597  }
1598  // Start connectivity watch.
1599  ClientChannel* client_channel =
1600  ClientChannel::GetFromChannel(Channel::FromC(channel_));
1601  GPR_ASSERT(client_channel != nullptr);
// watcher_ keeps a raw pointer; ownership of the watcher is handed to the
// client channel through the OrphanablePtr below.
1602  watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
1603  client_channel->AddConnectivityWatcher(
1605  OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
1606  }
1607 }
1608 
// Shuts the RLS channel down: sets is_shutdown_, removes the channelz linkage
// (when it had been set up), stops the connectivity watch, and drops the ref
// tagged "Orphan".
// NOTE(review): listing lines 1610, 1619 and 1631 are absent from this view,
// so the opening of the log conditional, the lookup of the child channelz node
// and whatever follows the watcher removal (presumably releasing channel_) are
// not shown — confirm against the full source.
1609 void RlsLb::RlsChannel::Orphan() {
1611  gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p, channel=%p: shutdown",
1612  lb_policy_.get(), this, channel_);
1613  }
1614  is_shutdown_ = true;
1615  if (channel_ != nullptr) {
1616  // Remove channelz linkage.
1617  if (parent_channelz_node_ != nullptr) {
1618  channelz::ChannelNode* child_channelz_node =
1620  GPR_ASSERT(child_channelz_node != nullptr);
1621  parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
1622  }
1623  // Stop connectivity watch.
1624  if (watcher_ != nullptr) {
1625  ClientChannel* client_channel =
1626  ClientChannel::GetFromChannel(Channel::FromC(channel_));
1627  GPR_ASSERT(client_channel != nullptr);
1628  client_channel->RemoveConnectivityWatcher(watcher_);
1629  watcher_ = nullptr;
1630  }
1632  }
1633  Unref(DEBUG_LOCATION, "Orphan");
1634 }
1635 
// Starts an RLS request for `key` and registers it in the policy's
// request_map_.  When a stale cache entry is supplied, its backoff state is
// taken over and its header data is copied into the request.
// NOTE(review): listing lines 1639-1640 and 1644 are absent from this view, so
// the declaration of `reason` and its assignment for the stale case are not
// shown.
1636 void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
1637  Cache::Entry* stale_entry) {
1638  std::unique_ptr<BackOff> backoff_state;
1641  std::string stale_header_data;
1642  if (stale_entry != nullptr) {
1643  backoff_state = stale_entry->TakeBackoffState();
1645  stale_header_data = stale_entry->header_data();
1646  }
// The request holds refs on both the LB policy and the RLS channel, each
// tagged "RlsRequest".
1647  lb_policy_->request_map_.emplace(
1648  key, MakeOrphanable<RlsRequest>(
1649  lb_policy_->Ref(DEBUG_LOCATION, "RlsRequest"), key,
1650  lb_policy_->rls_channel_->Ref(DEBUG_LOCATION, "RlsRequest"),
1651  std::move(backoff_state), reason, std::move(stale_header_data)));
1652 }
1653 
// Feeds the outcome of a finished RLS request into the throttle's history.
// The only visible caller (OnRlsCallCompleteLocked) invokes this while holding
// the LB policy's mu_.
1654 void RlsLb::RlsChannel::ReportResponseLocked(bool response_succeeded) {
1655  throttle_.RegisterResponse(response_succeeded);
1656 }
1657 
// Requires (debug builds only) that the channel exists.
// NOTE(review): listing line 1660 is absent from this view; presumably the
// channel's connection backoff is reset on that line — confirm.
1658 void RlsLb::RlsChannel::ResetBackoff() {
1659  GPR_DEBUG_ASSERT(channel_ != nullptr);
1661 }
1662 
1663 //
1664 // RlsLb::RlsRequest
1665 //
1666 
// Creates an RLS request for `key`.  The constructor only stores its inputs,
// prepares the call-completion closure and schedules StartCall() via
// ExecCtx::Run(); the scheduled closure owns a ref tagged "StartCall", which
// StartCall() drops once StartCallLocked() has run.
// NOTE(review): listing lines 1670, 1680 and 1687 are absent from this view,
// so the declaration of the `reason` parameter, the opening of the log
// conditional and the first argument to ExecCtx::Run() are not shown.
1667 RlsLb::RlsRequest::RlsRequest(RefCountedPtr<RlsLb> lb_policy, RequestKey key,
1668  RefCountedPtr<RlsChannel> rls_channel,
1669  std::unique_ptr<BackOff> backoff_state,
1671  std::string stale_header_data)
1672  : InternallyRefCounted<RlsRequest>(
1673  GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "RlsRequest" : nullptr),
1674  lb_policy_(std::move(lb_policy)),
1675  key_(std::move(key)),
1676  rls_channel_(std::move(rls_channel)),
1677  backoff_state_(std::move(backoff_state)),
1678  reason_(reason),
1679  stale_header_data_(std::move(stale_header_data)) {
1681  gpr_log(GPR_INFO,
1682  "[rlslb %p] rls_request=%p: RLS request created for key %s",
1683  lb_policy_.get(), this, key_.ToString().c_str());
1684  }
1685  GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr);
1686  ExecCtx::Run(
1688  GRPC_CLOSURE_INIT(&call_start_cb_, StartCall,
1689  Ref(DEBUG_LOCATION, "StartCall").release(), nullptr),
1690  GRPC_ERROR_NONE);
1691 }
1692 
// By destruction time the call must already be gone: call_ is either never
// set (StartCallLocked() returned early) or reset to nullptr in
// OnRlsCallCompleteLocked().
1693 RlsLb::RlsRequest::~RlsRequest() { GPR_ASSERT(call_ == nullptr); }
1694 
// Abandons the request: if a call is in flight it is dealt with first (the log
// text says it is cancelled), then the ref tagged "Orphan" is dropped.
// NOTE(review): listing lines 1697 and 1701 are absent from this view, so the
// opening of the log conditional and the cancellation statement itself are not
// shown.
1695 void RlsLb::RlsRequest::Orphan() {
1696  if (call_ != nullptr) {
1698  gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: cancelling RLS call",
1699  lb_policy_.get(), this, key_.ToString().c_str());
1700  }
1702  }
1703  Unref(DEBUG_LOCATION, "Orphan");
1704 }
1705 
1706 void RlsLb::RlsRequest::StartCall(void* arg, grpc_error_handle /*error*/) {
1707  auto* request = static_cast<RlsRequest*>(arg);
1708  request->lb_policy_->work_serializer()->Run(
1709  [request]() {
1710  request->StartCallLocked();
1711  request->Unref(DEBUG_LOCATION, "StartCall");
1712  },
1713  DEBUG_LOCATION);
1714 }
1715 
// Starts the RLS call unless the policy has already shut down (checked under
// mu_, which is released again before the call is created).  The deadline is
// now + the configured lookup-service timeout.  A batch of up to six ops is
// assembled and started with call_complete_cb_ as its completion callback; a
// ref tagged "OnRlsCallComplete" is released to a raw ref for that callback,
// and OnRlsCallComplete() drops it.
// NOTE(review): most op-setup lines are absent from this view (the embedded
// listing numbers skip, e.g. 1722 -> 1726 and 1732 -> 1734), so the call
// creation's left-hand side and the contents of each op are not shown.
1716 void RlsLb::RlsRequest::StartCallLocked() {
1717  {
1718  MutexLock lock(&lb_policy_->mu_);
1719  if (lb_policy_->is_shutdown_) return;
1720  }
1721  Timestamp now = ExecCtx::Get()->Now();
1722  deadline_ = now + lb_policy_->config_->lookup_service_timeout();
1726  rls_channel_->channel(), nullptr, GRPC_PROPAGATE_DEFAULTS,
1727  lb_policy_->interested_parties(),
1728  grpc_slice_from_static_string(kRlsRequestPath), nullptr, deadline_,
1729  nullptr);
1730  grpc_op ops[6];
1731  memset(ops, 0, sizeof(ops));
1732  grpc_op* op = ops;
1734  ++op;
1736  send_message_ = MakeRequestProto();
1738  ++op;
1740  ++op;
1744  ++op;
1747  ++op;
1752  ++op;
1753  Ref(DEBUG_LOCATION, "OnRlsCallComplete").release();
// op - ops is the number of ops filled in above.
1754  auto call_error = grpc_call_start_batch_and_execute(
1755  call_, ops, static_cast<size_t>(op - ops), &call_complete_cb_);
1756  GPR_ASSERT(call_error == GRPC_CALL_OK);
1757 }
1758 
1759 void RlsLb::RlsRequest::OnRlsCallComplete(void* arg, grpc_error_handle error) {
1760  auto* request = static_cast<RlsRequest*>(arg);
1761  (void)GRPC_ERROR_REF(error);
1762  request->lb_policy_->work_serializer()->Run(
1763  [request, error]() {
1764  request->OnRlsCallCompleteLocked(error);
1765  request->Unref(DEBUG_LOCATION, "OnRlsCallComplete");
1766  },
1767  DEBUG_LOCATION);
1768 }
1769 
// Processes the finished RLS call inside the WorkSerializer.
//  1. Turn the outcome into a ResponseInfo: a transport-level error or a
//     non-OK received status becomes a failure status; otherwise the response
//     proto is parsed.
//  2. Clean up the call state and clear call_.
//  3. Under mu_ (skipped entirely if the policy has shut down): report the
//     outcome to the RLS channel's throttle, find or create the cache entry
//     for the key, hand it the response, and remove this request from
//     request_map_.
//  4. After the lock is released, finish the update on any child policies the
//     cache entry created.
// NOTE(review): many lines are absent from this view (the embedded listing
// numbers skip, e.g. 1770 -> 1773, 1782 -> 1786, 1795 -> 1802, 1815 -> 1817),
// so the status extraction, the call cleanup statements and the arguments to
// OnRlsResponseLocked() are not shown.
1770 void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
1773  gpr_log(GPR_INFO,
1774  "[rlslb %p] rls_request=%p %s, error=%s, status={%d, %s} RLS call "
1775  "response received",
1776  lb_policy_.get(), this, key_.ToString().c_str(),
1778  status_message.c_str());
1779  }
1780  // Parse response.
1781  ResponseInfo response;
1782  if (!GRPC_ERROR_IS_NONE(error)) {
1786  /*http_error=*/nullptr, /*error_string=*/nullptr);
1787  response.status =
1788  absl::Status(static_cast<absl::StatusCode>(code), message);
1789  } else if (status_recv_ != GRPC_STATUS_OK) {
1790  response.status = absl::Status(static_cast<absl::StatusCode>(status_recv_),
1792  } else {
1793  response = ParseResponseProto();
1794  }
1795  // Clean up call state.
1802  call_ = nullptr;
1803  // Return result to cache.
1805  gpr_log(GPR_INFO, "[rlslb %p] rls_request=%p %s: response info: %s",
1806  lb_policy_.get(), this, key_.ToString().c_str(),
1807  response.ToString().c_str());
1808  }
1809  std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1810  {
1811  MutexLock lock(&lb_policy_->mu_);
1812  if (lb_policy_->is_shutdown_) return;
1813  rls_channel_->ReportResponseLocked(response.status.ok());
1814  Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
1815  child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
1817  lb_policy_->request_map_.erase(key_);
1818  }
1819  // Now that we've released the lock, finish the update on any newly
1820  // created child policies.
1821  for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
1822  child->MaybeFinishUpdate();
1823  }
1824 }
1825 
// Builds the serialized lookup request and returns it as a raw byte buffer
// containing a single slice.  The message is assembled in a local upb arena:
// the target type is set to kGrpc, every entry of key_.key_map is added, and
// the stale header data is included when it is non-empty.  The local slice
// reference is dropped once the byte buffer has been created from it.
// NOTE(review): many lines are absent from this view (the embedded listing
// numbers skip, e.g. 1827 -> 1831, 1832 -> 1834, 1845 -> 1848), so the message
// creation, the setter names, the serialization call and the creation of
// send_slice are not shown.
1826 grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
1827  upb::Arena arena;
1831  req, upb_StringView_FromDataAndSize(kGrpc, sizeof(kGrpc) - 1));
1832  for (const auto& kv : key_.key_map) {
1834  req, upb_StringView_FromDataAndSize(kv.first.data(), kv.first.size()),
1835  upb_StringView_FromDataAndSize(kv.second.data(), kv.second.size()),
1836  arena.ptr());
1837  }
1839  if (!stale_header_data_.empty()) {
1842  stale_header_data_.size()));
1843  }
1844  size_t len;
1845  char* buf =
1848  grpc_byte_buffer* byte_buffer = grpc_raw_byte_buffer_create(&send_slice, 1);
1849  grpc_slice_unref_internal(send_slice);
1850  return byte_buffer;
1851 }
1852 
// Parses the received lookup response into a ResponseInfo.
//  - An unparsable message yields an Internal error status.
//  - A response without any target yields an InvalidArgument error status.
//  - Otherwise the targets and the header data are copied into std::string
//    objects owned by the result, so the result does not reference the local
//    upb arena after this function returns.
// NOTE(review): several lines are absent from this view (the embedded listing
// numbers skip, e.g. 1855 -> 1858, 1858 -> 1862, 1870 -> 1872), so the byte
// buffer reader setup, the parse call and the accessor calls are not shown.
1853 RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
1854  ResponseInfo response_info;
1855  upb::Arena arena;
1858  grpc_slice recv_slice = grpc_byte_buffer_reader_readall(&bbr);
1862  reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(recv_slice)),
1863  GRPC_SLICE_LENGTH(recv_slice), arena.ptr());
1864  grpc_slice_unref_internal(recv_slice);
1865  if (response == nullptr) {
1866  response_info.status = absl::InternalError("cannot parse RLS response");
1867  return response_info;
1868  }
1869  size_t num_targets;
1870  const upb_StringView* targets_strview =
1872  if (num_targets == 0) {
1873  response_info.status =
1874  absl::InvalidArgumentError("RLS response has no target entry");
1875  return response_info;
1876  }
1877  response_info.targets.reserve(num_targets);
1878  for (size_t i = 0; i < num_targets; ++i) {
1879  response_info.targets.emplace_back(targets_strview[i].data,
1880  targets_strview[i].size);
1881  }
1882  upb_StringView header_data_strview =
1884  response_info.header_data =
1885  std::string(header_data_strview.data, header_data_strview.size);
1886  return response_info;
1887 }
1888 
1889 //
1890 // RlsLb
1891 //
1892 
// Extracts the server name from the channel args: the server URI string is
// parsed and its path is returned with one leading "/" removed.  Both a
// missing URI string and a parse failure are treated as fatal (GPR_ASSERT).
// NOTE(review): listing line 1895 is absent from this view, so the lookup that
// produces server_uri_str (and the channel arg it reads) is not shown.
1893 std::string GetServerUri(const grpc_channel_args* args) {
1894  const char* server_uri_str =
1896  GPR_ASSERT(server_uri_str != nullptr);
1897  absl::StatusOr<URI> uri = URI::Parse(server_uri_str);
1898  GPR_ASSERT(uri.ok());
1899  return std::string(absl::StripPrefix(uri->path(), "/"));
1900 }
1901 
// Constructs the RLS LB policy: forwards `args` to the LoadBalancingPolicy
// base class, derives the server name from the channel args, and creates the
// cache with a back-pointer to this policy.
// NOTE(review): args.args is read after `args` has been passed through
// std::move() to the base-class constructor; this relies on that field still
// being usable after the move — confirm against LoadBalancingPolicy::Args.
// NOTE(review): listing line 1906 is absent from this view; the log call
// appears to sit inside a conditional opened on that line.
1902 RlsLb::RlsLb(Args args)
1903  : LoadBalancingPolicy(std::move(args)),
1904  server_name_(GetServerUri(args.args)),
1905  cache_(this) {
1907  gpr_log(GPR_INFO, "[rlslb %p] policy created", this);
1908  }
1909 }
1910 
// Applies an update to the policy: swaps in the new config and addresses,
// decides whether every child policy must be refreshed, (re)creates the
// default child policy when the default target changed, replaces the RLS
// channel and resizes the cache under mu_, then finishes the child updates
// after the lock is released and publishes a new picker.
//
// NOTE(review): this listing omits source lines 1912, 1919, 1936-1937, 1943,
// 1949, 1956, 1964, 1968, 1989, 1996, 2005 and 2012. Several statements below
// are therefore truncated, and some `}` lines close conditionals whose opening
// line is not shown. Restore them from the upstream file before compiling.
1911 void RlsLb::UpdateLocked(UpdateArgs args) {
1913  gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
1914  }
// While this flag is set, UpdatePickerLocked() returns early, so picker
// updates triggered by children during the update are dropped.
1915  update_in_progress_ = true;
1916  // Swap out config.
1917  RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
1918  config_ = std::move(args.config);
// NOTE(review): the start of this condition (listing line 1919) is missing.
1920  (old_config == nullptr ||
1921  old_config->child_policy_config() != config_->child_policy_config())) {
1922  gpr_log(GPR_INFO, "[rlslb %p] updated child policy config: %s", this,
1923  config_->child_policy_config().Dump().c_str());
1924  }
1925  // Swap out addresses.
1926  // If the new address list is an error and we have an existing address list,
1927  // stick with the existing addresses.
1928  absl::StatusOr<ServerAddressList> old_addresses;
1929  if (args.addresses.ok()) {
1930  old_addresses = std::move(addresses_);
1931  addresses_ = std::move(args.addresses);
1932  } else {
// Keeping addresses_ unchanged makes old_addresses compare equal to it below.
1933  old_addresses = addresses_;
1934  }
1935  // Swap out channel args.
// NOTE(review): the channel-args swap itself (listing lines 1936-1937) is
// missing from this extract.
1938  // Determine whether we need to update all child policies.
1939  bool update_child_policies =
1940  old_config == nullptr ||
1941  old_config->child_policy_config() != config_->child_policy_config() ||
1942  old_addresses != addresses_ ||
// NOTE(review): the final operand of this expression (listing line 1943) is
// missing.
1944  // If default target changes, swap out child policy.
1945  bool created_default_child = false;
1946  if (old_config == nullptr ||
1947  config_->default_target() != old_config->default_target()) {
1948  if (config_->default_target().empty()) {
1950  gpr_log(GPR_INFO, "[rlslb %p] unsetting default target", this);
1951  }
1952  default_child_policy_.reset();
1953  } else {
// Reuse an existing child for the default target if the map already has one;
// otherwise create a fresh wrapper for it.
1954  auto it = child_policy_map_.find(config_->default_target());
1955  if (it == child_policy_map_.end()) {
1957  gpr_log(GPR_INFO, "[rlslb %p] creating new default target", this);
1958  }
1959  default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
1960  Ref(DEBUG_LOCATION, "ChildPolicyWrapper"),
1961  config_->default_target());
1962  created_default_child = true;
1963  } else {
1965  gpr_log(GPR_INFO,
1966  "[rlslb %p] using existing child for default target", this);
1967  }
// NOTE(review): the left-hand side of this statement (listing line 1968) is
// missing; presumably it assigns the new ref to default_child_policy_.
1969  it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
1970  }
1971  }
1972  }
1973  // Now grab the lock to swap out the state it guards.
1974  {
1975  MutexLock lock(&mu_);
1976  // Swap out RLS channel if needed.
1977  if (old_config == nullptr ||
1978  config_->lookup_service() != old_config->lookup_service()) {
1979  rls_channel_ =
1980  MakeOrphanable<RlsChannel>(Ref(DEBUG_LOCATION, "RlsChannel"));
1981  }
1982  // Resize cache if needed.
1983  if (old_config == nullptr ||
1984  config_->cache_size_bytes() != old_config->cache_size_bytes()) {
1985  cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()));
1986  }
1987  // Start update of child policies if needed.
1988  if (update_child_policies) {
1990  gpr_log(GPR_INFO, "[rlslb %p] starting child policy updates", this);
1991  }
1992  for (auto& p : child_policy_map_) {
1993  p.second->StartUpdate();
1994  }
1995  } else if (created_default_child) {
1997  gpr_log(GPR_INFO, "[rlslb %p] starting default child policy update",
1998  this);
1999  }
2000  default_child_policy_->StartUpdate();
2001  }
2002  }
2003  // Now that we've released the lock, finish update of child policies.
2004  if (update_child_policies) {
2006  gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this);
2007  }
2008  for (auto& p : child_policy_map_) {
2009  p.second->MaybeFinishUpdate();
2010  }
2011  } else if (created_default_child) {
2013  gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update",
2014  this);
2015  }
2016  default_child_policy_->MaybeFinishUpdate();
2017  }
// Re-enable picker updates before publishing the picker below.
2018  update_in_progress_ = false;
2019  // In principle, we need to update the picker here only if the config
2020  // fields used by the picker have changed. However, it seems fragile
2021  // to check individual fields, since the picker logic could change in
2022  // the future to use additional config fields, and we might not
2023  // remember to update the code here. So for now, we just unconditionally
2024  // update the picker here, even though it's probably redundant.
2025  UpdatePickerLocked();
2026 }
2027 
2028 void RlsLb::ExitIdleLocked() {
2029  MutexLock lock(&mu_);
2030  for (auto& child_entry : child_policy_map_) {
2031  child_entry.second->ExitIdleLocked();
2032  }
2033 }
2034 
2035 void RlsLb::ResetBackoffLocked() {
2036  {
2037  MutexLock lock(&mu_);
2038  rls_channel_->ResetBackoff();
2039  cache_.ResetAllBackoff();
2040  }
2041  for (auto& child : child_policy_map_) {
2042  child.second->ResetBackoffLocked();
2043  }
2044 }
2045 
// Shuts the policy down: under mu_, marks it as shut down, drops the config,
// shuts down the cache, clears pending requests, and releases the RLS channel
// and the default child policy.
2046 void RlsLb::ShutdownLocked() {
// NOTE(review): listing line 2047 is absent; the `}` on line 2049 closes a
// block whose opening line is not shown (presumably a trace-flag check).
2048  gpr_log(GPR_INFO, "[rlslb %p] policy shutdown", this);
2049  }
2050  MutexLock lock(&mu_);
2051  is_shutdown_ = true;
2052  config_.reset(DEBUG_LOCATION, "ShutdownLocked");
2053  if (channel_args_ != nullptr) {
// NOTE(review): the body of this branch (listing line 2054) is missing;
// presumably it releases channel_args_ -- restore from the upstream file.
2055  }
2056  cache_.Shutdown();
2057  request_map_.clear();
2058  rls_channel_.reset();
2059  default_child_policy_.reset();
2060 }
2061 
// Schedules UpdatePickerCallback() on the ExecCtx. A new ref on this policy
// is taken and released into the closure argument, so the callback becomes
// responsible for that ref.
2062 void RlsLb::UpdatePickerAsync() {
2063  // Run via the ExecCtx, since the caller may be holding the lock, and
2064  // we don't want to be doing that when we hop into the WorkSerializer,
2065  // in case the WorkSerializer callback happens to run inline.
2066  ExecCtx::Run(
// NOTE(review): the first argument of this call (listing line 2067) is
// missing from the extract.
2068  GRPC_CLOSURE_CREATE(UpdatePickerCallback,
2069  Ref(DEBUG_LOCATION, "UpdatePickerCallback").release(),
2070  grpc_schedule_on_exec_ctx),
2071  GRPC_ERROR_NONE);
2072 }
2073 
2074 void RlsLb::UpdatePickerCallback(void* arg, grpc_error_handle /*error*/) {
2075  auto* rls_lb = static_cast<RlsLb*>(arg);
2076  rls_lb->work_serializer()->Run(
2077  [rls_lb]() {
2078  RefCountedPtr<RlsLb> lb_policy(rls_lb);
2079  lb_policy->UpdatePickerLocked();
2080  lb_policy.reset(DEBUG_LOCATION, "UpdatePickerCallback");
2081  },
2082  DEBUG_LOCATION);
2083 }
2084 
// Recomputes the policy's aggregate connectivity state from its children and
// reports it, together with a freshly created Picker, through the channel
// control helper. Does nothing while an update is in progress or after
// shutdown.
//
// NOTE(review): this listing omits source lines 2092, 2095, 2097, 2105, 2111,
// 2121, 2123, 2128, 2130, 2132 and 2133. The declarations of `state` and
// `status`, the assignments to `state`, and several conditional openers are
// therefore not visible. Restore them from the upstream file before compiling.
2085 void RlsLb::UpdatePickerLocked() {
2086  // If we're in the process of propagating an update from our parent to
2087  // our children, ignore any updates that come from the children. We
2088  // will instead return a new picker once the update has been seen by
2089  // all children. This avoids unnecessary picker churn while an update
2090  // is being propagated to our children.
2091  if (update_in_progress_) return;
2093  gpr_log(GPR_INFO, "[rlslb %p] updating picker", this);
2094  }
2096  if (!child_policy_map_.empty()) {
2098  int num_idle = 0;
2099  int num_connecting = 0;
2100  {
2101  MutexLock lock(&mu_);
2102  if (is_shutdown_) return;
// Scan the children: stop at the first READY child, otherwise count how many
// are CONNECTING and how many are IDLE.
2103  for (auto& p : child_policy_map_) {
2104  grpc_connectivity_state child_state = p.second->connectivity_state();
2106  gpr_log(GPR_INFO, "[rlslb %p] target %s in state %s", this,
2107  p.second->target().c_str(),
2108  ConnectivityStateName(child_state));
2109  }
2110  if (child_state == GRPC_CHANNEL_READY) {
2112  break;
2113  } else if (child_state == GRPC_CHANNEL_CONNECTING) {
2114  ++num_connecting;
2115  } else if (child_state == GRPC_CHANNEL_IDLE) {
2116  ++num_idle;
2117  }
2118  }
// With no READY child, CONNECTING children are checked before IDLE ones; the
// branch bodies (listing lines 2121 and 2123) are missing from the extract.
2119  if (state != GRPC_CHANNEL_READY) {
2120  if (num_connecting > 0) {
2122  } else if (num_idle > 0) {
2124  }
2125  }
2126  }
2127  }
2129  gpr_log(GPR_INFO, "[rlslb %p] reporting state %s", this,
2131  }
2134  status = absl::UnavailableError("no children available");
2135  }
2136  channel_control_helper()->UpdateState(
2137  state, status, absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker")));
2138 }
2139 
2140 //
2141 // RlsLbFactory
2142 //
2143 
// Parses one element of a key builder's "headers" array.
//
// idx:     position of the element in the array; used in error messages.
// json:    the element; must be a JSON object.
// key:     receives the value of the "key" field.
// headers: receives the non-empty string entries of the "names" field.
//
// Validation problems are accumulated in error_list and reported together.
// NOTE(review): listing lines 2148 and 2188 (the openings of the two return
// statements) are absent from this extract -- restore from the upstream file.
2144 grpc_error_handle ParseJsonHeaders(size_t idx, const Json& json,
2145  std::string* key,
2146  std::vector<std::string>* headers) {
2147  if (json.type() != Json::Type::OBJECT) {
2149  "field:headers index:", idx, " error:type should be OBJECT"));
2150  }
2151  std::vector<grpc_error_handle> error_list;
2152  // requiredMatch must not be present.
2153  if (json.object_value().find("requiredMatch") != json.object_value().end()) {
2154  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2155  "field:requiredMatch error:must not be present"));
2156  }
2157  // Find key.
2158  if (ParseJsonObjectField(json.object_value(), "key", key, &error_list) &&
2159  key->empty()) {
2160  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2161  "field:key error:must be non-empty"));
2162  }
2163  // Find headers.
2164  const Json::Array* headers_json = nullptr;
2165  ParseJsonObjectField(json.object_value(), "names", &headers_json,
2166  &error_list);
2167  if (headers_json != nullptr) {
2168  if (headers_json->empty()) {
2169  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2170  "field:names error:list is empty"));
2171  } else {
// name_idx advances for every entry, valid or not, so reported indices match
// positions in the JSON array.
2172  size_t name_idx = 0;
2173  for (const Json& name_json : *headers_json) {
2174  if (name_json.type() != Json::Type::STRING) {
2175  error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
2176  "field:names index:", name_idx, " error:type should be STRING")));
2177  } else if (name_json.string_value().empty()) {
2178  error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
2179  absl::StrCat("field:names index:", name_idx,
2180  " error:header name must be non-empty")));
2181  } else {
2182  headers->push_back(name_json.string_value());
2183  }
2184  ++name_idx;
2185  }
2186  }
2187  }
2189  absl::StrCat("field:headers index:", idx), &error_list);
2190 }
2191 
// Parses one element of a key builder's "names" array and returns the path
// "/<service>/<method>". "method" is parsed as optional, in which case the
// returned path ends in "/". Returns "" when the element is not an object.
//
// NOTE(review): listing lines 2193 (end of the parameter list; callers pass a
// grpc_error_handle* as third argument), 2195, 2205 (declaration of
// method_name) and 2209 are absent from this extract -- restore from the
// upstream file before compiling.
2192 std::string ParseJsonMethodName(size_t idx, const Json& json,
2194  if (json.type() != Json::Type::OBJECT) {
2196  "field:names index:", idx, " error:type should be OBJECT"));
2197  return "";
2198  }
2199  std::vector<grpc_error_handle> error_list;
2200  // Find service name.
2201  absl::string_view service_name;
2202  ParseJsonObjectField(json.object_value(), "service", &service_name,
2203  &error_list);
2204  // Find method name.
2206  ParseJsonObjectField(json.object_value(), "method", &method_name, &error_list,
2207  /*required=*/false);
2208  // Return error, if any.
2210  absl::StrCat("field:names index:", idx), &error_list);
2211  // Construct path.
2212  return absl::StrCat("/", service_name, "/", method_name);
2213 }
2214 
// Parses one element of the "grpcKeybuilders" array and registers the
// resulting KeyBuilder in *key_builder_map once for every path listed in the
// element's "names" field.
//
// idx:             position of the element; used in error messages.
// json:            the element; must be a JSON object.
// key_builder_map: map from path to KeyBuilder, shared across all elements.
//
// Problems are accumulated in error_list and reported together. A key name
// may appear only once across "headers", "extraKeys" and "constantKeys".
// NOTE(review): listing lines 2218 and 2348 (the openings of the two return
// statements) are absent from this extract -- restore from the upstream file.
2215 grpc_error_handle ParseGrpcKeybuilder(
2216  size_t idx, const Json& json, RlsLbConfig::KeyBuilderMap* key_builder_map) {
2217  if (json.type() != Json::Type::OBJECT) {
2219  "field:grpc_keybuilders index:", idx, " error:type should be OBJECT"));
2220  }
2221  std::vector<grpc_error_handle> error_list;
2222  // Parse names.
2223  std::set<std::string> names;
2224  const Json::Array* names_array = nullptr;
2225  if (ParseJsonObjectField(json.object_value(), "names", &names_array,
2226  &error_list)) {
2227  if (names_array->empty()) {
2228  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2229  "field:names error:list is empty"));
2230  } else {
2231  size_t name_idx = 0;
2232  for (const Json& name_json : *names_array) {
2233  grpc_error_handle child_error = GRPC_ERROR_NONE;
2234  std::string name =
2235  ParseJsonMethodName(name_idx++, name_json, &child_error);
2236  if (!GRPC_ERROR_IS_NONE(child_error)) {
2237  error_list.push_back(child_error);
2238  } else {
// This catches duplicates within this element only; duplicates across
// elements are caught when inserting into key_builder_map at the end.
2239  bool inserted = names.insert(name).second;
2240  if (!inserted) {
2241  error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
2242  absl::StrCat("field:names error:duplicate entry for ", name)));
2243  }
2244  }
2245  }
2246  }
2247  }
2248  // Helper function to check for duplicate keys.
2249  std::set<std::string> all_keys;
2250  auto duplicate_key_check_func = [&all_keys,
2251  &error_list](const std::string& key) {
2252  auto it = all_keys.find(key);
2253  if (it != all_keys.end()) {
2254  error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
2255  absl::StrCat("key \"", key, "\" listed multiple times")));
2256  } else {
2257  all_keys.insert(key);
2258  }
2259  };
2260  // Parse headers.
2261  RlsLbConfig::KeyBuilder key_builder;
2262  const Json::Array* headers_array = nullptr;
2263  ParseJsonObjectField(json.object_value(), "headers", &headers_array,
2264  &error_list, /*required=*/false);
2265  if (headers_array != nullptr) {
2266  size_t header_idx = 0;
2267  for (const Json& header_json : *headers_array) {
2268  std::string key;
2269  std::vector<std::string> headers;
2270  grpc_error_handle child_error =
2271  ParseJsonHeaders(header_idx++, header_json, &key, &headers);
2272  if (!GRPC_ERROR_IS_NONE(child_error)) {
2273  error_list.push_back(child_error);
2274  } else {
2275  duplicate_key_check_func(key);
2276  key_builder.header_keys.emplace(key, std::move(headers));
2277  }
2278  }
2279  }
2280  // Parse extraKeys.
2281  const Json::Object* extra_keys = nullptr;
2282  ParseJsonObjectField(json.object_value(), "extraKeys", &extra_keys,
2283  &error_list, /*required=*/false);
2284  if (extra_keys != nullptr) {
// Errors for the three extraKeys fields go into their own list so they can be
// wrapped under "field:extraKeys" below. Each field that ends up non-empty is
// also run through the duplicate-key check.
2285  std::vector<grpc_error_handle> extra_keys_errors;
2286  if (ParseJsonObjectField(*extra_keys, "host", &key_builder.host_key,
2287  &extra_keys_errors, /*required=*/false) &&
2288  key_builder.host_key.empty()) {
2289  extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2290  "field:host error:must be non-empty"));
2291  }
2292  if (!key_builder.host_key.empty()) {
2293  duplicate_key_check_func(key_builder.host_key);
2294  }
2295  if (ParseJsonObjectField(*extra_keys, "service", &key_builder.service_key,
2296  &extra_keys_errors, /*required=*/false) &&
2297  key_builder.service_key.empty()) {
2298  extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2299  "field:service error:must be non-empty"));
2300  }
2301  if (!key_builder.service_key.empty()) {
2302  duplicate_key_check_func(key_builder.service_key);
2303  }
2304  if (ParseJsonObjectField(*extra_keys, "method", &key_builder.method_key,
2305  &extra_keys_errors, /*required=*/false) &&
2306  key_builder.method_key.empty()) {
2307  extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2308  "field:method error:must be non-empty"));
2309  }
2310  if (!key_builder.method_key.empty()) {
2311  duplicate_key_check_func(key_builder.method_key);
2312  }
2313  if (!extra_keys_errors.empty()) {
2314  error_list.push_back(
2315  GRPC_ERROR_CREATE_FROM_VECTOR("field:extraKeys", &extra_keys_errors));
2316  }
2317  }
2318  // Parse constantKeys.
2319  const Json::Object* constant_keys = nullptr;
2320  ParseJsonObjectField(json.object_value(), "constantKeys", &constant_keys,
2321  &error_list, /*required=*/false);
2322  if (constant_keys != nullptr) {
2323  std::vector<grpc_error_handle> constant_keys_errors;
2324  for (const auto& p : *constant_keys) {
2325  const std::string& key = p.first;
2326  const Json& value = p.second;
2327  if (key.empty()) {
2328  constant_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2329  "error:keys must be non-empty"));
2330  }
// Note that an empty key is still passed to the duplicate check and still
// gets an entry in key_builder.constant_keys via operator[].
2331  duplicate_key_check_func(key);
2332  ExtractJsonString(value, key, &key_builder.constant_keys[key],
2333  &constant_keys_errors);
2334  }
2335  if (!constant_keys_errors.empty()) {
2336  error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
2337  "field:constantKeys", &constant_keys_errors));
2338  }
2339  }
2340  // Insert key_builder into key_builder_map.
// The same key_builder is copied into the map once per name; a name already
// present in the map is reported as a duplicate and not overwritten.
2341  for (const std::string& name : names) {
2342  bool inserted = key_builder_map->emplace(name, key_builder).second;
2343  if (!inserted) {
2344  error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
2345  absl::StrCat("field:names error:duplicate entry for ", name)));
2346  }
2347  }
2349  absl::StrCat("index:", idx), &error_list);
2350 }
2351 
// Parses the whole "grpcKeybuilders" array into a KeyBuilderMap by calling
// ParseGrpcKeybuilder() on each element. Per-element errors are collected and
// stored in *error under "field:grpcKeybuilders". The map is returned even
// when errors were recorded.
2352 RlsLbConfig::KeyBuilderMap ParseGrpcKeybuilders(
2353  const Json::Array& key_builder_list, grpc_error_handle* error) {
2354  RlsLbConfig::KeyBuilderMap key_builder_map;
2355  if (key_builder_list.empty()) {
// NOTE(review): the start of this statement (listing line 2356) is missing;
// presumably it assigns a static-string error to *error.
2357  "field:grpcKeybuilders error:list is empty");
2358  return key_builder_map;
2359  }
2360  std::vector<grpc_error_handle> error_list;
2361  size_t idx = 0;
2362  for (const Json& key_builder : key_builder_list) {
2363  grpc_error_handle child_error =
2364  ParseGrpcKeybuilder(idx++, key_builder, &key_builder_map);
2365  if (!GRPC_ERROR_IS_NONE(child_error)) error_list.push_back(child_error);
2366  }
2367  *error = GRPC_ERROR_CREATE_FROM_VECTOR("field:grpcKeybuilders", &error_list);
2368  return key_builder_map;
2369 }
2370 
2371 RlsLbConfig::RouteLookupConfig ParseRouteLookupConfig(
2372  const Json::Object& json, grpc_error_handle* error) {
2373  std::vector<grpc_error_handle> error_list;
2374  RlsLbConfig::RouteLookupConfig route_lookup_config;
2375  // Parse grpcKeybuilders.
2376  const Json::Array* keybuilder_list = nullptr;
2377  ParseJsonObjectField(json, "grpcKeybuilders", &keybuilder_list, &error_list);
2378  if (keybuilder_list != nullptr) {
2379  grpc_error_handle child_error = GRPC_ERROR_NONE;
2380  route_lookup_config.key_builder_map =
2381  ParseGrpcKeybuilders(*keybuilder_list, &child_error);
2382  if (!GRPC_ERROR_IS_NONE(child_error)) error_list.push_back(child_error);
2383  }
2384  // Parse lookupService.
2385  if (ParseJsonObjectField(json, "lookupService",
2386  &route_lookup_config.lookup_service, &error_list)) {
2387  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
2388  route_lookup_config.lookup_service)) {
2389  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2390  "field:lookupService error:must be valid gRPC target URI"));
2391  }
2392  }
2393  // Parse lookupServiceTimeout.
2394  route_lookup_config.lookup_service_timeout = kDefaultLookupServiceTimeout;
2395  ParseJsonObjectFieldAsDuration(json, "lookupServiceTimeout",
2396  &route_lookup_config.lookup_service_timeout,
2397  &error_list, /*required=*/false);
2398  // Parse maxAge.
2399  route_lookup_config.max_age = kMaxMaxAge;
2400  bool max_age_set = ParseJsonObjectFieldAsDuration(
2401  json, "maxAge", &route_lookup_config.max_age, &error_list,
2402  /*required=*/false);
2403  // Clamp maxAge to the max allowed value.
2404  if (route_lookup_config.max_age > kMaxMaxAge) {
2405  route_lookup_config.max_age = kMaxMaxAge;
2406  }
2407  // Parse staleAge.
2408  route_lookup_config.stale_age = kMaxMaxAge;
2409  bool stale_age_set = ParseJsonObjectFieldAsDuration(
2410  json, "staleAge", &route_lookup_config.stale_age, &error_list,
2411  /*required=*/false);
2412  // If staleAge is set, then maxAge must also be set.
2413  if (stale_age_set && !max_age_set) {
2414  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2415  "field:maxAge error:must be set if staleAge is set"));
2416  }
2417  // Ignore staleAge if greater than or equal to maxAge.
2418  if (route_lookup_config.stale_age >= route_lookup_config.max_age) {
2419  route_lookup_config.stale_age = route_lookup_config.max_age;
2420  }
2421  // Parse cacheSizeBytes.
2422  ParseJsonObjectField(json, "cacheSizeBytes",
2423  &route_lookup_config.cache_size_bytes, &error_list);
2424  if (route_lookup_config.cache_size_bytes <= 0) {
2425  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2426  "field:cacheSizeBytes error:must be greater than 0"));
2427  }
2428  // Clamp cacheSizeBytes to the max allowed value.
2429  if (route_lookup_config.cache_size_bytes > kMaxCacheSizeBytes) {
2430  route_lookup_config.cache_size_bytes = kMaxCacheSizeBytes;
2431  }
2432  // Parse defaultTarget.
2433  if (ParseJsonObjectField(json, "defaultTarget",
2434  &route_lookup_config.default_target, &error_list,
2435  /*required=*/false)) {
2436  if (route_lookup_config.default_target.empty()) {
2437  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2438  "field:defaultTarget error:must be non-empty if set"));
2439  }
2440  }
2441  *error =
2442  GRPC_ERROR_CREATE_FROM_VECTOR("field:routeLookupConfig", &error_list);
2443  return route_lookup_config;
2444 }
2445 
// Validates the "childPolicy" list and reduces it to the one entry that the
// LB policy registry selects.
//
// child_policy_list:    the JSON array as given in the config.
// child_policy_config_target_field_name:
//                       name of the field that receives the target.
// default_target:       default target, or empty if none is configured.
// child_policy_config:  receives a copy of the list with the target field
//                       set, trimmed to the selected entry.
// default_child_policy_parsed_config:
//                       receives the parsed config, but only when
//                       default_target is non-empty.
//
// Returns the first error hit, or GRPC_ERROR_NONE.
2446 grpc_error_handle ValidateChildPolicyList(
2447  const Json& child_policy_list,
2448  const std::string& child_policy_config_target_field_name,
2449  const std::string& default_target, Json* child_policy_config,
2450  RefCountedPtr<LoadBalancingPolicy::Config>*
2451  default_child_policy_parsed_config) {
2452  // Add target to each entry in the config proto.
2453  *child_policy_config = child_policy_list;
// NOTE(review): the declaration that this initializer belongs to (listing
// line 2454) is missing; the value is used as `target` below.
2455  default_target.empty() ? kFakeTargetFieldValue : default_target;
2456  grpc_error_handle error = InsertOrUpdateChildPolicyField(
2457  child_policy_config_target_field_name, target, child_policy_config);
2458  if (!GRPC_ERROR_IS_NONE(error)) return error;
2459  // Parse the config.
2460  RefCountedPtr<LoadBalancingPolicy::Config> parsed_config =
2461  LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
2462  *child_policy_config, &error);
2463  if (!GRPC_ERROR_IS_NONE(error)) return error;
2464  // Find the chosen config and return it in JSON form.
2465  // We remove all non-selected configs, and in the selected config, we leave
2466  // the target field in place, set to the default value. This slightly
2467  // optimizes what we need to do later when we update a child policy for a
2468  // given target.
2469  if (parsed_config != nullptr) {
2470  for (Json& config : *(child_policy_config->mutable_array())) {
2471  if (config.object_value().begin()->first == parsed_config->name()) {
// The matching entry is moved out before the array is cleared, and the loop
// is left immediately afterwards because clearing invalidates the iteration.
2472  Json save_config = std::move(config);
2473  child_policy_config->mutable_array()->clear();
2474  child_policy_config->mutable_array()->push_back(std::move(save_config));
2475  break;
2476  }
2477  }
2478  }
2479  // If default target is set, return the parsed config.
2480  if (!default_target.empty()) {
2481  *default_child_policy_parsed_config = std::move(parsed_config);
2482  }
2483  return GRPC_ERROR_NONE;
2484 }
2485 
// Factory for the RLS LB policy: reports the policy name, creates RlsLb
// instances, and parses/validates the policy's JSON configuration.
//
// NOTE(review): listing lines 2523, 2530, 2555 and 2561 are absent from this
// extract, so four statements in ParseLoadBalancingConfig() are missing their
// first line. Restore them from the upstream file before compiling.
2486 class RlsLbFactory : public LoadBalancingPolicyFactory {
2487  public:
2488  const char* name() const override { return kRls; }
2489 
2490  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2491  LoadBalancingPolicy::Args args) const override {
2492  return MakeOrphanable<RlsLb>(std::move(args));
2493  }
2494 
// Parses the top-level RLS config object. Errors from every section are
// accumulated in error_list; an RlsLbConfig is constructed and returned from
// whatever was parsed.
2495  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
2496  const Json& config, grpc_error_handle* error) const override {
2497  std::vector<grpc_error_handle> error_list;
2498  // Parse routeLookupConfig.
2499  RlsLbConfig::RouteLookupConfig route_lookup_config;
2500  const Json::Object* route_lookup_config_json = nullptr;
2501  if (ParseJsonObjectField(config.object_value(), "routeLookupConfig",
2502  &route_lookup_config_json, &error_list)) {
2503  grpc_error_handle child_error = GRPC_ERROR_NONE;
2504  route_lookup_config =
2505  ParseRouteLookupConfig(*route_lookup_config_json, &child_error);
2506  if (!GRPC_ERROR_IS_NONE(child_error)) error_list.push_back(child_error);
2507  }
2508  // Parse routeLookupChannelServiceConfig.
2509  std::string rls_channel_service_config;
2510  const Json::Object* rls_channel_service_config_json_obj = nullptr;
2511  if (ParseJsonObjectField(config.object_value(),
2512  "routeLookupChannelServiceConfig",
2513  &rls_channel_service_config_json_obj, &error_list,
2514  /*required=*/false)) {
2515  grpc_error_handle child_error = GRPC_ERROR_NONE;
2516  Json rls_channel_service_config_json(
2517  *rls_channel_service_config_json_obj);
2518  rls_channel_service_config = rls_channel_service_config_json.Dump();
// service_config is not used after construction in the visible code; it
// appears to be built only so that child_error reports validation failures.
2519  auto service_config = MakeRefCounted<ServiceConfigImpl>(
2520  /*args=*/nullptr, rls_channel_service_config,
2521  std::move(rls_channel_service_config_json), &child_error);
2522  if (!GRPC_ERROR_IS_NONE(child_error)) {
2524  "field:routeLookupChannelServiceConfig", &child_error, 1));
2525  GRPC_ERROR_UNREF(child_error);
2526  }
2527  }
2528  // Parse childPolicyConfigTargetFieldName.
2529  std::string child_policy_config_target_field_name;
2531  config.object_value(), "childPolicyConfigTargetFieldName",
2532  &child_policy_config_target_field_name, &error_list)) {
2533  if (child_policy_config_target_field_name.empty()) {
2534  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2535  "field:childPolicyConfigTargetFieldName error:must be non-empty"));
2536  }
2537  }
2538  // Parse childPolicy.
2539  Json child_policy_config;
2540  RefCountedPtr<LoadBalancingPolicy::Config>
2541  default_child_policy_parsed_config;
2542  auto it = config.object_value().find("childPolicy");
2543  if (it == config.object_value().end()) {
2544  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2545  "field:childPolicy error:does not exist."));
2546  } else if (it->second.type() != Json::Type::ARRAY) {
2547  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2548  "field:childPolicy error:type should be ARRAY"));
2549  } else {
2550  grpc_error_handle child_error = ValidateChildPolicyList(
2551  it->second, child_policy_config_target_field_name,
2552  route_lookup_config.default_target, &child_policy_config,
2553  &default_child_policy_parsed_config);
2554  if (!GRPC_ERROR_IS_NONE(child_error)) {
2556  "field:childPolicy", &child_error, 1));
2557  GRPC_ERROR_UNREF(child_error);
2558  }
2559  }
2560  // Return result.
2562  "errors parsing RLS LB policy config", &error_list);
2563  return MakeRefCounted<RlsLbConfig>(
2564  std::move(route_lookup_config), std::move(rls_channel_service_config),
2565  std::move(child_policy_config),
2566  std::move(child_policy_config_target_field_name),
2567  std::move(default_child_policy_parsed_config));
2568  }
2569 };
2570 
2571 } // namespace
2572 
// Registers an RlsLbFactory with the LB policy registry.
// NOTE(review): the function signature (listing line 2573) is absent from this
// extract; the cross-reference index further down attributes that line to
// `void RlsLbPluginInit()` -- restore it from the upstream file.
2574  LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
2575  absl::make_unique<RlsLbFactory>());
2576 }
2577 
2579 
2580 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
trace.h
GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION
#define GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION
Definition: grpc_types.h:306
is_shutdown_
bool is_shutdown_
Definition: rls.cc:344
absl::InvalidArgumentError
Status InvalidArgumentError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:351
grpc_channel_args_find_string
char * grpc_channel_args_find_string(const grpc_channel_args *args, const char *name)
Definition: channel_args.cc:441
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::RlsLbPluginInit
void RlsLbPluginInit()
Definition: rls.cc:2573
key_map
std::map< std::string, std::string > key_map
Definition: rls.cc:219
grpc_op::grpc_op_data::grpc_op_send_message::send_message
struct grpc_byte_buffer * send_message
Definition: grpc_types.h:668
child_policy_config_target_field_name_
std::string child_policy_config_target_field_name_
Definition: rls.cc:201
cleanup.Json
Json
Definition: cleanup.py:49
wrapper_
WeakRefCountedPtr< ChildPolicyWrapper > wrapper_
Definition: rls.cc:338
absl::AbslHashValue
H AbslHashValue(H h, const absl::InlinedVector< T, N, A > &a)
Definition: abseil-cpp/absl/container/inlined_vector.h:858
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
targets
std::vector< std::string > targets
Definition: rls.cc:252
now
static double now(void)
Definition: test/core/fling/client.cc:130
config_
RefCountedPtr< RlsLbConfig > config_
Definition: rls.cc:366
Hash
Hash
Definition: abseil-cpp/absl/container/internal/hash_function_defaults_test.cc:339
regen-readme.it
it
Definition: regen-readme.py:15
fake_credentials.h
orphanable.h
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::trailing_metadata
grpc_metadata_array * trailing_metadata
Definition: grpc_types.h:701
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
child_policy_map_
std::map< std::string, ChildPolicyWrapper * > child_policy_map_
Definition: rls.cc:713
log.h
grpc_core::RlsLbPluginShutdown
void RlsLbPluginShutdown()
Definition: rls.cc:2578
core_configuration.h
bloat_diff.severity
def severity
Definition: bloat_diff.py:143
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::status
grpc_status_code * status
Definition: grpc_types.h:702
grpc_lookup_v1_RouteLookupRequest_REASON_MISS
@ grpc_lookup_v1_RouteLookupRequest_REASON_MISS
Definition: rls.upb.h:35
grpc_event_engine::experimental::slice_detail::operator==
bool operator==(const BaseSlice &a, const BaseSlice &b)
Definition: include/grpc/event_engine/slice.h:117
backoff.h
grpc_raw_byte_buffer_create
GRPCAPI grpc_byte_buffer * grpc_raw_byte_buffer_create(grpc_slice *slices, size_t nslices)
Definition: byte_buffer.cc:34
MutexLock
#define MutexLock(x)
Definition: bloaty/third_party/re2/util/mutex.h:125
const
#define const
Definition: bloaty/third_party/zlib/zconf.h:230
lookup_service_timeout
Duration lookup_service_timeout
Definition: rls.cc:146
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
call_
grpc_call * call_
Definition: rls.cc:669
memset
return memset(p, 0, total)
connectivity_state.h
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
constant_keys
std::map< std::string, std::string > constant_keys
Definition: rls.cc:139
Timestamp
Definition: bloaty/third_party/protobuf/src/google/protobuf/timestamp.pb.h:69
grpc_channel_reset_connect_backoff
GRPCAPI void grpc_channel_reset_connect_backoff(grpc_channel *channel)
Definition: channel.cc:273
grpc_core::Json::Type::OBJECT
@ OBJECT
slice.h
false
#define false
Definition: setup_once.h:323
absl::StatusOr
ABSL_NAMESPACE_BEGIN class ABSL_MUST_USE_RESULT StatusOr
Definition: abseil-cpp/absl/status/internal/statusor_internal.h:29
timer_callback_
grpc_closure timer_callback_
Definition: rls.cc:537
padding_
int padding_
Definition: rls.cc:606
grpc_core
Definition: call_metric_recorder.h:31
upb_StringView::data
const char * data
Definition: upb/upb/upb.h:73
grpc_metadata_array
Definition: grpc_types.h:579
backoff_state_
std::unique_ptr< BackOff > backoff_state_
Definition: rls.cc:661
grpc_byte_buffer_reader_readall
GRPCAPI grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader)
Definition: byte_buffer_reader.cc:84
string.h
benchmark.request
request
Definition: benchmark.py:77
ExitIdleLocked
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb void ExitIdleLocked()
Definition: rls.cc:299
buf
voidpf void * buf
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
names
sub_type names
Definition: cxa_demangle.cpp:4905
grpc_core::StringViewFromSlice
absl::string_view StringViewFromSlice(const grpc_slice &slice)
Definition: slice_internal.h:93
ratio_for_successes_
double ratio_for_successes_
Definition: rls.cc:605
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
propagation_bits.h
error
grpc_error_handle error
Definition: retry_filter.cc:499
lb_policy.h
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
client_channel.h
inserted
bool inserted
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:110
lb_policy_factory.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
closure.h
cleanup_timer_
grpc_timer cleanup_timer_
Definition: rls.cc:536
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count)
Definition: error.h:307
GRPC_CALL_OK
@ GRPC_CALL_OK
Definition: grpc_types.h:466
ABSL_GUARDED_BY
#define ABSL_GUARDED_BY(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:62
status
absl::Status status
Definition: rls.cc:251
setup.name
name
Definition: setup.py:542
grpc_lookup_v1_RouteLookupRequest_REASON_STALE
@ grpc_lookup_v1_RouteLookupRequest_REASON_STALE
Definition: rls.upb.h:36
grpc_core::Timestamp::InfPast
static constexpr Timestamp InfPast()
Definition: src/core/lib/gprpp/time.h:83
grpc_channel_arg_string_create
grpc_arg grpc_channel_arg_string_create(char *name, char *value)
Definition: channel_args.cc:476
check_documentation.path
path
Definition: check_documentation.py:57
absl::make_unique
memory_internal::MakeUniqueResult< T >::scalar make_unique(Args &&... args)
Definition: third_party/abseil-cpp/absl/memory/memory.h:168
absl::StripPrefix
ABSL_MUST_USE_RESULT absl::string_view StripPrefix(absl::string_view str, absl::string_view prefix)
Definition: abseil-cpp/absl/strings/strip.h:73
absl::InternalError
Status InternalError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:347
grpc_lookup_v1_RouteLookupResponse_header_data
UPB_INLINE upb_StringView grpc_lookup_v1_RouteLookupResponse_header_data(const grpc_lookup_v1_RouteLookupResponse *msg)
Definition: rls.upb.h:177
xds_manager.p
p
Definition: xds_manager.py:60
GRPC_CLOSURE_CREATE
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
Definition: closure.h:160
GRPC_ERROR_CANCELLED
#define GRPC_ERROR_CANCELLED
Definition: error.h:238
grpc_core::grpc_lb_rls_trace
TraceFlag grpc_lb_rls_trace(false, "rls_lb")
grpc_timer
Definition: iomgr/timer.h:33
channelz.h
iterator
const typedef MCPhysReg * iterator
Definition: MCRegisterInfo.h:27
grpc_pollset_set_del_pollset_set
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:52
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
credentials.h
call_start_cb_
grpc_closure call_start_cb_
Definition: rls.cc:667
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
subchannel_interface.h
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_op::grpc_op_data::recv_message
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
grpc_status._async.code
code
Definition: grpcio_status/grpc_status/_async.py:34
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
GRPC_ERROR_CREATE_FROM_VECTOR
#define GRPC_ERROR_CREATE_FROM_VECTOR(desc, error_list)
Definition: error.h:314
grpc_op::data
union grpc_op::grpc_op_data data
status.h
GRPC_ARG_CHANNELZ_CHANNEL_NODE
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE
Definition: channelz.h:49
byte_buffer_reader.h
grpc_types.h
absl::synchronization_internal::Get
static GraphId Get(const IdMap &id, int num)
Definition: abseil-cpp/absl/synchronization/internal/graphcycles_test.cc:44
grpc_metadata_array_destroy
GRPCAPI void grpc_metadata_array_destroy(grpc_metadata_array *array)
Definition: metadata_array.cc:35
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_op::grpc_op_data::grpc_op_recv_message::recv_message
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:693
GRPC_ARG_DEFAULT_AUTHORITY
#define GRPC_ARG_DEFAULT_AUTHORITY
Definition: grpc_types.h:251
backoff_timer_
grpc_timer backoff_timer_
Definition: rls.cc:461
deadline_
Timestamp deadline_
Definition: rls.cc:666
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
GRPC_STATUS_OK
@ GRPC_STATUS_OK
Definition: include/grpc/impl/codegen/status.h:30
GRPC_OP_RECV_INITIAL_METADATA
@ GRPC_OP_RECV_INITIAL_METADATA
Definition: grpc_types.h:617
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
end
char * end
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1008
absl::StrJoin
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
Definition: abseil-cpp/absl/strings/str_join.h:239
hpack_encoder_fixtures::Args
Args({0, 16384})
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
ToString
std::string ToString(const grpc::string_ref &r)
Definition: string_ref_helper.cc:24
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
header_keys
std::map< std::string, std::vector< std::string > > header_keys
Definition: rls.cc:135
upb_StringView::size
size_t size
Definition: upb/upb/upb.h:74
Json
JSON (JavaScript Object Notation).
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:227
GRPC_ARG_SERVER_URI
#define GRPC_ARG_SERVER_URI
Definition: client_channel.h:89
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Definition: call.cc:1770
child_policy_
OrphanablePtr< ChildPolicyHandler > child_policy_
Definition: rls.cc:346
grpc_channel_args_compare
int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b)
Definition: channel_args.cc:380
req
static uv_connect_t req
Definition: test-connection-fail.c:30
upb.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_lookup_v1_RouteLookupRequest_set_target_type
UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_target_type(grpc_lookup_v1_RouteLookupRequest *msg, upb_StringView value)
Definition: rls.upb.h:106
work_serializer.h
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
lb_policy_
RefCountedPtr< RlsLb > lb_policy_
Definition: rls.cc:341
grpc_byte_buffer_reader_init
GRPCAPI int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer)
Definition: byte_buffer_reader.cc:33
grpc_channel_get_channelz_node
grpc_core::channelz::ChannelNode * grpc_channel_get_channelz_node(grpc_channel *channel)
Definition: src/core/lib/surface/channel.h:183
channel_args_
const grpc_channel_args * channel_args_
Definition: rls.cc:710
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
grpc_lookup_v1_RouteLookupResponse_parse
UPB_INLINE grpc_lookup_v1_RouteLookupResponse * grpc_lookup_v1_RouteLookupResponse_parse(const char *buf, size_t size, upb_Arena *arena)
Definition: rls.upb.h:148
grpc.h
grpc_call
struct grpc_call grpc_call
Definition: grpc_types.h:70
grpc_channel_credentials_find_in_args
grpc_channel_credentials * grpc_channel_credentials_find_in_args(const grpc_channel_args *args)
Definition: credentials.cc:83
connectivity_state.h
grpc_byte_buffer
Definition: grpc_types.h:43
window_size_
Duration window_size_
Definition: rls.cc:600
ShouldThrottle
void StartRlsCall(const RequestKey &key, Cache::Entry *stale_entry) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb void ReportResponseLocked(bool response_succeeded) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb boo ShouldThrottle)() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb
Definition: rls.cc:560
json_util.h
rls.upb.h
recv_message_
grpc_byte_buffer * recv_message_
Definition: rls.cc:672
absl::optional< absl::string_view >
channel_
grpc_channel * channel_
Definition: rls.cc:619
pollset_set.h
grpc_op
Definition: grpc_types.h:640
GRPC_OP_SEND_MESSAGE
@ GRPC_OP_SEND_MESSAGE
Definition: grpc_types.h:602
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: include/grpc/impl/codegen/slice.h:101
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
arg
Definition: cmdline.cc:40
entry_
RefCountedPtr< Entry > entry_
Definition: rls.cc:459
grpc_slice_from_static_string
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
Definition: slice/slice.cc:89
status_details_recv_
grpc_slice status_details_recv_
Definition: rls.cc:675
server_address.h
time.h
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
grpc_lookup_v1_RouteLookupResponse_targets
UPB_INLINE upb_StringView const * grpc_lookup_v1_RouteLookupResponse_targets(const grpc_lookup_v1_RouteLookupResponse *msg, size_t *len)
Definition: rls.upb.h:183
grpc_channel_args_copy
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
Definition: channel_args.cc:285
googletest-filter-unittest.child
child
Definition: bloaty/third_party/googletest/googletest/test/googletest-filter-unittest.py:62
backoff_timer_callback_
grpc_closure backoff_timer_callback_
Definition: rls.cc:462
server_name_
std::string server_name_
Definition: rls.cc:690
max_age
Duration max_age
Definition: rls.cc:147
grpc_lookup_v1_RouteLookupRequest_Reason
grpc_lookup_v1_RouteLookupRequest_Reason
Definition: rls.upb.h:33
status_
absl::Status status_
Definition: outlier_detection.cc:404
absl::flags_internal::Parse
bool Parse(FlagOpFn op, absl::string_view text, void *dst, std::string *error)
Definition: abseil-cpp/absl/flags/internal/flag.h:125
error.h
grpc_core::Json::Type::ARRAY
@ ARRAY
key_
RlsLb::RequestKey key_
Definition: rls.cc:659
connectivity_state_
grpc_connectivity_state connectivity_state_
Definition: priority.cc:238
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
grpc_lookup_v1_RouteLookupRequest_key_map_set
UPB_INLINE bool grpc_lookup_v1_RouteLookupRequest_key_map_set(grpc_lookup_v1_RouteLookupRequest *msg, upb_StringView key, upb_StringView val, upb_Arena *a)
Definition: rls.upb.h:110
benchmark::Shutdown
void Shutdown()
Definition: benchmark/src/benchmark.cc:607
wrapper
grpc_channel_wrapper * wrapper
Definition: src/php/ext/grpc/channel.h:48
json.h
buffer
char buffer[1024]
Definition: libuv/docs/code/idle-compute/main.c:8
slice_internal.h
parent_channelz_node_
RefCountedPtr< channelz::ChannelNode > parent_channelz_node_
Definition: rls.cc:620
route_lookup_config_
RouteLookupConfig route_lookup_config_
Definition: rls.cc:198
was_transient_failure_
bool was_transient_failure_
Definition: rls.cc:584
min
#define min(a, b)
Definition: qsort.h:83
stale_age
Duration stale_age
Definition: rls.cc:148
reason_
grpc_lookup_v1_RouteLookupRequest_Reason reason_
Definition: rls.cc:662
grpc_core::ServerAddressList
std::vector< ServerAddress > ServerAddressList
Definition: server_address.h:120
grpc_lookup_v1_RouteLookupRequest_new
UPB_INLINE grpc_lookup_v1_RouteLookupRequest * grpc_lookup_v1_RouteLookupRequest_new(upb_Arena *arena)
Definition: rls.upb.h:43
cache_size_bytes
int64_t cache_size_bytes
Definition: rls.cc:149
H
#define H(b, c, d)
Definition: md4.c:114
setup.idx
idx
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:197
grpc_lookup_v1_RouteLookupRequest_serialize
UPB_INLINE char * grpc_lookup_v1_RouteLookupRequest_serialize(const grpc_lookup_v1_RouteLookupRequest *msg, upb_Arena *arena, size_t *len)
Definition: rls.upb.h:65
host_key
std::string host_key
Definition: rls.cc:136
resolver_registry.h
grpc_op::op
grpc_op_type op
Definition: grpc_types.h:642
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
recv_trailing_metadata_
grpc_metadata_array recv_trailing_metadata_
Definition: rls.cc:673
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: include/grpc/impl/codegen/slice.h:104
GRPC_CHANNEL_CONNECTING
@ GRPC_CHANNEL_CONNECTING
Definition: include/grpc/impl/codegen/connectivity_state.h:34
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::status_details
grpc_slice * status_details
Definition: grpc_types.h:703
grpc_core::LoadBalancingPolicyRegistry::ParseLoadBalancingConfig
static RefCountedPtr< LoadBalancingPolicy::Config > ParseLoadBalancingConfig(const Json &json, grpc_error_handle *error)
Definition: lb_policy_registry.cc:169
child_policy_handler.h
value
const char * value
Definition: hpack_parser_table.cc:165
absl::Status
ABSL_NAMESPACE_BEGIN class ABSL_MUST_USE_RESULT Status
Definition: abseil-cpp/absl/status/internal/status_internal.h:36
rls_channel_
RefCountedPtr< RlsChannel > rls_channel_
Definition: rls.cc:583
GRPC_OP_RECV_MESSAGE
@ GRPC_OP_RECV_MESSAGE
Definition: grpc_types.h:621
default_target
std::string default_target
Definition: rls.cc:150
grpc_core::Json::Object
std::map< std::string, Json > Object
Definition: src/core/lib/json/json.h:54
grpc_timer_cancel
void grpc_timer_cancel(grpc_timer *timer)
Definition: iomgr/timer.cc:36
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
upb::Arena
Definition: upb.hpp:68
picker_
std::unique_ptr< SubchannelPicker > picker_
Definition: outlier_detection.cc:323
grpc_channel_create
GRPCAPI grpc_channel * grpc_channel_create(const char *target, grpc_channel_credentials *creds, const grpc_channel_args *args)
Definition: chttp2_connector.cc:366
grpc_slice_from_copied_buffer
GPRAPI grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len)
Definition: slice/slice.cc:170
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
field
const FieldDescriptor * field
Definition: bloaty/third_party/protobuf/src/google/protobuf/compiler/parser_unittest.cc:2692
debug_location.h
GRPC_PROPAGATE_DEFAULTS
#define GRPC_PROPAGATE_DEFAULTS
Definition: propagation_bits.h:45
rng_
std::mt19937 rng_
Definition: rls.cc:607
key
const char * key
Definition: hpack_parser_table.cc:164
insert
static void insert(upb_table *t, lookupkey_t key, upb_tabkey tabkey, upb_value val, uint32_t hash, hashfunc_t *hashfunc, eqlfunc_t *eql)
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/upb.c:1431
grpc_lookup_v1_RouteLookupRequest
struct grpc_lookup_v1_RouteLookupRequest grpc_lookup_v1_RouteLookupRequest
Definition: rls.upb.h:26
upb_StringView
Definition: upb/upb/upb.h:72
grpc_channel_arg_integer_create
grpc_arg grpc_channel_arg_integer_create(char *name, int value)
Definition: channel_args.cc:484
absl::StatusCode
StatusCode
Definition: third_party/abseil-cpp/absl/status/status.h:92
lb_policy_registry.h
bytes
uint8 bytes[10]
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/coded_stream_unittest.cc:153
size_
size_t size_
Definition: memory_allocator.cc:56
call_complete_cb_
grpc_closure call_complete_cb_
Definition: rls.cc:668
GRPC_ARG_SERVICE_CONFIG
#define GRPC_ARG_SERVICE_CONFIG
Definition: grpc_types.h:304
upb.hpp
GRPC_OP_SEND_INITIAL_METADATA
@ GRPC_OP_SEND_INITIAL_METADATA
Definition: grpc_types.h:598
ABSL_LOCKS_EXCLUDED
#define ABSL_LOCKS_EXCLUDED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:163
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
grpc_op::grpc_op_data::send_message
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
GRPC_ERROR_CREATE_FROM_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
Definition: error.h:297
grpc_channel_create_pollset_set_call
grpc_call * grpc_channel_create_pollset_set_call(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, const grpc_slice &method, const grpc_slice *host, grpc_core::Timestamp deadline, void *reserved)
Definition: channel.cc:331
absl::container_internal::internal_layout::adl_barrier::Find
constexpr size_t Find(Needle, Needle, Ts...)
Definition: abseil-cpp/absl/container/internal/layout.h:269
google::protobuf.internal::Mutex
WrappedMutex Mutex
Definition: bloaty/third_party/protobuf/src/google/protobuf/stubs/mutex.h:113
grpc_core::OrphanablePtr
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:64
watcher_
StateWatcher * watcher_
Definition: rls.cc:621
method_key
std::string method_key
Definition: rls.cc:138
grpc_op::grpc_op_data::recv_status_on_client
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_pollset_set_add_pollset_set
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:47
upb_StringView_FromDataAndSize
UPB_INLINE upb_StringView upb_StringView_FromDataAndSize(const char *data, size_t size)
Definition: upb/upb/upb.h:77
service_config_impl.h
grpc_core::Duration::Seconds
static constexpr Duration Seconds(int64_t seconds)
Definition: src/core/lib/gprpp/time.h:151
grpc_byte_buffer_destroy
GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)
Definition: byte_buffer.cc:81
arg
struct arg arg
grpc_channel_destroy
GRPCAPI void grpc_channel_destroy(grpc_channel *channel)
Definition: channel.cc:437
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
exec_ctx.h
GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(desc, error_list)
Definition: error.h:317
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
slice_refcount.h
absl::UnavailableError
Status UnavailableError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:375
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
ref_counted_ptr.h
grpc_channel
struct grpc_channel grpc_channel
Definition: grpc_types.h:62
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
absl::inlined_vector_internal::Iterator
Pointer< A > Iterator
Definition: abseil-cpp/absl/container/internal/inlined_vector.h:64
release
return ret release()
Definition: doc/python/sphinx/conf.py:37
dual_ref_counted.h
GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL
#define GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL
Definition: channelz.h:52
update_in_progress_
bool update_in_progress_
Definition: rls.cc:695
channel_args.h
timer.h
lookup_service
std::string lookup_service
Definition: rls.cc:145
key_builder_map
KeyBuilderMap key_builder_map
Definition: rls.cc:144
check_redundant_namespace_qualifiers.Config
Config
Definition: check_redundant_namespace_qualifiers.py:142
GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS
#define GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS
Definition: fake_credentials.h:40
grpc_lookup_v1_RouteLookupRequest_set_reason
UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_reason(grpc_lookup_v1_RouteLookupRequest *msg, int32_t value)
Definition: rls.upb.h:119
Fail
void Fail(const char *msg)
Definition: bloaty/third_party/googletest/googletest/test/gtest_assert_by_exception_test.cc:52
grpc_error_get_status
void grpc_error_get_status(grpc_error_handle error, grpc_core::Timestamp deadline, grpc_status_code *code, std::string *message, grpc_http2_error_code *http_error, const char **error_string)
Definition: error_utils.cc:67
grpc_call_start_batch_and_execute
grpc_call_error grpc_call_start_batch_and_execute(grpc_call *call, const grpc_op *ops, size_t nops, grpc_closure *closure)
Definition: call.cc:1847
absl
Definition: abseil-cpp/absl/algorithm/algorithm.h:31
addresses_
absl::StatusOr< ServerAddressList > addresses_
Definition: rls.cc:709
Duration
Definition: bloaty/third_party/protobuf/src/google/protobuf/duration.pb.h:69
default_child_policy_
RefCountedPtr< ChildPolicyWrapper > default_child_policy_
Definition: rls.cc:367
status_recv_
grpc_status_code status_recv_
Definition: rls.cc:674
grpc_op::grpc_op_data::recv_initial_metadata
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
grpc_core::ParseJsonObjectFieldAsDuration
bool ParseJsonObjectFieldAsDuration(const Json::Object &object, absl::string_view field_name, Duration *output, std::vector< grpc_error_handle > *error_list, bool required)
Definition: src/core/lib/json/json_util.cc:107
uri_parser.h
code
Definition: bloaty/third_party/zlib/contrib/infback9/inftree9.h:24
pending_config_
RefCountedPtr< LoadBalancingPolicy::Config > pending_config_
Definition: rls.cc:347
absl::InlinedVector< grpc_arg, 3 >
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
grpc_core::ExtractJsonString
bool ExtractJsonString(const Json &json, absl::string_view field_name, OutputType *output, std::vector< grpc_error_handle > *error_list)
Definition: src/core/lib/json/json_util.h:75
googletest-break-on-failure-unittest.Run
def Run(command)
Definition: bloaty/third_party/googletest/googletest/test/googletest-break-on-failure-unittest.py:76
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
header_data
std::string header_data
Definition: rls.cc:253
grpc_error
Definition: error_internal.h:42
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
GRPC_OP_RECV_STATUS_ON_CLIENT
@ GRPC_OP_RECV_STATUS_ON_CLIENT
Definition: grpc_types.h:627
default_child_policy_parsed_config_
RefCountedPtr< LoadBalancingPolicy::Config > default_child_policy_parsed_config_
Definition: rls.cc:203
grpc_op::grpc_op_data::grpc_op_recv_initial_metadata::recv_initial_metadata
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:685
grpc_lookup_v1_RouteLookupRequest_set_stale_header_data
UPB_INLINE void grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(grpc_lookup_v1_RouteLookupRequest *msg, upb_StringView value)
Definition: rls.upb.h:122
grpc_error_to_absl_status
absl::Status grpc_error_to_absl_status(grpc_error_handle error)
Definition: error_utils.cc:156
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
grpc_lookup_v1_RouteLookupResponse
struct grpc_lookup_v1_RouteLookupResponse grpc_lookup_v1_RouteLookupResponse
Definition: rls.upb.h:28
mu_
Mutex mu_
Definition: rls.cc:693
send_message_
grpc_byte_buffer * send_message_
Definition: rls.cc:670
absl::str_format_internal::LengthMod::h
@ h
grpc_closure
Definition: closure.h:56
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
method_name
absl::string_view method_name
Definition: call_creds_util.cc:40
grpc_byte_buffer_reader
Definition: impl/codegen/byte_buffer_reader.h:30
ops
static grpc_op ops[6]
Definition: test/core/fling/client.cc:39
stale_header_data_
std::string stale_header_data_
Definition: rls.cc:663
setup.target
target
Definition: third_party/bloaty/third_party/protobuf/python/setup.py:179
grpc_channel_credentials
Definition: src/core/lib/security/credentials/credentials.h:96
recv_initial_metadata_
grpc_metadata_array recv_initial_metadata_
Definition: rls.cc:671
absl::PairFormatter
strings_internal::PairFormatterImpl< FirstFormatter, SecondFormatter > PairFormatter(FirstFormatter f1, absl::string_view sep, SecondFormatter f2)
Definition: abseil-cpp/absl/strings/str_join.h:114
sync.h
target_
std::string target_
Definition: rls.cc:342
grpc_core::ParseJsonObjectField
bool ParseJsonObjectField(const Json::Object &object, absl::string_view field_name, T *output, std::vector< grpc_error_handle > *error_list, bool required=true)
Definition: src/core/lib/json/json_util.h:136
call.h
GRPC_OP_SEND_CLOSE_FROM_CLIENT
@ GRPC_OP_SEND_CLOSE_FROM_CLIENT
Definition: grpc_types.h:607
grpc_byte_buffer_reader_destroy
GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
Definition: byte_buffer_reader.cc:45
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
child_policy_config_
Json child_policy_config_
Definition: rls.cc:200
state
static struct rpc_state state
Definition: bad_server_response_test.cc:87
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
grpc_metadata_array_init
GRPCAPI void grpc_metadata_array_init(grpc_metadata_array *array)
Definition: metadata_array.cc:30
error_utils.h
service_key
std::string service_key
Definition: rls.cc:137
rls_channel_service_config_
std::string rls_channel_service_config_
Definition: rls.cc:199
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
channel.h
ABSL_NO_THREAD_SAFETY_ANALYSIS
#define ABSL_NO_THREAD_SAFETY_ANALYSIS
Definition: abseil-cpp/absl/base/thread_annotations.h:280
grpc_core::Duration::Minutes
static constexpr Duration Minutes(int64_t minutes)
Definition: src/core/lib/gprpp/time.h:147
port_platform.h
grpc_call_cancel_internal
void grpc_call_cancel_internal(grpc_call *call)
Definition: call.cc:1806


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:05