27 #include "absl/types/optional.h"
78 cb->next = t->write_cb_pool;
79 t->write_cb_pool =
cb;
94 t->is_client ?
"CLIENT" :
"SERVER", t->peer_string.c_str());
98 if (t->is_client && t->ping_state.pings_before_data_required == 0 &&
99 t->ping_policy.max_pings_without_data != 0) {
105 "CLIENT: Ping delayed [%s]: too many recent pings: %d/%d",
106 t->peer_string.c_str(), t->ping_state.pings_before_data_required,
107 t->ping_policy.max_pings_without_data);
119 next_allowed_ping_interval =
120 (t->keepalive_permit_without_calls == 0 &&
129 next_allowed_ping_interval =
132 : t->keepalive_time / 2;
135 t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
137 if (next_allowed_ping >
now) {
144 "%s: Ping delayed [%s]: not enough time elapsed since last "
146 " Last ping %" PRId64
": Next ping %" PRId64
": Now %" PRId64,
147 t->is_client ?
"CLIENT" :
"SERVER", t->peer_string.c_str(),
148 t->ping_state.last_ping_sent_time.milliseconds_after_process_epoch(),
150 now.milliseconds_after_process_epoch());
152 if (!t->ping_state.is_delayed_ping_timer_set) {
153 t->ping_state.is_delayed_ping_timer_set =
true;
157 grpc_schedule_on_exec_ctx);
159 &t->retry_initiate_ping_locked);
163 t->ping_state.last_ping_sent_time =
now;
178 t->is_client ?
"CLIENT" :
"SERVER", t->peer_string.c_str(),
179 t->ping_state.pings_before_data_required,
180 t->ping_policy.max_pings_without_data);
182 t->ping_state.pings_before_data_required -=
183 (t->ping_state.pings_before_data_required != 0);
189 bool sched_any =
false;
195 if (
cb->call_at_byte <= *ctr) {
208 const char* staller) {
212 "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
213 "to happen in a healthy program that is not seeing flow control stalls."
214 " However, if you know that there are unwanted stalls, here is some "
215 "helpful data: [fc:pending=%" PRIdPTR
":flowed=%" PRId64
216 ":peer_initwin=%d:t_win=%" PRId64
":s_win=%d:s_delta=%" PRId64
"]",
217 t->peer_string.c_str(), t, s->id, staller,
218 s->flow_controlled_buffer.length, s->flow_controlled_bytes_flowed,
221 t->flow_control.remote_window(),
224 s->flow_control.remote_window_delta() +
228 s->flow_control.remote_window_delta());
239 class CountDefaultMetadataEncoder {
245 template <
typename Which>
258 CountDefaultMetadataEncoder enc;
259 initial_metadata->
Encode(&enc);
260 return enc.count() == initial_metadata->
count();
275 initial_metadata_writes_);
278 trailing_metadata_writes_);
282 void FlushSettings() {
283 if (t_->dirtied_local_settings && !t_->sent_local_settings) {
289 t_->force_send_settings =
false;
290 t_->dirtied_local_settings =
false;
291 t_->sent_local_settings =
true;
296 void FlushQueuedBuffers() {
299 t_->num_pending_induced_frames = 0;
303 void FlushWindowUpdates() {
305 t_->flow_control.MaybeSendUpdate(t_->outbuf.count > 0);
306 if (transport_announce) {
315 void FlushPingAcks() {
316 for (
size_t i = 0;
i < t_->ping_ack_count;
i++) {
320 t_->ping_ack_count = 0;
323 void EnactHpackSettings() {
324 t_->hpack_compressor.SetMaxTableSize(
329 void UpdateStreamsNoLongerStalled() {
334 if (!
s->refcount->refs.RefIfNonZero()) {
343 result_.partial =
true;
// Adds one to the initial_metadata_writes_ counter.
355 void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
// Adds one to the flow_control_writes_ counter (note the counter's name
// differs from the method's "WindowUpdate" wording).
356 void IncWindowUpdateWrites() { ++flow_control_writes_; }
// Adds one to the message_writes_ counter.
357 void IncMessageWrites() { ++message_writes_; }
// Adds one to the trailing_metadata_writes_ counter.
358 void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
// Sets result_.early_results_scheduled to true. The flag is only ever raised
// here, never cleared.
// NOTE(review): presumably tells the caller that completion callbacks were
// scheduled during this write pass - confirm against the users of result_.
360 void NoteScheduledResults() { result_.early_results_scheduled =
true; }
365 result_.writing = t_->outbuf.count > 0;
374 int flow_control_writes_ = 0;
375 int initial_metadata_writes_ = 0;
376 int trailing_metadata_writes_ = 0;
377 int message_writes_ = 0;
381 class DataSendContext {
385 : write_context_(write_context),
388 sending_bytes_before_(
s_->sending_bytes) {}
390 uint32_t stream_remote_window()
const {
393 s_->flow_control.remote_window_delta() +
403 t_->flow_control.remote_window()))));
// Returns true when max_outgoing() reports a value greater than zero, and
// false otherwise. Does not modify this object (const-qualified).
406 bool AnyOutgoing()
const {
return max_outgoing() > 0; }
410 std::min(
size_t(max_outgoing()),
s_->flow_controlled_buffer.length));
411 is_last_frame_ = send_bytes ==
s_->flow_controlled_buffer.length &&
412 s_->send_trailing_metadata !=
nullptr &&
413 s_->send_trailing_metadata->empty();
415 is_last_frame_, &
s_->stats.outgoing, &t_->outbuf);
416 sfc_upd_.SentData(send_bytes);
417 s_->sending_bytes += send_bytes;
// Read-only accessor for the is_last_frame_ member, which is declared with
// an initial value of false.
420 bool is_last_frame()
const {
return is_last_frame_; }
422 void CallCallbacks() {
425 static_cast<int64_t>(
s_->sending_bytes - sending_bytes_before_),
426 &
s_->on_flow_controlled_cbs, &
s_->flow_controlled_bytes_flowed,
428 write_context_->NoteScheduledResults();
433 WriteContext* write_context_;
438 const size_t sending_bytes_before_;
439 bool is_last_frame_ =
false;
442 class StreamWriteContext {
445 : write_context_(write_context), t_(write_context->
transport()),
s_(
s) {
448 t_->is_client ?
"CLIENT" :
"SERVER",
s->id,
449 s->sent_initial_metadata,
s->send_initial_metadata !=
nullptr));
452 void FlushInitialMetadata() {
454 if (
s_->sent_initial_metadata)
return;
455 if (
s_->send_initial_metadata ==
nullptr)
return;
462 if (!t_->is_client &&
s_->flow_controlled_buffer.length == 0 &&
463 s_->send_trailing_metadata !=
nullptr &&
465 ConvertInitialMetadataToTrailingMetadata();
467 t_->hpack_compressor.EncodeHeaders(
480 *
s_->send_initial_metadata, &t_->outbuf);
482 write_context_->IncInitialMetadataWrites();
485 s_->send_initial_metadata =
nullptr;
486 s_->sent_initial_metadata =
true;
487 write_context_->NoteScheduledResults();
490 "send_initial_metadata_finished");
493 void FlushWindowUpdates() {
494 if (
s_->read_closed)
return;
497 const uint32_t stream_announce =
s_->flow_control.MaybeSendUpdate();
498 if (stream_announce == 0)
return;
502 &
s_->stats.outgoing));
504 write_context_->IncWindowUpdateWrites();
508 if (!
s_->sent_initial_metadata)
return;
510 if (
s_->flow_controlled_buffer.length == 0) {
514 DataSendContext data_send_context(write_context_, t_,
s_);
516 if (!data_send_context.AnyOutgoing()) {
517 if (t_->flow_control.remote_window() <= 0) {
520 }
else if (data_send_context.stream_remote_window() <= 0) {
527 while (
s_->flow_controlled_buffer.length > 0 &&
528 data_send_context.max_outgoing() > 0) {
529 data_send_context.FlushBytes();
532 if (data_send_context.is_last_frame()) {
535 data_send_context.CallCallbacks();
536 stream_became_writable_ =
true;
537 if (
s_->flow_controlled_buffer.length > 0) {
541 write_context_->IncMessageWrites();
544 void FlushTrailingMetadata() {
545 if (!
s_->sent_initial_metadata)
return;
547 if (
s_->send_trailing_metadata ==
nullptr)
return;
548 if (
s_->flow_controlled_buffer.length != 0)
return;
551 if (
s_->send_trailing_metadata->empty()) {
553 &
s_->stats.outgoing, &t_->outbuf);
555 if (send_status_.has_value()) {
559 if (send_content_type_.has_value()) {
561 *send_content_type_);
563 t_->hpack_compressor.EncodeHeaders(
572 &
s_->stats.outgoing},
573 *
s_->send_trailing_metadata, &t_->outbuf);
575 write_context_->IncTrailingMetadataWrites();
579 write_context_->NoteScheduledResults();
582 "send_trailing_metadata_finished");
// Accessor for the stream_became_writable_ member, which is declared with an
// initial value of false. Not const-qualified, although it only reads the
// member.
585 bool stream_became_writable() {
return stream_became_writable_; }
588 void ConvertInitialMetadataToTrailingMetadata() {
599 void SentLastFrame() {
600 s_->send_trailing_metadata =
nullptr;
601 if (
s_->sent_trailing_metadata_op) {
602 *
s_->sent_trailing_metadata_op =
true;
603 s_->sent_trailing_metadata_op =
nullptr;
605 s_->sent_trailing_metadata =
true;
608 if (!t_->is_client && !
s_->read_closed) {
617 WriteContext*
const write_context_;
620 bool stream_became_writable_ =
false;
632 ctx.FlushQueuedBuffers();
633 ctx.EnactHpackSettings();
635 if (t->flow_control.remote_window() > 0) {
636 ctx.UpdateStreamsNoLongerStalled();
642 StreamWriteContext stream_ctx(&
ctx, s);
643 size_t orig_len = t->outbuf.length;
644 stream_ctx.FlushInitialMetadata();
645 stream_ctx.FlushWindowUpdates();
646 stream_ctx.FlushData();
647 stream_ctx.FlushTrailingMetadata();
648 if (t->outbuf.length > orig_len) {
650 s->byte_counter += t->outbuf.length - orig_len;
655 if (stream_ctx.stream_became_writable()) {
667 ctx.FlushWindowUpdates();
678 if (t->channelz_socket !=
nullptr) {
679 t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
681 t->num_messages_in_next_write = 0;
684 if (s->sending_bytes != 0) {
686 &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
688 s->sending_bytes = 0;