Skip to content

HCD-641: Fix flush_and_sync_group returning early with in-flight tasks#29

Merged
wolfiestyle merged 4 commits into
masterfrom
alanhernandez/hcd-641-investigate-job-stage-overlap
Apr 1, 2026
Merged

HCD-641: Fix flush_and_sync_group returning early with in-flight tasks#29
wolfiestyle merged 4 commits into
masterfrom
alanhernandez/hcd-641-investigate-job-stage-overlap

Conversation

@wolfiestyle
Copy link
Copy Markdown
Contributor

@wolfiestyle wolfiestyle commented Apr 1, 2026

HCD-641

Summary

  • Fix flush_and_sync_group returning early while thousands of tasks are still in flight, which caused job stages to overlap in VRBP (e.g. scrape_listings starting while scrape_urls was still running)
  • Replace yield_now() + 64-iteration fuse in the drain loop with wait_idle() + 100ms timeout

Root cause

The drain loop used tokio::task::yield_now() between iterations with a hard fuse limit of 64. For large recursive workloads like URL scraping (~6k concurrent ScrapeRegionUrls tasks), 64 single-tick yields was nowhere near enough time for all tasks to complete. When the fuse fired, the function returned early and the caller proceeded to the next stage.

Why not unbounded wait_idle()?

Handlers blocked on full bounded channels need periodic flushing to relieve backpressure. Without the timeout, wait_idle() would deadlock: blocked handlers can't complete, so the counter never reaches zero, and nobody flushes to free channel space.

Test plan

  • All 134 existing messagebus tests pass
  • Run a full scrape job and verify no stage overlap in logs

Summary by CodeRabbit

  • Refactor
    • Updated group synchronization in the message bus: waiting now uses timeouts and emits periodic warnings for prolonged waits instead of bounded iteration. This reduces busy-waiting, improves efficiency during drain phases, and provides clearer observability of long-running waits.

Replace yield_now() + 64-iteration fuse with wait_idle() + 100ms timeout
in the drain loop. The fuse was insufficient for large recursive workloads
(e.g. URL scraping with thousands of ScrapeRegionUrls), causing the next
job stage to start while the previous one was still running.

Using wait_idle with a timeout avoids both problems:
- No arbitrary iteration limit that can fire too early
- Periodic flushing prevents deadlock when handlers block on full channels

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 1, 2026

Warning

Rate limit exceeded

@wolfiestyle has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 9 minutes and 43 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 9 minutes and 43 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ed585945-5d97-4b7b-a76c-e52b665fa507

📥 Commits

Reviewing files that changed from the base of the PR and between 72140cc and 7aaafea.

📒 Files selected for processing (1)
  • src/lib.rs

Walkthrough

Replaced the fixed-iteration "drain fuse" in Bus::flush_and_sync_group with a wait loop that, after flushing receivers, awaits tokio::time::timeout(DRAIN_IDLE_TIMEOUT, group_registry.wait_idle(group_id)) and relies on group_registry.is_idle(group_id) to decide completion; periodic warnings use DRAIN_WARN_INTERVAL.

Changes

Cohort / File(s) Summary
Timeout-based drain loop
src/lib.rs
Removed iteration-limited drain/fuse logic and break-on-excess warning. Introduced DRAIN_IDLE_TIMEOUT and DRAIN_WARN_INTERVAL; after each receiver flush the loop awaits tokio::time::timeout(..., group_registry.wait_idle(group_id)).await, logs periodic warnings, and terminates when group_registry.is_idle(group_id) is true.

Sequence Diagram(s)

sequenceDiagram
  participant Bus
  participant Receiver
  participant GroupRegistry
  participant Tokio as "Tokio Timer"
  participant Logger

  Bus->>Receiver: flush_all()
  Receiver-->>Bus: flushed
  Bus->>Tokio: timeout(DRAIN_IDLE_TIMEOUT, wait_idle(group_id))
  Tokio->>GroupRegistry: wait_idle(group_id) (bounded)
  GroupRegistry-->>Tokio: idle or not_ready
  Tokio-->>Bus: timeout result (discarded)
  alt group_registry.is_idle(group_id) == false
    Bus->>Logger: maybe_log_warning (based on DRAIN_WARN_INTERVAL)
    Bus->>Receiver: flush_all() (repeat)
  else idle
    Bus->>Logger: final_log/stop
    Bus-->>Bus: exit loop
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰
I hop the loop, no brittle snap,
A gentle timeout maps my map,
I flush, I wait, then peek to see,
When all are idle — then I'm free.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly addresses the main change: fixing flush_and_sync_group returning early with in-flight tasks, which aligns with the core objective of the PR.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch alanhernandez/hcd-641-investigate-job-stage-overlap

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude
Copy link
Copy Markdown

