grpclb.cc
Go to the documentation of this file.
1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
51 
53 
55 
57 
58 // IWYU pragma: no_include <sys/socket.h>
59 
60 #include <inttypes.h>
61 #include <limits.h>
62 #include <stdlib.h>
63 #include <string.h>
64 
65 #include <algorithm>
66 #include <map>
67 #include <memory>
68 #include <string>
69 #include <utility>
70 #include <vector>
71 
72 #include "absl/container/inlined_vector.h"
73 #include "absl/memory/memory.h"
74 #include "absl/status/status.h"
75 #include "absl/status/statusor.h"
76 #include "absl/strings/str_cat.h"
77 #include "absl/strings/str_format.h"
78 #include "absl/strings/str_join.h"
79 #include "absl/strings/string_view.h"
80 #include "absl/strings/strip.h"
81 #include "absl/types/optional.h"
82 #include "absl/types/variant.h"
83 #include "upb/upb.hpp"
84 
85 #include <grpc/byte_buffer.h>
87 #include <grpc/grpc.h>
91 #include <grpc/slice.h>
92 #include <grpc/status.h>
93 #include <grpc/support/alloc.h>
94 #include <grpc/support/log.h>
95 
115 #include "src/core/lib/gpr/string.h"
116 #include "src/core/lib/gpr/useful.h"
121 #include "src/core/lib/gprpp/time.h"
131 #include "src/core/lib/json/json.h"
145 
// Tuning constants for the grpclb policy. The code that consumes them is not
// visible in this excerpt, so the descriptions below are inferred from the
// macro names only -- TODO confirm against the users of each macro.
// Presumably the first (re)connect backoff toward the balancer, in seconds.
146 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
// Presumably the factor applied to the backoff after each failed attempt.
147 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
// Presumably the upper bound on the backoff, in seconds.
148 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
// Presumably the random jitter fraction applied to the backoff.
149 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Presumably the default fallback-at-startup timeout, in milliseconds.
150 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
// Presumably the default delay before cached deleted subchannels are
// released, in milliseconds.
151 #define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000
152 
153 namespace grpc_core {
154 
// Trace flag for this policy, constructed with the name "glb". It is tested
// via GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) elsewhere in this file.
155 TraceFlag grpc_lb_glb_trace(false, "glb");
156 
// Key under which a TokenAndClientStatsAttribute is stored in a
// ServerAddress's attribute map (see Serverlist::GetServerAddressList()) and
// looked up again in Helper::CreateSubchannel().
157 const char kGrpcLbAddressAttributeKey[] = "grpclb";
158 
159 namespace {
160 
161 using ::grpc_event_engine::experimental::EventEngine;
163 
// Policy name returned by both GrpcLbConfig::name() and GrpcLb::name().
164 constexpr char kGrpclb[] = "grpclb";
165 
// Parsed configuration for the grpclb policy: the config of the child policy
// to use for the backends, plus a service name.
// NOTE(review): the declaration of the service_name_ member is not visible
// in this excerpt, although the constructor and accessor use it.
166 class GrpcLbConfig : public LoadBalancingPolicy::Config {
167  public:
    // Takes ownership of both arguments by moving them into the members.
168  GrpcLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
169  std::string service_name)
170  : child_policy_(std::move(child_policy)),
171  service_name_(std::move(service_name)) {}
    // Returns the policy name, kGrpclb.
172  const char* name() const override { return kGrpclb; }
173 
    // Returns a new ref-counted pointer to the child policy config.
174  RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
175  return child_policy_;
176  }
177 
    // Returns the configured service name. When it is empty,
    // BalancerCallState's constructor uses the policy's server name instead.
178  const std::string& service_name() const { return service_name_; }
179 
180  private:
181  RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
183 };
184 
// The grpclb load-balancing policy. As the members declared further down
// show, it keeps the state of a streaming call to a balancer (lb_calld_), the
// most recent serverlist received from it (serverlist_), a child policy that
// handles the backends (child_policy_), and fallback state (fallback_mode_).
185 class GrpcLb : public LoadBalancingPolicy {
186  public:
187  explicit GrpcLb(Args args);
188 
    // Returns the policy name, kGrpclb.
189  const char* name() const override { return kGrpclb; }
190 
    // LoadBalancingPolicy overrides; their definitions are outside this
    // excerpt.
191  void UpdateLocked(UpdateArgs args) override;
192  void ResetBackoffLocked() override;
193 
194  private:
    // State for one streaming call to the balancer. Holds a ref to the owning
    // policy for as long as the object lives.
    // NOTE(review): several member declarations in the sections below are not
    // visible in this excerpt (for example seen_initial_response_ and
    // send_message_payload_, both of which are used by the methods).
196  class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
197  public:
198  explicit BalancerCallState(
199  RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
200  ~BalancerCallState() override;
201 
202  // It's the caller's responsibility to ensure that Orphan() is called from
203  // inside the combiner.
204  void Orphan() override;
205 
    // Starts the batches on lb_call_ (see the definition for the ref
    // accounting of each batch).
206  void StartQuery();
207 
    // May return nullptr: client_stats_ is only created after the first
    // serverlist is received (see the member comment below).
208  GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
209 
210  bool seen_initial_response() const { return seen_initial_response_; }
211  bool seen_serverlist() const { return seen_serverlist_; }
212 
213  private:
    // Downcasts the stored base-class pointer to the concrete policy type.
214  GrpcLb* grpclb_policy() const {
215  return static_cast<GrpcLb*>(grpclb_policy_.get());
216  }
217 
218  void ScheduleNextClientLoadReportLocked();
219  void SendClientLoadReportLocked();
220 
221  // EventEngine callbacks
    // MaybeSendClientLoadReport() hops into the policy's work_serializer and
    // runs MaybeSendClientLoadReportLocked() there.
222  void MaybeSendClientLoadReport();
223  void MaybeSendClientLoadReportLocked();
224 
    // Closure entry points; `arg` is the BalancerCallState (see the
    // GRPC_CLOSURE_INIT calls in the constructor).
225  static void ClientLoadReportDone(void* arg, grpc_error_handle error);
226  static void OnInitialRequestSent(void* arg, grpc_error_handle error);
227  static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
228  static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);
229 
230  void ClientLoadReportDoneLocked(grpc_error_handle error);
231  void OnInitialRequestSentLocked();
232  void OnBalancerMessageReceivedLocked();
233  void OnBalancerStatusReceivedLocked(grpc_error_handle error);
234 
235  // The owning LB policy.
236  RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
237 
238  // The streaming call to the LB server. Always non-NULL.
239  grpc_call* lb_call_ = nullptr;
240 
241  // recv_initial_metadata
243 
244  // send_message
247 
248  // recv_message
252  bool seen_serverlist_ = false;
253 
254  // recv_trailing_metadata
259 
260  // The stats for client-side load reporting associated with this LB call.
261  // Created after the first serverlist is received.
262  RefCountedPtr<GrpcLbClientStats> client_stats_;
267  // The closure used for the completion of sending the load report.
269  };
270 
    // Wraps a subchannel created for a balancer-provided address and carries
    // the LB token and client stats object associated with that address.
    // NOTE(review): the declaration of lb_token_ is not visible in this
    // excerpt, although the constructor and accessor use it.
271  class SubchannelWrapper : public DelegatingSubchannel {
272  public:
273  SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
274  RefCountedPtr<GrpcLb> lb_policy, std::string lb_token,
275  RefCountedPtr<GrpcLbClientStats> client_stats)
276  : DelegatingSubchannel(std::move(subchannel)),
277  lb_policy_(std::move(lb_policy)),
278  lb_token_(std::move(lb_token)),
279  client_stats_(std::move(client_stats)) {}
280 
    // Unless the policy is shutting down, hands the wrapped subchannel to the
    // policy's deleted-subchannel cache instead of letting it go right away.
281  ~SubchannelWrapper() override {
282  if (!lb_policy_->shutting_down_) {
283  lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
284  }
285  }
286 
287  const std::string& lb_token() const { return lb_token_; }
    // May be nullptr; Picker::Pick() checks for that before using it.
288  GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
289 
290  private:
291  RefCountedPtr<GrpcLb> lb_policy_;
293  RefCountedPtr<GrpcLbClientStats> client_stats_;
294  };
295 
    // Address attribute that carries the LB token and the client stats object
    // for one backend address. It is attached in
    // Serverlist::GetServerAddressList() and read in
    // Helper::CreateSubchannel().
    // NOTE(review): the declaration of lb_token_ is not visible in this
    // excerpt.
296  class TokenAndClientStatsAttribute
297  : public ServerAddress::AttributeInterface {
298  public:
299  TokenAndClientStatsAttribute(std::string lb_token,
300  RefCountedPtr<GrpcLbClientStats> client_stats)
301  : lb_token_(std::move(lb_token)),
302  client_stats_(std::move(client_stats)) {}
303 
    // Returns a new attribute with the same token, sharing the same stats
    // object.
304  std::unique_ptr<AttributeInterface> Copy() const override {
305  return absl::make_unique<TokenAndClientStatsAttribute>(lb_token_,
306  client_stats_);
307  }
308 
    // Orders first by token text, then by the stats object pointer.
    // The static_cast assumes other_base is also a
    // TokenAndClientStatsAttribute -- TODO confirm callers guarantee this.
309  int Cmp(const AttributeInterface* other_base) const override {
310  const TokenAndClientStatsAttribute* other =
311  static_cast<const TokenAndClientStatsAttribute*>(other_base);
312  int r = lb_token_.compare(other->lb_token_);
313  if (r != 0) return r;
314  return QsortCompare(client_stats_.get(), other->client_stats_.get());
315  }
316 
    // Text form for logging: the token plus the stats object's address.
317  std::string ToString() const override {
318  return absl::StrFormat("lb_token=\"%s\" client_stats=%p", lb_token_,
319  client_stats_.get());
320  }
321 
322  const std::string& lb_token() const { return lb_token_; }
    // Returns a new ref to the stats object (may be null).
323  RefCountedPtr<GrpcLbClientStats> client_stats() const {
324  return client_stats_;
325  }
326 
327  private:
329  RefCountedPtr<GrpcLbClientStats> client_stats_;
330  };
331 
    // Ref-counted wrapper around the list of server entries received from the
    // balancer. Entries are either backend addresses or drop entries.
332  class Serverlist : public RefCounted<Serverlist> {
333  public:
334  // Takes ownership of serverlist.
335  explicit Serverlist(std::vector<GrpcLbServer> serverlist)
336  : serverlist_(std::move(serverlist)) {}
337 
    // Compares the underlying entry vectors; drop_index_ is not compared.
338  bool operator==(const Serverlist& other) const;
339 
340  const std::vector<GrpcLbServer>& serverlist() const { return serverlist_; }
341 
342  // Returns a text representation suitable for logging.
343  std::string AsText() const;
344 
345  // Extracts all non-drop entries into a ServerAddressList.
    // Entries rejected by IsServerValid() (bad port or IP size) are skipped
    // as well. client_stats may be null.
346  ServerAddressList GetServerAddressList(
347  GrpcLbClientStats* client_stats) const;
348 
349  // Returns true if the serverlist contains at least one drop entry and
350  // no backend address entries.
351  bool ContainsAllDropEntries() const;
352 
353  // Returns the LB token to use for a drop, or null if the call
354  // should not be dropped.
355  //
356  // Note: This is called from the picker, so it will be invoked in
357  // the channel's data plane mutex, NOT the control plane
358  // work_serializer. It should not be accessed by any other part of the LB
359  // policy.
360  const char* ShouldDrop();
361 
362  private:
363  std::vector<GrpcLbServer> serverlist_;
364 
365  // Guarded by the channel's data plane mutex, NOT the control
366  // plane work_serializer. It should not be accessed by anything but the
367  // picker via the ShouldDrop() method.
    // Index of the entry that the next ShouldDrop() call will examine; it
    // wraps around at the end of the list.
368  size_t drop_index_ = 0;
369  };
370 
    // Picker that first consults the serverlist for a drop decision and
    // otherwise delegates to the child policy's picker (see Pick()).
371  class Picker : public SubchannelPicker {
372  public:
    // serverlist may be null, in which case Pick() never drops.
    // client_stats may be null, in which case drops are not recorded.
373  Picker(RefCountedPtr<Serverlist> serverlist,
374  std::unique_ptr<SubchannelPicker> child_picker,
375  RefCountedPtr<GrpcLbClientStats> client_stats)
376  : serverlist_(std::move(serverlist)),
377  child_picker_(std::move(child_picker)),
378  client_stats_(std::move(client_stats)) {}
379 
380  PickResult Pick(PickArgs args) override;
381 
382  private:
383  // A subchannel call tracker that unrefs the GrpcLbClientStats object
384  // in the case where the subchannel call is never actually started,
385  // since the client load reporting filter will not be able to do it
386  // in that case.
387  class SubchannelCallTracker : public SubchannelCallTrackerInterface {
388  public:
389  SubchannelCallTracker(
390  RefCountedPtr<GrpcLbClientStats> client_stats,
391  std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker)
392  : client_stats_(std::move(client_stats)),
393  original_call_tracker_(std::move(original_call_tracker)) {}
394 
    // Forwards to the wrapped tracker (if any), then gives up ownership of
    // the stats ref without unreffing it.
395  void Start() override {
396  if (original_call_tracker_ != nullptr) {
397  original_call_tracker_->Start();
398  }
399  // If we're actually starting the subchannel call, then the
400  // client load reporting filter will take ownership of the ref
401  // passed down to it via metadata.
402  client_stats_.release();
403  }
404 
    // Forwards to the wrapped tracker, if any.
405  void Finish(FinishArgs args) override {
406  if (original_call_tracker_ != nullptr) {
407  original_call_tracker_->Finish(args);
408  }
409  }
410 
411  private:
412  RefCountedPtr<GrpcLbClientStats> client_stats_;
413  std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker_;
414  };
415 
416  // Serverlist to be used for determining drops.
417  RefCountedPtr<Serverlist> serverlist_;
418 
    // Picker of the child policy; receives every pick that is not dropped.
419  std::unique_ptr<SubchannelPicker> child_picker_;
    // Stats object on which dropped calls are recorded; may be null.
420  RefCountedPtr<GrpcLbClientStats> client_stats_;
421  };
422 
    // ChannelControlHelper whose methods forward to the parent policy's own
    // channel_control_helper(), adding grpclb-specific handling (see the
    // method definitions).
