subchannel_stream_client.cc
Go to the documentation of this file.
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
18 
20 
21 #include <inttypes.h>
22 #include <stdio.h>
23 
24 #include <string>
25 #include <utility>
26 
27 #include <grpc/status.h>
28 #include <grpc/support/log.h>
29 
38 
// Tuning constants for the retry backoff: initial delay and maximum delay (in
// seconds), growth multiplier, and jitter fraction.
// NOTE(review): presumably these feed the BackOff::Options() chain built in
// the SubchannelStreamClient constructor; the lines that reference them are
// not visible in this extract -- confirm.
39 #define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1
40 #define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6
41 #define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120
42 #define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2
43 
44 namespace grpc_core {
45 
46 //
47 // SubchannelStreamClient
48 //
49 
// SubchannelStreamClient constructor: parameter list, member initializers and
// body. The line that opens the signature and several initializer lines are
// not present in this extract.
//
// Parameters:
//   connected_subchannel - moved into connected_subchannel_.
//   interested_parties   - stored as-is in interested_parties_.
//   event_handler        - moved into event_handler_.
//   tracer               - stored in tracer_; when non-null it enables the
//                          trace logging below and names the memory allocator.
51  RefCountedPtr<ConnectedSubchannel> connected_subchannel,
52  grpc_pollset_set* interested_parties,
53  std::unique_ptr<CallEventHandler> event_handler, const char* tracer)
55  connected_subchannel_(std::move(connected_subchannel)),
56  interested_parties_(interested_parties),
57  tracer_(tracer),
// The allocator is obtained from a memory quota and is named after the tracer
// when one is given, otherwise "SubchannelStreamClient".
58  call_allocator_(
60  ->memory_quota()
61  ->CreateMemoryAllocator(
62  (tracer != nullptr) ? tracer : "SubchannelStreamClient")),
63  event_handler_(std::move(event_handler)),
// Backoff configuration; the argument lines of this chain are not visible.
65  BackOff::Options()
66  .set_initial_backoff(Duration::Seconds(
70  .set_max_backoff(Duration::Seconds(
72  if (GPR_UNLIKELY(tracer_ != nullptr)) {
73  gpr_log(GPR_INFO, "%s %p: created SubchannelStreamClient", tracer_, this);
74  }
// Prepare the closure that runs OnRetryTimer with this object as its argument.
75  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
76  grpc_schedule_on_exec_ctx);
// Kick off the first call right away.
77  StartCall();
78 }
79 
// Body of ~SubchannelStreamClient(): the only visible work is a trace log,
// emitted when a tracer name was supplied at construction.
81  if (GPR_UNLIKELY(tracer_ != nullptr)) {
82  gpr_log(GPR_INFO, "%s %p: destroying SubchannelStreamClient", tracer_,
83  this);
84  }
85 }
86 
// Body of SubchannelStreamClient::Orphan(): shuts the client down and drops
// the "orphan" ref.
88  if (GPR_UNLIKELY(tracer_ != nullptr)) {
89  gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient shutting down", tracer_,
90  this);
91  }
92  {
93  MutexLock lock(&mu_);
// Clearing event_handler_ makes later StartCallLocked() / OnRetryTimer()
// invocations no-ops, since both check it for nullptr before doing work.
94  event_handler_.reset();
95  call_state_.reset();
// NOTE(review): two listing lines (96-97) are missing here; the closing brace
// below belongs to a statement opened on them. Presumably they cancel a
// pending retry timer -- confirm against the full file.
98  }
99  }
100  Unref(DEBUG_LOCATION, "orphan");
101 }
102 
// Body of SubchannelStreamClient::StartCall(): acquires mu_ and delegates to
// StartCallLocked().
104  MutexLock lock(&mu_);
105  StartCallLocked();
106 }
107 
// Body of SubchannelStreamClient::StartCallLocked(): creates a new CallState
// and starts it. Does nothing once event_handler_ has been cleared, and
// asserts that no call is currently in progress.
109  if (event_handler_ == nullptr) return;
110  GPR_ASSERT(call_state_ == nullptr);
// NOTE: this check is always true here because of the early return above.
111  if (event_handler_ != nullptr) {
112  event_handler_->OnCallStartLocked(this);
113  }
// The CallState is handed its own ref to this client.
114  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
115  if (GPR_UNLIKELY(tracer_ != nullptr)) {
116  gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient created CallState %p",
117  tracer_, this, call_state_.get());
118  }
119  call_state_->StartCallLocked();
120 }
121 
// Body of SubchannelStreamClient::StartRetryTimerLocked(): notifies the event
// handler, asks the backoff state for the next attempt time, and arms
// retry_timer_ to fire retry_timer_callback_ at that time.
123  if (event_handler_ != nullptr) {
124  event_handler_->OnRetryTimerStartLocked(this);
125  }
126  Timestamp next_try = retry_backoff_.NextAttemptTime();
127  if (GPR_UNLIKELY(tracer_ != nullptr)) {
128  gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient health check call lost...",
129  tracer_, this);
// The remaining delay is computed only for the trace message.
130  Duration timeout = next_try - ExecCtx::Get()->Now();
131  if (timeout > Duration::Zero()) {
132  gpr_log(GPR_INFO, "%s %p: ... will retry in %" PRId64 "ms.", tracer_,
133  this, timeout.millis());
134  } else {
135  gpr_log(GPR_INFO, "%s %p: ... retrying immediately.", tracer_, this);
136  }
137  }
138  // Ref for callback, tracked manually.
// The matching Unref (same "health_retry_timer" reason) is in OnRetryTimer().
139  Ref(DEBUG_LOCATION, "health_retry_timer").release();
// NOTE(review): listing line 140 is missing here; presumably it sets
// retry_timer_callback_pending_, which OnRetryTimer() clears -- confirm.
141  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
142 }
143 
// Body of SubchannelStreamClient::OnRetryTimer(void* arg, grpc_error_handle
// error): retry-timer callback. `arg` is the SubchannelStreamClient that armed
// the timer.
145  auto* self = static_cast<SubchannelStreamClient*>(arg);
146  {
147  MutexLock lock(&self->mu_);
148  self->retry_timer_callback_pending_ = false;
// Restart only if the client still has an event handler, the timer callback
// was invoked without an error, and no call is currently active.
149  if (self->event_handler_ != nullptr && GRPC_ERROR_IS_NONE(error) &&
150  self->call_state_ == nullptr) {
151  if (GPR_UNLIKELY(self->tracer_ != nullptr)) {
// (The line opening this log call is missing from the extract.)
153  "%s %p: SubchannelStreamClient restarting health check call",
154  self->tracer_, self);
155  }
156  self->StartCallLocked();
157  }
158  }
// Release the ref taken when the timer was armed; done after mu_ is released.
159  self->Unref(DEBUG_LOCATION, "health_retry_timer");
160 }
161 
162 //
163 // SubchannelStreamClient::CallState
164 //
165 
// CallState constructor: parameter list and part of the member-initializer
// list. The opening signature line, one initializer line and the end of the
// constructor are not present in this extract.
//
// Parameters:
//   health_check_client - owning ref to the parent client, moved into
//                         subchannel_stream_client_.
//   interested_parties  - pollset set; NOTE(review): its use is on a missing
//                         line, presumably to build pollent_ -- confirm.
167  RefCountedPtr<SubchannelStreamClient> health_check_client,
168  grpc_pollset_set* interested_parties)
169  : subchannel_stream_client_(std::move(health_check_client)),
// The arena is sized from the connected subchannel's initial call size
// estimate and draws from the parent client's call_allocator_.
171  arena_(Arena::Create(subchannel_stream_client_->connected_subchannel_
172  ->GetInitialCallSizeEstimate(),
173  &subchannel_stream_client_->call_allocator_)),
174  payload_(context_),
179 
// Body of ~CallState(): logs (when tracing), runs the destroy hook of every
// populated call-context slot, then clears the call combiner's cancellation
// closure.
181  if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
182  gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient destroying CallState %p",
183  subchannel_stream_client_->tracer_, subchannel_stream_client_.get(),
184  this);
185  }
// Invoke each context entry's destroy callback on its value, if one is set.
186  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
187  if (context_[i].destroy != nullptr) {
188  context_[i].destroy(context_[i].value);
189  }
190  }
191  // Unset the call combiner cancellation closure. This has the
192  // effect of scheduling the previously set cancellation closure, if
193  // any, so that it can release any internal references it may be
194  // holding to the call stack.
195  call_combiner_.SetNotifyOnCancel(nullptr);
196 }
197 
// Tail of CallState::Orphan(): cancels the call. The signature and one
// preceding body line are not present in this extract.
200  Cancel();
201 }
202 
// Creates the subchannel call and starts its batches: one batch carrying the
// send ops plus recv_message (further ops are flagged on lines missing from
// this extract), and a second batch carrying only recv_trailing_metadata.
// Each callback that needs one takes its own ref on call_, released in that
// callback.
// If call creation fails, ends the call with retry enabled and returns.
203 void SubchannelStreamClient::CallState::StartCallLocked() {
// Call-creation arguments (the line declaring the aggregate is missing).
// NOTE(review): the path here is hard-coded to the health Watch method while
// the path metadata below comes from event_handler_->GetPathLocked(); confirm
// this is intended for non-health-check users of this class.
205  subchannel_stream_client_->connected_subchannel_,
206  &pollent_,
207  Slice::FromStaticString("/grpc.health.v1.Health/Watch"),
208  gpr_get_cycle_counter(), // start_time
209  Timestamp::InfFuture(), // deadline
210  arena_.get(),
211  context_,
213  };
// NOTE(review): listing lines 214-215 are missing; presumably they declare
// `error` and create call_ from the arguments above -- confirm.
216  // Register after-destruction callback.
217  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
218  this, grpc_schedule_on_exec_ctx);
219  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
220  // Check if creation failed.
221  if (!GRPC_ERROR_IS_NONE(error) ||
222  subchannel_stream_client_->event_handler_ == nullptr) {
223-->
// (The line opening the error log call and its trailing arguments are missing.)
224  "SubchannelStreamClient %p CallState %p: error creating "
225  "stream on subchannel (%s); will retry",
226  subchannel_stream_client_.get(), this,
229  CallEndedLocked(/*retry=*/true);
230  return;
231  }
232  // Initialize payload and batch.
233  payload_.context = context_;
234  batch_.payload = &payload_;
235  // on_complete callback takes ref, handled manually.
236  call_->Ref(DEBUG_LOCATION, "on_complete").release();
237  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
238  grpc_schedule_on_exec_ctx);
239  // Add send_initial_metadata op.
// The path value is supplied by the event handler.
241  HttpPathMetadata(),
242  subchannel_stream_client_->event_handler_->GetPathLocked());
244  payload_.send_initial_metadata.send_initial_metadata =
246  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
247  payload_.send_initial_metadata.peer_string = nullptr;
249  // Add send_message op.
// The request payload is produced by the event handler.
250  send_message_.Append(Slice(
251  subchannel_stream_client_->event_handler_->EncodeSendMessageLocked()));
252  payload_.send_message.send_message = &send_message_;
253  batch_.send_message = true;
254  // Add send_trailing_metadata op.
255  payload_.send_trailing_metadata.send_trailing_metadata =
258  // Add recv_initial_metadata op.
259  payload_.recv_initial_metadata.recv_initial_metadata =
261  payload_.recv_initial_metadata.recv_flags = nullptr;
262  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
263  payload_.recv_initial_metadata.peer_string = nullptr;
264  // recv_initial_metadata_ready callback takes ref, handled manually.
265  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
266  payload_.recv_initial_metadata.recv_initial_metadata_ready =
267  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
268  this, grpc_schedule_on_exec_ctx);
270  // Add recv_message op.
271  payload_.recv_message.recv_message = &recv_message_;
272  payload_.recv_message.call_failed_before_recv_message = nullptr;
273  // recv_message callback takes ref, handled manually.
274  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
275  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
276  &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
277  batch_.recv_message = true;
278  // Start batch.
279  StartBatch(&batch_);
280  // Initialize recv_trailing_metadata batch.
281  recv_trailing_metadata_batch_.payload = &payload_;
282  // Add recv_trailing_metadata op.
283  payload_.recv_trailing_metadata.recv_trailing_metadata =
285  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
286  // This callback signals the end of the call, so it relies on the
287  // initial ref instead of taking a new ref. When it's invoked, the
288  // initial ref is released.
289  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
291  RecvTrailingMetadataReady, this,
292  grpc_schedule_on_exec_ctx);
293  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
294  // Start recv_trailing_metadata batch.
295  StartBatch(&recv_trailing_metadata_batch_);
296 }
297 
// StartBatchInCallCombiner (signature continuation and body): closure callback
// that hands a batch to the subchannel call.
//   arg - the grpc_transport_stream_op_batch to start; the target
//         SubchannelCall is read from its handler_private.extra_arg.
// The error argument is ignored.
299  void* arg, grpc_error_handle /*error*/) {
300  auto* batch = static_cast<grpc_transport_stream_op_batch*>(arg);
301  auto* call = static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
302  call->StartTransportStreamOpBatch(batch);
303 }
304 
// Visible part of CallState::StartBatch(grpc_transport_stream_op_batch* batch):
// points the batch's private closure at StartBatchInCallCombiner, with the
// batch itself as the argument.
// NOTE(review): the signature, the line(s) before this one and the line that
// opens the final call are missing; presumably extra_arg is set to call_ and
// the closure is started on the call combiner -- confirm.
308  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
309  batch, grpc_schedule_on_exec_ctx);
311  GRPC_ERROR_NONE, "start_subchannel_batch");
312 }
313 
// AfterCallStackDestruction (signature continuation and body): closure
// registered via SetAfterCallStackDestroy() in StartCallLocked(). `arg` is the
// CallState, which is deleted here. The error argument is ignored.
315  void* arg, grpc_error_handle /*error*/) {
316  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
317  delete self;
318 }
319 
// OnCancelComplete (signature continuation and body): completion callback for
// the cancel batch. Yields the call combiner and drops the "cancel" ref that
// Cancel() took on call_. The error argument is ignored.
321  void* arg, grpc_error_handle /*error*/) {
322  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
323  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
324  self->call_->Unref(DEBUG_LOCATION, "cancel");
325 }
326 
// StartCancel (signature continuation and body): builds a cancel_stream batch
// whose completion closure is OnCancelComplete, and sends it on call_.
// `arg` is the CallState; the error argument is ignored.
// NOTE(review): the lines that create `batch` and set its cancel error are
// missing from this extract.
328  void* arg, grpc_error_handle /*error*/) {
329  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
331  GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
332  batch->cancel_stream = true;
334  self->call_->StartTransportStreamOpBatch(batch);
335 }
336 
// Body of CallState::Cancel(): schedules StartCancel at most once per
// CallState.
338  bool expected = false;
// Only the caller that flips cancelled_ from false to true proceeds.
339  if (cancelled_.compare_exchange_strong(expected, true,
340  std::memory_order_acq_rel,
341  std::memory_order_acquire)) {
// Ref held for the duration of the cancel; released in OnCancelComplete().
342  call_->Ref(DEBUG_LOCATION, "cancel").release();
// (The line(s) opening the call that schedules StartCancel are missing.)
345  GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
346  GRPC_ERROR_NONE, "health_cancel");
347  }
348 }
349 
// OnComplete (signature continuation and body): on_complete callback of the
// initial batch. Yields the call combiner, clears both send-side metadata
// batches, and drops the "on_complete" ref taken in StartCallLocked().
// `arg` is the CallState; the error argument is ignored.
351  void* arg, grpc_error_handle /*error*/) {
352  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
353  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
354  self->send_initial_metadata_.Clear();
355  self->send_trailing_metadata_.Clear();
356  self->call_->Unref(DEBUG_LOCATION, "on_complete");
357 }
358 
// RecvInitialMetadataReady (signature continuation and body): yields the call
// combiner, discards the received initial metadata without inspecting it, and
// drops the "recv_initial_metadata_ready" ref taken in StartCallLocked().
// `arg` is the CallState; the error argument is ignored.
360  void* arg, grpc_error_handle /*error*/) {
361  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
362  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
363  self->recv_initial_metadata_.Clear();
364  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
365 }
366 
// Body of CallState::RecvMessageReady(): handles one received message and
// re-arms the receive.
// No message present: drop the "recv_message_ready" ref and stop receiving.
368  if (!recv_message_.has_value()) {
369  call_->Unref(DEBUG_LOCATION, "recv_message_ready");
370  return;
371  }
372  // Report payload.
373  {
374  MutexLock lock(&subchannel_stream_client_->mu_);
// Skip reporting if the client has already dropped its event handler.
375  if (subchannel_stream_client_->event_handler_ != nullptr) {
// (The line declaring `status` is missing from the extract.)
377  subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
378  subchannel_stream_client_.get(), recv_message_->JoinIntoString());
379  if (!status.ok()) {
380  if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
382  "%s %p: SubchannelStreamClient CallState %p: failed to "
383  "parse response message: %s",
384  subchannel_stream_client_->tracer_,
385  subchannel_stream_client_.get(), this,
386  status.ToString().c_str());
387  }
// The handler rejected the message: cancel the call. Execution still falls
// through to the re-arm below.
388  Cancel();
389  }
390  }
391  }
// Read by CallEndedLocked() to choose between immediate restart and backoff.
392  seen_response_.store(true, std::memory_order_release);
393  recv_message_.reset();
394  // Start another recv_message batch.
395  // This re-uses the ref we're holding.
396  // Note: Can't just reuse batch_ here, since we don't know that all
397  // callbacks from the original batch have completed yet.
398  recv_message_batch_.payload = &payload_;
399  payload_.recv_message.recv_message = &recv_message_;
400  payload_.recv_message.call_failed_before_recv_message = nullptr;
401  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
402  &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
403  recv_message_batch_.recv_message = true;
404  StartBatch(&recv_message_batch_);
405 }
406 
// Closure trampoline (signature continuation and body): yields the call
// combiner and forwards to the member RecvMessageReady() on the CallState
// passed as `arg`. The error argument is ignored.
// NOTE(review): the function name is on a missing line; presumably this is the
// static RecvMessageReady overload passed to GRPC_CLOSURE_INIT -- confirm.
408  void* arg, grpc_error_handle /*error*/) {
409  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
410  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
411  self->RecvMessageReady();
412 }
413 
// RecvTrailingMetadataReady (signature continuation and body): end-of-call
// callback. Determines the final status, reports it to the event handler, and
// calls CallEndedLocked(), which decides whether to retry.
//   arg   - the CallState.
//   error - when not none, used to derive the status (see note below).
415  void* arg, grpc_error_handle error) {
416  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
417  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
418  "recv_trailing_metadata_ready");
419  // Get call status.
// Status from the trailing metadata, or GRPC_STATUS_UNKNOWN when absent.
// (The line declaring `status` is missing from the extract.)
421  self->recv_trailing_metadata_.get(GrpcStatusMetadata())
422  .value_or(GRPC_STATUS_UNKNOWN);
423  if (!GRPC_ERROR_IS_NONE(error)) {
// NOTE(review): the line opening this call is missing; presumably it
// overwrites `status` with the code extracted from `error` -- confirm.
425  nullptr /* slice */, nullptr /* http_error */,
426  nullptr /* error_string */);
427  }
428  if (GPR_UNLIKELY(self->subchannel_stream_client_->tracer_ != nullptr)) {
430  "%s %p: SubchannelStreamClient CallState %p: health watch failed "
431  "with status %d",
432  self->subchannel_stream_client_->tracer_,
433  self->subchannel_stream_client_.get(), self, status);
434  }
435  // Clean up.
436  self->recv_trailing_metadata_.Clear();
437  // Report call end.
// mu_ stays held through CallEndedLocked() below.
438  MutexLock lock(&self->subchannel_stream_client_->mu_);
439  if (self->subchannel_stream_client_->event_handler_ != nullptr) {
440  self->subchannel_stream_client_->event_handler_
441  ->RecvTrailingMetadataReadyLocked(self->subchannel_stream_client_.get(),
442  status);
443  }
444  // For status UNIMPLEMENTED, give up and assume always healthy.
445  self->CallEndedLocked(/*retry=*/status != GRPC_STATUS_UNIMPLEMENTED);
446 }
447 
// Body of CallState::CallEndedLocked(bool retry): detaches this CallState from
// the parent client if it is still the current one, optionally restarts or
// schedules a retry, and drops the "call_ended" ref on call_.
//   retry - when true and this is still the active CallState, a new call is
//           started immediately (if a response was seen) or after backoff.
449  // If this CallState is still in use, this call ended because of a failure,
450  // so we need to stop using it and optionally create a new one.
451  // Otherwise, we have deliberately ended this call, and no further action
452  // is required.
453  if (this == subchannel_stream_client_->call_state_.get()) {
454  subchannel_stream_client_->call_state_.reset();
455  if (retry) {
456  GPR_ASSERT(subchannel_stream_client_->event_handler_ != nullptr);
// seen_response_ is set by RecvMessageReady() once a message has arrived.
457  if (seen_response_.load(std::memory_order_acquire)) {
458  // If the call fails after we've gotten a successful response, reset
459  // the backoff and restart the call immediately.
460  subchannel_stream_client_->retry_backoff_.Reset();
461  subchannel_stream_client_->StartCallLocked();
462  } else {
463  // If the call failed without receiving any messages, retry later.
464  subchannel_stream_client_->StartRetryTimerLocked();
465  }
466  }
467  }
468  // When the last ref to the call stack goes away, the CallState object
469  // will be automatically destroyed.
470  call_->Unref(DEBUG_LOCATION, "call_ended");
471 }
472 
473 } // namespace grpc_core
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::SubchannelStreamClient::CallState::OnComplete
static void OnComplete(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:350
grpc_core::MetadataMap::Set
absl::enable_if_t< Which::kRepeatable==false, void > Set(Which, Args &&... args)
Definition: metadata_batch.h:1075
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
absl::Status::ToString
std::string ToString(StatusToStringMode mode=StatusToStringMode::kDefault) const
Definition: third_party/abseil-cpp/absl/status/status.h:821
call_
grpc_call * call_
Definition: rls.cc:669
grpc_transport_stream_op_batch::recv_message
bool recv_message
Definition: transport.h:322
grpc_handler_private_op_data::closure
grpc_closure closure
Definition: transport.h:275
grpc_core::InternallyRefCounted< SubchannelStreamClient >::Unref
void Unref()
Definition: orphanable.h:100
grpc_core
Definition: call_metric_recorder.h:31
grpc_transport_stream_op_batch::on_complete
grpc_closure * on_complete
Definition: transport.h:304
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_core::SubchannelStreamClient::CallState::StartBatch
void StartBatch(grpc_transport_stream_op_batch *batch)
Definition: subchannel_stream_client.cc:305
grpc_core::slice_detail::StaticConstructors< Slice >::FromStaticString
static Slice FromStaticString(const char *s)
Definition: src/core/lib/slice/slice.h:201
grpc_core::SubchannelStreamClient::CallState::Cancel
void Cancel()
Definition: subchannel_stream_client.cc:337
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
grpc_core::SubchannelStreamClient::interested_parties_
grpc_pollset_set * interested_parties_
Definition: subchannel_stream_client.h:202
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
grpc_core::SubchannelStreamClient::mu_
Mutex mu_
Definition: subchannel_stream_client.h:206
connected_subchannel_
RefCountedPtr< ConnectedSubchannel > connected_subchannel_
Definition: oob_backend_metric.cc:113
status
absl::Status status
Definition: rls.cc:251
GRPC_CONTEXT_COUNT
@ GRPC_CONTEXT_COUNT
Definition: core/lib/channel/context.h:48
grpc_handler_private_op_data::extra_arg
void * extra_arg
Definition: transport.h:274
SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS
#define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS
Definition: subchannel_stream_client.cc:39
GRPC_CLOSURE_CREATE
#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler)
Definition: closure.h:160
grpc_core::SubchannelStreamClient::CallState::CallEndedLocked
void CallEndedLocked(bool retry) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&subchannel_stream_client_ -> mu_)
Definition: subchannel_stream_client.cc:448
GRPC_ERROR_CANCELLED
#define GRPC_ERROR_CANCELLED
Definition: error.h:238
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
grpc_core::SubchannelStreamClient::CallState::CallState
CallState(RefCountedPtr< SubchannelStreamClient > client, grpc_pollset_set *interested_parties)
Definition: subchannel_stream_client.cc:166
grpc_core::SubchannelStreamClient::CallState::RecvInitialMetadataReady
static void RecvInitialMetadataReady(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:359
send_initial_metadata_
grpc_metadata_batch send_initial_metadata_
Definition: retry_filter.cc:459
call
FilterStackCall * call
Definition: call.cc:750
status.h
retry
void retry(grpc_end2end_test_config config)
Definition: retry.cc:319
SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS
#define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS
Definition: subchannel_stream_client.cc:41
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc_core::SubchannelStreamClient::CallState::RecvTrailingMetadataReady
static void RecvTrailingMetadataReady(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:414
grpc_transport_stream_op_batch::cancel_stream
bool cancel_stream
Definition: transport.h:329
grpc_transport_stream_op_batch::handler_private
grpc_handler_private_op_data handler_private
Definition: transport.h:338
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
arena_
Arena * arena_
Definition: client_channel.cc:391
grpc_core::RefCountedPtr
Definition: ref_counted_ptr.h:35
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
context_
ScopedContext * context_
Definition: filter_fuzzer.cc:559
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
grpc_core::SubchannelCall::Args
Definition: subchannel.h:98
grpc_core::InternallyRefCounted< SubchannelStreamClient >::Ref
RefCountedPtr< SubchannelStreamClient > Ref() GRPC_MUST_USE_RESULT
Definition: orphanable.h:90
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER
#define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER
Definition: subchannel_stream_client.cc:40
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: impl/codegen/port_platform.h:770
grpc_core::SubchannelStreamClient::StartCall
void StartCall()
Definition: subchannel_stream_client.cc:103
grpc_core::SubchannelStreamClient::~SubchannelStreamClient
~SubchannelStreamClient() override
Definition: subchannel_stream_client.cc:80
grpc_transport_stream_op_batch_payload::cancel_error
grpc_error_handle cancel_error
Definition: transport.h:444
grpc_core::SubchannelCall::Create
static RefCountedPtr< SubchannelCall > Create(Args args, grpc_error_handle *error)
Definition: subchannel.cc:142
GRPC_CALL_COMBINER_STOP
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason)
Definition: call_combiner.h:58
recv_message_
grpc_byte_buffer * recv_message_
Definition: rls.cc:672
arg
Definition: cmdline.cc:40
grpc_transport_stream_op_batch::payload
grpc_transport_stream_op_batch_payload * payload
Definition: transport.h:307
time.h
call_combiner_
CallCombiner * call_combiner_
Definition: client_channel.cc:393
grpc_core::SubchannelStreamClient::CallState::StartCancel
static void StartCancel(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:327
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
grpc_core::InternallyRefCounted
Definition: orphanable.h:73
grpc_core::SubchannelStreamClient::CallState::~CallState
~CallState() override
Definition: subchannel_stream_client.cc:180
grpc_core::SubchannelStreamClient::CallState::OnCancelComplete
static void OnCancelComplete(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:320
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
GRPC_CALL_COMBINER_START
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason)
Definition: call_combiner.h:56
resource_quota.h
grpc_core::SubchannelStreamClient::SubchannelStreamClient
SubchannelStreamClient(RefCountedPtr< ConnectedSubchannel > connected_subchannel, grpc_pollset_set *interested_parties, std::unique_ptr< CallEventHandler > event_handler, const char *tracer)
Definition: subchannel_stream_client.cc:50
retry_timer_callback_pending_
bool retry_timer_callback_pending_
Definition: grpclb.cc:536
grpc_core::BackOff
Definition: backoff.h:32
grpc_core::GrpcStatusMetadata
Definition: metadata_batch.h:293
recv_trailing_metadata_
grpc_metadata_array recv_trailing_metadata_
Definition: rls.cc:673
subchannel_stream_client.h
grpc_make_transport_stream_op
grpc_transport_stream_op_batch * grpc_make_transport_stream_op(grpc_closure *on_complete)
Definition: transport.cc:230
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_core::Duration::Zero
static constexpr Duration Zero()
Definition: src/core/lib/gprpp/time.h:130
grpc_core::SubchannelStreamClient::CallState::RecvMessageReady
void RecvMessageReady()
Definition: subchannel_stream_client.cc:367
grpc_transport_stream_op_batch::recv_initial_metadata
bool recv_initial_metadata
Definition: transport.h:319
absl::Seconds
constexpr Duration Seconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:419
grpc_timer_cancel
void grpc_timer_cancel(grpc_timer *timer)
Definition: iomgr/timer.cc:36
grpc_transport_stream_op_batch::send_trailing_metadata
bool send_trailing_metadata
Definition: transport.h:313
grpc_core::SubchannelStreamClient
Definition: subchannel_stream_client.h:64
debug_location.h
collect_stats_
grpc_transport_stream_stats collect_stats_
Definition: retry_filter.cc:472
grpc_core::SubchannelStreamClient::Orphan
void Orphan() override
Definition: subchannel_stream_client.cc:87
SUBCHANNEL_STREAM_RECONNECT_JITTER
#define SUBCHANNEL_STREAM_RECONNECT_JITTER
Definition: subchannel_stream_client.cc:42
grpc_transport_stream_op_batch::send_message
bool send_message
Definition: transport.h:316
recv_initial_metadata_ready_
grpc_closure recv_initial_metadata_ready_
Definition: retry_filter.cc:464
grpc_core::ResourceQuotaFromChannelArgs
ResourceQuotaRefPtr ResourceQuotaFromChannelArgs(const grpc_channel_args *args)
Definition: api.cc:40
grpc_core::SubchannelStreamClient::CallState
Definition: subchannel_stream_client.h:115
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
retry_backoff_
BackOff retry_backoff_
Definition: retry_filter.cc:566
grpc_core::SubchannelStreamClient::CallState::StartBatchInCallCombiner
static void StartBatchInCallCombiner(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:298
std
Definition: grpcpp/impl/codegen/async_unary_call.h:407
send_trailing_metadata_
grpc_metadata_batch send_trailing_metadata_
Definition: retry_filter.cc:461
recv_trailing_metadata_ready_
grpc_closure recv_trailing_metadata_ready_
Definition: retry_filter.cc:473
grpc_transport_stream_op_batch::send_initial_metadata
bool send_initial_metadata
Definition: transport.h:310
arg
struct arg arg
exec_ctx.h
grpc_timer_init
void grpc_timer_init(grpc_timer *timer, grpc_core::Timestamp deadline, grpc_closure *closure)
Definition: iomgr/timer.cc:31
absl::Status::ok
ABSL_MUST_USE_RESULT bool ok() const
Definition: third_party/abseil-cpp/absl/status/status.h:802
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::SubchannelStreamClient::tracer_
const char * tracer_
Definition: subchannel_stream_client.h:203
GRPC_STATUS_UNIMPLEMENTED
@ GRPC_STATUS_UNIMPLEMENTED
Definition: include/grpc/impl/codegen/status.h:124
grpc_core::SubchannelCall
Definition: subchannel.h:96
grpc_core::SubchannelStreamClient::StartRetryTimerLocked
void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_)
Definition: subchannel_stream_client.cc:122
grpc_error_get_status
void grpc_error_get_status(grpc_error_handle error, grpc_core::Timestamp deadline, grpc_status_code *code, std::string *message, grpc_http2_error_code *http_error, const char **error_string)
Definition: error_utils.cc:67
on_complete_
grpc_closure on_complete_
Definition: channel_connectivity.cc:217
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_core::SubchannelStreamClient::StartCallLocked
void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_)
Definition: subchannel_stream_client.cc:108
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
grpc_core::SubchannelStreamClient::OnRetryTimer
static void OnRetryTimer(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:144
grpc_error
Definition: error_internal.h:42
grpc_polling_entity_create_from_pollset_set
grpc_polling_entity grpc_polling_entity_create_from_pollset_set(grpc_pollset_set *pollset_set)
Definition: polling_entity.cc:26
batch_
grpc_transport_stream_op_batch batch_
Definition: retry_filter.cc:357
grpc_core::Duration
Definition: src/core/lib/gprpp/time.h:122
self
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
Definition: bloaty/third_party/protobuf/php/ext/google/protobuf/map.c:543
send_message_
grpc_byte_buffer * send_message_
Definition: rls.cc:670
grpc_transport_stream_op_batch
Definition: transport.h:284
recv_message_ready_
grpc_closure recv_message_ready_
Definition: retry_filter.cc:467
grpc_core::SubchannelStreamClient::CallState::Orphan
void Orphan() override
Definition: subchannel_stream_client.cc:198
recv_initial_metadata_
grpc_metadata_array recv_initial_metadata_
Definition: rls.cc:671
grpc_core::SubchannelStreamClient::CallState::AfterCallStackDestruction
static void AfterCallStackDestruction(void *arg, grpc_error_handle error)
Definition: subchannel_stream_client.cc:314
timeout
uv_timer_t timeout
Definition: libuv/docs/code/uvwget/main.c:9
sync.h
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
destroy
static std::function< void(void *, Slot *)> destroy
Definition: abseil-cpp/absl/container/internal/hash_policy_traits_test.cc:42
interested_parties_
grpc_pollset_set * interested_parties_
Definition: oob_backend_metric.cc:169
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
GRPC_STATUS_UNKNOWN
@ GRPC_STATUS_UNKNOWN
Definition: include/grpc/impl/codegen/status.h:40
pollent_
grpc_polling_entity pollent_
Definition: google_c2p_resolver.cc:135
error_utils.h
grpc_transport_stream_op_batch_payload::cancel_stream
struct grpc_transport_stream_op_batch_payload::@46 cancel_stream
retry_timer_
grpc_timer retry_timer_
Definition: retry_filter.cc:606
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
api.h
time_precise.h
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:23