Diffstat (limited to 'grpc/src/core/lib/resource_quota/memory_quota.cc')
-rw-r--r--  grpc/src/core/lib/resource_quota/memory_quota.cc  478
1 file changed, 478 insertions, 0 deletions
diff --git a/grpc/src/core/lib/resource_quota/memory_quota.cc b/grpc/src/core/lib/resource_quota/memory_quota.cc
new file mode 100644
index 00000000..d4799095
--- /dev/null
+++ b/grpc/src/core/lib/resource_quota/memory_quota.cc
@@ -0,0 +1,478 @@
+// Copyright 2021 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/resource_quota/memory_quota.h"
+
+#include <atomic>
+#include <limits>
+
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/mpscq.h"
+#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
+#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/map.h"
+#include "src/core/lib/promise/race.h"
+#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/resource_quota/memory_quota.h"
+#include "src/core/lib/resource_quota/trace.h"
+
+namespace grpc_core {
+
+// Maximum number of bytes an allocator will request from a quota in one step.
+// Larger allocations than this will require multiple allocation requests.
+static constexpr size_t kMaxReplenishBytes = 1024 * 1024;
+
+// Minimum number of bytes an allocator will request from a quota in one step.
+static constexpr size_t kMinReplenishBytes = 4096;
+
+//
+// Reclaimer
+//
+
+ReclamationSweep::~ReclamationSweep() {
+ if (memory_quota_ != nullptr) {
+ memory_quota_->FinishReclamation(sweep_token_, std::move(waker_));
+ }
+}
+
+//
+// ReclaimerQueue
+//
+
+struct ReclaimerQueue::QueuedNode
+ : public MultiProducerSingleConsumerQueue::Node {
+ explicit QueuedNode(RefCountedPtr<Handle> reclaimer_handle)
+ : reclaimer_handle(std::move(reclaimer_handle)) {}
+ RefCountedPtr<Handle> reclaimer_handle;
+};
+
+struct ReclaimerQueue::State {
+ Mutex reader_mu;
+ MultiProducerSingleConsumerQueue queue; // reader_mu must be held to pop
+ Waker waker ABSL_GUARDED_BY(reader_mu);
+
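+  // Drain and delete any reclaimers still queued at destruction.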
+ ~State() {
+ bool empty = false;
+ do {
+ delete static_cast<QueuedNode*>(queue.PopAndCheckEnd(&empty));
+ } while (!empty);
+ }
+};
+
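+// If the handle is abandoned before the reclaimer runs, claim the sweep and
+// invoke it with nullopt so the callback can observe cancellation, then drop
+// the owner's ref.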
+void ReclaimerQueue::Handle::Orphan() {
+ if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
+ sweep->RunAndDelete(absl::nullopt);
+ }
+ Unref();
+}
+
+void ReclaimerQueue::Handle::Run(ReclamationSweep reclamation_sweep) {
+ if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
+ sweep->RunAndDelete(std::move(reclamation_sweep));
+ }
+}
+
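+// Racing with Run()/Orphan() here is benign: if sweep_ is cleared after the
+// relaxed load, the handle sits in the new queue with a null sweep_, and
+// running it later is a no-op.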
+bool ReclaimerQueue::Handle::Requeue(ReclaimerQueue* new_queue) {
+ if (sweep_.load(std::memory_order_relaxed)) {
+ new_queue->Enqueue(Ref());
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void ReclaimerQueue::Handle::Sweep::MarkCancelled() {
+ // When we cancel a reclaimer we rotate the elements of the queue once -
+ // taking one non-cancelled node from the start, and placing it on the end.
+  // This ensures that we don't suffer from head-of-line blocking, whereby a
+ // non-cancelled reclaimer at the head of the queue, in the absence of memory
+ // pressure, prevents the remainder of the queue from being cleaned up.
+ MutexLock lock(&state_->reader_mu);
+ while (true) {
+ bool empty = false;
+ std::unique_ptr<QueuedNode> node(
+ static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
+ if (node == nullptr) break;
+ if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
+ nullptr) {
+ state_->queue.Push(node.release());
+ break;
+ }
+ }
+}
+
+ReclaimerQueue::ReclaimerQueue() : state_(std::make_shared<State>()) {}
+
+ReclaimerQueue::~ReclaimerQueue() = default;
+
+void ReclaimerQueue::Enqueue(RefCountedPtr<Handle> handle) {
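+  // Push() returns true if this may have been the first node in the queue;
+  // in that case a parked reader needs a wakeup.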
+ if (state_->queue.Push(new QueuedNode(std::move(handle)))) {
+ MutexLock lock(&state_->reader_mu);
+ state_->waker.Wakeup();
+ }
+}
+
+Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
+ MutexLock lock(&state_->reader_mu);
+ bool empty = false;
+ // Try to pull from the queue.
+ std::unique_ptr<QueuedNode> node(
+ static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
+ // If we get something, great.
+ if (node != nullptr) return std::move(node->reclaimer_handle);
+ if (!empty) {
+ // If we don't, but the queue is probably not empty, schedule an immediate
+ // repoll.
+ Activity::current()->ForceImmediateRepoll();
+ } else {
+ // Otherwise, schedule a wakeup for whenever something is pushed.
+ state_->waker = Activity::current()->MakeNonOwningWaker();
+ }
+ return Pending{};
+}
+
+//
+// GrpcMemoryAllocatorImpl
+//
+
+GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
+ std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name)
+ : memory_quota_(memory_quota), name_(std::move(name)) {
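+  // taken_bytes_ starts at this allocator's own footprint (see the matching
+  // assert in the destructor), so debit that from the quota up front.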
+ memory_quota_->Take(taken_bytes_);
+}
+
+GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
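+  // All reserved bytes must have been freed back to this allocator by now;
+  // the only remaining balance should be our own footprint.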
+ GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
+ sizeof(GrpcMemoryAllocatorImpl) ==
+ taken_bytes_);
+ memory_quota_->Return(taken_bytes_);
+}
+
+void GrpcMemoryAllocatorImpl::Shutdown() {
+ std::shared_ptr<BasicMemoryQuota> memory_quota;
+ OrphanablePtr<ReclaimerQueue::Handle>
+ reclamation_handles[kNumReclamationPasses];
+ {
+ MutexLock lock(&memory_quota_mu_);
+ GPR_ASSERT(!shutdown_);
+ shutdown_ = true;
+ memory_quota = memory_quota_;
+ for (size_t i = 0; i < kNumReclamationPasses; i++) {
+ reclamation_handles[i] = absl::exchange(reclamation_handles_[i], nullptr);
+ }
+ }
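+  // memory_quota and the reclamation handles are released as they go out of
+  // scope, after the lock has been dropped.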
+}
+
+size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
+ // Validate request - performed here so we don't bloat the generated code with
+ // inlined asserts.
+ GPR_ASSERT(request.min() <= request.max());
+ GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
+ while (true) {
+ // Attempt to reserve memory from our pool.
+ auto reservation = TryReserve(request);
+ if (reservation.has_value()) return *reservation;
+ // If that failed, grab more from the quota and retry.
+ Replenish();
+ }
+}
+
+absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
+ MemoryRequest request) {
+ // How much memory should we request? (see the scaling below)
+ size_t scaled_size_over_min = request.max() - request.min();
+ // Scale the request down according to memory pressure if we have that
+ // flexibility.
+ if (scaled_size_over_min != 0) {
+ double pressure;
+ size_t max_recommended_allocation_size;
+ {
+ MutexLock lock(&memory_quota_mu_);
+ const auto pressure_and_max_recommended_allocation_size =
+ memory_quota_->InstantaneousPressureAndMaxRecommendedAllocationSize();
+ pressure = pressure_and_max_recommended_allocation_size.first;
+ max_recommended_allocation_size =
+ pressure_and_max_recommended_allocation_size.second;
+ }
+    // Above 80% quota usage, scale the flexible part of the request linearly
+    // down, reaching zero at 100% usage.
+ if (pressure > 0.8) {
+ scaled_size_over_min =
+ std::min(scaled_size_over_min,
+ static_cast<size_t>((request.max() - request.min()) *
+ (1.0 - pressure) / 0.2));
+ }
+ if (max_recommended_allocation_size < request.min()) {
+ scaled_size_over_min = 0;
+ } else if (request.min() + scaled_size_over_min >
+ max_recommended_allocation_size) {
+ scaled_size_over_min = max_recommended_allocation_size - request.min();
+ }
+ }
+
+ // How much do we want to reserve?
+ const size_t reserve = request.min() + scaled_size_over_min;
+ // See how many bytes are available.
+ size_t available = free_bytes_.load(std::memory_order_acquire);
+ while (true) {
+ // Does the current free pool satisfy the request?
+ if (available < reserve) {
+ return {};
+ }
+ // Try to reserve the requested amount.
+ // If the amount of free memory changed through this loop, then available
+ // will be set to the new value and we'll repeat.
+ if (free_bytes_.compare_exchange_weak(available, available - reserve,
+ std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
+ return reserve;
+ }
+ }
+}
+
+void GrpcMemoryAllocatorImpl::Replenish() {
+ MutexLock lock(&memory_quota_mu_);
+ GPR_ASSERT(!shutdown_);
+  // Grow the request size exponentially at a fairly low rate (a third of our
+  // current balance), bounded by the limits declared at the top of this file.
+ auto amount = Clamp(taken_bytes_ / 3, kMinReplenishBytes, kMaxReplenishBytes);
+ // Take the requested amount from the quota.
+ memory_quota_->Take(amount);
+ // Record that we've taken it.
+ taken_bytes_ += amount;
+ // Add the taken amount to the free pool.
+ free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
+ // See if we can add ourselves as a reclaimer.
+ MaybeRegisterReclaimerLocked();
+}
+
+void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimer() {
+ MutexLock lock(&memory_quota_mu_);
+ MaybeRegisterReclaimerLocked();
+}
+
+void GrpcMemoryAllocatorImpl::MaybeRegisterReclaimerLocked() {
+ // If the reclaimer is already registered, then there's nothing to do.
+ if (registered_reclaimer_) return;
+ if (shutdown_) return;
+  // Grab the references we'll need. The reclaimer holds only a weak ref to
+  // this allocator, so a pending reclaimer does not keep the allocator alive.
+ auto self = shared_from_this();
+ std::weak_ptr<EventEngineMemoryAllocatorImpl> self_weak{self};
+ registered_reclaimer_ = true;
+ InsertReclaimer(0, [self_weak](absl::optional<ReclamationSweep> sweep) {
+ if (!sweep.has_value()) return;
+ auto self = self_weak.lock();
+ if (self == nullptr) return;
+ auto* p = static_cast<GrpcMemoryAllocatorImpl*>(self.get());
+ MutexLock lock(&p->memory_quota_mu_);
+ p->registered_reclaimer_ = false;
+ // Figure out how many bytes we can return to the quota.
+ size_t return_bytes = p->free_bytes_.exchange(0, std::memory_order_acq_rel);
+ if (return_bytes == 0) return;
+ // Subtract that from our outstanding balance.
+ p->taken_bytes_ -= return_bytes;
+ // And return them to the quota.
+ p->memory_quota_->Return(return_bytes);
+ });
+}
+
+void GrpcMemoryAllocatorImpl::Rebind(
+ std::shared_ptr<BasicMemoryQuota> memory_quota) {
+ MutexLock lock(&memory_quota_mu_);
+ GPR_ASSERT(!shutdown_);
+ if (memory_quota_ == memory_quota) return;
+ // Return memory to the original memory quota.
+ memory_quota_->Return(taken_bytes_);
+ // Reassign any queued reclaimers
+ for (size_t i = 0; i < kNumReclamationPasses; i++) {
+ if (reclamation_handles_[i] != nullptr) {
+ reclamation_handles_[i]->Requeue(memory_quota->reclaimer_queue(i));
+ }
+ }
+  // Switch to the new memory quota, leaving the old one in the memory_quota
+  // parameter so that it is unreffed only after the lock is released.
+ memory_quota_.swap(memory_quota);
+ // Drop our freed memory down to zero, to avoid needing to ask the new
+ // quota for memory we're not currently using.
+ taken_bytes_ -= free_bytes_.exchange(0, std::memory_order_acq_rel);
+ // And let the new quota know how much we're already using.
+ memory_quota_->Take(taken_bytes_);
+}
+
+//
+// MemoryOwner
+//
+
+void MemoryOwner::Rebind(MemoryQuota* quota) {
+ impl()->Rebind(quota->memory_quota_);
+}
+
+//
+// BasicMemoryQuota
+//
+
+class BasicMemoryQuota::WaitForSweepPromise {
+ public:
+ WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
+ uint64_t token)
+ : memory_quota_(std::move(memory_quota)), token_(token) {}
+
+ struct Empty {};
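+  // Resolves once the sweep identified by token_ has finished, i.e. once the
+  // quota's reclamation counter has advanced past it.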
+ Poll<Empty> operator()() {
+ if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
+ token_) {
+ return Empty{};
+ } else {
+ return Pending{};
+ }
+ }
+
+ private:
+ std::shared_ptr<BasicMemoryQuota> memory_quota_;
+ uint64_t token_;
+};
+
+void BasicMemoryQuota::Start() {
+ auto self = shared_from_this();
+
+ // Reclamation loop:
+  // basically, wait until we are in overcommit (free_bytes_ <= 0), and then:
+  //   while (free_bytes_ <= 0) reclaim_memory()
+ // ... and repeat
+ auto reclamation_loop = Loop(Seq(
+ [self]() -> Poll<int> {
+ // If there's free memory we no longer need to reclaim memory!
+ if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
+ return Pending{};
+ }
+ return 0;
+ },
+ [self]() {
+ // Race biases to the first thing that completes... so this will
+ // choose the highest priority/least destructive thing to do that's
+ // available.
+ auto annotate = [](const char* name) {
+ return [name](RefCountedPtr<ReclaimerQueue::Handle> f) {
+ return std::make_tuple(name, std::move(f));
+ };
+ };
+ return Race(Map(self->reclaimers_[0].Next(), annotate("compact")),
+ Map(self->reclaimers_[1].Next(), annotate("benign")),
+ Map(self->reclaimers_[2].Next(), annotate("idle")),
+ Map(self->reclaimers_[3].Next(), annotate("destructive")));
+ },
+ [self](
+ std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>> arg) {
+ auto reclaimer = std::move(std::get<1>(arg));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+ gpr_log(GPR_INFO, "RQ: %s perform %s reclamation",
+ self->name_.c_str(), std::get<0>(arg));
+ }
+ // One of the reclaimer queues gave us a way to get back memory.
+ // Call the reclaimer with a token that contains enough to wake us
+ // up again.
+ const uint64_t token =
+ self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
+ 1;
+ reclaimer->Run(ReclamationSweep(
+ self, token, Activity::current()->MakeNonOwningWaker()));
+        // Return a promise that waits for our barrier. It will be awoken
+        // when the ReclamationSweep created above is destroyed; once that
+        // happens, we can proceed.
+ return WaitForSweepPromise(self, token);
+ },
+ []() -> LoopCtl<absl::Status> {
+ // Continue the loop!
+ return Continue{};
+ }));
+
+ reclaimer_activity_ =
+ MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
+ [](absl::Status status) {
+ GPR_ASSERT(status.code() == absl::StatusCode::kCancelled);
+ });
+}
+
+void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }
+
+void BasicMemoryQuota::SetSize(size_t new_size) {
+ size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
+ if (old_size < new_size) {
+ // We're growing the quota.
+ Return(new_size - old_size);
+ } else {
+ // We're shrinking the quota.
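+    // Taking the difference may push free_bytes_ negative and wake the
+    // reclamation loop.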
+ Take(old_size - new_size);
+ }
+}
+
+void BasicMemoryQuota::Take(size_t amount) {
+ // If there's a request for nothing, then do nothing!
+ if (amount == 0) return;
+ GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
+ // Grab memory from the quota.
+ auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
+  // If this push took us into overcommit, wake the reclaimer.
+ if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
+ if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
+ }
+}
+
+void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
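+  // The relaxed load filters stale tokens cheaply; the compare-exchange
+  // ensures only one caller advances the counter for a given token.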
+ uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
+ if (current != token) return;
+ if (reclamation_counter_.compare_exchange_strong(current, current + 1,
+ std::memory_order_relaxed,
+ std::memory_order_relaxed)) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+ gpr_log(GPR_INFO, "RQ: %s reclamation complete", name_.c_str());
+ }
+ waker.Wakeup();
+ }
+}
+
+void BasicMemoryQuota::Return(size_t amount) {
+ free_bytes_.fetch_add(amount, std::memory_order_relaxed);
+}
+
+std::pair<double, size_t>
+BasicMemoryQuota::InstantaneousPressureAndMaxRecommendedAllocationSize() const {
+ double free = free_bytes_.load();
+ if (free < 0) free = 0;
+ size_t quota_size = quota_size_.load();
+ double size = quota_size;
+ if (size < 1) return std::make_pair(1.0, 1);
+ double pressure = (size - free) / size;
+ if (pressure < 0.0) pressure = 0.0;
+ if (pressure > 1.0) pressure = 1.0;
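+  // Recommend at most 1/16th of the quota for any single allocation.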
+ return std::make_pair(pressure, quota_size / 16);
+}
+
+//
+// MemoryQuota
+//
+
+MemoryAllocator MemoryQuota::CreateMemoryAllocator(absl::string_view name) {
+ auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
+ memory_quota_, absl::StrCat(memory_quota_->name(), "/allocator/", name));
+ return MemoryAllocator(std::move(impl));
+}
+
+MemoryOwner MemoryQuota::CreateMemoryOwner(absl::string_view name) {
+ auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
+ memory_quota_, absl::StrCat(memory_quota_->name(), "/owner/", name));
+ return MemoryOwner(std::move(impl));
+}
+
+} // namespace grpc_core