enhancement(transforms): Add metric to measure concurrency blockage in Function Transforms#25453
enhancement(transforms): Add metric to measure concurrency blockage in Function Transforms#25453ArunPiduguDD wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d2bae00fbf
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| let blocked_completions = completed_count | ||
| .fetch_sub(yielded_since_last_record, Ordering::Relaxed); | ||
| yielded_since_last_record = 0; | ||
| histogram!( | ||
| HistogramName::EstimatedConcurrentTransformSchedulingPressure, | ||
| "component_id" => self.component_id.clone() | ||
| ) | ||
| .record( | ||
| (blocked_completions as f64 / *TRANSFORM_CONCURRENCY_LIMIT as f64).min(1.0), |
There was a problem hiding this comment.
Subtract yielded tasks before recording pressure
When a transform task completes in order with no head-of-line blocking, its worker has already incremented completed_count, and the ordered future then increments yielded_since_last_record before this branch runs. fetch_sub returns the pre-subtraction value, so the histogram records that just-yielded task as blocked; even a fully ordered, non-blocked stream reports non-zero pressure after each completion. Use the post-subtraction count for the recorded value so the metric only includes tasks that are still completed-but-not-yielded.
Useful? React with 👍 / 👎.
Summary
Introduces a new metric
estimated_concurrent_transform_scheduling_pressureto measure how often head-of-line blocking in FunctionTransforms causes concurrency inefficiencies.Detailed Context
In Vector's existing concurrency model stateless function transforms are run concurrently (e.g. a function transform can have multiple threads working on batches of events in parallel). However, the existing implementation still guarantees event ordering (e.g. if 1000 events arrive at a transform and are processed across 10 batches/Tasks, they will still leave the transform in the same order they arrived, even if later batches complete before earlier batches).
In cases where processing latency of the events within the transform is both high & variable, then this can lead to inefficiencies - as mentioned above events that are processed in later batches can be blocked by batches/Tasks scheduled earlier (if the earlier batch is still processing when the later batch finishes)
The effect can be illustrated by this wall-time profile (measured during a benchmark test with 8 CPUs / parallel threads)
In this test the vector instance was constantly flooded with events so there are always events waiting to be processed. Multiple threads finish processing their batch, however new batches / Tasks are unable to be scheduled for these idle threads due to the fact that the head Task in the
FuturesOrderedqueue is still processing its batch, leading to a CPU utilization inefficiency and overall lower throughput.As a test, tried switching this to an ordered queue, the the transform is not held up by long-running tasks and the overall ingress throughput increases (graph below shows bytes / second throughput of ordered vs unordered queue - test was done using remap processor with many regex rules)
Changes
This PR adds a new metric
estimated_concurrent_transform_scheduling_pressurewhich keeps track of how many Tasks have been completed and are blocked by the head task from being scheduled (metric is a distribution which ranges from 0-1). B/c this introduces a shared counter to the transform "hot path", ran regression benchmark tests to confirm there are no issues: https://github.com/vectordotdev/vector/actions/runs/24585515449 (note: a few tests failed to run but seems to be unrelated issues - tests also failing to run on the latest merged commit in master)How did you test this PR?
Ran regression tests
Change Type
Is this a breaking change?
Does this PR include user facing changes?
no-changeloglabel to this PR.References
Notes
@vectordotdev/vectorto reach out to us regarding this PR.pre-pushhook, please see this template.make fmtmake check-clippy(if there are failures it's possible some of them can be fixed withmake clippy-fix)make testgit merge origin masterandgit push.Cargo.lock), pleaserun
make build-licensesto regenerate the license inventory and commit the changes (if any). More details here.