423  class Helper : public ChannelControlHelper {
424  public:
425  explicit Helper(RefCountedPtr<GrpcLb> parent)
426  : parent_(std::move(parent)) {}
427 
428  RefCountedPtr<SubchannelInterface> CreateSubchannel(
429  ServerAddress address, const grpc_channel_args& args) override;
430  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
431  std::unique_ptr<SubchannelPicker> picker) override;
432  void RequestReresolution() override;
433  absl::string_view GetAuthority() override;
434  void AddTraceEvent(TraceSeverity severity,
435  absl::string_view message) override;
436 
437  private:
    // The grpclb policy this helper forwards to.
438  RefCountedPtr<GrpcLb> parent_;
439  };
440 
    // Connectivity watcher that moves the policy into fallback mode when
    // TRANSIENT_FAILURE is reported while the fallback-at-startup checks are
    // still pending. Notifications are delivered through the parent's
    // work_serializer (passed to the base class).
    // NOTE(review): the head of the logging call inside
    // OnConnectivityStateChange() is not visible in this excerpt.
441  class StateWatcher : public AsyncConnectivityStateWatcherInterface {
442  public:
443  explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
444  : AsyncConnectivityStateWatcherInterface(parent->work_serializer()),
445  parent_(std::move(parent)) {}
446 
    // Drops the ref on the parent, tagging the release as "StateWatcher".
447  ~StateWatcher() override { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }
448 
449  private:
450  void OnConnectivityStateChange(grpc_connectivity_state new_state,
451  const absl::Status& status) override {
452  if (parent_->fallback_at_startup_checks_pending_ &&
453  new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
454  // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
455  // fallback mode immediately.
457  "[grpclb %p] balancer channel in state:TRANSIENT_FAILURE (%s); "
458  "entering fallback mode",
459  parent_.get(), status.ToString().c_str());
460  parent_->fallback_at_startup_checks_pending_ = false;
461  grpc_timer_cancel(&parent_->lb_fallback_timer_);
462  parent_->fallback_mode_ = true;
463  parent_->CreateOrUpdateChildPolicyLocked();
464  // Cancel the watch, since we don't care about the channel state once we
465  // go into fallback mode.
466  parent_->CancelBalancerChannelConnectivityWatchLocked();
467  }
468  }
469 
470  RefCountedPtr<GrpcLb> parent_;
471  };
472 
    // Private method declarations of GrpcLb. None of their definitions are
    // visible in this excerpt. The static functions taking
    // (void* arg, grpc_error_handle error) have the same shape as the closure
    // callbacks in BalancerCallState and are each paired with a *Locked()
    // counterpart.
473  ~GrpcLb() override;
474 
475  void ShutdownLocked() override;
476 
477  // Helper functions used in UpdateLocked().
478  void UpdateBalancerChannelLocked(const grpc_channel_args& args);
479 
480  void CancelBalancerChannelConnectivityWatchLocked();
481 
482  // Methods for dealing with fallback state.
483  void MaybeEnterFallbackModeAfterStartup();
484  static void OnFallbackTimer(void* arg, grpc_error_handle error);
485  void OnFallbackTimerLocked(grpc_error_handle error);
486 
487  // Methods for dealing with the balancer call.
488  void StartBalancerCallLocked();
489  void StartBalancerCallRetryTimerLocked();
490  static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error);
491  void OnBalancerCallRetryTimerLocked(grpc_error_handle error);
492 
493  // Methods for dealing with the child policy.
494  grpc_channel_args* CreateChildPolicyArgsLocked(
495  bool is_backend_from_grpclb_load_balancer);
496  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
497  const grpc_channel_args* args);
498  void CreateOrUpdateChildPolicyLocked();
499 
500  // Subchannel caching.
    // CacheDeletedSubchannelLocked() is called from ~SubchannelWrapper() when
    // the policy is not shutting down.
501  void CacheDeletedSubchannelLocked(
502  RefCountedPtr<SubchannelInterface> subchannel);
503  void StartSubchannelCacheTimerLocked();
504  static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error);
505  void OnSubchannelCacheTimerLocked(grpc_error_handle error);
506 
    // Data members of GrpcLb.
    // NOTE(review): several declarations are not visible in this excerpt;
    // some comments below are followed directly by the next comment or by a
    // blank line. Members used elsewhere in this file but not declared here
    // include server_name_, lb_channel_, lb_call_timeout_,
    // lb_fallback_timer_ and fallback_at_startup_checks_pending_.
507  // Who the client is trying to communicate with.
509  // Configurations for the policy.
510  RefCountedPtr<GrpcLbConfig> config_;
511 
512  // Current channel args from the resolver.
514 
515  // Internal state.
516  bool shutting_down_ = false;
517 
518  // The channel for communicating with the LB server.
520  StateWatcher* watcher_ = nullptr;
521  // Response generator to inject address updates into lb_channel_.
522  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
523  // Parent channelz node.
524  RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
525 
526  // The data associated with the current LB call. It holds a ref to this LB
527  // policy. It's initialized every time we query for backends. It's reset to
528  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
529  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
530  // contains a non-NULL lb_call_.
531  OrphanablePtr<BalancerCallState> lb_calld_;
532  // Timeout for the LB call. 0 means no deadline.
534  // Balancer call retry state.
539 
540  // The deserialized response from the balancer. May be nullptr until one
541  // such response has arrived.
542  RefCountedPtr<Serverlist> serverlist_;
543 
544  // Whether we're in fallback mode.
545  bool fallback_mode_ = false;
546  // The backend addresses from the resolver.
548  // The last resolution note from our parent.
549  // To be passed to child policy when fallback_backend_addresses_ is empty.
551  // State for fallback-at-startup checks.
552  // Timeout after startup after which we will go into fallback mode if
553  // we have not received a serverlist from the balancer.
558 
559  // The child policy to use for the backends.
560  OrphanablePtr<LoadBalancingPolicy> child_policy_;
561  // Child policy in state READY.
    // Updated by Helper::UpdateState() on every state report from the child.
562  bool child_policy_ready_ = false;
563 
564  // Deleted subchannel caching.
    // NOTE(review): the name of the map member declared below is not visible
    // in this excerpt.
566  std::map<Timestamp /*deletion time*/,
567  std::vector<RefCountedPtr<SubchannelInterface>>>
572 };
573 
574 //
575 // GrpcLb::Serverlist
576 //
577 
578 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
579  return serverlist_ == other.serverlist_;
580 }
581 
582 void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
583  memset(addr, 0, sizeof(*addr));
584  if (server.drop) return;
585  const uint16_t netorder_port = grpc_htons(static_cast<uint16_t>(server.port));
586  /* the addresses are given in binary format (a in(6)_addr struct) in
587  * server->ip_address.bytes. */
588  if (server.ip_size == 4) {
589  addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
590  grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
591  addr4->sin_family = GRPC_AF_INET;
592  memcpy(&addr4->sin_addr, server.ip_addr, server.ip_size);
593  addr4->sin_port = netorder_port;
594  } else if (server.ip_size == 16) {
595  addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
596  grpc_sockaddr_in6* addr6 =
597  reinterpret_cast<grpc_sockaddr_in6*>(&addr->addr);
598  addr6->sin6_family = GRPC_AF_INET6;
599  memcpy(&addr6->sin6_addr, server.ip_addr, server.ip_size);
600  addr6->sin6_port = netorder_port;
601  }
602 }
603 
// Builds a multi-line text dump of the serverlist for logging: one line per
// entry with its index, either "(drop)" or the formatted address, and the
// entry's token. If the address cannot be formatted, the error status text is
// printed in its place.
// NOTE(review): the declaration of `addr` in the else-branch is not visible
// in this excerpt.
604 std::string GrpcLb::Serverlist::AsText() const {
605  std::vector<std::string> entries;
606  for (size_t i = 0; i < serverlist_.size(); ++i) {
607  const GrpcLbServer& server = serverlist_[i];
608  std::string ipport;
609  if (server.drop) {
610  ipport = "(drop)";
611  } else {
613  ParseServer(server, &addr);
614  auto addr_str = grpc_sockaddr_to_string(&addr, false);
615  ipport = addr_str.ok() ? *addr_str : addr_str.status().ToString();
616  }
617  entries.push_back(absl::StrFormat(" %" PRIuPTR ": %s token=%s\n", i,
618  ipport, server.load_balance_token));
619  }
    // Each entry already ends in '\n', so the entries are joined with no
    // separator.
620  return absl::StrJoin(entries, "");
621 }
622 
// Returns true if `server` is a usable backend entry: not a drop entry, port
// fits in 16 bits, and the IP is 4 or 16 bytes long. `idx` is the entry's
// position in the serverlist and is used only in the messages, which are
// emitted only when `log` is true.
// NOTE(review): the heads of the two logging calls are not visible in this
// excerpt.
623 bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
624  if (server.drop) return false;
    // Any bit set above the low 16 means the value cannot be a port number.
625  if (GPR_UNLIKELY(server.port >> 16 != 0)) {
626  if (log) {
628  "Invalid port '%d' at index %" PRIuPTR
629  " of serverlist. Ignoring.",
630  server.port, idx);
631  }
632  return false;
633  }
634  if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
635  if (log) {
637  "Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
638  " of serverlist. Ignoring",
639  server.ip_size, idx);
640  }
641  return false;
642  }
643  return true;
644 }
645 
646 // Returns addresses extracted from the serverlist.
// Entries rejected by IsServerValid() (drops, bad port, bad IP size) are
// skipped without logging. Each returned address carries a
// TokenAndClientStatsAttribute under kGrpcLbAddressAttributeKey holding the
// entry's LB token and a ref to client_stats (which may be null).
// NOTE(review): the declaration of `addr` and the head of the logging call
// are not visible in this excerpt.
647 ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
648  GrpcLbClientStats* client_stats) const {
649  RefCountedPtr<GrpcLbClientStats> stats;
650  if (client_stats != nullptr) stats = client_stats->Ref();
651  ServerAddressList addresses;
652  for (size_t i = 0; i < serverlist_.size(); ++i) {
653  const GrpcLbServer& server = serverlist_[i];
654  if (!IsServerValid(server, i, false)) continue;
655  // Address processing.
657  ParseServer(server, &addr);
658  // LB token processing.
    // strnlen() bounds the read to the token array, so a token that fills
    // the whole array without a terminating NUL is still handled.
659  const size_t lb_token_length = strnlen(
660  server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
661  std::string lb_token(server.load_balance_token, lb_token_length);
662  if (lb_token.empty()) {
663  auto addr_uri = grpc_sockaddr_to_uri(&addr);
665  "Missing LB token for backend address '%s'. The empty token will "
666  "be used instead",
667  addr_uri.ok() ? addr_uri->c_str()
668  : addr_uri.status().ToString().c_str());
669  }
670  // Attach attribute to address containing LB token and stats object.
671  std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
672  attributes;
673  attributes[kGrpcLbAddressAttributeKey] =
674  absl::make_unique<TokenAndClientStatsAttribute>(std::move(lb_token),
675  stats);
676  // Add address.
677  addresses.emplace_back(addr, /*args=*/nullptr, std::move(attributes));
678  }
679  return addresses;
680 }
681 
682 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
683  if (serverlist_.empty()) return false;
684  for (const GrpcLbServer& server : serverlist_) {
685  if (!server.drop) return false;
686  }
687  return true;
688 }
689 
690 const char* GrpcLb::Serverlist::ShouldDrop() {
691  if (serverlist_.empty()) return nullptr;
692  GrpcLbServer& server = serverlist_[drop_index_];
693  drop_index_ = (drop_index_ + 1) % serverlist_.size();
694  return server.drop ? server.load_balance_token : nullptr;
695 }
696 
697 //
698 // GrpcLb::Picker
699 //
700 
// Performs one pick. If the serverlist says the call should be dropped, the
// drop is recorded (when a stats object is present) and a Drop result with an
// UNAVAILABLE status is returned. Otherwise the pick is delegated to the
// child picker; for a Complete result the client stats pointer and the LB
// token are added to the initial metadata, and the SubchannelWrapper is
// replaced by the subchannel it wraps.
// NOTE(review): the first argument of the initial_metadata->Add() call that
// passes the stats pointer (the metadata key) is not visible in this excerpt.
701 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
702  // Check if we should drop the call.
703  const char* drop_token =
704  serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
705  if (drop_token != nullptr) {
706  // Update client load reporting stats to indicate the number of
707  // dropped calls. Note that we have to do this here instead of in
708  // the client_load_reporting filter, because we do not create a
709  // subchannel call (and therefore no client_load_reporting filter)
710  // for dropped calls.
711  if (client_stats_ != nullptr) {
712  client_stats_->AddCallDropped(drop_token);
713  }
714  return PickResult::Drop(
715  absl::UnavailableError("drop directed by grpclb balancer"));
716  }
717  // Forward pick to child policy.
718  PickResult result = child_picker_->Pick(args);
719  // If pick succeeded, add LB token to initial metadata.
720  auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
721  if (complete_pick != nullptr) {
    // The static_cast assumes every subchannel handed out by the child picker
    // was created through Helper::CreateSubchannel() and is therefore a
    // SubchannelWrapper -- TODO confirm.
722  const SubchannelWrapper* subchannel_wrapper =
723  static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
724  // Encode client stats object into metadata for use by
725  // client_load_reporting filter.
726  GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
727  if (client_stats != nullptr) {
    // Wrap the child's call tracker (if any) in one that owns a new stats
    // ref; see SubchannelCallTracker for when that ref is given up.
728  complete_pick->subchannel_call_tracker =
729  absl::make_unique<SubchannelCallTracker>(
730  client_stats->Ref(),
731  std::move(complete_pick->subchannel_call_tracker));
732  // The metadata value is a hack: we pretend the pointer points to
733  // a string and rely on the client_load_reporting filter to know
734  // how to interpret it.
735  args.initial_metadata->Add(
737  absl::string_view(reinterpret_cast<const char*>(client_stats), 0));
738  // Update calls-started.
739  client_stats->AddCallStarted();
740  }
741  // Encode the LB token in metadata.
742  // Create a new copy on the call arena, since the subchannel list
743  // may get refreshed between when we return this pick and when the
744  // initial metadata goes out on the wire.
745  if (!subchannel_wrapper->lb_token().empty()) {
    // size() + 1 leaves room for the terminating NUL written by strcpy().
746  char* lb_token = static_cast<char*>(
747  args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1));
748  strcpy(lb_token, subchannel_wrapper->lb_token().c_str());
749  args.initial_metadata->Add(LbTokenMetadata::key(), lb_token);
750  }
751  // Unwrap subchannel to pass up to the channel.
752  complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
753  }
754  return result;
755 }
756 
757 //
758 // GrpcLb::Helper
759 //
760 
// Creates a subchannel for `address` through the parent policy's own helper
// and wraps it in a SubchannelWrapper carrying the LB token and client stats
// taken from the address's TokenAndClientStatsAttribute.
//
// Returns nullptr if the policy is shutting down. Logs and aborts if the
// address has no attribute under kGrpcLbAddressAttributeKey.
//
// Fix: the log format used "%p" for the address argument, which is a C string
// (address.ToString().c_str()); it now uses "%s" so the address text is
// printed instead of a pointer value.
// NOTE(review): the head of the logging call is not visible in this excerpt
// and is left untouched; only the format string is corrected.
761 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
762  ServerAddress address, const grpc_channel_args& args) {
763  if (parent_->shutting_down_) return nullptr;
764  const TokenAndClientStatsAttribute* attribute =
765  static_cast<const TokenAndClientStatsAttribute*>(
766  address.GetAttribute(kGrpcLbAddressAttributeKey));
767  if (attribute == nullptr) {
769  "[grpclb %p] no TokenAndClientStatsAttribute for address %s",
770  parent_.get(), address.ToString().c_str());
771  abort();
772  }
    // Copy the token and stats out of the attribute before `address` is moved
    // into the parent helper's CreateSubchannel() call below.
