Skip to content

Commit f648125

Browse files
committed
feat(trogon-nats): propagate X-Req-Id header across NATS requests
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 0523f13 commit f648125

5 files changed

Lines changed: 45 additions & 17 deletions

File tree

rsworkspace/crates/acp-nats-agent/src/connection.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -256,17 +256,29 @@ async fn reply<N: PublishClient + FlushClient, T: serde::Serialize>(
256256
nats: &N,
257257
reply_to: &str,
258258
value: &T,
259+
req_id: Option<&str>,
259260
) -> Result<(), DispatchError> {
260-
trogon_nats::publish(
261-
nats,
262-
reply_to,
263-
value,
264-
trogon_nats::PublishOptions::builder()
265-
.flush_policy(trogon_nats::FlushPolicy::standard())
266-
.build(),
267-
)
268-
.await
269-
.map_err(DispatchError::Reply)
261+
let mut headers = trogon_nats::headers_with_trace_context();
262+
if let Some(id) = req_id {
263+
headers.insert(trogon_nats::REQ_ID_HEADER, id);
264+
}
265+
266+
let payload = serde_json::to_vec(value)
267+
.map_err(|e| DispatchError::Reply(trogon_nats::NatsError::Serialize(e)))?;
268+
269+
nats.publish_with_headers(reply_to.to_string(), headers, payload.into())
270+
.await
271+
.map_err(|e| {
272+
DispatchError::Reply(trogon_nats::NatsError::PublishOperation(
273+
trogon_nats::PublishOperationError(e.to_string()),
274+
))
275+
})?;
276+
277+
nats.flush().await.map_err(|e| {
278+
DispatchError::Reply(trogon_nats::NatsError::PublishOperation(
279+
trogon_nats::PublishOperationError(e.to_string()),
280+
))
281+
})
270282
}
271283

272284
async fn handle_request<N, Resp, ReqT, F>(
@@ -281,6 +293,7 @@ where
281293
Resp: serde::Serialize,
282294
{
283295
let reply_to = msg.reply.as_deref().ok_or(DispatchError::NoReplySubject)?;
296+
let req_id = trogon_nats::extract_req_id(msg.headers.as_ref());
284297

285298
let request: ReqT = match serde_json::from_slice(&msg.payload) {
286299
Ok(req) => req,
@@ -289,14 +302,14 @@ where
289302
agent_client_protocol::ErrorCode::InvalidParams.into(),
290303
format!("Failed to deserialize request: {}", e),
291304
);
292-
let _ = reply(nats, reply_to, &error).await;
305+
let _ = reply(nats, reply_to, &error, req_id).await;
293306
return Err(DispatchError::DeserializeRequest(e));
294307
}
295308
};
296309

297310
match handler(request).await {
298-
Ok(resp) => reply(nats, reply_to, &resp).await,
299-
Err(err) => reply(nats, reply_to, &err).await,
311+
Ok(resp) => reply(nats, reply_to, &resp, req_id).await,
312+
Err(err) => reply(nats, reply_to, &err, req_id).await,
300313
}
301314
}
302315

rsworkspace/crates/acp-nats/src/agent/prompt.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::agent::Bridge;
1111
use crate::nats::{FlushClient, PublishClient, RequestClient, SubscribeClient, agent};
1212
use crate::session_id::AcpSessionId;
1313

14-
pub const REQ_ID_HEADER: &str = "X-Req-Id";
14+
pub use trogon_nats::REQ_ID_HEADER;
1515

1616
#[instrument(
1717
name = "acp.session.prompt",

rsworkspace/crates/trogon-nats/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ tokio = { workspace = true, features = ["time"] }
1717
tracing = { workspace = true }
1818
tracing-opentelemetry = { workspace = true }
1919
trogon-std = { workspace = true }
20+
uuid = { workspace = true }
2021

2122
[dev-dependencies]
2223
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

rsworkspace/crates/trogon-nats/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ pub use client::{FlushClient, PublishClient, RequestClient, SubscribeClient};
5050
pub use connect::{ConnectError, connect};
5151
pub use messaging::{
5252
FlushPolicy, NatsError, PublishOperationError, PublishOptions, PublishOptionsBuilder,
53-
RetryPolicy, headers_with_trace_context, inject_trace_context, publish, request,
54-
request_with_timeout,
53+
REQ_ID_HEADER, RetryPolicy, build_request_headers, extract_req_id, headers_with_trace_context,
54+
inject_trace_context, publish, request, request_with_timeout,
5555
};
5656

5757
#[cfg(feature = "test-support")]

rsworkspace/crates/trogon-nats/src/messaging.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,26 @@ pub fn inject_trace_context(headers: &mut HeaderMap) {
2323
});
2424
}
2525

26+
pub const REQ_ID_HEADER: &str = "X-Req-Id";
27+
2628
pub fn headers_with_trace_context() -> HeaderMap {
2729
let mut headers = HeaderMap::new();
2830
inject_trace_context(&mut headers);
2931
headers
3032
}
3133

34+
pub fn build_request_headers() -> HeaderMap {
35+
let mut headers = headers_with_trace_context();
36+
headers.insert(REQ_ID_HEADER, uuid::Uuid::new_v4().to_string().as_str());
37+
headers
38+
}
39+
40+
pub fn extract_req_id(headers: Option<&HeaderMap>) -> Option<&str> {
41+
headers
42+
.and_then(|h| h.get(REQ_ID_HEADER))
43+
.map(|v| v.as_str())
44+
}
45+
3246
pub async fn request_with_timeout<N: RequestClient, Req, Res>(
3347
client: &N,
3448
subject: &str,
@@ -40,7 +54,7 @@ where
4054
Res: DeserializeOwned,
4155
{
4256
let payload = serde_json::to_vec(request).map_err(NatsError::Serialize)?;
43-
let headers = headers_with_trace_context();
57+
let headers = build_request_headers();
4458

4559
let response = tokio::time::timeout(
4660
timeout,

0 commit comments

Comments
 (0)