Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ go_test(
"scheduler_latency_listener_test.go",
"sequencer_test.go",
"snapshot_queue_test.go",
"sql_cpu_handle_test.go",
"store_token_estimation_test.go",
"tokens_linear_model_test.go",
"work_queue_test.go",
Expand All @@ -99,6 +100,7 @@ go_test(
"//pkg/cli/exit",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/echotest",
"//pkg/testutils/skip",
Expand Down
297 changes: 260 additions & 37 deletions pkg/util/admission/sql_cpu_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,63 @@ type SQLCPUHandle struct {
p *sqlCPUProviderImpl
wq *WorkQueue

// admitTurn gates the blocking WorkQueue.Admit path for this handle. A
// statement can run many DistSQL goroutines; when the token reservation is
// short, each of them could otherwise enter Admit at once. Serializing that
// path keeps at most one blocking Admit per handle at a time.
//
// Any goroutine that needs that path must take the turn first: buffer size
// 1, send to acquire and deferred receive to release. The send blocks while
// another goroutine holds the turn (so only one runs the serialized section
// at a time). A channel is used so that wait can select on <-ctx.Done();
// Mutex.Lock cannot.
//
// Holding the turn is a precondition for blocking Admit, not a
// commitment to call it. After the turn is acquired, two conditions
// can prevent blocking Admit from running:
// 1. The handle was closed while waiting for the turn. The
// remaining deficit is accounted via BypassAdmission.
// 2. The previous turn-holder refilled reservation while this
// goroutine waited, and the second deductFromReservation
// covers the shortfall entirely. No Admit call is needed.
admitTurn chan struct{}

mu struct {
syncutil.Mutex
closed bool

// Once true, cannot be set to false.
closed bool

// reservation holds tokens obtained from WorkQueue.Admit in
// excess of the current checkpoint's deficit. These surplus
// tokens cover future checkpoints without re-acquiring
// WorkQueue.mu. Without the reservation, every measureAndAdmit
// call would call WorkQueue.Admit directly, acquiring
// WorkQueue.mu on every checkpoint. The reservation allows
// lock-free CAS deductions that skip the Admit call entirely.
//
// reservation is modified in three places:
//
// - CAS decrement (tryDeductReservation): does not need
// mu. Goroutines race only with each other and with
// Close's Swap(0); a failed CAS retries and eventually
// sees 0. No tokens are lost.
//
// - Swap(0) drain (Close): does not need mu. Close sets
// closed=true under mu first, which prevents any future
// Add. After that, Swap(0) races only with CAS
// decrements, which is safe (see above).
//
// - Add increment (after Admit): needs mu. The caller
// must check closed before adding — if closed, tokens
// go to AdmittedSQLWorkDone instead. Without mu, Close
// could set closed=true and Swap(0) between the check
// and the Add, leaking tokens.
//
// INVARIANT: reservation >= 0.
// INVARIANT: closed == true => reservation == 0.
reservation atomic.Int64

gHandles []*GoroutineCPUHandle
// Backing for up to 2 goroutine handles, to avoid allocations in
// gHandles when there are 2 or fewer goroutines.
Expand All @@ -114,45 +168,190 @@ func newSQLCPUAdmissionHandle(
atGateway: atGateway,
p: p,
wq: wq,
admitTurn: make(chan struct{}, 1),
}
h.mu.gHandles = h.mu.handlesBacking[:0]
return h
}

// reportAndAcquireConsumedCPU updates cumulative CPU counters and, if a CTT
// WorkQueue is attached, calls Admit to deduct the consumed CPU from the token
// bucket. This may block until tokens are available unless noWait is true.
func (h *SQLCPUHandle) reportAndAcquireConsumedCPU(
ctx context.Context, diff time.Duration, noWait bool,
) error {
// reportCPU folds the measured CPU time into the cumulative counter
// matching this handle's position in the query: the gateway counter when
// running at the gateway, the DistSQL counter otherwise.
func (h *SQLCPUHandle) reportCPU(diff time.Duration) {
	nanos := diff.Nanoseconds()
	counter := &h.p.cumulativeDistSQLCPUNanos
	if h.atGateway {
		counter = &h.p.cumulativeGatewayCPUNanos
	}
	counter.Add(nanos)
}

if h.wq == nil {
return nil
// tryDeductReservation attempts to take up to diffNanos tokens out of the
// reservation using a CAS loop, and reports how much was actually taken
// (possibly less than diffNanos, down to zero when the reservation is empty
// or has been drained by Close).
func (h *SQLCPUHandle) tryDeductReservation(diffNanos int64) int64 {
	for {
		avail := h.mu.reservation.Load()
		if avail <= 0 {
			return 0
		}
		take := diffNanos
		if avail < take {
			take = avail
		}
		if h.mu.reservation.CompareAndSwap(avail, avail-take) {
			return take
		}
		// CAS lost a race with another deduction or Close's Swap(0); retry
		// against the fresh value.
	}
}

// RequestedCount is set to the exact CPU consumed (from grunning), so the
// WorkQueue's CPU time token estimator is skipped (see Admit). Because the
// exact amount is deducted at Admit time, there is no estimate to correct,
// so AdmittedWorkDone is not called. This also avoids training the KV
// estimator with SQL CPU data, which would corrupt its estimates.
//
// TODO(wenyi): Currently we call Admit on every measureAndAdmit invocation,
// which happens every ~1024 rows. This means each SQL goroutine takes the
// WorkQueue mutex on every check. Consider reserving more tokens than the
// exact amount consumed (e.g., 2x the last diff, or a smoothed estimate of
// upcoming usage) and tracking remaining reservation locally. This would
// allow subsequent measureAndAdmit calls to deduct from the local
// reservation without calling Admit, reducing contention on the WorkQueue.
// maxRefillBuffer caps the buffer portion of a refill request (see
// refillHeuristic) per Admit call. This bounds how many surplus tokens a
// single handle can hold idle in its reservation — tokens held there are
// unavailable to other handles and tenants until consumed or returned at
// Close.
const maxRefillBuffer = int64(1 * time.Millisecond)

// refillHeuristic sizes the token request sent to the WorkQueue when the
// reservation cannot cover the current deficit. The request is the deficit
// itself (to pay for CPU already consumed) plus a pre-payment buffer for
// future lock-free CAS deductions. A bigger buffer means fewer blocking
// Admit calls and less WorkQueue.mu contention, at the cost of tokens
// sitting idle in this handle's reservation — which other handles and
// tenants cannot use. maxRefillBuffer caps the buffer to bound that
// unfairness.
//
// TODO(wenyihu6): replace this simple 2x heuristic with an adaptive scheme
// (e.g. exponential growth) that grows the buffer when Admit calls are too
// frequent and shrinks it when they are infrequent.
func (h *SQLCPUHandle) refillHeuristic(deficit int64) int64 {
	if deficit < maxRefillBuffer {
		// Buffer equals the deficit: request double.
		return 2 * deficit
	}
	return deficit + maxRefillBuffer
}

// constructWorkInfo returns a WorkInfo copy with the given
// RequestedCount and BypassAdmission.
//
// NB: setting RequestedCount > 0 causes WorkQueue.Admit to skip the
// cpuTimeTokenEstimator (see callerSetRequestedCount in Admit). This is
// required for SQL CPU admission, which already knows the exact CPU consumed
// from grunning and does not need/want estimation.
//
// REQUIRES: reqCount > 0
func (h *SQLCPUHandle) constructWorkInfo(reqCount int64, noWait bool) WorkInfo {
	workInfo := h.workInfo
	workInfo.RequestedCount = reqCount
	workInfo.BypassAdmission = noWait
	return workInfo
}

// isClosed reports whether Close has run, reading mu.closed under the mutex.
func (h *SQLCPUHandle) isClosed() bool {
	h.mu.Lock()
	closed := h.mu.closed
	h.mu.Unlock()
	return closed
}

// deductFromReservation takes what it can from the reservation (lock-free
// CAS) and returns how much of needed is still uncovered afterwards.
func (h *SQLCPUHandle) deductFromReservation(needed int64) (shortfall int64) {
	return needed - h.tryDeductReservation(needed)
}

// reportAndAcquireConsumedCPU reports consumed CPU to the cumulative
// counters and acquires tokens for it from the CTT WorkQueue (when attached):
//
//  1. Fast path: reservation covers the deficit via CAS. No Admit.
//  2. noWait path: deduct what is available, account the rest via
//     BypassAdmission.
//  3. Slow path: take a turn via admitTurn, call Admit for the
//     deficit plus a buffer, store the buffer in reservation.
//
// In winding-down cases (noWait, context cancellation, Close having
// run), the goroutine deducts what it can and accounts the rest via
// BypassAdmission. It never blocks and never refills the reservation.
//
// INVARIANT: Every token obtained from WorkQueue.Admit must be accounted for:
// tokens are either
// (a) consumed to pay for measured CPU usage,
// (b) held in reservation for future usage, or
// (c) returned via AdmittedSQLWorkDone when the handle is closed.
//
// When SQLCPUHandle is closed, any remaining reservation is returned
// via AdmittedSQLWorkDone, so this code does not leak tokens.
//
// Returns nil except when ctx is cancelled (either while waiting for the
// turn, or inside the blocking Admit call).
func (h *SQLCPUHandle) reportAndAcquireConsumedCPU(
	ctx context.Context, diff time.Duration, noWait bool,
) error {
	h.reportCPU(diff)

	// No CTT WorkQueue attached: counters only, no admission.
	if h.wq == nil {
		return nil
	}

	diffNanos := diff.Nanoseconds()

	// Deduct from reservation (lock-free CAS).
	remaining := h.deductFromReservation(diffNanos)

	if noWait {
		// Winding down: account the deficit without blocking.
		// NOTE(review): this relies on BypassAdmission making Admit
		// non-blocking — confirm against WorkQueue.Admit.
		if remaining > 0 {
			_, _ = h.wq.Admit(ctx, h.constructWorkInfo(remaining, true /*noWait*/))
		}
		return nil
	}

	// Fast path: reservation covered the deficit.
	if remaining == 0 {
		return nil
	}

	// Slow path: serialize blocking Admit calls via admitTurn so that at most
	// one goroutine of this handle blocks in Admit at a time.
	select {
	case h.admitTurn <- struct{}{}:
		defer func() { <-h.admitTurn }()
	case <-ctx.Done():
		// Winding down: account the deficit without blocking. NB: ctx is
		// already cancelled here; the BypassAdmission path is still used so
		// the consumed CPU is accounted for.
		_, _ = h.wq.Admit(ctx, h.constructWorkInfo(remaining, true /*noWait*/))
		return ctx.Err()
	}

	// Close may have run while waiting for the turn.
	if h.isClosed() {
		// Winding down: account the deficit without blocking.
		_, _ = h.wq.Admit(ctx, h.constructWorkInfo(remaining, true /*noWait*/))
		return nil
	}

	// The previous turn-holder may have refilled the reservation.
	remaining = h.deductFromReservation(remaining)
	if remaining == 0 {
		return nil
	}

	// Request the deficit plus a buffer (see refillHeuristic). This is the
	// only Admit call that may block.
	resp, err := h.wq.Admit(ctx, h.constructWorkInfo(h.refillHeuristic(remaining), false /*noWait*/))
	if err != nil {
		// Error is only due to context cancellation. Account the deficit
		// without blocking.
		_, _ = h.wq.Admit(ctx, h.constructWorkInfo(remaining, true /*noWait*/))
		return err
	}
	if resp.Enabled {
		// Surplus beyond the deficit becomes reservation for future fast-path
		// deductions — unless Close raced with us, in which case it is
		// returned to the queue instead.
		buffer := resp.requestedCount - remaining
		if buffer > 0 {
			closed := func() bool {
				h.mu.Lock()
				defer h.mu.Unlock()
				if h.mu.closed {
					return true
				}
				// NB: closed check and reservation.Add must be atomic under mu.
				// Without the lock, this goroutine could read closed=false, then
				// Close sets closed=true and Swap(0) drains reservation, then this
				// goroutine does Add(buffer), leaking tokens.
				h.mu.reservation.Add(buffer)
				return false
			}()
			// Close already ran and drained reservation. Return the buffer
			// directly.
			if closed {
				h.wq.AdmittedSQLWorkDone(h.workInfo.TenantID, buffer)
			}
		}
	}
	return nil
}

// TODO(sumeer): see the comment
Expand Down Expand Up @@ -204,21 +403,45 @@ func (h *SQLCPUHandle) RegisterGoroutine() *GoroutineCPUHandle {
return gh
}

// Close is called when no more reporting is needed. It pools
// GoroutineCPUHandles that have been closed. GoroutineCPUHandles that are not
// yet closed are left for GC.
// Close sets closed=true under mu, drains reservation via Swap(0),
// and returns any remaining tokens via AdmittedSQLWorkDone.
//
// Close returns even if there exist goroutines in reportAndAcquireConsumedCPU
// blocked on Admit. reportAndAcquireConsumedCPU that raced with Close are
// handled in two ways:
// - Before taking the turn: they see closed=true after acquiring
// the turn and fall back to BypassAdmission.
// - After Admit returns: they see closed=true under mu and return
// the surplus buffer via AdmittedSQLWorkDone instead of adding
// it to reservation.
//
// Closed GoroutineCPUHandles are pooled; unclosed ones are left for GC.
func (h *SQLCPUHandle) Close() {
h.mu.Lock()
defer h.mu.Unlock()
h.mu.closed = true
for i, gh := range h.mu.gHandles {
if gh.closed.Load() {
gh.reset()
goroutineCPUHandlePool.Put(gh)
// After this, goroutines in reportAndAcquireConsumedCPU observe
// closed=true when they acquire mu.
func() {
h.mu.Lock()
defer h.mu.Unlock()
h.mu.closed = true
for i, gh := range h.mu.gHandles {
if gh.closed.Load() {
gh.reset()
goroutineCPUHandlePool.Put(gh)
}
h.mu.gHandles[i] = nil
}
h.mu.gHandles[i] = nil
h.mu.gHandles = nil
}()
if h.wq == nil {
return
}
// Drain reservation outside the lock. Swap(0) races safely with CAS
// deductions (CAS retries on conflict and finds 0). NB: No new tokens should
// be added to mu.reservation after this.
remaining := h.mu.reservation.Swap(0)
if remaining > 0 {
h.wq.AdmittedSQLWorkDone(h.workInfo.TenantID, remaining)
}
h.mu.gHandles = nil
}

// GoroutineCPUHandle is used for CPU accounting on a single goroutine. It
Expand Down
Loading
Loading