feat(metrics): track peak resident state bytes for stateful operators #6883
Draft
samstokes wants to merge 4 commits into
Adds a uniform `memory.peak_state` gauge to the runtime stats system so stateful operators (Sort, Aggregate, TopN, Dedup) can report the high-water mark of their working-set memory. This complements the existing `bytes.in`/`bytes.out` (per-batch throughput) and the process-level RSS / jemalloc gauges by attributing memory pressure to the operator that holds it. The new metric flows through the existing event pipeline: each RuntimeStats impl gets a `record_state_bytes` method (default no-op for stateless operators), DefaultSnapshot gains an Option<u64> peak_state_bytes that is summed across input_ids on the same worker, and the dashboard's cross-source merge picks max for any key prefixed `memory.peak`. The dashboard renders a new "Peak Memory" column; operators that don't report state-size show "-" so stateless and stateful are visibly distinguished. https://claude.ai/code/session_012tEp7R8QPzkK16NmBbD62d
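The cross-source merge rule described above can be illustrated with a minimal sketch. It assumes the dashboard holds per-source stats as a flat map from metric key to `u64`; the `merge_source` function and `PEAK_PREFIX` constant are hypothetical names for illustration, not the actual code in this PR.

```rust
use std::collections::HashMap;

/// Keys with this prefix are merged with `max` instead of being summed.
/// (Hypothetical constant; the PR only states the `memory.peak` prefix rule.)
const PEAK_PREFIX: &str = "memory.peak";

/// Merge one source's stats into the accumulated map.
/// Peak-memory keys keep the largest reading seen from any source;
/// every other key is summed with saturating addition.
pub fn merge_source(acc: &mut HashMap<String, u64>, source: &HashMap<String, u64>) {
    for (key, &value) in source {
        let slot = acc.entry(key.clone()).or_insert(0);
        if key.starts_with(PEAK_PREFIX) {
            // High-water marks from different sources are not additive here.
            *slot = (*slot).max(value);
        } else {
            *slot = slot.saturating_add(value);
        }
    }
}
```

With two sources reporting `memory.peak_state` of 100 and 250, the merged value under this sketch is 250, while their `bytes.in` readings are added together.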
Rust Dependency Diff
Head: ✅ OK: Within budget.
…odes Several distributed pipeline nodes (Sort, IntoBatches, Limit, StageCheckpointKeys) define their own RuntimeStats impls that destructure StatSnapshot::Default for phase-aware row/byte accounting. None of them forwarded the new peak_state_bytes field, so peak memory reported by stateful local sinks (Sort, Aggregate, TopN, Dedup) was silently dropped at the distributed boundary. On Flotilla this surfaced as a "-" in the dashboard's Peak Memory column even for the distributed Sort node whose final-sort phase wraps the local Sort sink. Adds a forward_default_snapshot_peak helper on BaseCounters that the four affected handlers call after their own bookkeeping. Distributed nodes that use the catch-all DefaultRuntimeStats (Aggregate, TopN, Distinct, etc.) were already covered by the previous commit. Extends the existing distributed Sort regression test to assert peak_state_bytes propagates to the exported snapshot. https://claude.ai/code/session_012tEp7R8QPzkK16NmBbD62d
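The forwarding fix can be sketched roughly as follows. The type and helper names (`StatSnapshot::Default`, `DefaultSnapshot`, `BaseCounters`, `forward_default_snapshot_peak`) come from the commit message, but the field layout, signatures, and the `handle_worker_snapshot` function are assumptions made for illustration and will differ from the real code.

```rust
/// Simplified stand-in for the snapshot type (assumed fields).
#[derive(Debug, Clone, Default)]
pub struct DefaultSnapshot {
    pub rows_in: u64,
    pub rows_out: u64,
    /// `None` when the operator never reported state size.
    pub peak_state_bytes: Option<u64>,
}

/// Simplified stand-in for the snapshot enum (assumed variants).
pub enum StatSnapshot {
    Default(DefaultSnapshot),
    Other,
}

/// Simplified stand-in for the distributed counters (assumed fields).
#[derive(Debug, Default)]
pub struct BaseCounters {
    pub rows_in: u64,
    pub rows_out: u64,
    pub peak_state_bytes: Option<u64>,
}

impl BaseCounters {
    /// Fold a worker's peak reading into the running total.
    /// Stays `None` until at least one worker reports a value.
    pub fn forward_default_snapshot_peak(&mut self, snapshot: &DefaultSnapshot) {
        if let Some(bytes) = snapshot.peak_state_bytes {
            let current = self.peak_state_bytes.unwrap_or(0);
            self.peak_state_bytes = Some(current.saturating_add(bytes));
        }
    }
}

/// Hypothetical node-specific handler: it does its own row accounting,
/// then forwards the peak so it is not dropped at the distributed boundary.
pub fn handle_worker_snapshot(counters: &mut BaseCounters, snapshot: &StatSnapshot) {
    if let StatSnapshot::Default(default) = snapshot {
        counters.rows_in += default.rows_in;
        counters.rows_out += default.rows_out;
        counters.forward_default_snapshot_peak(default);
    }
}
```

Without the final call in the handler, the row counters would still update but `peak_state_bytes` would remain `None`, which matches the "-" symptom described above.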
…instrumentation-mV5Sb
Changes Made
Added comprehensive tracking of peak resident state size (memory high-water mark) for stateful operators across the execution pipeline:

Core Metrics Infrastructure
- `MaxGauge` metric type (src/common/metrics/src/meters.rs): A gauge that tracks the maximum value ever recorded using atomic `fetch_max`, ensuring concurrent writers cannot regress the value. Includes a `peak_state_bytes_metric()` factory method on `Meter`.
- `DefaultSnapshot` (src/common/metrics/src/snapshot.rs): Added an optional `peak_state_bytes` field to track peak state across the operator's lifetime. Merging logic sums peaks across concurrent input_ids (an upper bound on simultaneous memory held).

Runtime Statistics
- `RuntimeStats` trait (src/daft-local-execution/src/runtime_stats/values.rs): Added a `record_state_bytes()` method for stateful operators to report their current working-set size. The default no-op lets stateless operators opt out.
- `DefaultRuntimeStats`: Tracks peak state bytes using `MaxGauge` and reports the value in snapshots only when it was explicitly recorded (this distinguishes stateless from stateful operators).

Distributed Engine Integration
- `BaseCounters` accumulation (src/daft-distributed/src/statistics/stats.rs): Sums peak-state-bytes readings from worker snapshots using saturating addition (total memory pressure is additive across workers).

Stateful Operator Instrumentation
Updated Sort, Aggregate, TopN, and Dedup sinks to:
- Track `held_bytes` alongside their buffered partitions
- Call `record_state_bytes()` after each state update

Dashboard Integration
- src/daft-dashboard/src/engine.rs: Memory peak metrics now use `max` semantics (not sum) when merging across workers, reflecting the hottest single worker rather than fictitious sums.
- src/daft-dashboard/frontend/src/app/query/: Added a peak state bytes column to the progress table and stats utilities.

Testing
- `MaxGauge` behavior (high-water mark tracking, no regression)
- `DefaultSnapshot` peak merging logic
- `peak_state_bytes: None` in snapshot construction

The implementation uses `Option<u64>` to distinguish operators that report state (`Some`) from those that don't (`None`), enabling accurate dashboard representation of operator memory characteristics.
https://claude.ai/code/session_012tEp7R8QPzkK16NmBbD62d
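As a rough illustration of the high-water-mark gauge and the `Option<u64>` reporting rule, the sketch below uses `AtomicU64::fetch_max` from the standard library. The `PeakState` wrapper, the `recorded` flag, and `merge_peaks` are hypothetical names chosen for this example; the actual `MaxGauge` and `DefaultRuntimeStats` in the PR may be structured differently.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Gauge that only ever moves upward, even with concurrent writers.
#[derive(Debug, Default)]
pub struct MaxGauge {
    max: AtomicU64,
}

impl MaxGauge {
    /// Record a reading; smaller values never lower the stored maximum.
    pub fn record(&self, value: u64) {
        self.max.fetch_max(value, Ordering::Relaxed);
    }

    pub fn get(&self) -> u64 {
        self.max.load(Ordering::Relaxed)
    }
}

/// Hypothetical per-operator holder that remembers whether anything was recorded.
#[derive(Debug, Default)]
pub struct PeakState {
    gauge: MaxGauge,
    recorded: AtomicBool,
}

impl PeakState {
    /// Called by a stateful operator after each state update.
    pub fn record_state_bytes(&self, bytes: u64) {
        self.gauge.record(bytes);
        self.recorded.store(true, Ordering::Relaxed);
    }

    /// `None` for operators that never reported, `Some(peak)` otherwise.
    pub fn snapshot(&self) -> Option<u64> {
        self.recorded
            .load(Ordering::Relaxed)
            .then(|| self.gauge.get())
    }
}

/// Sum two optional peaks, as described for concurrent input_ids.
/// The result is `None` only when neither side reported a value.
pub fn merge_peaks(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.saturating_add(y)),
        (Some(x), None) | (None, Some(x)) => Some(x),
        (None, None) => None,
    }
}
```

Keeping a separate "recorded" flag is one way to tell an operator that reported zero bytes apart from one that never reported at all; the dashboard can then render "-" only for the latter.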