773  std::string lb_token = attribute->lb_token();
774  RefCountedPtr<GrpcLbClientStats> client_stats = attribute->client_stats();
775  return MakeRefCounted<SubchannelWrapper>(
776  parent_->channel_control_helper()->CreateSubchannel(std::move(address),
777  args),
778  parent_->Ref(DEBUG_LOCATION, "SubchannelWrapper"), std::move(lb_token),
779  std::move(client_stats));
780 }
781 
// Handles a state report from the child policy: records whether the child is
// READY, lets the policy reconsider fallback mode, and passes the state up to
// the channel with the child's picker wrapped in a GrpcLb::Picker. Does
// nothing once the policy is shutting down.
// NOTE(review): the opening of the block around the log statement below is
// not visible in this excerpt; only its format string, arguments and closing
// brace are shown.
782 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
783  const absl::Status& status,
784  std::unique_ptr<SubchannelPicker> picker) {
785  if (parent_->shutting_down_) return;
786  // Record whether child policy reports READY.
787  parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
788  // Enter fallback mode if needed.
789  parent_->MaybeEnterFallbackModeAfterStartup();
790  // We pass the serverlist to the picker so that it can handle drops.
791  // However, we don't want to handle drops in the case where the child
792  // policy is reporting a state other than READY (unless we are
793  // dropping *all* calls), because we don't want to process drops for picks
794  // that yield a QUEUE result; this would result in dropping too many calls,
795  // since we will see the queued picks multiple times, and we'd consider each
796  // one a separate call for the drop calculation. So in this case, we pass
797  // a null serverlist to the picker, which tells it not to do drops.
798  RefCountedPtr<Serverlist> serverlist;
799  if (state == GRPC_CHANNEL_READY ||
800  (parent_->serverlist_ != nullptr &&
801  parent_->serverlist_->ContainsAllDropEntries())) {
802  serverlist = parent_->serverlist_;
803  }
    // Take a stats ref for the picker only if there is a current balancer
    // call and it has a stats object.
804  RefCountedPtr<GrpcLbClientStats> client_stats;
805  if (parent_->lb_calld_ != nullptr &&
806  parent_->lb_calld_->client_stats() != nullptr) {
807  client_stats = parent_->lb_calld_->client_stats()->Ref();
808  }
811  "[grpclb %p helper %p] state=%s (%s) wrapping child "
812  "picker %p (serverlist=%p, client_stats=%p)",
813  parent_.get(), this, ConnectivityStateName(state),
814  status.ToString().c_str(), picker.get(), serverlist.get(),
815  client_stats.get());
816  }
817  parent_->channel_control_helper()->UpdateState(
818  state, status,
819  absl::make_unique<Picker>(std::move(serverlist), std::move(picker),
820  std::move(client_stats)));
821 }
822 
823 void GrpcLb::Helper::RequestReresolution() {
824  if (parent_->shutting_down_) return;
825  // If we are talking to a balancer, we expect to get updated addresses
826  // from the balancer, so we can ignore the re-resolution request from
827  // the child policy. Otherwise, pass the re-resolution request up to the
828  // channel.
829  if (parent_->lb_calld_ == nullptr ||
830  !parent_->lb_calld_->seen_initial_response()) {
831  parent_->channel_control_helper()->RequestReresolution();
832  }
833 }
834 
835 absl::string_view GrpcLb::Helper::GetAuthority() {
836  return parent_->channel_control_helper()->GetAuthority();
837 }
838 
// Forwards a trace event to the parent policy's own helper, unless the policy
// is shutting down.
// NOTE(review): the line carrying the second parameter (`message`, per the
// declaration in the Helper class) is not visible in this excerpt.
839 void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity,
841  if (parent_->shutting_down_) return;
842  parent_->channel_control_helper()->AddTraceEvent(severity, message);
843 }
844 
845 //
846 // GrpcLb::BalancerCallState
847 //
848 
// Sets up the state for a new balancer call: takes a ref to the owning
// policy, initializes the callback closures, computes the call deadline and
// builds the initial request payload.
// Preconditions (asserted): the policy pointer is non-null, the policy is not
// shutting down, and its server_name_ is non-empty.
// NOTE(review): several lines are not visible in this excerpt, including the
// heads of two GRPC_CLOSURE_INIT calls, the head of the statement that
// creates the call, the declaration of `arena`, and the assignment target of
// the byte-buffer creation.
849 GrpcLb::BalancerCallState::BalancerCallState(
850  RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
    // The trace name passed to the base class is only set when the "glb"
    // trace flag is enabled.
851  : InternallyRefCounted<BalancerCallState>(
852  GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState"
853  : nullptr),
854  grpclb_policy_(std::move(parent_grpclb_policy)) {
855  GPR_ASSERT(grpclb_policy_ != nullptr);
856  GPR_ASSERT(!grpclb_policy()->shutting_down_);
857  // Init the LB call. Note that the LB call will progress every time there's
858  // activity in grpclb_policy_->interested_parties(), which is comprised of
859  // the polling entities from client_channel.
860  GPR_ASSERT(!grpclb_policy()->server_name_.empty());
861  // Closure Initialization
862  GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
863  grpc_schedule_on_exec_ctx);
865  OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
866  GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
867  this, grpc_schedule_on_exec_ctx);
869  this, grpc_schedule_on_exec_ctx);
    // A zero lb_call_timeout_ means "no deadline"; otherwise the deadline is
    // now + timeout.
870  const Timestamp deadline =
871  grpclb_policy()->lb_call_timeout_ == Duration::Zero()
872  ? Timestamp::InfFuture()
873  : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_;
875  grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
876  grpclb_policy_->interested_parties(),
877  Slice::FromStaticString("/grpc.lb.v1.LoadBalancer/BalanceLoad").c_slice(),
878  nullptr, deadline, nullptr);
879  // Init the LB call request payload.
    // The request names the configured service name if one is set, otherwise
    // the policy's server name.
881  grpc_slice request_payload_slice = GrpcLbRequestCreate(
882  grpclb_policy()->config_->service_name().empty()
883  ? grpclb_policy()->server_name_.c_str()
884  : grpclb_policy()->config_->service_name().c_str(),
885  arena.ptr());
887  grpc_raw_byte_buffer_create(&request_payload_slice, 1);
888  grpc_slice_unref_internal(request_payload_slice);
889  // Init other data associated with the LB call.
892 }
893 
// Destructor. Asserts that the call object still exists.
// NOTE(review): the remaining statements of the body are not visible in this
// excerpt.
894 GrpcLb::BalancerCallState::~BalancerCallState() {
895  GPR_ASSERT(lb_call_ != nullptr);
902 }
903 
// Called when the owning policy gives up this call state.
// NOTE(review): the statements that cancel the call and the condition
// guarding the Unref() below are not visible in this excerpt.
904 void GrpcLb::BalancerCallState::Orphan() {
905  GPR_ASSERT(lb_call_ != nullptr);
906  // If we are here because grpclb_policy wants to cancel the call,
907  // lb_on_balancer_status_received_ will complete the cancellation and clean
908  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
909  // call, then the following cancellation will be a no-op.
913  Unref(DEBUG_LOCATION, "client_load_report cancelled");
914  }
915  // Note that the initial ref is held by lb_on_balancer_status_received_
916  // instead of the caller of this function. So the corresponding unref happens
917  // in lb_on_balancer_status_received_ instead of here.
918 }
919 
// Starts the balancer call by issuing three batches on lb_call_:
//   1. send initial metadata + send the request message
//      (takes a new ref tagged "on_initial_request_sent"),
//   2. receive initial metadata + receive a response message
//      (takes a new ref tagged "on_message_received"),
//   3. receive the server status (uses the initial ref).
// Each batch start is asserted to return GRPC_CALL_OK.
// NOTE(review): the lines that fill in each op's type and data, the trace
// guard around the log statement, and the heads of the three batch-start
// calls are not visible in this excerpt.
920 void GrpcLb::BalancerCallState::StartQuery() {
921  GPR_ASSERT(lb_call_ != nullptr);
923  gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
924  grpclb_policy_.get(), this, lb_call_);
925  }
926  // Create the ops.
927  grpc_call_error call_error;
    // The same ops array is reused for all three batches; `op` is rewound to
    // ops before each one.
928  grpc_op ops[3];
929  memset(ops, 0, sizeof(ops));
930  // Op: send initial metadata.
931  grpc_op* op = ops;
936  op->reserved = nullptr;
937  op++;
938  // Op: send request message.
939  GPR_ASSERT(send_message_payload_ != nullptr);
942  op->flags = 0;
943  op->reserved = nullptr;
944  op++;
945  // TODO(roth): We currently track this ref manually. Once the
946  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
947  // with the callback.
    // release() gives up ownership without unreffing, so the ref stays alive
    // for the pending callback.
