Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c3aad6f
fix(acp-nats-ws): align websocket bridge with acp transport draft
yordis Apr 22, 2026
38d7d3b
feat(acp-nats-ws): unblock streamable HTTP clients
yordis Apr 22, 2026
db2f17a
chore(acp-nats-ws): keep lockfile aligned
yordis Apr 22, 2026
32deb24
fix(acp-nats-ws): restore rust ci signal
yordis Apr 22, 2026
5c738a7
fix(acp-nats-ws): keep websocket logs accurate
yordis Apr 22, 2026
0646d6b
fix(acp-nats-ws): drain HTTP connections on shutdown
yordis Apr 22, 2026
bb6d8a0
fix(acp-nats-ws): preserve session stream boundaries
yordis Apr 22, 2026
a252d07
fix(acp-nats-ws): accept proxy-shaped headers
yordis Apr 22, 2026
d7d81ac
fix(acp-nats-ws): protect HTTP streams from slow clients
yordis Apr 22, 2026
38fbedc
fix(acp-nats-ws): preserve HTTP transport failure context
yordis Apr 22, 2026
92146c6
fix(acp-nats-ws): keep GET stream headers consistent
yordis Apr 22, 2026
bd76027
fix(acp-nats-ws): accept null JSON-RPC responses
yordis Apr 22, 2026
55623d6
fix(acp-nats-ws): close transport review gaps
yordis Apr 22, 2026
ff6742a
refactor(acp-nats-ws): align transport errors with workspace style
yordis Apr 22, 2026
8b56be4
test(acp-nats-ws): keep manual transport errors covered
yordis Apr 22, 2026
c73e94c
refactor(acp-nats-ws): drop pre-spec websocket alias
yordis Apr 22, 2026
7b3306a
test(acp-nats-ws): cover the single-endpoint router
yordis Apr 22, 2026
f5f83ff
fix(acp-nats-ws): prevent protocol drift on HTTP
yordis Apr 22, 2026
32a0c51
fix(acp-nats-ws): close transport compliance gaps
yordis Apr 22, 2026
0e92e05
fix(acp-nats-ws): honor session activation flow
yordis Apr 22, 2026
3322300
refactor(acp-nats-ws): make explicit ids the primary constructor
yordis Apr 23, 2026
8898cb1
refactor(acp-nats-ws): keep generated ids behind default
yordis Apr 23, 2026
8805ee2
refactor(acp-nats-ws): keep transport defaults centralized
yordis Apr 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rsworkspace/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion rsworkspace/crates/acp-nats-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ bytes = { workspace = true }
clap = { workspace = true, features = ["env"] }
futures-util = { workspace = true, features = ["sink"] }
opentelemetry = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "net", "sync", "io-util"] }
tracing = { workspace = true }
trogon-nats = { workspace = true }
trogon-std = { workspace = true, features = ["telemetry-http"] }
uuid = { workspace = true }

[dev-dependencies]
serde_json = { workspace = true }
tower = { version = "0.5", features = ["util"] }
tokio-tungstenite = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt"] }
trogon-nats = { workspace = true, features = ["test-support"] }
Expand Down
31 changes: 24 additions & 7 deletions rsworkspace/crates/acp-nats-ws/README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
# ACP NATS WebSocket
# ACP NATS Streamable HTTP & WebSocket

