Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions crates/broker/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,15 @@ pub struct ProtocolError {
pub data: Option<Value>,
}

/// Outcome carried in the `status` field of `BrokerEvent::DeliveryReadAck`.
///
/// Serialized in snake_case (`marked`, `failed`, `skipped_synthetic`,
/// `suppressed_duplicate`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DeliveryReadAckStatus {
/// The Relaycast mark-read call for the message succeeded.
Marked,
/// The mark-read call returned an error or timed out; the accompanying
/// `reason` carries the error text.
Failed,
/// The event id was classified as broker-generated (not a Relaycast
/// message id), so no mark-read call was attempted.
SkippedSynthetic,
/// A read-ack for the same worker and event id was already processed, so
/// this one was dropped.
SuppressedDuplicate,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum BrokerEvent {
Expand Down Expand Up @@ -412,6 +421,14 @@ pub enum BrokerEvent {
from: String,
to: MessageTarget,
},
DeliveryReadAck {
name: WorkerName,
delivery_id: DeliveryId,
event_id: EventId,
status: DeliveryReadAckStatus,
#[serde(default, skip_serializing_if = "Option::is_none")]
reason: Option<String>,
},
MessageDeliveryFailed {
name: WorkerName,
#[serde(default)]
Expand Down
79 changes: 79 additions & 0 deletions crates/broker/src/relaycast/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,37 @@ impl RelaycastHttpClient {
.map_err(|error| anyhow::anyhow!("{error}"))
}

async fn registered_agent_client_as(
&self,
agent_name: &str,
cli_hint: Option<&str>,
) -> Result<AgentClient> {
let registration = self
.registration
.as_ref()
.as_ref()
.context("SDK relay client not initialized")?;
registration
.registered_agent_client(agent_name, cli_hint.or(Some(self.default_cli.as_str())))
.await
.map_err(|error| anyhow::anyhow!("{error}"))
}

/// Mark `message_id` as read on behalf of `agent_name`.
///
/// This impersonation is intentional: a delivery read-ack has to be
/// attributed to the recipient worker's agent identity rather than to the
/// broker's own identity.
///
/// # Errors
/// Propagates any failure to obtain the recipient's client, and wraps a
/// failed call as "relaycast mark_read failed: ...".
pub async fn mark_read_as_agent(
    &self,
    agent_name: &str,
    cli_hint: Option<&str>,
    message_id: &str,
) -> Result<serde_json::Value> {
    let recipient_client = self
        .registered_agent_client_as(agent_name, cli_hint)
        .await?;

    recipient_client
        .mark_read(message_id)
        .await
        .map_err(|error| anyhow::anyhow!("relaycast mark_read failed: {error}"))
}

/// Register an action whose handler is this broker's agent. Spawn/release
/// are exposed as relaycast actions so other agents can invoke them as
/// structured agent-to-agent RPC.
Expand Down Expand Up @@ -835,6 +866,54 @@ mod tests {
assert!(message.contains("pre-register"));
}

/// A token seeded for the recipient must be used for the mark-read call as-is;
/// the client must not spawn/register a replacement agent to obtain one.
#[tokio::test]
async fn mark_read_as_agent_uses_seeded_recipient_token_without_respawn() {
let server = MockServer::start();
// Only matches when the request carries the seeded recipient token, so a
// call made with any other identity will not hit this mock.
let read_mock = server.mock(|when, then| {
when.method(POST)
.path("/v1/messages/msg_1/read")
.header("authorization", "Bearer at_live_existing_recipient");
then.status(200).json_body(json!({
"ok": true,
"data": {
"message_id": "msg_1",
"agent_id": "agent_existing_recipient",
"read_at": "2026-06-08T10:00:00.000Z"
}
}));
});
// Trap endpoint: would hand out a fresh (wrong) identity if the client
// tried to spawn a new agent. It is asserted to receive zero hits below.
let spawn_mock = server.mock(|when, then| {
when.method(POST).path("/v1/agents/spawn");
then.status(200).json_body(json!({
"ok": true,
"data": {
"agent": {
"id": "agent_fresh_wrong",
"name": "recipient",
"type": "agent",
"status": "online",
"created_at": "2026-06-08T10:00:00.000Z",
"last_seen": "2026-06-08T10:00:00.000Z",
"metadata": {}
},
"token": "at_live_fresh_wrong"
}
}));
});

let client = RelaycastHttpClient::new(server.base_url(), "rk_live_test", "broker", "codex");
// Pre-load the recipient's existing token before the read-ack is issued.
client.seed_agent_token("recipient", "at_live_existing_recipient");

let result = client
.mark_read_as_agent("recipient", Some("codex"), "msg_1")
.await
.expect("seeded recipient should mark read");

// The returned value is the response's `data` payload for the seeded agent.
assert_eq!(result["agent_id"], "agent_existing_recipient");
read_mock.assert_hits(1);
spawn_mock.assert_hits(0);
}

#[tokio::test]
#[ignore = "relaycast API response fixture mismatch - needs investigation"]
async fn send_with_mode_forwards_steer_for_relaycast_dm_targets() {
Expand Down
11 changes: 9 additions & 2 deletions crates/broker/src/runtime/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,15 @@ impl BrokerRuntime {
}
};

// Caller-supplied agent_token overrides auto-registration
let worker_relay_key = agent_token.or(worker_relay_key);
// Caller-supplied agent_token overrides auto-registration.
// Seed it so broker-side read-acks later act as this exact
// recipient identity instead of minting a replacement token.
let worker_relay_key = if let Some(token) = agent_token {
seed_supplied_agent_token(relaycast_http, &name, &token);
Some(token)
} else {
worker_relay_key
};

let mut effective_task = normalize_initial_task(task);
if let Some(ref continue_from) = continue_from {
Expand Down
190 changes: 190 additions & 0 deletions crates/broker/src/runtime/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,196 @@ pub(crate) struct DeliveryAckPayload {
pub(super) event_id: EventId,
}

/// Explain why `event_id` is NOT a Relaycast message id, if that is the case.
///
/// Returns `Some(reason)` for ids that are blank (after trimming) or that
/// carry one of the broker's synthetic prefixes, and `None` for ids that
/// should be treated as real Relaycast message ids for read-ack purposes.
///
/// A read-ack means "delivered to the recipient location", not proof that a
/// model turn cognitively processed the message.
pub(crate) fn synthetic_delivery_read_ack_reason(event_id: &EventId) -> Option<&'static str> {
    // (prefix, reason) pairs, checked in order; the prefixes are disjoint.
    const SYNTHETIC_PREFIXES: [(&str, &str); 4] = [
        ("http_", "http_api_synthetic_event_id"),
        ("init_", "initial_task_synthetic_event_id"),
        ("cont_load_", "continuity_synthetic_event_id"),
        ("flush_", "manual_flush_synthetic_event_id"),
    ];

    let trimmed = event_id.as_str().trim();
    if trimmed.is_empty() {
        return Some("blank_event_id");
    }

    SYNTHETIC_PREFIXES
        .iter()
        .find(|(prefix, _)| trimmed.starts_with(*prefix))
        .map(|&(_, reason)| reason)
}
Comment on lines +122 to +140

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The broker generates synthetic event IDs starting with flush_ when flushing the queue (e.g., flush_<uuid>). These synthetic event IDs are not real Relaycast message IDs and should be skipped for read-ack purposes to avoid unnecessary failed API calls to Relaycast.

pub(crate) fn synthetic_delivery_read_ack_reason(event_id: &EventId) -> Option<&'static str> {
    let event_id = event_id.as_str().trim();
    if event_id.is_empty() {
        return Some("blank_event_id");
    }
    if event_id.starts_with("http_") {
        return Some("http_api_synthetic_event_id");
    }
    if event_id.starts_with("init_") {
        return Some("initial_task_synthetic_event_id");
    }
    if event_id.starts_with("cont_load_") {
        return Some("continuity_synthetic_event_id");
    }
    if event_id.starts_with("flush_") {
        return Some("flush_synthetic_event_id");
    }
    None
}


/// Test-only convenience: `true` when `event_id` is not classified as
/// synthetic, i.e. it is treated as a real Relaycast message id.
#[cfg(test)]
pub(crate) fn delivery_read_ack_is_relaycast_message(event_id: &EventId) -> bool {
    matches!(synthetic_delivery_read_ack_reason(event_id), None)
}

/// Record a caller-supplied `token` for `agent_name` on the Relaycast HTTP
/// client by delegating to `RelaycastHttpClient::seed_agent_token`.
///
/// Call sites use this when a spawn request arrives with its own agent token,
/// so that later broker-side read-acks act as that same recipient identity.
pub(crate) fn seed_supplied_agent_token(
relaycast_http: &RelaycastHttpClient,
agent_name: &str,
token: &str,
) {
relaycast_http.seed_agent_token(agent_name, token);
}

/// Upper bound on how long a single Relaycast mark-read call may take before
/// the read-ack is reported as failed.
const DELIVERY_READ_ACK_TIMEOUT: Duration = Duration::from_secs(2);

/// Issue a delivery read-ack for `event_id` on behalf of `worker_name` using
/// the default [`DELIVERY_READ_ACK_TIMEOUT`].
///
/// Thin wrapper over [`mark_delivery_read_ack_with_timeout`]; all arguments
/// are forwarded unchanged.
pub(crate) fn mark_delivery_read_ack(
relaycast_http: &RelaycastHttpClient,
sdk_out_tx: &mpsc::Sender<ProtocolEnvelope<Value>>,
dedup: &mut DedupCache,
worker_name: &WorkerName,
cli_hint: Option<&str>,
delivery_id: &DeliveryId,
event_id: &EventId,
) {
mark_delivery_read_ack_with_timeout(
relaycast_http,
sdk_out_tx,
dedup,
worker_name,
cli_hint,
delivery_id,
event_id,
DELIVERY_READ_ACK_TIMEOUT,
);
}

/// Issue a delivery read-ack for `event_id` as `worker_name`, bounded by
/// `timeout_window`, and report the outcome as a
/// `BrokerEvent::DeliveryReadAck`.
///
/// The function itself never blocks: every event is sent from a spawned
/// Tokio task. Outcomes, in the order they are decided:
///
/// 1. `SuppressedDuplicate` when the `(worker, event)` pair was already seen
///    by `dedup`. The key is recorded before any call is attempted.
/// 2. `SkippedSynthetic` when the event id is not a Relaycast message id.
/// 3. `Marked` when the mark-read call succeeds, or `Failed` when it returns
///    an error or exceeds `timeout_window`.
///
/// Failures to send the resulting event are deliberately ignored.
pub(crate) fn mark_delivery_read_ack_with_timeout(
    relaycast_http: &RelaycastHttpClient,
    sdk_out_tx: &mpsc::Sender<ProtocolEnvelope<Value>>,
    dedup: &mut DedupCache,
    worker_name: &WorkerName,
    cli_hint: Option<&str>,
    delivery_id: &DeliveryId,
    event_id: &EventId,
    timeout_window: Duration,
) {
    let dedup_key = format!("delivery_read_ack:{worker_name}:{event_id}");
    let first_sighting = dedup.insert_if_new(&dedup_key, Instant::now());

    // Both short-circuit outcomes produce the same event shape; only the
    // status and reason differ. The duplicate check takes precedence.
    let short_circuit = if !first_sighting {
        Some((
            DeliveryReadAckStatus::SuppressedDuplicate,
            "duplicate_delivery_read_ack",
        ))
    } else {
        synthetic_delivery_read_ack_reason(event_id)
            .map(|why| (DeliveryReadAckStatus::SkippedSynthetic, why))
    };
    if let Some((status, why)) = short_circuit {
        emit_delivery_read_ack_telemetry(
            sdk_out_tx.clone(),
            BrokerEvent::DeliveryReadAck {
                name: worker_name.clone(),
                delivery_id: delivery_id.clone(),
                event_id: event_id.clone(),
                status,
                reason: Some(why.to_string()),
            },
        );
        return;
    }

    // Owned copies for the detached task.
    let http = relaycast_http.clone();
    let events_tx = sdk_out_tx.clone();
    let name = worker_name.clone();
    let cli = cli_hint.map(str::to_string);
    let delivery_id = delivery_id.clone();
    let event_id = event_id.clone();

    tokio::spawn(async move {
        let attempt = timeout(
            timeout_window,
            http.mark_read_as_agent(name.as_str(), cli.as_deref(), event_id.as_str()),
        )
        .await;

        // NOTE(review): in `tracing` macros `target = ...` records a field
        // named `target`; the target override syntax is `target: ...`.
        // Confirm this matches the crate-wide logging convention.
        let (status, reason) = match attempt {
            Ok(Ok(_)) => (DeliveryReadAckStatus::Marked, None),
            Ok(Err(error)) => {
                let reason = error.to_string();
                tracing::warn!(
                    target = "agent_relay::broker",
                    worker = %name,
                    delivery_id = %delivery_id,
                    event_id = %event_id,
                    error = %reason,
                    "failed to mark relaycast message read after delivery_ack"
                );
                (DeliveryReadAckStatus::Failed, Some(reason))
            }
            Err(_) => {
                let reason = format!(
                    "relaycast mark_read timed out after {}ms",
                    timeout_window.as_millis()
                );
                tracing::warn!(
                    target = "agent_relay::broker",
                    worker = %name,
                    delivery_id = %delivery_id,
                    event_id = %event_id,
                    timeout_ms = %timeout_window.as_millis(),
                    "timed out marking relaycast message read after delivery_ack"
                );
                (DeliveryReadAckStatus::Failed, Some(reason))
            }
        };

        let _ = send_broker_event(
            &events_tx,
            BrokerEvent::DeliveryReadAck {
                name,
                delivery_id,
                event_id,
                status,
                reason,
            },
        )
        .await;
    });
}

/// Send `event` on `sdk_out_tx` from a detached Tokio task so the caller does
/// not have to await the channel. A failed send is ignored: this is
/// best-effort telemetry.
fn emit_delivery_read_ack_telemetry(
    sdk_out_tx: mpsc::Sender<ProtocolEnvelope<Value>>,
    event: BrokerEvent,
) {
    let deliver = async move {
        let _ = send_broker_event(&sdk_out_tx, event).await;
    };
    tokio::spawn(deliver);
}

/// Outcome of [`queue_inbound_for_delivery_mode`]. Distinguishes the
/// three cases broker call sites care about: the message is queued and
/// should wait for an explicit flush, the queue should be drained now,
Expand Down
6 changes: 3 additions & 3 deletions crates/broker/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use crate::{
WorkspaceAlias, WorkspaceId,
},
protocol::{
AgentRuntime, AgentSpec, BrokerEvent, HeadlessProvider as ProtocolHeadlessProvider,
MessageInjectionMode, ProtocolEnvelope, RelayDelivery, ResolvedHarnessConfig,
PROTOCOL_VERSION,
AgentRuntime, AgentSpec, BrokerEvent, DeliveryReadAckStatus,
HeadlessProvider as ProtocolHeadlessProvider, MessageInjectionMode, ProtocolEnvelope,
RelayDelivery, ResolvedHarnessConfig, PROTOCOL_VERSION,
},
relaycast::{
agent_name_eq, format_worker_preregistration_error, is_self_name, map_ws_event,
Expand Down
12 changes: 6 additions & 6 deletions crates/broker/src/runtime/relaycast_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ impl BrokerRuntime {
// short timeout keeps spawn latency bounded while still
// giving the registration call a real chance.
let worker_relay_key = {
let ws_token = relaycast_ws_spawn_token(&ws_value);
if ws_token.is_some() {
ws_token
if let Some(token) = relaycast_ws_spawn_token(&ws_value) {
seed_supplied_agent_token(&workspace_http, &name, &token);
Some(token)
} else {
const REG_TIMEOUT: Duration = Duration::from_secs(3);
match tokio::time::timeout(
Expand Down Expand Up @@ -498,9 +498,9 @@ impl BrokerRuntime {

// Pre-register (same logic as primary WS spawn path).
let worker_relay_key = {
let ws_token = relaycast_ws_spawn_token(&ws_value);
if ws_token.is_some() {
ws_token
if let Some(token) = relaycast_ws_spawn_token(&ws_value) {
seed_supplied_agent_token(&workspace_http, &name, &token);
Some(token)
} else {
const REG_TIMEOUT: Duration = Duration::from_secs(3);
match tokio::time::timeout(
Expand Down
Loading
Loading