Skip to content

Commit ca5db16

Browse files
committed
feat(acp-nats): add prompt handler (#20)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 263c14b commit ca5db16

File tree

12 files changed

+1198
-70
lines changed

12 files changed

+1198
-70
lines changed

rsworkspace/crates/AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Prefer domain-specific value objects over primitives (e.g. `AcpPrefix` not `String`). Each type's factory must guarantee correctness at construction—invalid instances should be unrepresentable. Validate per-type, not per-aggregate: avoid validating unrelated fields together in a single constructor.
22

3-
Every value object lives in its own file named after the type (e.g. `acp_prefix.rs`, `ext_method_name.rs`, `session_id.rs`). Never inline a value object into a config, aggregate, or service file.
3+
Every value object lives in its own file named after the type (e.g. `acp_prefix.rs`, `ext_method_name.rs`, `session_id.rs`). Never inline a value object into a config, aggregate, or service file. File layout: `src/{type_snake_case}.rs`; export in `lib.rs` as `pub use {module}::{Type, TypeError}`.
44

55
You must use the `test-support` feature to share test helpers between crates.
66
Prefer one trait per operation over a single trait with multiple operations.

rsworkspace/crates/acp-nats/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ trogon-std = { path = "../trogon-std" }
2121

2222
[dev-dependencies]
2323
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "metrics", "testing"] }
24+
tokio = { version = "1.49.0", features = ["test-util"] }
2425
trogon-nats = { path = "../trogon-nats", features = ["test-support"] }
2526
trogon-std = { path = "../trogon-std", features = ["test-support"] }

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use agent_client_protocol::{CancelNotification, Error, ErrorCode, Result};
55
use tracing::{info, instrument, warn};
66
use trogon_std::time::GetElapsed;
77

8-
/// Publishes the cancel notification to the backend via NATS (fire-and-forget).
9-
/// The publish failure is logged and recorded as a metric but does not propagate
10-
/// to the caller, so the client always receives `Ok(())`.
8+
/// Handles cancel notification requests.
9+
///
10+
/// Validates the session ID and publishes the cancellation to the backend (fire-and-forget).
11+
/// The backend owns session state and will respond to the in-flight prompt with `stopReason: cancelled`.
12+
/// Publish failure is logged and recorded in metrics but does not propagate to the caller.
1113
#[instrument(
1214
name = "acp.session.cancel",
1315
skip(bridge, args),
@@ -25,9 +27,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
2527
bridge
2628
.metrics
2729
.record_request("cancel", bridge.clock.elapsed(start).as_secs_f64(), false);
28-
bridge
29-
.metrics
30-
.record_error("session_validate", "invalid_session_id");
30+
bridge.metrics.record_error("cancel", "invalid_session_id");
3131
Error::new(
3232
ErrorCode::InvalidParams.into(),
3333
format!("Invalid session ID: {}", e),
@@ -46,7 +46,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
4646
)
4747
.await;
4848

49-
if let Err(error) = publish_result {
49+
if let Err(error) = &publish_result {
5050
warn!(
5151
session_id = %args.session_id,
5252
error = %error,
@@ -57,9 +57,11 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
5757
.record_error("cancel", "cancel_publish_failed");
5858
}
5959

60-
bridge
61-
.metrics
62-
.record_request("cancel", bridge.clock.elapsed(start).as_secs_f64(), true);
60+
bridge.metrics.record_request(
61+
"cancel",
62+
bridge.clock.elapsed(start).as_secs_f64(),
63+
publish_result.is_ok(),
64+
);
6365

6466
Ok(())
6567
}
@@ -223,8 +225,8 @@ mod tests {
223225
"expected acp.request.count with method=cancel, success=false on validation failure"
224226
);
225227
assert!(
226-
has_error_metric(&finished_metrics, "session_validate", "invalid_session_id"),
227-
"expected acp.errors.total with operation=session_validate, reason=invalid_session_id"
228+
has_error_metric(&finished_metrics, "cancel", "invalid_session_id"),
229+
"expected acp.errors.total with operation=cancel, reason=invalid_session_id"
228230
);
229231
provider.shutdown().unwrap();
230232
}
@@ -258,8 +260,8 @@ mod tests {
258260
"expected acp.errors.total with operation=cancel, reason=cancel_publish_failed"
259261
);
260262
assert!(
261-
has_request_metric(&finished_metrics, "cancel", true),
262-
"publish failure is fire-and-forget; caller still gets Ok, so success=true"
263+
has_request_metric(&finished_metrics, "cancel", false),
264+
"request metric records publish outcome; success=false when publish fails"
263265
);
264266
provider.shutdown().unwrap();
265267
}

