feat(acp-nats-ws): unblock remote ACP clients#126
Conversation
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
PR SummaryMedium Risk Overview Introduces a connection manager and per-HTTP-connection runtime ( WebSocket handling is updated to emit/propagate connection IDs in logs and headers, ignore binary frames, and route through the new manager; extensive new tests cover HTTP flows, header validation, origin checks, and lifecycle behaviors. Reviewed by Cursor Bugbot for commit 8805ee2. Bugbot is set up for automated code reviews on this repo. Configure here. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughReplaces the WebSocket-only upgrade with a unified ACP transport at Changes
Sequence DiagramsequenceDiagram
participant Client as Client
participant HTTP as Transport HTTP Server
participant Manager as Connection Manager
participant Conn as Connection Runtime
participant NATS as NATS/JetStream
Client->>HTTP: POST/GET/DELETE /acp or WebSocket upgrade
HTTP->>Manager: send ManagerRequest (HttpPost/HttpGet/WebSocket/HttpDelete)
Manager->>Conn: spawn or attach run_http_connection / handoff WebSocket
Conn->>NATS: publish/subscribe ACP messages
NATS-->>Conn: inbound ACP messages
Conn-->>Client: SSE events or WebSocket text frames
Client->>Conn: JSON-RPC messages (POST bodies or WS frames)
Conn->>NATS: forward outbound messages
Client->>HTTP: DELETE /acp
HTTP->>Manager: ManagerRequest::HttpDelete -> Manager closes Conn
Conn->>NATS: teardown subscriptions
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rsworkspace/crates/acp-nats-ws/src/main.rs (1)
52-53: Add a regression test for the legacy/wsalias.The alias is key to the PR’s compatibility goal, but the updated tests only exercise
/acp. A small handshake test forLEGACY_WS_ENDPOINTwould lock this in.🧪 Proposed test coverage addition
@@ #[tokio::test] async fn test_websocket_connection_lifecycle() { @@ } + + #[tokio::test] + async fn legacy_ws_endpoint_still_upgrades() { + let (shutdown_tx, _shutdown_rx) = watch::channel(false); + let (conn_tx, mut conn_rx) = mpsc::unbounded_channel::<ConnectionRequest>(); + + let state = UpgradeState { + conn_tx, + shutdown_tx, + }; + + let app = axum::Router::new() + .route(ACP_ENDPOINT, axum::routing::get(upgrade::handle)) + .route(LEGACY_WS_ENDPOINT, axum::routing::get(upgrade::handle)) + .with_state(state); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + let ws_url = format!("ws://{}{}", addr, LEGACY_WS_ENDPOINT); + let (_ws_stream, response) = connect_async(ws_url).await.unwrap(); + + assert!(response.headers().contains_key(ACP_CONNECTION_ID_HEADER)); + let _req = tokio::time::timeout(Duration::from_secs(2), conn_rx.recv()) + .await + .expect("timeout waiting for legacy ConnectionRequest") + .expect("channel closed"); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/main.rs` around lines 52 - 53, The test suite lacks a regression test verifying the legacy LEGACY_WS_ENDPOINT ("/ws") handshake; add a new async integration test that mirrors the existing ACP_ENDPOINT handshake test but targets LEGACY_WS_ENDPOINT, asserting that an HTTP GET to the route handled by upgrade::handle completes the websocket handshake and returns the expected status/headers; place the test alongside the existing handshake tests and reference ACP_ENDPOINT, LEGACY_WS_ENDPOINT and upgrade::handle so the behavior for the alias is locked in.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@rsworkspace/crates/acp-nats-ws/src/main.rs`:
- Around line 52-53: The test suite lacks a regression test verifying the legacy
LEGACY_WS_ENDPOINT ("/ws") handshake; add a new async integration test that
mirrors the existing ACP_ENDPOINT handshake test but targets LEGACY_WS_ENDPOINT,
asserting that an HTTP GET to the route handled by upgrade::handle completes the
websocket handshake and returns the expected status/headers; place the test
alongside the existing handshake tests and reference ACP_ENDPOINT,
LEGACY_WS_ENDPOINT and upgrade::handle so the behavior for the alias is locked
in.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5f732cd0-7551-452b-921b-d892f6763050
⛔ Files ignored due to path filters (1)
rsworkspace/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (7)
rsworkspace/crates/acp-nats-ws/Cargo.tomlrsworkspace/crates/acp-nats-ws/README.mdrsworkspace/crates/acp-nats-ws/src/acp_connection_id.rsrsworkspace/crates/acp-nats-ws/src/connection.rsrsworkspace/crates/acp-nats-ws/src/constants.rsrsworkspace/crates/acp-nats-ws/src/main.rsrsworkspace/crates/acp-nats-ws/src/upgrade.rs
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Code Coverage SummaryDetailsDiff against mainResults for commit: 8805ee2 Minimum allowed coverage is ♻️ This comment has been updated with latest results |
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (1)
rsworkspace/crates/acp-nats-ws/src/transport.rs (1)
86-100: Prefer implementingIntoResponserather than an inherentinto_response.
HttpTransportError::into_responseshadows the trait method name but is inherent, soErr(error).into_response()only works because of autoref. Implementingaxum::response::IntoResponsedirectly lets the type be returned inResult<_, HttpTransportError>from handlers (Axum handles the conversion), removes the repetitivematch … { Ok(r) => r, Err(e) => e.into_response() }inget/post/delete, and is idiomatic for axum 0.8.♻️ Sketch
-impl HttpTransportError { - fn into_response(self) -> Response { +impl axum::response::IntoResponse for HttpTransportError { + fn into_response(self) -> Response { let (status, message) = match self { /* … */ }; (status, message).into_response() } }Handlers then become:
pub async fn post(headers: HeaderMap, State(state): State<AppState>, body: String) -> Result<Response, HttpTransportError> { http_post(headers, state, body).await }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/transport.rs` around lines 86 - 100, Replace the inherent method with an implementation of axum::response::IntoResponse for HttpTransportError: implement impl IntoResponse for HttpTransportError { fn into_response(self) -> Response { ... } } and move the existing match logic into that trait method (or keep the inherent helper but have the trait call it), so HttpTransportError can be returned directly from handlers as Result<Response, HttpTransportError>; update handlers to return Result<_, HttpTransportError> where needed and remove the repetitive explicit error-to-response mapping such as e.into_response() in get/post/delete paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rsworkspace/crates/acp-nats-ws/src/transport.rs`:
- Around line 25-26: The SSE and internal messaging channels use unbounded mpsc
which can exhaust memory for slow/stalled clients; replace SseSender/SseReceiver
and all uses that create unbounded channels (notably stream_tx, each
AttachListener subscriber, input_tx and output_tx) with bounded mpsc::channel(N)
(choose a sensible N, e.g. 64 to match the notification channel) and implement
an explicit overflow policy: when send fails due to full buffer either drop that
listener/subscription (remove/close the AttachListener and clean up resources)
or return backpressure/error upstream so the outbound pump slows; ensure send
errors are handled where SseSender::send / input_tx / output_tx are used so
subscribers aren’t left accumulating frames indefinitely.
- Around line 736-786: The request-handling paths in HttpConnectionCommand::Post
(creating PendingRequest::Buffered and PendingRequest::Live) currently
fire-and-forget input_tx.send(message.raw) and can hang the HTTP caller if send
fails; change the flow to check input_tx.send(...) result and fail fast: for
both Buffered and Live, attempt input_tx.send(message.raw) before committing
pending_request (or if you must assign pending_request first, on Err
remove/reset pending_request and take the stored response to send
Err(HttpTransportError::Internal(...))); for Live also ensure you only send the
Ok(HttpPostOutcome::Live { ... }) after a successful send; in short, on send
error reset pending_request and respond with
Err(HttpTransportError::Internal(...)) via the response channel instead of
silently dropping the message so callers fail immediately.
- Around line 75-116: HttpTransportError currently holds only &'static str
messages and all call sites use map_err(|_| HttpTransportError::X("literal")),
which discards underlying errors; change HttpTransportError into structured
variants that carry the source error (e.g. BadRequest(#[source] Box<dyn
std::error::Error + Send + Sync>) or concrete error types) and derive
Display/Error via thiserror, update into_response to match on these variants and
map them to status codes, remove the manual Display impl, and replace the
map_err(...) call sites (e.g. where serde_json::Error, AcpSessionIdError,
AcpConnectionIdError::InvalidUuid, mpsc::SendError, oneshot::RecvError are
mapped) to propagate the typed errors (use ? or map_err(|e|
HttpTransportError::X(e.into())) so the original error is preserved) so callers
can see the source chain.
- Around line 382-417: The SSE GET handler http_get currently only sets
X_ACCEL_BUFFERING_HEADER and omits echoing Acp-Connection-Id/Acp-Session-Id like
the POST paths; update http_get to call
set_transport_headers(response.headers_mut(), &connection_id, &session_id) (or
equivalent) after building the response so the Acp-Connection-Id and
Acp-Session-Id are included in the response headers, matching build_sse_response
/ build_buffered_sse_response behavior.
---
Nitpick comments:
In `@rsworkspace/crates/acp-nats-ws/src/transport.rs`:
- Around line 86-100: Replace the inherent method with an implementation of
axum::response::IntoResponse for HttpTransportError: implement impl IntoResponse
for HttpTransportError { fn into_response(self) -> Response { ... } } and move
the existing match logic into that trait method (or keep the inherent helper but
have the trait call it), so HttpTransportError can be returned directly from
handlers as Result<Response, HttpTransportError>; update handlers to return
Result<_, HttpTransportError> where needed and remove the repetitive explicit
error-to-response mapping such as e.into_response() in get/post/delete paths.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 04c76481-b35c-4aa2-823b-4b35cdb3ae74
⛔ Files ignored due to path filters (1)
rsworkspace/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (7)
rsworkspace/crates/acp-nats-ws/Cargo.tomlrsworkspace/crates/acp-nats-ws/README.mdrsworkspace/crates/acp-nats-ws/src/acp_connection_id.rsrsworkspace/crates/acp-nats-ws/src/constants.rsrsworkspace/crates/acp-nats-ws/src/main.rsrsworkspace/crates/acp-nats-ws/src/transport.rsrsworkspace/crates/acp-nats-ws/src/upgrade.rs
💤 Files with no reviewable changes (1)
- rsworkspace/crates/acp-nats-ws/src/upgrade.rs
✅ Files skipped from review due to trivial changes (2)
- rsworkspace/crates/acp-nats-ws/src/constants.rs
- rsworkspace/crates/acp-nats-ws/README.md
🚧 Files skipped from review as they are similar to previous changes (3)
- rsworkspace/crates/acp-nats-ws/Cargo.toml
- rsworkspace/crates/acp-nats-ws/src/acp_connection_id.rs
- rsworkspace/crates/acp-nats-ws/src/main.rs
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rsworkspace/crates/acp-nats-ws/src/main.rs (2)
83-86:⚠️ Potential issue | 🟡 MinorStale doc comment —
AppState.conn_txrenamed tomanager_tx.The comment still references
AppState.conn_tx, which no longer exists. Update toAppState.manager_txto avoid confusing future readers.📝 Proposed fix
- // `serve` returning drops the Router (and its AppState.conn_tx), which + // `serve` returning drops the Router (and its AppState.manager_tx), which // closes the channel and lets the connection thread's recv-loop exit and🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/main.rs` around lines 83 - 86, The doc comment mentions the old field name `AppState.conn_tx`; update this comment to reference the new field `AppState.manager_tx` so it accurately describes that dropping `serve` (and the Router, which owns `AppState.manager_tx`) closes the channel and lets the connection thread's recv-loop exit and drain active connections before telemetry teardown; locate the comment near the `serve`/`Router` code in main.rs and replace `conn_tx` with `manager_tx`.
106-178:⚠️ Potential issue | 🔴 CriticalAdd
Clonebound toJin bothrun_connection_threadandprocess_connections.
process_manager_requestin transport.rs requiresJ: Clone + 'static, but both functions pass references tojs_clientwithout declaring theClonebound. This will fail to compile when a concrete generic type is monomorphized.Proposed fix
fn run_connection_thread<N, J>( manager_rx: mpsc::UnboundedReceiver<ManagerRequest>, nats_client: N, js_client: J, config: acp_nats::Config, ) where N: acp_nats::RequestClient + acp_nats::PublishClient + acp_nats::FlushClient + acp_nats::SubscribeClient + Clone + Send + 'static, - J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Send + 'static, + J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Clone + Send + 'static, trogon_nats::jetstream::JsMessageOf<J>: trogon_nats::jetstream::JsRequestMessage, async fn process_connections<N, J>( mut manager_rx: mpsc::UnboundedReceiver<ManagerRequest>, nats_client: N, js_client: J, config: acp_nats::Config, ) where N: acp_nats::RequestClient + acp_nats::PublishClient + acp_nats::FlushClient + acp_nats::SubscribeClient + Clone + Send + 'static, - J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + 'static, + J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Clone + 'static, trogon_nats::jetstream::JsMessageOf<J>: trogon_nats::jetstream::JsRequestMessage,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/main.rs` around lines 106 - 178, The error is that both run_connection_thread and process_connections use js_client by reference but their generic bound for J is missing Clone; update the where clauses for J in both functions (run_connection_thread and process_connections) to include Clone (e.g., change J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Send + 'static to J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Clone + Send + 'static in run_connection_thread, and add Clone to the J bound in process_connections similarly) so process_manager_request can clone/passthrough js_client as required.
🧹 Nitpick comments (1)
rsworkspace/crates/acp-nats-ws/src/transport.rs (1)
943-948: Defensive branch is unreachable for HTTP posts routed viapost().
http_postalready callsvalidate_http_context(line 346), which rejects non-initializemessages with a missing connection id before theManagerRequestis ever sent. This fallback inprocess_manager_requestonly fires if a future caller bypasseshttp_post. Fine as defense-in-depth, but worth a one-line comment so readers don't assume validation is centralized here.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/transport.rs` around lines 943 - 948, The branch in process_manager_request that checks if !message.is_initialize() and returns a BadRequest is unreachable for HTTP posts routed via http_post because http_post already calls validate_http_context (which rejects non-initialize messages missing Acp-Connection-Id); add a one-line comment above this defensive if in process_manager_request explaining that validate_http_context (called by http_post) enforces this for normal HTTP paths and that this check is only defense-in-depth for callers that bypass http_post/validate_http_context (reference process_manager_request, http_post, and validate_http_context to locate the code).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rsworkspace/crates/acp-nats-ws/src/transport.rs`:
- Around line 787-798: AttachListener currently pushes a new sender into
get_listeners[session_id] but code that prunes dropped senders
(listeners.retain(...)) leaves empty Vec entries and Close doesn't clear maps,
causing a leak; update the logic so after any retain/filter step you check if
the Vec for that session_id is empty and remove the HashMap entry
(get_listeners.remove(&session_id)), and in the handler for
HttpConnectionCommand::Close clear or drain get_listeners and sessions to free
all entries; ensure the retain logic that removes dead senders runs wherever
listeners are cleaned so empty vectors are always evicted.
- Around line 805-855: The outbound-path currently short-circuits when
pending_request is Some(PendingRequest::Live { request_id, sender, .. }) and
continues before dispatching to per-session GET listeners; change the logic so
that for Live pending requests you still check parsed.params_session_id() and
route the frame to get_listeners for that session when the frame's sessionId
does not match the Live request_id (or when the frame has a sessionId for some
other session), while still delivering frames that match request_id to the Live
sender and clearing pending_request; in practice update the PendingRequest::Live
branch in the outbound handling to (1) attempt sender.send(frame.clone()) as
now, (2) if parsed.params_session_id() exists and is not the same as the Live
request's request_id (or parsed.id != request_id), additionally look up
get_listeners.get_mut(&session_id) and retain/send to those listeners, and (3)
only skip the get_listeners dispatch entirely when the frame both was delivered
to the Live sender and its parsed id/session matches the Live request_id.
---
Outside diff comments:
In `@rsworkspace/crates/acp-nats-ws/src/main.rs`:
- Around line 83-86: The doc comment mentions the old field name
`AppState.conn_tx`; update this comment to reference the new field
`AppState.manager_tx` so it accurately describes that dropping `serve` (and the
Router, which owns `AppState.manager_tx`) closes the channel and lets the
connection thread's recv-loop exit and drain active connections before telemetry
teardown; locate the comment near the `serve`/`Router` code in main.rs and
replace `conn_tx` with `manager_tx`.
- Around line 106-178: The error is that both run_connection_thread and
process_connections use js_client by reference but their generic bound for J is
missing Clone; update the where clauses for J in both functions
(run_connection_thread and process_connections) to include Clone (e.g., change
J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Send + 'static
to J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Clone + Send
+ 'static in run_connection_thread, and add Clone to the J bound in
process_connections similarly) so process_manager_request can clone/passthrough
js_client as required.
---
Nitpick comments:
In `@rsworkspace/crates/acp-nats-ws/src/transport.rs`:
- Around line 943-948: The branch in process_manager_request that checks if
!message.is_initialize() and returns a BadRequest is unreachable for HTTP posts
routed via http_post because http_post already calls validate_http_context
(which rejects non-initialize messages missing Acp-Connection-Id); add a
one-line comment above this defensive if in process_manager_request explaining
that validate_http_context (called by http_post) enforces this for normal HTTP
paths and that this check is only defense-in-depth for callers that bypass
http_post/validate_http_context (reference process_manager_request, http_post,
and validate_http_context to locate the code).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 823b2979-ce85-4a22-a569-6563854ad1c3
📒 Files selected for processing (3)
rsworkspace/crates/acp-nats-ws/src/connection.rsrsworkspace/crates/acp-nats-ws/src/main.rsrsworkspace/crates/acp-nats-ws/src/transport.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rsworkspace/crates/acp-nats-ws/src/connection.rs
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
rsworkspace/crates/acp-nats-ws/src/transport.rs (2)
439-473:⚠️ Potential issue | 🟡 MinorEcho ACP transport headers on GET SSE responses.
http_getalready has both IDs but only setsX-Accel-Buffering; POST SSE paths includeAcp-Connection-IdandAcp-Session-Id. Keep GET consistent by callingset_transport_headers.🐛 Proposed fix
let mut response = Sse::new(stream::unfold(stream, |mut stream| async move { stream .recv() .await .map(|item| (Ok::<Event, Infallible>(item.into_event()), stream)) })) .into_response(); + set_transport_headers(response.headers_mut(), &connection_id, Some(&session_id)); response .headers_mut() .insert(X_ACCEL_BUFFERING_HEADER, HeaderValue::from_static("no"));🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/transport.rs` around lines 439 - 473, http_get constructs the SSE response but only sets X-Accel-Buffering; update it to also echo the ACP transport headers by calling set_transport_headers on the response (use the existing connection_id and session_id values). Locate http_get and, after creating response via Sse::new(...).into_response(), invoke set_transport_headers(&mut response, &connection_id, &session_id) (or the project's equivalent signature) before returning so Acp-Connection-Id and Acp-Session-Id are included on GET SSE responses.
893-909:⚠️ Potential issue | 🟠 MajorDo not buffer unrelated session frames into
session/newresponses.The
Bufferedpath still captures every outbound frame and skips GET listener dispatch until the matching response arrives. Duringsession/new, notifications for existing sessions can be starved from their GET listeners and incorrectly included in the buffered POST response.Route frames with
params.sessionIdfor known existing sessions toget_listeners, and only buffer frames that belong to the pending request or have no better session route.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/transport.rs` around lines 893 - 909, The Buffered branch in PendingRequest (the match arm handling PendingRequest::Buffered { request_id, events, .. }) is currently pushing every outbound frame into events and delaying GET listener dispatch; change it so frames that carry params.sessionId for an already-known session (check parsed.as_ref().and_then(|m| m.params.sessionId) or equivalent) are routed to get_listeners instead of being pushed into events, and only push into events when the frame belongs to the pending request (message.id == request_id) or when no known-session route applies; preserve existing behavior for matching request_id (keep extracting session_id, inserting into sessions, taking events and sending response via response.send(Ok(HttpPostOutcome::Buffered { ... }))) but ensure known-session frames are dispatched to get_listeners before buffering.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rsworkspace/crates/acp-nats-ws/src/transport.rs`:
- Around line 166-204: IncomingHttpMessage currently doesn't validate the
"jsonrpc" envelope and allows responses with both result and error; update the
IncomingHttpMessage struct to include a jsonrpc field (e.g., pub jsonrpc:
Option<String>), then in IncomingHttpMessage::parse after deserializing check
that jsonrpc == Some("2.0") and return an HttpTransportError::BadRequest for any
other/missing value, and update IncomingHttpMessage::is_response to require
exactly one of result or error (i.e., result.is_some() != error.is_some()) so
responses with both fields are rejected.
- Around line 497-541: The Content-Type check in validate_post_headers currently
does an exact string compare and rejects parameterized types like
"application/json; charset=utf-8"; change validate_post_headers to strip
media-type parameters (split on ';' and trim, case-insensitive) before comparing
or reuse the existing accept_contains logic to test that the media type equals
"application/json" so parameterized Content-Type headers are accepted; update
the branch that returns HttpTransportError::UnsupportedMediaType accordingly
while keeping the same error message.
---
Duplicate comments:
In `@rsworkspace/crates/acp-nats-ws/src/transport.rs`:
- Around line 439-473: http_get constructs the SSE response but only sets
X-Accel-Buffering; update it to also echo the ACP transport headers by calling
set_transport_headers on the response (use the existing connection_id and
session_id values). Locate http_get and, after creating response via
Sse::new(...).into_response(), invoke set_transport_headers(&mut response,
&connection_id, &session_id) (or the project's equivalent signature) before
returning so Acp-Connection-Id and Acp-Session-Id are included on GET SSE
responses.
- Around line 893-909: The Buffered branch in PendingRequest (the match arm
handling PendingRequest::Buffered { request_id, events, .. }) is currently
pushing every outbound frame into events and delaying GET listener dispatch;
change it so frames that carry params.sessionId for an already-known session
(check parsed.as_ref().and_then(|m| m.params.sessionId) or equivalent) are
routed to get_listeners instead of being pushed into events, and only push into
events when the frame belongs to the pending request (message.id == request_id)
or when no known-session route applies; preserve existing behavior for matching
request_id (keep extracting session_id, inserting into sessions, taking events
and sending response via response.send(Ok(HttpPostOutcome::Buffered { ... })))
but ensure known-session frames are dispatched to get_listeners before
buffering.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d57b3219-21ca-4796-ab6f-39d4df2ed62f
📒 Files selected for processing (1)
rsworkspace/crates/acp-nats-ws/src/transport.rs
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rsworkspace/crates/acp-nats-ws/src/main.rs (1)
102-157:⚠️ Potential issue | 🟠 MajorAdd
Cloneto theJtrait bounds in bothrun_connection_threadandprocess_connections.
process_connectionspasses&js_clienttotransport::process_manager_request(line 164), which requiresJ: Clone + 'static(transport.rs line 1394). However,process_connectionsboundsJwithoutClone(line 156), creating a trait mismatch that will prevent compilation. The same issue applies torun_connection_thread(line 115), which passesJthrough toprocess_connections.Additionally, the doc comment at lines 99–101 states that "All WebSocket connections are processed here" due to the
?Sendtrait, but the function now also handles HTTP connections. Update the comment to reflect both connection types.♻️ Proposed fix
- J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Send + 'static, + J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Clone + Send + 'static,(apply to both
run_connection_threadat line 115 andprocess_connectionsat line 156)Update the doc comment to indicate both WebSocket and HTTP connections are handled here.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/main.rs` around lines 102 - 157, Add Clone to the generic bound for J in both run_connection_thread and process_connections (so J: acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Clone + Send + 'static) because process_connections passes &js_client into transport::process_manager_request which requires J: Clone; update the corresponding doc comment above run_connection_thread to say it handles both WebSocket and HTTP connections (not only WebSocket) to reflect the current behavior.
🧹 Nitpick comments (3)
rsworkspace/crates/acp-nats-ws/src/transport.rs (2)
1473-1510: Minor: silent drop on HTTP command dispatch failure surfaces asInternalinstead ofNotFound.For
HttpGet(line 1491-1494) and implicitlyHttpDelete(line 1506-1509), whencommand_tx.send(...)fails, theresponseoneshot is consumed into theSendErrorand discarded, so the caller'sresponse_rx.awaitresolves to aRecvErrorwhich is mapped toHttpTransportError::Internal("connection manager dropped the request")inhttp_get/http_delete. The actual condition here is a dead HTTP connection task (stale handle not yet pruned), i.e. it should read asNotFound("unknown ACP connection")— matching the "removed" cleanup on the very next line.Consider extracting the
responsebefore the send so you can emit a typedNotFoundon failure, which also makes theHttpPostbranch (line 1460-1471) more consistent.♻️ Sketch for the HttpGet branch
- if handle - .command_tx - .send(HttpConnectionCommand::AttachListener { - session_id, - protocol_version, - response, - }) - .is_err() - { - http_connections.remove(&connection_id); - } + if let Err(err) = handle.command_tx.send(HttpConnectionCommand::AttachListener { + session_id, + protocol_version, + response, + }) { + http_connections.remove(&connection_id); + if let HttpConnectionCommand::AttachListener { response, .. } = err.0 { + let _ = response.send(Err(HttpTransportError::not_found( + "unknown ACP connection", + ))); + } + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/transport.rs` around lines 1473 - 1510, The send failure currently consumes the response oneshot and causes callers to observe an Internal error; in ManagerRequest::HttpGet (and similarly in ManagerRequest::HttpDelete) extract the response oneshot out of the HttpGet/HttpDelete match arm before calling handle.command_tx.send(HttpConnectionCommand::AttachListener { ... response }) so you can detect a SendError and explicitly reply with response.send(Err(HttpTransportError::not_found("unknown ACP connection"))) (and then remove the stale handle from http_connections), ensuring a NotFound is returned instead of losing the oneshot and surfacing Internal; apply the same pattern to the HttpDelete branch when sending HttpConnectionCommand::Close.
316-338: Double JSON parse forhas_result/has_errorprojection.
parsedeserializes the body twice: once toserde_json::Valueto read theresult/errorkeys, and again viafrom_valueto populateSelf. For request/notification-heavy traffic this is a measurable tax. You can do one pass by derivinghas_result/has_errorfromOption<Value>fields with#[serde(default)], or by projecting from the singleValueround-trip manually.rsworkspace/crates/acp-nats-ws/src/main.rs (1)
99-101: Stale doc comment — this thread now also hosts HTTP transport connections.The comment still claims "All WebSocket connections are processed here", but
process_manager_requestnow also dispatchesHttpPost/HttpGet/HttpDeleteand spawnsrun_http_connectionon the sameLocalSet(also whytokio::task::spawn_localcontinues to be required). Tighten the wording so future readers aren't misled about what lives on this runtime.♻️ Proposed fix
-/// Runs a single-threaded tokio runtime with a -/// `LocalSet`. All WebSocket connections are processed here because the ACP -/// `Agent` trait is `?Send`, requiring `spawn_local` / `Rc`. +/// Runs a single-threaded tokio runtime with a `LocalSet`. All ACP +/// connections (WebSocket and streamable HTTP) are processed here because +/// the ACP `Agent` trait is `?Send`, requiring `spawn_local` / `Rc`.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats-ws/src/main.rs` around lines 99 - 101, Update the stale doc comment that says "All WebSocket connections are processed here" to reflect that this single-threaded tokio runtime / LocalSet also hosts HTTP transport connections; mention that process_manager_request dispatches HttpPost/HttpGet/HttpDelete and spawns run_http_connection on the same LocalSet, which is why spawn_local (and Rc) is still required for the ACP Agent trait being ?Send. Locate the comment above the runtime setup (the LocalSet creation and spawn_local usage) and change the wording to encompass both WebSocket and HTTP connection handling and the reason spawn_local is needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@rsworkspace/crates/acp-nats-ws/src/main.rs`:
- Around line 102-157: Add Clone to the generic bound for J in both
run_connection_thread and process_connections (so J:
acp_nats::JetStreamPublisher + acp_nats::JetStreamGetStream + Clone + Send +
'static) because process_connections passes &js_client into
transport::process_manager_request which requires J: Clone; update the
corresponding doc comment above run_connection_thread to say it handles both
WebSocket and HTTP connections (not only WebSocket) to reflect the current
behavior.
---
Nitpick comments:
In `@rsworkspace/crates/acp-nats-ws/src/main.rs`:
- Around line 99-101: Update the stale doc comment that says "All WebSocket
connections are processed here" to reflect that this single-threaded tokio
runtime / LocalSet also hosts HTTP transport connections; mention that
process_manager_request dispatches HttpPost/HttpGet/HttpDelete and spawns
run_http_connection on the same LocalSet, which is why spawn_local (and Rc) is
still required for the ACP Agent trait being ?Send. Locate the comment above the
runtime setup (the LocalSet creation and spawn_local usage) and change the
wording to encompass both WebSocket and HTTP connection handling and the reason
spawn_local is needed.
In `@rsworkspace/crates/acp-nats-ws/src/transport.rs`:
- Around line 1473-1510: The send failure currently consumes the response
oneshot and causes callers to observe an Internal error; in
ManagerRequest::HttpGet (and similarly in ManagerRequest::HttpDelete) extract
the response oneshot out of the HttpGet/HttpDelete match arm before calling
handle.command_tx.send(HttpConnectionCommand::AttachListener { ... response })
so you can detect a SendError and explicitly reply with
response.send(Err(HttpTransportError::not_found("unknown ACP connection"))) (and
then remove the stale handle from http_connections), ensuring a NotFound is
returned instead of losing the oneshot and surfacing Internal; apply the same
pattern to the HttpDelete branch when sending HttpConnectionCommand::Close.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 49f98959-ddb5-4bf0-9230-9f1ec5445b25
📒 Files selected for processing (4)
rsworkspace/crates/acp-nats-ws/README.mdrsworkspace/crates/acp-nats-ws/src/constants.rsrsworkspace/crates/acp-nats-ws/src/main.rsrsworkspace/crates/acp-nats-ws/src/transport.rs
✅ Files skipped from review due to trivial changes (1)
- rsworkspace/crates/acp-nats-ws/src/constants.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rsworkspace/crates/acp-nats-ws/README.md
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 8805ee2. Configure here.
|
|
||
| fn is_initialize(&self) -> bool { | ||
| self.method_name() == Some("initialize") | ||
| } |
There was a problem hiding this comment.
Initialize notification creates orphaned HTTP connection resource leak
Medium Severity
is_initialize() only checks the method name without verifying the message is a request (has an id). An initialize notification (no id) passes both validate_http_context and process_manager_request's is_initialize() gate, spawning a new run_http_connection task. Since the notification isn't a request, run_http_connection returns HttpPostOutcome::Accepted (202) with no Acp-Connection-Id header, so the client can never address or DELETE the connection. The orphaned task persists until server shutdown, leaking resources.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 8805ee2. Configure here.


Uh oh!
There was an error while loading. Please reload this page.