Translates [Agent Client Protocol](https://agentclientprotocol.com) (ACP) messages between WebSocket connections and [NATS](https://nats.io), letting browser-based UIs and remote clients talk to distributed agent backends over a standard WebSocket endpoint.
Translates [Agent Client Protocol](https://agentclientprotocol.com) (ACP) messages between [NATS](https://nats.io) and the draft remote transport served on `/acp`, including both Streamable HTTP (`POST`/`GET`/`DELETE`) and WebSocket upgrade.

For managed NATS infrastructure in production, we recommend <a href="https://synadia.com"><img src="../acp-nats-stdio/assets/synadia-logo.png" alt="Synadia" width="20" style="vertical-align: middle;"> Synadia</a>.

```mermaid
graph LR
A1[Client1] <-->|ws| B[acp-nats-ws]
A2[Client2] <-->|ws| B
A1[Client1] <-->|http or ws| B[acp-nats-ws]
A2[Client2] <-->|http or ws| B
AN[ClientN] <-->|ws| B
B <-->|NATS| C[Backend]
```

## Features

- Multiple concurrent WebSocket connections, each with its own ACP session
- Bidirectional ACP bridge with request forwarding
- Streamable HTTP transport on `/acp` with session-scoped SSE listeners
- WebSocket upgrade on `/acp`
- Multiple concurrent ACP connections sharing the same NATS bridge
- OpenTelemetry integration (logs, metrics, traces)
- Graceful shutdown (SIGINT/SIGTERM) with per-connection drain
- Custom prefix support for multi-tenancy
Expand All @@ -33,9 +34,25 @@ cargo build --release -p acp-nats-ws
Connect with any WebSocket client:

```bash
websocat ws://127.0.0.1:8080/ws
websocat ws://127.0.0.1:8080/acp
```

Or use Streamable HTTP:

```bash
curl -i \
-H 'Content-Type: application/json' \
-H 'Accept: application/json, text/event-stream' \
-d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":0}}' \
http://127.0.0.1:8080/acp
```

`POST /acp` returns an SSE response for JSON-RPC requests, `GET /acp` opens a session-scoped SSE listener with `Acp-Connection-Id` and `Acp-Session-Id`, and `DELETE /acp` terminates a connection. The WebSocket upgrade response and HTTP initialize response both include `Acp-Connection-Id`.

After `initialize`, HTTP clients may send `Acp-Protocol-Version` on `POST`/`GET`/`DELETE`. When present, it must match the negotiated ACP protocol version for that connection.

When clients send an `Origin` header, `/acp` validates it against the bound host and rejects disallowed origins with `403 Forbidden`. Streamable HTTP `POST` SSE responses also emit a priming SSE event ID before JSON-RPC payloads and attach event IDs to streamed JSON events.

## Configuration

### WebSocket Server
Expand Down
88 changes: 88 additions & 0 deletions rsworkspace/crates/acp-nats-ws/src/acp_connection_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct AcpConnectionId(uuid::Uuid);

impl AcpConnectionId {
pub fn new(uuid: uuid::Uuid) -> Self {
Self(uuid)
}

pub fn parse(s: &str) -> Result<Self, AcpConnectionIdError> {
uuid::Uuid::parse_str(s)
.map(Self::new)
.map_err(AcpConnectionIdError::InvalidUuid)
}
}

impl Default for AcpConnectionId {
fn default() -> Self {
Self::new(uuid::Uuid::now_v7())
}
}

impl std::fmt::Display for AcpConnectionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

#[derive(Debug)]
pub enum AcpConnectionIdError {
InvalidUuid(uuid::Error),
}

impl std::fmt::Display for AcpConnectionIdError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidUuid(error) => write!(f, "invalid ACP connection id: {error}"),
}
}
}

impl std::error::Error for AcpConnectionIdError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::InvalidUuid(error) => Some(error),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::error::Error as _;

#[test]
fn new_wraps_existing_uuid() {
let uuid = uuid::Uuid::nil();
assert_eq!(AcpConnectionId::new(uuid).to_string(), uuid.to_string());
}

#[test]
fn parse_round_trips_uuid() {
let id = AcpConnectionId::default();
let parsed = AcpConnectionId::parse(&id.to_string()).unwrap();
assert_eq!(parsed, id);
}

#[test]
fn parse_rejects_invalid_uuid() {
assert!(AcpConnectionId::parse("not-a-uuid").is_err());
}

#[test]
fn default_generates_non_empty_id() {
assert!(!AcpConnectionId::default().to_string().is_empty());
}

#[test]
fn parse_error_displays_context() {
let error = AcpConnectionId::parse("not-a-uuid").unwrap_err();
assert!(error.to_string().contains("invalid ACP connection id"));
}

#[test]
fn parse_error_exposes_uuid_source() {
let error = AcpConnectionId::parse("not-a-uuid").unwrap_err();
assert!(error.source().is_some());
}
}
74 changes: 47 additions & 27 deletions rsworkspace/crates/acp-nats-ws/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use tokio::sync::watch;
use tracing::{error, info, warn};
use trogon_std::time::SystemClock;

use crate::acp_connection_id::AcpConnectionId;
use crate::constants::DUPLEX_BUFFER_SIZE;

