client_channel.cc
Go to the documentation of this file.
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24 
25 #include <algorithm>
26 #include <functional>
27 #include <new>
28 #include <set>
29 #include <vector>
30 
31 #include "absl/container/inlined_vector.h"
32 #include "absl/memory/memory.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/cord.h"
36 #include "absl/strings/numbers.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_join.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/variant.h"
42 
44 #include <grpc/slice.h>
45 #include <grpc/status.h>
46 #include <grpc/support/alloc.h>
47 #include <grpc/support/log.h>
49 
78 #include "src/core/lib/json/json.h"
90 
91 //
92 // Client channel filter
93 //
94 
// String constant "grpc.internal.health_check_service_name". Judging by the
// GRPC_ARG_ prefix this is presumably a channel-arg key -- confirm at its
// use sites.
95 #define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME \
96  "grpc.internal.health_check_service_name"
97 
98 namespace grpc_core {
99 
100 using internal::ClientChannelMethodParsedConfig;
101 
// Trace flags defined by this file. Each one is constructed with `false` and
// a string name (presumably "off by default" plus the tracer name -- confirm
// against the TraceFlag constructor).
102 TraceFlag grpc_client_channel_trace(false, "client_channel");
103 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
104 TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call");
105 
106 //
107 // ClientChannel::CallData definition
108 //
109 
111  public:
114  static void Destroy(grpc_call_element* elem,
115  const grpc_call_final_info* final_info,
116  grpc_closure* then_schedule_closure);
117  static void StartTransportStreamOpBatch(
119  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
120 
121  // Invoked by channel for queued calls when name resolution is completed.
122  static void CheckResolution(void* arg, grpc_error_handle error);
123  // Helper function for applying the service config to a call while
124  // holding ClientChannel::resolution_mu_.
125  // Returns true if the service config has been applied to the call, in which
126  // case the caller must invoke ResolutionDone() or AsyncResolutionDone()
127  // with the returned error.
128  bool CheckResolutionLocked(grpc_call_element* elem, grpc_error_handle* error)
130  // Schedules a callback to continue processing the call once
131  // resolution is complete. The callback will not run until after this
132  // method returns.
134 
135  private:
137 
140  ~CallData();
141 
142  // Returns the index into pending_batches_ to be used for batch.
146  static void FailPendingBatchInCallCombiner(void* arg,
148  // A predicate type and some useful implementations for PendingBatchesFail().
150  const CallCombinerClosureList& closures);
151  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
152  return true;
153  }
154  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
155  return false;
156  }
158  const CallCombinerClosureList& closures) {
159  return closures.size() > 0;
160  }
161  // Fails all pending batches.
162  // If yield_call_combiner_predicate returns true, assumes responsibility for
163  // yielding the call combiner.
164  void PendingBatchesFail(
166  YieldCallCombinerPredicate yield_call_combiner_predicate);
167  static void ResumePendingBatchInCallCombiner(void* arg,
168  grpc_error_handle ignored);
169  // Resumes all pending batches on lb_call_.
171 
172  // Applies service config to the call. Must be invoked once we know
173  // that the resolver has returned results to the channel.
174  // If an error is returned, the error indicates the status with which
175  // the call should be failed.
176  grpc_error_handle ApplyServiceConfigToCallLocked(
177  grpc_call_element* elem, grpc_metadata_batch* initial_metadata)
179  // Invoked when the resolver result is applied to the caller, on either
180  // success or failure.
182  // Removes the call (if present) from the channel's list of calls queued
183  // for name resolution.
184  void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
186  // Adds the call (if not already present) to the channel's list of
187  // calls queued for name resolution.
188  void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
190 
192  void* arg, grpc_error_handle error);
193 
195 
196  // State for handling deadlines.
197  // The code in deadline_filter.c requires this to be the first field.
198  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
199  // and this struct both independently store pointers to the call stack
200  // and call combiner. If/when we have time, find a way to avoid this
201  // without breaking the grpc_deadline_state abstraction.
203 
204  grpc_slice path_; // Request path.
205  gpr_cycle_counter call_start_time_;
211 
213 
215 
216  // Accessed while holding ClientChannel::resolution_mu_.
217  bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) =
218  false;
219  bool queued_pending_resolver_result_
221  ClientChannel::ResolverQueuedCall resolver_queued_call_
223  ResolverQueuedCallCanceller* resolver_call_canceller_
225 
228 
231 
232  // Batches are added to this list when received from above.
233  // They are removed when we are done handling the batch (i.e., when
234  // either we have invoked all of the batch's callbacks or we have
235  // passed the batch down to the LB call and are not intercepting any of
236  // its callbacks).
238 
239  // Set when we get a cancel_stream op.
241 };
242 
243 //
244 // Filter vtable
245 //
246 
249  nullptr,
251  sizeof(ClientChannel::CallData),
255  sizeof(ClientChannel),
260  "client-channel",
261 };
262 
263 //
264 // dynamic termination filter
265 //
266 
267 namespace {
268 
269 // Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL.
// Copy hook: hands back the very same pointer it was given; nothing is
// duplicated and no reference is taken.
void* ClientChannelArgCopy(void* p) {
  return p;
}
// Destroy hook: deliberately empty -- the pointer is left untouched.
void ClientChannelArgDestroy(void* /*p*/) {
  // Nothing to release.
}
272 int ClientChannelArgCmp(void* p, void* q) { return QsortCompare(p, q); }
273 const grpc_arg_pointer_vtable kClientChannelArgPointerVtable = {
274  ClientChannelArgCopy, ClientChannelArgDestroy, ClientChannelArgCmp};
275 
276 // Channel arg pointer vtable for GRPC_ARG_SERVICE_CONFIG_OBJ.
277 void* ServiceConfigObjArgCopy(void* p) {
278  auto* service_config = static_cast<ServiceConfig*>(p);
279  service_config->Ref().release();
280  return p;
281 }
282 void ServiceConfigObjArgDestroy(void* p) {
283  auto* service_config = static_cast<ServiceConfig*>(p);
284  service_config->Unref();
285 }
286 int ServiceConfigObjArgCmp(void* p, void* q) { return QsortCompare(p, q); }
287 const grpc_arg_pointer_vtable kServiceConfigObjArgPointerVtable = {
288  ServiceConfigObjArgCopy, ServiceConfigObjArgDestroy,
289  ServiceConfigObjArgCmp};
290 
291 class DynamicTerminationFilter {
292  public:
293  class CallData;
294 
295  static const grpc_channel_filter kFilterVtable;
296 
299  GPR_ASSERT(args->is_last);
300  GPR_ASSERT(elem->filter == &kFilterVtable);
301  new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
302  return GRPC_ERROR_NONE;
303  }
304 
305  static void Destroy(grpc_channel_element* elem) {
306  auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
307  chand->~DynamicTerminationFilter();
308  }
309 
310  // Will never be called.
311  static void StartTransportOp(grpc_channel_element* /*elem*/,
312  grpc_transport_op* /*op*/) {}
313  static void GetChannelInfo(grpc_channel_element* /*elem*/,
314  const grpc_channel_info* /*info*/) {}
315 
316  private:
317  explicit DynamicTerminationFilter(const grpc_channel_args* args)
318  : chand_(grpc_channel_args_find_pointer<ClientChannel>(
320 
321  ClientChannel* chand_;
322 };
323 
324 class DynamicTerminationFilter::CallData {
325  public:
327  const grpc_call_element_args* args) {
328  new (elem->call_data) CallData(*args);
329  return GRPC_ERROR_NONE;
330  }
331 
332  static void Destroy(grpc_call_element* elem,
333  const grpc_call_final_info* /*final_info*/,
334  grpc_closure* then_schedule_closure) {
335  auto* calld = static_cast<CallData*>(elem->call_data);
336  RefCountedPtr<SubchannelCall> subchannel_call;
337  if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
338  subchannel_call = calld->lb_call_->subchannel_call();
339  }
340  calld->~CallData();
341  if (GPR_LIKELY(subchannel_call != nullptr)) {
342  subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
343  } else {
344  // TODO(yashkt) : This can potentially be a Closure::Run
345  ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
346  }
347  }
348 
349  static void StartTransportStreamOpBatch(
351  auto* calld = static_cast<CallData*>(elem->call_data);
352  calld->lb_call_->StartTransportStreamOpBatch(batch);
353  }
354 
355  static void SetPollent(grpc_call_element* elem,
356  grpc_polling_entity* pollent) {
357  auto* calld = static_cast<CallData*>(elem->call_data);
358  auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
359  ClientChannel* client_channel = chand->chand_;
360  grpc_call_element_args args = {calld->owning_call_, nullptr,
361  calld->call_context_, calld->path_,
362  /*start_time=*/0, calld->deadline_,
363  calld->arena_, calld->call_combiner_};
364  auto* service_config_call_data =
365  static_cast<ClientChannelServiceConfigCallData*>(
366  calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
367  calld->lb_call_ = client_channel->CreateLoadBalancedCall(
368  args, pollent, nullptr,
369  service_config_call_data->call_dispatch_controller(),
370  /*is_transparent_retry=*/false);
373  "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
374  client_channel, calld->lb_call_.get());
375  }
376  }
377 
378  private:
379  explicit CallData(const grpc_call_element_args& args)
381  deadline_(args.deadline),
382  arena_(args.arena),
383  owning_call_(args.call_stack),
384  call_combiner_(args.call_combiner),
386 
387  ~CallData() { grpc_slice_unref_internal(path_); }
388 
389  grpc_slice path_; // Request path.
393  CallCombiner* call_combiner_;
395 
396  OrphanablePtr<ClientChannel::LoadBalancedCall> lb_call_;
397 };
398 
400  DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
401  nullptr,
403  sizeof(DynamicTerminationFilter::CallData),
405  DynamicTerminationFilter::CallData::SetPollent,
407  sizeof(DynamicTerminationFilter),
412  "dynamic_filter_termination",
413 };
414 
415 } // namespace
416 
417 //
418 // ClientChannel::ResolverResultHandler
419 //
420 
422  public:
423  explicit ResolverResultHandler(ClientChannel* chand) : chand_(chand) {
424  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
425  }
426 
429  gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
430  }
431  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
432  }
433 
437  }
438 
439  private:
441 };
442 
443 //
444 // ClientChannel::SubchannelWrapper
445 //
446 
447 // This class is a wrapper for Subchannel that hides details of the
448 // channel's implementation (such as the health check service name and
449 // connected subchannel) from the LB policy API.
450 //
451 // Note that no synchronization is needed here, because even if the
452 // underlying subchannel is shared between channels, this wrapper will only
453 // be used within one channel, so it will always be synchronized by the
454 // control plane work_serializer.
456  public:
458  absl::optional<std::string> health_check_service_name)
460  ? "SubchannelWrapper"
461  : nullptr),
462  chand_(chand),
464  health_check_service_name_(std::move(health_check_service_name)) {
467  "chand=%p: creating subchannel wrapper %p for subchannel %p",
468  chand, this, subchannel_.get());
469  }
470  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
471  if (chand_->channelz_node_ != nullptr) {
472  auto* subchannel_node = subchannel_->channelz_node();
473  if (subchannel_node != nullptr) {
474  auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
475  if (it == chand_->subchannel_refcount_map_.end()) {
476  chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
477  it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
478  .first;
479  }
480  ++it->second;
481  }
482  }
483  chand_->subchannel_wrappers_.insert(this);
484  }
485 
486  ~SubchannelWrapper() override {
489  "chand=%p: destroying subchannel wrapper %p for subchannel %p",
490  chand_, this, subchannel_.get());
491  }
492  chand_->subchannel_wrappers_.erase(this);
493  if (chand_->channelz_node_ != nullptr) {
494  auto* subchannel_node = subchannel_->channelz_node();
495  if (subchannel_node != nullptr) {
496  auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
497  GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
498  --it->second;
499  if (it->second == 0) {
501  subchannel_node->uuid());
502  chand_->subchannel_refcount_map_.erase(it);
503  }
504  }
505  }
506  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
507  }
508 
510  std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
512  auto& watcher_wrapper = watcher_map_[watcher.get()];
513  GPR_ASSERT(watcher_wrapper == nullptr);
514  watcher_wrapper = new WatcherWrapper(std::move(watcher),
515  Ref(DEBUG_LOCATION, "WatcherWrapper"));
516  subchannel_->WatchConnectivityState(
519  watcher_wrapper));
520  }
521 
524  auto it = watcher_map_.find(watcher);
525  GPR_ASSERT(it != watcher_map_.end());
526  subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
527  it->second);
528  watcher_map_.erase(it);
529  }
530 
532  return subchannel_->connected_subchannel();
533  }
534 
535  void RequestConnection() override { subchannel_->RequestConnection(); }
536 
537  void ResetBackoff() override { subchannel_->ResetBackoff(); }
538 
539  void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
541  std::unique_ptr<InternalSubchannelDataWatcherInterface> internal_watcher(
543  watcher.release()));
544  internal_watcher->SetSubchannel(subchannel_.get());
545  data_watchers_.push_back(std::move(internal_watcher));
546  }
547 
548  const grpc_channel_args* channel_args() override {
549  return subchannel_->channel_args();
550  }
551 
552  void ThrottleKeepaliveTime(int new_keepalive_time) {
553  subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
554  }
555 
556  private:
557  // Subchannel and SubchannelInterface have different interfaces for
558  // their respective ConnectivityStateWatcherInterface classes.
559  // The one in Subchannel updates the ConnectedSubchannel along with
560  // the state, whereas the one in SubchannelInterface does not expose
561  // the ConnectedSubchannel.
562  //
563  // This wrapper provides a bridge between the two. It implements
564  // Subchannel::ConnectivityStateWatcherInterface and wraps
565  // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
566  // that was passed in by the LB policy. We pass an instance of this
567  // class to the underlying Subchannel, and when we get updates from
568  // the subchannel, we pass those on to the wrapped watcher to return
569  // the update to the LB policy. This allows us to set the connected
570  // subchannel before passing the result back to the LB policy.
572  public:
574  std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
575  watcher,
577  : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
578 
579  ~WatcherWrapper() override {
580  auto* parent = parent_.release(); // ref owned by lambda
581  parent->chand_->work_serializer_->Run(
583  *parent_->chand_->work_serializer_) {
584  parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
585  },
587  }
588 
589  void OnConnectivityStateChange() override {
592  "chand=%p: connectivity change for subchannel wrapper %p "
593  "subchannel %p; hopping into work_serializer",
594  parent_->chand_, parent_.get(), parent_->subchannel_.get());
595  }
596  Ref().release(); // ref owned by lambda
597  parent_->chand_->work_serializer_->Run(
599  *parent_->chand_->work_serializer_) {
600  ApplyUpdateInControlPlaneWorkSerializer();
601  Unref();
602  },
604  }
605 
608  watcher_.get();
609  if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
610  return watcher->interested_parties();
611  }
612 
614  auto* replacement = new WatcherWrapper(std::move(watcher_), parent_);
615  replacement_ = replacement;
616  return replacement;
617  }
618 
619  private:
624  "chand=%p: processing connectivity change in work serializer "
625  "for subchannel wrapper %p subchannel %p "
626  "watcher=%p",
627  parent_->chand_, parent_.get(), parent_->subchannel_.get(),
628  watcher_.get());
629  }
631  absl::optional<absl::Cord> keepalive_throttling =
633  if (keepalive_throttling.has_value()) {
634  int new_keepalive_time = -1;
635  if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
636  &new_keepalive_time)) {
637  if (new_keepalive_time > parent_->chand_->keepalive_time_) {
638  parent_->chand_->keepalive_time_ = new_keepalive_time;
640  gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
641  parent_->chand_, parent_->chand_->keepalive_time_);
642  }
643  // Propagate the new keepalive time to all subchannels. This is so
644  // that new transports created by any subchannel (and not just the
645  // subchannel that received the GOAWAY), use the new keepalive time.
646  for (auto* subchannel_wrapper :
647  parent_->chand_->subchannel_wrappers_) {
648  subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
649  }
650  }
651  } else {
652  gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
653  parent_->chand_,
654  std::string(keepalive_throttling.value()).c_str());
655  }
656  }
657  // Ignore update if the parent WatcherWrapper has been replaced
658  // since this callback was scheduled.
659  if (watcher_ != nullptr) {
660  // Propagate status only in state TF.
661  // We specifically want to avoid propagating the status for
662  // state IDLE that the real subchannel gave us only for the
663  // purpose of keepalive propagation.
664  if (state_change.state != GRPC_CHANNEL_TRANSIENT_FAILURE) {
665  state_change.status = absl::OkStatus();
666  }
667  watcher_->OnConnectivityStateChange(state_change.state,
668  state_change.status);
669  }
670  }
671 
672  std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
676  };
677 
681  // Maps from the address of the watcher passed to us by the LB policy
682  // to the address of the WatcherWrapper that we passed to the underlying
683  // subchannel. This is needed so that when the LB policy calls
684  // CancelConnectivityStateWatch() with its watcher, we know the
685  // corresponding WatcherWrapper to cancel on the underlying subchannel.
686  std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
688  std::vector<std::unique_ptr<InternalSubchannelDataWatcherInterface>>
690 };
691 
692 //
693 // ClientChannel::ExternalConnectivityWatcher
694 //
695 
697  ClientChannel* chand, grpc_polling_entity pollent,
699  grpc_closure* watcher_timer_init)
700  : chand_(chand),
701  pollent_(pollent),
702  initial_state_(*state),
703  state_(state),
704  on_complete_(on_complete),
705  watcher_timer_init_(watcher_timer_init) {
708  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
709  {
711  // Will be deleted when the watch is complete.
712  GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
713  // Store a ref to the watcher in the external_watchers_ map.
714  chand->external_watchers_[on_complete] =
715  Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
716  }
717  // Pass the ref from creating the object to Start().
718  chand_->work_serializer_->Run(
720  // The ref is passed to AddWatcherLocked().
721  AddWatcherLocked();
722  },
724 }
725 
728  chand_->interested_parties_);
729  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
730  "ExternalConnectivityWatcher");
731 }
732 
735  grpc_closure* on_complete,
736  bool cancel) {
738  {
739  MutexLock lock(&chand->external_watchers_mu_);
740  auto it = chand->external_watchers_.find(on_complete);
741  if (it != chand->external_watchers_.end()) {
742  watcher = std::move(it->second);
743  chand->external_watchers_.erase(it);
744  }
745  }
746  // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
747  // the mutex before calling it.
748  if (watcher != nullptr && cancel) watcher->Cancel();
749 }
750 
752  grpc_connectivity_state state, const absl::Status& /* status */) {
753  bool done = false;
754  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
755  std::memory_order_relaxed)) {
756  return; // Already done.
757  }
758  // Remove external watcher.
760  chand_, on_complete_, /*cancel=*/false);
761  // Report new state to the user.
762  *state_ = state;
764  // Hop back into the work_serializer to clean up.
765  // Not needed in state SHUTDOWN, because the tracker will
766  // automatically remove all watchers in that case.
767  if (state != GRPC_CHANNEL_SHUTDOWN) {
768  chand_->work_serializer_->Run(
769  [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
770  RemoveWatcherLocked();
771  },
773  }
774 }
775 
777  bool done = false;
778  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
779  std::memory_order_relaxed)) {
780  return; // Already done.
781  }
783  // Hop back into the work_serializer to clean up.
784  chand_->work_serializer_->Run(
785  [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
786  RemoveWatcherLocked();
787  },
789 }
790 
792  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
793  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
794  chand_->state_tracker_.AddWatcher(
796 }
797 
799  chand_->state_tracker_.RemoveWatcher(this);
800 }
801 
802 //
803 // ClientChannel::ConnectivityWatcherAdder
804 //
805 
807  public:
809  ClientChannel* chand, grpc_connectivity_state initial_state,
811  : chand_(chand),
812  initial_state_(initial_state),
813  watcher_(std::move(watcher)) {
814  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
815  chand_->work_serializer_->Run(
816  [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
817  AddWatcherLocked();
818  },
820  }
821 
822  private:
825  chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
826  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
827  delete this;
828  }
829 
833 };
834 
835 //
836 // ClientChannel::ConnectivityWatcherRemover
837 //
838 
840  public:
843  : chand_(chand), watcher_(watcher) {
844  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
845  chand_->work_serializer_->Run(
846  [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
847  RemoveWatcherLocked();
848  },
850  }
851 
852  private:
855  chand_->state_tracker_.RemoveWatcher(watcher_);
856  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
857  "ConnectivityWatcherRemover");
858  delete this;
859  }
860 
863 };
864 
865 //
866 // ClientChannel::ClientChannelControlHelper
867 //
868 
871  public:
872  explicit ClientChannelControlHelper(ClientChannel* chand) : chand_(chand) {
873  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
874  }
875 
877  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
878  "ClientChannelControlHelper");
879  }
880 
882  ServerAddress address, const grpc_channel_args& args) override
883  ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
884  if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
885  // Determine health check service name.
886  absl::optional<std::string> health_check_service_name;
887  const char* health_check_service_name_arg = grpc_channel_args_find_string(
889  if (health_check_service_name_arg != nullptr) {
890  bool inhibit_health_checking = grpc_channel_args_find_bool(
892  if (!inhibit_health_checking) {
893  health_check_service_name = health_check_service_name_arg;
894  }
895  }
896  // Construct channel args for subchannel.
897  // Remove channel args that should not affect subchannel uniqueness.
898  absl::InlinedVector<const char*, 4> args_to_remove = {
902  };
903  // Add channel args needed for the subchannel.
904  absl::InlinedVector<grpc_arg, 2> args_to_add = {
906  chand_->subchannel_pool_.get()),
907  };
908  // Check if default authority arg is already set.
909  const char* default_authority =
911  // Add args from subchannel address.
912  if (address.args() != nullptr) {
913  for (size_t j = 0; j < address.args()->num_args; ++j) {
914  grpc_arg& arg = address.args()->args[j];
915  if (strcmp(arg.key, GRPC_ARG_DEFAULT_AUTHORITY) == 0) {
916  // Don't add default authority arg from subchannel address if
917  // it's already set at the channel level -- the value from the
918  // application should take precedence over what is set by the
919  // resolver.
920  if (default_authority != nullptr) continue;
921  default_authority = arg.value.string;
922  }
923  args_to_add.emplace_back(arg);
924  }
925  }
926  // If we haven't already set the default authority arg, add it from
927  // the channel.
928  if (default_authority == nullptr) {
929  // Remove it, just in case it's actually present but is the wrong type.
930  args_to_remove.push_back(GRPC_ARG_DEFAULT_AUTHORITY);
932  const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
933  const_cast<char*>(chand_->default_authority_.c_str())));
934  }
936  &args, args_to_remove.data(), args_to_remove.size(), args_to_add.data(),
937  args_to_add.size());
938  // Create subchannel.
940  chand_->client_channel_factory_->CreateSubchannel(address.address(),
941  new_args);
942  grpc_channel_args_destroy(new_args);
943  if (subchannel == nullptr) return nullptr;
944  // Make sure the subchannel has updated keepalive time.
945  subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
946  // Create and return wrapper for the subchannel.
947  return MakeRefCounted<SubchannelWrapper>(
948  chand_, std::move(subchannel), std::move(health_check_service_name));
949  }
950 
953  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override
954  ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
955  if (chand_->resolver_ == nullptr) return; // Shutting down.
957  const char* extra = GRPC_ERROR_IS_NONE(chand_->disconnect_error_)
958  ? ""
959  : " (ignoring -- channel shutting down)";
960  gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
962  picker.get(), extra);
963  }
964  // Do update only if not shutting down.
965  if (GRPC_ERROR_IS_NONE(chand_->disconnect_error_)) {
966  chand_->UpdateStateAndPickerLocked(state, status, "helper",
967  std::move(picker));
968  }
969  }
970 
971  void RequestReresolution() override
973  if (chand_->resolver_ == nullptr) return; // Shutting down.
975  gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
976  }
977  chand_->resolver_->RequestReresolutionLocked();
978  }
979 
981  return chand_->default_authority_;
982  }
983 
985  ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
986  if (chand_->resolver_ == nullptr) return; // Shutting down.
987  if (chand_->channelz_node_ != nullptr) {
988  chand_->channelz_node_->AddTraceEvent(
989  ConvertSeverityEnum(severity),
991  }
992  }
993 
994  private:
997  if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
998  if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1000  }
1001 
1003 };
1004 
1005 //
1006 // ClientChannel implementation
1007 //
1008 
1011  grpc_channel_stack_last_element(channel->channel_stack());
1012  if (elem->filter != &kFilterVtable) return nullptr;
1013  return static_cast<ClientChannel*>(elem->channel_data);
1014 }
1015 
1018  GPR_ASSERT(args->is_last);
1019  GPR_ASSERT(elem->filter == &kFilterVtable);
1021  new (elem->channel_data) ClientChannel(args, &error);
1022  return error;
1023 }
1024 
1026  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1027  chand->~ClientChannel();
1028 }
1029 
1030 namespace {
1031 
1032 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1033  const grpc_channel_args* args) {
1034  const bool use_local_subchannel_pool = grpc_channel_args_find_bool(
1036  if (use_local_subchannel_pool) {
1037  return MakeRefCounted<LocalSubchannelPool>();
1038  }
1040 }
1041 
1042 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1043  return grpc_channel_args_find_pointer<channelz::ChannelNode>(
1045 }
1046 
1047 } // namespace
1048 
1052  grpc_deadline_checking_enabled(args->channel_args)),
1053  owning_stack_(args->channel_stack),
1055  ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1056  channelz_node_(GetChannelzNode(args->channel_args)),
1059  internal::ClientChannelServiceConfigParser::ParserIndex()),
1060  work_serializer_(std::make_shared<WorkSerializer>()),
1061  state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
1062  subchannel_pool_(GetSubchannelPool(args->channel_args)) {
1064  gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
1065  this, owning_stack_);
1066  }
1067  // Start backup polling.
1069  // Check client channel factory.
1070  if (client_channel_factory_ == nullptr) {
1072  "Missing client channel factory in args for client channel filter");
1073  return;
1074  }
1075  // Get default service config. If none is specified via the client API,
1076  // we use an empty config.
1077  const char* service_config_json = grpc_channel_args_find_string(
1078  args->channel_args, GRPC_ARG_SERVICE_CONFIG);
1079  if (service_config_json == nullptr) service_config_json = "{}";
1082  ServiceConfigImpl::Create(args->channel_args, service_config_json, error);
1083  if (!GRPC_ERROR_IS_NONE(*error)) {
1084  default_service_config_.reset();
1085  return;
1086  }
1087  // Get URI to resolve, using proxy mapper if needed.
1088  const char* server_uri =
1090  if (server_uri == nullptr) {
1092  "target URI channel arg missing or wrong type in client channel "
1093  "filter");
1094  return;
1095  }
1097  char* proxy_name = nullptr;
1098  grpc_channel_args* new_args = nullptr;
1099  ProxyMapperRegistry::MapName(server_uri, args->channel_args, &proxy_name,
1100  &new_args);
1101  if (proxy_name != nullptr) {
1102  uri_to_resolve_ = proxy_name;
1103  gpr_free(proxy_name);
1104  }
1105  // Make sure the URI to resolve is valid, so that we know that
1106  // resolver creation will succeed later.
1107  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
1108  uri_to_resolve_)) {
1110  absl::StrCat("the target uri is not valid: ", uri_to_resolve_));
1111  return;
1112  }
1113  // Strip out service config channel arg, so that it doesn't affect
1114  // subchannel uniqueness when the args flow down to that layer.
1115  const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
1117  new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
1118  grpc_channel_args_destroy(new_args);
1119  // Set initial keepalive time.
1120  keepalive_time_ = grpc_channel_args_find_integer(
1122  {-1 /* default value, unset */, 1, INT_MAX});
1123  // Set default authority.
1124  const char* default_authority =
1126  if (default_authority == nullptr) {
1129  server_uri);
1130  } else {
1131  default_authority_ = default_authority;
1132  }
1133  // Success.
1135 }
1136 
// Channel teardown path: logs the destruction and releases the stored
// disconnect error.
// NOTE(review): the signature line and several statements of this definition
// are missing from this listing (the embedded numbering has gaps); presumably
// this is ClientChannel::~ClientChannel() -- confirm against the original file.
1139  gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
1140  }
1143  // Stop backup polling.
// Unref the disconnect error stored on the channel.
1146  GRPC_ERROR_UNREF(disconnect_error_);
1147 }
1148 
// Forwards `this`, the call args, the polling entity, the destruction-complete
// closure, the call dispatch controller and the transparent-retry flag to a
// single construction expression.
// NOTE(review): the signature's first lines and the line naming the
// constructed type are missing from this listing; presumably this is
// ClientChannel::CreateLoadBalancedCall() building a LoadBalancedCall --
// confirm against the original file.
1152  grpc_closure* on_call_destruction_complete,
1153  ConfigSelector::CallDispatchController* call_dispatch_controller,
1154  bool is_transparent_retry) {
1156  this, args, pollent, on_call_destruction_complete,
1157  call_dispatch_controller, is_transparent_retry));
1158 }
1159 
1160 namespace {
1161 
// Selects the LB policy config to use for a resolver result.
//
// Order of preference, as implemented below:
//   1. The parsed LB config from the service config, returned as-is.
//   2. The deprecated LB policy name from the service config.
//   3. A policy name read from the resolver result's channel args; if that
//      policy fails the check below or requires a config, "pick_first" is
//      used instead.
//   4. "pick_first" when nothing was specified.
// For cases 2-4 an empty JSON config `[{"<policy_name>": {}}]` is built and
// parsed; the result is asserted to be non-null.
//
// NOTE(review): the line carrying the return type and function name, and a
// few statement lines (channel-arg key, the existence check, the logging
// calls, the parse call), are missing from this listing.
1163  const Resolver::Result& resolver_result,
1164  const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
1165  // Prefer the LB policy config found in the service config.
1166  if (parsed_service_config->parsed_lb_config() != nullptr) {
1167  return parsed_service_config->parsed_lb_config();
1168  }
1169  // Try the deprecated LB policy name from the service config.
1170  // If not, try the setting from channel args.
1171  const char* policy_name = nullptr;
1172  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
1173  policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
1174  } else {
1175  policy_name = grpc_channel_args_find_string(resolver_result.args,
1177  bool requires_config = false;
1178  if (policy_name != nullptr &&
1180  policy_name, &requires_config) ||
1181  requires_config)) {
1182  if (requires_config) {
1184  "LB policy: %s passed through channel_args must not "
1185  "require a config. Using pick_first instead.",
1186  policy_name);
1187  } else {
1189  "LB policy: %s passed through channel_args does not exist. "
1190  "Using pick_first instead.",
1191  policy_name);
1192  }
1193  policy_name = "pick_first";
1194  }
1195  }
1196  // Use pick_first if nothing was specified and we didn't select grpclb
1197  // above.
// NOTE(review): no visible branch above selects grpclb; the mention of grpclb
// in the comment above looks stale -- confirm.
1198  if (policy_name == nullptr) policy_name = "pick_first";
1199  // Now that we have the policy name, construct an empty config for it.
1200  Json config_json = Json::Array{Json::Object{
1201  {policy_name, Json::Object{}},
1202  }};
1205  config_json, &parse_error);
1206  // The policy name came from one of three places:
1207  // - The deprecated loadBalancingPolicy field in the service config,
1208  // in which case the code in ClientChannelServiceConfigParser
1209  // already verified that the policy does not require a config.
1210  // - One of the hard-coded values here, all of which are known to not
1211  // require a config.
1212  // - A channel arg, in which case we check that the specified policy exists
1213  // and accepts an empty config. If not, we revert to using pick_first
1214  // lb_policy
1215  GPR_ASSERT(lb_policy_config != nullptr);
1217  return lb_policy_config;
1218 }
1219 
1220 } // namespace
1221 
// Processes a new resolver result (`result`):
//   - returns immediately if the resolver has already been reset;
//   - collects trace strings for address-list empty/non-empty transitions and
//     service config errors;
//   - chooses the service config and config selector: the previously saved
//     ones on a service config error (or reports a resolver error if there are
//     none), the channel default when the resolver returned none, otherwise
//     the ones supplied by the resolver;
//   - if a service config was chosen, picks the LB policy config, applies
//     control-plane updates when the config or selector changed, and then
//     creates/updates the LB policy;
//   - emits a channelz trace event when any trace strings were collected.
// NOTE(review): the signature line and a number of statements (trace-flag
// checks, several call heads) are missing from this listing; presumably this
// is ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) --
// confirm against the original file.
1223  // Handle race conditions.
1224  if (resolver_ == nullptr) return;
1226  gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
1227  }
1228  // We only want to trace the address resolution in the following cases:
1229  // (a) Address resolution resulted in service config change.
1230  // (b) Address resolution that causes number of backends to go from
1231  // zero to non-zero.
1232  // (c) Address resolution that causes number of backends to go from
1233  // non-zero to zero.
1234  // (d) Address resolution that causes a new LB policy to be created.
1235  //
1236  // We track a list of strings to eventually be concatenated and traced.
1237  std::vector<const char*> trace_strings;
1238  const bool resolution_contains_addresses =
1239  result.addresses.ok() && !result.addresses->empty();
1240  if (!resolution_contains_addresses &&
1241  previous_resolution_contained_addresses_) {
1242  trace_strings.push_back("Address list became empty");
1243  } else if (resolution_contains_addresses &&
1244  !previous_resolution_contained_addresses_) {
1245  trace_strings.push_back("Address list became non-empty");
1246  }
1247  previous_resolution_contained_addresses_ = resolution_contains_addresses;
// trace_strings holds raw `const char*`, so the error text is kept alive in
// this function-scope std::string until the strings are joined below.
1248  std::string service_config_error_string_storage;
1249  if (!result.service_config.ok()) {
1250  service_config_error_string_storage =
1251  result.service_config.status().ToString();
1252  trace_strings.push_back(service_config_error_string_storage.c_str());
1253  }
1254  // Choose the service config.
1255  RefCountedPtr<ServiceConfig> service_config;
1256  RefCountedPtr<ConfigSelector> config_selector;
1257  if (!result.service_config.ok()) {
1259  gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
1260  this, result.service_config.status().ToString().c_str());
1261  }
1262  // If the service config was invalid, then fallback to the
1263  // previously returned service config.
1264  if (saved_service_config_ != nullptr) {
1266  gpr_log(GPR_INFO,
1267  "chand=%p: resolver returned invalid service config. "
1268  "Continuing to use previous service config.",
1269  this);
1270  }
1271  service_config = saved_service_config_;
1272  config_selector = saved_config_selector_;
1273  } else {
1274  // We received a service config error and we don't have a
1275  // previous service config to fall back to. Put the channel into
1276  // TRANSIENT_FAILURE.
1277  OnResolverErrorLocked(result.service_config.status());
1278  trace_strings.push_back("no valid service config");
1279  }
1280  } else if (*result.service_config == nullptr) {
1281  // Resolver did not return any service config.
1283  gpr_log(GPR_INFO,
1284  "chand=%p: resolver returned no service config. Using default "
1285  "service config for channel.",
1286  this);
1287  }
// Note that config_selector is left null in this branch.
1288  service_config = default_service_config_;
1289  } else {
1290  // Use ServiceConfig and ConfigSelector returned by resolver.
1291  service_config = std::move(*result.service_config);
1292  config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
1293  }
1294  // Note: The only case in which service_config is null here is if the resolver
1295  // returned a service config error and we don't have a previous service
1296  // config to fall back to.
1297  if (service_config != nullptr) {
1298  // Extract global config for client channel.
1299  const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1300  static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1301  service_config->GetGlobalParsedConfig(
1303  // Choose LB policy config.
1305  ChooseLbPolicy(result, parsed_service_config);
1306  // Check if the ServiceConfig has changed.
// "Changed" means there was no saved config or the JSON text differs.
1307  const bool service_config_changed =
1308  saved_service_config_ == nullptr ||
1309  service_config->json_string() != saved_service_config_->json_string();
1310  // Check if the ConfigSelector has changed.
1311  const bool config_selector_changed = !ConfigSelector::Equals(
1312  saved_config_selector_.get(), config_selector.get());
1313  // If either has changed, apply the global parameters now.
1314  if (service_config_changed || config_selector_changed) {
1315  // Update service config in control plane.
1317  std::move(config_selector),
1318  lb_policy_config->name());
1320  gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
1321  }
1322  // Create or update LB policy, as needed.
1324  std::move(lb_policy_config),
1325  parsed_service_config->health_check_service_name(), std::move(result));
1326  if (service_config_changed || config_selector_changed) {
1327  // Start using new service config for calls.
1328  // This needs to happen after the LB policy has been updated, since
1329  // the ConfigSelector may need the LB policy to know about new
1330  // destinations before it can send RPCs to those destinations.
1332  // TODO(ncteisen): might be worth somehow including a snippet of the
1333  // config in the trace, at the risk of bloating the trace logs.
1334  trace_strings.push_back("Service config changed");
1335  }
1336  }
1337  // Add channel trace event.
1338  if (!trace_strings.empty()) {
1340  absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
1341  if (channelz_node_ != nullptr) {
1342  channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
1344  }
1345  }
1346 }
1347 
// Handles a resolver failure described by `status`.
// Does nothing if the resolver has already been reset. When no LB policy
// exists yet, records `status` as the resolver transient failure, re-checks
// every call queued on the resolver result (under resolution_mu_), and moves
// the channel to TRANSIENT_FAILURE with a picker that fails with `status`.
// When an LB policy already exists, connectivity state is left to it.
// NOTE(review): the signature line, the trace-flag check, the declaration of
// `error` and the head of the state-update call are missing from this listing.
1349  if (resolver_ == nullptr) return;
1351  gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
1352  status.ToString().c_str());
1353  }
1354  // If we already have an LB policy from a previous resolution
1355  // result, then we continue to let it set the connectivity state.
1356  // Otherwise, we go into TRANSIENT_FAILURE.
1357  if (lb_policy_ == nullptr) {
1359  {
1360  MutexLock lock(&resolution_mu_);
1361  // Update resolver transient failure.
1362  resolver_transient_failure_error_ = status;
1363  // Process calls that were queued waiting for the resolver result.
1364  for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
1365  call = call->next) {
1366  grpc_call_element* elem = call->elem;
1367  CallData* calld = static_cast<CallData*>(elem->call_data);
// AsyncResolutionDone() is invoked only when CheckResolutionLocked() returns
// true for this call.
1369  if (calld->CheckResolutionLocked(elem, &error)) {
1370  calld->AsyncResolutionDone(elem, error);
1371  }
1372  }
1373  }
1375  // Update connectivity state.
1377  GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
1378  absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(status));
1379  }
1380 }
1381 
// Builds a LoadBalancingPolicy::UpdateArgs from the resolver result (addresses,
// LB policy config, resolution note, and channel args with the config selector
// removed and, when set, the health check service name added), creates the LB
// policy if none exists yet, and passes the update to it.
// NOTE(review): the first signature lines, the declaration of `args_to_add`,
// the head of the channel-args copy call and the trace-flag check are missing
// from this listing; presumably this is
// ClientChannel::CreateOrUpdateLbPolicyLocked() -- confirm.
1384  const absl::optional<std::string>& health_check_service_name,
1386  // Construct update.
1387  LoadBalancingPolicy::UpdateArgs update_args;
1388  update_args.addresses = std::move(result.addresses);
1389  update_args.config = std::move(lb_policy_config);
1390  update_args.resolution_note = std::move(result.resolution_note);
1391  // Add health check service name to channel args.
1393  if (health_check_service_name.has_value()) {
1395  const_cast<char*>(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME),
1396  const_cast<char*>(health_check_service_name->c_str())));
1397  }
1398  // Remove the config selector from channel args so that we're not holding
1399  // unnecessary refs that cause it to be destroyed somewhere other than in the
1400  // WorkSerializer.
1401  const char* arg_to_remove = GRPC_ARG_CONFIG_SELECTOR;
1403  result.args, &arg_to_remove, 1, args_to_add.data(), args_to_add.size());
1404  // Create policy if needed.
1405  if (lb_policy_ == nullptr) {
1406  lb_policy_ = CreateLbPolicyLocked(*update_args.args);
1407  }
1408  // Update the policy.
1410  gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
1411  lb_policy_.get());
1412  }
1413  lb_policy_->UpdateLocked(std::move(update_args));
1414 }
1415 
1416 // Creates a new LB policy.
// The policy is a ChildPolicyHandler constructed with this channel's work
// serializer, a ClientChannelControlHelper bound to this channel, and a
// pointer to `args`. Its interested_parties() pollset set is linked via
// grpc_pollset_set_add_pollset_set() before the policy is returned.
// NOTE(review): the signature's first line, the declaration of `lb_policy`,
// the remaining constructor arguments and the second pollset-set argument are
// missing from this listing.
1418  const grpc_channel_args& args) {
1419  LoadBalancingPolicy::Args lb_policy_args;
1420  lb_policy_args.work_serializer = work_serializer_;
1421  lb_policy_args.channel_control_helper =
1422  absl::make_unique<ClientChannelControlHelper>(this);
1423  lb_policy_args.args = &args;
1425  MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
1428  gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
1429  lb_policy.get());
1430  }
1431  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1433  return lb_policy;
1434 }
1435 
// Pushes `call` onto the front of the singly linked resolver_queued_calls_
// list.
// NOTE(review): the signature's first line and the statement that adds
// `pollent` to interested_parties are missing from this listing; presumably
// this is ClientChannel::AddResolverQueuedCall() -- confirm.
1437  grpc_polling_entity* pollent) {
1438  // Add call to queued calls list.
1439  call->next = resolver_queued_calls_;
1440  resolver_queued_calls_ = call;
1441  // Add call's pollent to channel's interested_parties, so that I/O
1442  // can be done under the call's CQ.
1444 }
1445 
// Unlinks `to_remove` from the resolver_queued_calls_ list, if present.
// NOTE(review): the signature's first line and the statement that removes
// `pollent` from interested_parties are missing from this listing; presumably
// this is ClientChannel::RemoveResolverQueuedCall() -- confirm.
1447  grpc_polling_entity* pollent) {
1448  // Remove call's pollent from channel's interested_parties.
1450  // Remove from queued calls list.
// `call` points at the link (head pointer or a node's `next`) being examined,
// so the matching node is unlinked by overwriting that link in place.
1451  for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
1452  call = &(*call)->next) {
1453  if (*call == to_remove) {
1454  *call = to_remove->next;
1455  return;
1456  }
1457  }
1458 }
1459 
// Stores the new service config and config selector as the "saved" values and
// refreshes the LB policy name and service config JSON kept under info_mu_.
// NOTE(review): the signature's first line and the trace-flag checks are
// missing from this listing; presumably this is
// ClientChannel::UpdateServiceConfigInControlPlaneLocked() -- confirm.
1461  RefCountedPtr<ServiceConfig> service_config,
1462  RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
// Copy the JSON text before service_config is moved from below.
1463  std::string service_config_json(service_config->json_string());
1465  gpr_log(GPR_INFO,
1466  "chand=%p: resolver returned updated service config: \"%s\"", this,
1467  service_config_json.c_str());
1468  }
1469  // Save service config.
1470  saved_service_config_ = std::move(service_config);
1471  // Swap out the data used by GetChannelInfo().
1472  {
1473  MutexLock lock(&info_mu_);
1474  info_lb_policy_name_ = std::move(lb_policy_name);
1475  info_service_config_json_ = std::move(service_config_json);
1476  }
1477  // Save config selector.
1478  saved_config_selector_ = std::move(config_selector);
1480  gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
1481  saved_config_selector_.get());
1482  }
1483 }
1484 
// Makes the saved service config and config selector the ones used by calls:
// builds the dynamic filter stack (the selector's filters followed by either
// the retry filter or the dynamic termination filter), then, under
// resolution_mu_, swaps in the new service config / config selector / dynamic
// filters, clears the resolver transient failure, and re-checks every call
// queued on the resolver result.
// NOTE(review): the signature line, the trace-flag check, the heads of the
// grpc_arg / channel-args construction calls, the `enable_retries`
// initializer, the ExecCtx invalidation statement and the declaration of
// `error` are missing from this listing; presumably this is
// ClientChannel::UpdateServiceConfigInDataPlaneLocked() -- confirm.
1486  // Grab ref to service config.
1487  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
1488  // Grab ref to config selector. Use default if resolver didn't supply one.
1489  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
1491  gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
1492  saved_config_selector_.get());
1493  }
1494  if (config_selector == nullptr) {
1495  config_selector =
1496  MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
1497  }
1498  absl::InlinedVector<grpc_arg, 2> args_to_add = {
1500  const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL), this,
1501  &kClientChannelArgPointerVtable),
1503  const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_OBJ), service_config.get(),
1504  &kServiceConfigObjArgPointerVtable),
1505  };
1507  channel_args_, args_to_add.data(), args_to_add.size());
1508  new_args = config_selector->ModifyChannelArgs(new_args);
1509  bool enable_retries =
1512  // Construct dynamic filter stack.
1513  std::vector<const grpc_channel_filter*> filters =
1514  config_selector->GetFilters();
// Exactly one of the two filters below is appended after the selector's
// filters.
1515  if (enable_retries) {
1516  filters.push_back(&kRetryFilterVtable);
1517  } else {
1518  filters.push_back(&DynamicTerminationFilter::kFilterVtable);
1519  }
1520  RefCountedPtr<DynamicFilters> dynamic_filters =
1521  DynamicFilters::Create(new_args, std::move(filters));
1522  GPR_ASSERT(dynamic_filters != nullptr);
1523  grpc_channel_args_destroy(new_args);
1524  // Grab data plane lock to update service config.
1525  //
1526  // We defer unreffing the old values (and deallocating memory) until
1527  // after releasing the lock to keep the critical section small.
1528  {
1529  MutexLock lock(&resolution_mu_);
1530  resolver_transient_failure_error_ = absl::OkStatus();
1531  // Update service config.
1532  received_service_config_data_ = true;
1533  // Old values will be unreffed after lock is released.
// After these swaps the locals hold the previous values.
1534  service_config_.swap(service_config);
1535  config_selector_.swap(config_selector);
1536  dynamic_filters_.swap(dynamic_filters);
1537  // Process calls that were queued waiting for the resolver result.
1538  for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
1539  call = call->next) {
1540  // If there are a lot of queued calls here, resuming them all may cause us
1541  // to stay inside C-core for a long period of time. All of that work would
1542  // be done using the same ExecCtx instance and therefore the same cached
1543  // value of "now". The longer it takes to finish all of this work and exit
1544  // from C-core, the more stale the cached value of "now" may become. This
1545  // can cause problems whereby (e.g.) we calculate a timer deadline based
1546  // on the stale value, which results in the timer firing too early. To
1547  // avoid this, we invalidate the cached value for each call we process.
1549  grpc_call_element* elem = call->elem;
1550  CallData* calld = static_cast<CallData*>(elem->call_data);
1552  if (calld->CheckResolutionLocked(elem, &error)) {
1553  calld->AsyncResolutionDone(elem, error);
1554  }
1555  }
1556  }
1557  // Old values will be unreffed after lock is released when they go out
1558  // of scope.
1559 }
1560 
// Creates the resolver (with a ResolverResultHandler bound to this channel and
// the channel's work serializer), moves the channel to CONNECTING with a queue
// picker, and starts the resolver.
// NOTE(review): the signature line, the trace-flag checks, the head of the
// resolver-creation call and the head of the state-update call are missing
// from this listing; presumably this is ClientChannel::CreateResolverLocked()
// -- confirm.
1563  gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
1564  }
1567  work_serializer_, absl::make_unique<ResolverResultHandler>(this));
1568  // Since the validity of the args was checked when the channel was created,
1569  // CreateResolver() must return a non-null result.
1570  GPR_ASSERT(resolver_ != nullptr);
1572  GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
1573  absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
1574  resolver_->StartLocked();
1576  gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
1577  }
1578 }
1579 
// Resets the resolver and, if one exists, the LB policy (after detaching the
// policy's interested_parties() pollset set). No-op when there is no resolver;
// the LB policy is only examined inside the resolver != nullptr branch.
// NOTE(review): the signature line, the trace-flag checks and the second
// pollset-set argument are missing from this listing; presumably this is
// ClientChannel::DestroyResolverAndLbPolicyLocked() -- confirm.
1581  if (resolver_ != nullptr) {
1583  gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
1584  resolver_.get());
1585  }
1586  resolver_.reset();
1587  if (lb_policy_ != nullptr) {
1589  gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
1590  lb_policy_.get());
1591  }
1592  grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
1594  lb_policy_.reset();
1595  }
1596  }
1597 }
1598 
// Publishes a new connectivity state and picker.
//   - When `picker` is null or `state` is SHUTDOWN, the saved service config
//     and config selector are dropped and the data-plane copies (service
//     config, config selector, dynamic filters) are cleared under
//     resolution_mu_.
//   - The state tracker is updated and, if a channelz node exists, a trace
//     event is recorded.
//   - Under data_plane_mu_, the picker is swapped in and every call in
//     lb_queued_calls_ is re-picked.
// NOTE(review): the signature's first lines, parts of the channelz trace call,
// the ExecCtx invalidation statement and the declaration of `error` are
// missing from this listing; presumably this is
// ClientChannel::UpdateStateAndPickerLocked() -- confirm.
1601  const char* reason,
1602  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
1603  // Special case for IDLE and SHUTDOWN states.
1604  if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
1605  saved_service_config_.reset();
1606  saved_config_selector_.reset();
1607  // Acquire resolution lock to update config selector and associated state.
1608  // To minimize lock contention, we wait to unref these objects until
1609  // after we release the lock.
1610  RefCountedPtr<ServiceConfig> service_config_to_unref;
1611  RefCountedPtr<ConfigSelector> config_selector_to_unref;
1612  RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
1613  {
1614  MutexLock lock(&resolution_mu_);
1615  received_service_config_data_ = false;
1616  service_config_to_unref = std::move(service_config_);
1617  config_selector_to_unref = std::move(config_selector_);
1618  dynamic_filters_to_unref = std::move(dynamic_filters_);
1619  }
1620  }
1621  // Update connectivity state.
1622  state_tracker_.SetState(state, status, reason);
1623  if (channelz_node_ != nullptr) {
1626  channelz::ChannelTrace::Severity::Info,
1629  state)));
1630  }
1631  // Grab data plane lock to update the picker.
1632  {
1633  MutexLock lock(&data_plane_mu_);
1634  // Swap out the picker.
1635  // Note: Original value will be destroyed after the lock is released.
1636  picker_.swap(picker);
1637  // Re-process queued picks.
1638  for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
1639  call = call->next) {
1640  // If there are a lot of queued calls here, resuming them all may cause us
1641  // to stay inside C-core for a long period of time. All of that work would
1642  // be done using the same ExecCtx instance and therefore the same cached
1643  // value of "now". The longer it takes to finish all of this work and exit
1644  // from C-core, the more stale the cached value of "now" may become. This
1645  // can cause problems whereby (e.g.) we calculate a timer deadline based
1646  // on the stale value, which results in the timer firing too early. To
1647  // avoid this, we invalidate the cached value for each call we process.
1650  if (call->lb_call->PickSubchannelLocked(&error)) {
1651  call->lb_call->AsyncPickDone(error);
1652  }
1653  }
1654  }
1655 }
1656 
1657 namespace {
1658 
1659 // TODO(roth): Remove this in favor of the gprpp Match() function once
1660 // we can do that without breaking lock annotations.
//
// Dispatches on which alternative `result->result` currently holds and returns
// the value produced by the matching callback. Alternatives are tested in the
// order Complete, Queue, Fail; if none matches the result is asserted to be a
// Drop and `drop_func` is called.
// NOTE(review): the parameter list (`result` and the four callbacks) is
// missing from this listing.
1661 template <typename T>
1662 T HandlePickResult(
1668  auto* complete_pick =
1669  absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
1670  if (complete_pick != nullptr) {
1671  return complete_func(complete_pick);
1672  }
1673  auto* queue_pick =
1674  absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
1675  if (queue_pick != nullptr) {
1676  return queue_func(queue_pick);
1677  }
1678  auto* fail_pick =
1679  absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
1680  if (fail_pick != nullptr) {
1681  return fail_func(fail_pick);
1682  }
1683  auto* drop_pick =
1684  absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
1685  GPR_ASSERT(drop_pick != nullptr);
1686  return drop_func(drop_pick);
1687 }
1688 
1689 } // namespace
1690 
// Sends a ping for the transport op `op` and returns the resulting error.
// Returns "channel not connected" unless the channel state is READY. Otherwise
// a pick result is obtained under data_plane_mu_ and handled per outcome:
//   - complete: Ping() is issued on the picked subchannel's connected
//     subchannel with op->send_ping's closures; returns GRPC_ERROR_NONE;
//   - queue: returns "LB picker queued call";
//   - fail / drop: returns the pick's status converted to a grpc error.
// NOTE(review): the signature line, the declaration of `result`, the pick
// statement and the head of the complete-pick lambda are missing from this
// listing; presumably this is ClientChannel::DoPingLocked(grpc_transport_op*)
// -- confirm.
1692  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
1693  return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1694  }
1696  {
1697  MutexLock lock(&data_plane_mu_);
1699  }
1700  return HandlePickResult<grpc_error_handle>(
1701  &result,
1702  // Complete pick.
1706  complete_pick->subchannel.get());
1707  RefCountedPtr<ConnectedSubchannel> connected_subchannel =
1708  subchannel->connected_subchannel();
// NOTE(review): connected_subchannel is dereferenced without a null check in
// the visible lines -- confirm connected_subchannel() cannot return null here.
1709  connected_subchannel->Ping(op->send_ping.on_initiate,
1710  op->send_ping.on_ack);
1711  return GRPC_ERROR_NONE;
1712  },
1713  // Queue pick.
1714  [](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
1715  return GRPC_ERROR_CREATE_FROM_STATIC_STRING("LB picker queued call");
1716  },
1717  // Fail pick.
1718  [](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
1719  return absl_status_to_grpc_error(fail_pick->status);
1720  },
1721  // Drop pick.
1722  [](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
1723  return absl_status_to_grpc_error(drop_pick->status);
1724  });
1725 }
1726 
// Executes the parts of transport op `op` that are handled here, in order:
// connectivity watch add/remove, ping, connect-backoff reset, and
// disconnect / enter-IDLE. Finally releases the "start_transport_op" ref on
// the owning channel stack (taken in StartTransportOp()).
// NOTE(review): the signature line, the ping call, the second argument of the
// on_initiate closure run, the trace-flag check, parts of the IDLE /
// disconnect handling and the final statement are missing from this listing.
1728  // Connectivity watch.
1729  if (op->start_connectivity_watch != nullptr) {
1730  state_tracker_.AddWatcher(op->start_connectivity_watch_state,
1731  std::move(op->start_connectivity_watch));
1732  }
1733  if (op->stop_connectivity_watch != nullptr) {
1734  state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
1735  }
1736  // Ping.
1737  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
// On a ping error both ping closures are scheduled; on success nothing is
// scheduled here.
1739  if (!GRPC_ERROR_IS_NONE(error)) {
1740  ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
1742  ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
1743  }
1744  op->bind_pollset = nullptr;
1745  op->send_ping.on_initiate = nullptr;
1746  op->send_ping.on_ack = nullptr;
1747  }
1748  // Reset backoff.
1749  if (op->reset_connect_backoff) {
1750  if (lb_policy_ != nullptr) {
1751  lb_policy_->ResetBackoffLocked();
1752  }
1753  }
1754  // Disconnect or enter IDLE.
1755  if (!GRPC_ERROR_IS_NONE(op->disconnect_with_error)) {
1757  gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
1758  grpc_error_std_string(op->disconnect_with_error).c_str());
1759  }
1761  intptr_t value;
// An integer attribute on the error equal to GRPC_CHANNEL_IDLE selects the
// enter-IDLE path; any other error is a real disconnect.
1762  if (grpc_error_get_int(op->disconnect_with_error,
1764  static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
1765  if (GRPC_ERROR_IS_NONE(disconnect_error_)) {
1766  // Enter IDLE state.
1768  "channel entering IDLE", nullptr);
1769  }
1770  GRPC_ERROR_UNREF(op->disconnect_with_error);
1771  } else {
1772  // Disconnect.
// The assertion requires that no disconnect error has been stored before.
1773  GPR_ASSERT(GRPC_ERROR_IS_NONE(disconnect_error_));
1774  disconnect_error_ = op->disconnect_with_error;
1776  GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
1777  absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
1778  grpc_error_to_absl_status(op->disconnect_with_error)));
1779  }
1780  }
1781  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
1783 }
1784 
// Entry point for a transport op on the channel element `elem`.
// Handles bind_pollset inline, then takes a "start_transport_op" ref on the
// owning channel stack and schedules StartTransportOpLocked(op) on the
// channel's work serializer. Asserts that set_accept_stream is false.
// NOTE(review): the signature's first line is missing from this listing;
// presumably this is ClientChannel::StartTransportOp() -- confirm.
1786  grpc_transport_op* op) {
1787  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1788  GPR_ASSERT(op->set_accept_stream == false);
1789  // Handle bind_pollset.
1790  if (op->bind_pollset != nullptr) {
1791  grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1792  }
1793  // Pop into control plane work_serializer for remaining ops.
// The ref taken here is released at the end of StartTransportOpLocked().
1794  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
1795  chand->work_serializer_->Run(
1796  [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
1797  chand->StartTransportOpLocked(op);
1798  },
1799  DEBUG_LOCATION);
1800 }
1801 
// Fills the requested fields of `info` with gpr_strdup() copies of the
// channel's current LB policy name and service config JSON, read under
// info_mu_. A field is only written when its output pointer is non-null.
// NOTE(review): the copies come from gpr_strdup(), so the caller presumably
// frees them with gpr_free() -- confirm. The signature's first line is missing
// from this listing; presumably this is ClientChannel::GetChannelInfo().
1803  const grpc_channel_info* info) {
1804  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1805  MutexLock lock(&chand->info_mu_);
1806  if (info->lb_policy_name != nullptr) {
1807  *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str());
1808  }
1809  if (info->service_config_json != nullptr) {
1810  *info->service_config_json =
1811  gpr_strdup(chand->info_service_config_json_.c_str());
1812  }
1813 }
1814 
// Pushes `call` onto the front of the singly linked lb_queued_calls_ list.
// NOTE(review): the signature's first line and the statement that adds
// `pollent` to interested_parties are missing from this listing; presumably
// this is ClientChannel::AddLbQueuedCall() -- confirm.
1816  grpc_polling_entity* pollent) {
1817  // Add call to queued picks list.
1818  call->next = lb_queued_calls_;
1819  lb_queued_calls_ = call;
1820  // Add call's pollent to channel's interested_parties, so that I/O
1821  // can be done under the call's CQ.
1823 }
1824 
// Unlinks `to_remove` from the lb_queued_calls_ list, if present.
// NOTE(review): the signature's first line and the statement that removes
// `pollent` from interested_parties are missing from this listing; presumably
// this is ClientChannel::RemoveLbQueuedCall() -- confirm.
1826  grpc_polling_entity* pollent) {
1827  // Remove call's pollent from channel's interested_parties.
1829  // Remove from queued picks list.
// `call` points at the link (head pointer or a node's `next`) being examined,
// so the matching node is unlinked by overwriting that link in place.
1830  for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
1831  call = &(*call)->next) {
1832  if (*call == to_remove) {
1833  *call = to_remove->next;
1834  return;
1835  }
1836  }
1837 }
1838 
// Kicks the channel out of idle: asks the LB policy to exit idle if one
// exists; otherwise, when there is no resolver either, runs the (missing)
// statement in the else-if branch. Always releases the "TryToConnect" ref on
// the owning channel stack.
// NOTE(review): the signature line and the else-if branch's statement are
// missing from this listing; the branch presumably creates the resolver, and
// this is presumably ClientChannel::TryToConnectLocked() -- confirm.
1840  if (lb_policy_ != nullptr) {
1841  lb_policy_->ExitIdleLocked();
1842  } else if (resolver_ == nullptr) {
1844  }
1845  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
1846 }
1847 
// Returns the channel's current connectivity state. If the state is IDLE and
// `try_to_connect` is set, also takes a "TryToConnect" ref on the owning
// channel stack and schedules a connection attempt; the returned value is
// still the state read before that attempt.
// NOTE(review): the signature's first line and the head of the scheduling call
// are missing from this listing; presumably this is
// ClientChannel::CheckConnectivityState() -- confirm.
1849  bool try_to_connect) {
1850  // state_tracker_ is guarded by work_serializer_, which we're not
1851  // holding here. But the one method of state_tracker_ that *is*
1852  // thread-safe to call without external synchronization is the state()
1853  // method, so we can disable thread-safety analysis for this one read.
1854  grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
1855  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1856  GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1859  DEBUG_LOCATION);
1860  }
1861  return out;
1862 }
1863 
// Allocates a ConnectivityWatcherAdder for this channel with the given initial
// state and watcher.
// NOTE(review): the pointer returned by `new` is not stored or deleted in the
// visible lines, so the object presumably manages its own lifetime -- confirm
// against ConnectivityWatcherAdder. The signature's first line and the
// `watcher` parameter line are missing from this listing.
1865  grpc_connectivity_state initial_state,
1867  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
1868 }
1869 
1873 }
1874 
1875 //
1876 // CallData implementation
1877 //
1878 
// Constructs per-call data from the call element args: copies the start time,
// deadline, arena, owning call stack and call combiner. The deadline state is
// given the call's deadline only when the channel has deadline checking
// enabled, and Timestamp::InfFuture() otherwise.
// NOTE(review): the constructor's first line, the `args` parameter line and
// some member initializers are missing from this listing.
1880  const ClientChannel& chand,
1882  : deadline_state_(elem, args,
1883  GPR_LIKELY(chand.deadline_checking_enabled_)
1884  ? args.deadline
1885  : Timestamp::InfFuture()),
1887  call_start_time_(args.start_time),
1888  deadline_(args.deadline),
1889  arena_(args.arena),
1890  owning_call_(args.call_stack),
1891  call_combiner_(args.call_combiner),
1894  gpr_log(GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
1895  }
1896 }
1897 
// Releases the stored cancel error and asserts that every slot of
// pending_batches_ is empty.
// NOTE(review): the signature line (and possibly an earlier statement) is
// missing from this listing; presumably this is the CallData destructor --
// confirm against the original file.
1900  GRPC_ERROR_UNREF(cancel_error_);
1901  // Make sure there are no remaining pending batches.
1902  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1903  GPR_ASSERT(pending_batches_[i] == nullptr);
1904  }
1905 }
1906 
// Placement-constructs a CallData in the storage at elem->call_data and
// returns GRPC_ERROR_NONE.
// NOTE(review): the signature lines are missing from this listing; presumably
// this is ClientChannel::CallData::Init() -- confirm.
1909  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1910  new (elem->call_data) CallData(elem, *chand, *args);
1911  return GRPC_ERROR_NONE;
1912 }
1913 
// Destroys the CallData stored in elem->call_data.
// The dynamic call is moved out first so it survives the explicit destructor
// call. If there is a dynamic call, `then_schedule_closure` is handed to it
// via SetAfterCallStackDestroy(); otherwise the closure is scheduled
// immediately with GRPC_ERROR_NONE.
// NOTE(review): the signature's first line and the declaration of
// `dynamic_call` are missing from this listing.
1915  grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
1916  grpc_closure* then_schedule_closure) {
1917  CallData* calld = static_cast<CallData*>(elem->call_data);
1919  std::move(calld->dynamic_call_);
// The object lives in storage it does not own (it is created with placement
// new on elem->call_data), so it is destroyed with an explicit destructor call.
1920  calld->~CallData();
1921  if (GPR_LIKELY(dynamic_call != nullptr)) {
1922  dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
1923  } else {
1924  // TODO(yashkt) : This can potentially be a Closure::Run
1925  ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
1926  }
1927 }
1928 
// Handles a stream op batch arriving from the layer above. In order:
//   1. optionally intercepts recv_trailing_metadata_ready (see comment below);
//   2. if a dynamic call already exists, forwards the batch to it and returns;
//   3. if the call was previously cancelled, fails the batch;
//   4. if the batch is a cancellation, records the cancel error and fails all
//      pending batches;
//   5. otherwise stores the batch as pending and either starts resolution
//      (CheckResolution) or yields the call combiner.
// NOTE(review): the signature lines and many statements (trace-flag checks,
// the conditions guarding steps 1, 3, 4 and 5, and the calls that fail the
// batch or release the call combiner) are missing from this listing;
// presumably this is ClientChannel::CallData::StartTransportStreamOpBatch() --
// confirm against the original file.
1931  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
1932  CallData* calld = static_cast<CallData*>(elem->call_data);
1933  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1936  gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand,
1938  }
1939  if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
1941  }
1942  // Intercept recv_trailing_metadata to call CallDispatchController::Commit(),
1943  // in case we wind up failing the call before we get down to the retry
1944  // or LB call layer.
1947  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1949  RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
1950  elem, nullptr);
1951  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1953  }
1954  // If we already have a dynamic call, pass the batch down to it.
1955  // Note that once we have done so, we do not need to acquire the channel's
1956  // resolution mutex, which is more efficient (especially for streaming calls).
1957  if (calld->dynamic_call_ != nullptr) {
1959  gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
1960  chand, calld, calld->dynamic_call_.get());
1961  }
1962  calld->dynamic_call_->StartTransportStreamOpBatch(batch);
1963  return;
1964  }
1965  // We do not yet have a dynamic call.
1966  //
1967  // If we've previously been cancelled, immediately fail any new batches.
1970  gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
1971  chand, calld,
1972  grpc_error_std_string(calld->cancel_error_).c_str());
1973  }
1974  // Note: This will release the call combiner.
1977  return;
1978  }
1979  // Handle cancellation.
1981  // Stash a copy of cancel_error in our call data, so that we can use
1982  // it for subsequent operations. This ensures that if the call is
1983  // cancelled before any batches are passed down (e.g., if the deadline
1984  // is in the past when the call starts), we can return the right
1985  // error to the caller when the first batch does get passed down.
1987  calld->cancel_error_ =
1990  gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
1991  calld, grpc_error_std_string(calld->cancel_error_).c_str());
1992  }
1993  // Fail all pending batches.
1995  NoYieldCallCombiner);
1996  // Note: This will release the call combiner.
1999  return;
2000  }
2001  // Add the batch to the pending list.
2002  calld->PendingBatchesAdd(elem, batch);
2003  // For batches containing a send_initial_metadata op, acquire the
2004  // channel's resolution mutex to apply the service config to the call,
2005  // after which we will create a dynamic call.
2008  gpr_log(GPR_INFO,
2009  "chand=%p calld=%p: grabbing resolution mutex to apply service "
2010  "config",
2011  chand, calld);
2012  }
2013  CheckResolution(elem, GRPC_ERROR_NONE);
2014  } else {
2015  // For all other batches, release the call combiner.
2017  gpr_log(GPR_INFO,
2018  "chand=%p calld=%p: saved batch, yielding call combiner", chand,
2019  calld);
2020  }
2022  "batch does not include send_initial_metadata");
2023  }
2024 }
2025 
// Stores `pollent` in the call data of `elem` (calld->pollent_).
// NOTE(review): the signature's first line is missing from this listing;
// presumably this is ClientChannel::CallData::SetPollent() -- confirm.
2027  grpc_polling_entity* pollent) {
2028  CallData* calld = static_cast<CallData*>(elem->call_data);
2029  calld->pollent_ = pollent;
2030 }
2031 
2032 //
2033 // pending_batches management
2034 //
2035 
// Maps `batch` to its slot in pending_batches_: the index of the first op the
// batch contains, tested in the order send_initial_metadata (0), send_message
// (1), send_trailing_metadata (2), recv_initial_metadata (3), recv_message
// (4), recv_trailing_metadata (5). A batch with none of these ops is treated
// as unreachable.
// NOTE(review): the signature lines are missing from this listing.
2038  // Note: It is important the send_initial_metadata be the first entry
2039  // here, since the code in ApplyServiceConfigToCallLocked() and
2040  // CheckResolutionLocked() assumes it will be.
2041  if (batch->send_initial_metadata) return 0;
2042  if (batch->send_message) return 1;
2043  if (batch->send_trailing_metadata) return 2;
2044  if (batch->recv_initial_metadata) return 3;
2045  if (batch->recv_message) return 4;
2046  if (batch->recv_trailing_metadata) return 5;
2047  GPR_UNREACHABLE_CODE(return (size_t)-1);
2048 }
2049 
2050 // This is called via the call combiner, so access to calld is synchronized.
// Stores `batch` in the pending-batch slot selected by GetBatchIndex(). The
// slot is asserted to be empty beforehand.
// NOTE(review): the signature lines, the trace-flag check and the declaration
// of `pending` (presumably a reference to pending_batches_[idx]) are missing
// from this listing.
2053  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2054  const size_t idx = GetBatchIndex(batch);
2056  gpr_log(GPR_INFO,
2057  "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
2058  this, idx);
2059  }
2061  GPR_ASSERT(pending == nullptr);
2062  pending = batch;
2063 }
2064 
2065 // This is called via the call combiner, so access to calld is synchronized.
// Closure callback: `arg` is the grpc_transport_stream_op_batch to fail, and
// the owning CallData is recovered from batch->handler_private.extra_arg.
// NOTE(review): the signature's first line, the declaration of `batch` and the
// statement that actually fails the batch with `error` are missing from this
// listing.
2067  void* arg, grpc_error_handle error) {
2069  static_cast<grpc_transport_stream_op_batch*>(arg);
2070  CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2071  // Note: This will release the call combiner.
2074 }
2075 
2076 // This is called via the call combiner, so access to calld is synchronized.
// Fails every pending batch: for each occupied slot a closure running
// FailPendingBatchInCallCombiner is queued and the slot is cleared. The queued
// closures are then run via RunClosures() when
// `yield_call_combiner_predicate(closures)` returns true, and via the
// (missing) alternative otherwise.
// NOTE(review): the signature's first lines, the trace-flag check, the
// declaration of `batch`, the closure-initialisation / list-add calls, the
// else-branch statement and the final statement are missing from this listing.
2079  YieldCallCombinerPredicate yield_call_combiner_predicate) {
2082  size_t num_batches = 0;
2083  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2084  if (pending_batches_[i] != nullptr) ++num_batches;
2085  }
2086  gpr_log(GPR_INFO,
2087  "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2088  elem->channel_data, this, num_batches,
2090  }
2091  CallCombinerClosureList closures;
2092  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2094  if (batch != nullptr) {
2097  FailPendingBatchInCallCombiner, batch,
2098  grpc_schedule_on_exec_ctx);
2100  "PendingBatchesFail");
2101  batch = nullptr;
2102  }
2103  }
2104  if (yield_call_combiner_predicate(closures)) {
2105  closures.RunClosures(call_combiner_);
2106  } else {
2108  }
2110 }
2111 
2112 // This is called via the call combiner, so access to calld is synchronized.
2114  void* arg, grpc_error_handle /*ignored*/) {
// Closure callback: `arg` is cast to a grpc_transport_stream_op_batch*, which
// is then started on the CallData's dynamic_call_. The error argument is
// ignored.
2116  static_cast<grpc_transport_stream_op_batch*>(arg);
// NOTE(review): the expression that yields `elem` is not visible in this
// listing (number 2118 is absent).
2117  auto* elem =
2119  auto* calld = static_cast<CallData*>(elem->call_data);
2120  // Note: This will release the call combiner.
2121  calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2122 }
2123 
2124 // This is called via the call combiner, so access to calld is synchronized.
// Hands every saved pending batch to ResumePendingBatchInCallCombiner via a
// closure list, clearing each slot, and then runs the list on
// call_combiner_.
2126  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2127  // Retries not enabled; send down batches as-is.
// The loop below only counts non-null slots for the log message.
2129  size_t num_batches = 0;
2130  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2131  if (pending_batches_[i] != nullptr) ++num_batches;
2132  }
2133  gpr_log(GPR_INFO,
2134  "chand=%p calld=%p: starting %" PRIuPTR
2135  " pending batches on dynamic_call=%p",
2136  chand, this, num_batches, dynamic_call_.get());
2137  }
2138  CallCombinerClosureList closures;
2139  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
// NOTE(review): the declaration of `batch` is not visible here (number 2140
// is absent); since it is reset to nullptr below it is presumably a
// reference to pending_batches_[i] -- confirm.
2141  if (batch != nullptr) {
2144  ResumePendingBatchInCallCombiner, batch, nullptr);
2146  "resuming pending batch from client channel call");
2147  batch = nullptr;
2148  }
2149  }
2150  // Note: This will release the call combiner.
2151  closures.RunClosures(call_combiner_);
2152 }
2153 
2154 //
2155 // name resolution
2156 //
2157 
2158 // A class to handle the call combiner cancellation callback for a
2159 // queued pick.
2161  public:
// Construction: takes a ref on the call's owning call stack, points closure_
// at CancelLocked (with `this` as its argument) and registers it with the
// call combiner through SetNotifyOnCancel().
2163  auto* calld = static_cast<CallData*>(elem->call_data);
2164  GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
2165  GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2166  grpc_schedule_on_exec_ctx);
2167  calld->call_combiner_->SetNotifyOnCancel(&closure_);
2168  }
2169 
2170  private:
// Cancellation callback. Under chand->resolution_mu_ it checks whether this
// object is still the call's registered canceller; if so it removes the call
// from the resolver queue and fails its pending batches with `error`.
// Regardless of that outcome it drops the call stack ref and deletes itself.
2171  static void CancelLocked(void* arg, grpc_error_handle error) {
2172  auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2173  auto* chand = static_cast<ClientChannel*>(self->elem_->channel_data);
2174  auto* calld = static_cast<CallData*>(self->elem_->call_data);
2175  {
2176  MutexLock lock(&chand->resolution_mu_);
2178  gpr_log(GPR_INFO,
2179  "chand=%p calld=%p: cancelling resolver queued pick: "
2180  "error=%s self=%p calld->resolver_pick_canceller=%p",
2181  chand, calld, grpc_error_std_string(error).c_str(), self,
2182  calld->resolver_call_canceller_);
2183  }
// NOTE(review): the second operand of this condition is not visible in this
// listing (number 2185 is absent).
2184  if (calld->resolver_call_canceller_ == self &&
2186  // Remove pick from list of queued picks.
2187  calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
2188  // Fail pending batches on the call.
2189  calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
2190  YieldCallCombinerIfPendingBatchesFound);
2191  }
2192  }
// NOTE(review): the reason tag here ("ResolvingQueuedCallCanceller") differs
// from the one used when the ref was taken ("ResolverQueuedCallCanceller").
2193  GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolvingQueuedCallCanceller");
2194  delete self;
2195  }
2196 
2199 };
2200 
// Takes this call off the channel's resolver queue if it is currently
// queued; a no-op when queued_pending_resolver_result_ is false.
2201 void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
2203  if (!queued_pending_resolver_result_) return;
2204  auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2206  gpr_log(GPR_INFO,
2207  "chand=%p calld=%p: removing from resolver queued picks list",
2208  chand, this);
2209  }
2210  chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
2211  queued_pending_resolver_result_ = false;
2212  // Lame the call combiner canceller.
// Only the pointer is cleared here; the canceller object is not deleted by
// this function.
2213  resolver_call_canceller_ = nullptr;
2214 }
2215 
// Puts this call on the channel's resolver queue unless it is already there
// (queued_pending_resolver_result_ is true), and allocates a new
// ResolverQueuedCallCanceller for it.
2216 void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked(
2218  if (queued_pending_resolver_result_) return;
2219  auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2221  gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
2222  chand, this);
2223  }
2224  queued_pending_resolver_result_ = true;
2225  resolver_queued_call_.elem = elem;
2226  chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
2227  // Register call combiner cancellation callback.
2228  resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
2229 }
2230 
// Applies the per-call configuration chosen by the channel's ConfigSelector
// to this call.
//
// elem: call element; elem->channel_data is the owning ClientChannel.
// initial_metadata: passed to ConfigSelector::GetCallConfig() together with
//   path_ and arena_.
//
// Returns call_config.error when GetCallConfig() reports one, otherwise
// GRPC_ERROR_NONE. When chand->config_selector_ is null, nothing is applied
// and GRPC_ERROR_NONE is returned.
2231 grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
2232  grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
2233  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2235  gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2236  chand, this);
2237  }
2238  ConfigSelector* config_selector = chand->config_selector_.get();
2239  if (config_selector != nullptr) {
2240  // Use the ConfigSelector to determine the config for the call.
2241  ConfigSelector::CallConfig call_config =
2242  config_selector->GetCallConfig({&path_, initial_metadata, arena_});
2243  if (!GRPC_ERROR_IS_NONE(call_config.error)) return call_config.error;
2244  // Create a ClientChannelServiceConfigCallData for the call. This stores
2245  // a ref to the ServiceConfig and caches the right set of parsed configs
2246  // to use for the call. The ClientChannelServiceConfigCallData will store
2247  // itself in the call context, so that it can be accessed by filters
2248  // below us in the stack, and it will be cleaned up when the call ends.
2249  auto* service_config_call_data =
2250  arena_->New<ClientChannelServiceConfigCallData>(
2251  std::move(call_config.service_config), call_config.method_configs,
2252  std::move(call_config.call_attributes),
2253  call_config.call_dispatch_controller, call_context_);
2254  // Apply our own method params to the call.
2255  auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
2256  service_config_call_data->GetMethodParsedConfig(
2257  chand->service_config_parser_index_));
2258  if (method_params != nullptr) {
2259  // If the deadline from the service config is shorter than the one
2260  // from the client API, reset the deadline timer.
// A zero timeout in the method config is skipped by the check below.
2261  if (chand->deadline_checking_enabled_ &&
2262  method_params->timeout() != Duration::Zero()) {
2263  const Timestamp per_method_deadline =
2264  Timestamp::FromCycleCounterRoundUp(call_start_time_) +
2265  method_params->timeout();
2266  if (per_method_deadline < deadline_) {
2267  deadline_ = per_method_deadline;
// NOTE(review): the statement that follows the assignment above is not
// visible in this listing (number 2268 is absent).
2269  }
2270  }
2271  // If the service config set wait_for_ready and the application
2272  // did not explicitly set it, use the value from the service config.
// The flags are read from slot 0 of pending_batches_, i.e. the
// send_initial_metadata batch.
2273  uint32_t* send_initial_metadata_flags =
2274  &pending_batches_[0]
2275  ->payload->send_initial_metadata.send_initial_metadata_flags;
2276  if (method_params->wait_for_ready().has_value() &&
2277  !(*send_initial_metadata_flags &
2279  if (method_params->wait_for_ready().value()) {
2280  *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2281  } else {
2282  *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2283  }
2284  }
2285  }
2286  // Set the dynamic filter stack.
2287  dynamic_filters_ = chand->dynamic_filters_;
2288  }
2289  return GRPC_ERROR_NONE;
2290 }
2291 
2294  void* arg, grpc_error_handle error) {
// Closure callback for recv_trailing_metadata_ready; `arg` is the call
// element. If service config call data is present in the call context, its
// call dispatch controller is committed, and then the saved original
// callback is run.
2295  auto* elem = static_cast<grpc_call_element*>(arg);
2296  auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2297  auto* calld = static_cast<CallData*>(elem->call_data);
2298  auto* service_config_call_data =
2299  static_cast<ClientChannelServiceConfigCallData*>(
2300  calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2302  gpr_log(GPR_INFO,
2303  "chand=%p calld=%p: got recv_trailing_metadata_ready: error=%s "
2304  "service_config_call_data=%p",
2305  chand, calld, grpc_error_std_string(error).c_str(),
2306  service_config_call_data);
2307  }
2308  if (service_config_call_data != nullptr) {
2309  service_config_call_data->call_dispatch_controller()->Commit();
2310  }
2311  // Chain to original callback.
// NOTE(review): the final argument of this call is not visible in this
// listing (number 2313 is absent).
2312  Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
2314 }
2315 
// Initializes resolution_done_closure_ to invoke ResolutionDone with `elem`
// as its argument and hands it to ExecCtx::Run() together with `error`,
// instead of calling ResolutionDone directly.
2318  // TODO(roth): Does this callback need to hold a ref to the call stack?
2319  GRPC_CLOSURE_INIT(&resolution_done_closure_, ResolutionDone, elem, nullptr);
2320  ExecCtx::Run(DEBUG_LOCATION, &resolution_done_closure_, error);
2321 }
2322 
// Closure-style callback (`arg` is the call element). On error, all pending
// batches are failed with a new ref to that error and the function returns;
// otherwise the dynamic call is created.
2325  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2326  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2327  CallData* calld = static_cast<CallData*>(elem->call_data);
2328  if (!GRPC_ERROR_IS_NONE(error)) {
2330  gpr_log(GPR_INFO,
2331  "chand=%p calld=%p: error applying config to call: error=%s",
2332  chand, calld, grpc_error_std_string(error).c_str());
2333  }
2334  calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
2335  return;
2336  }
2337  calld->CreateDynamicCall(elem);
2338 }
2339 
// Runs CheckResolutionLocked() while holding chand->resolution_mu_, then,
// after the lock has been released, calls ResolutionDone() if resolution is
// complete.
2342  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2343  CallData* calld = static_cast<CallData*>(elem->call_data);
2344  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2345  bool resolution_complete;
// The inner scope limits how long the mutex is held.
2346  {
2347  MutexLock lock(&chand->resolution_mu_);
2348  resolution_complete = calld->CheckResolutionLocked(elem, &error);
2349  }
2350  if (resolution_complete) {
2351  ResolutionDone(elem, error);
// NOTE(review): a statement following the call above is not visible in this
// listing (number 2352 is absent).
2353  }
2354 }
2355 
// Decides whether name resolution is finished for this call.
//
// Returns true when the caller can proceed: either the service config has
// been applied (with *error carrying the result of
// ApplyServiceConfigToCallLocked) or the call is being failed with the
// resolver's transient-failure error stored in *error. Returns false when
// the call was queued to wait for a resolver result.
2356 bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
2358  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2359  // If we're still in IDLE, we need to start resolving.
2360  if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
2362  gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand, this);
2363  }
2364  // Bounce into the control plane work serializer to start resolving,
2365  // in case we are still in IDLE state. Since we are holding on to the
2366  // resolution mutex here, we offload it on the ExecCtx so that we don't
2367  // deadlock with ourselves.
// The channel stack ref taken here is released inside the work serializer
// callback below, after CheckConnectivityState(true) has been called.
2368  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "CheckResolutionLocked");
2369  ExecCtx::Run(
2372  [](void* arg, grpc_error_handle /*error*/) {
2373  auto* chand = static_cast<ClientChannel*>(arg);
2374  chand->work_serializer_->Run(
2375  [chand]()
2377  chand->CheckConnectivityState(/*try_to_connect=*/true);
2378  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_,
2379  "CheckResolutionLocked");
2380  },
2381  DEBUG_LOCATION);
2382  },
2383  chand, nullptr),
2384  GRPC_ERROR_NONE);
2385  }
2386  // Get send_initial_metadata batch and flags.
2387  auto& send_initial_metadata =
2388  pending_batches_[0]->payload->send_initial_metadata;
2389  grpc_metadata_batch* initial_metadata_batch =
2390  send_initial_metadata.send_initial_metadata;
2391  const uint32_t send_initial_metadata_flags =
2392  send_initial_metadata.send_initial_metadata_flags;
2393  // If we don't yet have a resolver result, we need to queue the call
2394  // until we get one.
2395  if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
2396  // If the resolver returned transient failure before returning the
2397  // first service config, fail any non-wait_for_ready calls.
2398  absl::Status resolver_error = chand->resolver_transient_failure_error_;
// NOTE(review): the flag tested in the second operand is not visible in this
// listing (numbers 2400-2401 are absent).
2399  if (!resolver_error.ok() && (send_initial_metadata_flags &
2402  gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
2403  chand, this);
2404  }
2405  MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
2406  *error = absl_status_to_grpc_error(resolver_error);
2407  return true;
2408  }
2409  // Either the resolver has not yet returned a result, or it has
2410  // returned transient failure but the call is wait_for_ready. In
2411  // either case, queue the call.
2413  gpr_log(GPR_INFO, "chand=%p calld=%p: queuing to wait for resolution",
2414  chand, this);
2415  }
2416  MaybeAddCallToResolverQueuedCallsLocked(elem);
2417  return false;
2418  }
2419  // Apply service config to call if not yet applied.
// The flag is set before applying, so the config is applied at most once per
// call even if this function runs again.
2420  if (GPR_LIKELY(!service_config_applied_)) {
2421  service_config_applied_ = true;
2422  *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
2423  }
2424  MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
2425  return true;
2426 }
2427 
// Creates dynamic_call_ on the dynamic filter stack. If creation reports an
// error the pending batches are failed with it; otherwise they are resumed.
2429  auto* chand = static_cast<ClientChannel*>(elem->channel_data);
// dynamic_filters_ is moved into the args, so the raw channel_stack pointer
// used below is read back from args.channel_stack.
2430  DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
2431  pollent_,
2432  path_,
2433  call_start_time_,
2434  deadline_,
2435  arena_,
2436  call_context_,
2437  call_combiner_};
// NOTE(review): the declaration of `error` is not visible in this listing
// (number 2438 is absent).
2439  DynamicFilters* channel_stack = args.channel_stack.get();
2441  gpr_log(
2442  GPR_INFO,
2443  "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
2444  chand, this, channel_stack);
2445  }
2446  dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
2447  if (!GRPC_ERROR_IS_NONE(error)) {
2449  gpr_log(GPR_INFO,
2450  "chand=%p calld=%p: failed to create dynamic call: error=%s",
2451  chand, this, grpc_error_std_string(error).c_str());
2452  }
2453  PendingBatchesFail(elem, error, YieldCallCombiner);
2454  return;
2455  }
2456  PendingBatchesResume(elem);
2457 }
2458 
2459 //
2460 // ClientChannel::LoadBalancedCall::Metadata
2461 //
2462 
// Wrapper around a metadata batch (batch_). Every member function visible
// below first checks batch_ for null and then returns without touching it.
2465  public:
2467 
// Add: NOTE(review): the signature and the condition guarding the special
// case below are not visible in this listing (numbers 2468 and 2472 are
// absent). In the special case, value.data() is reinterpreted as a
// GrpcLbClientStats pointer and stored with batch_->Set(). Otherwise the
// error callback logs the message plus key and value at GPR_ERROR.
2469  if (batch_ == nullptr) return;
2470  // Gross, egregious hack to support legacy grpclb behavior.
2471  // TODO(ctiller): Use a promise context for this once that plumbing is done.
2473  batch_->Set(
2475  const_cast<GrpcLbClientStats*>(
2476  reinterpret_cast<const GrpcLbClientStats*>(value.data())));
2477  return;
2478  }
2480  [key](absl::string_view error, const Slice& value) {
2481  gpr_log(GPR_ERROR, "%s",
2482  absl::StrCat(error, " key:", key,
2483  " value:", value.as_string_view())
2484  .c_str());
2485  });
2486  }
2487 
// Returns the batch contents as (key, value) string pairs collected by the
// private Encoder; an empty vector when batch_ is null.
2488  std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
2489  override {
2490  if (batch_ == nullptr) return {};
2491  Encoder encoder;
2492  batch_->Encode(&encoder);
2493  return encoder.Take();
2494  }
2495 
// Lookup-style accessor (its first signature line is not visible): returns
// absl::nullopt when batch_ is null, otherwise forwards to
// batch_->GetStringValue(key, buffer).
2497  std::string* buffer) const override {
2498  if (batch_ == nullptr) return absl::nullopt;
2499  return batch_->GetStringValue(key, buffer);
2500  }
2501 
2502  private:
// Collects encoded metadata into a vector of string pairs.
2503  class Encoder {
2504  public:
// Key and value both supplied as slices: copy both as strings.
2505  void Encode(const Slice& key, const Slice& value) {
2506  out_.emplace_back(std::string(key.as_string_view()),
2507  std::string(value.as_string_view()));
2508  }
2509 
// Typed metadata: the key comes from Which::key() and the value is
// serialized with Which::Encode().
2510  template <class Which>
2511  void Encode(Which, const typename Which::ValueType& value) {
2512  auto value_slice = Which::Encode(value);
2513  out_.emplace_back(std::string(Which::key()),
2514  std::string(value_slice.as_string_view()));
2515  }
2516 
// The overloads below have empty bodies, so the corresponding entries
// (GrpcTimeoutMetadata, HttpPathMetadata, HttpMethodMetadata) are left out
// of the output.
2518  const typename GrpcTimeoutMetadata::ValueType&) {}
2519  void Encode(HttpPathMetadata, const Slice&) {}
2521  const typename HttpMethodMetadata::ValueType&) {}
2522 
// Moves the accumulated pairs out of the encoder.
2523  std::vector<std::pair<std::string, std::string>> Take() {
2524  return std::move(out_);
2525  }
2526 
2527  private:
2528  std::vector<std::pair<std::string, std::string>> out_;
2529  };
2530 
2532 };
2533 
2534 //
2535 // ClientChannel::LoadBalancedCall::LbCallState
2536 //
2537 
2540  UniqueTypeName type) {
// Looks `type` up in the call attributes of the ServiceConfigCallData stored
// in the LB call's context. Returns the mapped value, or an empty
// absl::string_view when there is no entry for `type`.
// NOTE(review): service_config_call_data is dereferenced without a null
// check; presumably the context slot is always populated here -- confirm.
2541  auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
2542  lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2543  auto& call_attributes = service_config_call_data->call_attributes();
2544  auto it = call_attributes.find(type);
2545  if (it == call_attributes.end()) return absl::string_view();
2546  return it->second;
2547 }
2548 
2549 //
2550 // ClientChannel::LoadBalancedCall::BackendMetricAccessor
2551 //
2552 
// Gives access to the LB call's backend metric data, parsing it from the
// received trailing metadata on first use.
2555  public:
2557  : lb_call_(lb_call) {}
2558 
// Accessor body (its signature line is not visible). Parsing is attempted
// only while lb_call_->backend_metric_data_ is still null and trailing
// metadata is available; the parsed result is cached on the LB call and
// returned. The return value can still be null.
2560  if (lb_call_->backend_metric_data_ == nullptr &&
2561  lb_call_->recv_trailing_metadata_ != nullptr) {
// NOTE(review): the metadata key passed to get_pointer() is not visible in
// this listing (number 2563 is absent).
2562  if (const auto* md = lb_call_->recv_trailing_metadata_->get_pointer(
2564  BackendMetricAllocator allocator(lb_call_->arena_);
2565  lb_call_->backend_metric_data_ =
2566  ParseBackendMetricData(md->as_string_view(), &allocator);
2567  }
2568  }
2569  return lb_call_->backend_metric_data_;
2570  }
2571 
2572  private:
// Allocator handed to ParseBackendMetricData(); both allocation paths below
// draw from arena_.
2574  public:
2576 
2578  return arena_->New<BackendMetricData>();
2579  }
2580 
2581  char* AllocateString(size_t size) override {
2582  return static_cast<char*>(arena_->Alloc(size));
2583  }
2584 
2585  private:
2587  };
2588 
2590 };
2591 
2592 //
2593 // ClientChannel::LoadBalancedCall
2594 //
2595 
2596 namespace {
2597 
// Starts a new attempt on the CallTracer stored in the call context.
//
// context: call context array; the GRPC_CONTEXT_CALL_TRACER slot is read.
// is_transparent_retry: forwarded unchanged to StartNewAttempt().
//
// Returns nullptr when no CallTracer is present in the context, otherwise
// the result of call_tracer->StartNewAttempt(is_transparent_retry).
2598 CallTracer::CallAttemptTracer* GetCallAttemptTracer(
2599  grpc_call_context_element* context, bool is_transparent_retry) {
2600  auto* call_tracer =
2601  static_cast<CallTracer*>(context[GRPC_CONTEXT_CALL_TRACER].value);
2602  if (call_tracer == nullptr) return nullptr;
2603  return call_tracer->StartNewAttempt(is_transparent_retry);
2604 }
2605 
2606 } // namespace
2607 
// Constructor parameters and member initializers. Several initializers are
// not visible in this listing (numbers 2613-2614, 2618 and 2623 are absent).
// The visible ones copy deadline, arena, call stack and call combiner out of
// `args`, and obtain call_attempt_tracer_ from GetCallAttemptTracer(), which
// returns nullptr when the call context holds no CallTracer.
2609  ClientChannel* chand, const grpc_call_element_args& args,
2610  grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
2611  ConfigSelector::CallDispatchController* call_dispatch_controller,
2612  bool is_transparent_retry)
2615  ? "LoadBalancedCall"
2616  : nullptr),
2617  chand_(chand),
2619  deadline_(args.deadline),
2620  arena_(args.arena),
2621  owning_call_(args.call_stack),
2622  call_combiner_(args.call_combiner),
2624  pollent_(pollent),
2625  on_call_destruction_complete_(on_call_destruction_complete),
2626  call_dispatch_controller_(call_dispatch_controller),
2627  call_attempt_tracer_(
2628  GetCallAttemptTracer(args.context, is_transparent_retry)) {
2630  gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this);
2631  }
2632 }
2633 
// Destructor body: releases the stored errors, destroys the backend metric
// data if present, asserts that no pending batches remain, and schedules
// on_call_destruction_complete_ if it is still set.
2635  GRPC_ERROR_UNREF(cancel_error_);
2636  GRPC_ERROR_UNREF(failure_error_);
// Only the destructor is invoked; the object's memory is not freed here
// (presumably because it was allocated from the call arena -- confirm).
2637  if (backend_metric_data_ != nullptr) {
2638  backend_metric_data_->BackendMetricData::~BackendMetricData();
2639  }
2640  // Make sure there are no remaining pending batches.
2641  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2642  GPR_ASSERT(pending_batches_[i] == nullptr);
2643  }
2644  if (on_call_destruction_complete_ != nullptr) {
2645  ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
2646  GRPC_ERROR_NONE);
2647  }
2648 }
2649 
// Reports call completion if it has not been reported through trailing
// metadata, records the attempt latency on the tracer (when there is one),
// and drops a ref via Unref().
2651  // If the recv_trailing_metadata op was never started, then notify
2652  // about call completion here, as best we can. We assume status
2653  // CANCELLED in this case.
2654  if (recv_trailing_metadata_ == nullptr) {
2655  RecordCallCompletion(absl::CancelledError("call cancelled"));
2656  }
2657  // Compute latency and report it to the tracer.
2658  if (call_attempt_tracer_ != nullptr) {
2659  gpr_timespec latency =
2660  gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
2661  call_attempt_tracer_->RecordEnd(latency);
2662  }
2663  Unref();
2664 }
2665 
// Maps a batch to a slot index: the op flags are tested in the fixed order
// below and the index of the first one that is set is returned
// (send_initial_metadata -> 0 ... recv_trailing_metadata -> 5).
2668  // Note: It is important the send_initial_metadata be the first entry
2669  // here, since the code in PickSubchannelLocked() assumes it will be.
2670  if (batch->send_initial_metadata) return 0;
2671  if (batch->send_message) return 1;
2672  if (batch->send_trailing_metadata) return 2;
2673  if (batch->recv_initial_metadata) return 3;
2674  if (batch->recv_message) return 4;
2675  if (batch->recv_trailing_metadata) return 5;
// Reached only if none of the six flags is set; marked unreachable.
2676  GPR_UNREACHABLE_CODE(return (size_t)-1);
2677 }
2678 
2679 // This is called via the call combiner, so access to calld is synchronized.
// Saves `batch` as a pending batch; the slot index comes from
// GetBatchIndex(batch) and the slot must currently be empty.
2682  const size_t idx = GetBatchIndex(batch);
2684  gpr_log(GPR_INFO,
2685  "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
2686  chand_, this, idx);
2687  }
2688  GPR_ASSERT(pending_batches_[idx] == nullptr);
// NOTE(review): the statement that stores the batch is not visible in this
// listing (number 2689 is absent).
2690 }
2691 
2692 // This is called via the call combiner, so access to calld is synchronized.
2694  void* arg, grpc_error_handle error) {
// Closure callback: `arg` is cast to a grpc_transport_stream_op_batch*, and
// the LoadBalancedCall is recovered from batch->handler_private.extra_arg.
2696  static_cast<grpc_transport_stream_op_batch*>(arg);
2697  auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
2698  // Note: This will release the call combiner.
// NOTE(review): the name of the function called here is not visible in this
// listing (number 2699 is absent); it receives the batch, a new ref to
// `error`, and the call combiner.
2700  batch, GRPC_ERROR_REF(error), self->call_combiner_);
2701 }
2702 
2703 // This is called via the call combiner, so access to calld is synchronized.
2706  YieldCallCombinerPredicate yield_call_combiner_predicate) {
// Fails every saved pending batch. `error` is first stored in failure_error_
// (replacing and unreffing any previous value), then each non-null entry of
// pending_batches_ gets a closure targeting FailPendingBatchInCallCombiner
// and its slot is cleared.
2708  GRPC_ERROR_UNREF(failure_error_);
2709  failure_error_ = error;
// The loop below only counts non-null slots for the log message.
2711  size_t num_batches = 0;
2712  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2713  if (pending_batches_[i] != nullptr) ++num_batches;
2714  }
2715  gpr_log(GPR_INFO,
2716  "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
2717  chand_, this, num_batches, grpc_error_std_string(error).c_str());
2718  }
2719  CallCombinerClosureList closures;
2720  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
// NOTE(review): the declaration of `batch` is not visible here (number 2721
// is absent); since it is reset to nullptr below it is presumably a
// reference to pending_batches_[i] -- confirm.
2722  if (batch != nullptr) {
2725  FailPendingBatchInCallCombiner, batch,
2726  grpc_schedule_on_exec_ctx);
2728  "PendingBatchesFail");
2729  batch = nullptr;
2730  }
2731  }
// When the predicate returns true the closures are run through
// RunClosures(call_combiner_); the alternative branch is not visible in this
// listing (number 2735 is absent).
2732  if (yield_call_combiner_predicate(closures)) {
2733  closures.RunClosures(call_combiner_);
2734  } else {
2736  }
2737 }
2738 
2739 // This is called via the call combiner, so access to calld is synchronized.
2741  void* arg, grpc_error_handle /*ignored*/) {
// Closure callback: `arg` is cast to a grpc_transport_stream_op_batch*, which
// is then started on a SubchannelCall. The error argument is ignored.
2743  static_cast<grpc_transport_stream_op_batch*>(arg);
// NOTE(review): the expression that yields subchannel_call is not visible in
// this listing (number 2745 is absent).
2744  SubchannelCall* subchannel_call =
2746  // Note: This will release the call combiner.
2747  subchannel_call->StartTransportStreamOpBatch(batch);
2748 }
2749 
2750 // This is called via the call combiner, so access to calld is synchronized.
// Hands every saved pending batch to ResumePendingBatchInCallCombiner via a
// closure list, clearing each slot, and then runs the list on
// call_combiner_.
// The loop below only counts non-null slots for the log message.
2753  size_t num_batches = 0;
2754  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2755  if (pending_batches_[i] != nullptr) ++num_batches;
2756  }
2757  gpr_log(GPR_INFO,
2758  "chand=%p lb_call=%p: starting %" PRIuPTR
2759  " pending batches on subchannel_call=%p",
2760  chand_, this, num_batches, subchannel_call_.get());
2761  }
2762  CallCombinerClosureList closures;
2763  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
// NOTE(review): the declaration of `batch` is not visible here (number 2764
// is absent); since it is reset to nullptr below it is presumably a
// reference to pending_batches_[i] -- confirm.
2765  if (batch != nullptr) {
// The subchannel call pointer is stashed in the batch's extra_arg before the
// resume closure is set up.
2766  batch->handler_private.extra_arg = subchannel_call_.get();
2768  ResumePendingBatchInCallCombiner, batch,
2769  grpc_schedule_on_exec_ctx);
2771  "resuming pending batch from LB call");
2772  batch = nullptr;
2773  }
2774  }
2775  // Note: This will release the call combiner.
2776  closures.RunClosures(call_combiner_);
2777 }
2778 
// Handles a batch arriving at the LB call. In order, the visible code:
//  1. reports send ops to call_attempt_tracer_ (when non-null) and swaps in
//     this object's callbacks for on_complete / recv_*_ready, saving the
//     originals;
//  2. always intercepts recv_trailing_metadata_ready;
//  3. forwards the batch to subchannel_call_ if one already exists;
//  4. otherwise fails the batch if cancel_error_ is already set, handles a
//     cancellation batch, or saves the batch as pending.
// NOTE(review): many condition lines of this function are not visible in
// this listing (for example numbers 2796, 2810, 2815, 2835, 2874, 2897 are
// absent), so several closing braces below have no visible opening line.
2783  gpr_log(GPR_INFO,
2784  "chand=%p lb_call=%p: batch started from above: %s, "
2785  "call_attempt_tracer_=%p",
2787  call_attempt_tracer_);
2788  }
2789  // Handle call tracing.
2790  if (call_attempt_tracer_ != nullptr) {
2791  // Record send ops in tracer.
2792  if (batch->cancel_stream) {
2793  call_attempt_tracer_->RecordCancel(
2795  }
2797  call_attempt_tracer_->RecordSendInitialMetadata(
2798  batch->payload->send_initial_metadata.send_initial_metadata,
2799  batch->payload->send_initial_metadata.send_initial_metadata_flags);
// Save the caller's on_complete and substitute SendInitialMetadataOnComplete,
// which chains back to the saved closure.
2801  original_send_initial_metadata_on_complete_ = batch->on_complete;
2802  GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
2803  SendInitialMetadataOnComplete, this, nullptr);
2804  batch->on_complete = &send_initial_metadata_on_complete_;
2805  }
2806  if (batch->send_message) {
2807  call_attempt_tracer_->RecordSendMessage(
2808  *batch->payload->send_message.send_message);
2809  }
2811  call_attempt_tracer_->RecordSendTrailingMetadata(
2812  batch->payload->send_trailing_metadata.send_trailing_metadata);
2813  }
2814  // Intercept recv ops.
2817  batch->payload->recv_initial_metadata.recv_initial_metadata;
2819  batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
2820  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
2821  this, nullptr);
2822  batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2824  }
2825  if (batch->recv_message) {
2826  recv_message_ = batch->payload->recv_message.recv_message;
2828  batch->payload->recv_message.recv_message_ready;
2829  GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr);
2830  batch->payload->recv_message.recv_message_ready = &recv_message_ready_;
2831  }
2832  }
2833  // Intercept recv_trailing_metadata even if there is no call tracer,
2834  // since we may need to notify the LB policy about trailing metadata.
2837  batch->payload->recv_trailing_metadata.recv_trailing_metadata;
2838  transport_stream_stats_ =
2839  batch->payload->recv_trailing_metadata.collect_stats;
2841  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2842  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
2843  this, nullptr);
2844  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2846  }
2847  // If we've already gotten a subchannel call, pass the batch down to it.
2848  // Note that once we have picked a subchannel, we do not need to acquire
2849  // the channel's data plane mutex, which is more efficient (especially for
2850  // streaming calls).
2851  if (subchannel_call_ != nullptr) {
2853  gpr_log(GPR_INFO,
2854  "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
2855  chand_, this, subchannel_call_.get());
2856  }
2857  subchannel_call_->StartTransportStreamOpBatch(batch);
2858  return;
2859  }
2860  // We do not yet have a subchannel call.
2861  //
2862  // If we've previously been cancelled, immediately fail any new batches.
2863  if (GPR_UNLIKELY(!GRPC_ERROR_IS_NONE(cancel_error_))) {
2865  gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
2866  chand_, this, grpc_error_std_string(cancel_error_).c_str());
2867  }
2868  // Note: This will release the call combiner.
2870  batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
2871  return;
2872  }
2873  // Handle cancellation.
2875  // Stash a copy of cancel_error in our call data, so that we can use
2876  // it for subsequent operations. This ensures that if the call is
2877  // cancelled before any batches are passed down (e.g., if the deadline
2878  // is in the past when the call starts), we can return the right
2879  // error to the caller when the first batch does get passed down.
2880  GRPC_ERROR_UNREF(cancel_error_);
2883  gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
2884  chand_, this, grpc_error_std_string(cancel_error_).c_str());
2885  }
2886  // Fail all pending batches.
2887  PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
2888  // Note: This will release the call combiner.
2890  batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
2891  return;
2892  }
2893  // Add the batch to the pending list.
2894  PendingBatchesAdd(batch);
2895  // For batches containing a send_initial_metadata op, acquire the
2896  // channel's data plane mutex to pick a subchannel.
2899  gpr_log(GPR_INFO,
2900  "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
2901  chand_, this);
2902  }
2903  PickSubchannel(this, GRPC_ERROR_NONE);
2904  } else {
2905  // For all other batches, release the call combiner.
2907  gpr_log(GPR_INFO,
2908  "chand=%p lb_call=%p: saved batch, yielding call combiner",
2909  chand_, this);
2910  }
2912  "batch does not include send_initial_metadata");
2913  }
2914 }
2915 
2917  void* arg, grpc_error_handle error) {
// on_complete interceptor for send_initial_metadata; `arg` is the
// LoadBalancedCall. Reports peer_string_ to the call attempt tracer and then
// chains to the saved original on_complete closure.
2918  auto* self = static_cast<LoadBalancedCall*>(arg);
2920  gpr_log(GPR_INFO,
2921  "chand=%p lb_call=%p: got on_complete for send_initial_metadata: "
2922  "error=%s",
2923  self->chand_, self, grpc_error_std_string(error).c_str());
2924  }
// call_attempt_tracer_ is used without a null check; in the visible code this
// closure is installed only inside a call_attempt_tracer_ != nullptr branch.
2925  self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata(
2926  self->peer_string_);
2928  self->original_send_initial_metadata_on_complete_,
2930 }
2931 
2933  void* arg, grpc_error_handle error) {
// recv_initial_metadata_ready interceptor; `arg` is the LoadBalancedCall.
// On success the received metadata is reported to the call attempt tracer;
// in all cases the saved original callback is then run.
2934  auto* self = static_cast<LoadBalancedCall*>(arg);
2936  gpr_log(GPR_INFO,
2937  "chand=%p lb_call=%p: got recv_initial_metadata_ready: error=%s",
2938  self->chand_, self, grpc_error_std_string(error).c_str());
2939  }
2940  if (GRPC_ERROR_IS_NONE(error)) {
2941  // recv_initial_metadata_flags is not populated for clients
2942  self->call_attempt_tracer_->RecordReceivedInitialMetadata(
2943  self->recv_initial_metadata_, 0 /* recv_initial_metadata_flags */);
2944  }
// NOTE(review): the final argument of this call is not visible in this
// listing (number 2946 is absent).
2945  Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
2947 }
2948 
2950  void* arg, grpc_error_handle error) {
// recv_message_ready interceptor; `arg` is the LoadBalancedCall. If a
// message is present it is reported to the call attempt tracer; the saved
// original callback is then run.
2951  auto* self = static_cast<LoadBalancedCall*>(arg);
2953  gpr_log(GPR_INFO, "chand=%p lb_call=%p: got recv_message_ready: error=%s",
2954  self->chand_, self, grpc_error_std_string(error).c_str());
2955  }
// Only has_value() gates the tracer call; `error` is not consulted here.
2956  if (self->recv_message_->has_value()) {
2957  self->call_attempt_tracer_->RecordReceivedMessage(**self->recv_message_);
2958  }
// NOTE(review): the final argument of this call is not visible in this
// listing (number 2960 is absent).
2959  Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_,
2961 }
2962 
2964  void* arg, grpc_error_handle error) {
// recv_trailing_metadata_ready interceptor; `arg` is the LoadBalancedCall.
// When a tracer or an LB subchannel call tracker is present, the call's
// final status is derived (from `error`, or else from the trailing
// metadata) and passed to RecordCallCompletion(). The saved original
// callback is then run.
2965  auto* self = static_cast<LoadBalancedCall*>(arg);
2967  gpr_log(GPR_INFO,
2968  "chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
2969  "call_attempt_tracer_=%p lb_subchannel_call_tracker_=%p "
2970  "failure_error_=%s",
2971  self->chand_, self, grpc_error_std_string(error).c_str(),
2972  self->call_attempt_tracer_, self->lb_subchannel_call_tracker_.get(),
2973  grpc_error_std_string(self->failure_error_).c_str());
2974  }
2975  // Check if we have a tracer or an LB callback to invoke.
2976  if (self->call_attempt_tracer_ != nullptr ||
2977  self->lb_subchannel_call_tracker_ != nullptr) {
2978  // Get the call's status.
// NOTE(review): the declarations of `status`, `code` and `message` are not
// visible in this listing (numbers 2979, 2982-2983, 2990, 2993 are absent).
2980  if (!GRPC_ERROR_IS_NONE(error)) {
2981  // Get status from error.
2984  grpc_error_get_status(error, self->deadline_, &code, &message,
2985  /*http_error=*/nullptr, /*error_string=*/nullptr);
2986  status = absl::Status(static_cast<absl::StatusCode>(code), message);
2987  } else {
2988  // Get status from headers.
2989  const auto& md = *self->recv_trailing_metadata_;
// A missing status entry in the metadata is treated as GRPC_STATUS_UNKNOWN.
2991  md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
2992  if (code != GRPC_STATUS_OK) {
2994  if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
2995  message = grpc_message->as_string_view();
2996  }
2997  status = absl::Status(static_cast<absl::StatusCode>(code), message);
2998  }
2999  }
3000  self->RecordCallCompletion(status);
3001  }
3002  // Chain to original callback.
// A stored failure_error_ replaces the incoming error: it is handed to the
// callback and the member is reset to GRPC_ERROR_NONE. The other branch is
// not visible in this listing (number 3007 is absent).
3003  if (!GRPC_ERROR_IS_NONE(self->failure_error_)) {
3004  error = self->failure_error_;
3005  self->failure_error_ = GRPC_ERROR_NONE;
3006  } else {
3008  }
3009  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
3010  error);
3011 }
3012 
3014  absl::Status status) {
// Reports the final `status` to the call attempt tracer and to the LB
// subchannel call tracker, each only if present.
3015  // If we have a tracer, notify it.
3016  if (call_attempt_tracer_ != nullptr) {
3017  call_attempt_tracer_->RecordReceivedTrailingMetadata(
3018  status, recv_trailing_metadata_, transport_stream_stats_);
3019  }
3020  // If the LB policy requested a callback for trailing metadata, invoke
3021  // the callback.
3022  if (lb_subchannel_call_tracker_ != nullptr) {
// NOTE(review): the declarations of `trailing_metadata` and `args` are not
// visible in this listing (numbers 3023 and 3025 are absent).
3024  BackendMetricAccessor backend_metric_accessor(this);
3026  status, &trailing_metadata, &backend_metric_accessor};
3027  lb_subchannel_call_tracker_->Finish(args);
// The tracker is reset after Finish(), so it is invoked at most once.
3028  lb_subchannel_call_tracker_.reset();
3029  }
3030 }
3031 
// Creates subchannel_call_ on the picked connected subchannel, then either
// fails or resumes the pending batches.
// connected_subchannel_ is moved into the call args here.
3033  SubchannelCall::Args call_args = {
3034  std::move(connected_subchannel_), pollent_, path_.Ref(), /*start_time=*/0,
3035  deadline_, arena_,
3036  // TODO(roth): When we implement hedging support, we will probably
3037  // need to use a separate call context for each subchannel call.
// NOTE(review): the remaining initializers and the declaration of `error`
// are not visible in this listing (numbers 3038-3039 are absent).
3040  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
3042  gpr_log(GPR_INFO,
3043  "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
3044  this, subchannel_call_.get(), grpc_error_std_string(error).c_str());
3045  }
// Hands the destruction-complete closure to the subchannel call and clears
// the member so the destructor does not schedule it as well.
3046  if (on_call_destruction_complete_ != nullptr) {
3047  subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
3048  on_call_destruction_complete_ = nullptr;
3049  }
// NOTE(review): the condition selecting between the two branches below is
// not visible in this listing (number 3050 is absent).
3051  PendingBatchesFail(error, YieldCallCombiner);
3052  } else {
3053  PendingBatchesResume();
3054  }
3055 }
3056 
3057 // A class to handle the call combiner cancellation callback for a
3058 // queued pick.
3059 // TODO(roth): When we implement hedging support, we won't be able to
3060 // register a call combiner cancellation closure for each LB pick,
3061 // because there may be multiple LB picks happening in parallel.
3062 // Instead, we will probably need to maintain a list in the CallData
3063 // object of pending LB picks to be cancelled when the closure runs.
3065  public:
3067  : lb_call_(std::move(lb_call)) {
3068  GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
3069  GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
3070  lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
3071  }
3072 
3073  private:
3074  static void CancelLocked(void* arg, grpc_error_handle error) {
3075  auto* self = static_cast<LbQueuedCallCanceller*>(arg);
3076  auto* lb_call = self->lb_call_.get();
3077  auto* chand = lb_call->chand_;
3078  {
3079  MutexLock lock(&chand->data_plane_mu_);
3081  gpr_log(GPR_INFO,
3082  "chand=%p lb_call=%p: cancelling queued pick: "
3083  "error=%s self=%p calld->pick_canceller=%p",
3084  chand, lb_call, grpc_error_std_string(error).c_str(), self,
3085  lb_call->lb_call_canceller_);
3086  }
3087  if (lb_call->lb_call_canceller_ == self && !GRPC_ERROR_IS_NONE(error)) {
3088  lb_call->call_dispatch_controller_->Commit();
3089  // Remove pick from list of queued picks.
3090  lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
3091  // Fail pending batches on the call.
3092  lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
3093  YieldCallCombinerIfPendingBatchesFound);
3094  }
3095  }
3096  GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
3097  delete self;
3098  }
3099 
3102 };
3103 
3104 void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
3105  if (!queued_pending_lb_pick_) return;
3107  gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
3108  chand_, this);
3109  }
3110  chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
3111  queued_pending_lb_pick_ = false;
3112  // Lame the call combiner canceller.
3113  lb_call_canceller_ = nullptr;
3114 }
3115 
3116 void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
3117  if (queued_pending_lb_pick_) return;
3119  gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
3120  chand_, this);
3121  }
3122  queued_pending_lb_pick_ = true;
3123  queued_call_.lb_call = this;
3124  chand_->AddLbQueuedCall(&queued_call_, pollent_);
3125  // Register call combiner cancellation callback.
3126  lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
3127 }
3128 
3130  // TODO(roth): Does this callback need to hold a ref to LoadBalancedCall?
3131  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
3132  ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
3133 }
3134 
3137  auto* self = static_cast<LoadBalancedCall*>(arg);
3138  if (!GRPC_ERROR_IS_NONE(error)) {
3140  gpr_log(GPR_INFO,
3141  "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
3142  self->chand_, self, grpc_error_std_string(error).c_str());
3143  }
3144  self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
3145  return;
3146  }
3147  self->call_dispatch_controller_->Commit();
3148  self->CreateSubchannelCall();
3149 }
3150 
3153  auto* self = static_cast<LoadBalancedCall*>(arg);
3154  bool pick_complete;
3155  {
3156  MutexLock lock(&self->chand_->data_plane_mu_);
3157  pick_complete = self->PickSubchannelLocked(&error);
3158  }
3159  if (pick_complete) {
3160  PickDone(self, error);
3162  }
3163 }
3164 
3165 bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
3167  GPR_ASSERT(connected_subchannel_ == nullptr);
3168  GPR_ASSERT(subchannel_call_ == nullptr);
3169  // Grab initial metadata.
3170  auto& send_initial_metadata =
3171  pending_batches_[0]->payload->send_initial_metadata;
3172  grpc_metadata_batch* initial_metadata_batch =
3173  send_initial_metadata.send_initial_metadata;
3174  const uint32_t send_initial_metadata_flags =
3175  send_initial_metadata.send_initial_metadata_flags;
3176  // Perform LB pick.
3178  pick_args.path = path_.as_string_view();
3179  LbCallState lb_call_state(this);
3180  pick_args.call_state = &lb_call_state;
3181  Metadata initial_metadata(initial_metadata_batch);
3182  pick_args.initial_metadata = &initial_metadata;
3183  auto result = chand_->picker_->Pick(pick_args);
3184  return HandlePickResult<bool>(
3185  &result,
3186  // CompletePick
3187  [this](LoadBalancingPolicy::PickResult::Complete* complete_pick)
3190  gpr_log(GPR_INFO,
3191  "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
3192  chand_, this, complete_pick->subchannel.get());
3193  }
3194  GPR_ASSERT(complete_pick->subchannel != nullptr);
3195  // Grab a ref to the connected subchannel while we're still
3196  // holding the data plane mutex.
3197  SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
3198  complete_pick->subchannel.get());
3199  connected_subchannel_ = subchannel->connected_subchannel();
3200  // If the subchannel has no connected subchannel (e.g., if the
3201  // subchannel has moved out of state READY but the LB policy hasn't
3202  // yet seen that change and given us a new picker), then just
3203  // queue the pick. We'll try again as soon as we get a new picker.
3204  if (connected_subchannel_ == nullptr) {
3206  gpr_log(GPR_INFO,
3207  "chand=%p lb_call=%p: subchannel returned by LB picker "
3208  "has no connected subchannel; queueing pick",
3209  chand_, this);
3210  }
3211  MaybeAddCallToLbQueuedCallsLocked();
3212  return false;
3213  }
3214  lb_subchannel_call_tracker_ =
3215  std::move(complete_pick->subchannel_call_tracker);
3216  if (lb_subchannel_call_tracker_ != nullptr) {
3217  lb_subchannel_call_tracker_->Start();
3218  }
3219  MaybeRemoveCallFromLbQueuedCallsLocked();
3220  return true;
3221  },
3222  // QueuePick
3223  [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/)
3226  gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
3227  this);
3228  }
3229  MaybeAddCallToLbQueuedCallsLocked();
3230  return false;
3231  },
3232  // FailPick
3233  [this, send_initial_metadata_flags,
3237  gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s",
3238  chand_, this, fail_pick->status.ToString().c_str());
3239  }
3240  // If wait_for_ready is false, then the error indicates the RPC
3241  // attempt's final status.
3242  if ((send_initial_metadata_flags &
3244  grpc_error_handle lb_error =
3245  absl_status_to_grpc_error(fail_pick->status);
3247  "Failed to pick subchannel", &lb_error, 1);
3248  GRPC_ERROR_UNREF(lb_error);
3249  MaybeRemoveCallFromLbQueuedCallsLocked();
3250  return true;
3251  }
3252  // If wait_for_ready is true, then queue to retry when we get a new
3253  // picker.
3254  MaybeAddCallToLbQueuedCallsLocked();
3255  return false;
3256  },
3257  // DropPick
3258  [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick)
3261  gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s",
3262  chand_, this, drop_pick->status.ToString().c_str());
3263  }
3264  *error =
3265  grpc_error_set_int(absl_status_to_grpc_error(drop_pick->status),
3267  MaybeRemoveCallFromLbQueuedCallsLocked();
3268  return true;
3269  });
3270 }
3271 
3272 } // namespace grpc_core
grpc_core::ClientChannel::ClientChannelControlHelper::ClientChannelControlHelper
ClientChannelControlHelper(ClientChannel *chand)
Definition: client_channel.cc:872
grpc_core::ClientChannel::ClientChannelControlHelper::AddTraceEvent
void AddTraceEvent(TraceSeverity severity, absl::string_view message) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:984
grpc_core::ClientChannel::LoadBalancedCall::chand_
ClientChannel * chand_
Definition: client_channel.h:481
grpc_core::ClientChannel::GetFromChannel
static ClientChannel * GetFromChannel(Channel *channel)
Definition: client_channel.cc:1009
grpc_core::Json::Array
std::vector< Json > Array
Definition: src/core/lib/json/json.h:55
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_arg
Definition: grpc_types.h:103
grpc_core::CallCombinerClosureList
Definition: call_combiner.h:144
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::WatcherWrapper
WatcherWrapper(std::unique_ptr< SubchannelInterface::ConnectivityStateWatcherInterface > watcher, RefCountedPtr< SubchannelWrapper > parent)
Definition: client_channel.cc:573
trace.h
grpc_core::Resolver::Result::args
const grpc_channel_args * args
Definition: resolver/resolver.h:70
grpc_core::Channel
Definition: src/core/lib/surface/channel.h:108
grpc_core::LoadBalancingPolicy::PickArgs::initial_metadata
MetadataInterface * initial_metadata
Definition: lb_policy.h:148
grpc_channel_args_find_string
char * grpc_channel_args_find_string(const grpc_channel_args *args, const char *name)
Definition: channel_args.cc:441
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
MAX_PENDING_BATCHES
#define MAX_PENDING_BATCHES
Definition: client_channel.h:105
grpc_core::ClientChannel::default_authority_
std::string default_authority_
Definition: client_channel.h:302
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::ClientChannel::ExternalConnectivityWatcher::Cancel
void Cancel()
Definition: client_channel.cc:776
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::BackendMetricAccessor
BackendMetricAccessor(LoadBalancedCall *lb_call)
Definition: client_channel.cc:2556
grpc_core::LoadBalancingPolicy::BackendMetricAccessor
Definition: lb_policy.h:157
grpc_core::ClientChannel::~ClientChannel
~ClientChannel()
Definition: client_channel.cc:1137
grpc_core::ClientChannel::RemoveResolverQueuedCall
void RemoveResolverQueuedCall(ResolverQueuedCall *to_remove, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_)
Definition: client_channel.cc:1446
grpc_core::LoadBalancingPolicy::ChannelControlHelper::TraceSeverity
TraceSeverity
Adds a trace message associated with the channel.
Definition: lb_policy.h:297
grpc_core::ClientChannel::LoadBalancedCall::LoadBalancedCall
LoadBalancedCall(ClientChannel *chand, const grpc_call_element_args &args, grpc_polling_entity *pollent, grpc_closure *on_call_destruction_complete, ConfigSelector::CallDispatchController *call_dispatch_controller, bool is_transparent_retry)
Definition: client_channel.cc:2608
grpc_core::LoadBalancingPolicy::UpdateArgs::args
const grpc_channel_args * args
Definition: lb_policy.h:330
grpc_core::LoadBalancingPolicy::UpdateArgs::resolution_note
std::string resolution_note
Definition: lb_policy.h:326
grpc_core::CallCombiner
Definition: call_combiner.h:50
grpc_core::GrpcTimeoutMetadata
Definition: metadata_batch.h:59
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
grpc_core::LoadBalancingPolicy::PickResult::Complete::subchannel
RefCountedPtr< SubchannelInterface > subchannel
The subchannel to be used for the call. Must be non-null.
Definition: lb_policy.h:192
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
grpc_core::ClientChannel::ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap
static void RemoveWatcherFromExternalWatchersMap(ClientChannel *chand, grpc_closure *on_complete, bool cancel)
Definition: client_channel.cc:734
grpc_core::ClientChannel::uri_to_resolve_
std::string uri_to_resolve_
Definition: client_channel.h:301
grpc_core::ClientChannel::ClientChannelControlHelper::UpdateState
void UpdateState(grpc_connectivity_state state, const absl::Status &status, std::unique_ptr< LoadBalancingPolicy::SubchannelPicker > picker) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:951
grpc_core::Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange::status
absl::Status status
Definition: subchannel.h:173
grpc_core::ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
Definition: client_channel.cc:2779
regen-readme.it
it
Definition: regen-readme.py:15
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
grpc_core::ClientChannel::LoadBalancedCall::PendingBatchesAdd
void PendingBatchesAdd(grpc_transport_stream_op_batch *batch)
Definition: client_channel.cc:2680
grpc_core::ConnectivityStateWatcherInterface
Definition: src/core/lib/transport/connectivity_state.h:49
core_configuration.h
bloat_diff.severity
def severity
Definition: bloat_diff.py:143
metadata_batch.h
grpc_core::ClientChannel::CallData::cancel_error_
grpc_error_handle cancel_error_
Definition: client_channel.cc:240
grpc_slice_ref_internal
const grpc_slice & grpc_slice_ref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:32
grpc_core::ClientChannel::CallData::StartTransportStreamOpBatch
static void StartTransportStreamOpBatch(grpc_call_element *elem, grpc_transport_stream_op_batch *batch)
Definition: client_channel.cc:1929
grpc_core::channelz::ChannelNode::AddChildSubchannel
void AddChildSubchannel(intptr_t child_uuid)
Definition: src/core/lib/channel/channelz.cc:241
bool
bool
Definition: setup_once.h:312
grpc_core::ClientChannel::CallData::ResolverQueuedCallCanceller::ResolverQueuedCallCanceller
ResolverQueuedCallCanceller(grpc_call_element *elem)
Definition: client_channel.cc:2162
grpc_core::LoadBalancingPolicy::UpdateArgs
Definition: lb_policy.h:315
grpc_core::ClientChannel::CallData::~CallData
~CallData()
Definition: client_channel.cc:1898
grpc_core::ClientChannel::CallData::YieldCallCombiner
static bool YieldCallCombiner(const CallCombinerClosureList &)
Definition: client_channel.cc:151
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::BackendMetricAllocator
Definition: client_channel.cc:2573
grpc_core::GrpcLbClientStats
Definition: grpclb_client_stats.h:40
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
chand_
ClientChannel * chand_
Definition: client_channel.cc:321
grpc_core::ClientChannel::channel_args_
const grpc_channel_args * channel_args_
Definition: client_channel.h:299
absl::InlinedVector::emplace_back
reference emplace_back(Args &&... args)
Definition: abseil-cpp/absl/container/inlined_vector.h:675
connectivity_state.h
grpc_core::LoadBalancingPolicy::Args::args
const grpc_channel_args * args
Channel args.
Definition: lb_policy.h:355
grpc_core::ClientChannel::CallData::PendingBatchesFail
void PendingBatchesFail(grpc_call_element *elem, grpc_error_handle error, YieldCallCombinerPredicate yield_call_combiner_predicate)
Definition: client_channel.cc:2077
timers.h
client_channel_channelz.h
polling_entity.h
grpc_deadline_checking_enabled
bool grpc_deadline_checking_enabled(const grpc_channel_args *channel_args)
Definition: deadline_filter.cc:377
Timestamp
Definition: bloaty/third_party/protobuf/src/google/protobuf/timestamp.pb.h:69
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::GetBackendMetricData
const BackendMetricData * GetBackendMetricData() override
Definition: client_channel.cc:2559
grpc_transport_stream_op_batch::recv_message
bool recv_message
Definition: transport.h:322
grpc_core::RefCountedPtr::get
T * get() const
Definition: ref_counted_ptr.h:146
grpc_core::SubchannelCall::StartTransportStreamOpBatch
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
Definition: subchannel.cc:178
grpc_core::ClientChannel::work_serializer_
std::shared_ptr< WorkSerializer > work_serializer_
Definition: client_channel.h:336
grpc_channel_args_copy_and_remove
grpc_channel_args * grpc_channel_args_copy_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove)
Definition: channel_args.cc:231
grpc_handler_private_op_data::closure
grpc_closure closure
Definition: transport.h:275
grpc_core::ClientChannel::LoadBalancedCall::Metadata::TestOnlyCopyToVector
std::vector< std::pair< std::string, std::string > > TestOnlyCopyToVector() override
Produce a vector of metadata key/value strings for tests.
Definition: client_channel.cc:2488
slice.h
grpc_core::ClientChannel::AddConnectivityWatcher
void AddConnectivityWatcher(grpc_connectivity_state initial_state, OrphanablePtr< AsyncConnectivityStateWatcherInterface > watcher)
Definition: client_channel.cc:1864
GRPC_CONTEXT_CALL_TRACER
@ GRPC_CONTEXT_CALL_TRACER
Value is a CallTracer object.
Definition: core/lib/channel/context.h:40
grpc_core::ClientChannel::client_channel_factory_
ClientChannelFactory * client_channel_factory_
Definition: client_channel.h:298
grpc_core::SubchannelPoolInterface::CreateChannelArg
static grpc_arg CreateChannelArg(SubchannelPoolInterface *subchannel_pool)
Definition: subchannel_pool_interface.cc:121
grpc_core::HttpMethodMetadata
Definition: metadata_batch.h:136
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
GRPC_ERROR_INT_LB_POLICY_DROP
@ GRPC_ERROR_INT_LB_POLICY_DROP
LB policy drop.
Definition: error.h:97
subchannel.h
grpc_core::ClientChannel::CallData::SetPollent
static void SetPollent(grpc_call_element *elem, grpc_polling_entity *pollent)
Definition: client_channel.cc:2026
grpc_core::ClientChannel::Init
static grpc_error_handle Init(grpc_channel_element *elem, grpc_channel_element_args *args)
Definition: client_channel.cc:1016
grpc_core::BackendMetricData
Definition: backend_metric_data.h:31
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::kKeepaliveThrottlingKey
constexpr const char * kKeepaliveThrottlingKey
Definition: transport.h:598
grpc_core::ClientChannel::ClientChannelControlHelper
Definition: client_channel.cc:869
grpc_core::channelz::ChannelTrace::Error
@ Error
Definition: channel_trace.h:55
grpc_transport_stream_op_batch::on_complete
grpc_closure * on_complete
Definition: transport.h:304
grpc_core::Slice
Definition: src/core/lib/slice/slice.h:282
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
absl::CancelledError
Status CancelledError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:331
grpc_core::WorkSerializer
Definition: work_serializer.h:51
grpc_core::HttpPathMetadata
Definition: metadata_batch.h:262
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: impl/codegen/port_platform.h:769
grpc_core::ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked
void AddWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:791
grpc_core::ClientChannel::LbQueuedCall
Definition: client_channel.h:220
grpc_core::ClientChannel::OnResolverErrorLocked
void OnResolverErrorLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1348
grpc_core::ClientChannel::CallData::NoYieldCallCombiner
static bool NoYieldCallCombiner(const CallCombinerClosureList &)
Definition: client_channel.cc:154
grpc_channel_args_find_pointer
T * grpc_channel_args_find_pointer(const grpc_channel_args *args, const char *name)
Definition: channel_args.h:342
string.h
grpc_core::slice_detail::StaticConstructors< Slice >::FromStaticString
static Slice FromStaticString(const char *s)
Definition: src/core/lib/slice/slice.h:201
ABSL_TS_UNCHECKED_READ
#define ABSL_TS_UNCHECKED_READ(x)
Definition: abseil-cpp/absl/base/thread_annotations.h:312
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE
@ GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE
channel connectivity state associated with the error
Definition: error.h:94
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_core::LoadBalancingPolicy::PickArgs::call_state
CallState * call_state
Definition: lb_policy.h:151
grpc_core::ServiceConfigCallData
Definition: service_config_call_data.h:41
grpc_pollset_set_create
grpc_pollset_set * grpc_pollset_set_create()
Definition: pollset_set.cc:29
grpc_core::ClientChannel::SubchannelWrapper::connected_subchannel
RefCountedPtr< ConnectedSubchannel > connected_subchannel() const
Definition: client_channel.cc:531
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
Arena
Definition: arena.c:39
grpc_core::ClientChannel::SubchannelWrapper::RequestConnection
void RequestConnection() override
Definition: client_channel.cc:535
GRPC_CHANNEL_STACK_UNREF
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason)
Definition: channel_stack.h:299
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
useful.h
grpc_core::LoadBalancingPolicy::PickArgs::path
absl::string_view path
The path of the call. Indicates the RPC service and method name.
Definition: lb_policy.h:143
grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure(grpc_transport_stream_op_batch *batch, grpc_error_handle error, grpc_core::CallCombiner *call_combiner)
Definition: transport.cc:151
grpc_core::internal::ClientChannelGlobalParsedConfig::parsed_lb_config
RefCountedPtr< LoadBalancingPolicy::Config > parsed_lb_config() const
Definition: resolver_result_parsing.h:55
subchannel
RingHashSubchannelData * subchannel
Definition: ring_hash.cc:285
grpc_channel_element
Definition: channel_stack.h:186
grpc_core::ClientChannel::ClientChannelControlHelper::~ClientChannelControlHelper
~ClientChannelControlHelper() override
Definition: client_channel.cc:876
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
grpc_core::GrpcMessageMetadata
Definition: metadata_batch.h:220
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_core::ProxyMapperRegistry::MapName
static bool MapName(const char *server_uri, const grpc_channel_args *args, char **name_to_resolve, grpc_channel_args **new_args)
Definition: proxy_mapper_registry.cc:64
grpc_message
absl::optional< grpc_core::SliceBuffer > grpc_message
Definition: binder_transport_test.cc:310
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
client_channel.h
grpc_core::ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked
void RemoveWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:798
grpc_core::ServerAddress
Definition: server_address.h:49
arg::value
void * value
Definition: cmdline.cc:44
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper
Definition: client_channel.cc:571
grpc_core::ClientChannel::ResolverQueuedCall::next
ResolverQueuedCall * next
Definition: client_channel.h:218
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::~WatcherWrapper
~WatcherWrapper() override
Definition: client_channel.cc:579
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc_core::ClientChannel::CallData::PendingBatchesAdd
void PendingBatchesAdd(grpc_call_element *elem, grpc_transport_stream_op_batch *batch)
Definition: client_channel.cc:2051
connected_subchannel_
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
Definition: oob_backend_metric.cc:113
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
config_selector.h
grpc_channel_args_copy_and_add_and_remove
grpc_channel_args * grpc_channel_args_copy_and_add_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:246
phony_filter::GetChannelInfo
void GetChannelInfo(grpc_channel_element *, const grpc_channel_info *)
Definition: bm_call_create.cc:391
grpc_core::ClientChannel::ResolverQueuedCall
Definition: client_channel.h:216
closure_
grpc_closure closure_
Definition: channel_connectivity.cc:164
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(desc, errs, count)
Definition: error.h:307
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder::Encode
void Encode(HttpMethodMetadata, const typename HttpMethodMetadata::ValueType &)
Definition: client_channel.cc:2520
status
absl::Status status
Definition: rls.cc:251
grpc_handler_private_op_data::extra_arg
void * extra_arg
Definition: transport.h:274
google::protobuf::python::cmessage::Init
static int Init(CMessage *self, PyObject *args, PyObject *kwargs)
Definition: bloaty/third_party/protobuf/python/google/protobuf/pyext/message.cc:1287
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::replacement_
WatcherWrapper * replacement_
Definition: client_channel.cc:675
GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL
#define GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL
Definition: grpc_types.h:443
grpc_transport_stream_op_batch_payload::send_initial_metadata
grpc_metadata_batch * send_initial_metadata
Definition: transport.h:346
grpc_core::ClientChannel::SubchannelWrapper::AddDataWatcher
void AddDataWatcher(std::unique_ptr< DataWatcherInterface > watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:539
grpc_core::ClientChannel::ClientChannelControlHelper::GetAuthority
absl::string_view GetAuthority() override
Returns the channel authority.
Definition: client_channel.cc:980
grpc_core::ClientChannel::LoadBalancedCall::LbQueuedCallCanceller
Definition: client_channel.cc:3064
grpc_core::CallTracer
Definition: call_tracer.h:41
grpc_channel_arg_string_create
grpc_arg grpc_channel_arg_string_create(char *name, char *value)
Definition: channel_args.cc:476
check_documentation.path
path
Definition: check_documentation.py:57
grpc_core::ClientChannel::CallData::call_start_time_
gpr_cycle_counter call_start_time_
Definition: client_channel.cc:205
run_xds_tests.server_uri
string server_uri
Definition: run_xds_tests.py:3320
grpc_core::Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange
ConnectivityStateChange PopConnectivityStateChange()
Definition: subchannel.cc:574
grpc_core::ClientChannel::ExternalConnectivityWatcher::Notify
void Notify(grpc_connectivity_state state, const absl::Status &) override
Definition: client_channel.cc:751
xds_manager.p
p
Definition: xds_manager.py:60
GRPC_CLOSURE_CREATE
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
Definition: closure.h:160
start_time
static int64_t start_time
Definition: benchmark-getaddrinfo.c:37
GRPC_ERROR_CANCELLED
#define GRPC_ERROR_CANCELLED
Definition: error.h:238
grpc_core::ClientChannel::ConnectivityWatcherAdder::AddWatcherLocked
void AddWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:823
grpc_call_element
Definition: channel_stack.h:194
grpc_core::ClientChannel::ResolverResultHandler
Definition: client_channel.cc:421
grpc_pollset_set_del_pollset_set
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:52
grpc_core::LoadBalancingPolicy::PickResult::Complete
A successful pick.
Definition: lb_policy.h:190
grpc_core::ClientChannel::LoadBalancedCall::AsyncPickDone
void AsyncPickDone(grpc_error_handle error)
Definition: client_channel.h:427
grpc_core::internal::ClientChannelGlobalParsedConfig::parsed_deprecated_lb_policy
const std::string & parsed_deprecated_lb_policy() const
Definition: resolver_result_parsing.h:59
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
grpc_channel_args_find_bool
bool grpc_channel_args_find_bool(const grpc_channel_args *args, const char *name, bool default_value)
Definition: channel_args.cc:465
grpc_core::ClientChannel::CreateLbPolicyLocked
OrphanablePtr< LoadBalancingPolicy > CreateLbPolicyLocked(const grpc_channel_args &args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1417
grpc_arg_pointer_vtable
Definition: grpc_types.h:85
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_core::pending
P< T > pending()
Definition: try_join_test.cc:50
subchannel_interface.h
lb_call_
OrphanablePtr< ClientChannel::LoadBalancedCall > lb_call_
Definition: client_channel.cc:396
grpc_core::ClientChannel::AddResolverQueuedCall
void AddResolverQueuedCall(ResolverQueuedCall *call, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_)
Definition: client_channel.cc:1436
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
grpc_core::DynamicFilters::Create
static RefCountedPtr< DynamicFilters > Create(const grpc_channel_args *args, std::vector< const grpc_channel_filter * > filters)
Definition: dynamic_filters.cc:169
T
#define T(upbtypeconst, upbtype, ctype, default_value)
kFilterVtable
static const grpc_channel_filter kFilterVtable
Definition: client_channel.cc:293
GRPC_ARG_HEALTH_CHECK_SERVICE_NAME
#define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME
Definition: client_channel.cc:95
done_
std::atomic< bool > done_
Definition: fuzzing_event_engine_test.cc:57
grpc_core::ClientChannel::default_service_config_
RefCountedPtr< ServiceConfig > default_service_config_
Definition: client_channel.h:300
grpc_status._async.code
code
Definition: grpcio_status/grpc_status/_async.py:34
GRPC_ARG_CLIENT_CHANNEL
#define GRPC_ARG_CLIENT_CHANNEL
Definition: client_channel.h:92
grpc_transport_op
Definition: transport.h:452
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
grpc_core::ClientChannel::service_config_parser_index_
const size_t service_config_parser_index_
Definition: client_channel.h:305
call
FilterStackCall * call
Definition: call.cc:750
grpc_core::CallCombinerClosureList::size
size_t size() const
Definition: call_combiner.h:195
grpc_core::ClientChannel::CallData::original_recv_trailing_metadata_ready_
grpc_closure * original_recv_trailing_metadata_ready_
Definition: client_channel.cc:226
grpc_core::ServiceConfigCallData::call_attributes
const CallAttributes & call_attributes() const
Definition: service_config_call_data.h:66
GRPC_ARG_CONFIG_SELECTOR
#define GRPC_ARG_CONFIG_SELECTOR
Definition: config_selector.h:42
grpc_core::CallTracer::CallAttemptTracer
Definition: call_tracer.h:47
status.h
GRPC_ARG_CHANNELZ_CHANNEL_NODE
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE
Definition: channelz.h:49
grpc_client_channel_stop_backup_polling
void grpc_client_channel_stop_backup_polling(grpc_pollset_set *interested_parties)
Definition: backup_poller.cc:181
grpc_core::channelz::ChannelTrace::Warning
@ Warning
Definition: channel_trace.h:54
grpc_core::ClientChannel::LoadBalancedCall::RecvMessageReady
static void RecvMessageReady(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2949
grpc_core::ClientChannel::CheckConnectivityState
grpc_connectivity_state CheckConnectivityState(bool try_to_connect)
Definition: client_channel.cc:1848
grpc_core::LoadBalancingPolicy::PickResult
The result of picking a subchannel for a call.
Definition: lb_policy.h:188
grpc_core::ClientChannel::ResolverResultHandler::ResolverResultHandler
ResolverResultHandler(ClientChannel *chand)
Definition: client_channel.cc:423
global_subchannel_pool.h
grpc_core::ClientChannel::CallData::path_
grpc_slice path_
Definition: client_channel.cc:204
grpc_core::ConfigSelector::GetFromChannelArgs
static RefCountedPtr< ConfigSelector > GetFromChannelArgs(const grpc_channel_args &args)
Definition: config_selector.cc:52
grpc_core::channelz::ChannelNode::SetConnectivityState
void SetConnectivityState(grpc_connectivity_state state)
Definition: src/core/lib/channel/channelz.cc:225
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder::out_
std::vector< std::pair< std::string, std::string > > out_
Definition: client_channel.cc:2528
grpc_core::ClientChannel::CallData::FailPendingBatchInCallCombiner
static void FailPendingBatchInCallCombiner(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2066
grpc_core::ClientChannel::UpdateStateAndPickerLocked
void UpdateStateAndPickerLocked(grpc_connectivity_state state, const absl::Status &status, const char *reason, std::unique_ptr< LoadBalancingPolicy::SubchannelPicker > picker) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1599
grpc_core::ClientChannel::CallData::YieldCallCombinerPredicate
bool(* YieldCallCombinerPredicate)(const CallCombinerClosureList &closures)
Definition: client_channel.cc:149
grpc_core::ClientChannel::LoadBalancedCall::PickDone
static void PickDone(void *arg, grpc_error_handle error)
Definition: client_channel.cc:3135
grpc_core::ClientChannel::CallData::resolution_done_closure_
grpc_closure resolution_done_closure_
Definition: client_channel.cc:214
GRPC_ARG_ENABLE_RETRIES
#define GRPC_ARG_ENABLE_RETRIES
Definition: grpc_types.h:396
grpc_core::LoadBalancingPolicy::UpdateArgs::config
RefCountedPtr< Config > config
The LB policy config.
Definition: lb_policy.h:320
grpc_core::ClientChannel::ExternalConnectivityWatcher::chand_
ClientChannel * chand_
Definition: client_channel.h:207
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
absl::SimpleAtoi
ABSL_NAMESPACE_BEGIN ABSL_MUST_USE_RESULT bool SimpleAtoi(absl::string_view str, int_type *out)
Definition: abseil-cpp/absl/strings/numbers.h:271
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::ApplyUpdateInControlPlaneWorkSerializer
void ApplyUpdateInControlPlaneWorkSerializer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_ -> chand_->work_serializer_)
Definition: client_channel.cc:620
string_util.h
grpc_core::ClientChannelFactory
Definition: client_channel_factory.h:32
GRPC_CALL_STACK_UNREF
#define GRPC_CALL_STACK_UNREF(call_stack, reason)
Definition: channel_stack.h:295
GRPC_ARG_DEFAULT_AUTHORITY
#define GRPC_ARG_DEFAULT_AUTHORITY
Definition: grpc_types.h:251
grpc_core::ClientChannel::CallData::CheckResolution
static void CheckResolution(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2340
grpc_transport_stream_op_batch::cancel_stream
bool cancel_stream
Definition: transport.h:329
grpc_core::LoadBalancingPolicy::PickResult::Complete::subchannel_call_tracker
std::unique_ptr< SubchannelCallTrackerInterface > subchannel_call_tracker
Definition: lb_policy.h:198
grpc_core::DynamicFilters::CreateCall
RefCountedPtr< Call > CreateCall(Call::Args args, grpc_error_handle *error)
Definition: dynamic_filters.cc:193
grpc_core::ClientChannel::AddLbQueuedCall
void AddLbQueuedCall(LbQueuedCall *call, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_)
Definition: client_channel.cc:1815
grpc_core::ClientChannel::LoadBalancedCall
Definition: client_channel.h:384
grpc_transport_stream_op_batch::handler_private
grpc_handler_private_op_data handler_private
Definition: transport.h:338
grpc_core::ClientChannel::CallData::ResolverQueuedCallCanceller::CancelLocked
static void CancelLocked(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2171
grpc_channel_stack_last_element
grpc_channel_element * grpc_channel_stack_last_element(grpc_channel_stack *channel_stack)
Definition: channel_stack.cc:83
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
grpc_status._async.trailing_metadata
trailing_metadata
Definition: grpcio_status/grpc_status/_async.py:36
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
GRPC_INITIAL_METADATA_WAIT_FOR_READY
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY
Definition: grpc_types.h:523
GRPC_STATUS_OK
@ GRPC_STATUS_OK
Definition: include/grpc/impl/codegen/status.h:30
retry_filter.h
grpc_core::ClientChannel::LoadBalancedCall::PendingBatchesResume
void PendingBatchesResume()
Definition: client_channel.cc:2751
grpc_core::ClientChannel::ClientChannelControlHelper::CreateSubchannel
RefCountedPtr< SubchannelInterface > CreateSubchannel(ServerAddress address, const grpc_channel_args &args) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Creates a new subchannel with the specified channel args.
Definition: client_channel.cc:881
grpc_core::ClientChannel::ClientChannelControlHelper::chand_
ClientChannel * chand_
Definition: client_channel.cc:1002
grpc_core::LoadBalancingPolicy::PickArgs
Arguments used when picking a subchannel for a call.
Definition: lb_policy.h:141
arena_
Arena * arena_
Definition: client_channel.cc:391
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::ClientChannel::CallData::GetBatchIndex
static size_t GetBatchIndex(grpc_transport_stream_op_batch *batch)
Definition: client_channel.cc:2036
grpc_core::ClientChannel::LoadBalancedCall::LbQueuedCallCanceller::lb_call_
RefCountedPtr< LoadBalancedCall > lb_call_
Definition: client_channel.cc:3100
grpc_core::ClientChannel::LoadBalancedCall::LbQueuedCallCanceller::LbQueuedCallCanceller
LbQueuedCallCanceller(RefCountedPtr< LoadBalancedCall > lb_call)
Definition: client_channel.cc:3066
grpc_core::ClientChannel::LoadBalancedCall::LbQueuedCallCanceller::closure_
grpc_closure closure_
Definition: client_channel.cc:3101
absl::StrJoin
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
Definition: abseil-cpp/absl/strings/str_join.h:239
grpc_core::CoreConfiguration::Get
static const CoreConfiguration & Get()
Definition: core_configuration.h:82
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
grpc_core::ClientChannel::ConnectivityWatcherRemover::ConnectivityWatcherRemover
ConnectivityWatcherRemover(ClientChannel *chand, AsyncConnectivityStateWatcherInterface *watcher)
Definition: client_channel.cc:841
grpc_core::ClientChannel::CallData::pending_batches_
grpc_transport_stream_op_batch * pending_batches_[MAX_PENDING_BATCHES]
Definition: client_channel.cc:237
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
Json
JSON (JavaScript Object Notation).
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:227
grpc_core::ClientChannel::SubchannelWrapper::WatchConnectivityState
void WatchConnectivityState(std::unique_ptr< ConnectivityStateWatcherInterface > watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:509
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder::Encode
void Encode(const Slice &key, const Slice &value)
Definition: client_channel.cc:2505
grpc_core::SubchannelCall::Args
Definition: subchannel.h:98
grpc_core::ClientChannel::external_watchers_mu_
Mutex external_watchers_mu_
Definition: client_channel.h:373
GRPC_ARG_SERVER_URI
#define GRPC_ARG_SERVER_URI
Definition: client_channel.h:89
grpc_core::ClientChannel::LoadBalancedCall::Metadata
Definition: client_channel.cc:2463
grpc_channel_stack_no_post_init
void grpc_channel_stack_no_post_init(grpc_channel_stack *, grpc_channel_element *)
Definition: channel_stack.cc:282
grpc_core::ClientChannel::CallData::CallData
CallData(grpc_call_element *elem, const ClientChannel &chand, const grpc_call_element_args &args)
Definition: client_channel.cc:1879
grpc_core::ClientChannel::CallData::owning_call_
grpc_call_stack * owning_call_
Definition: client_channel.cc:208
grpc_core::BackendMetricAllocatorInterface
Definition: backend_metric.h:30
grpc_core::LoadBalancingPolicy::PickResult::Fail
Definition: lb_policy.h:216
channel_stack.h
subchannel_interface_internal.h
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
#define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
Definition: grpc_types.h:526
grpc_core::InternallyRefCounted< ConnectivityStateWatcherInterface >::Ref
RefCountedPtr< ConnectivityStateWatcherInterface > Ref() GRPC_MUST_USE_RESULT
Definition: orphanable.h:90
grpc_call_stack
Definition: channel_stack.h:233
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4, 5)
grpc_core::ClientChannel::ConnectivityWatcherRemover::RemoveWatcherLocked
void RemoveWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:853
local_subchannel_pool.h
work_serializer.h
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc_core::grpc_client_channel_trace
TraceFlag grpc_client_channel_trace(false, "client_channel")
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
grpc_core::Resolver::Result
Results returned by the resolver.
Definition: resolver/resolver.h:56
grpc_core::ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady
static void RecvTrailingMetadataReady(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2963
grpc_core::ClientChannel::channelz_node_
channelz::ChannelNode * channelz_node_
Definition: client_channel.h:303
grpc_core::InternalSubchannelDataWatcherInterface
Definition: subchannel_interface_internal.h:29
grpc_core::ClientChannel::CreateLoadBalancedCall
OrphanablePtr< LoadBalancedCall > CreateLoadBalancedCall(const grpc_call_element_args &args, grpc_polling_entity *pollent, grpc_closure *on_call_destruction_complete, ConfigSelector::CallDispatchController *call_dispatch_controller, bool is_transparent_retry)
Definition: client_channel.cc:1150
grpc_core::ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete
static void SendInitialMetadataOnComplete(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2916
grpc_core::ClientChannel::UpdateServiceConfigInControlPlaneLocked
void UpdateServiceConfigInControlPlaneLocked(RefCountedPtr< ServiceConfig > service_config, RefCountedPtr< ConfigSelector > config_selector, std::string lb_policy_name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1460
GRPC_ARG_LB_POLICY_NAME
#define GRPC_ARG_LB_POLICY_NAME
Definition: grpc_types.h:309
grpc_pollset_set_destroy
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set)
Definition: pollset_set.cc:33
grpc_core::channelz::ChannelNode::GetChannelConnectivityStateChangeString
static const char * GetChannelConnectivityStateChangeString(grpc_connectivity_state state)
Definition: src/core/lib/channel/channelz.cc:151
grpc_transport_stream_op_batch_payload::cancel_error
grpc_error_handle cancel_error
Definition: transport.h:444
grpc_core::ClientChannel::CallData::ResolutionDone
static void ResolutionDone(void *arg, grpc_error_handle error)
Definition: client_channel.cc:181
absl::InlinedVector::push_back
void push_back(const_reference v)
Definition: abseil-cpp/absl/container/inlined_vector.h:682
grpc_core::GlobalSubchannelPool::instance
static RefCountedPtr< GlobalSubchannelPool > instance()
Definition: global_subchannel_pool.cc:29
grpc_core::ClientChannel::SubchannelWrapper::subchannel_
RefCountedPtr< Subchannel > subchannel_
Definition: client_channel.cc:679
grpc_core::HttpMethodMetadata::ValueType
ValueType
Definition: metadata_batch.h:138
grpc_core::SubchannelCall::Create
static RefCountedPtr< SubchannelCall > Create(Args args, grpc_error_handle *error)
Definition: subchannel.cc:142
grpc_core::ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher
ExternalConnectivityWatcher(ClientChannel *chand, grpc_polling_entity pollent, grpc_connectivity_state *state, grpc_closure *on_complete, grpc_closure *watcher_timer_init)
Definition: client_channel.cc:696
GRPC_CALL_COMBINER_STOP
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason)
Definition: call_combiner.h:58
grpc_core::ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher
~ExternalConnectivityWatcher() override
Definition: client_channel.cc:726
phony_filter::StartTransportOp
static void StartTransportOp(grpc_channel_element *, grpc_transport_op *)
Definition: bm_call_create.cc:369
recv_message_
grpc_byte_buffer * recv_message_
Definition: rls.cc:672
absl::optional< std::string >
grpc_core::ClientChannel::CallData::PendingBatchesResume
void PendingBatchesResume(grpc_call_element *elem)
Definition: client_channel.cc:2125
pollset_set.h
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
grpc_polling_entity_del_from_pollset_set
void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity *pollent, grpc_pollset_set *pss_dst)
Definition: polling_entity.cc:78
grpc_core::ClientChannel::LoadBalancedCall::GetBatchIndex
static size_t GetBatchIndex(grpc_transport_stream_op_batch *batch)
Definition: client_channel.cc:2666
grpc_core::ConfigSelector::Equals
virtual bool Equals(const ConfigSelector *other) const =0
grpc_core::ResolverRegistry::GetDefaultAuthority
std::string GetDefaultAuthority(absl::string_view target) const
Returns the default authority to pass from a client for target.
Definition: resolver_registry.cc:90
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::OnConnectivityStateChange
void OnConnectivityStateChange() override
Definition: client_channel.cc:589
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
arg
Definition: cmdline.cc:40
grpc_slice_from_static_string
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
Definition: slice/slice.cc:89
grpc_error_get_int
bool grpc_error_get_int(grpc_error_handle err, grpc_error_ints which, intptr_t *p)
Definition: error.cc:635
grpc_transport_stream_op_batch::payload
grpc_transport_stream_op_batch_payload * payload
Definition: transport.h:307
service_config_call_data.h
grpc_core::ClientChannel::CallData::ResolverQueuedCallCanceller::closure_
grpc_closure closure_
Definition: client_channel.cc:2198
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Lookup
absl::optional< absl::string_view > Lookup(absl::string_view key, std::string *buffer) const override
Definition: client_channel.cc:2496
server_address.h
grpc_transport_stream_op_batch_payload::send_trailing_metadata
grpc_metadata_batch * send_trailing_metadata
Definition: transport.h:359
call_combiner_
CallCombiner * call_combiner_
Definition: client_channel.cc:393
grpc_core::ClientChannel::ResolverResultHandler::~ResolverResultHandler
~ResolverResultHandler() override
Definition: client_channel.cc:427
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
grpc_core::ClientChannel::CallData
Definition: client_channel.cc:110
grpc_core::ClientChannel::CallData::arena_
Arena * arena_
Definition: client_channel.cc:207
absl::InlinedVector::data
pointer data() noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:302
backup_poller.h
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
grpc_deadline_state
Definition: deadline_filter.h:38
grpc_core::ClientChannel::CallData::deadline_
Timestamp deadline_
Definition: client_channel.cc:206
grpc_core::ClientChannel::TryToConnectLocked
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1839
grpc_core::ClientChannel::SubchannelWrapper
Definition: client_channel.cc:455
grpc_core::channelz::ChannelTrace::Severity
Severity
Definition: channel_trace.h:51
grpc_polling_entity
Definition: polling_entity.h:38
grpc_core::ClientChannel::LoadBalancedCall::RecvInitialMetadataReady
static void RecvInitialMetadataReady(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2932
grpc_call_element_args
Definition: channel_stack.h:80
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
grpc_core::InternallyRefCounted
Definition: orphanable.h:73
grpc_core::ClientChannel::ExternalConnectivityWatcher::pollent_
grpc_polling_entity pollent_
Definition: client_channel.h:208
grpc_core::ClientChannel::UpdateServiceConfigInDataPlaneLocked
void UpdateServiceConfigInDataPlaneLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1485
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder::Encode
void Encode(Which, const typename Which::ValueType &value)
Definition: client_channel.cc:2511
grpc_core::channelz::ChannelNode::AddTraceEvent
void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice &data)
Definition: channelz.h:194
absl::InlinedVector::size
size_type size() const noexcept
Definition: abseil-cpp/absl/container/inlined_vector.h:270
json.h
buffer
char buffer[1024]
Definition: libuv/docs/code/idle-compute/main.c:8
grpc_core::Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
Definition: subchannel.h:171
slice_internal.h
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Add
void Add(absl::string_view key, absl::string_view value) override
Definition: client_channel.cc:2468
watcher_
RefCountedPtr< ConnectivityStateWatcherInterface > watcher_
Definition: health_check_client.cc:155
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
grpc_core::ClientChannel::CreateResolverLocked
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1561
grpc_core::ClientChannel::StartTransportOp
static void StartTransportOp(grpc_channel_element *elem, grpc_transport_op *op)
Definition: client_channel.cc:1785
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::lb_call_
LoadBalancedCall * lb_call_
Definition: client_channel.cc:2589
grpc_core::Resolver::ResultHandler
Definition: resolver/resolver.h:84
grpc_core::ClientChannel::ClientChannelControlHelper::ConvertSeverityEnum
static channelz::ChannelTrace::Severity ConvertSeverityEnum(TraceSeverity severity)
Definition: client_channel.cc:995
grpc_core::DynamicFilters::Call::Args
Definition: dynamic_filters.h:48
grpc_core::ClientChannel::Destroy
static void Destroy(grpc_channel_element *elem)
Definition: client_channel.cc:1025
peer_string_
gpr_atm * peer_string_
Definition: retry_filter.cc:622
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder::Encode
void Encode(HttpPathMetadata, const Slice &)
Definition: client_channel.cc:2519
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::MakeReplacement
WatcherWrapper * MakeReplacement()
Definition: client_channel.cc:613
grpc_core::ClientChannel::GetChannelInfo
static void GetChannelInfo(grpc_channel_element *elem, const grpc_channel_info *info)
Definition: client_channel.cc:1802
grpc_core::ClientChannel::LoadBalancedCall::PendingBatchesFail
void PendingBatchesFail(grpc_error_handle error, YieldCallCombinerPredicate yield_call_combiner_predicate)
Definition: client_channel.cc:2704
grpc_core::ClientChannel::ConnectivityWatcherRemover::chand_
ClientChannel * chand_
Definition: client_channel.cc:861
grpc_core::ClientChannel::CallData::recv_trailing_metadata_ready_
grpc_closure recv_trailing_metadata_ready_
Definition: client_channel.cc:227
grpc_core::GrpcStatusMetadata
Definition: metadata_batch.h:293
Json::ValueType
ValueType
Type of the value held by a Value object.
Definition: third_party/bloaty/third_party/protobuf/conformance/third_party/jsoncpp/json.h:463
grpc_core::LoadBalancingPolicy::Args
Args used to instantiate an LB policy.
Definition: lb_policy.h:343
setup.idx
idx
Definition: third_party/bloaty/third_party/capstone/bindings/python/setup.py:197
grpc_channel_filter
Definition: channel_stack.h:111
deadline_
Timestamp deadline_
Definition: client_channel.cc:390
grpc_core::ServiceConfigImpl::Create
static RefCountedPtr< ServiceConfig > Create(const grpc_channel_args *args, absl::string_view json_string, grpc_error_handle *error)
Definition: service_config_impl.cc:41
resolver_registry.h
grpc_core::EndpointLoadMetricsBinMetadata
Definition: metadata_batch.h:232
grpc_core::ClientChannel::data_plane_mu_
Mutex data_plane_mu_
Definition: client_channel.h:327
grpc_core::ClientChannel::ConnectivityWatcherRemover
Definition: client_channel.cc:839
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
grpc_channel_args_want_minimal_stack
bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args)
Definition: channel_args.cc:471
grpc_core::ClientChannel::SubchannelWrapper::channel_args
const grpc_channel_args * channel_args() override
Definition: client_channel.cc:548
grpc_core::kRetryFilterVtable
const grpc_channel_filter kRetryFilterVtable
Definition: retry_filter.cc:2678
recv_trailing_metadata_
grpc_metadata_array recv_trailing_metadata_
Definition: rls.cc:673
grpc_core::grpc_client_channel_lb_call_trace
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call")
gpr_types.h
GRPC_CHANNEL_CONNECTING
@ GRPC_CHANNEL_CONNECTING
Definition: include/grpc/impl/codegen/connectivity_state.h:34
grpc_core::ClientChannel::ClientChannel
ClientChannel(grpc_channel_element_args *args, grpc_error_handle *error)
Definition: client_channel.cc:1049
grpc_core::ClientChannel::RemoveLbQueuedCall
void RemoveLbQueuedCall(LbQueuedCall *to_remove, grpc_polling_entity *pollent) ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_)
Definition: client_channel.cc:1825
grpc_core::LoadBalancingPolicyRegistry::ParseLoadBalancingConfig
static RefCountedPtr< LoadBalancingPolicy::Config > ParseLoadBalancingConfig(const Json &json, grpc_error_handle *error)
Definition: lb_policy_registry.cc:169
grpc_core::ConfigSelector::CallDispatchController
Definition: config_selector.h:51
grpc_core::LoadBalancingPolicy::Args::work_serializer
std::shared_ptr< WorkSerializer > work_serializer
The work_serializer under which all LB policy calls will be run.
Definition: lb_policy.h:345
grpc_core::ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute
absl::string_view GetCallAttribute(UniqueTypeName type)
Definition: client_channel.cc:2539
child_policy_handler.h
grpc_core::ResolverRegistry::CreateResolver
OrphanablePtr< Resolver > CreateResolver(absl::string_view target, const grpc_channel_args *args, grpc_pollset_set *pollset_set, std::shared_ptr< WorkSerializer > work_serializer, std::unique_ptr< Resolver::ResultHandler > result_handler) const
Definition: resolver_registry.cc:73
grpc_core::CallCombinerClosureList::RunClosures
void RunClosures(CallCombiner *call_combiner)
Definition: call_combiner.h:161
grpc_core::ClientChannel::SubchannelWrapper::CancelConnectivityStateWatch
void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface *watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Definition: client_channel.cc:522
grpc_core::ClientChannel::ResolverResultHandler::ReportResult
void ReportResult(Resolver::Result result) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Reports a result to the channel.
Definition: client_channel.cc:434
grpc_core::CallTracer::StartNewAttempt
virtual CallAttemptTracer * StartNewAttempt(bool is_transparent_retry)=0
grpc_deadline_state_client_start_transport_stream_op_batch
void grpc_deadline_state_client_start_transport_stream_op_batch(grpc_call_element *elem, grpc_transport_stream_op_batch *op)
Definition: deadline_filter.cc:228
GRPC_CHANNEL_STACK_REF
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason)
Definition: channel_stack.h:297
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_core::Duration::Zero
static constexpr Duration Zero()
Definition: src/core/lib/gprpp/time.h:130
grpc_trace_channel
grpc_core::TraceFlag grpc_trace_channel(false, "channel")
absl::Status
ABSL_NAMESPACE_BEGIN class ABSL_MUST_USE_RESULT Status
Definition: abseil-cpp/absl/status/internal/status_internal.h:36
grpc_client_channel_start_backup_polling
void grpc_client_channel_start_backup_polling(grpc_pollset_set *interested_parties)
Definition: backup_poller.cc:162
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder
Definition: client_channel.cc:2503
grpc_transport_stream_op_batch::recv_initial_metadata
bool recv_initial_metadata
Definition: transport.h:319
grpc_transport_stream_op_batch_string
std::string grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op)
Definition: transport_op_string.cc:44
grpc_core::Json::Object
std::map< std::string, Json > Object
Definition: src/core/lib/json/json.h:54
absl::optional::value
constexpr const T & value() const &
Definition: abseil-cpp/absl/types/optional.h:475
grpc_core::ClientChannel::CallData::YieldCallCombinerIfPendingBatchesFound
static bool YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList &closures)
Definition: client_channel.cc:157
original_recv_initial_metadata_ready_
grpc_closure * original_recv_initial_metadata_ready_
Definition: message_decompress_filter.cc:115
grpc_core::ClientChannel::CallData::ResumePendingBatchInCallCombiner
static void ResumePendingBatchInCallCombiner(void *arg, grpc_error_handle ignored)
Definition: client_channel.cc:2113
grpc_core::ClientChannel::LbQueuedCall::next
LbQueuedCall * next
Definition: client_channel.h:222
grpc_core::ClientChannel::LoadBalancedCall::CreateSubchannelCall
void CreateSubchannelCall()
Definition: client_channel.cc:3032
grpc_core::Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange::state
grpc_connectivity_state state
Definition: subchannel.h:172
GRPC_ARG_INHIBIT_HEALTH_CHECKING
#define GRPC_ARG_INHIBIT_HEALTH_CHECKING
Definition: grpc_types.h:424
grpc_core::ClientChannel::CallData::ResolverQueuedCallCanceller::elem_
grpc_call_element * elem_
Definition: client_channel.cc:2197
grpc_core::grpc_client_channel_call_trace
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call")
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
benchmark.md
md
Definition: benchmark.py:86
picker_
std::unique_ptr< SubchannelPicker > picker_
Definition: outlier_detection.cc:323
grpc_transport_stream_op_batch::send_trailing_metadata
bool send_trailing_metadata
Definition: transport.h:313
grpc_channel_info::service_config_json
char ** service_config_json
Definition: grpc_types.h:726
grpc_slice_from_copied_buffer
GPRAPI grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len)
Definition: slice/slice.cc:170
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
backend_metric.h
grpc_core::ClientChannel::SubchannelWrapper::ThrottleKeepaliveTime
void ThrottleKeepaliveTime(int new_keepalive_time)
Definition: client_channel.cc:552
absl_status_to_grpc_error
grpc_error_handle absl_status_to_grpc_error(absl::Status status)
Definition: error_utils.cc:167
debug_location.h
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::interested_parties
grpc_pollset_set * interested_parties() override
Definition: client_channel.cc:606
phony_transport::Destroy
void Destroy(grpc_transport *)
Definition: bm_call_create.cc:443
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder::Take
std::vector< std::pair< std::string, std::string > > Take()
Definition: client_channel.cc:2523
key
const char * key
Definition: hpack_parser_table.cc:164
grpc_core::UniqueTypeName
Definition: unique_type_name.h:56
grpc_core::ClientChannel::deadline_checking_enabled_
const bool deadline_checking_enabled_
Definition: client_channel.h:296
path_
grpc_slice path_
Definition: client_channel.cc:389
grpc_transport_stream_op_batch::send_message
bool send_message
Definition: transport.h:316
grpc_core::ClientChannel::CallData::Destroy
static void Destroy(grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *then_schedule_closure)
Definition: client_channel.cc:1914
absl::StatusCode
StatusCode
Definition: third_party/abseil-cpp/absl/status/status.h:92
grpc_core::ClientChannel::ConnectivityWatcherAdder::initial_state_
grpc_connectivity_state initial_state_
Definition: client_channel.cc:831
lb_policy_registry.h
grpc_error_set_int
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
Definition: error.cc:613
recv_initial_metadata_ready_
grpc_closure recv_initial_metadata_ready_
Definition: retry_filter.cc:464
grpc_core::QsortCompare
int QsortCompare(const T &a, const T &b)
Definition: useful.h:95
grpc_core::CallCombinerClosureList::Add
void Add(grpc_closure *closure, grpc_error_handle error, const char *reason)
Definition: call_combiner.h:150
grpc_core::ClientChannel::ConnectivityWatcherRemover::watcher_
AsyncConnectivityStateWatcherInterface * watcher_
Definition: client_channel.cc:862
GRPC_ARG_SERVICE_CONFIG
#define GRPC_ARG_SERVICE_CONFIG
Definition: grpc_types.h:304
grpc_core::internal::ClientChannelGlobalParsedConfig::health_check_service_name
const absl::optional< std::string > & health_check_service_name() const
Definition: resolver_result_parsing.h:63
grpc_core::DynamicFilters
Definition: dynamic_filters.h:43
grpc_core::ClientChannel::CallData::Init
static grpc_error_handle Init(grpc_call_element *elem, const grpc_call_element_args *args)
Definition: client_channel.cc:1907
grpc_core::ClientChannel::CallData::deadline_state_
grpc_deadline_state deadline_state_
Definition: client_channel.cc:202
GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA
@ GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA
Holds a pointer to ServiceConfigCallData associated with this call.
Definition: core/lib/channel/context.h:46
grpc_core::ClientChannel::CallData::dynamic_filters_
RefCountedPtr< DynamicFilters > dynamic_filters_
Definition: client_channel.cc:229
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::parent_
RefCountedPtr< SubchannelWrapper > parent_
Definition: client_channel.cc:674
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
grpc_core::Subchannel::ConnectivityStateWatcherInterface
Definition: subchannel.h:168
grpc_channel_info::lb_policy_name
char ** lb_policy_name
Definition: grpc_types.h:723
grpc_core::ClientChannel::DestroyResolverAndLbPolicyLocked
void DestroyResolverAndLbPolicyLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1580
out_
std::ostream & out_
Definition: abseil-cpp/absl/flags/internal/usage.cc:184
grpc_core::LoadBalancingPolicy::UpdateArgs::addresses
absl::StatusOr< ServerAddressList > addresses
Definition: lb_policy.h:318
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_core::LoadBalancingPolicy::PickResult::Drop
Definition: lb_policy.h:225
service_config_
RefCountedPtr< ServiceConfig > service_config_
Definition: service_config_channel_arg_filter.cc:76
gpr_cycle_counter_sub
gpr_timespec gpr_cycle_counter_sub(gpr_cycle_counter a, gpr_cycle_counter b)
grpc_core::ClientChannel::StartTransportOpLocked
void StartTransportOpLocked(grpc_transport_op *op) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1727
GRPC_ERROR_CREATE_FROM_CPP_STRING
#define GRPC_ERROR_CREATE_FROM_CPP_STRING(desc)
Definition: error.h:297
call_context_
grpc_call_context_element * call_context_
Definition: client_channel.cc:394
grpc_core::ClientChannel::CallData::CreateDynamicCall
void CreateDynamicCall(grpc_call_element *elem)
Definition: client_channel.cc:2428
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Encoder::Encode
void Encode(GrpcTimeoutMetadata, const typename GrpcTimeoutMetadata::ValueType &)
Definition: client_channel.cc:2517
alloc.h
grpc_core::OrphanablePtr
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:64
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_deadline_state_reset
void grpc_deadline_state_reset(grpc_call_element *elem, grpc_core::Timestamp new_deadline)
Definition: deadline_filter.cc:220
backend_metric_data_
BackendMetricData backend_metric_data_
Definition: oob_backend_metric.cc:278
grpc_pollset_set_add_pollset_set
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:47
grpc_core::ClientChannel::CallData::call_context_
grpc_call_context_element * call_context_
Definition: client_channel.cc:210
grpc_polling_entity_add_to_pollset_set
void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity *pollent, grpc_pollset_set *pss_dst)
Definition: polling_entity.cc:61
grpc_transport_stream_op_batch_payload::send_message
grpc_core::SliceBuffer * send_message
Definition: transport.h:373
grpc_call_context_element
Definition: core/lib/channel/context.h:51
grpc_core::ClientChannel::ABSL_GUARDED_BY
ResolverQueuedCall *resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_)
deadline_filter.h
service_config_impl.h
grpc_transport_stream_op_batch_payload::recv_initial_metadata
grpc_metadata_batch * recv_initial_metadata
Definition: transport.h:390
recv_trailing_metadata_ready_
grpc_closure recv_trailing_metadata_ready_
Definition: retry_filter.cc:473
grpc_core::ClientChannel::LoadBalancedCall::LbQueuedCallCanceller::CancelLocked
static void CancelLocked(void *arg, grpc_error_handle error)
Definition: client_channel.cc:3074
grpc_core::CallCombinerClosureList::RunClosuresWithoutYielding
void RunClosuresWithoutYielding(CallCombiner *call_combiner)
Definition: call_combiner.h:186
dynamic_filters.h
grpc_core::ClientChannel
Definition: client_channel.h:109
grpc_transport_stream_op_batch::send_initial_metadata
bool send_initial_metadata
Definition: transport.h:310
arg
struct arg arg
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
grpc_core::CoreConfiguration::resolver_registry
const ResolverRegistry & resolver_registry() const
Definition: core_configuration.h:157
grpc_transport_stream_op_batch_payload::recv_message
absl::optional< grpc_core::SliceBuffer > * recv_message
Definition: transport.h:416
exec_ctx.h
grpc_slice_from_cpp_string
grpc_slice grpc_slice_from_cpp_string(std::string str)
Definition: slice/slice.cc:202
grpc_core::ClientChannel::RemoveConnectivityWatcher
void RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface *watcher)
Definition: client_channel.cc:1870
slice_refcount.h
resolver_
OrphanablePtr< Resolver > resolver_
Definition: xds_cluster_resolver.cc:291
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
grpc_core::Timestamp::FromCycleCounterRoundUp
static Timestamp FromCycleCounterRoundUp(gpr_cycle_counter c)
Definition: src/core/lib/gprpp/time.cc:147
grpc_core::LoadBalancingPolicy::ChannelControlHelper
Definition: lb_policy.h:275
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
pending_batches_
PendingBatch pending_batches_[MAX_PENDING_BATCHES]
Definition: retry_filter.cc:595
state_
grpc_connectivity_state state_
Definition: channel_connectivity.cc:213
grpc_core::ClientChannel::CreateOrUpdateLbPolicyLocked
void CreateOrUpdateLbPolicyLocked(RefCountedPtr< LoadBalancingPolicy::Config > lb_policy_config, const absl::optional< std::string > &health_check_service_name, Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1382
grpc_core::LoadBalancingPolicy::Args::channel_control_helper
std::unique_ptr< ChannelControlHelper > channel_control_helper
Definition: lb_policy.h:349
grpc_core::ClientChannel::LoadBalancedCall::Metadata::batch_
grpc_metadata_batch * batch_
Definition: client_channel.cc:2531
grpc_core::ClientChannel::kFilterVtable
static const grpc_channel_filter kFilterVtable
Definition: client_channel.h:111
grpc_core::ClientChannel::CallData::AsyncResolutionDone
void AsyncResolutionDone(grpc_call_element *elem, grpc_error_handle error)
Definition: client_channel.cc:133
absl::Status::GetPayload
absl::optional< absl::Cord > GetPayload(absl::string_view type_url) const
Definition: third_party/abseil-cpp/absl/status/status.cc:119
grpc_core::ClientChannel::LoadBalancedCall::PickSubchannel
static void PickSubchannel(void *arg, grpc_error_handle error)
Definition: client_channel.cc:3151
watcher
ClusterWatcher * watcher
Definition: cds.cc:148
cancel
bool cancel
Definition: client_callback_end2end_test.cc:634
channel_args.h
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::BackendMetricAllocator::AllocateString
char * AllocateString(size_t size) override
Definition: client_channel.cc:2581
grpc_core::ClientChannel::CallData::call_combiner_
CallCombiner * call_combiner_
Definition: client_channel.cc:209
grpc_core::SubchannelCall
Definition: subchannel.h:96
grpc_core::ClientChannel::LoadBalancedCall::Orphan
void Orphan() override
Definition: client_channel.cc:2650
grpc_core::ClientChannel::interested_parties_
grpc_pollset_set * interested_parties_
Definition: client_channel.h:304
grpc_core::ClientChannel::CallData::ResolverQueuedCallCanceller
Definition: client_channel.cc:2160
grpc_core::channelz::ChannelNode::RemoveChildSubchannel
void RemoveChildSubchannel(intptr_t child_uuid)
Definition: src/core/lib/channel/channelz.cc:246
grpc_transport_stream_op_batch::recv_trailing_metadata
bool recv_trailing_metadata
Definition: transport.h:326
gpr_strdup
GPRAPI char * gpr_strdup(const char *src)
Definition: string.cc:39
grpc_core::ClientChannel::OnResolverResultChangedLocked
void OnResolverResultChangedLocked(Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1222
grpc_core::ClientChannel::LoadBalancedCall::~LoadBalancedCall
~LoadBalancedCall() override
Definition: client_channel.cc:2634
grpc_core::ClientChannel::SubchannelWrapper::health_check_service_name_
absl::optional< std::string > health_check_service_name_
Definition: client_channel.cc:680
GRPC_ARG_SERVICE_CONFIG_OBJ
#define GRPC_ARG_SERVICE_CONFIG_OBJ
Definition: client_channel.h:95
internal
Definition: benchmark/test/output_test_helper.cc:20
Fail
void Fail(const char *msg)
Definition: bloaty/third_party/googletest/googletest/test/gtest_assert_by_exception_test.cc:52
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::BackendMetricAllocator::AllocateBackendMetricData
BackendMetricData * AllocateBackendMetricData() override
Definition: client_channel.cc:2577
grpc_error_get_status
void grpc_error_get_status(grpc_error_handle error, grpc_core::Timestamp deadline, grpc_status_code *code, std::string *message, grpc_http2_error_code *http_error, const char **error_string)
Definition: error_utils.cc:67
grpc_core::ClientChannel::SubchannelWrapper::~SubchannelWrapper
~SubchannelWrapper() override
Definition: client_channel.cc:486
grpc_channel_element_args
Definition: channel_stack.h:74
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc_core::ClientChannel::LoadBalancedCall::ResumePendingBatchInCallCombiner
static void ResumePendingBatchInCallCombiner(void *arg, grpc_error_handle ignored)
Definition: client_channel.cc:2740
on_complete_
grpc_closure on_complete_
Definition: channel_connectivity.cc:217
grpc_core::ClientChannel::SubchannelWrapper::chand_
ClientChannel * chand_
Definition: client_channel.cc:678
grpc_core::ClientChannel::SubchannelWrapper::SubchannelWrapper
SubchannelWrapper(ClientChannel *chand, RefCountedPtr< Subchannel > subchannel, absl::optional< std::string > health_check_service_name)
Definition: client_channel.cc:457
grpc_binder::Metadata
std::vector< std::pair< std::string, std::string > > Metadata
Definition: transaction.h:38
grpc_core::LoadBalancingPolicyRegistry::LoadBalancingPolicyExists
static bool LoadBalancingPolicyExists(const char *name, bool *requires_config)
Definition: lb_policy_registry.cc:109
GRPC_CHANNEL_SHUTDOWN
@ GRPC_CHANNEL_SHUTDOWN
Definition: include/grpc/impl/codegen/connectivity_state.h:40
grpc_core::ClientChannel::info_mu_
Mutex info_mu_
Definition: client_channel.h:365
grpc_call_final_info
Definition: channel_stack.h:95
grpc_transport_stream_op_batch_payload::recv_trailing_metadata
grpc_metadata_batch * recv_trailing_metadata
Definition: transport.h:425
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
code
Definition: bloaty/third_party/zlib/contrib/infback9/inftree9.h:24
absl::InlinedVector
Definition: abseil-cpp/absl/container/inlined_vector.h:69
gpr_timespec
Definition: gpr_types.h:50
GRPC_ARG_KEEPALIVE_TIME_MS
#define GRPC_ARG_KEEPALIVE_TIME_MS
Definition: grpc_types.h:240
grpc_core::LoadBalancingPolicy::MetadataInterface
Definition: lb_policy.h:113
tests.unit._exit_scenarios.try_to_connect
try_to_connect
Definition: _exit_scenarios.py:189
grpc_error
Definition: error_internal.h:42
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
grpc_core::ClientChannel::SubchannelWrapper::ResetBackoff
void ResetBackoff() override
Definition: client_channel.cc:537
batch_
grpc_transport_stream_op_batch batch_
Definition: retry_filter.cc:357
resolver_result_parsing.h
grpc_core::ClientChannel::CallData::dynamic_call_
RefCountedPtr< DynamicFilters::Call > dynamic_call_
Definition: client_channel.cc:230
channel_trace.h
GRPC_CALL_STACK_REF
#define GRPC_CALL_STACK_REF(call_stack, reason)
Definition: channel_stack.h:293
grpc_core::ClientChannel::ResolverResultHandler::chand_
ClientChannel * chand_
Definition: client_channel.cc:440
grpc_core::ClientChannel::ConnectivityWatcherAdder::chand_
ClientChannel * chand_
Definition: client_channel.cc:830
grpc_error_to_absl_status
absl::Status grpc_error_to_absl_status(grpc_error_handle error)
Definition: error_utils.cc:156
proxy_mapper_registry.h
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
grpc_channel_args_find_integer
int grpc_channel_args_find_integer(const grpc_channel_args *args, const char *name, const grpc_integer_options options)
Definition: channel_args.cc:425
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
grpc_metadata_batch
Definition: metadata_batch.h:1259
grpc_core::channelz::ChannelTrace::Info
@ Info
Definition: channel_trace.h:53
grpc_core::ClientChannel::resolution_mu_
Mutex resolution_mu_
Definition: client_channel.h:310
grpc_core::SubchannelInterface
Definition: subchannel_interface.h:37
grpc_core::ClientChannel::LoadBalancedCall::Metadata::Metadata
Metadata(grpc_metadata_batch *batch)
Definition: client_channel.cc:2466
original_recv_message_ready_
grpc_closure * original_recv_message_ready_
Definition: message_decompress_filter.cc:124
grpc_channel_info
Definition: grpc_types.h:720
lb_policy_
RefCountedPtr< GrpcLb > lb_policy_
Definition: grpclb.cc:291
grpc_core::AsyncConnectivityStateWatcherInterface
Definition: src/core/lib/transport/connectivity_state.h:64
grpc_transport_stream_op_batch
Definition: transport.h:284
grpc_core::internal::ClientChannelGlobalParsedConfig
Definition: resolver_result_parsing.h:44
grpc_core::GrpcLbClientStatsMetadata::key
static absl::string_view key()
Definition: metadata_batch.h:331
recv_message_ready_
grpc_closure recv_message_ready_
Definition: retry_filter.cc:467
grpc_closure
Definition: closure.h:56
grpc_channel_arg_pointer_create
grpc_arg grpc_channel_arg_pointer_create(char *name, void *value, const grpc_arg_pointer_vtable *vtable)
Definition: channel_args.cc:492
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::BackendMetricAllocator::BackendMetricAllocator
BackendMetricAllocator(Arena *arena)
Definition: client_channel.cc:2575
grpc_pollset_set_add_pollset
void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset)
Definition: pollset_set.cc:37
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
grpc_core::GrpcLbClientStatsMetadata
Definition: metadata_batch.h:329
grpc_core::ClientChannel::DoPingLocked
grpc_error_handle DoPingLocked(grpc_transport_op *op) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_)
Definition: client_channel.cc:1691
send_initial_metadata
static void send_initial_metadata(void)
Definition: test/core/fling/server.cc:121
grpc_core::SubchannelInterface::ConnectivityStateWatcherInterface
Definition: subchannel_interface.h:39
grpc_core::ClientChannel::CallData::RecvTrailingMetadataReadyForConfigSelectorCommitCallback
static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(void *arg, grpc_error_handle error)
Definition: client_channel.cc:191
grpc_core::ClientChannel::SubchannelWrapper::WatcherWrapper::watcher_
std::unique_ptr< SubchannelInterface::ConnectivityStateWatcherInterface > watcher_
Definition: client_channel.cc:673
grpc_core::Closure::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: closure.h:250
recv_initial_metadata_
grpc_metadata_array recv_initial_metadata_
Definition: rls.cc:671
grpc_core::ClientChannel::ConnectivityWatcherAdder::watcher_
OrphanablePtr< AsyncConnectivityStateWatcherInterface > watcher_
Definition: client_channel.cc:832
grpc_core::ClientChannel::owning_stack_
grpc_channel_stack * owning_stack_
Definition: client_channel.h:297
grpc_core::ClientChannel::ConnectivityWatcherAdder::ConnectivityWatcherAdder
ConnectivityWatcherAdder(ClientChannel *chand, grpc_connectivity_state initial_state, OrphanablePtr< AsyncConnectivityStateWatcherInterface > watcher)
Definition: client_channel.cc:808
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor
Definition: client_channel.cc:2553
grpc_core::ClientChannel::CallData::pollent_
grpc_polling_entity * pollent_
Definition: client_channel.cc:212
sync.h
grpc_core::ParseBackendMetricData
const BackendMetricData * ParseBackendMetricData(absl::string_view serialized_load_report, BackendMetricAllocatorInterface *allocator)
Definition: backend_metric.cc:57
grpc_core::LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs
Definition: lb_policy.h:179
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
grpc_core::LoadBalancingPolicy::PickResult::Queue
Definition: lb_policy.h:210
grpc_channel_args_copy_and_add
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:224
grpc_core::RefCounted< SubchannelInterface >::Ref
RefCountedPtr< SubchannelInterface > Ref() GRPC_MUST_USE_RESULT
Definition: ref_counted.h:287
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
state
static struct rpc_state state
Definition: bad_server_response_test.cc:87
GRPC_STATUS_UNKNOWN
@ GRPC_STATUS_UNKNOWN
Definition: include/grpc/impl/codegen/status.h:40
grpc_slice_unref_internal
void grpc_slice_unref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:39
pollent_
grpc_polling_entity pollent_
Definition: google_c2p_resolver.cc:135
grpc_core::ClientChannel::ConnectivityWatcherAdder
Definition: client_channel.cc:806
owning_call_
grpc_call_stack * owning_call_
Definition: client_channel.cc:392
error_utils.h
grpc_core::ClientChannel::LoadBalancedCall::BackendMetricAccessor::BackendMetricAllocator::arena_
Arena * arena_
Definition: client_channel.cc:2586
grpc_transport_stream_op_batch_payload::cancel_stream
struct grpc_transport_stream_op_batch_payload::@46 cancel_stream
grpc_core::ClientChannel::ClientChannelControlHelper::RequestReresolution
void RequestReresolution() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_ -> work_serializer_)
Requests that the resolver re-resolve.
Definition: client_channel.cc:971
grpc_core::ExecCtx::InvalidateNow
void InvalidateNow()
Definition: exec_ctx.h:188
original_recv_trailing_metadata_ready_
grpc_closure * original_recv_trailing_metadata_ready_
Definition: message_decompress_filter.cc:128
grpc_core::ClientChannel::LoadBalancedCall::RecordCallCompletion
void RecordCallCompletion(absl::Status status)
Definition: client_channel.cc:3013
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
parse_error
@ parse_error
Definition: pem_info.c:88
channel.h
grpc_core::ClientChannel::LoadBalancedCall::FailPendingBatchInCallCombiner
static void FailPendingBatchInCallCombiner(void *arg, grpc_error_handle error)
Definition: client_channel.cc:2693
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:55