Skip to content

Commit 466793c

Browse files
fix(acp-nats): fix compilation errors across the workspace
- Close unclosed handle() function in prompt.rs (missing `result` + `}`) - Remove dead code: delete unused helper functions from prompt.rs - Mark test-only helpers with #[cfg(test)]; trim cfg(test) imports to only what's needed - Centralize session_ready publish logic in agent/mod.rs; remove duplicates from load_session.rs and new_session.rs - Add terminal_output_cap field and supports_terminal_output() getter to Bridge - Fix metrics field name: errors_total → errors - Fix futures_util import in bridge_integration.rs (use futures::StreamExt) - Add prompt/prompt_wildcard/prompt_events aliases to subjects::agent for runner crate - Fix futures_util import in runner.rs (use futures_util::StreamExt — crate dependency) - Remove duplicate test_flush_policy_default in trogon-nats/messaging.rs - Declare prompt_event module in lib.rs
1 parent 2d9157e commit 466793c

10 files changed

Lines changed: 46 additions & 348 deletions

File tree

rsworkspace/crates/acp-nats/src/agent/load_session.rs

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,12 @@
11
use super::Bridge;
2-
use crate::acp_prefix::AcpPrefix;
32
use crate::error::AGENT_UNAVAILABLE;
4-
use crate::nats::{
5-
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
6-
RetryPolicy, agent,
7-
};
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
84
use crate::session_id::AcpSessionId;
9-
use crate::telemetry::metrics::Metrics;
105
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
11-
use std::time::Duration;
126
use tracing::{info, instrument, warn};
137
use trogon_nats::NatsError;
148
use trogon_std::time::GetElapsed;
159

