refactor(pegboard-gateway2): consolidate hibernation in-flight and tunnel flush#4754

Closed
NathanFlurry wants to merge 1 commit into 04-24-chore_kitchen-sink_configurable_endpoint from 04-24-refactor_pegboard_gateway2_hibernation_inflight_state


Conversation

@NathanFlurry (Member)

Extracts pegboard-gateway2 refinements previously embedded in #4701 and #4711 into a dedicated engine PR on top of 04-24-chore_kitchen-sink_configurable_endpoint.

@NathanFlurry force-pushed the 04-24-refactor_pegboard_gateway2_hibernation_inflight_state branch from 0cda947 to 7a5e467 on April 24, 2026 at 12:32
@NathanFlurry force-pushed the 04-24-chore_kitchen-sink_configurable_endpoint branch from de1ee1c to 5ef7bf6 on April 24, 2026 at 12:32
@NathanFlurry (Member, Author) commented Apr 24, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite.

@claude (Bot) commented Apr 24, 2026

PR Review: refactor(pegboard-gateway2): consolidate hibernation in-flight and tunnel flush

The refactor correctly fixes two real protocol bugs (the four-way close handshake and message loss during reconnect), and the hard-abort addition is a meaningful reliability improvement. A few items are worth addressing before merge:


Code Quality

wait_for_envoy_websocket_open — significant duplication

The stop_on_actor_stopped: bool parameter drives a runtime branch, but the two branches are nearly identical (~50 lines copied); the only difference is the presence of the stopped_sub.next() arm. Replace the if/else with futures::future::pending() when the flag is false:

let stopped_fut = if stop_on_actor_stopped {
    futures::future::Either::Left(stopped_sub.next())
} else {
    // Never resolves, so the corresponding select! arm is effectively disabled.
    futures::future::Either::Right(std::future::pending())
};

This halves the function body with no behavior change.
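
For concreteness, a minimal self-contained sketch of the shape (wait_for_open, stopped_sub, and the sleep arm are illustrative stand-ins, not the gateway's actual types):

use std::time::Duration;

use futures::{channel::mpsc, future, StreamExt};

async fn wait_for_open(stop_on_actor_stopped: bool, mut stopped_sub: mpsc::Receiver<()>) {
    // When the flag is off, substitute a future that never resolves, so a
    // single select! body serves both modes without duplicating its arms.
    let stopped_fut = if stop_on_actor_stopped {
        future::Either::Left(stopped_sub.next())
    } else {
        future::Either::Right(future::pending())
    };

    tokio::select! {
        _ = stopped_fut => { /* actor stopped: bail out with service-unavailable */ }
        _ = tokio::time::sleep(Duration::from_millis(10)) => { /* remaining arms elided */ }
    }
}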

should_buffer — operator precedence

In shared_state.rs, the expression is should_buffer_restored_open || <hibernating_check> && matches!(...). && binds tighter than ||, so this is correct, but the indentation makes it hard to read at a glance. Add explicit parentheses around the && clause:

let should_buffer = should_buffer_restored_open
    || (in_flight.hibernation_state.as_ref().is_some_and(|hs| hs.hibernating)
        && matches!(msg.message_kind, ...));

Potential Bugs

hibernating_request::delete propagates DB errors after a normal close

In handle_websocket_inner, the new delete call uses ?, which propagates any DB error upward. If the delete fails after a normal client-initiated close, the close frame is never returned and the WebSocket is left in an inconsistent state. The existing pattern in handle_websocket_hibernation wraps the delete in a tokio::join! with separate logging; the same treatment here would be more resilient, as sketched below.
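
A hedged sketch of that treatment, with stand-in functions (delete_hibernating_request, send_close_frame) and anyhow::Result in place of the crate's actual error type:

use anyhow::Result;

// Illustrative stand-ins for the real DB delete and the close-frame send.
async fn delete_hibernating_request(request_id: u64) -> Result<()> { let _ = request_id; Ok(()) }
async fn send_close_frame() -> Result<()> { Ok(()) }

async fn finish_close(request_id: u64) -> Result<()> {
    // Run both concurrently; a DB failure on the delete is logged rather than
    // propagated with `?`, so the close frame is still delivered to the client.
    let (delete_res, close_res) = tokio::join!(
        delete_hibernating_request(request_id),
        send_close_frame(),
    );
    if let Err(err) = delete_res {
        tracing::warn!(?err, "failed to delete hibernating request entry");
    }
    close_res
}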

hibernating = true ordering dependency is fragile and undocumented

In keepalive_hws, hs.hibernating = true is set before handle_websocket is re-invoked on the early-exit path. start_in_flight_request later sets it back to false. Any await that yields between these two calls will cause tunnel messages to be buffered — which is the intended behavior — but the ordering invariant is not documented. A short comment explaining why hibernating is set before re-invoking the handler would prevent a future refactor from breaking this.
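
A sketch of where such a comment could live (HibernationState and the call shape here are assumptions based on the description above, not the actual code):

struct HibernationState { hibernating: bool }

async fn handle_websocket() { /* re-invoked handler; details elided */ }

async fn reenter_handler(hs: &mut HibernationState) {
    // INVARIANT: `hibernating` must be set before re-invoking the handler.
    // Any await point between these two statements can yield to the tunnel
    // task, and messages arriving in that window must observe
    // hibernating == true so they are buffered instead of forwarded.
    // start_in_flight_request resets the flag to false afterwards.
    hs.hibernating = true;
    handle_websocket().await;
}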


Performance / Reliability

buffered_inbound is unbounded

InFlightRequest::buffered_inbound has no size cap. During the reconnect transition window a misbehaving or very active actor could accumulate unbounded messages here. The existing hws_max_pending_size guard already applies to pending_ws_msgs — it should be extended to cover buffered_inbound as well.
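
One possible shape for the cap, sketched with assumed names (the constant's value and the struct fields are illustrative):

const HWS_MAX_PENDING_SIZE: usize = 1024 * 1024;

struct InFlightBuffers {
    buffered_inbound: Vec<Vec<u8>>,
    buffered_bytes: usize,
}

impl InFlightBuffers {
    // Hands the message back to the caller when the cap would be exceeded,
    // so the request can be aborted instead of growing without bound.
    fn try_buffer(&mut self, msg: Vec<u8>) -> Result<(), Vec<u8>> {
        if self.buffered_bytes + msg.len() > HWS_MAX_PENDING_SIZE {
            return Err(msg);
        }
        self.buffered_bytes += msg.len();
        self.buffered_inbound.push(msg);
        Ok(())
    }
}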

initial_messages drain in tunnel_to_ws_task can starve the select loop

The pre-select drain loop processes all buffered messages synchronously without yielding. Under a large replay burst this starves stopped_sub and abort-signal observation. Adding tokio::task::yield_now().await every N messages (or capping the drain to a bounded count per iteration) keeps the task cooperative.
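
A minimal sketch of the cooperative drain (the batch size of 32 is arbitrary, and process stands in for the real forwarding logic):

async fn drain_initial(initial_messages: Vec<Vec<u8>>) {
    for (i, msg) in initial_messages.into_iter().enumerate() {
        process(msg);
        if i % 32 == 31 {
            // Hand control back to the runtime so stop/abort signals get polled.
            tokio::task::yield_now().await;
        }
    }
}

fn process(_msg: Vec<u8>) {}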


Summary

  • Close handshake fix (try_next() after hibernate): Correct
  • Hard abort on task cancellation + is_cancelled() guard: Correct
  • Buffering during reconnect window: Correct, but unbounded
  • wait_for_envoy_websocket_open duplication: Refactor recommended
  • should_buffer parentheses: Clarity fix
  • delete call error propagation: Bug risk
  • initial_messages drain yield: Robustness improvement

The logic is sound. The duplication and the delete error propagation are the most important items to address before merge.

@claude (Bot) commented Apr 24, 2026

Review: refactor(pegboard-gateway2): consolidate hibernation in-flight and tunnel flush

The changes are well-motivated and fix real race conditions in the hibernation handover path. Overall the approach is sound; a few things are worth addressing.

1. Significant code duplication in wait_for_envoy_websocket_open

The stop_on_actor_stopped: bool flag causes the entire tokio::select! block to be duplicated verbatim — the only difference is whether stopped_sub.next() is included. This is the largest code quality issue in the PR.

A cleaner approach uses select arm guards to make the stopped arm conditional without duplicating the rest:

async fn wait_for_envoy_websocket_open(..., stop_on_actor_stopped: bool) -> Result<...> {
    let fut = async {
        loop {
            tokio::select! {
                res = msg_rx.recv() => { /* ... returns Ok on open, breaks on close ... */ }
                _ = stopped_sub.next(), if stop_on_actor_stopped => {
                    return Err(WebSocketServiceUnavailable.build());
                }
                _ = drop_rx.changed() => { /* ... */ }
            }
        }
        // Reached only when an arm above breaks out of the loop.
        Err(WebSocketServiceUnavailable.build())
    };
    // ...
}

The if <guard> syntax on select! arms exists for exactly this pattern. This halves the function body.

2. should_buffer logic needs explicit parentheses

let should_buffer =
    should_buffer_restored_open
        || in_flight
            .hibernation_state
            .as_ref()
            .is_some_and(|hs| hs.hibernating)
            && matches!(...);

&& binds tighter than ||, so this reads as A || (B && C) — which is likely the intended semantics, but the multi-line formatting makes it look like (A || B) && C at a glance. Add explicit parentheses around the second clause for clarity.

3. initial_messages loop has no abort check

In tunnel_to_ws_task:

loop {
    if let Some(msg) = initial_messages.next() {
        // process without checking tunnel_to_ws_abort_rx / stopped_sub
        continue;
    }
    // select! with abort signals follows here
}

If the abort signal fires while draining initial messages, cancellation is not observed until the drain completes. In practice the initial buffer is small, but the signal semantics are inconsistent with the rest of the loop. Consider checking tunnel_to_ws_abort_rx inside the drain, or adding a comment noting the intentional choice.
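
A sketch of the in-drain check, assuming the abort signal is a tokio watch channel carrying a bool (names are illustrative):

use tokio::sync::watch;

fn drain_with_abort(initial_messages: Vec<Vec<u8>>, abort_rx: &watch::Receiver<bool>) {
    for msg in initial_messages {
        // Cheap, non-blocking check so cancellation is observed mid-drain.
        if *abort_rx.borrow() {
            return;
        }
        process(msg);
    }
}

fn process(_msg: Vec<u8>) {}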

4. receiver_subject dropped from a downgraded log

The original warn (now debug) for a closed receiver channel included receiver_subject=%in_flight.receiver_subject. That field helps correlate which envoy tunnel the gateway was talking to and was dropped alongside the level change — worth keeping.
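
Sketched fix, keeping the correlation field on the downgraded line (the struct and message text are stand-ins):

struct InFlight { receiver_subject: String }

fn log_receiver_closed(in_flight: &InFlight) {
    // Downgraded from warn to debug, but the correlation field stays.
    tracing::debug!(receiver_subject = %in_flight.receiver_subject, "receiver channel closed");
}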

Minor observations

  • drain_ready_tunnel_messages error arms: TryRecvError::Empty and TryRecvError::Disconnected both break. A single Err(_) => break would be cleaner since the handling is identical (see the sketch after this list).

  • should_buffer_restored_open comment missing: The name alone does not explain why a restored Open needs to be buffered rather than forwarded. A short comment (e.g. "races between the NATS message handler and the reconnect resetting opened") would help future readers.

  • hibernate_ws close fix is correct: Consuming the close frame with try_next() to complete the WebSocket handshake while the actor is hibernating is the right fix.

  • Abort handle additions are correct: Adding .abort() alongside the watch channel signal closes the window where the peer task hangs if it isn't polling the channel. Good catch.
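
The consolidated error arm from the first bullet, sketched under the assumption that the channel is a tokio unbounded mpsc:

fn drain_ready(rx: &mut tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut out = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(msg) => out.push(msg),
            // Empty and Disconnected get identical handling, so one arm suffices.
            Err(_) => break,
        }
    }
    out
}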

@github-actions (Contributor)

Preview packages published to npm

Install with:

npm install rivetkit@pr-4754

All packages published as 0.0.0-pr.4754.fdc5894 with tag pr-4754.

Engine binary is shipped via @rivetkit/engine-cli on linux-x64-musl, linux-arm64-musl, darwin-x64, and darwin-arm64. Windows users should use the release installer or set RIVET_ENGINE_BINARY.

Docker images:

docker pull rivetdev/engine:slim-fdc5894
docker pull rivetdev/engine:full-fdc5894
Individual packages
npm install rivetkit@pr-4754
npm install @rivetkit/react@pr-4754
npm install @rivetkit/rivetkit-napi@pr-4754
npm install @rivetkit/workflow-engine@pr-4754

1 similar comment