/// Handles a single WebSocket connection by bridging it to NATS via ACP.
pub async fn handle<N, J>(
connection_id: AcpConnectionId,
socket: WebSocket,
nats_client: N,
js_client: J,
Expand Down Expand Up @@ -68,7 +70,7 @@ pub async fn handle<N, J>(

let mut io_task = tokio::task::spawn_local(io_task);

info!("WebSocket connection established, ACP bridge running");
info!(%connection_id, "WebSocket connection established, ACP bridge running");

let shutdown_result = tokio::select! {
result = &mut client_task => {
Expand Down Expand Up @@ -118,8 +120,8 @@ pub async fn handle<N, J>(
}

match shutdown_result {
Ok(()) => info!("WebSocket connection closed cleanly"),
Err(e) => warn!(error = e, "WebSocket connection closed with error"),
Ok(()) => info!(%connection_id, "WebSocket connection closed cleanly"),
Err(e) => warn!(%connection_id, error = e, "WebSocket connection closed with error"),
}
}

Expand All @@ -128,34 +130,24 @@ async fn run_recv_pump(
mut ws_recv_write: tokio::io::DuplexStream,
) {
while let Some(Ok(msg)) = ws_receiver.next().await {
let bytes = match msg {
Message::Text(t) => bytes::Bytes::from(t),
Message::Binary(b) => b,
let text = match msg {
Message::Text(text) => text,
Message::Binary(_) => continue,
Comment thread
cursor[bot] marked this conversation as resolved.
Message::Close(_) => break,
_ => continue,
};

match std::str::from_utf8(&bytes) {
Ok(text) => {
let line = text.trim_end_matches(['\r', '\n']);
if line.is_empty() {
continue;
}
let line = text.trim_end_matches(['\r', '\n']);
if line.is_empty() {
continue;
}

if ws_recv_write.write_all(line.as_bytes()).await.is_err() {
break;
}
if ws_recv_write.write_all(line.as_bytes()).await.is_err() {
break;
}

if ws_recv_write.write_all(b"\n").await.is_err() {
break;
}
}
Err(e) => {
warn!(
error = %e,
"Received non-UTF-8 WebSocket message, dropping frame"
);
}
if ws_recv_write.write_all(b"\n").await.is_err() {
break;
}
}
}
Expand Down Expand Up @@ -196,6 +188,8 @@ mod tests {
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;

use crate::constants::ACP_ENDPOINT;

#[derive(Clone)]
struct EchoState;

Expand All @@ -213,12 +207,12 @@ mod tests {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let app = axum::Router::new()
.route("/ws", axum::routing::get(echo_handler))
.route(ACP_ENDPOINT, axum::routing::get(echo_handler))
.with_state(EchoState);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("ws://{}/ws", addr)
format!("ws://{}{}", addr, ACP_ENDPOINT)
}

#[tokio::test]
Expand All @@ -245,4 +239,30 @@ mod tests {
}
}
}

#[tokio::test]
async fn binary_messages_are_ignored() {
let url = start_echo_server().await;
let (mut ws, _) = connect_async(&url).await.unwrap();

ws.send(TungsteniteMessage::Binary(bytes::Bytes::from_static(
b"ignored",
)))
.await
.unwrap();
ws.send(TungsteniteMessage::Text("kept".into()))
.await
.unwrap();

let msg = tokio::time::timeout(Duration::from_secs(2), ws.next())
.await
.expect("timeout")
.expect("stream ended")
.unwrap();

match msg {
TungsteniteMessage::Text(text) => assert_eq!(text, "kept"),
other => panic!("expected text frame, got {other:?}"),
}
}
}
6 changes: 6 additions & 0 deletions rsworkspace/crates/acp-nats-ws/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::net::{IpAddr, Ipv4Addr};

pub const ACP_CONNECTION_ID_HEADER: &str = "acp-connection-id";
pub const ACP_ENDPOINT: &str = "/acp";
pub const ACP_PROTOCOL_VERSION_HEADER: &str = "acp-protocol-version";
pub const ACP_SESSION_ID_HEADER: &str = "acp-session-id";
pub const DEFAULT_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub const DEFAULT_PORT: u16 = 8080;
pub const DUPLEX_BUFFER_SIZE: usize = 64 * 1024;
pub const HTTP_CHANNEL_CAPACITY: usize = 64;
pub const THREAD_NAME: &str = "acp-ws-local";
pub const X_ACCEL_BUFFERING_HEADER: &str = "x-accel-buffering";
Loading
Loading