claude Bot commented Apr 1, 2026

Review: HCD-641 — Fix flush_and_sync_group returning early

The root-cause diagnosis is correct and the switch from yield_now() to wait_idle() with a periodic flush is the right approach for large in-flight workloads. However, the PR introduces a new concern: the loop is now unbounded with no convergence safeguard.

Critical: No upper bound on drain iterations

The original fuse (drain_fuse_count = 64) was removed entirely. drain_iters is now incremented only for logging—it is never checked against any limit. If the system enters a state where is_idle never returns true (e.g., a handler continually spawns new messages, or a channel deadlock the 100 ms flush can't relieve), flush_and_sync_group will spin forever at 100 ms intervals with no warning, no timeout, and no way out. In production this would silently hang indefinitely.

Suggested mitigation: keep the fast path (replace the old fuse) but add a periodic warn! log after a configurable or generous threshold of iterations (e.g., every 100 iterations / ~10 s), so operators can detect and diagnose runaway loops without reintroducing an early-return that breaks correctness.

// Example: warn periodically so runaway loops surface in logs
if drain_iters % 100 == 0 {
    log::warn!(
        "flush_and_sync_group: group {} drain loop still running after {} iterations (~{}s), processing_count={}",
        group_id,
        drain_iters,
        drain_iters / 10,
        self.inner.group_registry.processing_count(group_id)
    );
}

Minor: 100 ms timeout is a magic constant

The value is well-motivated in the PR description (backpressure relief), but it's undocumented in the code itself. A named constant with a comment referencing the deadlock scenario it prevents would make future tuning easier:

/// Interval between flush passes in the drain loop.
/// Must be short enough to relieve backpressure on bounded channels, but
/// long enough to avoid busy-looping on large workloads (~6k concurrent tasks).
const DRAIN_FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);

The fix itself is sound

  • wait_idle() is correctly implemented (notify registered before the count check to avoid the lost-wakeup race).
  • The flush-then-wait structure correctly handles the "handler completes → sends to batched receiver → new partial batch" cycle.
  • Existing tests pass; the real validation is the end-to-end scrape job.

The PR is good to merge after addressing the observability gap (the warning log), since a silent infinite loop is worse than the original premature return.

Comment thread src/lib.rs Outdated
Comment thread src/lib.rs
@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented Apr 1, 2026

Greptile Summary

This PR fixes a real production bug in flush_and_sync_group where the drain loop returned prematurely for large workloads (~6k concurrent tasks), causing job stages to overlap. The fix replaces a 64-iteration hard fuse + yield_now() with an unbounded loop using wait_idle() wrapped in a 100ms timeout, which correctly handles both the large-workload case and the backpressure-deadlock scenario.

Key changes:

  • Removes drain_fuse_count = 64 and the early-exit break that was the root cause of the stage-overlap bug
  • Replaces tokio::task::yield_now() with tokio::time::timeout(100ms, wait_idle(group_id)) so the runtime can efficiently park the caller until tasks finish, re-waking only when needed or when backpressure flushing is required
  • Adds clear explanatory comments detailing why an unbounded wait_idle() alone would deadlock and why the timeout interval is necessary
  • One minor observability regression: the old log::warn! that fired when the fuse tripped has been removed entirely; all loop diagnostics are now debug-level only, leaving no production-visible signal if the loop ever stalls for an unusual amount of time

Confidence Score: 5/5

  • Safe to merge — the fix is well-reasoned, all 134 tests pass, and the single remaining finding is a P2 observability suggestion.
  • The only open finding is a P2 style suggestion (adding a periodic warn log). There are no correctness, data-integrity, or reliability defects in the changed code. The unbounded loop is intentional and correctly designed; the 100ms timeout reliably prevents the backpressure deadlock described in the PR.
  • No files require special attention beyond the optional observability improvement noted in src/lib.rs.

Important Files Changed

