channel_connectivity.cc
Go to the documentation of this file.
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
19 #include <inttypes.h>
20 
21 #include <grpc/grpc.h>
25 #include <grpc/support/log.h>
26 
43 
44 namespace grpc_core {
45 namespace {
46 
47 bool IsLameChannel(Channel* channel) {
49  grpc_channel_stack_last_element(channel->channel_stack());
50  return elem->filter == &LameClientFilter::kFilter;
51 }
52 
53 } // namespace
54 } // namespace grpc_core
55 
57  grpc_channel* c_channel, int try_to_connect) {
58  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
61  "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
62  (c_channel, try_to_connect));
64  // Forward through to the underlying client channel.
65  grpc_core::ClientChannel* client_channel =
67  if (GPR_UNLIKELY(client_channel == nullptr)) {
68  if (grpc_core::IsLameChannel(channel)) {
70  }
72  "grpc_channel_check_connectivity_state called on something that is "
73  "not a client channel");
74  return GRPC_CHANNEL_SHUTDOWN;
75  }
76  return client_channel->CheckConnectivityState(try_to_connect);
77 }
78 
81  grpc_core::ClientChannel* client_channel =
83  if (client_channel == nullptr) {
84  if (!grpc_core::IsLameChannel(channel)) {
86  "grpc_channel_num_external_connectivity_watchers called on "
87  "something that is not a client channel");
88  }
89  return 0;
90  }
91  return client_channel->NumExternalConnectivityWatchers();
92 }
93 
97 }
98 
99 namespace grpc_core {
100 namespace {
101 
102 class StateWatcher : public DualRefCounted<StateWatcher> {
103  public:
104  StateWatcher(grpc_channel* c_channel, grpc_completion_queue* cq, void* tag,
105  grpc_connectivity_state last_observed_state,
106  gpr_timespec deadline)
107  : channel_(Channel::FromC(c_channel)->Ref()),
108  cq_(cq),
109  tag_(tag),
110  state_(last_observed_state) {
112  GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
113  GRPC_CLOSURE_INIT(&on_timeout_, TimeoutComplete, this, nullptr);
114  ClientChannel* client_channel =
116  if (client_channel == nullptr) {
117  // If the target URI used to create the channel was invalid, channel
118  // stack initialization failed, and that caused us to create a lame
119  // channel. In that case, connectivity state will never change (it
120  // will always be TRANSIENT_FAILURE), so we don't actually start a
121  // watch, but we are hiding that fact from the application.
122  if (IsLameChannel(channel_.get())) {
123  // Ref from object creation is held by timer callback.
124  StartTimer(Timestamp::FromTimespecRoundUp(deadline));
125  return;
126  }
128  "grpc_channel_watch_connectivity_state called on "
129  "something that is not a client channel");
130  GPR_ASSERT(false);
131  }
132  // Take an addition ref, so we have two (the first one is from the
133  // creation of this object). One will be held by the timer callback,
134  // the other by the watcher callback.
135  Ref().release();
136  auto* watcher_timer_init_state = new WatcherTimerInitState(
137  this, Timestamp::FromTimespecRoundUp(deadline));
138  client_channel->AddExternalConnectivityWatcher(
140  &on_complete_, watcher_timer_init_state->closure());
141  }
142 
143  private:
144  // A fire-and-forget object used to delay starting the timer until the
145  // ClientChannel actually starts the watch.
146  class WatcherTimerInitState {
147  public:
148  WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline)
149  : state_watcher_(state_watcher), deadline_(deadline) {
150  GRPC_CLOSURE_INIT(&closure_, WatcherTimerInit, this, nullptr);
151  }
152 
153  grpc_closure* closure() { return &closure_; }
154 
155  private:
156  static void WatcherTimerInit(void* arg, grpc_error_handle /*error*/) {
157  auto* self = static_cast<WatcherTimerInitState*>(arg);
158  self->state_watcher_->StartTimer(self->deadline_);
159  delete self;
160  }
161 
162  StateWatcher* state_watcher_;
165  };
166 
167  void StartTimer(Timestamp deadline) {
168  grpc_timer_init(&timer_, deadline, &on_timeout_);
169  }
170 
171  static void WatchComplete(void* arg, grpc_error_handle error) {
172  auto* self = static_cast<StateWatcher*>(arg);
174  GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
175  }
176  grpc_timer_cancel(&self->timer_);
177  self->Unref();
178  }
179 
180  static void TimeoutComplete(void* arg, grpc_error_handle error) {
181  auto* self = static_cast<StateWatcher*>(arg);
182  self->timer_fired_ = GRPC_ERROR_IS_NONE(error);
183  // If this is a client channel (not a lame channel), cancel the watch.
184  ClientChannel* client_channel =
185  ClientChannel::GetFromChannel(self->channel_.get());
186  if (client_channel != nullptr) {
187  client_channel->CancelExternalConnectivityWatcher(&self->on_complete_);
188  }
189  self->Unref();
190  }
191 
192  // Invoked when both strong refs are released.
193  void Orphan() override {
194  WeakRef().release(); // Take a weak ref until completion is finished.
197  "Timed out waiting for connection state change")
198  : GRPC_ERROR_NONE;
199  grpc_cq_end_op(cq_, tag_, error, FinishedCompletion, this,
201  }
202 
203  // Called when the completion is returned to the CQ.
204  static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
205  auto* self = static_cast<StateWatcher*>(arg);
206  self->WeakUnref();
207  }
208 
209  RefCountedPtr<Channel> channel_;
211  void* tag_;
212 
214 
216 
220 
221  bool timer_fired_ = false;
222 };
223 
224 } // namespace
225 } // namespace grpc_core
226 
228  grpc_channel* channel, grpc_connectivity_state last_observed_state,
229  gpr_timespec deadline, grpc_completion_queue* cq, void* tag) {
230  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
233  "grpc_channel_watch_connectivity_state("
234  "channel=%p, last_observed_state=%d, "
235  "deadline=gpr_timespec { tv_sec: %" PRId64
236  ", tv_nsec: %d, clock_type: %d }, "
237  "cq=%p, tag=%p)",
238  7,
239  (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
240  (int)deadline.clock_type, cq, tag));
241  new grpc_core::StateWatcher(channel, cq, tag, last_observed_state, deadline);
242 }
grpc_core::ClientChannel::GetFromChannel
static ClientChannel * GetFromChannel(Channel *channel)
Definition: client_channel.cc:1009
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
trace.h
gpr_timespec::tv_nsec
int32_t tv_nsec
Definition: gpr_types.h:52
grpc_core::Channel
Definition: src/core/lib/surface/channel.h:108
cq_
grpc_completion_queue * cq_
Definition: channel_connectivity.cc:210
gpr_timespec::tv_sec
int64_t tv_sec
Definition: gpr_types.h:51
grpc_channel_watch_connectivity_state
void grpc_channel_watch_connectivity_state(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag)
Definition: channel_connectivity.cc:227
timer_
grpc_timer timer_
Definition: channel_connectivity.cc:218
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
lame_client.h
deadline_
Timestamp deadline_
Definition: channel_connectivity.cc:163
channel_fwd.h
timer_fired_
bool timer_fired_
Definition: channel_connectivity.cc:221
grpc_core::ClientChannel::NumExternalConnectivityWatchers
int NumExternalConnectivityWatchers() const
Definition: client_channel.h:145
grpc_core::LameClientFilter::kFilter
static const grpc_channel_filter kFilter
Definition: lame_client.h:49
grpc_channel_check_connectivity_state
grpc_connectivity_state grpc_channel_check_connectivity_state(grpc_channel *c_channel, int try_to_connect)
Definition: channel_connectivity.cc:56
polling_entity.h
Timestamp
Definition: bloaty/third_party/protobuf/src/google/protobuf/timestamp.pb.h:69
grpc_core
Definition: call_metric_recorder.h:31
grpc_channel_element
Definition: channel_stack.h:186
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
error
grpc_error_handle error
Definition: retry_filter.cc:499
client_channel.h
completion_queue.h
GRPC_CHANNEL_TRANSIENT_FAILURE
@ GRPC_CHANNEL_TRANSIENT_FAILURE
Definition: include/grpc/impl/codegen/connectivity_state.h:38
closure.h
grpc_cq_completion
Definition: src/core/lib/surface/completion_queue.h:43
closure_
grpc_closure closure_
Definition: channel_connectivity.cc:164
grpc_core::ApplicationCallbackExecCtx
Definition: exec_ctx.h:283
grpc_cq_end_op
void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error_handle error, void(*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage, bool internal)
Definition: completion_queue.cc:894
GRPC_LOG_IF_ERROR
#define GRPC_LOG_IF_ERROR(what, error)
Definition: error.h:398
grpc_timer
Definition: iomgr/timer.h:33
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
grpc_core::ClientChannel::CheckConnectivityState
grpc_connectivity_state CheckConnectivityState(bool try_to_connect)
Definition: client_channel.cc:1848
grpc_types.h
framework.rpc.grpc_channelz.Channel
Channel
Definition: grpc_channelz.py:32
grpc_channel_stack_last_element
grpc_channel_element * grpc_channel_stack_last_element(grpc_channel_stack *channel_stack)
Definition: channel_stack.cc:83
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
channel_stack.h
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc_completion_queue
Definition: completion_queue.cc:347
grpc_polling_entity_create_from_pollset
grpc_polling_entity grpc_polling_entity_create_from_pollset(grpc_pollset *pollset)
Definition: polling_entity.cc:34
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
closure
grpc_closure closure
Definition: src/core/lib/surface/server.cc:466
grpc.h
connectivity_state.h
grpc_cq_pollset
grpc_pollset * grpc_cq_pollset(grpc_completion_queue *cq)
Definition: completion_queue.cc:1433
channel_
RefCountedPtr< Channel > channel_
Definition: channel_connectivity.cc:209
arg
Definition: cmdline.cc:40
time.h
error.h
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc_core::ExecCtx
Definition: exec_ctx.h:97
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
gpr_types.h
gpr_timespec::clock_type
gpr_clock_type clock_type
Definition: gpr_types.h:55
grpc_timer_cancel
void grpc_timer_cancel(grpc_timer *timer)
Definition: iomgr/timer.cc:36
grpc_cq_begin_op
bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag)
Definition: completion_queue.cc:672
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc_channel_support_connectivity_watcher
int grpc_channel_support_connectivity_watcher(grpc_channel *channel)
Definition: channel_connectivity.cc:94
grpc_core::ClientChannel
Definition: client_channel.h:109
arg
struct arg arg
exec_ctx.h
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
ref_counted_ptr.h
state_
grpc_connectivity_state state_
Definition: channel_connectivity.cc:213
grpc_channel
struct grpc_channel grpc_channel
Definition: grpc_types.h:62
dual_ref_counted.h
on_timeout_
grpc_closure on_timeout_
Definition: channel_connectivity.cc:219
timer.h
api_trace.h
state_watcher_
StateWatcher * state_watcher_
Definition: channel_connectivity.cc:162
grpc_trace_operation_failures
grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure")
tag_
void * tag_
Definition: channel_connectivity.cc:211
on_complete_
grpc_closure on_complete_
Definition: channel_connectivity.cc:217
GRPC_CHANNEL_SHUTDOWN
@ GRPC_CHANNEL_SHUTDOWN
Definition: include/grpc/impl/codegen/connectivity_state.h:40
gpr_timespec
Definition: gpr_types.h:50
tests.unit._exit_scenarios.try_to_connect
try_to_connect
Definition: _exit_scenarios.py:189
grpc_error
Definition: error_internal.h:42
grpc_core::Timestamp::FromTimespecRoundUp
static Timestamp FromTimespecRoundUp(gpr_timespec t)
Definition: src/core/lib/gprpp/time.cc:136
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
grpc_core::CppImplOf< Channel, grpc_channel >::FromC
static Channel * FromC(grpc_channel *c_type)
Definition: cpp_impl_of.h:30
grpc_closure
Definition: closure.h:56
completion_storage_
grpc_cq_completion completion_storage_
Definition: channel_connectivity.cc:215
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
GRPC_API_TRACE
#define GRPC_API_TRACE(fmt, nargs, args)
Definition: api_trace.h:48
grpc_channel_num_external_connectivity_watchers
int grpc_channel_num_external_connectivity_watchers(grpc_channel *c_channel)
Definition: channel_connectivity.cc:79
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
channel.h
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:52