16-
const SESSION_READY_DELAY: Duration = Duration::from_millis(100);
17-
1810
fn map_load_session_error(e: NatsError) -> Error {
1911
match &e {
2012
NatsError::Timeout { subject } => {
@@ -102,38 +94,6 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
10294
result
10395
}
10496

105-
async fn publish_session_ready<N: PublishClient + FlushClient>(
106-
nats: &N,
107-
prefix: &AcpPrefix,
108-
session_id: &AcpSessionId,
109-
metrics: &Metrics,
110-
) {
111-
tokio::time::sleep(SESSION_READY_DELAY).await;
112-
113-
let subject = agent::ext_session_ready(prefix.as_str(), session_id.as_str());
114-
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
115-
116-
let message = ExtSessionReady::new(agent_client_protocol::SessionId::from(
117-
session_id.to_string(),
118-
));
119-
120-
let options = PublishOptions::builder()
121-
.publish_retry_policy(RetryPolicy::standard())
122-
.flush_policy(FlushPolicy::standard())
123-
.build();
124-
125-
if let Err(e) = nats::publish(nats, &subject, &message, options).await {
126-
warn!(
127-
error = %e,
128-
session_id = %session_id,
129-
"Failed to publish session.ready"
130-
);
131-
metrics.record_error("session_ready", "session_ready_publish_failed");
132-
} else {
133-
info!(session_id = %session_id, "Published session.ready");
134-
}
135-
}
136-
13797
#[cfg(test)]
13898
mod tests {
13999
use super::{Bridge, map_load_session_error};

rsworkspace/crates/acp-nats/src/agent/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ use agent_client_protocol::{
2929
ExtRequest, ExtResponse, ForkSessionRequest, ForkSessionResponse, InitializeRequest,
3030
InitializeResponse, ListSessionsRequest, ListSessionsResponse, LoadSessionRequest,
3131
LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse,
32-
Result, SessionId, SessionNotification, SetSessionModeRequest, SetSessionModeResponse,
32+
Result, ResumeSessionRequest, ResumeSessionResponse, SessionId, SessionNotification,
33+
SetSessionConfigOptionRequest, SetSessionConfigOptionResponse, SetSessionModeRequest,
34+
SetSessionModeResponse, SetSessionModelRequest, SetSessionModelResponse,
3335
};
3436
use opentelemetry::metrics::Meter;
3537
use std::cell::RefCell;
@@ -54,6 +56,7 @@ pub struct Bridge<N, C: GetElapsed> {
5456
pub(crate) notification_sender: mpsc::Sender<SessionNotification>,
5557
pub(crate) pending_session_prompt_responses: PendingSessionPromptResponseWaiters<C::Instant>,
5658
pub(crate) background_tasks: RefCell<Vec<JoinHandle<()>>>,
59+
pub(crate) terminal_output_cap: std::cell::Cell<bool>,
5760
}
5861

5962
impl<N, C: GetElapsed> Bridge<N, C> {
@@ -72,9 +75,18 @@ impl<N, C: GetElapsed> Bridge<N, C> {
7275
notification_sender,
7376
pending_session_prompt_responses: PendingSessionPromptResponseWaiters::new(),
7477
background_tasks: RefCell::new(Vec::new()),
78+
terminal_output_cap: std::cell::Cell::new(false),
7579
}
7680
}
7781

82+
pub fn set_terminal_output_cap(&self, value: bool) {
83+
self.terminal_output_cap.set(value);
84+
}
85+
86+
pub fn supports_terminal_output(&self) -> bool {
87+
self.terminal_output_cap.get()
88+
}
89+
7890
pub(crate) fn nats(&self) -> &N {
7991
&self.nats
8092
}

rsworkspace/crates/acp-nats/src/agent/new_session.rs

Lines changed: 2 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,11 @@
11
use super::Bridge;
22
use crate::error::AGENT_UNAVAILABLE;
3-
use crate::nats::{
4-
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
5-
RetryPolicy, agent,
6-
};
7-
use crate::telemetry::metrics::Metrics;
8-
use agent_client_protocol::{
9-
Error, ErrorCode, NewSessionRequest, NewSessionResponse, Result, SessionId,
10-
};
11-
use std::time::Duration;
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
4+
use agent_client_protocol::{Error, ErrorCode, NewSessionRequest, NewSessionResponse, Result};
125
use tracing::{Span, info, instrument, warn};
136
use trogon_nats::NatsError;
147
use trogon_std::time::GetElapsed;
158

16-
/// Delay before publishing `session.ready` to NATS.
17-
///
18-
/// The `Agent` trait returns the response value *before* the transport layer
19-
/// serializes and writes it to the client. Without a delay the spawned task
20-
/// could publish `session.ready` to NATS before the client has received the
21-
/// `session/new` response, violating the ordering guarantee documented on
22-
/// [`ExtSessionReady`].
23-
///
24-
/// A post-send callback from the transport would be the ideal fix, but the
25-
/// external `agent_client_protocol` crate does not expose one. This constant
26-
/// delay provides a practical safety margin (serialization + write is typically
27-
/// sub-millisecond).
28-
const SESSION_READY_DELAY: Duration = Duration::from_millis(100);
29-
309
fn map_new_session_error(e: NatsError) -> Error {
3110
match &e {
3211
NatsError::Timeout { subject } => {
@@ -108,36 +87,6 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
10887
result
10988
}
11089

111-
async fn publish_session_ready<N: PublishClient + FlushClient>(
112-
nats: &N,
113-
prefix: &str,
114-
session_id: &SessionId,
115-
metrics: &Metrics,
116-
) {
117-
tokio::time::sleep(SESSION_READY_DELAY).await;
118-
119-
let subject = agent::ext_session_ready(prefix, &session_id.to_string());
120-
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
121-
122-
let message = ExtSessionReady::new(session_id.clone());
123-
124-
let options = PublishOptions::builder()
125-
.publish_retry_policy(RetryPolicy::standard())
126-
.flush_policy(FlushPolicy::standard())
127-
.build();
128-
129-
if let Err(e) = nats::publish(nats, &subject, &message, options).await {
130-
warn!(
131-
error = %e,
132-
session_id = %session_id,
133-
"Failed to publish session.ready"
134-
);
135-
metrics.record_error("session_ready", "session_ready_publish_failed");
136-
} else {
137-
info!(session_id = %session_id, "Published session.ready");
138-
}
139-
}
140-
14190
#[cfg(test)]
14291
mod tests {
14392
use super::{Bridge, map_new_session_error};

0 commit comments

Comments
 (0)