
feat(metrics): track peak resident state bytes for stateful operators#6883

Draft
samstokes wants to merge 4 commits into main from claude/review-memory-instrumentation-mV5Sb

@samstokes
Collaborator

Changes Made

Added comprehensive tracking of peak resident state size (memory high-water mark) for stateful operators across the execution pipeline:

Core Metrics Infrastructure

  • New MaxGauge metric type (src/common/metrics/src/meters.rs): A gauge that tracks the maximum value ever recorded using atomic fetch_max, ensuring concurrent writers cannot regress the value. Includes peak_state_bytes_metric() factory method on Meter.
  • Updated DefaultSnapshot (src/common/metrics/src/snapshot.rs): Added an optional peak_state_bytes field that tracks peak state across the operator's lifetime. Merging sums the peaks across concurrent input_ids; because those peaks need not coincide in time, the sum is an upper bound on the memory held simultaneously.
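
To illustrate the high-water-mark behavior described for MaxGauge, here is a minimal sketch built on `AtomicU64::fetch_max` from the standard library. The struct layout and the `record`/`get` method names are assumptions made for this example; they are not taken from the PR's `meters.rs`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative stand-in for a max-tracking gauge (not the PR's actual type).
#[derive(Debug, Default)]
pub struct MaxGauge {
    peak: AtomicU64,
}

impl MaxGauge {
    pub fn new() -> Self {
        Self { peak: AtomicU64::new(0) }
    }

    /// Record a reading. `fetch_max` stores max(current, value) atomically,
    /// so a smaller reading from another thread can never lower the peak.
    pub fn record(&self, value: u64) {
        self.peak.fetch_max(value, Ordering::Relaxed);
    }

    /// Read the largest value recorded so far.
    pub fn get(&self) -> u64 {
        self.peak.load(Ordering::Relaxed)
    }
}
```

Relaxed ordering is enough in this sketch because the gauge publishes a single independent number; whether the real implementation uses a stronger ordering is not stated in the description.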

Runtime Statistics

  • Enhanced RuntimeStats trait (src/daft-local-execution/src/runtime_stats/values.rs): Added record_state_bytes() method for stateful operators to report current working set size. Default no-op allows stateless operators to opt out.
  • Updated DefaultRuntimeStats: Tracks peak state bytes with a MaxGauge and includes the value in snapshots only when it was explicitly recorded, which distinguishes stateful operators from stateless ones.
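
The opt-out design above can be sketched as a trait with a default no-op method. Only the `record_state_bytes` name comes from the description; the `peak_state_bytes` accessor, the two structs, and the use of an `AtomicBool` to remember whether anything was recorded are assumptions for illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

pub trait RuntimeStats {
    /// Default no-op: stateless operators inherit this and report nothing.
    fn record_state_bytes(&self, _bytes: u64) {}

    /// Hypothetical accessor: None means "this operator never reported state".
    fn peak_state_bytes(&self) -> Option<u64> {
        None
    }
}

/// A stateless operator's stats: relies entirely on the trait defaults.
#[derive(Default)]
pub struct StatelessStats;

impl RuntimeStats for StatelessStats {}

/// A stateful operator's stats: keeps a peak plus a "was recorded" flag,
/// so that a recorded peak of 0 is still distinguishable from no report.
#[derive(Default)]
pub struct StatefulStats {
    peak: AtomicU64,
    recorded: AtomicBool,
}

impl RuntimeStats for StatefulStats {
    fn record_state_bytes(&self, bytes: u64) {
        self.peak.fetch_max(bytes, Ordering::Relaxed);
        self.recorded.store(true, Ordering::Release);
    }

    fn peak_state_bytes(&self) -> Option<u64> {
        self.recorded
            .load(Ordering::Acquire)
            .then(|| self.peak.load(Ordering::Relaxed))
    }
}
```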

Distributed Engine Integration

  • BaseCounters accumulation (src/daft-distributed/src/statistics/stats.rs): Sums peak-state-bytes readings from worker snapshots using saturating addition (total memory pressure is additive across workers).
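
As a small illustration of the saturating accumulation mentioned above, the following sketch folds per-worker readings into one total. The function name and the slice-based signature are hypothetical; the actual BaseCounters code may accumulate incrementally.

```rust
/// Sum per-worker peak readings, clamping at u64::MAX instead of wrapping
/// or panicking on overflow.
pub fn total_peak_bytes(worker_peaks: &[u64]) -> u64 {
    worker_peaks
        .iter()
        .fold(0u64, |acc, &bytes| acc.saturating_add(bytes))
}
```

Saturating rather than wrapping arithmetic means a pathological reading shows up as an obviously maxed-out value rather than a small, plausible-looking one.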

Stateful Operator Instrumentation

Updated Sort, Aggregate, TopN, and Dedup sinks to:

  • Track held_bytes alongside their buffered partitions
  • Call record_state_bytes() after each state update
  • Report accurate peak memory usage to runtime stats
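
The three steps above can be sketched with a toy buffering sink. Everything here except the `held_bytes` name is an assumption: the real sinks buffer partitions rather than byte vectors, and they report through the RuntimeStats trait rather than a closure.

```rust
/// Toy sink that buffers byte partitions and reports its working-set size
/// through a caller-supplied callback after every state change.
pub struct BufferingSink<R: FnMut(u64)> {
    partitions: Vec<Vec<u8>>,
    held_bytes: u64,
    report: R,
}

impl<R: FnMut(u64)> BufferingSink<R> {
    pub fn new(report: R) -> Self {
        Self { partitions: Vec::new(), held_bytes: 0, report }
    }

    /// Buffer one partition, update the running byte count, then report it.
    pub fn push(&mut self, partition: Vec<u8>) {
        self.held_bytes += partition.len() as u64;
        self.partitions.push(partition);
        (self.report)(self.held_bytes);
    }

    /// Hand back the buffered partitions and report that the state is empty.
    pub fn drain(&mut self) -> Vec<Vec<u8>> {
        self.held_bytes = 0;
        (self.report)(0);
        std::mem::take(&mut self.partitions)
    }
}
```

Keeping a running `held_bytes` counter avoids re-measuring every buffered partition on each update; if the callback feeds a max-tracking gauge, the later report of 0 does not lower the recorded peak.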

Dashboard Integration

  • Stats aggregation (src/daft-dashboard/src/engine.rs): Memory peak metrics now use max semantics (not sum) when merging across workers, reflecting the hottest single worker rather than fictitious sums.
  • Frontend updates (src/daft-dashboard/frontend/src/app/query/): Added peak state bytes column to progress table and stats utilities.
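
A minimal sketch of the prefix-based merge rule described for the dashboard, assuming stats are held as a map from metric key to `u64`. The function name and map type are hypothetical; the `memory.peak` prefix and the `bytes.in` key appear in the commit message.

```rust
use std::collections::HashMap;

/// Merge one worker's stats into an aggregate. Keys prefixed with
/// "memory.peak" keep the larger value; every other key is summed.
pub fn merge_stats(into: &mut HashMap<String, u64>, from: &HashMap<String, u64>) {
    for (key, &value) in from {
        let slot = into.entry(key.clone()).or_insert(0);
        if key.starts_with("memory.peak") {
            *slot = (*slot).max(value);
        } else {
            *slot = slot.saturating_add(value);
        }
    }
}
```

With workers reporting `memory.peak_state` values of 100 and 70, the merged value stays at 100, while their `bytes.in` counters are added together.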

Testing

  • Added unit tests for MaxGauge behavior (high-water mark tracking, no regression)
  • Added tests for DefaultSnapshot peak merging logic
  • Added integration test for runtime stats peak tracking
  • Existing tests updated to include peak_state_bytes: None in snapshot construction

The implementation uses Option<u64> to distinguish operators that report state (Some) from those that don't (None), enabling accurate dashboard representation of operator memory characteristics.
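
One way the `Option<u64>` distinction could carry through a merge is sketched below: two snapshots combine to `None` only when neither reported state. The function name is hypothetical, and the use of saturating addition here is an assumption; the description only says the peaks are summed.

```rust
/// Combine two optional peaks from snapshots of the same operator.
/// None + None stays None, so a stateless operator never turns into Some(0).
pub fn merge_peak(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.saturating_add(y)),
        (Some(x), None) | (None, Some(x)) => Some(x),
        (None, None) => None,
    }
}
```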

https://claude.ai/code/session_012tEp7R8QPzkK16NmBbD62d

Adds a uniform `memory.peak_state` gauge to the runtime stats system so
stateful operators (Sort, Aggregate, TopN, Dedup) can report the
high-water mark of their working-set memory. This complements the
existing `bytes.in`/`bytes.out` (per-batch throughput) and the
process-level RSS / jemalloc gauges by attributing memory pressure to
the operator that holds it.

The new metric flows through the existing event pipeline: each
RuntimeStats impl gets a `record_state_bytes` method (default no-op for
stateless operators), DefaultSnapshot gains an Option<u64>
peak_state_bytes that is summed across input_ids on the same worker,
and the dashboard's cross-source merge picks max for any key prefixed
`memory.peak`. The dashboard renders a new "Peak Memory" column;
operators that don't report a state size show "-", so stateless and
stateful operators are visibly distinguished.

https://claude.ai/code/session_012tEp7R8QPzkK16NmBbD62d

github-actions Bot commented May 4, 2026

Rust Dependency Diff

Head: 9b78cdcbd60c1e7741c4568059770957a56d5acd vs Base: da3a4d95155ab0d194e738efb4dd8e8d4362516a.

OK: Within budget.

  • New Crates: 0
  • Removed Crates: 0

@samstokes changed the title from "Track peak resident state bytes for stateful operators" to "feat(metrics): track peak resident state bytes for stateful operators" on May 4, 2026
@github-actions (bot) added the feat label on May 4, 2026

codecov Bot commented May 4, 2026

Codecov Report

❌ Patch coverage is 83.05085% with 50 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.36%. Comparing base (50da953) to head (e0f588b).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/daft-dashboard/src/engine.rs | 36.58% | 26 Missing ⚠️ |
| src/common/metrics/src/meters.rs | 88.05% | 8 Missing ⚠️ |
| ...ributed/src/pipeline_node/stage_checkpoint_keys.rs | 0.00% | 3 Missing ⚠️ |
| src/common/metrics/src/snapshot.rs | 96.66% | 2 Missing ⚠️ |
| src/daft-distributed/src/statistics/stats.rs | 93.10% | 2 Missing ⚠️ |
| src/daft-local-execution/src/runtime_stats/mod.rs | 85.71% | 2 Missing ⚠️ |
| ...daft-distributed/src/pipeline_node/into_batches.rs | 0.00% | 1 Missing ⚠️ |
| src/daft-distributed/src/pipeline_node/sort.rs | 85.71% | 1 Missing ⚠️ |
| ...c/daft-local-execution/src/runtime_stats/values.rs | 91.66% | 1 Missing ⚠️ |
| src/daft-local-execution/src/sinks/aggregate.rs | 92.85% | 1 Missing ⚠️ |

... and 3 more
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #6883      +/-   ##
==========================================
- Coverage   75.38%   75.36%   -0.02%     
==========================================
  Files        1134     1134              
  Lines      159942   160206     +264     
==========================================
+ Hits       120568   120738     +170     
- Misses      39374    39468      +94     
| Files with missing lines | Coverage Δ |
|---|---|
| src/common/metrics/src/lib.rs | 55.26% <ø> (ø) |
| src/daft-distributed/src/pipeline_node/limit.rs | 88.71% <100.00%> (+0.02%) ⬆️ |
| ...daft-distributed/src/pipeline_node/into_batches.rs | 30.40% <0.00%> (-0.21%) ⬇️ |
| src/daft-distributed/src/pipeline_node/sort.rs | 65.98% <85.71%> (+0.25%) ⬆️ |
| ...c/daft-local-execution/src/runtime_stats/values.rs | 98.27% <91.66%> (-1.73%) ⬇️ |
| src/daft-local-execution/src/sinks/aggregate.rs | 95.95% <92.85%> (-0.67%) ⬇️ |
| src/daft-local-execution/src/sinks/dedup.rs | 91.22% <93.75%> (+0.05%) ⬆️ |
| src/daft-local-execution/src/sinks/sort.rs | 76.59% <92.85%> (+1.59%) ⬆️ |
| src/daft-local-execution/src/sinks/top_n.rs | 76.80% <93.75%> (+1.57%) ⬆️ |
| src/common/metrics/src/snapshot.rs | 77.68% <96.66%> (+3.70%) ⬆️ |

... and 5 more

... and 9 files with indirect coverage changes


claude added 2 commits May 4, 2026 23:20
…odes

Several distributed pipeline nodes (Sort, IntoBatches, Limit,
StageCheckpointKeys) define their own RuntimeStats impls that
destructure StatSnapshot::Default for phase-aware row/byte accounting.
None of them forwarded the new peak_state_bytes field, so peak memory
reported by stateful local sinks (Sort, Aggregate, TopN, Dedup) was
silently dropped at the distributed boundary. On Flotilla this
surfaced as a "-" in the dashboard's Peak Memory column even for the
distributed Sort node whose final-sort phase wraps the local Sort sink.

Adds a forward_default_snapshot_peak helper on BaseCounters that the
four affected handlers call after their own bookkeeping. Distributed
nodes that use the catch-all DefaultRuntimeStats (Aggregate, TopN,
Distinct, etc.) were already covered by the previous commit.
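
The forwarding fix described in this commit might look roughly like the sketch below. Only the names `forward_default_snapshot_peak`, `BaseCounters`, `StatSnapshot::Default`, and `peak_state_bytes` come from the commit message; the field layout, the `rows_in` counter, the `Other` variant, and the handler function are simplified assumptions for illustration.

```rust
/// Simplified stand-in for the snapshot enum; the real type has more fields.
pub enum StatSnapshot {
    Default { rows_in: u64, peak_state_bytes: Option<u64> },
    Other,
}

/// Simplified stand-in for the distributed counters.
#[derive(Debug, Default)]
pub struct BaseCounters {
    pub rows_in: u64,
    pub peak_state_bytes: Option<u64>,
}

impl BaseCounters {
    /// Carry a worker's reported peak across the distributed boundary.
    /// Snapshots without a peak leave the counter untouched (still None).
    pub fn forward_default_snapshot_peak(&mut self, snapshot: &StatSnapshot) {
        if let StatSnapshot::Default { peak_state_bytes: Some(bytes), .. } = snapshot {
            let total = self.peak_state_bytes.unwrap_or(0).saturating_add(*bytes);
            self.peak_state_bytes = Some(total);
        }
    }
}

/// A custom handler does its own row accounting first, then forwards the peak.
pub fn handle_sort_snapshot(counters: &mut BaseCounters, snapshot: &StatSnapshot) {
    if let StatSnapshot::Default { rows_in, .. } = snapshot {
        counters.rows_in += *rows_in;
    }
    counters.forward_default_snapshot_peak(snapshot);
}
```

Putting the forwarding in one shared helper means each custom handler needs a single extra call, which makes it harder for a newly added snapshot field to be dropped again by one handler.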

Extends the existing distributed Sort regression test to assert
peak_state_bytes propagates to the exported snapshot.

https://claude.ai/code/session_012tEp7R8QPzkK16NmBbD62d