rsworkspace/crates/acp-nats/src/agent/load_session.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::Bridge;
22
use crate::acp_prefix::AcpPrefix;
3+
use crate::config::SESSION_READY_DELAY;
34
use crate::error::AGENT_UNAVAILABLE;
45
use crate::nats::{
56
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
@@ -8,13 +9,10 @@ use crate::nats::{
89
use crate::session_id::AcpSessionId;
910
use crate::telemetry::metrics::Metrics;
1011
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
11-
use std::time::Duration;
1212
use tracing::{info, instrument, warn};
1313
use trogon_nats::NatsError;
1414
use trogon_std::time::GetElapsed;
1515

16-
const SESSION_READY_DELAY: Duration = Duration::from_millis(100);
17-
1816
fn map_load_session_error(e: NatsError) -> Error {
1917
match &e {
2018
NatsError::Timeout { subject } => {

rsworkspace/crates/acp-nats/src/agent/mod.rs

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ mod ext_notification;
55
mod initialize;
66
mod load_session;
77
mod new_session;
8+
mod prompt;
89
mod set_session_mode;
910

1011
use crate::config::Config;
1112
use crate::nats::{FlushClient, PublishClient, RequestClient};
13+
use crate::pending_prompt_waiters::PendingSessionPromptResponseWaiters;
14+
use crate::prompt_slot_counter::PromptSlotCounter;
1215
use crate::telemetry::metrics::Metrics;
13-
use agent_client_protocol::ErrorCode;
1416
use agent_client_protocol::{
15-
Agent, AuthenticateRequest, AuthenticateResponse, CancelNotification, Error, ExtNotification,
17+
Agent, AuthenticateRequest, AuthenticateResponse, CancelNotification, ExtNotification,
1618
ExtRequest, ExtResponse, InitializeRequest, InitializeResponse, LoadSessionRequest,
1719
LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse,
1820
Result, SetSessionModeRequest, SetSessionModeResponse,
@@ -23,17 +25,22 @@ use trogon_std::time::GetElapsed;
2325
pub struct Bridge<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> {
2426
pub(crate) nats: N,
2527
pub(crate) clock: C,
26-
pub(crate) config: Config,
2728
pub(crate) metrics: Metrics,
29+
pub(crate) pending_session_prompt_responses: PendingSessionPromptResponseWaiters<C::Instant>,
30+
pub(crate) prompt_slot_counter: PromptSlotCounter,
31+
pub(crate) config: Config,
2832
}
2933

3034
impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C> {
3135
pub fn new(nats: N, clock: C, meter: &Meter, config: Config) -> Self {
36+
let max_concurrent = config.max_concurrent_client_tasks();
3237
Self {
3338
nats,
3439
clock,
3540
config,
3641
metrics: Metrics::new(meter),
42+
pending_session_prompt_responses: PendingSessionPromptResponseWaiters::new(),
43+
prompt_slot_counter: PromptSlotCounter::new(max_concurrent),
3744
}
3845
}
3946

@@ -67,11 +74,8 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Agent for Br
6774
set_session_mode::handle(self, args).await
6875
}
6976

70-
async fn prompt(&self, _args: PromptRequest) -> Result<PromptResponse> {
71-
Err(Error::new(
72-
ErrorCode::InternalError.into(),
73-
"not yet implemented",
74-
))
77+
async fn prompt(&self, args: PromptRequest) -> Result<PromptResponse> {
78+
prompt::handle(self, args).await
7579
}
7680

7781
async fn cancel(&self, args: CancelNotification) -> Result<()> {
@@ -88,37 +92,14 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Agent for Br
8892
}
8993

9094
#[cfg(test)]
91-
mod tests {
95+
mod send_sync_tests {
9296
use super::Bridge;
93-
use crate::config::Config;
94-
use agent_client_protocol::{Agent, PromptRequest};
9597
use trogon_nats::AdvancedMockNatsClient;
98+
use trogon_std::time::SystemClock;
9699

97-
fn mock_bridge() -> Bridge<AdvancedMockNatsClient, trogon_std::time::SystemClock> {
98-
Bridge::new(
99-
AdvancedMockNatsClient::new(),
100-
trogon_std::time::SystemClock,
101-
&opentelemetry::global::meter("acp-nats-test"),
102-
Config::for_test("acp"),
103-
)
104-
}
105-
106-
#[tokio::test]
107-
async fn stub_methods_return_not_implemented() {
108-
let bridge = mock_bridge();
109-
let msg = "not yet implemented";
110-
111-
assert!(
112-
bridge
113-
.prompt(PromptRequest::new("s1", vec![]))
114-
.await
115-
.is_err()
116-
);
117-
118-
let err = bridge
119-
.prompt(PromptRequest::new("s1", vec![]))
120-
.await
121-
.unwrap_err();
122-
assert!(err.to_string().contains(msg));
100+
#[test]
101+
fn bridge_is_send_and_sync() {
102+
fn assert_send_sync<T: Send + Sync>() {}
103+
assert_send_sync::<Bridge<AdvancedMockNatsClient, SystemClock>>();
123104
}
124105
}

rsworkspace/crates/acp-nats/src/agent/new_session.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::Bridge;
2+
use crate::config::SESSION_READY_DELAY;
23
use crate::error::AGENT_UNAVAILABLE;
34
use crate::nats::{
45
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
@@ -8,25 +9,10 @@ use crate::telemetry::metrics::Metrics;
89
use agent_client_protocol::{
910
Error, ErrorCode, NewSessionRequest, NewSessionResponse, Result, SessionId,
1011
};
11-
use std::time::Duration;
1212
use tracing::{Span, info, instrument, warn};
1313
use trogon_nats::NatsError;
1414
use trogon_std::time::GetElapsed;
1515

16-
/// Delay before publishing `session.ready` to NATS.
17-
///
18-
/// The `Agent` trait returns the response value *before* the transport layer
19-
/// serializes and writes it to the client. Without a delay the spawned task
20-
/// could publish `session.ready` to NATS before the client has received the
21-
/// `session/new` response, violating the ordering guarantee documented on
22-
/// [`ExtSessionReady`].
23-
///
24-
/// A post-send callback from the transport would be the ideal fix, but the
25-
/// external `agent_client_protocol` crate does not expose one. This constant
26-
/// delay provides a practical safety margin (serialization + write is typically
27-
/// sub-millisecond).
28-
const SESSION_READY_DELAY: Duration = Duration::from_millis(100);
29-
3016
fn map_new_session_error(e: NatsError) -> Error {
3117
match &e {
3218
NatsError::Timeout { subject } => {

0 commit comments

Comments
 (0)