Filename Overview
src/lib.rs Drain loop in flush_and_sync_group updated to use wait_idle() with a 100ms timeout instead of yield_now() with a 64-iteration hard fuse; fixes premature return for large workloads but removes the warn-level diagnostic that signaled a stuck loop.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A([flush_and_sync_group]) --> B{force?}
    B -- yes --> D
    B -- no --> C[Two-pass flush loop\n max 32 iterations]
    C --> D[drain_iters = 0]
    D --> E[drain_iters += 1]
    E --> F[Flush all receivers\nin group]
    F --> G{is_idle?}
    G -- yes --> H[sync_group]
    H --> Z([return])
    G -- no --> I[log debug: waiting for idle]
    I --> J[wait_idle with\n100ms timeout]
    J --> E

    style J fill:#d4edda,stroke:#28a745
    style G fill:#fff3cd,stroke:#ffc107
Loading

Reviews (1): Last reviewed commit: "HCD-641: Fix flush_and_sync_group return..." | Re-trigger Greptile

Comment thread src/lib.rs Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/lib.rs (1)

969-1013: Keep a warn-level heartbeat for stuck groups.

Line 1009 removed the last warn-level signal from this drain path. If a counter leak or cyclic workload keeps a group non-idle forever, callers will now just hang with debug-only logs. Consider warning every N timeouts and pulling the 100 ms interval into a named variable so this stays observable and tunable.

♻️ Example tweak
-        let mut drain_iters = 0u32;
+        let mut drain_iters = 0u32;
+        let wait_idle_timeout = Duration::from_millis(100);
+        let drain_started = tokio::time::Instant::now();
...
+            if drain_iters % 50 == 0 {
+                log::warn!(
+                    "flush_and_sync_group: group={} still draining after {:?}, processing_count={}",
+                    group_id,
+                    drain_started.elapsed(),
+                    self.inner.group_registry.processing_count(group_id)
+                );
+            }
             let _ = tokio::time::timeout(
-                std::time::Duration::from_millis(100),
+                wait_idle_timeout,
                 self.inner.group_registry.wait_idle(group_id),
             )
             .await;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` around lines 969 - 1013, The drain loop in flush_and_sync_group
is missing a periodic warn-level heartbeat when wait_idle keeps timing out;
restore a warn log every N timeouts (e.g., when drain_iters % N == 0) that
includes group_id and processing_count to surface stuck groups, and pull the
hard-coded 100ms into a named variable (e.g., idle_wait_duration) used by
tokio::time::timeout(self.inner.group_registry.wait_idle(group_id)), so the
interval is tunable; reference flush_and_sync_group, drain_iters, group_id,
self.inner.group_registry.wait_idle, and tokio::time::timeout when making the
changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/lib.rs`:
- Around line 969-1013: The drain loop in flush_and_sync_group is missing a
periodic warn-level heartbeat when wait_idle keeps timing out; restore a warn
log every N timeouts (e.g., when drain_iters % N == 0) that includes group_id
and processing_count to surface stuck groups, and pull the hard-coded 100ms into
a named variable (e.g., idle_wait_duration) used by
tokio::time::timeout(self.inner.group_registry.wait_idle(group_id)), so the
interval is tunable; reference flush_and_sync_group, drain_iters, group_id,
self.inner.group_registry.wait_idle, and tokio::time::timeout when making the
changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f68c19b4-6970-402e-b66c-a79ca18903c4

📥 Commits

Reviewing files that changed from the base of the PR and between 69679ef and 1f724dd.

📒 Files selected for processing (1)
  • src/lib.rs

- Add periodic warn! log every ~10s when drain loop is long-running
- Extract 100ms timeout into DRAIN_IDLE_TIMEOUT const with rationale
- Add DRAIN_WARN_INTERVAL const for the warning threshold

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread src/lib.rs Outdated
Comment thread src/lib.rs
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/lib.rs (1)

972-973: Minor style inconsistency: Duration is already imported.

Duration is imported from core::time at line 178. Consider using the imported type for consistency with the rest of the file.

Suggested change
-        const DRAIN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
-        const DRAIN_WARN_INTERVAL: u32 = 100; // warn every ~10s (100 * DRAIN_IDLE_TIMEOUT)
+        const DRAIN_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
+        const DRAIN_WARN_INTERVAL: u32 = 100; // warn every ~10s (100 * 100ms)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` around lines 972 - 973, The constants declare types using the
fully-qualified std::time::Duration even though Duration is already imported;
change the type on DRAIN_IDLE_TIMEOUT to use the imported Duration (keep the
value construction via Duration::from_millis) and leave DRAIN_WARN_INTERVAL
as-is, i.e., replace std::time::Duration with Duration for the
DRAIN_IDLE_TIMEOUT constant to match the rest of the file's style and imports.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/lib.rs`:
- Around line 972-973: The constants declare types using the fully-qualified
std::time::Duration even though Duration is already imported; change the type on
DRAIN_IDLE_TIMEOUT to use the imported Duration (keep the value construction via
Duration::from_millis) and leave DRAIN_WARN_INTERVAL as-is, i.e., replace
std::time::Duration with Duration for the DRAIN_IDLE_TIMEOUT constant to match
the rest of the file's style and imports.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ecae3ba1-5c31-4010-84c3-123e43ca2dc3

