Go to the documentation of this file.
28 #include <type_traits>
31 #include "absl/status/statusor.h"
68 #define INTERNAL_REF_BITS 16
69 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
72 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
73 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
74 #define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
75 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
76 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
79 #define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
80 (grpc_call_stack*)((char*)(call) + \
81 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
82 #define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
83 (SubchannelCall*)(((char*)(call_stack)) - \
84 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
101 ?
"ConnectedSubchannel"
105 channelz_subchannel_(
std::
move(channelz_subchannel)) {}
118 op->bind_pollset_set = interested_parties;
127 op->send_ping.on_initiate = on_initiate;
128 op->send_ping.on_ack = on_ack;
144 const size_t allocation_size =
145 args.connected_subchannel->GetInitialCallSizeEstimate();
173 if (channelz_node !=
nullptr) {
174 channelz_node->RecordCallStarted();
204 const char* reason) {
214 const char* reason) {
222 grpc_closure* after_call_stack_destroy =
self->after_call_stack_destroy_;
226 self->~SubchannelCall();
231 after_call_stack_destroy);
248 this, grpc_schedule_on_exec_ctx);
279 GetCallStatus(&
status,
call->deadline_,
call->recv_trailing_metadata_,
282 call->connected_subchannel_->channelz_subchannel();
298 const char* reason) {
330 if (c->connected_subchannel_ ==
nullptr)
return;
335 "subchannel %p %s: Connected subchannel %p reports %s: %s", c,
336 c->key_.ToString().c_str(), c->connected_subchannel_.get(),
339 c->connected_subchannel_.reset();
340 if (c->channelz_node() !=
nullptr) {
341 c->channelz_node()->SetChildSocket(
nullptr);
370 self->
watcher_->OnConnectivityStateChange();
415 health_check_service_name_(
std::
move(health_check_service_name)),
428 return health_check_service_name_;
441 watcher_list_.RemoveWatcherLocked(
watcher);
459 StartHealthCheckingLocked();
465 health_check_client_.reset();
470 watcher_list_.Clear();
471 health_check_client_.reset();
482 watcher_list_.NotifyLocked(new_state,
status);
490 health_check_service_name_,
subchannel_->connected_subchannel_,
512 auto it = map_.find(health_check_service_name);
514 if (
it == map_.end()) {
516 health_check_service_name);
517 health_watcher = w.get();
518 map_.emplace(health_check_service_name,
std::move(w));
520 health_watcher =
it->second.get();
529 auto it = map_.find(health_check_service_name);
531 it->second->RemoveWatcherLocked(
watcher);
534 if (!
it->second->HasWatchers()) map_.erase(
it);
539 for (
const auto& p : map_) {
545 Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
547 auto it = map_.find(health_check_service_name);
548 if (
it == map_.end()) {
557 HealthWatcher* health_watcher =
it->second.get();
558 return health_watcher->state();
570 connectivity_state_queue_.push_back(
std::move(state_change));
576 GPR_ASSERT(!connectivity_state_queue_.empty());
578 connectivity_state_queue_.pop_front();
592 *min_connect_timeout =
596 bool fixed_reconnect_backoff =
false;
597 if (
args !=
nullptr) {
598 for (
size_t i = 0;
i <
args->num_args;
i++) {
599 if (0 == strcmp(
args->args[i].key,
600 "grpc.testing.fixed_reconnect_backoff_ms")) {
601 fixed_reconnect_backoff =
true;
602 initial_backoff = *min_connect_timeout = max_backoff =
605 {static_cast<int>(initial_backoff.millis()), 100, INT_MAX}));
608 fixed_reconnect_backoff =
false;
609 *min_connect_timeout =
612 {static_cast<int>(min_connect_timeout->millis()), 100,
616 fixed_reconnect_backoff =
false;
619 {static_cast<int>(max_backoff.millis()), 100, INT_MAX}));
620 }
else if (0 == strcmp(
args->args[i].key,
622 fixed_reconnect_backoff =
false;
625 {static_cast<int>(initial_backoff.millis()), 100, INT_MAX}));
629 return BackOff::Options()
630 .set_initial_backoff(initial_backoff)
631 .set_multiplier(fixed_reconnect_backoff
634 .set_jitter(fixed_reconnect_backoff ? 0.0
636 .set_max_backoff(max_backoff);
662 grpc_schedule_on_exec_ctx);
674 if (new_args !=
nullptr) {
682 if (channelz_enabled) {
683 const size_t channel_tracer_max_memory =
690 .
value_or(
"<unknown address type>"),
691 channel_tracer_max_memory);
693 channelz::ChannelTrace::Severity::Info,
701 channelz::ChannelTrace::Severity::Info,
730 if (registered == c) c->subchannel_pool_ = subchannel_pool->
Ref();
737 if (new_keepalive_time > keepalive_time_) {
738 keepalive_time_ = new_keepalive_time;
747 args_, &arg_to_remove, 1, &arg_to_add, 1);
762 if (interested_parties !=
nullptr) {
765 if (!health_check_service_name.
has_value()) {
769 health_watcher_map_.AddWatcherLocked(
780 if (interested_parties !=
nullptr) {
783 if (!health_check_service_name.
has_value()) {
784 watcher_list_.RemoveWatcherLocked(
watcher);
786 health_watcher_map_.RemoveWatcherLocked(*health_check_service_name,
825 health_watcher_map_.ShutdownLocked();
830 auto& entry = data_producer_map_[data_producer->
type()];
832 entry = data_producer;
837 auto it = data_producer_map_.find(data_producer->
type());
840 data_producer_map_.erase(
it);
846 auto it = data_producer_map_.find(
type);
847 if (
it == data_producer_map_.end())
return nullptr;
855 const char* SubchannelConnectivityStateChangeString(
859 return "Subchannel state change to IDLE";
861 return "Subchannel state change to CONNECTING";
863 return "Subchannel state change to READY";
865 return "Subchannel state change to TRANSIENT_FAILURE";
867 return "Subchannel state change to SHUTDOWN";
882 channelz::ChannelTrace::Severity::Info,
884 SubchannelConnectivityStateChangeString(
state)));
899 gpr_log(
GPR_INFO,
"subchannel %p %s: backoff delay elapsed, reporting IDLE",
907 next_attempt_time_ = backoff_.NextAttemptTime();
914 args.deadline =
std::max(next_attempt_time_, min_deadline);
923 c->connecting_result_.channel_args;
943 const Duration time_until_next_attempt =
946 "subchannel %p %s: connect failed (%s), backing off for %" PRId64
949 time_until_next_attempt.
millis());
953 time_until_next_attempt,
958 self->OnRetryTimer();
985 "subchannel %p %s: error initializing subchannel stack: %s",
this,
998 gpr_log(
GPR_INFO,
"subchannel %p %s: new connected subchannel at %p",
this,
1006 pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
T value_or(U &&default_value) const &
void AddWatcherLocked(RefCountedPtr< ConnectivityStateWatcherInterface > watcher)
Subchannel(SubchannelKey key, OrphanablePtr< SubchannelConnector > connector, const grpc_channel_args *args)
WeakRefCountedPtr< Subchannel > subchannel_
RefCountedPtr< SubchannelPoolInterface > subchannel_pool_
void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_)
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
grpc_channel_element * grpc_channel_stack_element(grpc_channel_stack *channel_stack, size_t index)
AsyncWatcherNotifierLocked(RefCountedPtr< Subchannel::ConnectivityStateWatcherInterface > watcher, grpc_connectivity_state state, const absl::Status &status)
void NotifyLocked(grpc_connectivity_state state, const absl::Status &status)
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
void RemoveDataProducer(DataProducerInterface *data_producer) ABSL_LOCKS_EXCLUDED(mu_)
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
int grpc_channel_arg_get_integer(const grpc_arg *arg, const grpc_integer_options options)
OrphanablePtr< SubchannelStreamClient > MakeHealthCheckClient(std::string service_name, RefCountedPtr< ConnectedSubchannel > connected_subchannel, grpc_pollset_set *interested_parties, RefCountedPtr< channelz::SubchannelNode > channelz_node, RefCountedPtr< ConnectivityStateWatcherInterface > watcher)
grpc_channel_args * args_
#define GPR_TIMER_SCOPE(tag, important)
OrphanablePtr< SubchannelStreamClient > health_check_client_
struct grpc_pollset_set grpc_pollset_set
static void OnConnectingFinished(void *arg, grpc_error_handle error) ABSL_LOCKS_EXCLUDED(mu_)
grpc_connectivity_state state_
void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack *call_stack, grpc_polling_entity *pollent)
virtual UniqueTypeName type() const =0
SubchannelConnector::Result connecting_result_
void AddDataProducer(DataProducerInterface *data_producer) ABSL_LOCKS_EXCLUDED(mu_)
grpc_pollset_set * grpc_pollset_set_create()
Duration min_connect_timeout_
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason)
GPRAPI void gpr_free(void *ptr)
RingHashSubchannelData * subchannel
static ChannelArgs FromC(const grpc_channel_args *args)
grpc_channel_args * args_
@ GRPC_CHANNEL_TRANSIENT_FAILURE
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
std::map< ConnectivityStateWatcherInterface *, RefCountedPtr< ConnectivityStateWatcherInterface > > watchers_
grpc_channel_args * grpc_channel_args_copy_and_add_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add)
void MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch *batch)
virtual RefCountedPtr< Subchannel > FindSubchannel(const SubchannelKey &key)=0
WeakRefCountedPtr< Subchannel > WeakRef() GRPC_MUST_USE_RESULT
#define GRPC_ARG_ENABLE_CHANNELZ
grpc_error_handle grpc_call_stack_init(grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_call_element_args *elem_args)
RefCountedPtr< Subchannel > subchannel_
grpc_channel_args * args_
ConnectivityStateChange PopConnectivityStateChange()
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount")
grpc_call_element * grpc_call_stack_element(grpc_call_stack *call_stack, size_t index)
void NotifyLocked(grpc_connectivity_state state, const absl::Status &status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_); grpc_connectivity_state CheckConnectivityStateLocked(Subchannel *subchannel, const std::string &health_check_service_name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_); void ShutdownLocked()
void RemoveWatcherLocked(Subchannel::ConnectivityStateWatcherInterface *watcher)
grpc_metadata_batch * recv_trailing_metadata_
void SetConnectivityStateLocked(grpc_connectivity_state state, const absl::Status &status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status &status) override
virtual TaskHandle RunAfter(Duration when, Closure *closure)=0
#define GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED()
RefCountedPtr< SubchannelCall > Ref() GRPC_MUST_USE_RESULT
void grpc_pollset_set_del_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
bool grpc_channel_args_find_bool(const grpc_channel_args *args, const char *name, bool default_value)
#define GRPC_TRACE_FLAG_ENABLED(f)
#define GRPC_CALL_LOG_OP(sev, elem, op)
grpc_core::ScopedArenaPtr arena
grpc_call_stack * GetCallStack()
#define GRPC_ENABLE_CHANNELZ_DEFAULT
grpc_transport * transport
void RequestConnection() ABSL_LOCKS_EXCLUDED(mu_)
void RemoveWatcherLocked(ConnectivityStateWatcherInterface *watcher)
#define GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS
#define GPR_ROUND_UP_TO_ALIGNMENT_SIZE(x)
Given a size, round up to the next multiple of sizeof(void*).
#define GRPC_SUBCHANNEL_RECONNECT_JITTER
#define GRPC_CALL_STACK_UNREF(call_stack, reason)
channelz::SubchannelNode * channelz_node()
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS
grpc_resolved_address address_for_connect_
void WatchConnectivityState(const absl::optional< std::string > &health_check_service_name, RefCountedPtr< ConnectivityStateWatcherInterface > watcher) ABSL_LOCKS_EXCLUDED(mu_)
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
~ConnectedSubchannelStateWatcher() override
static const CoreConfiguration & Get()
constexpr bool has_value() const noexcept
ConnectedSubchannelStateWatcher(WeakRefCountedPtr< Subchannel > c)
def c_str(s, encoding='ascii')
TraceFlag grpc_trace_subchannel(false, "subchannel")
static void RecvTrailingMetadataReady(void *arg, grpc_error_handle error)
#define SUBCHANNEL_CALL_TO_CALL_STACK(call)
const grpc_channel_filter * filter
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
std::map< SubchannelInterface::ConnectivityStateWatcherInterface *, WatcherWrapper * > watchers_
#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...)
grpc_closure recv_trailing_metadata_ready_
static bool MapAddress(const grpc_resolved_address &address, const grpc_channel_args *args, grpc_resolved_address **new_address, grpc_channel_args **new_args)
void grpc_transport_destroy(grpc_transport *transport)
#define GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set)
grpc_closure * after_call_stack_destroy_
static RefCountedPtr< SubchannelCall > Create(Args args, grpc_error_handle *error)
#define GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
void SetAfterCallStackDestroy(grpc_closure *closure)
HealthWatcher(WeakRefCountedPtr< Subchannel > c, std::string health_check_service_name)
void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_)
void StartWatch(grpc_pollset_set *interested_parties, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
RefCountedPtr< Subchannel > Ref() GRPC_MUST_USE_RESULT
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS
void grpc_channel_args_destroy(grpc_channel_args *a)
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
std::string health_check_service_name_
grpc_transport_stream_op_batch_payload * payload
OrphanablePtr< SubchannelConnector > connector_
grpc_channel_args * grpc_channel_args_copy(const grpc_channel_args *src)
void grpc_call_stack_destroy(grpc_call_stack *stack, const grpc_call_final_info *final_info, grpc_closure *then_schedule_closure)
grpc_transport_op * grpc_make_transport_op(grpc_closure *on_complete)
ConnectivityStateWatcherList watcher_list_
void StartHealthCheckingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_ -> mu_)
RefCountedPtr< grpc_channel_stack > channel_stack_
grpc_transport_stream_op_batch * batch
std::string ToString() const
static SubchannelPoolInterface * GetSubchannelPoolFromChannelArgs(const grpc_channel_args *args)
static RefCountedPtr< Subchannel > Create(OrphanablePtr< SubchannelConnector > connector, const grpc_resolved_address &address, const grpc_channel_args *args)
void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_)
RefCountedPtr< channelz::SocketNode > socket_node
#define GRPC_ARG_MIN_RECONNECT_BACKOFF_MS
const grpc_channel_args * channel_args
ConnectedSubchannel(grpc_channel_stack *channel_stack, const grpc_channel_args *args, RefCountedPtr< channelz::SubchannelNode > channelz_subchannel)
@ GRPC_CHANNEL_CONNECTING
void AddWatcherLocked(WeakRefCountedPtr< Subchannel > subchannel, const std::string &health_check_service_name, RefCountedPtr< ConnectivityStateWatcherInterface > watcher)
void(* start_transport_stream_op_batch)(grpc_call_element *elem, grpc_transport_stream_op_batch *op)
DataProducerInterface * GetDataProducer(UniqueTypeName type) ABSL_LOCKS_EXCLUDED(mu_)
void CancelConnectivityStateWatch(const absl::optional< std::string > &health_check_service_name, ConnectivityStateWatcherInterface *watcher) ABSL_LOCKS_EXCLUDED(mu_)
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER
static constexpr Duration Milliseconds(int64_t millis)
void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status &status) override
void Ping(grpc_closure *on_initiate, grpc_closure *on_ack)
RefCountedPtr< Subchannel::ConnectivityStateWatcherInterface > watcher_
void RemoveWatcherLocked(const std::string &health_check_service_name, ConnectivityStateWatcherInterface *watcher)
~HealthWatcher() override
ABSL_MUST_USE_RESULT bool ok() const
~ConnectedSubchannel() override
const char * ConnectivityStateName(grpc_connectivity_state state)
constexpr int64_t millis() const
#define GRPC_ERROR_REF(err)
grpc_error_handle absl_status_to_grpc_error(absl::Status status)
void OnConnectingFinishedLocked(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
grpc_closure * original_recv_trailing_metadata_
grpc_arg grpc_channel_arg_integer_create(char *name, int value)
const grpc_resolved_address & address() const
virtual RefCountedPtr< Subchannel > RegisterSubchannel(const SubchannelKey &key, RefCountedPtr< Subchannel > constructed)=0
#define GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE
grpc_core::ExecCtx exec_ctx
std::string grpc_error_std_string(grpc_error_handle error)
std::unique_ptr< T, Deleter > OrphanablePtr
void NotifyLocked(grpc_connectivity_state state, const absl::Status &status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_ -> mu_)
void grpc_pollset_set_add_pollset_set(grpc_pollset_set *bag, grpc_pollset_set *item)
static constexpr Duration Seconds(int64_t seconds)
EventEngine * GetDefaultEventEngine()
grpc_channel_stack * channel_stack_
#define GRPC_ERROR_UNREF(err)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
grpc_connectivity_state state_
void Orphan() override ABSL_LOCKS_EXCLUDED(mu_)
void PushConnectivityStateChange(ConnectivityStateChange state_change)
void AddWatcherLocked(RefCountedPtr< Subchannel::ConnectivityStateWatcherInterface > watcher)
bool recv_trailing_metadata
WeakRefCountedPtr< Subchannel > subchannel_
void RecordCallSucceeded()
void grpc_error_get_status(grpc_error_handle error, grpc_core::Timestamp deadline, grpc_status_code *code, std::string *message, grpc_http2_error_code *http_error, const char **error_string)
grpc_closure on_connecting_finished_
grpc_connectivity_state state() const
size_t GetInitialCallSizeEstimate() const
grpc_metadata_batch * recv_trailing_metadata
#define GRPC_ARG_KEEPALIVE_TIME_MS
GRPCAPI void grpc_init(void)
static void Destroy(void *arg, grpc_error_handle error)
grpc_pollset_set * pollset_set_
#define GRPC_CALL_STACK_REF(call_stack, reason)
absl::Status grpc_error_to_absl_status(grpc_error_handle error)
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
int grpc_channel_args_find_integer(const grpc_channel_args *args, const char *name, const grpc_integer_options options)
const std::string & health_check_service_name() const
RefCountedPtr< channelz::SubchannelNode > channelz_node_
SubchannelCall(Args args, grpc_error_handle *error)
absl::StatusOr< std::string > grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr)
TraceFlag DebugOnlyTraceFlag
GRPCAPI void grpc_shutdown(void)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_)
RefCountedPtr< Child > Ref() GRPC_MUST_USE_RESULT
static struct rpc_state state
const Status & status() const &
#define GRPC_ERROR_IS_NONE(err)
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS
grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:23