24 #include <type_traits>
26 #include "absl/status/status.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/utility/utility.h"
// NOTE(review): extraction-garbled fragment (statements split across lines,
// leading numbers are line-number residue, intervening source lines are
// missing). Code tokens preserved verbatim. Presumably part of
// ReclaimerQueue::Handle's cancel/run paths -- confirm against the full file.
//
// Atomically claim the registered sweep callback (exchange to nullptr) so it
// runs at most once even under concurrent cancel/run. absl::nullopt appears
// to signal cancellation rather than a real sweep -- confirm.
84 if (
auto* sweep =
sweep_.exchange(
nullptr, std::memory_order_acq_rel)) {
85 sweep->RunAndDelete(absl::nullopt);
// Same claim-once exchange pattern, but this path forwards an actual
// ReclamationSweep into the callback.
91 if (
auto* sweep = sweep_.exchange(
nullptr, std::memory_order_acq_rel)) {
92 sweep->RunAndDelete(
std::move(reclamation_sweep));
// Cheap (relaxed) check whether a sweep callback is still registered.
97 if (sweep_.load(std::memory_order_relaxed)) {
// NOTE(review): garbled fragment of the reclaimer queue's pop logic; source
// lines are missing between the numbered statements. Tokens verbatim.
//
// Pop a queued node; stop when the queue is drained.
114 std::unique_ptr<QueuedNode> node(
116 if (node ==
nullptr)
break;
// If the handle's sweep state no longer matches (the comparison's right-hand
// side is lost in extraction), the node is pushed back onto the queue rather
// than returned -- confirm against the full file.
117 if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
119 state_->queue.Push(node.release());
// Second pop site: transfer ownership of the reclaimer handle to the caller.
140 std::unique_ptr<QueuedNode> node(
143 if (node !=
nullptr)
return std::move(node->reclaimer_handle);
// NOTE(review): garbled fragments of the allocator's reserve / request-
// scaling path; many source lines are missing. Tokens preserved verbatim.
//
// Owning reference that keeps the quota alive for this allocator's lifetime.
173 std::shared_ptr<BasicMemoryQuota> memory_quota;
// Fast path: a reservation was satisfied.
195 if (reservation.has_value()) {
// Scale the "over minimum" portion of the request by current memory
// pressure; snapshot pressure and the recommended max in one call so the
// two values are from the same instant.
209 if (scaled_size_over_min != 0) {
210 const auto pressure_and_max_recommended_allocation_size =
211 memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize();
212 double pressure = pressure_and_max_recommended_allocation_size.first;
213 size_t max_recommended_allocation_size =
214 pressure_and_max_recommended_allocation_size.second;
// Above 80% pressure, shrink the extra allocation toward zero linearly in
// (1.0 - pressure) / 0.2 (the full expression is truncated in this view).
216 if (pressure > 0.8) {
217 scaled_size_over_min =
220 (1.0 - pressure) / 0.2));
// Never recommend more than max_recommended_allocation_size in total: drop
// the extra entirely if even min() exceeds it, otherwise cap the extra so
// min() + extra stays within the recommendation.
222 if (max_recommended_allocation_size <
request.min()) {
223 scaled_size_over_min = 0;
224 }
else if (
request.min() + scaled_size_over_min >
225 max_recommended_allocation_size) {
226 scaled_size_over_min = max_recommended_allocation_size -
request.min();
// Try to serve the request from this allocator's local free pool first
// (acquire load; the CAS below uses acq_rel on success).
233 size_t available =
free_bytes_.load(std::memory_order_acquire);
243 std::memory_order_acq_rel,
244 std::memory_order_acquire)) {
// NOTE(review): garbled fragment -- appears to trim this allocator's local
// free pool back toward kReduceToSize, plus take-path bookkeeping. Tokens
// preserved verbatim; lines are missing between statements.
//
// Nothing to trim if the pool is already at or below the target.
251 size_t free =
free_bytes_.load(std::memory_order_relaxed);
254 if (free <= kReduceToSize)
return;
255 size_t ret = free - kReduceToSize;
// Weak CAS: on spurious failure the surrounding retry path (not visible in
// this fragment) presumably re-runs -- confirm against the full file.
256 if (
free_bytes_.compare_exchange_weak(free, kReduceToSize,
257 std::memory_order_acq_rel,
258 std::memory_order_acquire)) {
// Take-path bookkeeping: record bytes taken from the quota, then publish
// them into the local free pool (acq_rel makes them visible to readers).
278 taken_bytes_.fetch_add(amount, std::memory_order_relaxed);
280 free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
// NOTE(review): garbled fragment of reclaimer registration. The callback
// captures a weak_ptr so a posted reclaimer does not extend the allocator's
// lifetime. Tokens preserved verbatim; lines missing between statements.
293 auto self = shared_from_this();
294 std::weak_ptr<EventEngineMemoryAllocatorImpl> self_weak{
self};
// Inside the reclaimer callback: bail out if the allocator is already gone.
298 auto self = self_weak.lock();
299 if (
self ==
nullptr)
return;
// Mark the reclaimer as no longer registered before doing the work.
301 p->registered_reclaimer_.store(
false, std::memory_order_relaxed);
// Hand every locally cached free byte back to the shared quota. exchange(0)
// claims the bytes atomically so the same bytes are returned only once.
303 size_t return_bytes = p->free_bytes_.exchange(0, std::memory_order_acq_rel);
304 if (return_bytes == 0)
return;
306 p->taken_bytes_.fetch_sub(return_bytes);
308 p->memory_quota_->Return(return_bytes);
// NOTE(review): heavily garbled fragment of the quota's background
// reclamation loop, built from promise combinators (Loop/Seq/Race/Map). Many
// source lines are missing; tokens preserved verbatim. Too truncated to
// restructure safely -- comments only.
//
// Guard: compares against the reclamation counter (right-hand side lost in
// extraction) -- presumably to detect stale sweep tokens. Confirm.
324 if (
memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
// Keep the quota alive for the lifetime of the loop.
338 auto self = shared_from_this();
344 auto reclamation_loop =
Loop(
Seq(
// Skip reclamation entirely while free bytes remain.
347 if (
self->free_bytes_.load(std::memory_order_acquire) > 0) {
// Tag each reclaimer queue's result with a human-readable name (used in the
// trace log below).
356 auto annotate = [](
const char*
name) {
// Race the four reclaimer queues -- compact, benign, idle, destructive --
// and take whichever produces a handle first.
361 return Race(
Map(
self->reclaimers_[0].Next(), annotate(
"compact")),
362 Map(
self->reclaimers_[1].Next(), annotate(
"benign")),
363 Map(
self->reclaimers_[2].Next(), annotate(
"idle")),
364 Map(
self->reclaimers_[3].Next(), annotate(
"destructive")));
// The winning (name, handle) pair arrives as a tuple.
367 std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>>
arg) {
// Trace logging of the chosen reclamation pass.
371 size_t quota_size =
self->quota_size_.load();
373 "RQ: %s perform %s reclamation. Available free bytes: %f, "
374 "total quota_size: %zu",
375 self->name_.c_str(), std::get<0>(
arg), free, quota_size);
// Bump the reclamation counter to mint a token for this sweep, run the
// reclaimer with a ReclamationSweep, then wait for the sweep to complete.
381 self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
383 reclaimer->Run(ReclamationSweep(
388 return WaitForSweepPromise(
self, token);
390 []() -> LoopCtl<absl::Status> {
// The loop is owned by an activity so Stop() can cancel it.
395 reclaimer_activity_ =
402 void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }
// NOTE(review): garbled fragments of SetSize and Take; tokens verbatim,
// lines missing between statements.
//
// Swap in the new quota size; relaxed ordering suggests the size delta is
// reconciled elsewhere (not visible here) -- confirm.
405 size_t old_size = quota_size_.exchange(
new_size, std::memory_order_relaxed);
// Take `amount` bytes from the shared quota's free pool.
415 void BasicMemoryQuota::Take(
size_t amount) {
// Zero-sized takes are a no-op.
417 if (amount == 0)
return;
// The signed `prior` plus the intptr_t cast indicate free_bytes_ may go
// negative; this take detects the crossing from non-negative to negative.
420 auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
// Only the take that crosses the zero boundary wakes the reclaimer, so the
// wakeup fires once per depletion rather than on every take.
422 if (prior >= 0 && prior <
static_cast<intptr_t>(amount)) {
423 if (reclaimer_activity_ !=
nullptr) reclaimer_activity_->ForceWakeup();
// NOTE(review): garbled fragments of FinishReclamation and Return; tokens
// preserved verbatim, lines missing between statements.
//
// Only the sweep holding the current token may finish this reclamation; a
// stale token means a newer sweep has superseded it.
428 uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
429 if (current != token)
return;
// Advance the counter under a strong CAS so the same token cannot complete
// the reclamation twice.
430 if (reclamation_counter_.compare_exchange_strong(current, current + 1,
431 std::memory_order_relaxed,
432 std::memory_order_relaxed)) {
// Trace logging on successful completion.
435 size_t quota_size = quota_size_.load();
437 "RQ: %s reclamation complete. Available free bytes: %f, "
438 "total quota_size: %zu",
439 name_.c_str(), free, quota_size);
// Return(): give `amount` bytes back to the shared free pool.
446 free_bytes_.fetch_add(amount, std::memory_order_relaxed);
449 std::pair<double, size_t>
450 BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize()
const {
451 double free = free_bytes_.load();
452 if (free < 0) free = 0;
453 size_t quota_size = quota_size_.load();
454 double size = quota_size;
455 if (
size < 1)
return std::make_pair(1.0, 1);
456 double pressure = (
size - free) /
size;
457 if (pressure < 0.0) pressure = 0.0;
458 if (pressure > 1.0) pressure = 1.0;
459 return std::make_pair(pressure, quota_size / 16);
// NOTE(review): truncated factory fragments -- both construct a
// GrpcMemoryAllocatorImpl via make_shared (single allocation for object +
// control block); the constructor arguments were lost in extraction.
467 auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
473 auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(