📥 Commits

Reviewing files that changed from the base of the PR and between 1f724dd and 859975d.

📒 Files selected for processing (1)
  • src/lib.rs

@claude
Copy link
Copy Markdown

claude Bot commented Apr 1, 2026

Review

The root cause analysis is accurate and the fix is well-reasoned. Replacing a fuse that fires too early with wait_idle() + periodic flush is the right approach for large recursive workloads. A few issues worth addressing:

No termination guarantee (main concern)

The old 64-iteration fuse was broken in the wrong direction — too small — but the new loop has no upper bound at all. If a bug elsewhere causes processing_count to never reach zero (missed decrement, leaked GroupGuard, panic that bypasses Drop, counter underflow masked by saturating_sub), flush_and_sync_group will loop forever. The periodic warning is visible but doesn't stop anything.

Recommend adding an absolute last-resort circuit breaker — a large limit (e.g. 10 000 iterations ≈ ~16 min at 100 ms) that logs at error level and breaks, so a truly stuck group doesn't hang a process indefinitely:

const DRAIN_ABORT_LIMIT: u32 = 10_000;
if drain_iters >= DRAIN_ABORT_LIMIT {
    log::error!(
        "flush_and_sync_group: group {} ABORTING after {} iterations (processing_count={}) — possible counter leak",
        group_id, drain_iters,
        self.inner.group_registry.processing_count(group_id)
    );
    break;
}

Flush cost on every iteration

The drain loop calls the blocking r.flush(self).await on every receiver in the group on every iteration, even receivers that are already idle. For groups with many receivers that time out frequently, this is N sequential awaits per 100 ms cycle. The !force path above skips idle receivers with !r.is_idling() — the same guard would help here.

Everything else looks good

  • The wait_idle implementation correctly uses notified.enable() before the count check to avoid the TOCTOU race.
  • The Acquire/Release pairing between decrement_by and wait_idle is correct.
  • Constants with inline rationale comments are a nice improvement over the magic numbers they replaced.
  • One small inline nit filed separately: the drain_iters > 1 guard in the warning condition is redundant.

Comment thread src/lib.rs
const DRAIN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
const DRAIN_WARN_INTERVAL: u32 = 100; // warn every ~10s (100 * DRAIN_IDLE_TIMEOUT)

