Skip to content

Commit 4ee9b92

Browse files
committed
docs(acp-nats): document subject-to-transport mapping in subjects.rs
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent d187733 commit 4ee9b92

1 file changed

Lines changed: 99 additions & 0 deletions

File tree

rsworkspace/crates/acp-nats/src/nats/subjects.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,92 @@
1+
//! # ACP NATS Subject Hierarchy
2+
//!
3+
//! Every ACP subject, its transport, and why.
4+
//!
5+
//! ## Core NATS (request/reply or fire-and-forget)
6+
//!
7+
//! Stateless operations where the agent must be online now and the response
8+
//! is immediate. No session state to recover on failure — just retry.
9+
//!
10+
//! | Subject | Pattern | Rationale |
11+
//! |--------------------------------------|------------------|----------------------------------------------|
12+
//! | `agent.initialize` | request/reply | Handshake, agent must be alive |
13+
//! | `agent.authenticate` | request/reply | Auth, agent must be alive |
14+
//! | `agent.session.list` | request/reply | Read-only query, no state change |
15+
//! | `agent.ext.>` | request/reply | Stateless global extension calls |
16+
//! | `session.{id}.client.*` | request/reply | Agent-to-bridge ops (fs, terminal, perms) |
17+
//!
18+
//! ## JetStream (durable delivery via streams)
19+
//!
20+
//! Commands that create or mutate session state. Durable delivery means the
21+
//! agent receives the command even after a restart, and the bridge can replay
22+
//! responses on reconnect.
23+
//!
24+
//! | Subject | Stream | Rationale |
25+
//! |----------------------------------------------|-----------------|--------------------------------------------|
26+
//! | `agent.session.new` | GLOBAL | Creates session state |
27+
//! | `session.*.agent.cancel` | COMMANDS | Must survive agent reconnect |
28+
//! | `session.*.agent.prompt` | COMMANDS | Long-running, must survive restarts |
29+
//! | `session.*.agent.load` | COMMANDS | Loads session into agent, mutates state |
30+
//! | `session.*.agent.close` | COMMANDS | Ends session lifecycle |
31+
//! | `session.*.agent.set_mode` | COMMANDS | Mutates session config |
32+
//! | `session.*.agent.set_config_option` | COMMANDS | Mutates session config |
33+
//! | `session.*.agent.set_model` | COMMANDS | Mutates session config |
34+
//! | `session.*.agent.fork` | COMMANDS | Creates new session from existing |
35+
//! | `session.*.agent.resume` | COMMANDS | Resumes session, mutates state |
36+
//! | `session.*.agent.prompt.response.>` | RESPONSES | Streamed prompt responses |
37+
//! | `session.*.agent.response.>` | RESPONSES | One-shot command responses |
38+
//! | `session.*.agent.ext.ready` | RESPONSES | Extension ready signal |
39+
//! | `session.*.agent.cancelled` | RESPONSES | Cancellation confirmation |
40+
//! | `session.*.agent.update.>` | NOTIFICATIONS | Async streaming updates |
41+
//!
42+
//! ## JetStream Streams on Core NATS Subjects
43+
//!
44+
//! The Core NATS subjects above also have JetStream streams configured on
45+
//! them. NATS is pub/sub — when a message is published, every subscriber
46+
//! gets a copy, and a JetStream stream is just another subscriber. The
47+
//! stream persists a copy of every message that matches its subject filter.
48+
//!
49+
//! The bridge doesn't use these streams for the request/reply flow — it
50+
//! uses Core NATS directly. But the streams are there, capturing every
51+
//! message for observability: audit logs, metrics, debugging replay,
52+
//! alerting. Create consumers on these streams after the fact to read them.
53+
//!
54+
//! | Subject | Stream |
55+
//! |-------------------------|------------|
56+
//! | `agent.initialize` | GLOBAL |
57+
//! | `agent.authenticate` | GLOBAL |
58+
//! | `agent.session.new` | GLOBAL |
59+
//! | `agent.ext.>` | GLOBAL_EXT |
60+
//!
61+
//! ## Not in JetStream
62+
//!
63+
//! | Subject | Reason |
64+
//! |------------------------|-----------------------------------------------------------|
65+
//! | `agent.session.list` | Pure read, no state change, no audit value |
66+
//! | `session.*.client.>` | Agent-to-bridge ops, bridge IS the process, no disconnect |
67+
168
pub mod agent {
69+
/// Bridge: Core NATS request/reply. Stream: GLOBAL (observability).
270
pub fn initialize(prefix: &str) -> String {
371
format!("{}.agent.initialize", prefix)
472
}
573

74+
/// Bridge: Core NATS request/reply. Stream: GLOBAL (observability).
675
pub fn authenticate(prefix: &str) -> String {
776
format!("{}.agent.authenticate", prefix)
877
}
978

79+
/// Bridge: JetStream publish. Stream: GLOBAL.
1080
pub fn session_new(prefix: &str) -> String {
1181
format!("{}.agent.session.new", prefix)
1282
}
1383

84+
/// Bridge: Core NATS request/reply. No stream.
1485
pub fn session_list(prefix: &str) -> String {
1586
format!("{}.agent.session.list", prefix)
1687
}
1788

89+
/// Bridge: Core NATS request/reply. Stream: GLOBAL_EXT (observability).
1890
pub fn ext(prefix: &str, method: &str) -> String {
1991
format!("{}.agent.ext.{}", prefix, method)
2092
}
@@ -28,10 +100,12 @@ pub mod agent {
28100

29101
pub mod session {
30102
pub mod agent {
103+
/// Bridge: JetStream publish. Stream: COMMANDS.
31104
pub fn load(prefix: &str, session_id: &str) -> String {
32105
format!("{}.session.{}.agent.load", prefix, session_id)
33106
}
34107

108+
/// Bridge: JetStream publish. Stream: COMMANDS.
35109
pub fn prompt(prefix: &str, session_id: &str) -> String {
36110
format!("{}.session.{}.agent.prompt", prefix, session_id)
37111
}
@@ -40,53 +114,69 @@ pub mod session {
40114
format!("{}.session.*.agent.prompt", prefix)
41115
}
42116

117+
/// Bridge: Core NATS publish (fire-and-forget). Stream: COMMANDS.
118+
///
119+
/// Published via `PublishClient`, not `JetStreamPublisher`. The COMMANDS
120+
/// stream captures it because the stream subscribes to this subject.
121+
/// The agent-side consumes it from the stream.
43122
pub fn cancel(prefix: &str, session_id: &str) -> String {
44123
format!("{}.session.{}.agent.cancel", prefix, session_id)
45124
}
46125

126+
/// Agent → bridge broadcast confirming cancellation. Stream: RESPONSES.
47127
pub fn cancelled(prefix: &str, session_id: &str) -> String {
48128
format!("{}.session.{}.agent.cancelled", prefix, session_id)
49129
}
50130

131+
/// Bridge: JetStream publish. Stream: COMMANDS.
51132
pub fn set_mode(prefix: &str, session_id: &str) -> String {
52133
format!("{}.session.{}.agent.set_mode", prefix, session_id)
53134
}
54135

136+
/// Bridge: JetStream publish. Stream: COMMANDS.
55137
pub fn set_config_option(prefix: &str, session_id: &str) -> String {
56138
format!("{}.session.{}.agent.set_config_option", prefix, session_id)
57139
}
58140

141+
/// Bridge: JetStream publish. Stream: COMMANDS.
59142
pub fn set_model(prefix: &str, session_id: &str) -> String {
60143
format!("{}.session.{}.agent.set_model", prefix, session_id)
61144
}
62145

146+
/// Bridge: JetStream publish. Stream: COMMANDS.
63147
pub fn fork(prefix: &str, session_id: &str) -> String {
64148
format!("{}.session.{}.agent.fork", prefix, session_id)
65149
}
66150

151+
/// Bridge: JetStream publish. Stream: COMMANDS.
67152
pub fn resume(prefix: &str, session_id: &str) -> String {
68153
format!("{}.session.{}.agent.resume", prefix, session_id)
69154
}
70155

156+
/// Bridge: JetStream publish. Stream: COMMANDS.
71157
pub fn close(prefix: &str, session_id: &str) -> String {
72158
format!("{}.session.{}.agent.close", prefix, session_id)
73159
}
74160

161+
/// Agent → bridge signal. Stream: RESPONSES.
75162
pub fn ext_ready(prefix: &str, session_id: &str) -> String {
76163
format!("{}.session.{}.agent.ext.ready", prefix, session_id)
77164
}
78165

166+
/// Agent → bridge async notification. Stream: NOTIFICATIONS.
79167
pub fn update(prefix: &str, session_id: &str, req_id: &str) -> String {
80168
format!("{}.session.{}.agent.update.{}", prefix, session_id, req_id)
81169
}
82170

171+
/// Agent → bridge streamed response. Stream: RESPONSES.
83172
pub fn prompt_response(prefix: &str, session_id: &str, req_id: &str) -> String {
84173
format!(
85174
"{}.session.{}.agent.prompt.response.{}",
86175
prefix, session_id, req_id
87176
)
88177
}
89178

179+
/// Agent → bridge one-shot response. Stream: RESPONSES.
90180
pub fn response(prefix: &str, session_id: &str, req_id: &str) -> String {
91181
format!(
92182
"{}.session.{}.agent.response.{}",
@@ -96,44 +186,53 @@ pub mod session {
96186
}
97187

98188
pub mod client {
189+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
99190
pub fn fs_read_text_file(prefix: &str, session_id: &str) -> String {
100191
format!("{}.session.{}.client.fs.read_text_file", prefix, session_id)
101192
}
102193

194+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
103195
pub fn fs_write_text_file(prefix: &str, session_id: &str) -> String {
104196
format!(
105197
"{}.session.{}.client.fs.write_text_file",
106198
prefix, session_id
107199
)
108200
}
109201

202+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
110203
pub fn session_request_permission(prefix: &str, session_id: &str) -> String {
111204
format!(
112205
"{}.session.{}.client.session.request_permission",
113206
prefix, session_id
114207
)
115208
}
116209

210+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
117211
pub fn session_update(prefix: &str, session_id: &str) -> String {
118212
format!("{}.session.{}.client.session.update", prefix, session_id)
119213
}
120214

215+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
121216
pub fn terminal_create(prefix: &str, session_id: &str) -> String {
122217
format!("{}.session.{}.client.terminal.create", prefix, session_id)
123218
}
124219

220+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
125221
pub fn terminal_kill(prefix: &str, session_id: &str) -> String {
126222
format!("{}.session.{}.client.terminal.kill", prefix, session_id)
127223
}
128224

225+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
129226
pub fn terminal_output(prefix: &str, session_id: &str) -> String {
130227
format!("{}.session.{}.client.terminal.output", prefix, session_id)
131228
}
132229

230+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
133231
pub fn terminal_release(prefix: &str, session_id: &str) -> String {
134232
format!("{}.session.{}.client.terminal.release", prefix, session_id)
135233
}
136234

235+
/// Agent → bridge. Core NATS request/reply. Stream: CLIENT_OPS.
137236
pub fn terminal_wait_for_exit(prefix: &str, session_id: &str) -> String {
138237
format!(
139238
"{}.session.{}.client.terminal.wait_for_exit",

0 commit comments

Comments
 (0)