Skip to content

Commit 152ea97

Browse files
author
kjgbot
committed
Add broker delivery read ack bridge
1 parent 1301a31 commit 152ea97

10 files changed

Lines changed: 733 additions & 24 deletions

File tree

crates/broker/src/protocol.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,15 @@ pub struct ProtocolError {
337337
pub data: Option<Value>,
338338
}
339339

340+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
341+
#[serde(rename_all = "snake_case")]
342+
pub enum DeliveryReadAckStatus {
343+
Marked,
344+
Failed,
345+
SkippedSynthetic,
346+
SuppressedDuplicate,
347+
}
348+
340349
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
341350
#[serde(tag = "kind", rename_all = "snake_case")]
342351
pub enum BrokerEvent {
@@ -412,6 +421,14 @@ pub enum BrokerEvent {
412421
from: String,
413422
to: MessageTarget,
414423
},
424+
DeliveryReadAck {
425+
name: WorkerName,
426+
delivery_id: DeliveryId,
427+
event_id: EventId,
428+
status: DeliveryReadAckStatus,
429+
#[serde(default, skip_serializing_if = "Option::is_none")]
430+
reason: Option<String>,
431+
},
415432
MessageDeliveryFailed {
416433
name: WorkerName,
417434
#[serde(default)]

crates/broker/src/relaycast/ws.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,37 @@ impl RelaycastHttpClient {
417417
.map_err(|error| anyhow::anyhow!("{error}"))
418418
}
419419

420+
async fn registered_agent_client_as(
421+
&self,
422+
agent_name: &str,
423+
cli_hint: Option<&str>,
424+
) -> Result<AgentClient> {
425+
let registration = self
426+
.registration
427+
.as_ref()
428+
.as_ref()
429+
.context("SDK relay client not initialized")?;
430+
registration
431+
.registered_agent_client(agent_name, cli_hint.or(Some(self.default_cli.as_str())))
432+
.await
433+
.map_err(|error| anyhow::anyhow!("{error}"))
434+
}
435+
436+
/// Impersonation by design: delivery read-acks must be attributed to the
437+
/// recipient worker's agent identity, not the broker identity.
438+
pub async fn mark_read_as_agent(
439+
&self,
440+
agent_name: &str,
441+
cli_hint: Option<&str>,
442+
message_id: &str,
443+
) -> Result<serde_json::Value> {
444+
self.registered_agent_client_as(agent_name, cli_hint)
445+
.await?
446+
.mark_read(message_id)
447+
.await
448+
.map_err(|error| anyhow::anyhow!("relaycast mark_read failed: {error}"))
449+
}
450+
420451
/// Register an action whose handler is this broker's agent. Spawn/release
421452
/// are exposed as relaycast actions so other agents can invoke them as
422453
/// structured agent-to-agent RPC.
@@ -835,6 +866,54 @@ mod tests {
835866
assert!(message.contains("pre-register"));
836867
}
837868

869+
#[tokio::test]
870+
async fn mark_read_as_agent_uses_seeded_recipient_token_without_respawn() {
871+
let server = MockServer::start();
872+
let read_mock = server.mock(|when, then| {
873+
when.method(POST)
874+
.path("/v1/messages/msg_1/read")
875+
.header("authorization", "Bearer at_live_existing_recipient");
876+
then.status(200).json_body(json!({
877+
"ok": true,
878+
"data": {
879+
"message_id": "msg_1",
880+
"agent_id": "agent_existing_recipient",
881+
"read_at": "2026-06-08T10:00:00.000Z"
882+
}
883+
}));
884+
});
885+
let spawn_mock = server.mock(|when, then| {
886+
when.method(POST).path("/v1/agents/spawn");
887+
then.status(200).json_body(json!({
888+
"ok": true,
889+
"data": {
890+
"agent": {
891+
"id": "agent_fresh_wrong",
892+
"name": "recipient",
893+
"type": "agent",
894+
"status": "online",
895+
"created_at": "2026-06-08T10:00:00.000Z",
896+
"last_seen": "2026-06-08T10:00:00.000Z",
897+
"metadata": {}
898+
},
899+
"token": "at_live_fresh_wrong"
900+
}
901+
}));
902+
});
903+
904+
let client = RelaycastHttpClient::new(server.base_url(), "rk_live_test", "broker", "codex");
905+
client.seed_agent_token("recipient", "at_live_existing_recipient");
906+
907+
let result = client
908+
.mark_read_as_agent("recipient", Some("codex"), "msg_1")
909+
.await
910+
.expect("seeded recipient should mark read");
911+
912+
assert_eq!(result["agent_id"], "agent_existing_recipient");
913+
read_mock.assert_hits(1);
914+
spawn_mock.assert_hits(0);
915+
}
916+
838917
#[tokio::test]
839918
#[ignore = "relaycast API response fixture mismatch - needs investigation"]
840919
async fn send_with_mode_forwards_steer_for_relaycast_dm_targets() {

crates/broker/src/runtime/api.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,15 @@ impl BrokerRuntime {
9696
}
9797
};
9898

99-
// Caller-supplied agent_token overrides auto-registration
100-
let worker_relay_key = agent_token.or(worker_relay_key);
99+
// Caller-supplied agent_token overrides auto-registration.
100+
// Seed it so broker-side read-acks later act as this exact
101+
// recipient identity instead of minting a replacement token.
102+
let worker_relay_key = if let Some(token) = agent_token {
103+
seed_supplied_agent_token(relaycast_http, &name, &token);
104+
Some(token)
105+
} else {
106+
worker_relay_key
107+
};
101108

102109
let mut effective_task = normalize_initial_task(task);
103110
if let Some(ref continue_from) = continue_from {

crates/broker/src/runtime/delivery.rs

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,193 @@ pub(crate) struct DeliveryAckPayload {
116116
pub(super) event_id: EventId,
117117
}
118118

119+
/// Classify delivery ids that are meaningful Relaycast message ids for
120+
/// read-ack purposes. A read-ack means "delivered to the recipient location",
121+
/// not proof that a model turn cognitively processed the message.
122+
pub(crate) fn synthetic_delivery_read_ack_reason(event_id: &EventId) -> Option<&'static str> {
123+
let event_id = event_id.as_str().trim();
124+
if event_id.is_empty() {
125+
return Some("blank_event_id");
126+
}
127+
if event_id.starts_with("http_") {
128+
return Some("http_api_synthetic_event_id");
129+
}
130+
if event_id.starts_with("init_") {
131+
return Some("initial_task_synthetic_event_id");
132+
}
133+
if event_id.starts_with("cont_load_") {
134+
return Some("continuity_synthetic_event_id");
135+
}
136+
None
137+
}
138+
139+
/// Test-only helper: `true` when `event_id` is not classified as synthetic,
/// i.e. [`synthetic_delivery_read_ack_reason`] yields no skip reason for it.
#[cfg(test)]
pub(crate) fn delivery_read_ack_is_relaycast_message(event_id: &EventId) -> bool {
    matches!(synthetic_delivery_read_ack_reason(event_id), None)
}
143+
144+
pub(crate) fn seed_supplied_agent_token(
145+
relaycast_http: &RelaycastHttpClient,
146+
agent_name: &str,
147+
token: &str,
148+
) {
149+
relaycast_http.seed_agent_token(agent_name, token);
150+
}
151+
152+
/// Default upper bound (2 seconds) on a single Relaycast mark-read attempt.
const DELIVERY_READ_ACK_TIMEOUT: Duration = Duration::from_millis(2_000);
153+
154+
pub(crate) fn mark_delivery_read_ack(
155+
relaycast_http: &RelaycastHttpClient,
156+
sdk_out_tx: &mpsc::Sender<ProtocolEnvelope<Value>>,
157+
dedup: &mut DedupCache,
158+
worker_name: &WorkerName,
159+
cli_hint: Option<&str>,
160+
delivery_id: &DeliveryId,
161+
event_id: &EventId,
162+
) {
163+
mark_delivery_read_ack_with_timeout(
164+
relaycast_http,
165+
sdk_out_tx,
166+
dedup,
167+
worker_name,
168+
cli_hint,
169+
delivery_id,
170+
event_id,
171+
DELIVERY_READ_ACK_TIMEOUT,
172+
);
173+
}
174+
175+
pub(crate) fn mark_delivery_read_ack_with_timeout(
176+
relaycast_http: &RelaycastHttpClient,
177+
sdk_out_tx: &mpsc::Sender<ProtocolEnvelope<Value>>,
178+
dedup: &mut DedupCache,
179+
worker_name: &WorkerName,
180+
cli_hint: Option<&str>,
181+
delivery_id: &DeliveryId,
182+
event_id: &EventId,
183+
timeout_window: Duration,
184+
) {
185+
let dedup_key = format!("delivery_read_ack:{worker_name}:{event_id}");
186+
if !dedup.insert_if_new(&dedup_key, Instant::now()) {
187+
emit_delivery_read_ack_telemetry(
188+
sdk_out_tx.clone(),
189+
BrokerEvent::DeliveryReadAck {
190+
name: worker_name.clone(),
191+
delivery_id: delivery_id.clone(),
192+
event_id: event_id.clone(),
193+
status: DeliveryReadAckStatus::SuppressedDuplicate,
194+
reason: Some("duplicate_delivery_read_ack".to_string()),
195+
},
196+
);
197+
return;
198+
}
199+
200+
if let Some(reason) = synthetic_delivery_read_ack_reason(event_id) {
201+
emit_delivery_read_ack_telemetry(
202+
sdk_out_tx.clone(),
203+
BrokerEvent::DeliveryReadAck {
204+
name: worker_name.clone(),
205+
delivery_id: delivery_id.clone(),
206+
event_id: event_id.clone(),
207+
status: DeliveryReadAckStatus::SkippedSynthetic,
208+
reason: Some(reason.to_string()),
209+
},
210+
);
211+
return;
212+
}
213+
214+
let relaycast_http = relaycast_http.clone();
215+
let sdk_out_tx = sdk_out_tx.clone();
216+
let worker_name = worker_name.clone();
217+
let cli_hint = cli_hint.map(str::to_string);
218+
let delivery_id = delivery_id.clone();
219+
let event_id = event_id.clone();
220+
221+
tokio::spawn(async move {
222+
let result = timeout(
223+
timeout_window,
224+
relaycast_http.mark_read_as_agent(
225+
worker_name.as_str(),
226+
cli_hint.as_deref(),
227+
event_id.as_str(),
228+
),
229+
)
230+
.await;
231+
232+
match result {
233+
Ok(Ok(_)) => {
234+
let _ = send_broker_event(
235+
&sdk_out_tx,
236+
BrokerEvent::DeliveryReadAck {
237+
name: worker_name,
238+
delivery_id,
239+
event_id,
240+
status: DeliveryReadAckStatus::Marked,
241+
reason: None,
242+
},
243+
)
244+
.await;
245+
}
246+
Ok(Err(error)) => {
247+
let reason = error.to_string();
248+
tracing::warn!(
249+
target = "agent_relay::broker",
250+
worker = %worker_name,
251+
delivery_id = %delivery_id,
252+
event_id = %event_id,
253+
error = %reason,
254+
"failed to mark relaycast message read after delivery_ack"
255+
);
256+
let _ = send_broker_event(
257+
&sdk_out_tx,
258+
BrokerEvent::DeliveryReadAck {
259+
name: worker_name,
260+
delivery_id,
261+
event_id,
262+
status: DeliveryReadAckStatus::Failed,
263+
reason: Some(reason),
264+
},
265+
)
266+
.await;
267+
}
268+
Err(_) => {
269+
let reason = format!(
270+
"relaycast mark_read timed out after {}ms",
271+
timeout_window.as_millis()
272+
);
273+
tracing::warn!(
274+
target = "agent_relay::broker",
275+
worker = %worker_name,
276+
delivery_id = %delivery_id,
277+
event_id = %event_id,
278+
timeout_ms = %timeout_window.as_millis(),
279+
"timed out marking relaycast message read after delivery_ack"
280+
);
281+
let _ = send_broker_event(
282+
&sdk_out_tx,
283+
BrokerEvent::DeliveryReadAck {
284+
name: worker_name,
285+
delivery_id,
286+
event_id,
287+
status: DeliveryReadAckStatus::Failed,
288+
reason: Some(reason),
289+
},
290+
)
291+
.await;
292+
}
293+
}
294+
});
295+
}
296+
297+
fn emit_delivery_read_ack_telemetry(
298+
sdk_out_tx: mpsc::Sender<ProtocolEnvelope<Value>>,
299+
event: BrokerEvent,
300+
) {
301+
tokio::spawn(async move {
302+
let _ = send_broker_event(&sdk_out_tx, event).await;
303+
});
304+
}
305+
119306
/// Outcome of [`queue_inbound_for_delivery_mode`]. Distinguishes the
120307
/// three cases broker call sites care about: the message is queued and
121308
/// should wait for an explicit flush, the queue should be drained now,

crates/broker/src/runtime/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ use crate::{
3131
WorkspaceAlias, WorkspaceId,
3232
},
3333
protocol::{
34-
AgentRuntime, AgentSpec, BrokerEvent, HeadlessProvider as ProtocolHeadlessProvider,
35-
MessageInjectionMode, ProtocolEnvelope, RelayDelivery, ResolvedHarnessConfig,
36-
PROTOCOL_VERSION,
34+
AgentRuntime, AgentSpec, BrokerEvent, DeliveryReadAckStatus,
35+
HeadlessProvider as ProtocolHeadlessProvider, MessageInjectionMode, ProtocolEnvelope,
36+
RelayDelivery, ResolvedHarnessConfig, PROTOCOL_VERSION,
3737
},
3838
relaycast::{
3939
agent_name_eq, format_worker_preregistration_error, is_self_name, map_ws_event,

crates/broker/src/runtime/relaycast_events.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,9 @@ impl BrokerRuntime {
268268
// short timeout keeps spawn latency bounded while still
269269
// giving the registration call a real chance.
270270
let worker_relay_key = {
271-
let ws_token = relaycast_ws_spawn_token(&ws_value);
272-
if ws_token.is_some() {
273-
ws_token
271+
if let Some(token) = relaycast_ws_spawn_token(&ws_value) {
272+
seed_supplied_agent_token(&workspace_http, &name, &token);
273+
Some(token)
274274
} else {
275275
const REG_TIMEOUT: Duration = Duration::from_secs(3);
276276
match tokio::time::timeout(
@@ -498,9 +498,9 @@ impl BrokerRuntime {
498498

499499
// Pre-register (same logic as primary WS spawn path).
500500
let worker_relay_key = {
501-
let ws_token = relaycast_ws_spawn_token(&ws_value);
502-
if ws_token.is_some() {
503-
ws_token
501+
if let Some(token) = relaycast_ws_spawn_token(&ws_value) {
502+
seed_supplied_agent_token(&workspace_http, &name, &token);
503+
Some(token)
504504
} else {
505505
const REG_TIMEOUT: Duration = Duration::from_secs(3);
506506
match tokio::time::timeout(

0 commit comments

Comments
 (0)