// Backoff tuning constants for the subchannel stream client.
// NOTE(review): the code that consumes these macros is not visible in this
// excerpt; presumably they parameterize the retry backoff that is reset via
// retry_backoff_.Reset() when a call that saw a response ends -- confirm.

// Initial backoff before the first (re)connect attempt, in seconds.
39 #define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1
// Multiplier applied to the backoff between successive reconnect attempts
// (presumably exponential growth -- confirm against the backoff helper).
40 #define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6
// Upper bound on the reconnect backoff, in seconds.
41 #define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120
// Jitter factor for the reconnect backoff; how it is applied depends on the
// backoff implementation, which is not shown here -- TODO confirm.
42 #define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2
53 std::unique_ptr<CallEventHandler> event_handler,
const char* tracer)
61 ->CreateMemoryAllocator(
62 (tracer != nullptr) ? tracer :
"SubchannelStreamClient")),
63 event_handler_(
std::
move(event_handler)),
76 grpc_schedule_on_exec_ctx);
94 event_handler_.reset();
109 if (event_handler_ ==
nullptr)
return;
111 if (event_handler_ !=
nullptr) {
112 event_handler_->OnCallStartLocked(
this);
116 gpr_log(
GPR_INFO,
"%s %p: SubchannelStreamClient created CallState %p",
117 tracer_,
this, call_state_.get());
119 call_state_->StartCallLocked();
123 if (event_handler_ !=
nullptr) {
124 event_handler_->OnRetryTimerStartLocked(
this);
128 gpr_log(
GPR_INFO,
"%s %p: SubchannelStreamClient health check call lost...",
148 self->retry_timer_callback_pending_ =
false;
150 self->call_state_ ==
nullptr) {
153 "%s %p: SubchannelStreamClient restarting health check call",
154 self->tracer_,
self);
156 self->StartCallLocked();
169 : subchannel_stream_client_(
std::
move(health_check_client)),
172 ->GetInitialCallSizeEstimate(),
173 &subchannel_stream_client_->call_allocator_)),
181 if (
GPR_UNLIKELY(subchannel_stream_client_->tracer_ !=
nullptr)) {
182 gpr_log(
GPR_INFO,
"%s %p: SubchannelStreamClient destroying CallState %p",
183 subchannel_stream_client_->tracer_, subchannel_stream_client_.get(),
203 void SubchannelStreamClient::CallState::StartCallLocked() {
205 subchannel_stream_client_->connected_subchannel_,
208 gpr_get_cycle_counter(),
218 this, grpc_schedule_on_exec_ctx);
219 call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
222 subchannel_stream_client_->event_handler_ ==
nullptr) {
224 "SubchannelStreamClient %p CallState %p: error creating "
225 "stream on subchannel (%s); will retry",
226 subchannel_stream_client_.get(),
this,
229 CallEndedLocked(
true);
238 grpc_schedule_on_exec_ctx);
242 subchannel_stream_client_->event_handler_->GetPathLocked());
244 payload_.send_initial_metadata.send_initial_metadata =
246 payload_.send_initial_metadata.send_initial_metadata_flags = 0;
247 payload_.send_initial_metadata.peer_string =
nullptr;
251 subchannel_stream_client_->event_handler_->EncodeSendMessageLocked()));
255 payload_.send_trailing_metadata.send_trailing_metadata =
259 payload_.recv_initial_metadata.recv_initial_metadata =
261 payload_.recv_initial_metadata.recv_flags =
nullptr;
262 payload_.recv_initial_metadata.trailing_metadata_available =
nullptr;
263 payload_.recv_initial_metadata.peer_string =
nullptr;
266 payload_.recv_initial_metadata.recv_initial_metadata_ready =
268 this, grpc_schedule_on_exec_ctx);
272 payload_.recv_message.call_failed_before_recv_message =
nullptr;
281 recv_trailing_metadata_batch_.payload = &payload_;
283 payload_.recv_trailing_metadata.recv_trailing_metadata =
289 payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
291 RecvTrailingMetadataReady,
this,
292 grpc_schedule_on_exec_ctx);
293 recv_trailing_metadata_batch_.recv_trailing_metadata =
true;
295 StartBatch(&recv_trailing_metadata_batch_);
302 call->StartTransportStreamOpBatch(
batch);
309 batch, grpc_schedule_on_exec_ctx);
334 self->call_->StartTransportStreamOpBatch(
batch);
338 bool expected =
false;
339 if (cancelled_.compare_exchange_strong(expected,
true,
340 std::memory_order_acq_rel,
341 std::memory_order_acquire)) {
354 self->send_initial_metadata_.Clear();
355 self->send_trailing_metadata_.Clear();
363 self->recv_initial_metadata_.Clear();
364 self->call_->Unref(
DEBUG_LOCATION,
"recv_initial_metadata_ready");
374 MutexLock lock(&subchannel_stream_client_->mu_);
375 if (subchannel_stream_client_->event_handler_ !=
nullptr) {
377 subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
378 subchannel_stream_client_.get(),
recv_message_->JoinIntoString());
380 if (
GPR_UNLIKELY(subchannel_stream_client_->tracer_ !=
nullptr)) {
382 "%s %p: SubchannelStreamClient CallState %p: failed to "
383 "parse response message: %s",
384 subchannel_stream_client_->tracer_,
385 subchannel_stream_client_.get(),
this,
392 seen_response_.store(
true, std::memory_order_release);
398 recv_message_batch_.payload = &payload_;
400 payload_.recv_message.call_failed_before_recv_message =
nullptr;
403 recv_message_batch_.recv_message =
true;
404 StartBatch(&recv_message_batch_);
411 self->RecvMessageReady();
418 "recv_trailing_metadata_ready");
430 "%s %p: SubchannelStreamClient CallState %p: health watch failed "
432 self->subchannel_stream_client_->tracer_,
433 self->subchannel_stream_client_.get(),
self,
status);
436 self->recv_trailing_metadata_.Clear();
439 if (
self->subchannel_stream_client_->event_handler_ !=
nullptr) {
440 self->subchannel_stream_client_->event_handler_
441 ->RecvTrailingMetadataReadyLocked(
self->subchannel_stream_client_.get(),
453 if (
this == subchannel_stream_client_->call_state_.get()) {
454 subchannel_stream_client_->call_state_.reset();
456 GPR_ASSERT(subchannel_stream_client_->event_handler_ !=
nullptr);
457 if (seen_response_.load(std::memory_order_acquire)) {
460 subchannel_stream_client_->retry_backoff_.Reset();
461 subchannel_stream_client_->StartCallLocked();
464 subchannel_stream_client_->StartRetryTimerLocked();