flow_control_fuzzer.cc
Go to the documentation of this file.
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <limits>
16 #include <queue>
17 
18 #include <grpc/grpc.h>
19 
22 #include "src/libfuzzer/libfuzzer_macro.h"
23 #include "test/core/transport/chttp2/flow_control_fuzzer.pb.h"
24 
25 bool squelch = true;
26 
27 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
28 
29 namespace grpc_core {
30 namespace chttp2 {
31 namespace {
32 
33 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
34 
37  GPR_ASSERT(clock_type != GPR_TIMESPAN);
38  gpr_timespec ts = g_now;
39  ts.clock_type = clock_type;
40  return ts;
41 }
42 
43 void InitGlobals() {
44  g_now = {1, 0, GPR_CLOCK_MONOTONIC};
47 }
48 
49 class FlowControlFuzzer {
50  public:
51  explicit FlowControlFuzzer(bool enable_bdp) {
52  ExecCtx exec_ctx;
53  tfc_ = absl::make_unique<TransportFlowControl>("fuzzer", enable_bdp,
54  &memory_owner_);
55  }
56 
57  ~FlowControlFuzzer() {
58  ExecCtx exec_ctx;
59  streams_.clear();
60  tfc_.reset();
62  }
63 
64  void Perform(const flow_control_fuzzer::Action& action);
65  void AssertNoneStuck() const;
66  void AssertAnnouncedOverInitialWindowSizeCorrect() const;
67 
68  private:
69  struct StreamPayload {
72  };
73 
74  struct SendToRemote {
75  bool bdp_ping = false;
78  std::vector<StreamPayload> stream_window_updates;
79  };
80 
81  struct SendFromRemote {
82  bool bdp_pong = false;
84  std::vector<StreamPayload> stream_writes;
85  };
86 
87  struct Stream {
88  explicit Stream(uint32_t id, TransportFlowControl* tfc) : id(id), fc(tfc) {}
89  uint32_t id;
90  StreamFlowControl fc;
93  };
94 
95  void PerformAction(FlowControlAction action, Stream* stream);
96  Stream* GetStream(uint32_t id) {
97  auto it = streams_.find(id);
98  if (it == streams_.end()) {
99  it = streams_.emplace(id, Stream{id, tfc_.get()}).first;
100  }
101  return &it->second;
102  }
103 
105  MemoryOwner memory_owner_ = memory_quota_->CreateMemoryOwner("owner");
106  std::unique_ptr<TransportFlowControl> tfc_;
109  bool scheduled_write_ = false;
111  std::deque<SendToRemote> send_to_remote_;
112  std::deque<SendFromRemote> send_from_remote_;
115  std::map<uint32_t, Stream> streams_;
116  std::queue<uint32_t> streams_to_update_;
119 };
120 
121 void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
122  ExecCtx exec_ctx;
123  bool sending_payload = false;
124  switch (action.action_case()) {
125  case flow_control_fuzzer::Action::ACTION_NOT_SET:
126  break;
127  case flow_control_fuzzer::Action::kSetMemoryQuota: {
128  memory_quota_->SetSize(
129  Clamp(action.set_memory_quota(), uint64_t(1),
131  } break;
132  case flow_control_fuzzer::Action::kStepTimeMs: {
134  g_now, gpr_time_from_millis(Clamp(action.step_time_ms(), uint64_t(1),
135  kMaxAdvanceTimeMillis),
136  GPR_TIMESPAN));
138  if (exec_ctx.Now() >= next_bdp_ping_) {
139  scheduled_write_ = true;
140  }
141  } break;
142  case flow_control_fuzzer::Action::kPeriodicUpdate: {
143  PerformAction(tfc_->PeriodicUpdate(), nullptr);
144  } break;
145  case flow_control_fuzzer::Action::kPerformSendToRemote: {
146  scheduled_write_ = true;
147  } break;
148  case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: {
149  scheduled_write_ = true;
150  sending_payload = true;
151  } break;
152  case flow_control_fuzzer::Action::kReadSendToRemote: {
153  if (send_to_remote_.empty()) break;
154  auto sent_to_remote = send_to_remote_.front();
155  if (sent_to_remote.initial_window_size.has_value()) {
156  if (!squelch) {
157  fprintf(stderr, "Setting initial window size to %d\n",
158  sent_to_remote.initial_window_size.value());
159  }
160  SendFromRemote send_from_remote;
161  send_from_remote.ack_initial_window_size =
162  sent_to_remote.initial_window_size;
163  for (const auto& id_stream : streams_) {
164  GPR_ASSERT(id_stream.second.window_delta +
165  *sent_to_remote.initial_window_size <=
166  (1u << 31) - 1);
167  }
168  remote_initial_window_size_ = *sent_to_remote.initial_window_size;
169  send_from_remote_.push_back(send_from_remote);
170  }
171  if (sent_to_remote.bdp_ping) {
172  SendFromRemote send_from_remote;
173  send_from_remote.bdp_pong = true;
174  send_from_remote_.push_back(send_from_remote);
175  }
176  for (auto stream_update : sent_to_remote.stream_window_updates) {
177  Stream* s = GetStream(stream_update.id);
178  if (!squelch) {
179  fprintf(stderr,
180  "[%" PRIu32 "]: increase window delta by %" PRIu64
181  " from %" PRId64 "\n",
182  stream_update.id, stream_update.size, s->window_delta);
183  }
184  s->window_delta += stream_update.size;
185  GPR_ASSERT(s->window_delta <= chttp2::kMaxWindowDelta);
186  }
187  remote_transport_window_size_ += sent_to_remote.transport_window_update;
188  send_to_remote_.pop_front();
189  } break;
190  case flow_control_fuzzer::Action::kReadSendFromRemote: {
191  if (send_from_remote_.empty()) break;
192  auto sent_from_remote = send_from_remote_.front();
193  if (sent_from_remote.ack_initial_window_size.has_value()) {
194  if (!squelch) {
195  fprintf(stderr, "Received ACK for initial window size %d\n",
196  *sent_from_remote.ack_initial_window_size);
197  }
198  tfc_->SetAckedInitialWindow(*sent_from_remote.ack_initial_window_size);
200  }
201  if (sent_from_remote.bdp_pong) {
202  next_bdp_ping_ = tfc_->bdp_estimator()->CompletePing();
203  }
204  for (const auto& stream_write : sent_from_remote.stream_writes) {
205  Stream* stream = GetStream(stream_write.id);
206  if (!squelch) {
207  fprintf(stderr, "[%" PRIu32 "]: recv write of %" PRIu64 "\n",
208  stream_write.id, stream_write.size);
209  }
210  if (auto* bdp = tfc_->bdp_estimator()) {
211  bdp->AddIncomingBytes(stream_write.size);
212  }
213  StreamFlowControl::IncomingUpdateContext upd(&stream->fc);
214  GPR_ASSERT(upd.RecvData(stream_write.size).ok());
215  PerformAction(upd.MakeAction(), stream);
216  }
217  send_from_remote_.pop_front();
218  } break;
219  case flow_control_fuzzer::Action::kStreamWrite: {
220  Stream* s = GetStream(action.stream_write().id());
221  s->queued_writes += action.stream_write().size();
222  } break;
223  case flow_control_fuzzer::Action::kPerformSendFromRemote: {
224  SendFromRemote send;
225  for (auto& id_stream : streams_) {
226  auto send_amount = std::min(
227  {id_stream.second.queued_writes, remote_transport_window_size_,
228  remote_initial_window_size_ + id_stream.second.window_delta});
229  if (send_amount <= 0) continue;
230  send.stream_writes.push_back({id_stream.first, uint64_t(send_amount)});
231  id_stream.second.queued_writes -= send_amount;
232  id_stream.second.window_delta -= send_amount;
233  remote_transport_window_size_ -= send_amount;
234  }
235  send_from_remote_.push_back(send);
236  } break;
237  case flow_control_fuzzer::Action::kSetMinProgressSize: {
238  Stream* s = GetStream(action.set_min_progress_size().id());
239  StreamFlowControl::IncomingUpdateContext upd(&s->fc);
240  upd.SetMinProgressSize(action.set_min_progress_size().size());
241  PerformAction(upd.MakeAction(), s);
242  } break;
243  case flow_control_fuzzer::Action::kAllocateMemory: {
244  auto allocate = std::min(
245  size_t(action.allocate_memory()),
247  allocated_memory_ += allocate;
248  memory_owner_.Reserve(allocate);
249  } break;
250  case flow_control_fuzzer::Action::kDeallocateMemory: {
251  auto deallocate =
252  std::min(uint64_t(action.deallocate_memory()), allocated_memory_);
253  allocated_memory_ -= deallocate;
254  memory_owner_.Release(deallocate);
255  } break;
256  case flow_control_fuzzer::Action::kSetPendingSize: {
257  Stream* s = GetStream(action.set_min_progress_size().id());
258  StreamFlowControl::IncomingUpdateContext upd(&s->fc);
259  upd.SetPendingSize(action.set_pending_size().size());
260  PerformAction(upd.MakeAction(), s);
261  } break;
262  }
263  if (scheduled_write_) {
264  SendToRemote send;
265  if (exec_ctx.Now() >= next_bdp_ping_) {
266  if (auto* bdp = tfc_->bdp_estimator()) {
267  bdp->SchedulePing();
268  bdp->StartPing();
270  send.bdp_ping = true;
271  }
272  }
276  send.initial_window_size =
278  }
279  while (!streams_to_update_.empty()) {
280  auto* stream = GetStream(streams_to_update_.front());
281  streams_to_update_.pop();
282  send.stream_window_updates.push_back(
283  {stream->id, stream->fc.MaybeSendUpdate()});
284  }
285  send.transport_window_update = tfc_->MaybeSendUpdate(sending_payload);
287  send_to_remote_.emplace_back(std::move(send));
288  scheduled_write_ = false;
289  }
290 }
291 
292 void FlowControlFuzzer::PerformAction(FlowControlAction action,
293  Stream* stream) {
294  if (!squelch) {
295  fprintf(stderr, "[%" PRId64 "]: ACTION: %s\n",
296  stream == nullptr ? int64_t(-1) : int64_t(stream->id),
297  action.DebugString().c_str());
298  }
299 
300  auto with_urgency = [this](FlowControlAction::Urgency urgency,
301  std::function<void()> f) {
302  switch (urgency) {
304  break;
306  scheduled_write_ = true;
309  f();
310  break;
311  }
312  };
313  with_urgency(action.send_stream_update(),
314  [this, stream]() { streams_to_update_.push(stream->id); });
315  with_urgency(action.send_transport_update(), []() {});
316  with_urgency(action.send_initial_window_update(), [this, &action]() {
317  GPR_ASSERT(action.initial_window_size() >= chttp2::kMinInitialWindowSize);
318  GPR_ASSERT(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
319  queued_initial_window_size_ = action.initial_window_size();
320  });
321  with_urgency(action.send_max_frame_size_update(), [this, &action]() {
322  queued_send_max_frame_size_ = action.max_frame_size();
323  });
324 }
325 
326 void FlowControlFuzzer::AssertNoneStuck() const {
328 
329  // Reconcile all the values to get the view of the remote that is knowable to
330  // the flow control system.
331  std::map<uint32_t, int64_t> reconciled_stream_deltas;
332  int64_t reconciled_transport_window = remote_transport_window_size_;
333  int64_t reconciled_initial_window = remote_initial_window_size_;
334  for (const auto& id_stream : streams_) {
335  reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
336  }
337 
338  // Anything that's been sent from flow control -> remote needs to be added to
339  // the remote.
340  for (const auto& send_to_remote : send_to_remote_) {
341  if (send_to_remote.initial_window_size.has_value()) {
342  reconciled_initial_window = *send_to_remote.initial_window_size;
343  }
344  reconciled_transport_window += send_to_remote.transport_window_update;
345  for (const auto& stream_update : send_to_remote.stream_window_updates) {
346  reconciled_stream_deltas[stream_update.id] += stream_update.size;
347  }
348  }
349 
350  // Anything that's been sent from remote -> flow control needs to be wound
351  // back into the remote.
352  for (const auto& send_from_remote : send_from_remote_) {
353  for (const auto& stream_write : send_from_remote.stream_writes) {
354  reconciled_stream_deltas[stream_write.id] += stream_write.size;
355  reconciled_transport_window += stream_write.size;
356  }
357  }
358 
359  // Finally, if a stream has indicated it's willing to read, the reconciled
360  // remote *MUST* be in a state where it could send at least one byte.
361  for (const auto& id_stream : streams_) {
362  if (id_stream.second.fc.min_progress_size() == 0) continue;
363  int64_t stream_window =
364  reconciled_stream_deltas[id_stream.first] + reconciled_initial_window;
365  if (stream_window <= 0 || reconciled_transport_window <= 0) {
366  fprintf(stderr,
367  "FAILED: stream %d has stream_window=%" PRId64
368  ", transport_window=%" PRId64 ", delta=%" PRId64
369  ", init_window_size=%" PRId64 ", min_progress_size=%" PRId64 "\n",
370  id_stream.first, stream_window, reconciled_transport_window,
371  reconciled_stream_deltas[id_stream.first],
372  reconciled_initial_window,
373  int64_t(id_stream.second.fc.min_progress_size()));
374  abort();
375  }
376  }
377 }
378 
379 void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect() const {
380  int64_t value_from_streams = 0;
381 
382  for (const auto& id_stream : streams_) {
383  const auto& stream = id_stream.second;
384  if (stream.fc.announced_window_delta() > 0) {
385  value_from_streams += stream.fc.announced_window_delta();
386  }
387  }
388 
389  GPR_ASSERT(value_from_streams ==
390  tfc_->announced_stream_total_over_incoming_window());
391 }
392 
393 } // namespace
394 } // namespace chttp2
395 } // namespace grpc_core
396 
397 DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg& msg) {
399  grpc_core::chttp2::FlowControlFuzzer fuzzer(msg.enable_bdp());
400  for (const auto& action : msg.actions()) {
401  if (!squelch) {
402  fprintf(stderr, "%s\n", action.DebugString().c_str());
403  }
404  fuzzer.Perform(action);
405  fuzzer.AssertNoneStuck();
406  fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect();
407  }
408 }
tfc_
std::unique_ptr< TransportFlowControl > tfc_
Definition: flow_control_fuzzer.cc:106
streams_to_update_
std::queue< uint32_t > streams_to_update_
Definition: flow_control_fuzzer.cc:116
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
Stream
Definition: bm_chttp2_transport.cc:199
grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY
@ UPDATE_IMMEDIATELY
regen-readme.it
it
Definition: regen-readme.py:15
send_to_remote_
std::deque< SendToRemote > send_to_remote_
Definition: flow_control_fuzzer.cc:111
grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE
@ QUEUE_UPDATE
Timestamp
Definition: bloaty/third_party/protobuf/src/google/protobuf/timestamp.pb.h:69
grpc_event_engine::experimental::MemoryRequest::max_allowed_size
static constexpr size_t max_allowed_size()
Maximum allowable request size - hard coded to 1GB.
Definition: memory_request.h:38
fuzzer
Fuzzer * fuzzer
Definition: promise_fuzzer.cc:124
grpc_core::TestOnlySetProcessEpoch
void TestOnlySetProcessEpoch(gpr_timespec epoch)
Definition: src/core/lib/gprpp/time.cc:201
grpc_core
Definition: call_metric_recorder.h:31
bdp_pong
bool bdp_pong
Definition: flow_control_fuzzer.cc:82
memory_owner_
MemoryOwner memory_owner_
Definition: flow_control_fuzzer.cc:105
queued_send_max_frame_size_
absl::optional< uint32_t > queued_send_max_frame_size_
Definition: flow_control_fuzzer.cc:108
sending_initial_window_size_
bool sending_initial_window_size_
Definition: flow_control_fuzzer.cc:110
grpc_core::chttp2::kDefaultWindow
static constexpr uint32_t kDefaultWindow
Definition: flow_control.h:53
absl::FormatConversionChar::s
@ s
gpr_now_impl
gpr_timespec(* gpr_now_impl)(gpr_clock_type clock_type)
grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED
@ NO_ACTION_NEEDED
grpc_core::MakeMemoryQuota
MemoryQuotaRefPtr MakeMemoryQuota(std::string name)
Definition: memory_quota.h:456
python_utils.port_server.stderr
stderr
Definition: port_server.py:51
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
now_impl
static gpr_timespec now_impl(gpr_clock_type clock_type)
Definition: filter_fuzzer.cc:41
autogen_x86imm.f
f
Definition: autogen_x86imm.py:9
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
next_bdp_ping_
Timestamp next_bdp_ping_
Definition: flow_control_fuzzer.cc:118
queued_initial_window_size_
absl::optional< uint32_t > queued_initial_window_size_
Definition: flow_control_fuzzer.cc:107
send_from_remote_
std::deque< SendFromRemote > send_from_remote_
Definition: flow_control_fuzzer.cc:112
grpc.h
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
window_delta
int64_t window_delta
Definition: flow_control_fuzzer.cc:92
absl::optional< uint32_t >
size
uint64_t size
Definition: flow_control_fuzzer.cc:71
memory_quota_
MemoryQuotaRefPtr memory_quota_
Definition: flow_control_fuzzer.cc:104
bdp_ping
bool bdp_ping
Definition: flow_control_fuzzer.cc:75
squelch
bool squelch
Definition: flow_control_fuzzer.cc:25
grpc_core::MemoryQuotaRefPtr
std::shared_ptr< MemoryQuota > MemoryQuotaRefPtr
Definition: memory_quota.h:455
GPR_CLOCK_MONOTONIC
@ GPR_CLOCK_MONOTONIC
Definition: gpr_types.h:36
grpc_core::chttp2::kMaxWindowDelta
static constexpr const int64_t kMaxWindowDelta
Definition: flow_control.h:61
min
#define min(a, b)
Definition: qsort.h:83
remote_initial_window_size_
uint32_t remote_initial_window_size_
Definition: flow_control_fuzzer.cc:113
streams_
std::map< uint32_t, Stream > streams_
Definition: flow_control_fuzzer.cc:115
stream_window_updates
std::vector< StreamPayload > stream_window_updates
Definition: flow_control_fuzzer.cc:78
grpc_core::Timestamp::ProcessEpoch
static constexpr Timestamp ProcessEpoch()
Definition: src/core/lib/gprpp/time.h:77
msg
std::string msg
Definition: client_interceptors_end2end_test.cc:372
remote_transport_window_size_
int64_t remote_transport_window_size_
Definition: flow_control_fuzzer.cc:114
gpr_timespec::clock_type
gpr_clock_type clock_type
Definition: gpr_types.h:55
transport_window_update
uint32_t transport_window_update
Definition: flow_control_fuzzer.cc:77
grpc_core::Clamp
T Clamp(T val, T min, T max)
Definition: useful.h:31
queued_writes
int64_t queued_writes
Definition: flow_control_fuzzer.cc:91
client.action
action
Definition: examples/python/xds/client.py:49
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
initial_window_size
absl::optional< uint32_t > initial_window_size
Definition: flow_control_fuzzer.cc:76
gpr_timespec
struct gpr_timespec gpr_timespec
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
absl::optional::reset
ABSL_ATTRIBUTE_REINITIALIZES void reset() noexcept
Definition: abseil-cpp/absl/types/optional.h:342
grpc_core::chttp2::FlowControlAction::Urgency
Urgency
Definition: flow_control.h:73
g_now
static gpr_timespec g_now
Definition: filter_fuzzer.cc:38
first
StrT first
Definition: cxa_demangle.cpp:4884
flow_control.h
exec_ctx.h
fc
StreamFlowControl fc
Definition: flow_control_fuzzer.cc:90
gpr_time_from_millis
GPRAPI gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:119
scheduled_write_
bool scheduled_write_
Definition: flow_control_fuzzer.cc:109
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
grpc_core::ExecCtx::Now
Timestamp Now()
Definition: exec_ctx.cc:90
gpr_timespec
Definition: gpr_types.h:50
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
google::protobuf::python::InitGlobals
void InitGlobals()
Definition: bloaty/third_party/protobuf/python/google/protobuf/pyext/message.cc:2932
ABSL_FALLTHROUGH_INTENDED
#define ABSL_FALLTHROUGH_INTENDED
Definition: abseil-cpp/absl/base/attributes.h:641
ack_initial_window_size
absl::optional< uint32_t > ack_initial_window_size
Definition: flow_control_fuzzer.cc:83
DEFINE_PROTO_FUZZER
DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg &msg)
Definition: flow_control_fuzzer.cc:397
stream_writes
std::vector< StreamPayload > stream_writes
Definition: flow_control_fuzzer.cc:84
absl::exchange
T exchange(T &obj, U &&new_value)
Definition: abseil-cpp/absl/utility/utility.h:314
id
uint32_t id
Definition: flow_control_fuzzer.cc:70
grpc_core::ExecCtx::InvalidateNow
void InvalidateNow()
Definition: exec_ctx.h:188
gpr_clock_type
gpr_clock_type
Definition: gpr_types.h:34
allocated_memory_
uint64_t allocated_memory_
Definition: flow_control_fuzzer.cc:117
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:58:24