HCD-641: Fix flush_and_sync_group returning early with in-flight tasks #29
Replace yield_now() + 64-iteration fuse with wait_idle() + 100ms timeout in the drain loop.

The fuse was insufficient for large recursive workloads (e.g. URL scraping with thousands of ScrapeRegionUrls), causing the next job stage to start while the previous one was still running. Using wait_idle with a timeout avoids both problems:
- No arbitrary iteration limit that can fire too early
- Periodic flushing prevents deadlock when handlers block on full channels

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
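To make the early-exit failure concrete, here is a toy model of the two drain strategies. It is not the crate's code: it assumes each drain iteration lets a fixed, hypothetical number of tasks finish, and `old_fuse_drain` / `drain_until_idle` are illustrative names.

```rust
/// Toy model of the old drain loop: each iteration lets `completed_per_iter`
/// tasks finish (standing in for one `yield_now()` tick), but the loop gives
/// up once `fuse` iterations have run. Returns the tasks still in flight on exit.
pub fn old_fuse_drain(mut in_flight: u32, completed_per_iter: u32, fuse: u32) -> u32 {
    let mut iters = 0u32;
    while in_flight > 0 {
        iters += 1;
        if iters > fuse {
            // Early return: the caller moves on while work is still running.
            break;
        }
        in_flight = in_flight.saturating_sub(completed_per_iter);
    }
    in_flight
}

/// Toy model of the new loop: keep iterating until nothing is in flight and
/// return how many iterations that took. Like the real loop, it only
/// terminates if work keeps completing.
pub fn drain_until_idle(mut in_flight: u32, completed_per_iter: u32) -> u32 {
    assert!(completed_per_iter > 0, "no progress: this loop would never exit");
    let mut iters = 0u32;
    while in_flight > 0 {
        iters += 1;
        in_flight = in_flight.saturating_sub(completed_per_iter);
    }
    iters
}
```

With ~6k tasks and (hypothetically) 10 finishing per tick, `old_fuse_drain(6_000, 10, 64)` exits with 5,360 tasks still running, while `drain_until_idle(6_000, 10)` runs the 600 iterations the work actually needs.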
Walkthrough
Replaced the fixed-iteration "drain fuse" in …
Sequence Diagram(s)
sequenceDiagram
participant Bus
participant Receiver
participant GroupRegistry
participant Tokio as "Tokio Timer"
participant Logger
Bus->>Receiver: flush_all()
Receiver-->>Bus: flushed
Bus->>Tokio: timeout(DRAIN_IDLE_TIMEOUT, wait_idle(group_id))
Tokio->>GroupRegistry: wait_idle(group_id) (bounded)
GroupRegistry-->>Tokio: idle or not_ready
Tokio-->>Bus: timeout result (discarded)
alt group_registry.is_idle(group_id) == false
Bus->>Logger: maybe_log_warning (based on DRAIN_WARN_INTERVAL)
Bus->>Receiver: flush_all() (repeat)
else idle
Bus->>Logger: final_log/stop
Bus-->>Bus: exit loop
end
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 3 passed
Review: HCD-641 — Fix
Greptile Summary
This PR fixes a real production bug in …

Key changes:
Confidence Score: 5/5
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A([flush_and_sync_group]) --> B{force?}
B -- yes --> D
B -- no --> C[Two-pass flush loop\n max 32 iterations]
C --> D[drain_iters = 0]
D --> E[drain_iters += 1]
E --> F[Flush all receivers\nin group]
F --> G{is_idle?}
G -- yes --> H[sync_group]
H --> Z([return])
G -- no --> I[log debug: waiting for idle]
I --> J[wait_idle with\n100ms timeout]
J --> E
style J fill:#d4edda,stroke:#28a745
style G fill:#fff3cd,stroke:#ffc107
Reviews (1): Last reviewed commit: "HCD-641: Fix flush_and_sync_group return..."
🧹 Nitpick comments (1)
src/lib.rs (1)
969-1013: Keep a warn-level heartbeat for stuck groups.

Line 1009 removed the last warn-level signal from this drain path. If a counter leak or cyclic workload keeps a group non-idle forever, callers will now just hang with debug-only logs. Consider warning every N timeouts and pulling the 100 ms interval into a named variable so this stays observable and tunable.
♻️ Example tweak
-        let mut drain_iters = 0u32;
+        let mut drain_iters = 0u32;
+        let wait_idle_timeout = Duration::from_millis(100);
+        let drain_started = tokio::time::Instant::now();
 ...
+            if drain_iters % 50 == 0 {
+                log::warn!(
+                    "flush_and_sync_group: group={} still draining after {:?}, processing_count={}",
+                    group_id,
+                    drain_started.elapsed(),
+                    self.inner.group_registry.processing_count(group_id)
+                );
+            }
             let _ = tokio::time::timeout(
-                std::time::Duration::from_millis(100),
+                wait_idle_timeout,
                 self.inner.group_registry.wait_idle(group_id),
             )
             .await;

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lib.rs` around lines 969 - 1013, the drain loop in flush_and_sync_group is missing a periodic warn-level heartbeat when wait_idle keeps timing out; restore a warn log every N timeouts (e.g., when drain_iters % N == 0) that includes group_id and processing_count to surface stuck groups, and pull the hard-coded 100ms into a named variable (e.g., idle_wait_duration) used by tokio::time::timeout(idle_wait_duration, self.inner.group_registry.wait_idle(group_id)), so the interval is tunable; reference flush_and_sync_group, drain_iters, group_id, self.inner.group_registry.wait_idle, and tokio::time::timeout when making the changes.
- Add periodic warn! log every ~10s when drain loop is long-running
- Extract 100ms timeout into DRAIN_IDLE_TIMEOUT const with rationale
- Add DRAIN_WARN_INTERVAL const for the warning threshold

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🧹 Nitpick comments (1)
src/lib.rs (1)
972-973: Minor style inconsistency: `Duration` is already imported.
`Duration` is imported from `core::time` at line 178. Consider using the imported type for consistency with the rest of the file.

Suggested change
-    const DRAIN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
-    const DRAIN_WARN_INTERVAL: u32 = 100; // warn every ~10s (100 * DRAIN_IDLE_TIMEOUT)
+    const DRAIN_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
+    const DRAIN_WARN_INTERVAL: u32 = 100; // warn every ~10s (100 * 100ms)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/lib.rs` around lines 972 - 973, The constants declare types using the fully-qualified std::time::Duration even though Duration is already imported; change the type on DRAIN_IDLE_TIMEOUT to use the imported Duration (keep the value construction via Duration::from_millis) and leave DRAIN_WARN_INTERVAL as-is, i.e., replace std::time::Duration with Duration for the DRAIN_IDLE_TIMEOUT constant to match the rest of the file's style and imports.
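The `// warn every ~10s` comment can drift from the constants it describes. This sketch (the `NOMINAL_WARN_PERIOD` const and `warn_period` helper are hypothetical, not part of the PR) derives the figure at compile time from the two constants and shows that it is only an upper bound: the real period is `DRAIN_WARN_INTERVAL` times however long an iteration actually takes.

```rust
use std::time::Duration;

const DRAIN_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
const DRAIN_WARN_INTERVAL: u32 = 100;

/// Warn period reached only if every `wait_idle()` call runs to the full
/// timeout. Computed at compile time, so it cannot drift from the constants.
const NOMINAL_WARN_PERIOD: Duration =
    Duration::from_millis(DRAIN_IDLE_TIMEOUT.as_millis() as u64 * DRAIN_WARN_INTERVAL as u64);

/// Warn period when each iteration actually takes `per_iter`
/// (e.g. `wait_idle()` returned early because a batch finished).
fn warn_period(per_iter: Duration) -> Duration {
    per_iter * DRAIN_WARN_INTERVAL
}
```

`NOMINAL_WARN_PERIOD` comes out to 10 s, but with 20 ms iterations `warn_period` gives 2 s, which is the discrepancy raised in a later inline comment.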
Review
The root cause analysis is accurate and the fix is well-reasoned. Replacing a fuse that fires too early with …

No termination guarantee (main concern)
The old 64-iteration fuse was broken in the wrong direction (too small), but the new loop has no upper bound at all. If a bug elsewhere causes …

Recommend adding an absolute last-resort circuit breaker: a large limit (e.g. 10 000 iterations, ≈ 16 min at 100 ms per iteration) that logs at …
const DRAIN_ABORT_LIMIT: u32 = 10_000;
if drain_iters >= DRAIN_ABORT_LIMIT {
log::error!(
"flush_and_sync_group: group {} ABORTING after {} iterations (processing_count={}) — possible counter leak",
group_id, drain_iters,
self.inner.group_registry.processing_count(group_id)
);
break;
}

Flush cost on every iteration
The drain loop calls the blocking …

Everything else looks good
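A sketch of the suggested last-resort circuit breaker, written against plain closures so it runs without the crate. `flush_all`, `is_idle`, and `wait_idle_bounded` are hypothetical stand-ins for the receiver flush, `group_registry.is_idle`, and the `timeout(DRAIN_IDLE_TIMEOUT, wait_idle(..))` call; `DrainAborted` is an assumed error type. Returning an error instead of a bare `break` lets the caller decide whether to fail the job or proceed.

```rust
/// Hypothetical error returned when the drain loop gives up.
#[derive(Debug, PartialEq)]
pub struct DrainAborted {
    pub iterations: u32,
}

/// Drain loop with an absolute iteration cap. Each pass flushes, checks for
/// idle, and otherwise performs one bounded wait before trying again.
pub fn drain_with_cap(
    mut flush_all: impl FnMut(),
    mut is_idle: impl FnMut() -> bool,
    mut wait_idle_bounded: impl FnMut(),
    abort_limit: u32,
) -> Result<u32, DrainAborted> {
    let mut drain_iters = 0u32;
    loop {
        drain_iters += 1;
        flush_all();
        if is_idle() {
            return Ok(drain_iters);
        }
        if drain_iters >= abort_limit {
            // In the real loop this is where log::error! would fire.
            return Err(DrainAborted { iterations: drain_iters });
        }
        wait_idle_bounded();
    }
}
```

Because the cap only triggers after `abort_limit` bounded waits, a limit far above any legitimate drain time keeps the early-exit bug from coming back while still guaranteeing termination.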
const DRAIN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
const DRAIN_WARN_INTERVAL: u32 = 100; // warn every ~10s (100 * DRAIN_IDLE_TIMEOUT)

loop {
The loop is now unbounded — if a group never reaches idle (e.g. a handler that continuously spawns new tasks), flush_and_sync_group will never return. The old fuse was a correctness bug, but the old code at least guaranteed termination.
This tradeoff may be intentional (liveness is more correct than early exit), but it would help to document it explicitly as a caller contract: callers must guarantee that the workload eventually converges, otherwise this will hang indefinitely.
Also worth considering: if there's ever a true deadlock scenario that isn't the bounded-channel backpressure case (e.g. a circular dependency between groups), the 100ms flush won't help and this loop will spin forever. A configurable hard ceiling (e.g. emit error! and break after N seconds) could act as a last-resort safety valve without reintroducing the old correctness bug.
Review
The fix is correct and well-motivated. Replacing the 64-iteration yield_now() fuse with wait_idle() + timeout is the right approach: the old fuse was a genuine correctness bug for large workloads, and the timeout handles the bounded-channel backpressure deadlock cleanly. The comments in the code explain the design clearly. Two items worth addressing:
Minor observation: The warning fires at iteration 100 (~10 s). For very large workloads that legitimately take longer, operators will see repeated warnings they cannot act on. Consider logging at info! the first time the threshold is crossed, then at warn! only if the drain continues well past a reasonable upper bound; at minimum, document that the warning is expected for large jobs.
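One way to express that escalation, as a hedged sketch: the `heartbeat_level` name and the idea of passing both thresholds in are assumptions, and in the real loop the returned level would select between `log::info!` and `log::warn!`.

```rust
use std::time::Duration;

/// Hypothetical log level chosen for a drain heartbeat.
#[derive(Debug, PartialEq, Eq)]
pub enum HeartbeatLevel {
    Info,
    Warn,
}

/// Decide how loudly to report a drain that has been running for `elapsed`.
/// Below `info_after` nothing is logged; long but plausibly healthy drains
/// log at info; only drains past `warn_after` escalate to warn.
pub fn heartbeat_level(
    elapsed: Duration,
    info_after: Duration,
    warn_after: Duration,
) -> Option<HeartbeatLevel> {
    if elapsed >= warn_after {
        Some(HeartbeatLevel::Warn)
    } else if elapsed >= info_after {
        Some(HeartbeatLevel::Info)
    } else {
        None
    }
}
```

With illustrative thresholds of 10 s and 60 s, a 15 s drain logs at info and only a drain past a minute reaches warn.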
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
  loop {
      drain_iters += 1;
-     if drain_iters > drain_fuse_count {
+     if drain_iters % DRAIN_WARN_INTERVAL == 0 {
The ~10s estimate only holds when every wait_idle() call hits the full timeout. If tasks are slow but not stuck (e.g., each batch completes in ~20ms), wait_idle() returns early and 100 iterations pass in ~2s, generating warnings much sooner than expected.
A wall-clock approach would give a more stable threshold:
- if drain_iters % DRAIN_WARN_INTERVAL == 0 {
+ if drain_iters == 1 {
+     // record start time once, then warn based on elapsed
+ }
+ if drain_iters % DRAIN_WARN_INTERVAL == 0 {
Or, simpler: capture let drain_start = std::time::Instant::now(); before the loop and warn when drain_start.elapsed() > Duration::from_secs(10).
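A sketch of the wall-clock variant, assuming a hypothetical `DrainHeartbeat` helper created just before the loop. It takes `now` as a parameter so it can be tested without sleeping; the real loop would pass `Instant::now()` (or `tokio::time::Instant::now()`, which also follows Tokio's paused test clock).

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: fires at most once per `every` of wall-clock time,
/// regardless of how many drain iterations ran in between.
pub struct DrainHeartbeat {
    started: Instant,
    next_warn: Instant,
    every: Duration,
}

impl DrainHeartbeat {
    pub fn new(now: Instant, every: Duration) -> Self {
        DrainHeartbeat { started: now, next_warn: now + every, every }
    }

    /// Call once per iteration. Returns the total drain time when a warning
    /// is due, and schedules the next one `every` later.
    pub fn tick(&mut self, now: Instant) -> Option<Duration> {
        if now >= self.next_warn {
            self.next_warn = now + self.every;
            Some(now.duration_since(self.started))
        } else {
            None
        }
    }
}
```

A hundred fast 20 ms iterations (2 s total) produce no warning with `every = 10 s`, which is exactly the case the iteration-count threshold gets wrong.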
const DRAIN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
const DRAIN_WARN_INTERVAL: u64 = 100; // warn every ~10s (100 * DRAIN_IDLE_TIMEOUT)

loop {
With the fuse removed, this loop has no exit path other than is_idle(). If a handler permanently stalls — leaked future, counter bug, or a deadlock not relieved by the 100ms flush — this will loop forever. The warning fires but nothing ever breaks out.
Consider one of:
- A maximum wall-clock timeout (e.g. configurable, defaulting to something large) that returns `Err` so the caller can surface it.
- A large hard cap (e.g. `if drain_iters > 36_000 { log::error!(...); break; }`) as a last-resort safety net.
- At minimum, a doc comment on `flush_and_sync_group` noting that it can block indefinitely and that callers needing a deadline must wrap it in `tokio::time::timeout`.
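The first and third options combine naturally: document the blocking contract and offer a deadline-bounded variant that returns `Err`. This is a synchronous stand-in with closure parameters (all names hypothetical); in the async crate, the caller-side equivalent is wrapping the call in `tokio::time::timeout`, which yields `Err(Elapsed)` once the deadline passes.

```rust
use std::time::{Duration, Instant};

/// Hypothetical error: the group was still busy when the deadline passed.
#[derive(Debug, PartialEq)]
pub struct DrainTimedOut {
    pub waited: Duration,
}

/// Flush and wait until the group is idle.
///
/// Unlike the unbounded loop, this returns `Err` once `deadline` has elapsed,
/// so it cannot block indefinitely as long as `flush_all` and
/// `wait_idle_bounded` themselves return.
pub fn drain_with_deadline(
    mut flush_all: impl FnMut(),
    mut is_idle: impl FnMut() -> bool,
    mut wait_idle_bounded: impl FnMut(),
    deadline: Duration,
) -> Result<(), DrainTimedOut> {
    let started = Instant::now();
    loop {
        flush_all();
        if is_idle() {
            return Ok(());
        }
        let waited = started.elapsed();
        if waited >= deadline {
            return Err(DrainTimedOut { waited });
        }
        // One bounded wait (e.g. 100 ms) before flushing again.
        wait_idle_bounded();
    }
}
```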
HCD-641
Summary
- Fix `flush_and_sync_group` returning early while thousands of tasks are still in flight, which caused job stages to overlap in VRBP (e.g. `scrape_listings` starting while `scrape_urls` was still running)
- Replace `yield_now()` + 64-iteration fuse in the drain loop with `wait_idle()` + 100ms timeout

Root cause
The drain loop used `tokio::task::yield_now()` between iterations with a hard fuse limit of 64. For large recursive workloads like URL scraping (~6k concurrent `ScrapeRegionUrls` tasks), 64 single-tick yields were nowhere near enough time for all tasks to complete. When the fuse fired, the function returned early and the caller proceeded to the next stage.

Why not unbounded `wait_idle()`?
Handlers blocked on full bounded channels need periodic flushing to relieve backpressure. Without the timeout, `wait_idle()` would deadlock: blocked handlers can't complete, so the counter never reaches zero, and nobody flushes to free channel space.

Test plan
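The deadlock argument can be reproduced with std primitives. This sketch is an analogy, not the crate's code: a `std::sync::mpsc::sync_channel` plays the bounded channel, a `Mutex<usize>` plus `Condvar` plays the group's processing counter, and `Condvar::wait_timeout_while` plays `timeout(DRAIN_IDLE_TIMEOUT, wait_idle(..))`. `InFlight` and `drain_under_backpressure` are hypothetical names.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical in-flight counter standing in for the group registry.
struct InFlight {
    count: Mutex<usize>,
    idle: Condvar,
}

/// One handler sends `messages` items into a bounded channel of `capacity`,
/// blocking whenever it is full. The drain loop flushes (empties the channel),
/// then waits for idle with `idle_timeout`. Returns how many items it flushed.
pub fn drain_under_backpressure(messages: usize, capacity: usize, idle_timeout: Duration) -> usize {
    let state = Arc::new(InFlight { count: Mutex::new(1), idle: Condvar::new() });
    let (tx, rx) = sync_channel::<usize>(capacity);

    let worker_state = Arc::clone(&state);
    let handler = thread::spawn(move || {
        for i in 0..messages {
            // Blocks while the channel is full: only a flush can unblock it.
            tx.send(i).expect("receiver dropped");
        }
        *worker_state.count.lock().unwrap() -= 1;
        worker_state.idle.notify_all();
    });

    let mut flushed = 0;
    loop {
        // "flush_all": relieve backpressure by draining buffered items.
        while rx.try_recv().is_ok() {
            flushed += 1;
        }
        let count = state.count.lock().unwrap();
        if *count == 0 {
            break;
        }
        // Bounded wait. An unbounded wait here would deadlock: the handler is
        // stuck in `send`, so the count never reaches zero.
        let _ = state.idle.wait_timeout_while(count, idle_timeout, |c| *c > 0).unwrap();
    }
    handler.join().unwrap();
    // Collect anything sent after the last flush but before the handler exited.
    while rx.try_recv().is_ok() {
        flushed += 1;
    }
    flushed
}
```

Swapping the bounded wait for `idle.wait_while(count, |c| *c > 0)` would hang whenever the handler still has items left to send after the channel fills, because nothing flushes it while the loop is parked.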