loop {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop is now unbounded — if a group never reaches idle (e.g. a handler that continuously spawns new tasks), flush_and_sync_group will never return. The old fuse was a correctness bug, but the old code at least guaranteed termination.

This tradeoff may be intentional (liveness is more correct than early exit), but it would help to document it explicitly as a caller contract: callers must guarantee that the workload eventually converges, otherwise this will hang indefinitely.

Also worth considering: if there's ever a true deadlock scenario that isn't the bounded-channel backpressure case (e.g. a circular dependency between groups), the 100ms flush won't help and this loop will spin forever. A configurable hard ceiling (e.g. emit error! and break after N seconds) could act as a last-resort safety valve without reintroducing the old correctness bug.

Comment thread src/lib.rs
@claude
Copy link
Copy Markdown

claude Bot commented Apr 1, 2026

Review

The fix is correct and well-motivated. Replacing the 64-iteration yield_now() fuse with wait_idle() + timeout is the right approach: the old fuse was a genuine correctness bug for large workloads, and the timeout handles the bounded-channel backpressure deadlock cleanly. The comments in the code explain the design clearly.

Two items worth addressing:

  1. Unbounded loop (inline comment on the loop block): The function now has no guaranteed termination. For the bounded-channel backpressure case this is correct — the 100ms flush will eventually drain the channel and let handlers complete. But for any deadlock that is not that case, the function will spin forever. Documenting the caller contract (workloads must converge) and/or adding a configurable hard ceiling with an error! log would guard against silent hangs in production.

  2. drain_iters overflow (inline comment on line 976): u32 += 1 panics on overflow in debug builds. Practically harmless at 100ms/iter (~497 days), but saturating_add(1) costs nothing.

Minor observation: The warning fires at iteration 100 (~10 s). For very large workloads that legitimately take longer, operators will see repeated warnings with no action to take. Consider logging at info! on the first iteration beyond the threshold, then warn! only if it continues well past a reasonable upper bound — or at minimum document that the warning is expected for large jobs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Apr 1, 2026

Review: HCD-641 — Fix flush_and_sync_group returning early

The fix correctly diagnoses and addresses the root cause: 64 yield_now() iterations were insufficient for large recursive workloads with thousands of in-flight tasks. Replacing the fuse with wait_idle() + timeout is a sound architectural improvement.

Strengths

  • The wait_idle() implementation correctly uses Notified::enable() before checking the counter, avoiding a missed-wakeup race condition.
  • The timeout prevents deadlock when handlers block on full bounded channels.
  • Good observability: periodic warn logs make it visible when the loop runs longer than expected.
  • Comment quality is high; the rationale for the design choices is well documented.

Concerns

1. No upper bound — potential infinite loop

The old 64-iteration fuse was wrong (too small), but removing it entirely creates the opposite risk: if a handler permanently stalls (leaked future, counter bug, or a deadlock scenario not relieved by flushing), this loop now spins forever. The warn log fires but nothing ever breaks out of the loop.

Consider adding a hard upper bound (even something large like 36,000 iterations, ~1 hour) and returning an Err or logging a terminal error+break so the caller has a way to detect a stuck group rather than hanging indefinitely. Alternatively, document explicitly that flush_and_sync_group is expected to block unboundedly and that callers must add their own timeout.

2. ~10s warning comment is only accurate in the worst case

DRAIN_WARN_INTERVAL = 100 with the note // warn every ~10s (100 * DRAIN_IDLE_TIMEOUT) assumes every wait_idle() call exhausts the full 100ms timeout. In practice, when tasks are just slow (not stuck), wait_idle() returns as soon as the group becomes idle — possibly in 5–20ms. In that scenario warnings appear every ~0.5–2s rather than ~10s, which could produce unexpectedly noisy logs for legitimately long-running jobs.

The warning threshold might be better expressed as a wall-clock elapsed duration (e.g., Instant::now() at loop start, warn if elapsed > 10s) rather than an iteration count.

3. Minor: first warning fires at exactly iteration 100, not "after ~10s"

drain_iters is incremented before the % DRAIN_WARN_INTERVAL check, so the first warning is at iteration 100. This is intentional, but the "~10s" framing in the comment only holds if every iteration takes the full timeout.

Comment thread src/lib.rs
loop {
drain_iters += 1;
if drain_iters > drain_fuse_count {
if drain_iters % DRAIN_WARN_INTERVAL == 0 {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ~10s estimate only holds when every wait_idle() call hits the full timeout. If tasks are slow but not stuck (e.g., each batch completes in ~20ms), wait_idle() returns early and 100 iterations pass in ~2s, generating warnings much sooner than expected.

A wall-clock approach would give a more stable threshold:

Suggested change
if drain_iters % DRAIN_WARN_INTERVAL == 0 {
if drain_iters == 1 {
// record start time once, then warn based on elapsed
}
if drain_iters % DRAIN_WARN_INTERVAL == 0 {

Or, simpler: capture let drain_start = std::time::Instant::now(); before the loop and warn when drain_start.elapsed() > Duration::from_secs(10).

Comment thread src/lib.rs
const DRAIN_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
const DRAIN_WARN_INTERVAL: u64 = 100; // warn every ~10s (100 * DRAIN_IDLE_TIMEOUT)

loop {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the fuse removed, this loop has no exit path other than is_idle(). If a handler permanently stalls — leaked future, counter bug, or a deadlock not relieved by the 100ms flush — this will loop forever. The warning fires but nothing ever breaks out.

Consider one of:

  1. A maximum wall-clock timeout (e.g. configurable, defaulting to something large) that returns Err so the caller can surface it.
  2. A large hard cap (e.g. if drain_iters > 36_000 { log::error!(...); break; }) as a last-resort safety net.
  3. At minimum, a doc comment on flush_and_sync_group noting that it can block indefinitely and that callers needing a deadline must wrap it in tokio::time::timeout.

@wolfiestyle wolfiestyle merged commit 2e9cd68 into master Apr 1, 2026
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant