flow_control.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 
26 #include <algorithm>
27 #include <cmath>
28 #include <ostream>
29 #include <string>
30 #include <vector>
31 
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/str_join.h"
35 
36 #include <grpc/support/log.h>
37 
41 
43 
44 namespace grpc_core {
45 namespace chttp2 {
46 
49 
50 namespace {
51 
52 constexpr const int64_t kMaxWindowUpdateSize = (1u << 31) - 1;
53 
54 } // namespace
55 
57  switch (u) {
59  return "no-action";
61  return "now";
63  return "queue";
64  default:
65  GPR_UNREACHABLE_CODE(return "unknown");
66  }
67  GPR_UNREACHABLE_CODE(return "unknown");
68 }
69 
70 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency u) {
72 }
73 
75  std::vector<std::string> segments;
77  segments.push_back(
79  }
81  segments.push_back(absl::StrCat("s:", UrgencyString(send_stream_update_)));
82  }
84  segments.push_back(
87  }
89  segments.push_back(
90  absl::StrCat("mf=", max_frame_size_, ":",
92  }
93  if (segments.empty()) return "no action";
94  return absl::StrJoin(segments, ",");
95 }
96 
97 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action) {
98  return out << action.DebugString();
99 }
100 
102  bool enable_bdp_probe,
103  MemoryOwner* memory_owner)
104  : memory_owner_(memory_owner),
105  enable_bdp_probe_(enable_bdp_probe),
106  bdp_estimator_(name),
107  pid_controller_(PidController::Args()
108  .set_gain_p(4)
109  .set_gain_i(8)
110  .set_gain_d(0)
111  .set_initial_control_value(TargetLogBdp())
112  .set_min_control_value(-1)
113  .set_max_control_value(25)
114  .set_integral_range(10)),
115  last_pid_update_(ExecCtx::Get()->Now()) {}
116 
118  const uint32_t target_announced_window =
119  static_cast<uint32_t>(target_window());
120  if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
121  announced_window_ != target_announced_window) {
122  const uint32_t announce =
123  static_cast<uint32_t>(Clamp(target_announced_window - announced_window_,
124  int64_t(0), kMaxWindowUpdateSize));
125  announced_window_ += announce;
126  return announce;
127  }
128  return 0;
129 }
130 
132 
134  int64_t incoming_frame_size) {
135  return tfc_upd_.RecvData(incoming_frame_size, [this, incoming_frame_size]() {
136  int64_t acked_stream_window =
138  if (incoming_frame_size > acked_stream_window) {
140  "frame of size %" PRId64 " overflows local window of %" PRId64,
141  incoming_frame_size, acked_stream_window));
142  }
143 
145  -incoming_frame_size);
147  std::min(sfc_->min_progress_size_, incoming_frame_size);
148  return absl::OkStatus();
149  });
150 }
151 
153  int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream) {
154  if (incoming_frame_size > tfc_->announced_window_) {
156  "frame of size %" PRId64 " overflows local window of %" PRId64,
157  incoming_frame_size, tfc_->announced_window_));
158  }
160  if (!error.ok()) return error;
161  tfc_->announced_window_ -= incoming_frame_size;
162  return absl::OkStatus();
163 }
164 
166  // See comment above announced_stream_total_over_incoming_window_ for the
167  // logic behind this decision.
168  return static_cast<uint32_t>(
169  std::min(static_cast<int64_t>((1u << 31) - 1),
170  announced_stream_total_over_incoming_window_ +
171  target_initial_window_size_));
172 }
173 
175  if (announced_window_ < target_window() / 2) {
176  action.set_send_transport_update(
178  }
179  return action;
180 }
181 
182 // Take in a target and modifies it based on the memory pressure of the system
183 static double AdjustForMemoryPressure(double memory_pressure, double target) {
184  // do not increase window under heavy memory pressure.
185  static const double kLowMemPressure = 0.1;
186  static const double kZeroTarget = 22;
187  static const double kHighMemPressure = 0.8;
188  static const double kMaxMemPressure = 0.9;
189  if (memory_pressure < kLowMemPressure && target < kZeroTarget) {
190  target = (target - kZeroTarget) * memory_pressure / kLowMemPressure +
191  kZeroTarget;
192  } else if (memory_pressure > kHighMemPressure) {
193  target *= 1 - std::min(1.0, (memory_pressure - kHighMemPressure) /
194  (kMaxMemPressure - kHighMemPressure));
195  }
196  return target;
197 }
198 
201  memory_owner_->is_valid() ? memory_owner_->InstantaneousPressure() : 0.0,
202  1 + log2(bdp_estimator_.EstimateBdp()));
203 }
204 
206  Timestamp now = ExecCtx::Get()->Now();
207  double bdp_error = value - pid_controller_.last_control_value();
208  const double dt = (now - last_pid_update_).seconds();
209  last_pid_update_ = now;
210  // Limit dt to 100ms
211  const double kMaxDt = 0.1;
212  return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
213 }
214 
216  int64_t* desired_value, int64_t new_desired_value,
219  uint32_t)) {
220  int64_t delta = new_desired_value - *desired_value;
221  // TODO(ncteisen): tune this
222  if (delta != 0 &&
223  (delta <= -*desired_value / 5 || delta >= *desired_value / 5)) {
224  *desired_value = new_desired_value;
226  }
227 }
228 
231  if (enable_bdp_probe_) {
232  // get bdp estimate and update initial_window accordingly.
233  // target might change based on how much memory pressure we are under
234  // TODO(ncteisen): experiment with setting target to be huge under low
235  // memory pressure.
236  double target = pow(2, SmoothLogBdp(TargetLogBdp()));
238  // Hook for simulating unusual flow control situations in tests.
241  target_initial_window_size_ /* current target */);
242  }
243  // Though initial window 'could' drop to 0, we keep the floor at
244  // kMinInitialWindowSize
245  UpdateSetting(
246  &target_initial_window_size_,
247  static_cast<int32_t>(Clamp(target, double(kMinInitialWindowSize),
248  double(kMaxInitialWindowSize))),
250 
251  // get bandwidth estimate and update max_frame accordingly.
252  double bw_dbl = bdp_estimator_.EstimateBandwidth();
253  // we target the max of BDP or bandwidth in microseconds.
254  UpdateSetting(
255  &target_frame_size_,
256  static_cast<int32_t>(Clamp(
257  std::max(static_cast<int32_t>(Clamp(bw_dbl, 0.0, double(INT_MAX))) /
258  1000,
259  static_cast<int32_t>(target_initial_window_size_)),
260  16384, 16777215)),
262  }
263  return UpdateAction(action);
264 }
265 
268  const uint32_t announce = DesiredAnnounceSize();
269  pending_size_ = absl::nullopt;
272  tfc_upd.MakeAction();
273  return announce;
274 }
275 
277  int64_t desired_window_delta = [this]() {
278  if (min_progress_size_ == 0) {
279  if (pending_size_.has_value() &&
281  return -*pending_size_;
282  } else {
284  }
285  } else {
287  }
288  }();
289  return Clamp(desired_window_delta - announced_window_delta_, int64_t(0),
290  kMaxWindowUpdateSize);
291 }
292 
294  const int64_t desired_announce_size = DesiredAnnounceSize();
295  if (desired_announce_size > 0) {
296  if ((min_progress_size_ > 0 && announced_window_delta_ < 0) ||
297  desired_announce_size >= 8192) {
298  action.set_send_stream_update(
300  } else {
301  action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
302  }
303  }
304  return action;
305 }
306 
308  int64_t pending_size) {
309  GPR_ASSERT(pending_size >= 0);
310  sfc_->pending_size_ = pending_size;
311 }
312 
313 } // namespace chttp2
314 } // namespace grpc_core
tfc_
std::unique_ptr< TransportFlowControl > tfc_
Definition: flow_control_fuzzer.cc:106
grpc_core::MemoryOwner
Definition: memory_quota.h:381
absl::time_internal::cctz::seconds
std::chrono::duration< std::int_fast64_t > seconds
Definition: abseil-cpp/absl/time/internal/cctz/include/cctz/time_zone.h:40
grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY
@ UPDATE_IMMEDIATELY
gen_build_yaml.out
dictionary out
Definition: src/benchmark/gen_build_yaml.py:24
now
static double now(void)
Definition: test/core/fling/client.cc:130
log.h
grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext::tfc_upd_
TransportFlowControl::IncomingUpdateContext tfc_upd_
Definition: flow_control.h:323
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc_core::chttp2::FlowControlAction::initial_window_size_
uint32_t initial_window_size_
Definition: flow_control.h:136
grpc_flowctl_trace
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl")
absl::StrFormat
ABSL_MUST_USE_RESULT std::string StrFormat(const FormatSpec< Args... > &format, const Args &... args)
Definition: abseil-cpp/absl/strings/str_format.h:338
grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE
@ QUEUE_UPDATE
grpc_core::chttp2::FlowControlAction::send_stream_update_
Urgency send_stream_update_
Definition: flow_control.h:132
grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext::SetPendingSize
void SetPendingSize(int64_t pending_size)
Definition: flow_control.cc:307
grpc_core
Definition: call_metric_recorder.h:31
grpc_core::chttp2::StreamFlowControl::DesiredAnnounceSize
uint32_t DesiredAnnounceSize() const
Definition: flow_control.cc:276
grpc_core::chttp2::TransportFlowControl::UpdateAction
FlowControlAction UpdateAction(FlowControlAction action)
Definition: flow_control.cc:174
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
useful.h
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_core::chttp2::kMaxInitialWindowSize
static constexpr const uint32_t kMaxInitialWindowSize
Definition: flow_control.h:59
memory_owner_
MemoryOwner memory_owner_
Definition: flow_control_fuzzer.cc:105
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
grpc_core::chttp2::TransportFlowControl::target_window
int64_t target_window() const
Definition: flow_control.cc:165
u
OPENSSL_EXPORT pem_password_cb void * u
Definition: pem.h:351
setup.name
name
Definition: setup.py:542
absl::InternalError
Status InternalError(absl::string_view message)
Definition: third_party/abseil-cpp/absl/status/status.cc:347
grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED
@ NO_ACTION_NEEDED
grpc_core::chttp2::TransportFlowControl::SmoothLogBdp
double SmoothLogBdp(double value)
Definition: flow_control.cc:205
grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext::sfc_
StreamFlowControl *const sfc_
Definition: flow_control.h:324
grpc_core::chttp2::FlowControlAction::max_frame_size_
uint32_t max_frame_size_
Definition: flow_control.h:137
grpc_core::chttp2::TransportFlowControl::PeriodicUpdate
FlowControlAction PeriodicUpdate()
Definition: flow_control.cc:229
absl::synchronization_internal::Get
static GraphId Get(const IdMap &id, int num)
Definition: abseil-cpp/absl/synchronization/internal/graphcycles_test.cc:44
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc_core::chttp2::StreamFlowControl::announced_window_delta_
int64_t announced_window_delta_
Definition: flow_control.h:359
grpc_core::chttp2::TransportFlowControl::TargetLogBdp
double TargetLogBdp()
Definition: flow_control.cc:199
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc_core::chttp2::kMinInitialWindowSize
static constexpr const uint32_t kMinInitialWindowSize
Definition: flow_control.h:58
absl::StrJoin
std::string StrJoin(Iterator start, Iterator end, absl::string_view sep, Formatter &&fmt)
Definition: abseil-cpp/absl/strings/str_join.h:239
hpack_encoder_fixtures::Args
Args({0, 16384})
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
grpc_core::chttp2::TransportFlowControl
Definition: flow_control.h:145
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
grpc_core::chttp2::FlowControlAction::send_max_frame_size_update_
Urgency send_max_frame_size_update_
Definition: flow_control.h:135
grpc_core::chttp2::StreamFlowControl::UpdateAction
FlowControlAction UpdateAction(FlowControlAction action)
Definition: flow_control.cc:293
grpc_core::chttp2::TransportFlowControl::TransportFlowControl
TransportFlowControl(const char *name, bool enable_bdp_probe, MemoryOwner *memory_owner)
Definition: flow_control.cc:101
grpc_core::chttp2::FlowControlAction::send_transport_update_
Urgency send_transport_update_
Definition: flow_control.h:133
grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker::ComputeNextTargetInitialWindowSizeFromPeriodicUpdate
virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)=0
grpc_core::chttp2::FlowControlAction::send_initial_window_update_
Urgency send_initial_window_update_
Definition: flow_control.h:134
grpc_core::chttp2::kMaxWindowDelta
static constexpr const int64_t kMaxWindowDelta
Definition: flow_control.h:61
grpc_core::chttp2::FlowControlAction::set_send_initial_window_update
FlowControlAction & set_send_initial_window_update(Urgency u, uint32_t update)
Definition: flow_control.h:102
grpc_core::chttp2::TransportFlowControl::IncomingUpdateContext::RecvData
absl::Status RecvData(int64_t incoming_frame_size, absl::FunctionRef< absl::Status()> stream=[]() { return absl::OkStatus();})
Definition: flow_control.cc:152
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
min
#define min(a, b)
Definition: qsort.h:83
grpc_core::chttp2::operator<<
std::ostream & operator<<(std::ostream &out, FlowControlAction::Urgency u)
Definition: flow_control.cc:70
grpc_core::chttp2::FlowControlAction::DebugString
std::string DebugString() const
Definition: flow_control.cc:74
grpc_core::chttp2::FlowControlAction
Definition: flow_control.h:71
grpc_core::chttp2::TransportFlowControl::MaybeSendUpdate
uint32_t MaybeSendUpdate(bool writing_anyway)
Definition: flow_control.cc:117
grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker
TestOnlyTransportTargetWindowEstimatesMocker * g_test_only_transport_target_window_estimates_mocker
Definition: flow_control.cc:48
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_core::TraceFlag
Definition: debug/trace.h:63
grpc_core::chttp2::TransportFlowControl::UpdateSetting
static void UpdateSetting(int64_t *desired_value, int64_t new_desired_value, FlowControlAction *action, FlowControlAction &(FlowControlAction::*set)(FlowControlAction::Urgency, uint32_t))
Definition: flow_control.cc:215
grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker
Definition: flow_control.h:366
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext::RecvData
absl::Status RecvData(int64_t incoming_frame_size)
Definition: flow_control.cc:133
grpc_core::chttp2::TransportFlowControl::announced_window_
int64_t announced_window_
Definition: flow_control.h:287
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
grpc_core::Clamp
T Clamp(T val, T min, T max)
Definition: useful.h:31
client.action
action
Definition: examples/python/xds/client.py:49
grpc_core::chttp2::TransportFlowControl::IncomingUpdateContext
Definition: flow_control.h:162
grpc_core::chttp2::AdjustForMemoryPressure
static double AdjustForMemoryPressure(double memory_pressure, double target)
Definition: flow_control.cc:183
grpc_core::chttp2::StreamFlowControl::tfc_
TransportFlowControl *const tfc_
Definition: flow_control.h:356
grpc_core::chttp2::TransportFlowControl::IncomingUpdateContext::UpdateAnnouncedWindowDelta
void UpdateAnnouncedWindowDelta(int64_t *delta, int64_t change)
Definition: flow_control.h:184
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc_core::chttp2::StreamFlowControl::min_progress_size_
int64_t min_progress_size_
Definition: flow_control.h:357
grpc_core::PidController
Definition: pid_controller.h:35
grpc_core::chttp2::FlowControlAction::UrgencyString
static const char * UrgencyString(Urgency u)
Definition: flow_control.cc:56
grpc_core::chttp2::FlowControlAction::Urgency
Urgency
Definition: flow_control.h:73
flow_control.h
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
exec_ctx.h
grpc_core::chttp2::FlowControlAction::set_send_max_frame_size_update
FlowControlAction & set_send_max_frame_size_update(Urgency u, uint32_t update)
Definition: flow_control.h:108
memory_quota.h
absl::FunctionRef
Definition: abseil-cpp/absl/functional/function_ref.h:65
grpc_core::chttp2::StreamFlowControl::StreamFlowControl
StreamFlowControl(TransportFlowControl *tfc)
Definition: flow_control.cc:131
grpc_core::chttp2::TransportFlowControl::IncomingUpdateContext::MakeAction
FlowControlAction MakeAction()
Definition: flow_control.h:172
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
setup.target
target
Definition: third_party/bloaty/third_party/protobuf/python/setup.py:179
grpc_core::chttp2::StreamFlowControl::MaybeSendUpdate
uint32_t MaybeSendUpdate()
Definition: flow_control.cc:266
grpc_core::chttp2::TransportFlowControl::acked_init_window
uint32_t acked_init_window() const
Definition: flow_control.h:234
grpc_core::ExecCtx::Get
static ExecCtx * Get()
Definition: exec_ctx.h:205
grpc_core::chttp2::StreamFlowControl::pending_size_
absl::optional< int64_t > pending_size_
Definition: flow_control.h:360
port_platform.h
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:22