subchannel.cc
Go to the documentation of this file.
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 
24 #include <algorithm>
25 #include <cstring>
26 #include <memory>
27 #include <new>
28 #include <type_traits>
29 #include <utility>
30 
31 #include "absl/status/statusor.h"
32 
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/status.h>
36 #include <grpc/support/alloc.h>
37 #include <grpc/support/log.h>
38 
55 #include "src/core/lib/gpr/alloc.h"
66 
67 // Strong and weak refs.
// INTERNAL_REF_BITS is a bit width (16). STRONG_REF_MASK is a gpr_atm-typed
// mask with the low INTERNAL_REF_BITS bits cleared and every higher bit set.
// NOTE(review): neither macro is referenced in the visible part of this file;
// presumably the low bits hold the weak/internal count and the high bits the
// strong count -- confirm before relying on that.
68 #define INTERNAL_REF_BITS 16
69 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
70 
71 // Backoff parameters.
// The *_SECONDS macros are durations in seconds, as their names state; the
// multiplier and jitter are unitless factors.
// NOTE(review): the statements that consume these values are not visible in
// this listing; presumably ParseArgsForBackoffValues() uses them as the
// defaults for the initial backoff, minimum connect timeout, maximum backoff,
// and the non-fixed multiplier/jitter -- confirm against the full source.
72 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
73 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
74 #define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
75 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
76 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
77 
// Conversion between a subchannel call and its call stack.
//
// Both macros are pure pointer arithmetic: the call stack is taken to start
// GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) bytes after the
// start of the SubchannelCall object, and vice versa.
//
// SUBCHANNEL_CALL_TO_CALL_STACK(call): `call` is a SubchannelCall*; yields
// the grpc_call_stack* that follows it.
// CALL_STACK_TO_SUBCHANNEL_CALL(callstack): `callstack` is a
// grpc_call_stack*; yields the SubchannelCall* that precedes it.
//
// Each expansion is fully parenthesized so that it can be used inside a
// larger expression, and each body refers only to its own parameter.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  ((grpc_call_stack*)((char*)(call) +       \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  ((SubchannelCall*)(((char*)(callstack)) -      \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
85 
86 namespace grpc_core {
88 
// Trace flags defined by this file. Each is constructed with `false` and a
// string name ("subchannel" and "subchannel_refcount" respectively).
// NOTE(review): `false` presumably means "disabled by default" -- confirm
// against the TraceFlag / DebugOnlyTraceFlag constructors.
89 TraceFlag grpc_trace_subchannel(false, "subchannel");
90 DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
91 
92 //
93 // ConnectedSubchannel
94 //
95 
97  grpc_channel_stack* channel_stack, const grpc_channel_args* args,
98  RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
101  ? "ConnectedSubchannel"
102  : nullptr),
103  channel_stack_(channel_stack),
105  channelz_subchannel_(std::move(channelz_subchannel)) {}
106 
109  GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
110 }
111 
113  grpc_pollset_set* interested_parties,
116  op->start_connectivity_watch = std::move(watcher);
117  op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
118  op->bind_pollset_set = interested_parties;
120  elem->filter->start_transport_op(elem, op);
121 }
122 
124  grpc_closure* on_ack) {
127  op->send_ping.on_initiate = on_initiate;
128  op->send_ping.on_ack = on_ack;
130  elem->filter->start_transport_op(elem, op);
131 }
132 
136 }
137 
138 //
139 // SubchannelCall
140 //
141 
144  const size_t allocation_size =
145  args.connected_subchannel->GetInitialCallSizeEstimate();
146  Arena* arena = args.arena;
147  return RefCountedPtr<SubchannelCall>(new (
148  arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
149 }
150 
152  : connected_subchannel_(std::move(args.connected_subchannel)),
153  deadline_(args.deadline) {
155  const grpc_call_element_args call_args = {
156  callstk, /* call_stack */
157  nullptr, /* server_transport_data */
158  args.context, /* context */
159  args.path.c_slice(), /* path */
160  args.start_time, /* start_time */
161  args.deadline, /* deadline */
162  args.arena, /* arena */
163  args.call_combiner /* call_combiner */
164  };
165  *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
166  SubchannelCall::Destroy, this, &call_args);
168  gpr_log(GPR_ERROR, "error: %s", grpc_error_std_string(*error).c_str());
169  return;
170  }
172  auto* channelz_node = connected_subchannel_->channelz_subchannel();
173  if (channelz_node != nullptr) {
174  channelz_node->RecordCallStarted();
175  }
176 }
177 
180  GPR_TIMER_SCOPE("subchannel_call_process_op", 0);
183  grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
184  GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
185  top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
186 }
187 
189  return SUBCHANNEL_CALL_TO_CALL_STACK(this);
190 }
191 
194  GPR_ASSERT(closure != nullptr);
196 }
197 
200  return RefCountedPtr<SubchannelCall>(this);
201 }
202 
204  const char* reason) {
205  IncrementRefCount(location, reason);
206  return RefCountedPtr<SubchannelCall>(this);
207 }
208 
211 }
212 
213 void SubchannelCall::Unref(const DebugLocation& /*location*/,
214  const char* reason) {
216 }
217 
219  GPR_TIMER_SCOPE("subchannel_call_destroy", 0);
220  SubchannelCall* self = static_cast<SubchannelCall*>(arg);
221  // Keep some members before destroying the subchannel call.
222  grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
223  RefCountedPtr<ConnectedSubchannel> connected_subchannel =
224  std::move(self->connected_subchannel_);
225  // Destroy the subchannel call.
226  self->~SubchannelCall();
227  // Destroy the call stack. This should be after destroying the subchannel
228  // call, because call->after_call_stack_destroy(), if not null, will free the
229  // call arena.
231  after_call_stack_destroy);
232  // Automatically reset connected_subchannel. This should be after destroying
233  // the call stack, because destroying call stack needs access to the channel
234  // stack.
235 }
236 
239  // only intercept payloads with recv trailing.
241  return;
242  }
243  // only add interceptor if channelz is enabled.
244  if (connected_subchannel_->channelz_subchannel() == nullptr) {
245  return;
246  }
248  this, grpc_schedule_on_exec_ctx);
249  // save some state needed for the interception callback.
252  batch->payload->recv_trailing_metadata.recv_trailing_metadata;
254  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
255  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
257 }
258 
259 namespace {
260 
261 // Sets *status based on the rest of the parameters.
262 void GetCallStatus(grpc_status_code* status, Timestamp deadline,
264  if (!GRPC_ERROR_IS_NONE(error)) {
265  grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
266  } else {
267  *status = md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
268  }
270 }
271 
272 } // namespace
273 
276  SubchannelCall* call = static_cast<SubchannelCall*>(arg);
277  GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
279  GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_,
281  channelz::SubchannelNode* channelz_subchannel =
282  call->connected_subchannel_->channelz_subchannel();
283  GPR_ASSERT(channelz_subchannel != nullptr);
284  if (status == GRPC_STATUS_OK) {
285  channelz_subchannel->RecordCallSucceeded();
286  } else {
287  channelz_subchannel->RecordCallFailed();
288  }
289  Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_,
291 }
292 
295 }
296 
298  const char* reason) {
300 }
301 
302 //
303 // Subchannel::ConnectedSubchannelStateWatcher
304 //
305 
308  public:
309  // Must be instantiated while holding c->mu.
311  : subchannel_(std::move(c)) {}
312 
314  subchannel_.reset(DEBUG_LOCATION, "state_watcher");
315  }
316 
317  private:
319  const absl::Status& status) override {
320  Subchannel* c = subchannel_.get();
321  MutexLock lock(&c->mu_);
322  // If we're either shutting down or have already seen this connection
323  // failure (i.e., c->connected_subchannel_ is null), do nothing.
324  //
325  // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN
326  // upon connection close. So if the server gracefully shuts down,
327  // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we
328  // will see only SHUTDOWN. Either way, we react to the first one we
329  // see, ignoring anything that happens after that.
330  if (c->connected_subchannel_ == nullptr) return;
331  if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
332  new_state == GRPC_CHANNEL_SHUTDOWN) {
335  "subchannel %p %s: Connected subchannel %p reports %s: %s", c,
336  c->key_.ToString().c_str(), c->connected_subchannel_.get(),
337  ConnectivityStateName(new_state), status.ToString().c_str());
338  }
339  c->connected_subchannel_.reset();
340  if (c->channelz_node() != nullptr) {
341  c->channelz_node()->SetChildSocket(nullptr);
342  }
343  // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here,
344  // pass along the status from the transport, since it may have
345  // keepalive info attached to it that the channel needs.
346  // TODO(roth): Consider whether there's a cleaner way to do this.
347  c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status);
348  c->backoff_.Reset();
349  }
350  }
351 
353 };
354 
355 // Asynchronously notifies the \a watcher of a change in the connectivity state
356 // of \a subchannel to the current \a state. Deletes itself when done.
358  public:
362  : watcher_(std::move(watcher)) {
363  watcher_->PushConnectivityStateChange({state, status});
366  &closure_,
367  [](void* arg, grpc_error_handle /*error*/) {
368  auto* self =
369  static_cast<AsyncWatcherNotifierLocked*>(arg);
370  self->watcher_->OnConnectivityStateChange();
371  delete self;
372  },
373  this, nullptr),
375  }
376 
377  private:
380 };
381 
382 //
383 // Subchannel::ConnectivityStateWatcherList
384 //
385 
388  watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
389 }
390 
393  watchers_.erase(watcher);
394 }
395 
398  for (const auto& p : watchers_) {
399  new AsyncWatcherNotifierLocked(p.second, state, status);
400  }
401 }
402 
403 //
404 // Subchannel::HealthWatcherMap::HealthWatcher
405 //
406 
407 // State needed for tracking the connectivity state with a particular
408 // health check service name.
411  public:
413  std::string health_check_service_name)
414  : subchannel_(std::move(c)),
415  health_check_service_name_(std::move(health_check_service_name)),
418  : subchannel_->state_) {
419  // If the subchannel is already connected, start health checking.
420  if (subchannel_->state_ == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
421  }
422 
423  ~HealthWatcher() override {
424  subchannel_.reset(DEBUG_LOCATION, "health_watcher");
425  }
426 
428  return health_check_service_name_;
429  }
430 
432 
436  watcher_list_.AddWatcherLocked(std::move(watcher));
437  }
438 
441  watcher_list_.RemoveWatcherLocked(watcher);
442  }
443 
444  bool HasWatchers() const { return !watcher_list_.empty(); }
445 
448  if (state == GRPC_CHANNEL_READY) {
449  // If we had not already notified for CONNECTING state, do so now.
450  // (We may have missed this earlier, because if the transition
451  // from IDLE to CONNECTING to READY was too quick, the connected
452  // subchannel may not have sent us a notification for CONNECTING.)
455  status_ = status;
456  watcher_list_.NotifyLocked(state_, status);
457  }
458  // If we've become connected, start health checking.
459  StartHealthCheckingLocked();
460  } else {
461  state_ = state;
462  status_ = status;
463  watcher_list_.NotifyLocked(state_, status);
464  // We're not connected, so stop health checking.
465  health_check_client_.reset();
466  }
467  }
468 
469  void Orphan() override {
470  watcher_list_.Clear();
471  health_check_client_.reset();
472  Unref();
473  }
474 
475  private:
477  const absl::Status& status) override {
478  MutexLock lock(&subchannel_->mu_);
479  if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) {
480  state_ = new_state;
481  status_ = status;
482  watcher_list_.NotifyLocked(new_state, status);
483  }
484  }
485 
488  GPR_ASSERT(health_check_client_ == nullptr);
489  health_check_client_ = MakeHealthCheckClient(
490  health_check_service_name_, subchannel_->connected_subchannel_,
491  subchannel_->pollset_set_, subchannel_->channelz_node_, Ref());
492  }
493 
500 };
501 
502 //
503 // Subchannel::HealthWatcherMap
504 //
505 
508  const std::string& health_check_service_name,
510  // If the health check service name is not already present in the map,
511  // add it.
512  auto it = map_.find(health_check_service_name);
513  HealthWatcher* health_watcher;
514  if (it == map_.end()) {
515  auto w = MakeOrphanable<HealthWatcher>(std::move(subchannel),
516  health_check_service_name);
517  health_watcher = w.get();
518  map_.emplace(health_check_service_name, std::move(w));
519  } else {
520  health_watcher = it->second.get();
521  }
522  // Add the watcher to the entry.
523  health_watcher->AddWatcherLocked(std::move(watcher));
524 }
525 
527  const std::string& health_check_service_name,
529  auto it = map_.find(health_check_service_name);
530  GPR_ASSERT(it != map_.end());
531  it->second->RemoveWatcherLocked(watcher);
532  // If we just removed the last watcher for this service name, remove
533  // the map entry.
534  if (!it->second->HasWatchers()) map_.erase(it);
535 }
536 
537 void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state,
538  const absl::Status& status) {
539  for (const auto& p : map_) {
540  p.second->NotifyLocked(state, status);
541  }
542 }
543 
545 Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
546  Subchannel* subchannel, const std::string& health_check_service_name) {
547  auto it = map_.find(health_check_service_name);
548  if (it == map_.end()) {
549  // If the health check service name is not found in the map, we're
550  // not currently doing a health check for that service name. If the
551  // subchannel's state without health checking is READY, report
552  // CONNECTING, since that's what we'd be in as soon as we do start a
553  // watch. Otherwise, report the channel's state without health checking.
555  : subchannel->state_;
556  }
557  HealthWatcher* health_watcher = it->second.get();
558  return health_watcher->state();
559 }
560 
561 void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
562 
563 //
564 // Subchannel::ConnectivityStateWatcherInterface
565 //
566 
568  ConnectivityStateChange state_change) {
569  MutexLock lock(&mu_);
570  connectivity_state_queue_.push_back(std::move(state_change));
571 }
572 
575  MutexLock lock(&mu_);
576  GPR_ASSERT(!connectivity_state_queue_.empty());
577  ConnectivityStateChange state_change = connectivity_state_queue_.front();
578  connectivity_state_queue_.pop_front();
579  return state_change;
580 }
581 
582 //
583 // Subchannel
584 //
585 
586 namespace {
587 
588 BackOff::Options ParseArgsForBackoffValues(const grpc_channel_args* args,
589  Duration* min_connect_timeout) {
590  Duration initial_backoff =
592  *min_connect_timeout =
594  Duration max_backoff =
596  bool fixed_reconnect_backoff = false;
597  if (args != nullptr) {
598  for (size_t i = 0; i < args->num_args; i++) {
599  if (0 == strcmp(args->args[i].key,
600  "grpc.testing.fixed_reconnect_backoff_ms")) {
601  fixed_reconnect_backoff = true;
602  initial_backoff = *min_connect_timeout = max_backoff =
604  &args->args[i],
605  {static_cast<int>(initial_backoff.millis()), 100, INT_MAX}));
606  } else if (0 ==
607  strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
608  fixed_reconnect_backoff = false;
609  *min_connect_timeout =
611  &args->args[i],
612  {static_cast<int>(min_connect_timeout->millis()), 100,
613  INT_MAX}));
614  } else if (0 ==
615  strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
616  fixed_reconnect_backoff = false;
618  &args->args[i],
619  {static_cast<int>(max_backoff.millis()), 100, INT_MAX}));
620  } else if (0 == strcmp(args->args[i].key,
622  fixed_reconnect_backoff = false;
624  &args->args[i],
625  {static_cast<int>(initial_backoff.millis()), 100, INT_MAX}));
626  }
627  }
628  }
629  return BackOff::Options()
630  .set_initial_backoff(initial_backoff)
631  .set_multiplier(fixed_reconnect_backoff
632  ? 1.0
634  .set_jitter(fixed_reconnect_backoff ? 0.0
636  .set_max_backoff(max_backoff);
637 }
638 
639 } // namespace
640 
643  const grpc_channel_args* args)
646  : nullptr),
647  key_(std::move(key)),
649  connector_(std::move(connector)),
650  backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_)) {
651  // A grpc_init is added here to ensure that grpc_shutdown does not happen
652  // until the subchannel is destroyed. Subchannels can persist longer than
653  // channels because they may be reused/shared among multiple channels. As a
654  // result the subchannel destruction happens asynchronously to channel
655  // destruction. If the last channel destruction triggers a grpc_shutdown
656  // before the last subchannel destruction, then there may be race conditions
657  // triggering segmentation faults. To prevent this issue, we call a grpc_init
658  // here and a grpc_shutdown in the subchannel destructor.
659  grpc_init();
662  grpc_schedule_on_exec_ctx);
663  // Check proxy mapper to determine address to connect to and channel
664  // args to use.
666  grpc_resolved_address* new_address = nullptr;
667  grpc_channel_args* new_args = nullptr;
669  &new_args)) {
670  GPR_ASSERT(new_address != nullptr);
671  address_for_connect_ = *new_address;
672  gpr_free(new_address);
673  }
674  if (new_args != nullptr) {
675  args_ = new_args;
676  } else {
678  }
679  // Initialize channelz.
680  const bool channelz_enabled = grpc_channel_args_find_bool(
682  if (channelz_enabled) {
683  const size_t channel_tracer_max_memory =
684  static_cast<size_t>(grpc_channel_args_find_integer(
687  INT_MAX}));
688  channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
690  .value_or("<unknown address type>"),
691  channel_tracer_max_memory);
692  channelz_node_->AddTraceEvent(
693  channelz::ChannelTrace::Severity::Info,
694  grpc_slice_from_static_string("subchannel created"));
695  }
696 }
697 
699  if (channelz_node_ != nullptr) {
700  channelz_node_->AddTraceEvent(
701  channelz::ChannelTrace::Severity::Info,
702  grpc_slice_from_static_string("Subchannel destroyed"));
703  channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
704  }
706  connector_.reset();
708  // grpc_shutdown is called here because grpc_init is called in the ctor.
709  grpc_shutdown();
710 }
711 
714  const grpc_resolved_address& address, const grpc_channel_args* args) {
715  SubchannelKey key(address, args);
716  SubchannelPoolInterface* subchannel_pool =
718  GPR_ASSERT(subchannel_pool != nullptr);
719  RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key);
720  if (c != nullptr) {
721  return c;
722  }
723  c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args);
724  // Try to register the subchannel before setting the subchannel pool.
725  // Otherwise, in case of a registration race, unreffing c in
726  // RegisterSubchannel() will cause c to be tried to be unregistered, while
727  // its key maps to a different subchannel.
728  RefCountedPtr<Subchannel> registered =
729  subchannel_pool->RegisterSubchannel(c->key_, c);
730  if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
731  return registered;
732 }
733 
734 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
735  MutexLock lock(&mu_);
736  // Only update the value if the new keepalive time is larger.
737  if (new_keepalive_time > keepalive_time_) {
738  keepalive_time_ = new_keepalive_time;
740  gpr_log(GPR_INFO, "subchannel %p %s: throttling keepalive time to %d",
741  this, key_.ToString().c_str(), new_keepalive_time);
742  }
743  const grpc_arg arg_to_add = grpc_channel_arg_integer_create(
744  const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time);
745  const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS;
747  args_, &arg_to_remove, 1, &arg_to_add, 1);
749  args_ = new_args;
750  }
751 }
752 
754  return channelz_node_.get();
755 }
756 
758  const absl::optional<std::string>& health_check_service_name,
760  MutexLock lock(&mu_);
761  grpc_pollset_set* interested_parties = watcher->interested_parties();
762  if (interested_parties != nullptr) {
763  grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
764  }
765  if (!health_check_service_name.has_value()) {
767  watcher_list_.AddWatcherLocked(std::move(watcher));
768  } else {
769  health_watcher_map_.AddWatcherLocked(
770  WeakRef(DEBUG_LOCATION, "health_watcher"), *health_check_service_name,
771  std::move(watcher));
772  }
773 }
774 
776  const absl::optional<std::string>& health_check_service_name,
778  MutexLock lock(&mu_);
779  grpc_pollset_set* interested_parties = watcher->interested_parties();
780  if (interested_parties != nullptr) {
781  grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
782  }
783  if (!health_check_service_name.has_value()) {
784  watcher_list_.RemoveWatcherLocked(watcher);
785  } else {
786  health_watcher_map_.RemoveWatcherLocked(*health_check_service_name,
787  watcher);
788  }
789 }
790 
792  MutexLock lock(&mu_);
793  if (state_ == GRPC_CHANNEL_IDLE) {
795  }
796 }
797 
799  // Hold a ref to ensure cancellation and subsequent deletion of the closure
800  // does not eliminate the last ref and destroy the Subchannel before the
801  // method returns.
802  auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
803  MutexLock lock(&mu_);
804  backoff_.Reset();
806  GetDefaultEventEngine()->Cancel(retry_timer_handle_)) {
808  } else if (state_ == GRPC_CHANNEL_CONNECTING) {
809  next_attempt_time_ = ExecCtx::Get()->Now();
810  }
811 }
812 
814  // The subchannel_pool is only used once here in this subchannel, so the
815  // access can be outside of the lock.
816  if (subchannel_pool_ != nullptr) {
817  subchannel_pool_->UnregisterSubchannel(key_, this);
818  subchannel_pool_.reset();
819  }
820  MutexLock lock(&mu_);
822  shutdown_ = true;
823  connector_.reset();
824  connected_subchannel_.reset();
825  health_watcher_map_.ShutdownLocked();
826 }
827 
829  MutexLock lock(&mu_);
830  auto& entry = data_producer_map_[data_producer->type()];
831  GPR_ASSERT(entry == nullptr);
832  entry = data_producer;
833 }
834 
836  MutexLock lock(&mu_);
837  auto it = data_producer_map_.find(data_producer->type());
838  GPR_ASSERT(it != data_producer_map_.end());
839  GPR_ASSERT(it->second == data_producer);
840  data_producer_map_.erase(it);
841 }
842 
845  MutexLock lock(&mu_);
846  auto it = data_producer_map_.find(type);
847  if (it == data_producer_map_.end()) return nullptr;
848  return it->second;
849 }
850 
851 namespace {
852 
853 // Returns a string indicating the subchannel's connectivity state change to
854 // \a state.
855 const char* SubchannelConnectivityStateChangeString(
857  switch (state) {
858  case GRPC_CHANNEL_IDLE:
859  return "Subchannel state change to IDLE";
861  return "Subchannel state change to CONNECTING";
862  case GRPC_CHANNEL_READY:
863  return "Subchannel state change to READY";
865  return "Subchannel state change to TRANSIENT_FAILURE";
867  return "Subchannel state change to SHUTDOWN";
868  }
869  GPR_UNREACHABLE_CODE(return "UNKNOWN");
870 }
871 
872 } // namespace
873 
874 // Note: Must be called with a state that is different from the current state.
876  const absl::Status& status) {
877  state_ = state;
878  status_ = status;
879  if (channelz_node_ != nullptr) {
880  channelz_node_->UpdateConnectivityState(state);
881  channelz_node_->AddTraceEvent(
882  channelz::ChannelTrace::Severity::Info,
884  SubchannelConnectivityStateChangeString(state)));
885  }
886  // Notify non-health watchers.
887  watcher_list_.NotifyLocked(state, status);
888  // Notify health watchers.
889  health_watcher_map_.NotifyLocked(state, status);
890 }
891 
893  MutexLock lock(&mu_);
895 }
896 
898  if (shutdown_) return;
899  gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE",
900  this, key_.ToString().c_str());
902 }
903 
905  // Set next attempt time.
906  const Timestamp min_deadline = min_connect_timeout_ + ExecCtx::Get()->Now();
907  next_attempt_time_ = backoff_.NextAttemptTime();
908  // Report CONNECTING.
910  // Start connection attempt.
912  args.address = &address_for_connect_;
913  args.interested_parties = pollset_set_;
914  args.deadline = std::max(next_attempt_time_, min_deadline);
915  args.channel_args = args_;
916  WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback.
918 }
919 
921  WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
922  const grpc_channel_args* delete_channel_args =
923  c->connecting_result_.channel_args;
924  {
925  MutexLock lock(&c->mu_);
926  c->OnConnectingFinishedLocked(GRPC_ERROR_REF(error));
927  }
928  grpc_channel_args_destroy(delete_channel_args);
929  c.reset(DEBUG_LOCATION, "Connect");
930 }
931 
933  if (shutdown_) {
934  (void)GRPC_ERROR_UNREF(error);
935  return;
936  }
937  // If we didn't get a transport or we fail to publish it, report
938  // TRANSIENT_FAILURE and start the retry timer.
939  // Note that if the connection attempt took longer than the backoff
940  // time, then the timer will fire immediately, and we will quickly
941  // transition back to IDLE.
942  if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
943  const Duration time_until_next_attempt =
944  next_attempt_time_ - ExecCtx::Get()->Now();
946  "subchannel %p %s: connect failed (%s), backing off for %" PRId64
947  " ms",
948  this, key_.ToString().c_str(), grpc_error_std_string(error).c_str(),
949  time_until_next_attempt.millis());
952  retry_timer_handle_ = GetDefaultEventEngine()->RunAfter(
953  time_until_next_attempt,
954  [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
955  {
956  ApplicationCallbackExecCtx callback_exec_ctx;
958  self->OnRetryTimer();
959  // Subchannel deletion might require an active ExecCtx. So if
960  // self.reset() is not called here, the WeakRefCountedPtr destructor
961  // may run after the ExecCtx declared in the callback is destroyed.
962  // Since subchannel may get destroyed when the WeakRefCountedPtr
963  // destructor runs, it may not have an active ExecCtx - thus leading
964  // to crashes.
965  self.reset();
966  }
967  });
968  }
969  (void)GRPC_ERROR_UNREF(error);
970 }
971 
973  // Construct channel stack.
976  .SetTransport(connecting_result_.transport);
977  if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
978  return false;
979  }
981  if (!stk.ok()) {
982  auto error = absl_status_to_grpc_error(stk.status());
985  "subchannel %p %s: error initializing subchannel stack: %s", this,
988  return false;
989  }
993  if (shutdown_) return false;
994  // Publish.
995  connected_subchannel_.reset(
996  new ConnectedSubchannel(stk->release(), args_, channelz_node_));
998  gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", this,
999  key_.ToString().c_str(), connected_subchannel_.get());
1000  }
1001  if (channelz_node_ != nullptr) {
1002  channelz_node_->SetChildSocket(std::move(socket));
1003  }
1004  // Start watching connected subchannel.
1005  connected_subchannel_->StartWatch(
1006  pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
1007  WeakRef(DEBUG_LOCATION, "state_watcher")));
1008  // Report initial state.
1010  return true;
1011 }
1012 
1013 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_arg
Definition: grpc_types.h:103
trace.h
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::Orphan
void Orphan() override
Definition: subchannel.cc:469
grpc_core::SubchannelConnector::Args
Definition: connector.h:41
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
absl::StatusOr::value_or
T value_or(U &&default_value) const &
grpc_core::Subchannel::AsyncWatcherNotifierLocked::closure_
grpc_closure closure_
Definition: subchannel.cc:379
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
grpc_core::Subchannel::ConnectivityStateWatcherList::AddWatcherLocked
void AddWatcherLocked(RefCountedPtr< ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:386
regen-readme.it
it
Definition: regen-readme.py:15
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
core_configuration.h
backoff.h
sockaddr_utils.h
grpc_core::Subchannel::Subchannel
Subchannel(SubchannelKey key, OrphanablePtr< SubchannelConnector > connector, const grpc_channel_args *args)
Definition: subchannel.cc:641
grpc_core::ConnectedSubchannel
Definition: subchannel.h:67
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::subchannel_
WeakRefCountedPtr< Subchannel > subchannel_
Definition: subchannel.cc:494
grpc_core::Subchannel::subchannel_pool_
RefCountedPtr< SubchannelPoolInterface > subchannel_pool_
Definition: subchannel.h:368
grpc_core::DebugLocation
Definition: debug_location.h:31
grpc_core::Subchannel::ThrottleKeepaliveTime
void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:734
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
deadline_
Timestamp deadline_
Definition: channel_connectivity.cc:163
grpc_channel_stack
Definition: channel_stack.h:202
connectivity_state.h
grpc_channel_stack_element
grpc_channel_element * grpc_channel_stack_element(grpc_channel_stack *channel_stack, size_t index)
Definition: channel_stack.cc:78
grpc_core::Subchannel::AsyncWatcherNotifierLocked::AsyncWatcherNotifierLocked
AsyncWatcherNotifierLocked(RefCountedPtr< Subchannel::ConnectivityStateWatcherInterface > watcher, grpc_connectivity_state state, const absl::Status &status)
Definition: subchannel.cc:359
grpc_core::Subchannel::ConnectivityStateWatcherList::NotifyLocked
void NotifyLocked(grpc_connectivity_state state, const absl::Status &status)
Definition: subchannel.cc:396
grpc_core::SubchannelCall::connected_subchannel_
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
Definition: subchannel.h:150
grpc_core::MetadataMap::get
absl::optional< typename Which::ValueType > get(Which) const
Definition: metadata_batch.h:1067
timers.h
grpc_core::Subchannel::RemoveDataProducer
void RemoveDataProducer(DataProducerInterface *data_producer) ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:835
grpc_core::SubchannelCall::StartTransportStreamOpBatch
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
Definition: subchannel.cc:178
grpc_channel_arg_get_integer
int grpc_channel_arg_get_integer(const grpc_arg *arg, const grpc_integer_options options)
Definition: channel_args.cc:405
grpc_core::MakeHealthCheckClient
OrphanablePtr< SubchannelStreamClient > MakeHealthCheckClient(std::string service_name, RefCountedPtr< ConnectedSubchannel > connected_subchannel, grpc_pollset_set *interested_parties, RefCountedPtr< channelz::SubchannelNode > channelz_node, RefCountedPtr< ConnectivityStateWatcherInterface > watcher)
Definition: health_check_client.cc:160
slice.h
grpc_core::Subchannel::args_
grpc_channel_args * args_
Definition: subchannel.h:375
GPR_TIMER_SCOPE
#define GPR_TIMER_SCOPE(tag, important)
Definition: src/core/lib/profiling/timers.h:43
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::health_check_client_
OrphanablePtr< SubchannelStreamClient > health_check_client_
Definition: subchannel.cc:496
subchannel.h
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_core::SubchannelCall::IncrementRefCount
void IncrementRefCount()
Definition: subchannel.cc:293
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_core::Subchannel::OnConnectingFinished
static void OnConnectingFinished(void *arg, grpc_error_handle error) ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:920
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::state_
grpc_connectivity_state state_
Definition: subchannel.cc:497
grpc_call_stack_set_pollset_or_pollset_set
void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack *call_stack, grpc_polling_entity *pollent)
Definition: channel_stack.cc:219
grpc_core::Subchannel::DataProducerInterface::type
virtual UniqueTypeName type() const =0
grpc_core::Subchannel::connecting_result_
SubchannelConnector::Result connecting_result_
Definition: subchannel.h:385
grpc_core::Subchannel::AddDataProducer
void AddDataProducer(DataProducerInterface *data_producer) ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:828
grpc_core::DualRefCounted
Definition: dual_ref_counted.h:48
grpc_pollset_set_create
grpc_pollset_set * grpc_pollset_set_create()
Definition: pollset_set.cc:29
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc_core::Subchannel::min_connect_timeout_
Duration min_connect_timeout_
Definition: subchannel.h:381
GRPC_CHANNEL_STACK_UNREF
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason)
Definition: channel_stack.h:299
grpc_core::Subchannel::ConnectedSubchannelStateWatcher
Definition: subchannel.cc:306
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
subchannel
RingHashSubchannelData * subchannel
Definition: ring_hash.cc:285
grpc_channel_element
Definition: channel_stack.h:186
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
grpc_core::ChannelArgs::FromC
static ChannelArgs FromC(const grpc_channel_args *args)
Definition: channel_args.cc:84
grpc_core::ConnectedSubchannel::args_
grpc_channel_args * args_
Definition: subchannel.h:89
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
grpc_resolved_address
Definition: resolved_address.h:34
connected_subchannel_
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
Definition: oob_backend_metric.cc:113
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
grpc_core::Subchannel::ConnectivityStateWatcherList::watchers_
std::map< ConnectivityStateWatcherInterface *, RefCountedPtr< ConnectivityStateWatcherInterface > > watchers_
Definition: subchannel.h:312
grpc_channel_args_copy_and_add_and_remove
grpc_channel_args * grpc_channel_args_copy_and_add_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:246
subchannel_pool_interface.h
grpc_core::SubchannelCall::MaybeInterceptRecvTrailingMetadata
void MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch *batch)
Definition: subchannel.cc:237
status
absl::Status status
Definition: rls.cc:251
grpc_core::SubchannelPoolInterface::FindSubchannel
virtual RefCountedPtr< Subchannel > FindSubchannel(const SubchannelKey &key)=0
grpc_core::ApplicationCallbackExecCtx
Definition: exec_ctx.h:283
grpc_core::DualRefCounted< Subchannel >::WeakRef
WeakRefCountedPtr< Subchannel > WeakRef() GRPC_MUST_USE_RESULT
Definition: dual_ref_counted.h:149
GRPC_ARG_ENABLE_CHANNELZ
#define GRPC_ARG_ENABLE_CHANNELZ
Definition: grpc_types.h:323
grpc_call_stack_init
grpc_error_handle grpc_call_stack_init(grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_call_element_args *elem_args)
Definition: channel_stack.cc:180
subchannel_
RefCountedPtr< Subchannel > subchannel_
Definition: oob_backend_metric.cc:112
args_
grpc_channel_args * args_
Definition: grpclb.cc:513
grpc_core::Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange
ConnectivityStateChange PopConnectivityStateChange()
Definition: subchannel.cc:574
grpc_core::grpc_trace_subchannel_refcount
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount")
grpc_call_stack_element
grpc_call_element * grpc_call_stack_element(grpc_call_stack *call_stack, size_t index)
Definition: channel_stack.cc:100
GRPC_CLIENT_SUBCHANNEL
@ GRPC_CLIENT_SUBCHANNEL
Definition: channel_stack_type.h:29
grpc_core::Subchannel::HealthWatcherMap::ShutdownLocked
void ShutdownLocked()
Definition: subchannel.h:341
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::RemoveWatcherLocked
void RemoveWatcherLocked(Subchannel::ConnectivityStateWatcherInterface *watcher)
Definition: subchannel.cc:439
grpc_call_element
Definition: channel_stack.h:194
grpc_core::SubchannelCall::recv_trailing_metadata_
grpc_metadata_batch * recv_trailing_metadata_
Definition: subchannel.h:155
grpc_core::Subchannel::SetConnectivityStateLocked
void SetConnectivityStateLocked(grpc_connectivity_state state, const absl::Status &status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: subchannel.cc:875
grpc_core::Subchannel::ConnectedSubchannelStateWatcher::OnConnectivityStateChange
void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status &status) override
Definition: subchannel.cc:318
channelz.h
grpc_event_engine::experimental::EventEngine::RunAfter
virtual TaskHandle RunAfter(Duration when, Closure *closure)=0
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED
#define GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED()
Definition: stats_data.h:184
grpc_core::SubchannelCall::Ref
RefCountedPtr< SubchannelCall > Ref() GRPC_MUST_USE_RESULT
Definition: subchannel.cc:198
grpc_pollset_set_del_pollset_set
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:52
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
grpc_channel_args_find_bool
bool grpc_channel_args_find_bool(const grpc_channel_args *args, const char *name, bool default_value)
Definition: channel_args.cc:465
health_check_client.h
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
GRPC_CALL_LOG_OP
#define GRPC_CALL_LOG_OP(sev, elem, op)
Definition: channel_stack.h:374
stats.h
grpc_transport_op
Definition: transport.h:452
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
call
FilterStackCall * call
Definition: call.cc:750
alloc.h
status.h
grpc_core::SubchannelCall::GetCallStack
grpc_call_stack * GetCallStack()
Definition: subchannel.cc:188
GRPC_ENABLE_CHANNELZ_DEFAULT
#define GRPC_ENABLE_CHANNELZ_DEFAULT
Definition: channelz.h:57
grpc_core::SubchannelConnector::Result::transport
grpc_transport * transport
Definition: connector.h:54
grpc_core::Subchannel::RequestConnection
void RequestConnection() ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:791
grpc_core::Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked
void RemoveWatcherLocked(ConnectivityStateWatcherInterface *watcher)
Definition: subchannel.cc:391
grpc_channel_stack::call_stack_size
size_t call_stack_size
Definition: channel_stack.h:208
GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS
#define GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS
Definition: grpc_types.h:263
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
GPR_ROUND_UP_TO_ALIGNMENT_SIZE
#define GPR_ROUND_UP_TO_ALIGNMENT_SIZE(x)
Given a size, round up to the next multiple of sizeof(void*).
Definition: src/core/lib/gpr/alloc.h:25
GRPC_SUBCHANNEL_RECONNECT_JITTER
#define GRPC_SUBCHANNEL_RECONNECT_JITTER
Definition: subchannel.cc:76
GRPC_CALL_STACK_UNREF
#define GRPC_CALL_STACK_UNREF(call_stack, reason)
Definition: channel_stack.h:295
grpc_core::Subchannel::channelz_node
channelz::SubchannelNode * channelz_node()
Definition: subchannel.cc:753
GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS
Definition: subchannel.cc:74
grpc_core::Subchannel::address_for_connect_
grpc_resolved_address address_for_connect_
Definition: subchannel.h:373
grpc_core::Subchannel::WatchConnectivityState
void WatchConnectivityState(const absl::optional< std::string > &health_check_service_name, RefCountedPtr< ConnectivityStateWatcherInterface > watcher) ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:757
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::HasWatchers
bool HasWatchers() const
Definition: subchannel.cc:444
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
channel_init.h
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
GRPC_STATUS_OK
@ GRPC_STATUS_OK
Definition: include/grpc/impl/codegen/status.h:30
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::Subchannel::ConnectedSubchannelStateWatcher::~ConnectedSubchannelStateWatcher
~ConnectedSubchannelStateWatcher() override
Definition: subchannel.cc:313
grpc_core::CoreConfiguration::Get
static const CoreConfiguration & Get()
Definition: core_configuration.h:82
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
grpc_core::Subchannel::ConnectedSubchannelStateWatcher::ConnectedSubchannelStateWatcher
ConnectedSubchannelStateWatcher(WeakRefCountedPtr< Subchannel > c)
Definition: subchannel.cc:310
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::Subchannel::key_
const SubchannelKey key_
Definition: subchannel.h:370
grpc_core::grpc_trace_subchannel
TraceFlag grpc_trace_subchannel(false, "subchannel")
grpc_core::SubchannelCall::Args
Definition: subchannel.h:98
grpc_core::SubchannelCall::RecvTrailingMetadataReady
static void RecvTrailingMetadataReady(void *arg, grpc_error_handle error)
Definition: subchannel.cc:274
channel_stack.h
SUBCHANNEL_CALL_TO_CALL_STACK
#define SUBCHANNEL_CALL_TO_CALL_STACK(call)
Definition: subchannel.cc:79
grpc_call_stack
Definition: channel_stack.h:233
grpc_call_element::filter
const grpc_channel_filter * filter
Definition: channel_stack.h:195
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4, 5)
grpc_core::Subchannel::OnRetryTimerLocked
void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: subchannel.cc:897
watchers_
std::map< SubchannelInterface::ConnectivityStateWatcherInterface *, WatcherWrapper * > watchers_
Definition: outlier_detection.cc:226
ABSL_EXCLUSIVE_LOCKS_REQUIRED
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: abseil-cpp/absl/base/thread_annotations.h:145
grpc_core::SubchannelCall::recv_trailing_metadata_ready_
grpc_closure recv_trailing_metadata_ready_
Definition: subchannel.h:153
grpc_core::ProxyMapperRegistry::MapAddress
static bool MapAddress(const grpc_resolved_address &address, const grpc_channel_args *args, grpc_resolved_address **new_address, grpc_channel_args **new_args)
Definition: proxy_mapper_registry.cc:77
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
grpc_transport_destroy
void grpc_transport_destroy(grpc_transport *transport)
Definition: transport.cc:96
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT
#define GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT
Definition: channelz.h:63
grpc_core::Subchannel::DataProducerInterface
Definition: subchannel.h:212
grpc.h
grpc_pollset_set_destroy
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set)
Definition: pollset_set.cc:33
grpc_core::Subchannel::~Subchannel
~Subchannel() override
Definition: subchannel.cc:698
grpc_core::SubchannelCall::after_call_stack_destroy_
grpc_closure * after_call_stack_destroy_
Definition: subchannel.h:151
grpc_core::SubchannelCall::Create
static RefCountedPtr< SubchannelCall > Create(Args args, grpc_error_handle *error)
Definition: subchannel.cc:142
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
#define GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
Definition: grpc_types.h:261
grpc_core::SubchannelCall::SetAfterCallStackDestroy
void SetAfterCallStackDestroy(grpc_closure *closure)
Definition: subchannel.cc:192
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::HealthWatcher
HealthWatcher(WeakRefCountedPtr< Subchannel > c, std::string health_check_service_name)
Definition: subchannel.cc:412
grpc_core::Subchannel::ResetBackoff
void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:798
grpc_core::ConnectedSubchannel::StartWatch
void StartWatch(grpc_pollset_set *interested_parties, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:112
absl::optional< std::string >
grpc_core::DualRefCounted< Subchannel >::Ref
RefCountedPtr< Subchannel > Ref() GRPC_MUST_USE_RESULT
Definition: dual_ref_counted.h:52
pollset_set.h
GRPC_CHANNEL_IDLE
@ GRPC_CHANNEL_IDLE
Definition: include/grpc/impl/codegen/connectivity_state.h:32
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS
Definition: subchannel.cc:72
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
arg
Definition: cmdline.cc:40
grpc_slice_from_static_string
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
Definition: slice/slice.cc:89
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::health_check_service_name_
std::string health_check_service_name_
Definition: subchannel.cc:495
grpc_transport_stream_op_batch::payload
grpc_transport_stream_op_batch_payload * payload
Definition: transport.h:307
grpc_core::Subchannel::connector_
OrphanablePtr< SubchannelConnector > connector_
Definition: subchannel.h:384
grpc_channel_args_copy
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
Definition: channel_args.cc:285
grpc_call_stack_destroy
void grpc_call_stack_destroy(grpc_call_stack *stack, const grpc_call_final_info *final_info, grpc_closure *then_schedule_closure)
Definition: channel_stack.cc:236
status_
absl::Status status_
Definition: outlier_detection.cc:404
grpc_make_transport_op
grpc_transport_op * grpc_make_transport_op(grpc_closure *on_complete)
Definition: transport.cc:205
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::watcher_list_
ConnectivityStateWatcherList watcher_list_
Definition: subchannel.cc:499
grpc_core::ChannelStackBuilderImpl
Definition: channel_stack_builder_impl.h:33
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::StartHealthCheckingLocked
void StartHealthCheckingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_ -> mu_)
Definition: subchannel.cc:486
channel_stack_
RefCountedPtr< grpc_channel_stack > channel_stack_
Definition: filter_fuzzer.cc:570
grpc_call_element_args
Definition: channel_stack.h:80
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
grpc_core::SubchannelConnector::Result::Reset
void Reset()
Definition: connector.h:60
grpc_core::Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
Definition: subchannel.h:171
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
grpc_core::SubchannelKey::ToString
std::string ToString() const
Definition: subchannel_pool_interface.cc:93
grpc_core::RefCounted
Definition: ref_counted.h:280
channel_stack_type.h
grpc_core::SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs
static SubchannelPoolInterface * GetSubchannelPoolFromChannelArgs(const grpc_channel_args *args)
Definition: subchannel_pool_interface.cc:129
grpc_core::Subchannel::Create
static RefCountedPtr< Subchannel > Create(OrphanablePtr< SubchannelConnector > connector, const grpc_resolved_address &address, const grpc_channel_args *args)
Definition: subchannel.cc:712
grpc_core::DualRefCounted< Subchannel >::Unref
void Unref()
Definition: dual_ref_counted.h:63
grpc_core::Subchannel::StartConnectingLocked
void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: subchannel.cc:904
grpc_core::Subchannel::OnRetryTimer
void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:892
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_core::SubchannelConnector::Result::socket_node
RefCountedPtr< channelz::SocketNode > socket_node
Definition: connector.h:58
GRPC_ARG_MIN_RECONNECT_BACKOFF_MS
#define GRPC_ARG_MIN_RECONNECT_BACKOFF_MS
Definition: grpc_types.h:259
grpc_core::SubchannelConnector::Result::channel_args
const grpc_channel_args * channel_args
Definition: connector.h:56
grpc_core::ConnectedSubchannel::ConnectedSubchannel
ConnectedSubchannel(grpc_channel_stack *channel_stack, const grpc_channel_args *args, RefCountedPtr< channelz::SubchannelNode > channelz_subchannel)
Definition: subchannel.cc:96
subchannel_stream_client.h
GRPC_CHANNEL_CONNECTING
@ GRPC_CHANNEL_CONNECTING
Definition: include/grpc/impl/codegen/connectivity_state.h:34
grpc_core::Subchannel::HealthWatcherMap::AddWatcherLocked
void AddWatcherLocked(WeakRefCountedPtr< Subchannel > subchannel, const std::string &health_check_service_name, RefCountedPtr< ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:506
grpc_channel_filter::start_transport_stream_op_batch
void(* start_transport_stream_op_batch)(grpc_call_element *elem, grpc_transport_stream_op_batch *op)
Definition: channel_stack.h:114
grpc_core::Subchannel::GetDataProducer
DataProducerInterface * GetDataProducer(UniqueTypeName type) ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:843
grpc_core::Subchannel::CancelConnectivityStateWatch
void CancelConnectivityStateWatch(const absl::optional< std::string > &health_check_service_name, ConnectivityStateWatcherInterface *watcher) ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:775
grpc_core::SubchannelKey
Definition: subchannel_pool_interface.h:40
GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER
Definition: subchannel.cc:73
grpc_core::Duration::Milliseconds
static constexpr Duration Milliseconds(int64_t millis)
Definition: src/core/lib/gprpp/time.h:155
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::OnConnectivityStateChange
void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status &status) override
Definition: subchannel.cc:476
grpc_core::ConnectedSubchannel::Ping
void Ping(grpc_closure *on_initiate, grpc_closure *on_ack)
Definition: subchannel.cc:123
grpc_core::Subchannel::AsyncWatcherNotifierLocked::watcher_
RefCountedPtr< Subchannel::ConnectivityStateWatcherInterface > watcher_
Definition: subchannel.cc:378
grpc_core::Subchannel::HealthWatcherMap::RemoveWatcherLocked
void RemoveWatcherLocked(const std::string &health_check_service_name, ConnectivityStateWatcherInterface *watcher)
Definition: subchannel.cc:526
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::~HealthWatcher
~HealthWatcher() override
Definition: subchannel.cc:423
absl::StatusOr::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: abseil-cpp/absl/status/statusor.h:491
grpc_core::ConnectedSubchannel::~ConnectedSubchannel
~ConnectedSubchannel() override
Definition: subchannel.cc:107
shutdown_
bool shutdown_
Definition: pick_first.cc:173
grpc_core::Subchannel
Definition: subchannel.h:166
grpc_core::ConnectivityStateName
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:38
grpc_core::Duration::millis
constexpr int64_t millis() const
Definition: src/core/lib/gprpp/time.h:208
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
absl_status_to_grpc_error
grpc_error_handle absl_status_to_grpc_error(absl::Status status)
Definition: error_utils.cc:167
debug_location.h
key
const char * key
Definition: hpack_parser_table.cc:164
grpc_core::UniqueTypeName
Definition: unique_type_name.h:56
grpc_core::Subchannel::OnConnectingFinishedLocked
void OnConnectingFinishedLocked(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: subchannel.cc:932
grpc_core::SubchannelCall::original_recv_trailing_metadata_
grpc_closure * original_recv_trailing_metadata_
Definition: subchannel.h:154
grpc_core::SubchannelPoolInterface
Definition: subchannel_pool_interface.h:76
grpc_channel_arg_integer_create
grpc_arg grpc_channel_arg_integer_create(char *name, int value)
Definition: channel_args.cc:484
grpc_core::SubchannelKey::address
const grpc_resolved_address & address() const
Definition: subchannel_pool_interface.h:55
grpc_core::SubchannelPoolInterface::RegisterSubchannel
virtual RefCountedPtr< Subchannel > RegisterSubchannel(const SubchannelKey &key, RefCountedPtr< Subchannel > constructed)=0
GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE
#define GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE
Definition: grpc_types.h:318
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
grpc_core::Subchannel::ConnectivityStateWatcherInterface
Definition: subchannel.h:168
grpc_core::channelz::SubchannelNode
Definition: client_channel_channelz.h:44
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_core::Subchannel::mu_
Mutex mu_
Definition: subchannel.h:389
alloc.h
grpc_core::OrphanablePtr
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:64
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::NotifyLocked
void NotifyLocked(grpc_connectivity_state state, const absl::Status &status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_ -> mu_)
Definition: subchannel.cc:446
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
grpc_core::WeakRefCountedPtr
Definition: ref_counted_ptr.h:185
grpc_pollset_set_add_pollset_set
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
Definition: pollset_set.cc:47
grpc_core::Duration::Seconds
static constexpr Duration Seconds(int64_t seconds)
Definition: src/core/lib/gprpp/time.h:151
grpc_core::Subchannel::ConnectivityStateWatcherList
Definition: subchannel.h:291
grpc_core::SubchannelCall::Unref
void Unref()
Definition: subchannel.cc:209
arg
struct arg arg
grpc_event_engine::experimental::GetDefaultEventEngine
EventEngine * GetDefaultEventEngine()
Definition: event_engine.cc:47
state
Definition: bloaty/third_party/zlib/contrib/blast/blast.c:41
exec_ctx.h
closure
Definition: proxy.cc:59
grpc_core::ConnectedSubchannel::channel_stack_
grpc_channel_stack * channel_stack_
Definition: subchannel.h:88
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
ref_counted_ptr.h
grpc_core::Subchannel::AsyncWatcherNotifierLocked
Definition: subchannel.cc:357
state_
grpc_connectivity_state state_
Definition: channel_connectivity.cc:213
watcher
ClusterWatcher * watcher
Definition: cds.cc:148
channel_args.h
grpc_core::SubchannelCall
Definition: subchannel.h:96
grpc_core::Subchannel::Orphan
void Orphan() override ABSL_LOCKS_EXCLUDED(mu_)
Definition: subchannel.cc:813
grpc_core::Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange
void PushConnectivityStateChange(ConnectivityStateChange state_change)
Definition: subchannel.cc:567
grpc_core::channelz::SubchannelNode::RecordCallFailed
void RecordCallFailed()
Definition: client_channel_channelz.h:70
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::AddWatcherLocked
void AddWatcherLocked(RefCountedPtr< Subchannel::ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:433
grpc_transport_stream_op_batch::recv_trailing_metadata
bool recv_trailing_metadata
Definition: transport.h:326
grpc_core::Subchannel::ConnectedSubchannelStateWatcher::subchannel_
WeakRefCountedPtr< Subchannel > subchannel_
Definition: subchannel.cc:352
channel_stack_builder_impl.h
grpc_core::channelz::SubchannelNode::RecordCallSucceeded
void RecordCallSucceeded()
Definition: client_channel_channelz.h:71
grpc_error_get_status
void grpc_error_get_status(grpc_error_handle error, grpc_core::Timestamp deadline, grpc_status_code *code, std::string *message, grpc_http2_error_code *http_error, const char **error_string)
Definition: error_utils.cc:67
test_server.socket
socket
Definition: test_server.py:65
grpc_core::Subchannel::on_connecting_finished_
grpc_closure on_connecting_finished_
Definition: subchannel.h:386
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::state
grpc_connectivity_state state() const
Definition: subchannel.cc:431
GRPC_CHANNEL_SHUTDOWN
@ GRPC_CHANNEL_SHUTDOWN
Definition: include/grpc/impl/codegen/connectivity_state.h:40
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher
Definition: subchannel.cc:409
grpc_core::ConnectedSubchannel::GetInitialCallSizeEstimate
size_t GetInitialCallSizeEstimate() const
Definition: subchannel.cc:133
grpc_transport_stream_op_batch_payload::recv_trailing_metadata
grpc_metadata_batch * recv_trailing_metadata
Definition: transport.h:425
absl::StatusOr
Definition: abseil-cpp/absl/status/statusor.h:187
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
GRPC_ARG_KEEPALIVE_TIME_MS
#define GRPC_ARG_KEEPALIVE_TIME_MS
Definition: grpc_types.h:240
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_core::SubchannelCall::Destroy
static void Destroy(void *arg, grpc_error_handle error)
Definition: subchannel.cc:218
grpc_error
Definition: error_internal.h:42
event_engine_factory.h
grpc_core::Subchannel::pollset_set_
grpc_pollset_set * pollset_set_
Definition: subchannel.h:377
channel_trace.h
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
GRPC_CALL_STACK_REF
#define GRPC_CALL_STACK_REF(call_stack, reason)
Definition: channel_stack.h:293
grpc_error_to_absl_status
absl::Status grpc_error_to_absl_status(grpc_error_handle error)
Definition: error_utils.cc:156
proxy_mapper_registry.h
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
grpc_channel_args_find_integer
int grpc_channel_args_find_integer(const grpc_channel_args *args, const char *name, const grpc_integer_options options)
Definition: channel_args.cc:425
grpc_core::BackOff::Options
Definition: backoff.h:46
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::health_check_service_name
const std::string & health_check_service_name() const
Definition: subchannel.cc:427
grpc_metadata_batch
Definition: metadata_batch.h:1259
channel_stack_builder.h
grpc_core::AsyncConnectivityStateWatcherInterface
Definition: src/core/lib/transport/connectivity_state.h:64
grpc_transport_stream_op_batch
Definition: transport.h:284
grpc_core::Subchannel::channelz_node_
RefCountedPtr< channelz::SubchannelNode > channelz_node_
Definition: subchannel.h:379
grpc_core::SubchannelCall::SubchannelCall
SubchannelCall(Args args, grpc_error_handle *error)
Definition: subchannel.cc:151
grpc_closure
Definition: closure.h:56
grpc_core::Subchannel::HealthWatcherMap::HealthWatcher::status_
absl::Status status_
Definition: subchannel.cc:498
grpc_sockaddr_to_uri
absl::StatusOr< std::string > grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr)
Definition: sockaddr_utils.cc:260
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
grpc_core::DebugOnlyTraceFlag
TraceFlag DebugOnlyTraceFlag
Definition: debug/trace.h:117
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
grpc_core::Closure::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: closure.h:250
sync.h
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
grpc_core::Subchannel::PublishTransportLocked
bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
Definition: subchannel.cc:972
grpc_core::RefCounted::Ref
RefCountedPtr< Child > Ref() GRPC_MUST_USE_RESULT
Definition: ref_counted.h:287
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
state
static struct rpc_state state
Definition: bad_server_response_test.cc:87
GRPC_STATUS_UNKNOWN
@ GRPC_STATUS_UNKNOWN
Definition: include/grpc/impl/codegen/status.h:40
absl::StatusOr::status
const Status & status() const &
Definition: abseil-cpp/absl/status/statusor.h:678
error_utils.h
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS
Definition: subchannel.cc:75
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:23