948  auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
949  self.release();
951  static_cast<size_t>(op - ops),
953  GPR_ASSERT(GRPC_CALL_OK == call_error);
954  // Op: recv initial metadata.
955  op = ops;
959  op->flags = 0;
960  op->reserved = nullptr;
961  op++;
962  // Op: recv response.
965  op->flags = 0;
966  op->reserved = nullptr;
967  op++;
968  // TODO(roth): We currently track this ref manually. Once the
969  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
970  // with the callback.
971  self = Ref(DEBUG_LOCATION, "on_message_received");
972  self.release();
974  lb_call_, ops, static_cast<size_t>(op - ops),
976  GPR_ASSERT(GRPC_CALL_OK == call_error);
977  // Op: recv server status.
978  op = ops;
984  op->flags = 0;
985  op->reserved = nullptr;
986  op++;
987  // This callback signals the end of the LB call, so it relies on the initial
988  // ref instead of a new ref. When it's invoked, it's the initial ref that is
989  // unreffed.
991  lb_call_, ops, static_cast<size_t>(op - ops),
993  GPR_ASSERT(GRPC_CALL_OK == call_error);
994 }
995 
// Arranges for MaybeSendClientLoadReport() to be invoked from a callback. The
// callback sets up an ApplicationCallbackExecCtx and an ExecCtx before making
// the call.
// NOTE(review): the head of the scheduling call (what it is scheduled on and
// with what delay) is not visible in this excerpt.
996 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
999  ApplicationCallbackExecCtx callback_exec_ctx;
1000  ExecCtx exec_ctx;
1001  MaybeSendClientLoadReport();
1002  });
1003 }
1004 
1005 void GrpcLb::BalancerCallState::MaybeSendClientLoadReport() {
1006  grpclb_policy()->work_serializer()->Run(
1007  [this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
1008 }
1009 
// Runs in the work_serializer. If this is no longer the policy's current
// balancer call, drops the "client_load_report" ref and returns. Otherwise it
// sends the load report now if no message send is in flight
// (send_message_payload_ is null).
// NOTE(review): the first statement of the body and the body of the else
// branch are not visible in this excerpt.
1010 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
1012  if (this != grpclb_policy()->lb_calld_.get()) {
1013  Unref(DEBUG_LOCATION, "client_load_report");
1014  return;
1015  }
1016  // If we've already sent the initial request, then we can go ahead and send
1017  // the load report. Otherwise, we need to wait until the initial request has
1018  // been sent to send this (see OnInitialRequestSentLocked()).
1019  if (send_message_payload_ == nullptr) {
1020  SendClientLoadReportLocked();
1021  } else {
1023  }
1024 }
1025 
// Builds a client load report and sends it on the LB call.
// Precondition (asserted): no other message send is in flight, i.e.
// send_message_payload_ == nullptr.
// When the visible zero-counter check succeeds, no report is sent; the next
// report is scheduled and the function returns. Otherwise the request is
// serialized with GrpcLbLoadReportRequestCreate(), wrapped in a raw byte
// buffer, and a single zero-initialized grpc_op is prepared. A non-OK
// call_error is logged and then trips GPR_ASSERT.
// NOTE(review): many listing lines are missing from this excerpt (the stats
// snapshot at 1030-1037, parts of the zero check, the op setup and the batch
// start at 1064-1067), so the details of those steps cannot be read from here.
1026 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
1027  // Construct message payload.
1028  GPR_ASSERT(send_message_payload_ == nullptr);
1029  // Get snapshot of stats.
1034  std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
1038  // Skip client load report if the counters were all zero in the last
1039  // report and they are still zero in this one.
1040  if (num_calls_started == 0 && num_calls_finished == 0 &&
1043  (drop_token_counts == nullptr || drop_token_counts->empty())) {
1045  ScheduleNextClientLoadReportLocked();
1046  return;
1047  }
1049  } else {
1051  }
1052  // Populate load report.
1053  upb::Arena arena;
1054  grpc_slice request_payload_slice = GrpcLbLoadReportRequestCreate(
1059  grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1060  grpc_slice_unref_internal(request_payload_slice);
1061  // Send the report.
1062  grpc_op op;
1063  memset(&op, 0, sizeof(op));
1068  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1070  "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
1071  grpclb_policy_.get(), this, call_error);
1072  GPR_ASSERT(GRPC_CALL_OK == call_error);
1073  }
1074 }
1075 
// Closure-style callback for completion of a client load report send.
// `arg` is the BalancerCallState. A ref on `error` is taken on behalf of the
// lambda, which forwards to ClientLoadReportDoneLocked() in the policy's work
// serializer.
// NOTE(review): the second line of the signature (listing line 1077) is
// missing from this excerpt.
1076 void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
1078  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1079  (void)GRPC_ERROR_REF(error); // ref owned by lambda
1080  lb_calld->grpclb_policy()->work_serializer()->Run(
1081  [lb_calld, error]() { lb_calld->ClientLoadReportDoneLocked(error); },
1082  DEBUG_LOCATION);
1083 }
1084 
// Work-serializer half of ClientLoadReportDone().
// Resets send_message_payload_ to nullptr so another send may start. On error,
// or if this call state is no longer the policy's current lb_calld_, the
// "client_load_report" ref is dropped and reporting stops; otherwise the next
// report is scheduled.
// NOTE(review): listing lines 1086-1087 and 1091 are missing from this
// excerpt (rest of the signature and, presumably, payload/error cleanup).
1085 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
1088  send_message_payload_ = nullptr;
1089  if (!GRPC_ERROR_IS_NONE(error) || this != grpclb_policy()->lb_calld_.get()) {
1090  Unref(DEBUG_LOCATION, "client_load_report");
1092  return;
1093  }
1094  ScheduleNextClientLoadReportLocked();
1095 }
1096 
// Closure-style callback for completion of the initial request send.
// `arg` is the BalancerCallState; the error argument is ignored. The real work
// happens in OnInitialRequestSentLocked(), run in the policy's work
// serializer.
1097 void GrpcLb::BalancerCallState::OnInitialRequestSent(
1098  void* arg, grpc_error_handle /*error*/) {
1099  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1100  lb_calld->grpclb_policy()->work_serializer()->Run(
1101  [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION);
1102 }
1103 
// Work-serializer half of OnInitialRequestSent().
// Resets send_message_payload_ to nullptr, sends a client load report that
// became due while the initial request was still in flight (only if this call
// state is still the policy's current one), and finally releases the
// "on_initial_request_sent" ref.
// NOTE(review): listing lines 1105 and 1111 are missing from this excerpt
// (presumably payload cleanup and clearing client_load_report_is_due_).
1104 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
1106  send_message_payload_ = nullptr;
1107  // If we attempted to send a client load report before the initial request was
1108  // sent (and this lb_calld is still in use), send the load report now.
1109  if (client_load_report_is_due_ && this == grpclb_policy()->lb_calld_.get()) {
1110  SendClientLoadReportLocked();
1112  }
1113  Unref(DEBUG_LOCATION, "on_initial_request_sent");
1114 }
1115 
// Closure-style callback for a received balancer message.
// `arg` is the BalancerCallState; the error argument is ignored. Processing is
// deferred to OnBalancerMessageReceivedLocked() in the policy's work
// serializer.
1116 void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
1117  void* arg, grpc_error_handle /*error*/) {
1118  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1119  lb_calld->grpclb_policy()->work_serializer()->Run(
1120  [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
1121  DEBUG_LOCATION);
1122 }
1123 
// Handles one message from the balancer; runs in the work serializer.
//
// - If this call state is no longer the policy's current lb_calld_, or the
//   payload is null, the "on_message_received" ref is dropped and the function
//   returns.
// - A response that fails to parse, or a second INITIAL response, is logged
//   and ignored.
// - INITIAL: marks seen_initial_response_. When the balancer supplied a
//   non-zero client_stats_report_interval it is also recorded (presumably
//   clamped to at least one second; the assignment on listing line 1152 is
//   missing from this excerpt).
// - SERVERLIST: marks seen_serverlist_, starts client load reporting if an
//   interval is set and reporting has not started yet, and then -- unless the
//   list equals the policy's current one -- leaves fallback mode, cancels any
//   pending fallback-at-startup checks, stores the new list and updates the
//   child policy.
// - FALLBACK: if not already in fallback mode, cancels pending
//   fallback-at-startup checks, enters fallback mode, updates the child policy
//   and clears the stored serverlist.
//
// Finally, unless the policy is shutting down, another receive op is started
// that reuses the same ref; otherwise the ref is dropped.
// NOTE(review): several listing lines are missing from this excerpt (e.g.
// 1131-1132, 1134-1135, 1143, 1152, 1154, 1160, 1162, 1175, 1196, 1268-1269,
// 1273-1274), including the trace-flag checks and the batch start call.
1124 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
1125  // Null payload means the LB call was cancelled.
1126  if (this != grpclb_policy()->lb_calld_.get() ||
1127  recv_message_payload_ == nullptr) {
1128  Unref(DEBUG_LOCATION, "on_message_received");
1129  return;
1130  }
1133  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1136  recv_message_payload_ = nullptr;
1137  GrpcLbResponse response;
1138  upb::Arena arena;
1139  if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
1140  (response.type == response.INITIAL && seen_initial_response_)) {
1141  char* response_slice_str =
1142  grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1144  "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1145  "Ignoring.",
1146  grpclb_policy(), this, response_slice_str);
1147  gpr_free(response_slice_str);
1148  } else {
1149  switch (response.type) {
1150  case response.INITIAL: {
1151  if (response.client_stats_report_interval != Duration::Zero()) {
1153  Duration::Seconds(1), response.client_stats_report_interval);
1155  gpr_log(GPR_INFO,
1156  "[grpclb %p] lb_calld=%p: Received initial LB response "
1157  "message; client load reporting interval = %" PRId64
1158  " milliseconds",
1159  grpclb_policy(), this,
1161  }
1163  gpr_log(GPR_INFO,
1164  "[grpclb %p] lb_calld=%p: Received initial LB response "
1165  "message; client load reporting NOT enabled",
1166  grpclb_policy(), this);
1167  }
1168  seen_initial_response_ = true;
1169  break;
1170  }
1171  case response.SERVERLIST: {
1172  GPR_ASSERT(lb_call_ != nullptr);
1173  auto serverlist_wrapper =
1174  MakeRefCounted<Serverlist>(std::move(response.serverlist));
1176  gpr_log(GPR_INFO,
1177  "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1178  " servers received:\n%s",
1179  grpclb_policy(), this,
1180  serverlist_wrapper->serverlist().size(),
1181  serverlist_wrapper->AsText().c_str());
1182  }
1183  seen_serverlist_ = true;
1184  // Start sending client load report only after we start using the
1185  // serverlist returned from the current LB call.
1186  if (client_stats_report_interval_ > Duration::Zero() &&
1187  client_stats_ == nullptr) {
1188  client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1189  // Ref held by callback.
1190  Ref(DEBUG_LOCATION, "client_load_report").release();
1191  ScheduleNextClientLoadReportLocked();
1192  }
1193  // Check if the serverlist differs from the previous one.
1194  if (grpclb_policy()->serverlist_ != nullptr &&
1195  *grpclb_policy()->serverlist_ == *serverlist_wrapper) {
1197  gpr_log(GPR_INFO,
1198  "[grpclb %p] lb_calld=%p: Incoming server list identical "
1199  "to current, ignoring.",
1200  grpclb_policy(), this);
1201  }
1202  } else { // New serverlist.
1203  // Dispose of the fallback.
1204  // TODO(roth): Ideally, we should stay in fallback mode until we
1205  // know that we can reach at least one of the backends in the new
1206  // serverlist. Unfortunately, we can't do that, since we need to
1207  // send the new addresses to the child policy in order to determine
1208  // if they are reachable, and if we don't exit fallback mode now,
1209  // CreateOrUpdateChildPolicyLocked() will use the fallback
1210  // addresses instead of the addresses from the new serverlist.
1211  // However, if we can't reach any of the servers in the new
1212  // serverlist, then the child policy will never switch away from
1213  // the fallback addresses, but the grpclb policy will still think
1214  // that we're not in fallback mode, which means that we won't send
1215  // updates to the child policy when the fallback addresses are
1216  // updated by the resolver. This is sub-optimal, but the only way
1217  // to fix it is to maintain a completely separate child policy for
1218  // fallback mode, and that's more work than we want to put into
1219  // the grpclb implementation at this point, since we're deprecating
1220  // it in favor of the xds policy. We will implement this the
1221  // right way in the xds policy instead.
1222  if (grpclb_policy()->fallback_mode_) {
1223  gpr_log(GPR_INFO,
1224  "[grpclb %p] Received response from balancer; exiting "
1225  "fallback mode",
1226  grpclb_policy());
1227  grpclb_policy()->fallback_mode_ = false;
1228  }
1229  if (grpclb_policy()->fallback_at_startup_checks_pending_) {
1230  grpclb_policy()->fallback_at_startup_checks_pending_ = false;
1231  grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
1232  grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1233  }
1234  // Update the serverlist in the GrpcLb instance. This serverlist
1235  // instance will be destroyed either upon the next update or when the
1236  // GrpcLb instance is destroyed.
1237  grpclb_policy()->serverlist_ = std::move(serverlist_wrapper);
1238  grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1239  }
1240  break;
1241  }
1242  case response.FALLBACK: {
1243  if (!grpclb_policy()->fallback_mode_) {
1244  gpr_log(GPR_INFO,
1245  "[grpclb %p] Entering fallback mode as requested by balancer",
1246  grpclb_policy());
1247  if (grpclb_policy()->fallback_at_startup_checks_pending_) {
1248  grpclb_policy()->fallback_at_startup_checks_pending_ = false;
1249  grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
1250  grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1251  }
1252  grpclb_policy()->fallback_mode_ = true;
1253  grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1254  // Reset serverlist, so that if the balancer exits fallback
1255  // mode by sending the same serverlist we were previously
1256  // using, we don't incorrectly ignore it as a duplicate.
1257  grpclb_policy()->serverlist_.reset();
1258  }
1259  break;
1260  }
1261  }
1262  }
1263  grpc_slice_unref_internal(response_slice);
1264  if (!grpclb_policy()->shutting_down_) {
1265  // Keep listening for serverlist updates.
1266  grpc_op op;
1267  memset(&op, 0, sizeof(op));
1270  op.flags = 0;
1271  op.reserved = nullptr;
1272  // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1275  GPR_ASSERT(GRPC_CALL_OK == call_error);
1276  } else {
1277  Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
1278  }
1279 }
1280 
// Closure-style callback for the LB call's final status.
// `arg` is the BalancerCallState. A ref on `error` is taken on behalf of the
// lambda, which forwards to OnBalancerStatusReceivedLocked() in the policy's
// work serializer.
1281 void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
1282  void* arg, grpc_error_handle error) {
1283  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1284  (void)GRPC_ERROR_REF(error); // owned by lambda
1285  lb_calld->grpclb_policy()->work_serializer()->Run(
1286  [lb_calld, error]() { lb_calld->OnBalancerStatusReceivedLocked(error); },
1287  DEBUG_LOCATION);
1288 }
1289 
// Handles the end of the LB call; runs in the work serializer.
//
// If this call state is still the policy's current lb_calld_, the call ended
// because of a failure:
//  - lb_calld_ is reset;
//  - if fallback-at-startup checks are pending, they are cancelled and
//    fallback mode is entered immediately; otherwise
//    MaybeEnterFallbackModeAfterStartup() decides;
//  - re-resolution is requested;
//  - the call is restarted: at once and with the backoff reset if an initial
//    response had been seen on this call, via the retry timer otherwise.
// If this call state is not current, the call was ended deliberately and
// nothing is restarted. In both cases the "lb_call_ended" ref is released
// last.
// NOTE(review): listing lines 1291, 1293, 1299, 1302 and 1312 are missing from
// this excerpt (rest of the signature, trace-flag checks, and presumably the
// release of `error`).
1290 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1292  GPR_ASSERT(lb_call_ != nullptr);
1294  char* status_details = grpc_slice_to_c_string(lb_call_status_details_);
1295  gpr_log(GPR_INFO,
1296  "[grpclb %p] lb_calld=%p: Status from LB server received. "
1297  "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1298  grpclb_policy(), this, lb_call_status_, status_details, lb_call_,
1300  gpr_free(status_details);
1301  }
1303  // If this lb_calld is still in use, this call ended because of a failure so
1304  // we want to retry connecting. Otherwise, we have deliberately ended this
1305  // call and no further action is required.
1306  if (this == grpclb_policy()->lb_calld_.get()) {
1307  // If the fallback-at-startup checks are pending, go into fallback mode
1308  // immediately. This short-circuits the timeout for the fallback-at-startup
1309  // case.
1310  grpclb_policy()->lb_calld_.reset();
1311  if (grpclb_policy()->fallback_at_startup_checks_pending_) {
1313  gpr_log(GPR_INFO,
1314  "[grpclb %p] Balancer call finished without receiving "
1315  "serverlist; entering fallback mode",
1316  grpclb_policy());
1317  grpclb_policy()->fallback_at_startup_checks_pending_ = false;
1318  grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
1319  grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
1320  grpclb_policy()->fallback_mode_ = true;
1321  grpclb_policy()->CreateOrUpdateChildPolicyLocked();
1322  } else {
1323  // This handles the fallback-after-startup case.
1324  grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
1325  }
1326  GPR_ASSERT(!grpclb_policy()->shutting_down_);
1327  grpclb_policy()->channel_control_helper()->RequestReresolution();
1328  if (seen_initial_response_) {
1329  // If we lose connection to the LB server, reset the backoff and restart
1330  // the LB call immediately.
1331  grpclb_policy()->lb_call_backoff_.Reset();
1332  grpclb_policy()->StartBalancerCallLocked();
1333  } else {
1334  // If this LB call fails establishing any connection to the LB server,
1335  // retry later.
1336  grpclb_policy()->StartBalancerCallRetryTimerLocked();
1337  }
1338  }
1339  Unref(DEBUG_LOCATION, "lb_call_ended");
1340 }
1341 
1342 //
1343 // helper code for creating balancer channel
1344 //
1345 
// Returns a copy of the balancer address list found in `args`, or an empty
// ServerAddressList when the lookup yields nullptr.
// NOTE(review): the lookup expression itself (listing line 1348) is missing
// from this excerpt.
1346 ServerAddressList ExtractBalancerAddresses(const grpc_channel_args& args) {
1347  const ServerAddressList* addresses =
1349  if (addresses != nullptr) return *addresses;
1350  return ServerAddressList();
1351 }
1352 
1353 /* Returns the channel args for the LB channel, used to create a bidirectional
1354  * stream for the reception of load balancing updates.
1355  *
1356  * Inputs:
1357  * - \a response_generator: in order to propagate updates from the resolver
1358  * above the grpclb policy.
1359  * - \a args: other args inherited from the grpclb policy. */
// Builds the channel args for the balancer channel from the parent's `args`:
// a fixed set of args is removed and four args are added (fake resolver
// response generator, "is grpclb load balancer" flag, channelz
// "internal channel" flag, and channel credentials stripped of call
// credentials). Asserts that `args` carries channel credentials and that the
// stripped copy could be created.
// NOTE(review): every entry of args_to_remove, the credentials lookup (listing
// line 1398) and the final copy/remove/add call (line 1418) are missing from
// this excerpt; only their explanatory comments remain.
1360 grpc_channel_args* BuildBalancerChannelArgs(
1361  FakeResolverResponseGenerator* response_generator,
1362  const grpc_channel_args* args) {
1363  // Channel args to remove.
1364  static const char* args_to_remove[] = {
1365  // LB policy name, since we want to use the default (pick_first) in
1366  // the LB channel.
1368  // Strip out the service config, since we don't want the LB policy
1369  // config specified for the parent channel to affect the LB channel.
1371  // The channel arg for the server URI, since that will be different for
1372  // the LB channel than for the parent channel. The client channel
1373  // factory will re-add this arg with the right value.
1375  // The fake resolver response generator, because we are replacing it
1376  // with the one from the grpclb policy, used to propagate updates to
1377  // the LB channel.
1379  // The LB channel should use the authority indicated by the target
1380  // authority table (see \a ModifyGrpclbBalancerChannelArgs),
1381  // as opposed to the authority from the parent channel.
1383  // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1384  // treated as a stand-alone channel and not inherit this argument from the
1385  // args of the parent channel.
1387  // Don't want to pass down channelz node from parent; the balancer
1388  // channel will get its own.
1390  // Remove the channel args for channel credentials and replace it
1391  // with a version that does not contain call credentials. The loadbalancer
1392  // is not necessarily trusted to handle bearer token credentials.
1394  };
1395  // Create channel args for channel credentials that does not contain bearer
1396  // token credentials.
1397  grpc_channel_credentials* channel_credentials =
1399  GPR_ASSERT(channel_credentials != nullptr);
1400  RefCountedPtr<grpc_channel_credentials> creds_sans_call_creds =
1401  channel_credentials->duplicate_without_call_credentials();
1402  GPR_ASSERT(creds_sans_call_creds != nullptr);
1403  // Channel args to add.
1404  absl::InlinedVector<grpc_arg, 4> args_to_add = {
1405  // The fake resolver response generator, which we use to inject
1406  // address updates into the LB channel.
1407  FakeResolverResponseGenerator::MakeChannelArg(response_generator),
1408  // A channel arg indicating the target is a grpclb load balancer.
1410  const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
1411  // Tells channelz that this is an internal channel.
1413  const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
1414  // A channel args for new channel credentials that does not contain bearer
1415  // tokens.
1416  grpc_channel_credentials_to_arg(creds_sans_call_creds.get()),
1417  };
1419  args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
1420  args_to_add.size());
1421 }
1422 
1423 //
1424 // ctor and dtor
1425 //
1426 
// Derives the server name from the server URI stored in `args`: the URI's
// path with one leading "/" stripped.
// Asserts that the server URI arg is present, that it parses, and that its
// path is non-empty.
// NOTE(review): the arg lookup and the URI parse (listing lines 1429 and 1431)
// are missing from this excerpt.
1427 std::string GetServerNameFromChannelArgs(const grpc_channel_args* args) {
1428  const char* server_uri =
1430  GPR_ASSERT(server_uri != nullptr);
1432  GPR_ASSERT(uri.ok() && !uri->path().empty());
1433  return std::string(absl::StripPrefix(uri->path(), "/"));
1434 }
1435 
// Constructs the policy: derives the server name from the channel args,
// creates the fake resolver response generator, reads integer tunables from
// the channel args (each clamped to [0, INT_MAX]), configures the LB call
// backoff, and initializes the three timer closures.
// NOTE(review): args.args is read after std::move(args) has been handed to the
// base-class constructor. This is only safe if moving an Args leaves its
// `args` member intact -- confirm against the definition of Args.
// NOTE(review): several initializer lines (e.g. listing lines 1440, 1442,
// 1445-1446, 1449-1452, 1454-1456, 1459) are missing from this excerpt.
1436 GrpcLb::GrpcLb(Args args)
1437  : LoadBalancingPolicy(std::move(args)),
1438  server_name_(GetServerNameFromChannelArgs(args.args)),
1439  response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1441  args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX}))),
1443  BackOff::Options()
1444  .set_initial_backoff(Duration::Seconds(
1447  .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1448  .set_max_backoff(Duration::Seconds(
1453  {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}))),
1457  {GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS, 0,
1458  INT_MAX}))) {
1460  gpr_log(GPR_INFO,
1461  "[grpclb %p] Will use '%s' as the server name for LB request.",
1462  this, server_name_.c_str());
1463  }
1464  // Closure Initialization
1465  GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
1466  grpc_schedule_on_exec_ctx);
1467  GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
1468  grpc_schedule_on_exec_ctx);
1469  GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this,
1470  nullptr);
1471 }
1472 
// Destroys the channel args held in args_. The balancer channel itself is
// torn down earlier, in ShutdownLocked().
1473 GrpcLb::~GrpcLb() { grpc_channel_args_destroy(args_); }
1474 
// Shuts the policy down: sets shutting_down_, drops the current LB call,
// clears the subchannel cache, cancels the balancer-channel connectivity
// watch (inside a condition not visible here), detaches and releases the
// child policy, and finally releases the balancer channel, unlinking it from
// the parent channelz node first when one was recorded.
// NOTE(review): listing lines 1478-1480, 1483-1484, 1486-1488, 1503 and 1507
// are missing from this excerpt (the conditions guarding the timer/watch
// cancellation, the channelz node lookup, and the channel destroy call).
1475 void GrpcLb::ShutdownLocked() {
1476  shutting_down_ = true;
1477  lb_calld_.reset();
1481  }
1482  cached_subchannels_.clear();
1485  }
1489  CancelBalancerChannelConnectivityWatchLocked();
1490  }
1491  if (child_policy_ != nullptr) {
1492  grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1493  interested_parties());
1494  child_policy_.reset();
1495  }
1496  // We destroy the LB channel here instead of in our destructor because
1497  // destroying the channel triggers a last callback to
1498  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1499  // alive when that callback is invoked.
1500  if (lb_channel_ != nullptr) {
1501  if (parent_channelz_node_ != nullptr) {
1502  channelz::ChannelNode* child_channelz_node =
1504  GPR_ASSERT(child_channelz_node != nullptr);
1505  parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
1506  }
1508  lb_channel_ = nullptr;
1509  }
1510 }
1511 
1512 //
1513 // public methods
1514 //
1515 
// Resets connection backoff on the child policy, if one exists.
// NOTE(review): the statement inside the lb_channel_ branch (listing line
// 1518) is missing from this excerpt; presumably it resets the balancer
// channel's backoff -- confirm.
1516 void GrpcLb::ResetBackoffLocked() {
1517  if (lb_channel_ != nullptr) {
1519  }
1520  if (child_policy_ != nullptr) {
1521  child_policy_->ResetBackoffLocked();
1522  }
1523 }
1524 
// Applies a resolver/config update.
// Stores the new config (asserted non-null), refreshes the fallback backend
// addresses (each gets an empty-token TokenAndClientStatsAttribute), stores
// the resolution note, updates the balancer channel, and re-pushes state to an
// existing child policy.
// On the first update (no balancer channel existed on entry) it also starts
// the fallback timer, installs a connectivity watcher on the balancer channel,
// and starts the balancer call.
// NOTE(review): listing lines 1530-1531, 1535, 1547, 1551 and 1561 are missing
// from this excerpt (address assignment, attribute key, the pending-checks
// flag, timer init, and the watcher's initial state argument).
1525 void GrpcLb::UpdateLocked(UpdateArgs args) {
1526  const bool is_initial_update = lb_channel_ == nullptr;
1527  config_ = args.config;
1528  GPR_ASSERT(config_ != nullptr);
1529  // Update fallback address list.
1532  // Add null LB token attributes.
1533  for (ServerAddress& address : *fallback_backend_addresses_) {
1534  address = address.WithAttribute(
1536  absl::make_unique<TokenAndClientStatsAttribute>("", nullptr));
1537  }
1538  }
1539  resolution_note_ = std::move(args.resolution_note);
1540  // Update balancer channel.
1541  UpdateBalancerChannelLocked(*args.args);
1542  // Update the existing child policy, if any.
1543  if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1544  // If this is the initial update, start the fallback-at-startup checks
1545  // and the balancer call.
1546  if (is_initial_update) {
1548  // Start timer.
1549  Timestamp deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
1550  Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
1552  // Start watching the channel's connectivity state. If the channel
1553  // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1554  // fallback mode even if the fallback timeout has not elapsed.
1555  ClientChannel* client_channel =
1556  ClientChannel::GetFromChannel(Channel::FromC(lb_channel_));
1557  GPR_ASSERT(client_channel != nullptr);
1558  // Ref held by callback.
1559  watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
1560  client_channel->AddConnectivityWatcher(
1562  OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
1563  // Start balancer call.
1564  StartBalancerCallLocked();
1565  }
1566 }
1567 
1568 //
1569 // helpers for UpdateLocked()
1570 //
1571 
// Refreshes everything derived from the channel args for the balancer
// channel.
//  1. Rebuilds the policy's own args with GRPC_ARG_LB_POLICY_NAME forced to
//     "grpclb".
//  2. Extracts the balancer addresses and builds the balancer channel args.
//  3. On first use, creates the balancer channel ("fake:///<server_name_>")
//     with the credentials found in those args, and links its channelz node
//     under the parent's when both exist.
//  4. Pushes the addresses and args to the balancer channel through the fake
//     resolver response generator.
// NOTE(review): listing lines 1576, 1578-1579, 1599, 1602 and 1610 are missing
// from this excerpt (new_arg construction, args_ replacement, channelz node
// lookups, and the declaration of `result`).
1572 void GrpcLb::UpdateBalancerChannelLocked(const grpc_channel_args& args) {
1573  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1574  // since we use this to trigger the client_load_reporting filter.
1575  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1577  const_cast<char*>(GRPC_ARG_LB_POLICY_NAME), const_cast<char*>("grpclb"));
1580  &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1581  // Construct args for balancer channel.
1582  ServerAddressList balancer_addresses = ExtractBalancerAddresses(args);
1583  grpc_channel_args* lb_channel_args =
1584  BuildBalancerChannelArgs(response_generator_.get(), &args);
1585  // Create balancer channel if needed.
1586  if (lb_channel_ == nullptr) {
1587  std::string uri_str = absl::StrCat("fake:///", server_name_);
1588  grpc_channel_credentials* creds =
1589  grpc_channel_credentials_find_in_args(lb_channel_args);
1590  GPR_ASSERT(creds != nullptr);
1591  const char* arg_to_remove = GRPC_ARG_CHANNEL_CREDENTIALS;
1592  grpc_channel_args* new_args =
1593  grpc_channel_args_copy_and_remove(lb_channel_args, &arg_to_remove, 1);
1594  lb_channel_ = grpc_channel_create(uri_str.c_str(), creds, new_args);
1595  GPR_ASSERT(lb_channel_ != nullptr);
1596  grpc_channel_args_destroy(new_args);
1597  // Set up channelz linkage.
1598  channelz::ChannelNode* child_channelz_node =
1600  channelz::ChannelNode* parent_channelz_node =
1601  grpc_channel_args_find_pointer<channelz::ChannelNode>(
1603  if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
1604  parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
1605  parent_channelz_node_ = parent_channelz_node->Ref();
1606  }
1607  }
1608  // Propagate updates to the LB channel (pick_first) through the fake
1609  // resolver.
1611  result.addresses = std::move(balancer_addresses);
1612  result.args = lb_channel_args;
1613  response_generator_->SetResponse(std::move(result));
1614 }
1615 
// Removes watcher_ from the balancer channel's client channel.
// Asserts that lb_channel_ resolves to a ClientChannel, so it must only be
// called while the balancer channel exists.
1616 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1617  ClientChannel* client_channel =
1618  ClientChannel::GetFromChannel(Channel::FromC(lb_channel_));
1619  GPR_ASSERT(client_channel != nullptr);
1620  client_channel->RemoveConnectivityWatcher(watcher_);
1621 }
1622 
1623 //
1624 // code for balancer channel and call
1625 //
1626 
// Starts a new call to the balancer.
// Requires an existing balancer channel and no LB call in progress (both
// asserted). Does nothing once the policy is shutting down. Otherwise creates
// a BalancerCallState holding a ref to this policy and calls StartQuery() on
// it.
// NOTE(review): the condition opening the logging block (listing line 1633)
// is missing from this excerpt.
1627 void GrpcLb::StartBalancerCallLocked() {
1628  GPR_ASSERT(lb_channel_ != nullptr);
1629  if (shutting_down_) return;
1630  // Init the LB call data.
1631  GPR_ASSERT(lb_calld_ == nullptr);
1632  lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1634  gpr_log(GPR_INFO,
1635  "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1636  this, lb_channel_, lb_calld_.get());
1637  }
1638  lb_calld_->StartQuery();
1639 }
1640 
// Schedules a retry of the balancer call at the backoff's next attempt time.
// Logs how long the wait will be, then takes a manually tracked
// "on_balancer_call_retry_timer" ref for the timer callback.
// NOTE(review): listing lines 1643 and 1659-1660 are missing from this excerpt
// (the condition opening the logging block and the code that arms the timer).
1641 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1642  Timestamp next_try = lb_call_backoff_.NextAttemptTime();
1644  gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1645  Duration timeout = next_try - ExecCtx::Get()->Now();
1646  if (timeout > Duration::Zero()) {
1647  gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1648  this, timeout.millis());
1649  } else {
1650  gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1651  this);
1652  }
1653  }
1654  // TODO(roth): We currently track this ref manually. Once the
1655  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1656  // with the callback.
1657  auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1658  self.release();
1661 }
1662 
// Closure-style callback for the balancer-call retry timer.
// `arg` is the GrpcLb policy. A ref on `error` is taken on behalf of the
// lambda, which forwards to OnBalancerCallRetryTimerLocked() in the work
// serializer.
1663 void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) {
1664  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1665  (void)GRPC_ERROR_REF(error); // ref owned by lambda
1666  grpclb_policy->work_serializer()->Run(
1667  [grpclb_policy, error]() {
1668  grpclb_policy->OnBalancerCallRetryTimerLocked(error);
1669  },
1670  DEBUG_LOCATION);
1671 }
1672 
// Work-serializer half of OnBalancerCallRetryTimer().
// Restarts the balancer call only when the policy is not shutting down, the
// timer fired without error, and no LB call is currently in progress. Always
// releases the "on_balancer_call_retry_timer" ref.
// NOTE(review): listing lines 1674, 1676 and 1682 are missing from this
// excerpt (presumably a timer-pending flag, a trace-flag check, and the
// release of `error`).
1673 void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) {
1675  if (!shutting_down_ && GRPC_ERROR_IS_NONE(error) && lb_calld_ == nullptr) {
1677  gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
1678  }
1679  StartBalancerCallLocked();
1680  }
1681  Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1683 }
1684 
1685 //
1686 // code for handling fallback mode
1687 //
1688 
// Enters fallback mode after startup when the conditions listed below all
// hold; on entry it logs, sets fallback_mode_ and updates the child policy.
// NOTE(review): only one clause of the condition is visible (no LB call, or
// the LB call has not seen a serverlist); listing lines 1695 and 1697, which
// hold the remaining clauses, are missing from this excerpt.
1689 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1690  // Enter fallback mode if all of the following are true:
1691  // - We are not currently in fallback mode.
1692  // - We are not currently waiting for the initial fallback timeout.
1693  // - We are not currently in contact with the balancer.
1694  // - The child policy is not in state READY.
1696  (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1698  gpr_log(GPR_INFO,
1699  "[grpclb %p] lost contact with balancer and backends from "
1700  "most recent serverlist; entering fallback mode",
1701  this);
1702  fallback_mode_ = true;
1703  CreateOrUpdateChildPolicyLocked();
1704  }
1705 }
1706 
// Closure-style callback for the fallback-at-startup timer.
// `arg` is the GrpcLb policy. A ref on `error` is taken on behalf of the
// lambda, which forwards to OnFallbackTimerLocked() in the work serializer.
1707 void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) {
1708  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1709  (void)GRPC_ERROR_REF(error); // ref owned by lambda
1710  grpclb_policy->work_serializer()->Run(
1711  [grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); },
1712  DEBUG_LOCATION);
1713 }
1714 
// Work-serializer half of OnFallbackTimer().
// When the guarded branch is taken it logs, cancels the balancer-channel
// connectivity watch, sets fallback_mode_ and updates the child policy. The
// "on_fallback_timer" ref is released in every case.
// NOTE(review): the guarding condition (listing lines 1718-1719) and lines
// 1724 and 1730 are missing from this excerpt; per the comment below the
// guard should skip fallback if a serverlist arrived in the meantime.
1715 void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
1716  // If we receive a serverlist after the timer fires but before this callback
1717  // actually runs, don't fall back.
1720  gpr_log(GPR_INFO,
1721  "[grpclb %p] No response from balancer after fallback timeout; "
1722  "entering fallback mode",
1723  this);
1725  CancelBalancerChannelConnectivityWatchLocked();
1726  fallback_mode_ = true;
1727  CreateOrUpdateChildPolicyLocked();
1728  }
1729  Unref(DEBUG_LOCATION, "on_fallback_timer");
1731 }
1732 
1733 //
1734 // code for interacting with the child policy
1735 //
1736 
// Returns a copy of args_ with extra args for the child policy appended.
// When the backends come from the balancer
// (is_backend_from_grpclb_load_balancer == true), an additional arg built
// from GRPC_ARG_INHIBIT_HEALTH_CHECKING with value 1 is added.
// NOTE(review): the declaration of args_to_add and the first arg pushed into
// it (listing lines 1739-1741, 1744) are missing from this excerpt.
1737 grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
1738  bool is_backend_from_grpclb_load_balancer) {
1742  is_backend_from_grpclb_load_balancer));
1743  if (is_backend_from_grpclb_load_balancer) {
1745  const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
1746  }
1747  return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
1748  args_to_add.size());
1749 }
1750 
// Creates the child policy handler.
// The child shares this policy's work serializer, receives `args`, and gets a
// Helper holding a ref to this policy as its channel control helper. The
// child's interested_parties pollset_set is linked with this policy's before
// the new policy is returned.
// NOTE(review): the remaining constructor arguments and the condition opening
// the logging block (listing lines 1759-1760) are missing from this excerpt.
1751 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1752  const grpc_channel_args* args) {
1753  LoadBalancingPolicy::Args lb_policy_args;
1754  lb_policy_args.work_serializer = work_serializer();
1755  lb_policy_args.args = args;
1756  lb_policy_args.channel_control_helper = absl::make_unique<Helper>(Ref());
1757  OrphanablePtr<LoadBalancingPolicy> lb_policy =
1758  MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
1761  gpr_log(GPR_INFO, "[grpclb %p] Created new child policy handler (%p)", this,
1762  lb_policy.get());
1763  }
1764  // Add the gRPC LB's interested_parties pollset_set to that of the newly
1765  // created child policy. This will make the child policy progress upon
1766  // activity on gRPC LB, which in turn is tied to the application's call.
1767  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1768  interested_parties());
1769  return lb_policy;
1770 }
1771 
// Builds an update for the child policy and delivers it, creating the child
// policy first if none exists. Does nothing once the policy is shutting down.
//  - In fallback mode the addresses are the resolver-provided fallback
//    backends, and a resolution note is attached under the condition partly
//    visible below.
//  - Otherwise the addresses come from the current serverlist, tagged with
//    the current LB call's client stats (or nullptr when there is no LB
//    call), and the child args are built for balancer-provided backends.
// The child config is taken from config_->child_policy().
// NOTE(review): listing lines 1783, 1787 and 1803 are missing from this
// excerpt (start of the resolution-note condition, the note's second operand,
// and the condition opening the logging block).
1772 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1773  if (shutting_down_) return;
1774  // Construct update args.
1775  UpdateArgs update_args;
1776  bool is_backend_from_grpclb_load_balancer = false;
1777  if (fallback_mode_) {
1778  // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1779  // received any serverlist from the balancer, we use the fallback backends
1780  // returned by the resolver. Note that the fallback backend list may be
1781  // empty, in which case the new child policy will fail the picks.
1782  update_args.addresses = fallback_backend_addresses_;
1784  fallback_backend_addresses_->empty()) {
1785  update_args.resolution_note = absl::StrCat(
1786  "grpclb in fallback mode without any balancer addresses: ",
1788  }
1789  } else {
1790  update_args.addresses = serverlist_->GetServerAddressList(
1791  lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1792  is_backend_from_grpclb_load_balancer = true;
1793  }
1794  update_args.args =
1795  CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1796  GPR_ASSERT(update_args.args != nullptr);
1797  update_args.config = config_->child_policy();
1798  // Create child policy if needed.
1799  if (child_policy_ == nullptr) {
1800  child_policy_ = CreateChildPolicyLocked(update_args.args);
1801  }
1802  // Update the policy.
1804  gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
1805  child_policy_.get());
1806  }
1807  child_policy_->UpdateLocked(std::move(update_args));
1808 }
1809 
1810 //
1811 // subchannel caching
1812 //
1813 
// Parks a released subchannel in cached_subchannels_, keyed by the time at
// which it should be dropped (now + subchannel_cache_interval_). Inside a
// condition not visible here it also takes an "OnSubchannelCacheTimer" ref
// and starts the cache timer.
// NOTE(review): listing lines 1818 and 1820 are missing from this excerpt
// (the guarding condition and, presumably, a timer-pending flag).
1814 void GrpcLb::CacheDeletedSubchannelLocked(
1815  RefCountedPtr<SubchannelInterface> subchannel) {
1816  Timestamp deletion_time = ExecCtx::Get()->Now() + subchannel_cache_interval_;
1817  cached_subchannels_[deletion_time].push_back(std::move(subchannel));
1819  Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release();
1821  StartSubchannelCacheTimerLocked();
1822  }
1823 }
1824 
// Arms the subchannel cache timer. Requires a non-empty cache (asserted).
// NOTE(review): the timer setup itself (listing lines 1827-1828) is missing
// from this excerpt.
1825 void GrpcLb::StartSubchannelCacheTimerLocked() {
1826  GPR_ASSERT(!cached_subchannels_.empty());
1829 }
1830 
// Closure-style callback for the subchannel cache timer.
// `arg` is the GrpcLb policy. A ref on `error` is taken on behalf of the
// lambda, which forwards to OnSubchannelCacheTimerLocked() in the work
// serializer.
1831 void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) {
1832  auto* self = static_cast<GrpcLb*>(arg);
1833  (void)GRPC_ERROR_REF(error);
1834  self->work_serializer()->Run(
1835  [self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); },
1836  DEBUG_LOCATION);
1837 }
1838 
// Work-serializer half of OnSubchannelCacheTimer().
// Erases the first entry of cached_subchannels_ (if any). If entries remain,
// the timer is re-armed and the function returns early, keeping the
// "OnSubchannelCacheTimer" ref; otherwise that ref is released.
// NOTE(review): listing lines 1840, 1843, 1854 and 1857 are missing from this
// excerpt, including the condition whose closing brace appears at line 1855,
// so the exact nesting of the statements above cannot be confirmed from here.
1839 void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
1841  auto it = cached_subchannels_.begin();
1842  if (it != cached_subchannels_.end()) {
1844  gpr_log(GPR_INFO,
1845  "[grpclb %p] removing %" PRIuPTR " subchannels from cache",
1846  this, it->second.size());
1847  }
1848  cached_subchannels_.erase(it);
1849  }
1850  if (!cached_subchannels_.empty()) {
1851  StartSubchannelCacheTimerLocked();
1852  return;
1853  }
1855  }
1856  Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer");
1858 }
1859 
1860 //
1861 // factory
1862 //
1863 
// Factory for the grpclb policy: creates GrpcLb instances, reports the policy
// name kGrpclb, and parses the policy's JSON config.
//
// ParseLoadBalancingConfig():
//  - A JSON null yields a GrpcLbConfig with a null child policy and an empty
//    service name.
//  - "serviceName", if present, must be a string.
//  - "childPolicy", if absent, defaults to [{"round_robin": {}}]; it is parsed
//    through LoadBalancingPolicyRegistry.
//  - All problems are collected; if any occurred, *error is set to a
//    "GrpcLb Parser" error wrapping them and nullptr is returned.
// NOTE(review): listing lines 1875, 1902 and 1906 are missing from this
// excerpt (presumably an assertion on `error`, the declaration of
// parse_error, and the check that guards the childPolicy error block).
1864 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1865  public:
1866  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1867  LoadBalancingPolicy::Args args) const override {
1868  return MakeOrphanable<GrpcLb>(std::move(args));
1869  }
1870 
1871  const char* name() const override { return kGrpclb; }
1872 
1873  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1874  const Json& json, grpc_error_handle* error) const override {
1876  if (json.type() == Json::Type::JSON_NULL) {
1877  return MakeRefCounted<GrpcLbConfig>(nullptr, "");
1878  }
1879  std::vector<grpc_error_handle> error_list;
1880  Json child_policy_config_json_tmp;
1881  const Json* child_policy_config_json;
1882  std::string service_name;
1883  auto it = json.object_value().find("serviceName");
1884  if (it != json.object_value().end()) {
1885  const Json& service_name_json = it->second;
1886  if (service_name_json.type() != Json::Type::STRING) {
1887  error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1888  "field:serviceName error:type should be string"));
1889  } else {
1890  service_name = service_name_json.string_value();
1891  }
1892  }
1893  it = json.object_value().find("childPolicy");
1894  if (it == json.object_value().end()) {
1895  child_policy_config_json_tmp = Json::Array{Json::Object{
1896  {"round_robin", Json::Object()},
1897  }};
1898  child_policy_config_json = &child_policy_config_json_tmp;
1899  } else {
1900  child_policy_config_json = &it->second;
1901  }
1903  RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config =
1904  LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1905  *child_policy_config_json, &parse_error);
1907  std::vector<grpc_error_handle> child_errors;
1908  child_errors.push_back(parse_error);
1909  error_list.push_back(
1910  GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
1911  }
1912  if (error_list.empty()) {
1913  return MakeRefCounted<GrpcLbConfig>(std::move(child_policy_config),
1914  std::move(service_name));
1915  } else {
1916  *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
1917  return nullptr;
1918  }
1919  }
1920 };
1921 
1922 } // namespace
1923 
1924 } // namespace grpc_core
1925 
1926 //
1927 // Plugin registration
1928 //
1929 
1933  absl::make_unique<grpc_core::GrpcLbFactory>());
1934 }
1935 
1937 
// Registers a channel-init stage whose callback prepends
// grpc_client_load_reporting_filter to the stack when the channel's
// GRPC_ARG_LB_POLICY_NAME arg equals "grpclb". The callback returns true
// whether or not the filter was added.
// NOTE(review): the enclosing function's signature (listing line 1939) and the
// first RegisterStage() arguments (lines 1941-1942) are missing from this
// excerpt, so the stage type and priority cannot be read from here.
1938 namespace grpc_core {
1940  builder->channel_init()->RegisterStage(
1943  if (builder->channel_args().GetString(GRPC_ARG_LB_POLICY_NAME) ==
1944  "grpclb") {
1945  // TODO(roth): When we get around to re-attempting
1946  // https://github.com/grpc/grpc/pull/16214, we should try to keep
1947  // this filter at the very top of the subchannel stack, since that
1948  // will minimize the number of metadata elements that the filter
1949  // needs to iterate through to find the ClientStats object.
1950  builder->PrependFilter(&grpc_client_load_reporting_filter);
1951  }
1952  return true;
1953  });
1954 }
1955 } // namespace grpc_core
seen_initial_response_
bool seen_initial_response_
Definition: grpclb.cc:251
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_arg
Definition: grpc_types.h:103
trace.h
grpc_core::grpc_lb_glb_trace
TraceFlag grpc_lb_glb_trace(false, "glb")
grpc_channel_args_find_string
char * grpc_channel_args_find_string(const grpc_channel_args *args, const char *name)
Definition: channel_args.cc:441
slice.h
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_op::grpc_op_data::grpc_op_send_message::send_message
struct grpc_byte_buffer * send_message
Definition: grpc_types.h:668
grpc_op::flags
uint32_t flags
Definition: grpc_types.h:644
grpc_call_error
grpc_call_error
Definition: grpc_types.h:464
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
grpc_core::MakeRefCounted
RefCountedPtr< T > MakeRefCounted(Args &&... args)
Definition: ref_counted_ptr.h:335
lb_call_
grpc_call * lb_call_
Definition: grpclb.cc:239
regen-readme.it
it
Definition: regen-readme.py:15
grpc_core::LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory
static void RegisterLoadBalancingPolicyFactory(std::unique_ptr< LoadBalancingPolicyFactory > factory)
Definition: lb_policy_registry.cc:87
orphanable.h
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::trailing_metadata
grpc_metadata_array * trailing_metadata
Definition: grpc_types.h:701
log.h
core_configuration.h
bloat_diff.severity
def severity
Definition: bloat_diff.py:143
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::status
grpc_status_code * status
Definition: grpc_types.h:702
metadata_batch.h
grpc_event_engine::experimental::slice_detail::operator==
bool operator==(const BaseSlice &a, const BaseSlice &b)
Definition: include/grpc/event_engine/slice.h:117
backoff.h
GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER
Definition: grpclb.cc:147
sockaddr_utils.h
grpc_raw_byte_buffer_create
GRPCAPI grpc_byte_buffer * grpc_raw_byte_buffer_create(grpc_slice *slices, size_t nslices)
Definition: byte_buffer.cc:34
subchannel_cache_timer_pending_
bool subchannel_cache_timer_pending_
Definition: grpclb.cc:571
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
absl::InlinedVector::emplace_back
reference emplace_back(Args &&... args)
Definition: abseil-cpp/absl/container/inlined_vector.h:675
memset
return memset(p, 0, total)
connectivity_state.h
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
fallback_mode_
bool fallback_mode_
Definition: grpclb.cc:545
grpc_dump_slice
char * grpc_dump_slice(const grpc_slice &s, uint32_t flags)
Definition: slice_string_helpers.cc:25
Timestamp
Definition: bloaty/third_party/protobuf/src/google/protobuf/timestamp.pb.h:69
grpc_channel_reset_connect_backoff
GRPCAPI void grpc_channel_reset_connect_backoff(grpc_channel *channel)
Definition: channel.cc:273
client_load_report_is_due_
bool client_load_report_is_due_
Definition: grpclb.cc:266
grpc_channel_args_copy_and_remove
grpc_channel_args * grpc_channel_args_copy_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove)
Definition: channel_args.cc:231
grpc_op::grpc_op_data::send_initial_metadata
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
slice.h
uint16_t
unsigned short uint16_t
Definition: stdint-msvc2008.h:79
fallback_at_startup_timeout_
const Duration fallback_at_startup_timeout_
Definition: grpclb.cc:554
grpc_core::kGrpcLbAddressAttributeKey
const char kGrpcLbAddressAttributeKey[]
Definition: grpclb.cc:157
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS
Definition: grpclb.cc:148
grpc_core
Definition: call_metric_recorder.h:31
grpc_lb_policy_grpclb_shutdown
void grpc_lb_policy_grpclb_shutdown()
Definition: grpclb.cc:1936
grpc_metadata_array
Definition: grpc_types.h:579
event_engine.h
grpc_core::CoreConfiguration::Builder
Definition: core_configuration.h:41
grpc_op::reserved
void * reserved
Definition: grpc_types.h:646
grpc_byte_buffer_reader_readall
GRPCAPI grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader)
Definition: byte_buffer_reader.cc:84
string.h
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
num_calls_started
size_t num_calls_started
Definition: grpclb_end2end_test.cc:157
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
useful.h
grpc_channel_credentials::duplicate_without_call_credentials
virtual grpc_core::RefCountedPtr< grpc_channel_credentials > duplicate_without_call_credentials()
Definition: src/core/lib/security/credentials/credentials.h:122
subchannel
RingHashSubchannelData * subchannel
Definition: ring_hash.cc:285
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
propagation_bits.h
error
grpc_error_handle error
Definition: retry_filter.cc:499
lb_policy.h
fake_resolver.h
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
client_channel.h
grpc_core::GrpcLbLoadReportRequestCreate
grpc_slice GrpcLbLoadReportRequestCreate(int64_t num_calls_started, int64_t num_calls_finished, int64_t num_calls_finished_with_client_failed_to_send, int64_t num_calls_finished_known_received, const GrpcLbClientStats::DroppedCallCounts *drop_token_counts, upb_Arena *arena)
Definition: load_balancer_api.cc:84
resolution_note_
std::string resolution_note_
Definition: grpclb.cc:550
lb_policy_factory.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc_core::LoadBalancingPolicy::Config
Definition: lb_policy.h:305
grpc_resolved_address
Definition: resolved_address.h:34
grpc_channel_args_copy_and_add_and_remove
grpc_channel_args * grpc_channel_args_copy_and_add_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:246
closure.h
client_load_report_handle_
absl::optional< EventEngine::TaskHandle > client_load_report_handle_
Definition: grpclb.cc:264
GRPC_CALL_OK
@ GRPC_CALL_OK
Definition: grpc_types.h:466
status
absl::Status status
Definition: rls.cc:251
server_name_
std::string server_name_
Definition: grpclb.cc:508
setup.name
name
Definition: setup.py:542
lb_initial_metadata_recv_
grpc_metadata_array lb_initial_metadata_recv_
Definition: grpclb.cc:242
grpc_channel_arg_string_create
grpc_arg grpc_channel_arg_string_create(char *name, char *value)
Definition: channel_args.cc:476
absl::StripPrefix
ABSL_MUST_USE_RESULT absl::string_view StripPrefix(absl::string_view str, absl::string_view prefix)
Definition: abseil-cpp/absl/strings/strip.h:73
args_
grpc_channel_args * args_
Definition: grpclb.cc:513
run_xds_tests.server_uri
string server_uri
Definition: run_xds_tests.py:3320
GRPC_CLIENT_SUBCHANNEL
@ GRPC_CLIENT_SUBCHANNEL
Definition: channel_stack_type.h:29
GPR_DUMP_HEX
#define GPR_DUMP_HEX
Definition: string.h:34
resolved_address.h
num_calls_finished
size_t num_calls_finished
Definition: grpclb_end2end_test.cc:158
grpc_timer
Definition: iomgr/timer.h:33
channelz.h
grpc_sockaddr_to_string
absl::StatusOr< std::string > grpc_sockaddr_to_string(const grpc_resolved_address *resolved_addr, bool normalize)
Definition: sockaddr_utils.cc:194
sockaddr.h
grpc_event_engine::experimental::EventEngine::RunAfter
virtual TaskHandle RunAfter(Duration when, Closure *closure)=0
grpc_pollset_set_del_pollset_set
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:52
lb_on_fallback_
grpc_closure lb_on_fallback_
Definition: grpclb.cc:557
map
zval * map
Definition: php/ext/google/protobuf/encode_decode.c:480
drop_index_
size_t drop_index_
Definition: grpclb.cc:368
credentials.h
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
subchannel_interface.h
fallback_at_startup_checks_pending_
bool fallback_at_startup_checks_pending_
Definition: grpclb.cc:555
on_subchannel_cache_timer_
grpc_closure on_subchannel_cache_timer_
Definition: grpclb.cc:570
GPR_DUMP_ASCII
#define GPR_DUMP_ASCII
Definition: string.h:35
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_op::grpc_op_data::recv_message
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
GRPC_ERROR_CREATE_FROM_VECTOR
#define GRPC_ERROR_CREATE_FROM_VECTOR(desc, error_list)
Definition: error.h:314
grpc_core::ChannelStackBuilder
Definition: channel_stack_builder.h:41
grpc_op::data
union grpc_op::grpc_op_data data
grpclb.h
status.h
GRPC_ARG_CHANNELZ_CHANNEL_NODE
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE
Definition: channelz.h:49
byte_buffer_reader.h
grpc_types.h
absl::synchronization_internal::Get
static GraphId Get(const IdMap &id, int num)
Definition: abseil-cpp/absl/synchronization/internal/graphcycles_test.cc:44
client_load_report_done_closure_
grpc_closure client_load_report_done_closure_
Definition: grpclb.cc:268
socket_utils.h
grpc_metadata_array_destroy
GRPCAPI void grpc_metadata_array_destroy(grpc_metadata_array *array)
Definition: metadata_array.cc:35
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER
#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER
Definition: grpclb.h:32
grpc_op::grpc_op_data::grpc_op_recv_message::recv_message
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:693
absl::Milliseconds
constexpr Duration Milliseconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:415
GRPC_ARG_DEFAULT_AUTHORITY
#define GRPC_ARG_DEFAULT_AUTHORITY
Definition: grpc_types.h:251
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
subchannel_cache_interval_
const Duration subchannel_cache_interval_
Definition: grpclb.cc:565
child_picker_
std::unique_ptr< SubchannelPicker > child_picker_
Definition: grpclb.cc:419
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
channel_init.h
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
GRPC_INITIAL_METADATA_WAIT_FOR_READY
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY
Definition: grpc_types.h:523
lb_call_retry_timer_
grpc_timer lb_call_retry_timer_
Definition: grpclb.cc:537
GRPC_OP_RECV_INITIAL_METADATA
@ GRPC_OP_RECV_INITIAL_METADATA
Definition: grpc_types.h:617
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
#define GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
Definition: grpc_types.h:278
absl::StrJoin
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
Definition: abseil-cpp/absl/strings/str_join.h:239
response_generator_
RefCountedPtr< FakeResolverResponseGenerator > response_generator_
Definition: grpclb.cc:522
lb_call_timeout_
const Duration lb_call_timeout_
Definition: grpclb.cc:533
hpack_encoder_fixtures::Args
Args({0, 16384})
GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER
#define GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER
Definition: grpclb.h:27
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
grpc_core::FindGrpclbBalancerAddressesInChannelArgs
const ServerAddressList * FindGrpclbBalancerAddressesInChannelArgs(const grpc_channel_args &args)
Definition: grpclb_balancer_addresses.cc:74
ToString
std::string ToString(const grpc::string_ref &r)
Definition: string_ref_helper.cc:24
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
re2::Result
TestInstance::Result Result
Definition: bloaty/third_party/re2/re2/testing/tester.cc:96
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
Json
JSON (JavaScript Object Notation).
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:227
sockaddr_in6::sin6_port
unsigned short sin6_port
Definition: ares_ipv6.h:28
lb_call_backoff_
BackOff lb_call_backoff_
Definition: grpclb.cc:535
GRPC_ARG_SERVER_URI
#define GRPC_ARG_SERVER_URI
Definition: client_channel.h:89
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Definition: call.cc:1770
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
Definition: grpc_types.h:526
GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS
#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS
Definition: grpc_types.h:370
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_core::GrpcLbResponseParse
bool GrpcLbResponseParse(const grpc_slice &serialized_response, upb_Arena *arena, GrpcLbResponse *result)
Definition: load_balancer_api.cc:166
lb_on_call_retry_
grpc_closure lb_on_call_retry_
Definition: grpclb.cc:538
work_serializer.h
grpc_byte_buffer_reader_init
GRPCAPI int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer)
Definition: byte_buffer_reader.cc:33
grpc_channel_get_channelz_node
grpc_core::channelz::ChannelNode * grpc_channel_get_channelz_node(grpc_channel *channel)
Definition: src/core/lib/surface/channel.h:183
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
lb_on_balancer_message_received_
grpc_closure lb_on_balancer_message_received_
Definition: grpclb.cc:250
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR
#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR
Definition: fake_resolver.h:31
grpc.h
grpc_call
struct grpc_call grpc_call
Definition: grpc_types.h:70
grpc_channel_credentials_find_in_args
grpc_channel_credentials * grpc_channel_credentials_find_in_args(const grpc_channel_args *args)
Definition: credentials.cc:83
GRPC_ARG_LB_POLICY_NAME
#define GRPC_ARG_LB_POLICY_NAME
Definition: grpc_types.h:309
connectivity_state.h
grpc_byte_buffer
Definition: grpc_types.h:43
gen_stats_data.stats
list stats
Definition: gen_stats_data.py:58
lb_on_initial_request_sent_
grpc_closure lb_on_initial_request_sent_
Definition: grpclb.cc:246
absl::optional< EventEngine::TaskHandle >
pollset_set.h
grpc_op
Definition: grpc_types.h:640
GRPC_OP_SEND_MESSAGE
@ GRPC_OP_SEND_MESSAGE
Definition: grpc_types.h:602
addr6
static struct sockaddr_in6 addr6
Definition: test-getnameinfo.c:34
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
arg
Definition: cmdline.cc:40
server_address.h
time.h
fallback_backend_addresses_
absl::StatusOr< ServerAddressList > fallback_backend_addresses_
Definition: grpclb.cc:547
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
absl::InlinedVector::data
pointer data() noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:302
config_
RefCountedPtr< GrpcLbConfig > config_
Definition: grpclb.cc:510
lb_channel_
grpc_channel * lb_channel_
Definition: grpclb.cc:519
sockaddr_in6::sin6_family
unsigned short sin6_family
Definition: ares_ipv6.h:27
grpc_core::LbTokenMetadata::key
static absl::string_view key()
Definition: metadata_batch.h:345
lb_call_status_details_
grpc_slice lb_call_status_details_
Definition: grpclb.cc:258
absl::flags_internal::Parse
bool Parse(FlagOpFn op, absl::string_view text, void *dst, std::string *error)
Definition: abseil-cpp/absl/flags/internal/flag.h:125
error.h
child_policy_
RefCountedPtr< LoadBalancingPolicy::Config > child_policy_
Definition: grpclb.cc:181
absl::InlinedVector::size
size_type size() const noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:270
json.h
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
lb_calld_
OrphanablePtr< BalancerCallState > lb_calld_
Definition: grpclb.cc:531
channel_stack_type.h
log
Definition: bloaty/third_party/zlib/examples/gzlog.c:289
grpc_core::ServerAddressList
std::vector< ServerAddress > ServerAddressList
Definition: server_address.h:120
lb_trailing_metadata_recv_
grpc_metadata_array lb_trailing_metadata_recv_
Definition: grpclb.cc:256
retry_timer_callback_pending_
bool retry_timer_callback_pending_
Definition: grpclb.cc:536
lb_on_balancer_status_received_
grpc_closure lb_on_balancer_status_received_
Definition: grpclb.cc:255
service_name_
std::string service_name_
Definition: grpclb.cc:182
cached_subchannels_
std::map< Timestamp, std::vector< RefCountedPtr< SubchannelInterface > > > cached_subchannels_
Definition: grpclb.cc:568
setup.idx
idx
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:197
grpc_op::op
grpc_op_type op
Definition: grpc_types.h:642
grpclb_balancer_addresses.h
grpc_htons
uint16_t grpc_htons(uint16_t hostshort)
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
resolver.h
grpc_op::grpc_op_data::grpc_op_send_initial_metadata::count
size_t count
Definition: grpc_types.h:653
grpc_op::grpc_op_data::grpc_op_recv_status_on_client::status_details
grpc_slice * status_details
Definition: grpc_types.h:703
watcher_
StateWatcher * watcher_
Definition: grpclb.cc:520
child_policy_handler.h
grpclb_client_stats.h
addr4
static struct sockaddr_in addr4
Definition: test-getnameinfo.c:33
grpc_core::GrpcLbRequestCreate
grpc_slice GrpcLbRequestCreate(const char *lb_service_name, upb_Arena *arena)
Definition: load_balancer_api.cc:62
grpc_slice_to_c_string
GPRAPI char * grpc_slice_to_c_string(grpc_slice s)
Definition: slice/slice.cc:35
GRPC_OP_RECV_MESSAGE
@ GRPC_OP_RECV_MESSAGE
Definition: grpc_types.h:621
absl::Seconds
constexpr Duration Seconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:419
child_policy_ready_
bool child_policy_ready_
Definition: grpclb.cc:562
absl::optional::value
constexpr const T & value() const &
Definition: abseil-cpp/absl/types/optional.h:475
grpc_timer_cancel
void grpc_timer_cancel(grpc_timer *timer)
Definition: iomgr/timer.cc:36
GRPC_ARG_INHIBIT_HEALTH_CHECKING
#define GRPC_ARG_INHIBIT_HEALTH_CHECKING
Definition: grpc_types.h:424
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
shutting_down_
bool shutting_down_
Definition: grpclb.cc:516
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
upb::Arena
Definition: upb.hpp:68
subchannel_cache_timer_
grpc_timer subchannel_cache_timer_
Definition: grpclb.cc:569
grpc_channel_create
GRPCAPI grpc_channel * grpc_channel_create(const char *target, grpc_channel_credentials *creds, const grpc_channel_args *args)
Definition: chttp2_connector.cc:366
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
debug_location.h
GRPC_PROPAGATE_DEFAULTS
#define GRPC_PROPAGATE_DEFAULTS
Definition: propagation_bits.h:45
grpc_channel_arg_integer_create
grpc_arg grpc_channel_arg_integer_create(char *name, int value)
Definition: channel_args.cc:484
lb_policy_registry.h
original_call_tracker_
std::unique_ptr< SubchannelCallTrackerInterface > original_call_tracker_
Definition: grpclb.cc:413
grpc_lb_policy_grpclb_init
void grpc_lb_policy_grpclb_init()
Definition: grpclb.cc:1930
lb_call_status_
grpc_status_code lb_call_status_
Definition: grpclb.cc:257
grpc_core::QsortCompare
int QsortCompare(const T &a, const T &b)
Definition: useful.h:95
server
Definition: examples/python/async_streaming/server.py:1
ref_counted.h
GRPC_ARG_SERVICE_CONFIG
#define GRPC_ARG_SERVICE_CONFIG
Definition: grpc_types.h:304
upb.hpp
GRPC_OP_SEND_INITIAL_METADATA
@ GRPC_OP_SEND_INITIAL_METADATA
Definition: grpc_types.h:598
benchmark::internal::Finish
double Finish(Counter const &c, IterationCount iterations, double cpu_time, double num_threads)
Definition: benchmark/src/counter.cc:20
grpc_core::RegisterGrpcLbLoadReportingFilter
void RegisterGrpcLbLoadReportingFilter(CoreConfiguration::Builder *builder)
Definition: grpclb.cc:1939
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
absl::optional::reset
ABSL_ATTRIBUTE_REINITIALIZES void reset() noexcept
Definition: abseil-cpp/absl/types/optional.h:342
grpc_op::grpc_op_data::send_message
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_channel_create_pollset_set_call
grpc_call * grpc_channel_create_pollset_set_call(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_pollset_set *pollset_set, const grpc_slice &method, const grpc_slice *host, grpc_core::Timestamp deadline, void *reserved)
Definition: channel.cc:331
alloc.h
lb_fallback_timer_
grpc_timer lb_fallback_timer_
Definition: grpclb.cc:556
num_calls_finished_known_received
size_t num_calls_finished_known_received
Definition: grpclb_end2end_test.cc:160
send_message_payload_
grpc_byte_buffer * send_message_payload_
Definition: grpclb.cc:245
Copy
@ Copy
Definition: upb/benchmarks/benchmark.cc:200
grpc_op::grpc_op_data::recv_status_on_client
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
fix_build_deps.r
r
Definition: fix_build_deps.py:491
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_pollset_set_add_pollset_set
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:47
grpc_byte_buffer_destroy
GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)
Definition: byte_buffer.cc:81
arg
struct arg arg
grpc_event_engine::experimental::GetDefaultEventEngine
EventEngine * GetDefaultEventEngine()
Definition: event_engine.cc:47
grpc_channel_destroy
GRPCAPI void grpc_channel_destroy(grpc_channel *channel)
Definition: channel.cc:437
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
exec_ctx.h
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
slice_refcount.h
absl::UnavailableError
Status UnavailableError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:375
client_stats_report_interval_
Duration client_stats_report_interval_
Definition: grpclb.cc:263
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
last_client_load_report_counters_were_zero_
bool last_client_load_report_counters_were_zero_
Definition: grpclb.cc:265
ref_counted_ptr.h
grpc_channel
struct grpc_channel grpc_channel
Definition: grpc_types.h:62
drop_token_counts
std::map< std::string, size_t > drop_token_counts
Definition: grpclb_end2end_test.cc:161
GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL
#define GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL
Definition: channelz.h:52
channel_args.h
timer.h
sockaddr_in6::sin6_addr
struct ares_in6_addr sin6_addr
Definition: ares_ipv6.h:30
seen_serverlist_
bool seen_serverlist_
Definition: grpclb.cc:252
grpc_channel_credentials_to_arg
grpc_arg grpc_channel_credentials_to_arg(grpc_channel_credentials *credentials)
Definition: credentials.cc:65
grpc_call_start_batch_and_execute
grpc_call_error grpc_call_start_batch_and_execute(grpc_call *call, const grpc_op *ops, size_t nops, grpc_closure *closure)
Definition: call.cc:1847
recv_message_payload_
grpc_byte_buffer * recv_message_payload_
Definition: grpclb.cc:249
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS
Definition: grpclb.cc:146
Duration
Definition: bloaty/third_party/protobuf/src/google/protobuf/duration.pb.h:69
GRPC_ARG_CHANNEL_CREDENTIALS
#define GRPC_ARG_CHANNEL_CREDENTIALS
Definition: src/core/lib/security/credentials/credentials.h:91
grpc_op::grpc_op_data::recv_initial_metadata
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
client_stats_
RefCountedPtr< GrpcLbClientStats > client_stats_
Definition: grpclb.cc:262
GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS
Definition: grpc_types.h:355
absl::StatusOr< ServerAddressList >
uri_parser.h
absl::InlinedVector
Definition: abseil-cpp/absl/container/inlined_vector.h:69
strnlen
size_t strnlen(const char *str, size_t maxlen)
Definition: os390-syscalls.c:554
parent_channelz_node_
RefCountedPtr< channelz::ChannelNode > parent_channelz_node_
Definition: grpclb.cc:524
grpc_error
Definition: error_internal.h:42
parent_
RefCountedPtr< GrpcLb > parent_
Definition: grpclb.cc:438
GRPC_OP_RECV_STATUS_ON_CLIENT
@ GRPC_OP_RECV_STATUS_ON_CLIENT
Definition: grpc_types.h:627
event_engine_factory.h
num_calls_finished_with_client_failed_to_send
size_t num_calls_finished_with_client_failed_to_send
Definition: grpclb_end2end_test.cc:159
GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS
#define GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS
Definition: grpclb.h:36
grpc_op::grpc_op_data::grpc_op_recv_initial_metadata::recv_initial_metadata
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:685
grpc_channel_args_find_integer
int grpc_channel_args_find_integer(const grpc_channel_args *args, const char *name, const grpc_integer_options options)
Definition: channel_args.cc:425
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
grpclb_policy_
RefCountedPtr< LoadBalancingPolicy > grpclb_policy_
Definition: grpclb.cc:236
load_balancer_api.h
channel_stack_builder.h
lb_policy_
RefCountedPtr< GrpcLb > lb_policy_
Definition: grpclb.cc:291
grpc_core::GrpcLbClientStatsMetadata::key
static absl::string_view key()
Definition: metadata_batch.h:331
grpc_closure
Definition: closure.h:56
lb_token_
std::string lb_token_
Definition: grpclb.cc:292
grpc_sockaddr_to_uri
absl::StatusOr< std::string > grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr)
Definition: sockaddr_utils.cc:260
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
GRPC_GRPCLB_RECONNECT_JITTER
#define GRPC_GRPCLB_RECONNECT_JITTER
Definition: grpclb.cc:149
grpc_byte_buffer_reader
Definition: impl/codegen/byte_buffer_reader.h:30
ops
static grpc_op ops[6]
Definition: test/core/fling/client.cc:39
grpc_channel_credentials
Definition: src/core/lib/security/credentials/credentials.h:96
serverlist_
std::vector< GrpcLbServer > serverlist_
Definition: grpclb.cc:363
client_load_reporting_filter.h
addr
struct sockaddr_in addr
Definition: libuv/docs/code/tcp-echo-server/main.c:10
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
grpc_channel_args_copy_and_add
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:224
call.h
slice_string_helpers.h
grpc_byte_buffer_reader_destroy
GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
Definition: byte_buffer_reader.cc:45
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
grpc_metadata_array_init
GRPCAPI void grpc_metadata_array_init(grpc_metadata_array *array)
Definition: metadata_array.cc:30
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
parse_error
@ parse_error
Definition: pem_info.c:88
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY
#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY
Definition: channel_init.h:31
channel.h
port_platform.h
grpc_call_cancel_internal
void grpc_call_cancel_internal(grpc_call *call)
Definition: call.cc:1806


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:48