Go to the documentation of this file.
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/str_join.h"
52 constexpr
const int64_t kMaxWindowUpdateSize = (1
u << 31) - 1;
75 std::vector<std::string> segments;
93 if (segments.empty())
return "no action";
102 bool enable_bdp_probe,
105 enable_bdp_probe_(enable_bdp_probe),
106 bdp_estimator_(
name),
111 .set_initial_control_value(TargetLogBdp())
112 .set_min_control_value(-1)
113 .set_max_control_value(25)
114 .set_integral_range(10)),
118 const uint32_t target_announced_window =
124 int64_t(0), kMaxWindowUpdateSize));
135 return tfc_upd_.
RecvData(incoming_frame_size, [
this, incoming_frame_size]() {
138 if (incoming_frame_size > acked_stream_window) {
140 "frame of size %" PRId64
" overflows local window of %" PRId64,
141 incoming_frame_size, acked_stream_window));
145 -incoming_frame_size);
156 "frame of size %" PRId64
" overflows local window of %" PRId64,
170 announced_stream_total_over_incoming_window_ +
171 target_initial_window_size_));
175 if (announced_window_ < target_window() / 2) {
176 action.set_send_transport_update(
185 static const double kLowMemPressure = 0.1;
186 static const double kZeroTarget = 22;
187 static const double kHighMemPressure = 0.8;
188 static const double kMaxMemPressure = 0.9;
189 if (memory_pressure < kLowMemPressure &&
target < kZeroTarget) {
190 target = (
target - kZeroTarget) * memory_pressure / kLowMemPressure +
192 }
else if (memory_pressure > kHighMemPressure) {
193 target *= 1 -
std::min(1.0, (memory_pressure - kHighMemPressure) /
194 (kMaxMemPressure - kHighMemPressure));
202 1 + log2(bdp_estimator_.EstimateBdp()));
207 double bdp_error =
value - pid_controller_.last_control_value();
208 const double dt = (
now - last_pid_update_).
seconds();
209 last_pid_update_ =
now;
211 const double kMaxDt = 0.1;
212 return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
220 int64_t delta = new_desired_value - *desired_value;
223 (delta <= -*desired_value / 5 || delta >= *desired_value / 5)) {
224 *desired_value = new_desired_value;
231 if (enable_bdp_probe_) {
236 double target = pow(2, SmoothLogBdp(TargetLogBdp()));
241 target_initial_window_size_ );
246 &target_initial_window_size_,
252 double bw_dbl = bdp_estimator_.EstimateBandwidth();
259 static_cast<int32_t>(target_initial_window_size_)),
277 int64_t desired_window_delta = [
this]() {
290 kMaxWindowUpdateSize);
295 if (desired_announce_size > 0) {
297 desired_announce_size >= 8192) {
298 action.set_send_stream_update(
310 sfc_->pending_size_ = pending_size;
std::unique_ptr< TransportFlowControl > tfc_
std::chrono::duration< std::int_fast64_t > seconds
TransportFlowControl::IncomingUpdateContext tfc_upd_
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
uint32_t initial_window_size_
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl")
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Urgency send_stream_update_
void SetPendingSize(int64_t pending_size)
uint32_t DesiredAnnounceSize() const
FlowControlAction UpdateAction(FlowControlAction action)
static constexpr const uint32_t kMaxInitialWindowSize
MemoryOwner memory_owner_
int64_t target_window() const
OPENSSL_EXPORT pem_password_cb void * u
Status InternalError(absl::string_view message)
double SmoothLogBdp(double value)
StreamFlowControl *const sfc_
FlowControlAction PeriodicUpdate()
static GraphId Get(const IdMap &id, int num)
int64_t announced_window_delta_
static constexpr const uint32_t kMinInitialWindowSize
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
constexpr bool has_value() const noexcept
Urgency send_max_frame_size_update_
FlowControlAction UpdateAction(FlowControlAction action)
TransportFlowControl(const char *name, bool enable_bdp_probe, MemoryOwner *memory_owner)
Urgency send_transport_update_
virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)=0
Urgency send_initial_window_update_
static constexpr const int64_t kMaxWindowDelta
FlowControlAction & set_send_initial_window_update(Urgency u, uint32_t update)
absl::Status RecvData(int64_t incoming_frame_size, absl::FunctionRef< absl::Status()> stream=[]() { return absl::OkStatus();})
std::ostream & operator<<(std::ostream &out, FlowControlAction::Urgency u)
std::string DebugString() const
uint32_t MaybeSendUpdate(bool writing_anyway)
TestOnlyTransportTargetWindowEstimatesMocker * g_test_only_transport_target_window_estimates_mocker
static void UpdateSetting(int64_t *desired_value, int64_t new_desired_value, FlowControlAction *action, FlowControlAction &(FlowControlAction::*set)(FlowControlAction::Urgency, uint32_t))
absl::Status RecvData(int64_t incoming_frame_size)
int64_t announced_window_
ABSL_NAMESPACE_BEGIN Time Now()
T Clamp(T val, T min, T max)
static double AdjustForMemoryPressure(double memory_pressure, double target)
TransportFlowControl *const tfc_
void UpdateAnnouncedWindowDelta(int64_t *delta, int64_t change)
int64_t min_progress_size_
static const char * UrgencyString(Urgency u)
FlowControlAction & set_send_max_frame_size_update(Urgency u, uint32_t update)
StreamFlowControl(TransportFlowControl *tfc)
FlowControlAction MakeAction()
uint32_t MaybeSendUpdate()
uint32_t acked_init_window() const
absl::optional< int64_t > pending_size_
grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:22