22 #include "src/libfuzzer/libfuzzer_macro.h"
23 #include "test/core/transport/chttp2/flow_control_fuzzer.pb.h"
33 constexpr
uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
49 class FlowControlFuzzer {
51 explicit FlowControlFuzzer(
bool enable_bdp) {
53 tfc_ = absl::make_unique<TransportFlowControl>(
"fuzzer", enable_bdp,
57 ~FlowControlFuzzer() {
64 void Perform(
const flow_control_fuzzer::Action&
action);
65 void AssertNoneStuck()
const;
66 void AssertAnnouncedOverInitialWindowSizeCorrect()
const;
69 struct StreamPayload {
81 struct SendFromRemote {
106 std::unique_ptr<TransportFlowControl>
tfc_;
121 void FlowControlFuzzer::Perform(
const flow_control_fuzzer::Action&
action) {
123 bool sending_payload =
false;
124 switch (
action.action_case()) {
125 case flow_control_fuzzer::Action::ACTION_NOT_SET:
127 case flow_control_fuzzer::Action::kSetMemoryQuota: {
132 case flow_control_fuzzer::Action::kStepTimeMs: {
135 kMaxAdvanceTimeMillis),
142 case flow_control_fuzzer::Action::kPeriodicUpdate: {
143 PerformAction(
tfc_->PeriodicUpdate(),
nullptr);
145 case flow_control_fuzzer::Action::kPerformSendToRemote: {
148 case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: {
150 sending_payload =
true;
152 case flow_control_fuzzer::Action::kReadSendToRemote: {
155 if (sent_to_remote.initial_window_size.has_value()) {
157 fprintf(
stderr,
"Setting initial window size to %d\n",
158 sent_to_remote.initial_window_size.value());
160 SendFromRemote send_from_remote;
161 send_from_remote.ack_initial_window_size =
162 sent_to_remote.initial_window_size;
163 for (
const auto& id_stream :
streams_) {
165 *sent_to_remote.initial_window_size <=
171 if (sent_to_remote.bdp_ping) {
172 SendFromRemote send_from_remote;
173 send_from_remote.bdp_pong =
true;
176 for (
auto stream_update : sent_to_remote.stream_window_updates) {
177 Stream*
s = GetStream(stream_update.id);
180 "[%" PRIu32
"]: increase window delta by %" PRIu64
181 " from %" PRId64
"\n",
182 stream_update.id, stream_update.size,
s->window_delta);
184 s->window_delta += stream_update.size;
190 case flow_control_fuzzer::Action::kReadSendFromRemote: {
193 if (sent_from_remote.ack_initial_window_size.has_value()) {
195 fprintf(
stderr,
"Received ACK for initial window size %d\n",
196 *sent_from_remote.ack_initial_window_size);
198 tfc_->SetAckedInitialWindow(*sent_from_remote.ack_initial_window_size);
201 if (sent_from_remote.bdp_pong) {
204 for (
const auto& stream_write : sent_from_remote.stream_writes) {
207 fprintf(
stderr,
"[%" PRIu32
"]: recv write of %" PRIu64
"\n",
208 stream_write.id, stream_write.size);
210 if (
auto* bdp =
tfc_->bdp_estimator()) {
211 bdp->AddIncomingBytes(stream_write.size);
213 StreamFlowControl::IncomingUpdateContext upd(&
stream->fc);
214 GPR_ASSERT(upd.RecvData(stream_write.size).ok());
215 PerformAction(upd.MakeAction(),
stream);
219 case flow_control_fuzzer::Action::kStreamWrite: {
221 s->queued_writes +=
action.stream_write().size();
223 case flow_control_fuzzer::Action::kPerformSendFromRemote: {
229 if (send_amount <= 0)
continue;
230 send.stream_writes.push_back({id_stream.first,
uint64_t(send_amount)});
231 id_stream.second.queued_writes -= send_amount;
232 id_stream.second.window_delta -= send_amount;
237 case flow_control_fuzzer::Action::kSetMinProgressSize: {
239 StreamFlowControl::IncomingUpdateContext upd(&
s->fc);
240 upd.SetMinProgressSize(
action.set_min_progress_size().size());
241 PerformAction(upd.MakeAction(), s);
243 case flow_control_fuzzer::Action::kAllocateMemory: {
245 size_t(
action.allocate_memory()),
250 case flow_control_fuzzer::Action::kDeallocateMemory: {
256 case flow_control_fuzzer::Action::kSetPendingSize: {
258 StreamFlowControl::IncomingUpdateContext upd(&
s->fc);
259 upd.SetPendingSize(
action.set_pending_size().size());
260 PerformAction(upd.MakeAction(), s);
266 if (
auto* bdp =
tfc_->bdp_estimator()) {
270 send.bdp_ping =
true;
276 send.initial_window_size =
282 send.stream_window_updates.push_back(
285 send.transport_window_update =
tfc_->MaybeSendUpdate(sending_payload);
292 void FlowControlFuzzer::PerformAction(FlowControlAction
action,
295 fprintf(
stderr,
"[%" PRId64
"]: ACTION: %s\n",
297 action.DebugString().c_str());
313 with_urgency(
action.send_stream_update(),
314 [
this,
stream]() { streams_to_update_.push(stream->id); });
315 with_urgency(
action.send_transport_update(), []() {});
316 with_urgency(
action.send_initial_window_update(), [
this, &
action]() {
317 GPR_ASSERT(action.initial_window_size() >= chttp2::kMinInitialWindowSize);
318 GPR_ASSERT(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
319 queued_initial_window_size_ = action.initial_window_size();
321 with_urgency(
action.send_max_frame_size_update(), [
this, &
action]() {
322 queued_send_max_frame_size_ = action.max_frame_size();
326 void FlowControlFuzzer::AssertNoneStuck()
const {
331 std::map<uint32_t, int64_t> reconciled_stream_deltas;
334 for (
const auto& id_stream :
streams_) {
335 reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
341 if (send_to_remote.initial_window_size.has_value()) {
342 reconciled_initial_window = *send_to_remote.initial_window_size;
344 reconciled_transport_window += send_to_remote.transport_window_update;
345 for (
const auto& stream_update : send_to_remote.stream_window_updates) {
346 reconciled_stream_deltas[stream_update.id] += stream_update.size;
353 for (
const auto& stream_write : send_from_remote.stream_writes) {
354 reconciled_stream_deltas[stream_write.id] += stream_write.size;
355 reconciled_transport_window += stream_write.size;
361 for (
const auto& id_stream :
streams_) {
362 if (id_stream.second.fc.min_progress_size() == 0)
continue;
364 reconciled_stream_deltas[id_stream.first] + reconciled_initial_window;
365 if (stream_window <= 0 || reconciled_transport_window <= 0) {
367 "FAILED: stream %d has stream_window=%" PRId64
368 ", transport_window=%" PRId64
", delta=%" PRId64
369 ", init_window_size=%" PRId64
", min_progress_size=%" PRId64
"\n",
370 id_stream.first, stream_window, reconciled_transport_window,
371 reconciled_stream_deltas[id_stream.first],
372 reconciled_initial_window,
373 int64_t(id_stream.second.fc.min_progress_size()));
379 void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect()
const {
380 int64_t value_from_streams = 0;
382 for (
const auto& id_stream :
streams_) {
383 const auto&
stream = id_stream.second;
384 if (
stream.fc.announced_window_delta() > 0) {
385 value_from_streams +=
stream.fc.announced_window_delta();
390 tfc_->announced_stream_total_over_incoming_window());
399 grpc_core::chttp2::FlowControlFuzzer
fuzzer(
msg.enable_bdp());
400 for (
const auto&
action :
msg.actions()) {
406 fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect();