Skip to content

Commit c302236

Browse files
authored
refactor(acp-nats): introduce ReqId and ExtMethodName as value objects (#85)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent ab551d3 commit c302236

16 files changed

Lines changed: 166 additions & 53 deletions

File tree

rsworkspace/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ tracing-opentelemetry = "=0.32.1"
4848
tracing-subscriber = "=0.3.23"
4949

5050
# Misc
51-
uuid = { version = "=1.23.0", features = ["v4"] }
51+
uuid = { version = "=1.23.0", features = ["v4", "v7"] }
5252

5353
[profile.release]
5454
lto = "thin"

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent, M: JsDisp
569569
.headers
570570
.as_ref()
571571
.and_then(|h| h.get(trogon_nats::REQ_ID_HEADER))
572-
.map(|v| v.as_str().to_string());
572+
.map(|v| acp_nats::ReqId::from_header(v.as_str()));
573573

574574
let reply_subject: Option<String> = match (&req_id, &method) {
575575
(Some(rid), SessionAgentMethod::Prompt) => Some(

rsworkspace/crates/acp-nats/src/agent/bridge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ where
153153
Res: serde::de::DeserializeOwned,
154154
{
155155
let subject_str = subject.to_string();
156-
let req_id = uuid::Uuid::new_v4().to_string();
156+
let req_id = crate::req_id::ReqId::new();
157157
js_request::js_request::<J, _, Res, _>(
158158
self.js(),
159159
&subject_str,

rsworkspace/crates/acp-nats/src/agent/ext_method.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub async fn handle<N: RequestClient, C: GetElapsed, J>(
3535
})?;
3636

3737
let nats = bridge.nats();
38-
let subject = agent::ExtSubject::new(bridge.config.acp_prefix_ref(), method_name.as_str());
38+
let subject = agent::ExtSubject::new(bridge.config.acp_prefix_ref(), &method_name);
3939

4040
let result = nats::request_with_timeout::<N, ExtRequest, ExtResponse>(
4141
nats,

rsworkspace/crates/acp-nats/src/agent/ext_notification.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ pub async fn handle<N: PublishClient + FlushClient, C: GetElapsed, J>(
3838
)
3939
})?;
4040

41-
let subject =
42-
agent::ExtNotifySubject::new(bridge.config.acp_prefix_ref(), method_name.as_str());
41+
let subject = agent::ExtNotifySubject::new(bridge.config.acp_prefix_ref(), &method_name);
4342

4443
let publish_result = nats::publish(
4544
bridge.nats(),

rsworkspace/crates/acp-nats/src/agent/js_request.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use trogon_std::JsonSerialize;
1515
use crate::acp_prefix::AcpPrefix;
1616
use crate::constants::SESSION_ID_HEADER;
1717
use crate::jetstream::{consumers, streams};
18+
use crate::req_id::ReqId;
1819

1920
#[allow(clippy::too_many_arguments)]
2021
pub async fn js_request<J, Req, Res, S>(
@@ -24,7 +25,7 @@ pub async fn js_request<J, Req, Res, S>(
2425
serializer: &S,
2526
prefix: &AcpPrefix,
2627
session_id: &crate::session_id::AcpSessionId,
27-
req_id: &str,
28+
req_id: &ReqId,
2829
operation_timeout: Duration,
2930
) -> agent_client_protocol::Result<Res>
3031
where
@@ -60,7 +61,7 @@ where
6061
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("serialize: {e}")))?;
6162

6263
let mut headers = async_nats::HeaderMap::new();
63-
headers.insert(REQ_ID_HEADER, req_id);
64+
headers.insert(REQ_ID_HEADER, req_id.as_str());
6465
headers.insert(SESSION_ID_HEADER, session_id.as_str());
6566

6667
js.publish_with_headers(subject.to_string(), headers, Bytes::from(payload_bytes))
@@ -114,6 +115,7 @@ mod tests {
114115
use trogon_nats::jetstream::mocks::*;
115116

116117
use crate::agent::test_support::MockJs;
118+
use crate::req_id::ReqId;
117119
use crate::session_id::AcpSessionId;
118120

119121
fn test_prefix() -> AcpPrefix {
@@ -153,7 +155,7 @@ mod tests {
153155
&trogon_std::StdJsonSerialize,
154156
&test_prefix(),
155157
&test_sid("s1"),
156-
"req-1",
158+
&ReqId::from_test("req-1"),
157159
Duration::from_secs(5),
158160
)
159161
.await;
@@ -179,7 +181,7 @@ mod tests {
179181
&trogon_std::StdJsonSerialize,
180182
&test_prefix(),
181183
&test_sid("s1"),
182-
"req-1",
184+
&ReqId::from_test("req-1"),
183185
Duration::from_secs(5),
184186
)
185187
.await;
@@ -199,7 +201,7 @@ mod tests {
199201
&trogon_std::StdJsonSerialize,
200202
&test_prefix(),
201203
&test_sid("s1"),
202-
"req-1",
204+
&ReqId::from_test("req-1"),
203205
Duration::from_secs(5),
204206
)
205207
.await;
@@ -226,7 +228,7 @@ mod tests {
226228
&trogon_std::StdJsonSerialize,
227229
&test_prefix(),
228230
&test_sid("s1"),
229-
"req-1",
231+
&ReqId::from_test("req-1"),
230232
Duration::from_secs(5),
231233
)
232234
.await;
@@ -251,7 +253,7 @@ mod tests {
251253
&trogon_std::StdJsonSerialize,
252254
&test_prefix(),
253255
&test_sid("s1"),
254-
"req-1",
256+
&ReqId::from_test("req-1"),
255257
Duration::from_secs(5),
256258
)
257259
.await;
@@ -273,7 +275,7 @@ mod tests {
273275
&trogon_std::StdJsonSerialize,
274276
&test_prefix(),
275277
&test_sid("s1"),
276-
"req-1",
278+
&ReqId::from_test("req-1"),
277279
Duration::from_millis(10),
278280
)
279281
.await;
@@ -310,7 +312,7 @@ mod tests {
310312
&trogon_std::StdJsonSerialize,
311313
&test_prefix(),
312314
&test_sid("s1"),
313-
"req-1",
315+
&ReqId::from_test("req-1"),
314316
Duration::from_secs(5),
315317
)
316318
.await;
@@ -335,7 +337,7 @@ mod tests {
335337
&trogon_std::StdJsonSerialize,
336338
&test_prefix(),
337339
&test_sid("s1"),
338-
"req-1",
340+
&ReqId::from_test("req-1"),
339341
Duration::from_secs(5),
340342
)
341343
.await;
@@ -367,7 +369,7 @@ mod tests {
367369
&trogon_std::StdJsonSerialize,
368370
&test_prefix(),
369371
&test_sid("s1"),
370-
"req-1",
372+
&ReqId::from_test("req-1"),
371373
Duration::from_secs(5),
372374
)
373375
.await;
@@ -389,7 +391,7 @@ mod tests {
389391
&trogon_std::FailNextSerialize::new(1),
390392
&test_prefix(),
391393
&test_sid("s1"),
392-
"req-1",
394+
&ReqId::from_test("req-1"),
393395
Duration::from_secs(5),
394396
)
395397
.await;

rsworkspace/crates/acp-nats/src/agent/prompt.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::agent::Bridge;
1616
use crate::constants::SESSION_ID_HEADER;
1717
use crate::jetstream::{consumers, streams};
1818
use crate::nats::{FlushClient, PublishClient, RequestClient, SubscribeClient, session};
19+
use crate::req_id::ReqId;
1920
use crate::session_id::AcpSessionId;
2021

2122
pub use trogon_nats::REQ_ID_HEADER;
@@ -44,7 +45,7 @@ where
4445
Error::new(ErrorCode::InvalidParams.into(), "Invalid session ID")
4546
})?;
4647

47-
let req_id = uuid::Uuid::new_v4().to_string();
48+
let req_id = ReqId::new();
4849
let prefix = bridge.config.acp_prefix_ref();
4950

5051
let result = handle_js(
@@ -74,7 +75,7 @@ async fn handle_js<N, C, J, S>(
7475
serializer: &S,
7576
session_id: &AcpSessionId,
7677
prefix: &crate::acp_prefix::AcpPrefix,
77-
req_id: &str,
78+
req_id: &ReqId,
7879
) -> agent_client_protocol::Result<PromptResponse>
7980
where
8081
N: SubscribeClient,
@@ -152,7 +153,7 @@ where
152153
.map_err(|e| Error::new(ErrorCode::InternalError.into(), format!("serialize: {e}")))?;
153154

154155
let mut headers = async_nats::HeaderMap::new();
155-
headers.insert(REQ_ID_HEADER, req_id);
156+
headers.insert(REQ_ID_HEADER, req_id.as_str());
156157
headers.insert(SESSION_ID_HEADER, session_id.as_str());
157158

158159
let prompt_subject = session::agent::PromptSubject::new(prefix, session_id);

rsworkspace/crates/acp-nats/src/jetstream/consumers.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ use async_nats::jetstream::consumer::pull::Config;
22
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy};
33

44
use crate::acp_prefix::AcpPrefix;
5+
use crate::req_id::ReqId;
56
use crate::session_id::AcpSessionId;
67

78
pub fn prompt_notifications_consumer(
89
prefix: &AcpPrefix,
910
session_id: &AcpSessionId,
10-
req_id: &str,
11+
req_id: &ReqId,
1112
) -> Config {
1213
let pfx = prefix.as_str();
1314
let sid = session_id.as_str();
@@ -23,7 +24,7 @@ pub fn prompt_notifications_consumer(
2324
pub fn prompt_response_consumer(
2425
prefix: &AcpPrefix,
2526
session_id: &AcpSessionId,
26-
req_id: &str,
27+
req_id: &ReqId,
2728
) -> Config {
2829
let pfx = prefix.as_str();
2930
let sid = session_id.as_str();
@@ -36,7 +37,7 @@ pub fn prompt_response_consumer(
3637
}
3738
}
3839

39-
pub fn response_consumer(prefix: &AcpPrefix, session_id: &AcpSessionId, req_id: &str) -> Config {
40+
pub fn response_consumer(prefix: &AcpPrefix, session_id: &AcpSessionId, req_id: &ReqId) -> Config {
4041
let pfx = prefix.as_str();
4142
let sid = session_id.as_str();
4243
Config {
@@ -73,9 +74,13 @@ mod tests {
7374
AcpSessionId::new(s).expect("test session id")
7475
}
7576

77+
fn rid(s: &str) -> ReqId {
78+
ReqId::from_test(s)
79+
}
80+
7681
#[test]
7782
fn prompt_notifications_consumer_filter() {
78-
let config = prompt_notifications_consumer(&p("acp"), &sid("sess-1"), "req-abc");
83+
let config = prompt_notifications_consumer(&p("acp"), &sid("sess-1"), &rid("req-abc"));
7984
assert_eq!(
8085
config.filter_subject,
8186
"acp.session.sess-1.agent.update.req-abc"
@@ -84,15 +89,15 @@ mod tests {
8489

8590
#[test]
8691
fn prompt_notifications_consumer_delivers_all() {
87-
let config = prompt_notifications_consumer(&p("acp"), &sid("s1"), "r1");
92+
let config = prompt_notifications_consumer(&p("acp"), &sid("s1"), &rid("r1"));
8893
assert_eq!(config.deliver_policy, DeliverPolicy::All);
8994
assert_eq!(config.ack_policy, AckPolicy::Explicit);
9095
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
9196
}
9297

9398
#[test]
9499
fn prompt_response_consumer_filter() {
95-
let config = prompt_response_consumer(&p("acp"), &sid("sess-1"), "req-abc");
100+
let config = prompt_response_consumer(&p("acp"), &sid("sess-1"), &rid("req-abc"));
96101
assert_eq!(
97102
config.filter_subject,
98103
"acp.session.sess-1.agent.prompt.response.req-abc"
@@ -114,7 +119,7 @@ mod tests {
114119

115120
#[test]
116121
fn response_consumer_filter() {
117-
let config = response_consumer(&p("acp"), &sid("sess-1"), "req-abc");
122+
let config = response_consumer(&p("acp"), &sid("sess-1"), &rid("req-abc"));
118123
assert_eq!(
119124
config.filter_subject,
120125
"acp.session.sess-1.agent.response.req-abc"
@@ -123,21 +128,21 @@ mod tests {
123128

124129
#[test]
125130
fn response_consumer_delivers_all() {
126-
let config = response_consumer(&p("acp"), &sid("s1"), "r1");
131+
let config = response_consumer(&p("acp"), &sid("s1"), &rid("r1"));
127132
assert_eq!(config.deliver_policy, DeliverPolicy::All);
128133
assert_eq!(config.ack_policy, AckPolicy::Explicit);
129134
assert_eq!(config.replay_policy, ReplayPolicy::Instant);
130135
}
131136

132137
#[test]
133138
fn response_consumer_custom_prefix() {
134-
let config = response_consumer(&p("myapp"), &sid("s1"), "r1");
139+
let config = response_consumer(&p("myapp"), &sid("s1"), &rid("r1"));
135140
assert_eq!(config.filter_subject, "myapp.session.s1.agent.response.r1");
136141
}
137142

138143
#[test]
139144
fn custom_prefix_in_consumers() {
140-
let config = prompt_response_consumer(&p("myapp"), &sid("s1"), "r1");
145+
let config = prompt_response_consumer(&p("myapp"), &sid("s1"), &rid("r1"));
141146
assert_eq!(
142147
config.filter_subject,
143148
"myapp.session.s1.agent.prompt.response.r1"

rsworkspace/crates/acp-nats/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod jetstream;
1111
pub(crate) mod jsonrpc;
1212
pub mod nats;
1313
pub(crate) mod pending_prompt_waiters;
14+
pub mod req_id;
1415
pub mod session_id;
1516
pub(crate) mod telemetry;
1617

@@ -23,6 +24,7 @@ pub use config::{
2324
};
2425
pub use error::AGENT_UNAVAILABLE;
2526
pub use nats::{FlushClient, PublishClient, RequestClient, SubscribeClient};
27+
pub use req_id::ReqId;
2628
pub use session_id::AcpSessionId;
2729
#[cfg(not(coverage))]
2830
pub use trogon_nats::jetstream::NatsJetStreamClient;

rsworkspace/crates/acp-nats/src/nats/subjects/global/ext.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
#[derive(Debug)]
33
pub struct ExtSubject {
44
prefix: crate::acp_prefix::AcpPrefix,
5-
method: String,
5+
method: crate::ext_method_name::ExtMethodName,
66
}
77

88
impl ExtSubject {
9-
pub fn new(prefix: &crate::acp_prefix::AcpPrefix, method: &str) -> Self {
9+
pub fn new(
10+
prefix: &crate::acp_prefix::AcpPrefix,
11+
method: &crate::ext_method_name::ExtMethodName,
12+
) -> Self {
1013
Self {
1114
prefix: prefix.clone(),
12-
method: method.to_string(),
15+
method: method.clone(),
1316
}
1417
}
1518
}

0 commit comments

Comments
 (0)