refactor(poa): move redis publish into PoA, native-async with cached connections #3280

Voxelot wants to merge 25 commits into
Conversation
…ed connections

Background: as of the 2026-04-22 hotfix (#3278) the redis publish lived in the importer's rayon worker and used detached `std::thread::spawn` workers with an mpsc channel for quorum short-circuit. That fixed the mainnet hang but left two warts:

* the importer owned `BlockReconciliationWritePort` even though only `Source::Local` blocks were ever published — an artifact of the original vibe-coded scaffolding, not a design intent,
* the short-circuit left OS threads running until their sync redis call returned, which meant a steady trickle of leaked threads while a half-alive node stayed unresponsive.

Changes:

* Delete `BlockReconciliationWritePort` from `fuel_core_importer` and the matching field / forwarder on `BlockImporterAdapter`. The importer no longer knows about redis at all.
* Add `BlockReconciliationWritePort` to `fuel_core_poa::ports` as an async trait, co-located with the existing `BlockReconciliationReadPort`. `RP` on `MainTask` now requires both.
* `MainTask::produce_block` (and `produce_predefined_block`) call `reconciliation_port.publish_produced_block(&sealed_block).await?` before `block_importer.commit_result(...).await?`. Same publish-then-commit invariant as before; fewer hops to get there.
* `RedisLeaderLeaseAdapter` goes native-async:
  - `publish_block_on_node` uses the existing `multiplexed_connection` helper (cached per-node `redis::aio::MultiplexedConnection`), calls `invoke_async`, and wraps the call in `tokio::time::timeout`.
  - `publish_block_on_all_nodes` uses `FuturesUnordered` to fan out, drains until quorum is reached, and drops the remaining futures — which cooperatively cancels their in-flight RESP exchange and closes the connection-side handles. No OS threads, no leak.
  - On timeout or transport error the node's cached multiplexed connection is cleared so the next call reconnects.
* Delete `invoke_write_block_script`, the `'static` sync helper that only existed to satisfy `std::thread::spawn`'s `'static` bound.
* `repair_sub_quorum_block` becomes `async` (it calls `publish_block_on_all_nodes`); the one caller in `unreconciled_blocks` gains an `.await`.
* Tests updated: all existing `publish_produced_block__*` tests now `.await`; the hang-repro test drops its `spawn_blocking` wrapper because the publish path is already async and cancellable at the future level.
* `FakeReconciliationPort` in `poa/service_test.rs` gains a stub `BlockReconciliationWritePort` impl alongside its existing read-port impl so `MainTask`'s combined trait bound is satisfied.

Tests (leader_lock feature):

* 29 adapter tests in `service::adapters::consensus_module::poa::tests`: all pass.
* 51 service tests in `fuel-core-poa`: all pass.

Connection caching: each `RedisNode` holds a `Mutex<Option<MultiplexedConnection>>`. `multiplexed_connection` was already used for reads and lease ops; publishes now go through the same cache instead of opening a fresh TCP connection per block. On transport errors the cache is cleared so the next call reconnects cleanly.
PR Summary (High Risk)

Overview: The PoA Redis adapter is rewritten to be native async: per-node operations use cached multiplexed connections. Tests are updated/expanded to cover half-alive Redis peers, quorum short-circuit timing, late epoch-token folding, release-on-drop behavior with in-flight publishes, and eventual consistency expectations due to background tasks.
…to PoA
The three tests covering the importer's Source::Local publish branch
(commit_result__when_source_is_local_then_publishes_to_reconciliation_writer,
execute_and_commit__when_source_is_network_then_does_not_publish_to_reconciliation_writer,
commit_result__when_publish_to_reconciliation_writer_fails_then_returns_error)
exercised behavior that no longer lives in the importer. Equivalent
coverage now lives in the PoA adapter tests in crates/fuel-core/src/
service/adapters/consensus_module/poa.rs:tests::publish_produced_block__*.
Also removes the now-unused FakeBlockReconciliationWriter helper and the
unused std::sync::{Arc, Mutex} imports, restores the missing anyhow
import that the deleted block had been pulling in, and drops a stray
#[tokio::test] left dangling above #[test] fn one_lock_at_the_same_time
by the deletion.
     C: GetTime,
     RS: WaitForReadySignal,
-    RP: BlockReconciliationReadPort + 'static,
+    RP: BlockReconciliationReadPort + BlockReconciliationWritePort + 'static,
nit: do these need to be separate traits?
…urability
Previously `publish_block_on_all_nodes` used `FuturesUnordered` and dropped
the unfinished futures on quorum short-circuit. That cancelled per-node
`invoke_async` calls mid-RESP for the slow nodes — quorum was satisfied
but those nodes either got no write attempt or a half-written frame on
the wire, dropping data durability and pushing recovery onto the
sub-quorum repair path.
Now each per-node publish runs in its own `tokio::spawn`ed task. The
parent only awaits the first quorum's worth of `Written` results via
mpsc. Remaining tasks continue to completion in the background so every
reachable node gets a real write attempt, bounded by `node_timeout`.
Background tasks are tokio futures (not OS threads), each holds shared
`Arc<RedisLeaderLeaseAdapter>` / `Arc<SealedBlock>` / `Arc<[u8]>`
references (no per-task heavy clone), and they're cooperatively
cancellable.
`RedisNode::cached_connection` is now `Arc<tokio::sync::Mutex<...>>` so
adapter clones share the per-node connection cache. Without that change
each spawned task would hold a separate clone with an empty cache and
open a fresh socket per publish, defeating the connection reuse.
Cost analysis for the half-alive-node scenario at 1 block/s with
node_timeout=1s and redis 1.2:
* before: 0 outstanding background tasks per dead node (cancelled) but
no write attempt landed on those nodes
* after: ~1 outstanding background task per dead node at any moment
(each lives ≤ node_timeout), every node gets its publish attempt
…Port

Per review nit on #3280: every implementor (`RedisLeaderLeaseAdapter`, `NoopReconciliationAdapter`, `ReconciliationAdapter`, `FakeReconciliationPort`) implements both halves, every `RP:` trait bound on `MainTask` requires both, and there's no caller that uses just one side. Splitting the trait bought ISP in theory but cost a duplicate import + duplicate impl block at every site.

`BlockReconciliationReadPort` and `BlockReconciliationWritePort` are merged into a single `BlockReconciliationPort` with `leader_state` / `release` / `publish_produced_block`. Trait bounds simplify from `RP: BlockReconciliationReadPort + BlockReconciliationWritePort` to `RP: BlockReconciliationPort` in all four sites. No behavior change.

All 29 adapter tests + 51 PoA service tests pass; clippy clean across `--all-targets --all-features`.
Addressed in 5c2002f: collapsed the read and write ports into a single `BlockReconciliationPort`.
Bugbot review on #3280 caught: each spawned task in `publish_block_on_all_nodes` did `let adapter = self.clone()`, which clones the entire `RedisLeaderLeaseAdapter` including `drop_release_guard`. Lingering background tasks after a quorum short-circuit kept `Arc::strong_count(drop_release_guard) > 1`, so dropping the owning adapter would skip the lease-release path until the slow nodes timed out (up to ~node_timeout per dead node). In crash/panic scenarios this delayed leader failover.

Fix: make `drop_release_guard` an `Option<Arc<()>>`. Logical adapter clones (the existing Clone impl) carry `Some` and participate in the strong-count gate as before. Spawned-task clones in `publish_block_on_all_nodes` explicitly null out the guard (`adapter.drop_release_guard = None;`) so they don't pin the logical lifetime — the owning adapter's Drop sees count == 1 and triggers release immediately.

Drop impl: early-return if `drop_release_guard` is None (task clones never trigger release) and otherwise check strong_count on the inner Arc as before.

Adds a regression test (`drop__after_publish_releases_lease_promptly_despite_inflight_background_tasks`) that publishes against 3 healthy + 1 half-alive nodes, drops the adapter immediately, and asserts the lease is released on all 3 healthy nodes within a window well under node_timeout while the half-alive node's background task is still in flight.
Addressed in bf28529.
Tracks background per-node publish tasks spawned by `publish_block_on_all_nodes`. Steady-state should be near zero; sustained non-zero values indicate per-node publishes are not exiting (regression in the per-call timeout, or a connection/protocol stall not covered by node_timeout). Cheap visibility for ops to alert on.

Implementation: an `OutstandingPublishTaskGuard` RAII type increments the gauge synchronously in the parent when constructed (so the gauge counts the task from the instant it is queued, not from when the task starts running) and decrements on drop. The guard is moved into the spawn closure so the decrement runs whether the task completes normally, panics, or is cancelled mid-await.

Adds a focused regression test (`publish_produced_block__outstanding_tasks_metric_drains_to_baseline`) that publishes against 3 healthy + 1 half-alive nodes, verifies the gauge climbs above baseline right after quorum short-circuit, then verifies it drains back to baseline after the half-alive node's background task times out. Also extends `metrics__poa_metrics_appear_in_encoded_output_after_exercising_all_paths` to assert the new metric name appears in the prometheus encoding.
    // Clone the adapter for the spawned task, but null out the
    // drop guard so this clone doesn't pin the logical adapter
    // lifetime. Without this, a lingering background task after
    // quorum short-circuit would keep `Arc::strong_count(...) > 1`
    // and the owning adapter's `Drop` would skip lease release.
    let mut adapter = self.clone();
    adapter.drop_release_guard = None;
I think this would be better as a helper method on self.
self.clone_non_reference_counted()
or something like that.
BUT... do we even need this reference counter at all anymore? Wasn't it because we were sharing it between the two services as separate port impls?
I think we still clone it in sub_services.rs, but that's just residual.
Ended up creating a new RedisPublishChild for handling the dropping for a given adapter. We don't need the drop_release_guard anymore.
    // unless it has been published to quorum by its producer".
    self.reconciliation_port
        .publish_produced_block(&block)
        .await?;
Publish now precedes database validation, weakening atomicity
Medium Severity
The old code published to Redis inside commit_result, after local database validation (merkle root and height checks) but before commit_changes. The new code calls publish_produced_block before commit_result, so the block reaches Redis quorum before the importer validates local DB state. If commit_result subsequently fails (e.g. InvalidDatabaseStateAfterExecution from a merkle root mismatch), the block exists on Redis quorum but is never committed locally — an inconsistent state the old ordering prevented. The same issue applies to produce_predefined_block.
That was the old behavior... it just published inside of commit_result
Acknowledged — this ordering was an explicit decision after talking with the original authors. The publish-inside-importer was an artifact and not load-bearing for atomicity: followers reconcile from Redis on next sync, so a published-but-uncommitted block converges naturally on local restart/sync. Keeping the publish in PoA so the importer remains a pure local-state component.
Pure-read quorum operations (`has_lease_owner_quorum`, `should_reconcile_from_stream`) previously relied on tokio::spawn + mpsc, so a slow node's redis call kept its connection pinned even after the parent short-circuited. Switch them to `FuturesUnordered` so dropping the stream cancels the in-flight redis op cooperatively — the slow connection is released immediately.

Notes on each path:

- `has_lease_owner_quorum`: short-circuits on quorum-of-owners and drops the stream. The expand-to-non-owners coverage extension (which actually writes via `promote_leader`) still spawns as a detached background task.
- `should_reconcile_from_stream`: short-circuits ONLY on the first node reporting backlog. Concluding "no backlog" still requires all responses — otherwise empty-stream nodes can outrun an orphan-holding minority and we'd skip a sub-quorum repair cycle. Each per-node read is bounded by node_timeout, so the worst case for the no-backlog path is ~node_timeout.
- `acquire_lease_if_free` keeps the tokio::spawn pattern: it writes the lease, so background tasks usefully extend coverage to slow peers (same durability rationale as the publish path).

Drop `OutstandingReadTaskGuard` + the `poa_outstanding_read_tasks` gauge — cancelled `FuturesUnordered` tasks are immediate, not long-lived, so the gauge would not be meaningful.

Add four half-alive bounded-latency tests modeled on the publish-path regression test, covering `acquire_lease_if_free`, `has_lease_owner_quorum`, and `unreconciled_blocks` (both backlog and no-backlog cases). Each must complete in seconds against a half-alive TCP listener — never indefinitely.
@Voxelot Pushed Refactored to FuturesUnordered (cancel-on-quorum, no spawn):
Kept the spawn pattern:
Removed:
Added regression tests modeled on
All 35 leader_lock tests pass.
Sequential `for op in candidates { op.promote_leader().await }` made
the worst-case background expansion N × node_timeout when one
candidate is unreachable. Switch to `futures::future::join_all` so
all non-owner promotions race in parallel — total background time is
1 × node_timeout regardless of N.
Also fix a pre-existing race in
has_lease_owner_quorum__expands_lock_to_non_owned_nodes: the test
read Redis state immediately after has_lease_owner_quorum returned,
but the expand task is detached. Add a small `wait_for` helper that
polls until the expected state is observed (or a generous deadline
elapses) so the assertion is deterministic.
    /// Per-node operation context. Carried by spawned tokio tasks so they
    /// don't need a clone of the full adapter — covers publish, lease-owner
    /// check, lease promotion, and stream reads.
    struct RedisNodeOp {
not sure what Op stands for
Renamed to PerNodeAdapter (factory per_node_adapter) in 1891e64 — it's the adapter scoped to one redis node, and the methods on it (publish_block_on_node, check_lease_owner, promote_leader, read_latest_stream_entry) are the same operations RedisLeaderLeaseAdapter would issue, just narrowed to a single node so they can be moved into a spawned task.
    /// `write_block.lua` uses a consistent epoch across owned nodes.
    /// Runs as a fire-and-forget background task so the calling
    /// `has_lease_owner_quorum` can short-circuit on quorum.
    fn spawn_expand_to_non_owners(&self, confirmed_owner: &[bool]) {
Another issue I have with agents. This name is nonsense to me.
Why not just name it try_to_acquire_the_lease_on_nodes_we_don't_already_own and remove the comment? Or keep the comment for the other details.
    nodes_indicating_backlog =
        nodes_indicating_backlog.saturating_add(1);
    drop(reads);
    return Ok(true);
This returns Ok(true) on the first one that is >=? But what if we don't have quorum why would we trust the first? Maybe I'm missing something?
After `acquire_lease_if_free`'s quorum short-circuit, tokens from nodes that had not yet responded were silently discarded when the mpsc receiver was dropped. If one of those late nodes carried a drifted-high epoch (from prior election storms), our recorded `current_epoch_token` stayed below its stored epoch, and every write to that node would FENCING_ERROR until the next `has_lease_owner_quorum` expansion observed and adopted the higher value.

Fix: each spawned promotion task now max-CASes its returned token into `current_epoch_token` directly via a new `fold_epoch_max` helper, in addition to sending on the channel for the parent's quorum tally. The parent's post-acquire epoch update uses the same helper instead of an unconditional set, so it can never lower a value a late task already raised. Both writers are monotonic-by-construction; the shared cell converges on the global max within node_timeout.

Once the late, higher token has been folded, `write_block.lua` step 3 heals the lagging nodes upward on the next publish (submitted token greater than stored epoch -> SET, then proceed) — no separate expansion call needed.

Add two regression tests:

- acquire_lease_if_free__folds_late_higher_epoch_into_current_epoch_token: pre-drift node C's epoch to 100 and assert that current_epoch_token reaches >= 101 after acquire, regardless of C's response order.
- fold_epoch_max__monotonic_max_under_concurrent_writers: unit-level guard that the helper reaches the global max under heavy concurrent contention with interleaved ascending/descending writes.
…_epoch_max

The expand-to-non-owners background task did its own inline max-CAS update of `current_epoch_token` and never bumped the `leader_epoch` prometheus gauge. So a higher epoch adopted from lock expansion left the gauge stale until the next acquire happened to call `fold_epoch_max` with a newer value.

Replace the inline update with per-token `fold_epoch_max` calls so the gauge moves in lock-step with the cell. The `tracing::debug` log is dropped — the value transition is already observable on the metric and not worth a separate log line per expansion.
…quorum

Previously the FuturesUnordered drain returned Ok(true) on the first node reporting `latest_height >= next_height`, even if the rest of the cluster was unreachable. Net effect was just one extra round-trip — `unreconciled_blocks` re-checked quorum on `read_stream_entries` and propagated the same Err — but the `should_reconcile_from_stream` contract no longer matched the old `join_all + Err on degraded quorum` behavior, and it sent us into a wasted reconciliation read when the cluster couldn't quorum-respond anyway.

Track a `backlog_seen` flag and only short-circuit Ok(true) once we have BOTH a positive backlog signal AND a quorum of successful responses. Empty-stream responses still count toward the quorum tally, so a single backlog signal isn't blocked behind the slowest node — just behind quorum-1 of the responses, which is the same wait the caller would do downstream anyway. The "no backlog" path still requires all responses (bounded by node_timeout) so an orphan-holding minority isn't outraced by empty-stream majorities.
Per Mitch's review feedback that "Op" is jargon and spawn_expand_to_non_owners doesn't say what it does:

- RedisNodeOp -> PerNodeAdapter (with new_node_op -> per_node_adapter): it's the adapter scoped to one redis node, used to issue per-node calls from spawned tasks. "PerNodeAdapter" says exactly that.
- spawn_expand_to_non_owners -> spawn_acquire_lease_on_unowned_nodes: names the action. Keeps the spawn_ prefix so callers see it's fire-and-forget background work.

Also renames a stray `child` local variable and an adjacent test name that still referenced the old "publish child" phrasing.
`wait_for_redis_ready` was checking only TCP connectability, which succeeds as soon as redis-server's listener binds — before the process is ready to handle commands. On a cold start this is fast enough to mask, but the restart-and-lose-data test (`unreconciled_blocks__when_redis_node_restarts_and_loses_data__drops_block_below_quorum`) hit the gap consistently enough to flake locally and in CI: adapter_b's first command on the restarted node would fail because redis was still warming up. Same class of half-ready issue as the 2026-04-22 mainnet hang — TCP accept doesn't imply protocol readiness.

Replace the TCP probe with a real PING/PONG round-trip so we only return once redis is actually serving commands.

Stress: previously flaky test now passes 20/20 in isolation and the full leader_lock suite passes 5/5.
This reverts commit f653a00.
`acquire_lease_if_free__adopts_higher_epoch_from_expanded_node` read `current_epoch_token` immediately after acquire returned, but with the spawn-and-drain refactor the late-arriving high-epoch token from node C is folded by a background task — possibly microseconds after acquire returns, but enough to flake CI. Same fix as `has_lease_owner_quorum__expands_lock_to_non_owned_nodes`: poll via wait_for so the assertion observes the eventual state instead of racing the fold.
`release_if_owner` was calling `has_lease_owner_quorum` first and
short-circuiting Ok(()) when we no longer held quorum — but that
helper now spawns `spawn_acquire_lease_on_unowned_nodes` as a
detached background task whenever quorum IS confirmed. The spawned
task races our own DELs and can `SET NX` the lease back on a node
we are simultaneously releasing.
Observed CI failure on
`unreconciled_blocks__when_redis_node_restarts_and_loses_data__drops_block_below_quorum`:
the restarted node's lease was empty, the expand task SET it with
the outgoing leader's token after release ran, and the next
leader's repair hit FENCING_ERROR ("Lost lock during repair") on
that node because the stale lease still held.
`RELEASE_LOCK_SCRIPT` is a per-node CAS-DEL — it only removes the
lease when the stored owner matches our token — so calling it on
every node unconditionally is safe regardless of whether we hold
quorum. Drop the leading quorum check entirely; clear the local
epoch token after the release fan-out completes. Same effect on
the happy path; no expand spawn to race with.
Stress: previously flaky test now passes 20/20 in isolation; full
leader_lock suite passes 5/5.
`publish_produced_block` returns once a quorum acks; per-node publish tasks for the slowest node(s) run in a detached background tokio task. Three tests asserted `stream_len == 1` on every node immediately after the call, racing with that background task and flaking on CI. Wrap the precondition checks in `wait_for` so they observe the eventual all-nodes-have-block state instead of the immediate quorum-acked state.

Tests affected:

- unreconciled_blocks__when_reads_fail_on_quorum_nodes__returns_error
- has_lease_owner_quorum__expands_lock_to_non_owned_nodes
- has_lease_owner_quorum__adopts_higher_epoch_from_expanded_node

Stress: full leader_lock suite passes 10/10 with --test-threads=1.
`publish_produced_block__when_epoch_is_behind_on_one_node_then_first_write_heals_epoch` read node A's epoch immediately after `publish_produced_block` returned, but the publish task targeting A (the lagging node we expected to heal) may still be running in the background after the quorum short-circuit. If A is the slow responder, the heal hasn't landed when the assertion fires. Wrap the post-condition read in `wait_for` so the assertion observes the eventual healed state. Caught while auditing remaining spawn-and-drain race sites after the recent CI flakes.


Summary
* Moves `BlockReconciliationWritePort` out of `fuel_core_importer::ports` and into `fuel_core_poa::ports`, making it async. The importer no longer knows anything about redis; PoA owns the publish path end to end.
* Rewrites `RedisLeaderLeaseAdapter::publish_produced_block` (and `publish_block_on_all_nodes`, `repair_sub_quorum_block`) as native async: `FuturesUnordered` fan-out, `tokio::time::timeout(node_timeout, ...)` per node, short-circuit on quorum by dropping the remaining futures. No rayon bridge, no `std::thread::spawn` leak.
* Reuses the cached per-node `redis::aio::MultiplexedConnection` for publishes so we stop opening a fresh TCP connection per block.

Context
Follow-up to #3278 (the 2026-04-22 mainnet hang hotfix). That PR kept the importer-owned publish + rayon bridge and used detached sync threads with an mpsc channel for the quorum short-circuit. Discussed with the original authors — the importer never had a reason to own publish, that was an artifact of the initial scaffolding — and confirmed with the team that moving it to PoA is the intended direction. This PR does that cleanup and goes native-async at the same time.
Behavior-equivalence
* `reconciliation_port.publish_produced_block(...).await?` is called in `MainTask::produce_block` (and `produce_predefined_block`) immediately before `block_importer.commit_result(...).await?`. If publish fails to reach quorum, the commit is skipped — identical invariant to the old `Source::Local` gate in `_commit_result`.
* The quorum semantics and the `write_block.lua` contract are unchanged.
* `MultiplexedConnection`s are cleared on transport errors / timeouts, same pattern already used by `read_latest_stream_entry_on_node` and the lease-ops paths.

Test plan
* All 29 adapter tests in `service::adapters::consensus_module::poa::tests` pass with `--features leader_lock`, including the 2026-04-22 hang-repro (`publish_produced_block__returns_within_bound_when_one_node_is_half_alive`) — the `spawn_blocking` wrapper is gone and the publish returns cleanly in the normal timeout budget.
* All 51 service tests in `fuel-core-poa` pass (the new trait bound required a `BlockReconciliationWritePort` impl on `FakeReconciliationPort`).
* `cargo +nightly-2025-09-28 fmt --check` clean.
* `cargo sort -w --check` clean.

Notes
* The hang-repro test no longer needs `spawn_blocking`. It still asserts a 5s wall-clock deadline; with the native-async fan-out + drop-cancel, a single half-alive node resolves the call in one timeout interval instead of contributing a leaked OS thread.