Commit 0835a61

willwashburn, claude, gemini-code-assist[bot], and github-actions[bot] authored
fix(broker): route all delivery through Relaycast + honor spawn startup timeout (#1221)
* fix(harness-driver): poll broker handshake for full startupTimeoutMs

HarnessDriverClient.spawn() polled the broker's startup handshake for a fixed 10 attempts × 1s (~10s), ignoring the caller's startupTimeoutMs. When the Relaycast handshake took longer than ~10s on a cold or slow network, the loop exhausted its attempts and threw the broker's retryable "Broker is starting" 503 even though the broker was healthy and still warming up. This failed the release smoke tests, which skipped the internal @agent-relay/* publish jobs, which then made "Publish Main Package" time out waiting for deps that were never published.

Poll until the full startup budget (default 45s) elapses instead. The brokerExited race still surfaces a dead broker immediately, so this only extends how long we wait on a broker that is alive and warming up.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Update CHANGELOG.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix(broker): address PR review — workspace-scoped impersonation, drive queued event, steer/thread, structured 503

- api.rs/worker.rs: gate sender impersonation on workspace membership, not just name. `has_worker_in_workspace` requires the custodial worker to belong to the workspace being published into, so a worker attached to workspace A can't be impersonated (registering/rotating its token) in workspace B.
- fleet.rs: re-emit the `delivery_queued` broker event when a node delivery is held under manual_flush. The removed local send path emitted it and `attach --drive` counts it to show pending messages; node delivery is the only path now, so without this queued messages were invisible until a later flush.
- ws.rs: a `Steer` send with a `thread_id` can't be honored (`AgentClient::reply` takes no injection mode). Downgrade to a normal reply as before, but log a warning and document it instead of dropping steer silently.
- client.ts: match the startup-handshake 503 on the structured `HarnessDriverProtocolError` fields (`status`/`code`) with a message-regex fallback, so a custom broker 503 body still retries.
- CHANGELOG: trim the spawn-timeout entry to impact-first per the style guide.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(broker): only thread-reply on real Relaycast message ids

Since the send handler now posts via AgentClient::reply when thread_id is present, a caller echoing back a synthetic id — the broker-minted http_* event_id from /api/send, or a #channel / direct:* grouping key from /api/threads — would make Relaycast reject the reply and fail the whole send. Gate the reply path on is_relaycast_reply_target (reusing the existing synthetic-event-id classifier plus the channel/DM grouping-key prefixes); non-message-id thread_ids fall back to a plain post so the message is still delivered, just unthreaded.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* style: auto-format Rust code with cargo fmt

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 7809b20 commit 0835a61

7 files changed

Lines changed: 152 additions & 13 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
+- `HarnessDriverClient.spawn()` now polls the broker's startup handshake for the full `startupTimeoutMs` budget (default 45s) instead of a fixed ~10s, so a slow-but-healthy Relaycast handshake that keeps answering `503` while warming up is no longer misreported as a spawn failure.
 - `agent-relay integration subscribe` now resolves provider-native `--resource` values through relayfile before binding, so Slack channel names, GitHub repos, Linear team keys, and Telegram chats bind to matching relayfile VFS globs while explicit `/`-prefixed globs still work.
 - `agent-relay integration subscribe` is now idempotent and supports multiple resources/channels per provider. Each inbound webhook is scoped to its `(provider, resource)` binding (not one-per-provider), so subscribing a second Slack channel — or two sources into the same relay channel — no longer collides on the unique `(workspace, webhook name)` index or clobbers the other binding's webhook. Re-subscribing creates the replacement webhook/subscription before retiring the old one, so a transient failure can't leave you with no working binding; a failed cleanup now warns instead of being silently swallowed. The relay channel id is normalized (`#general` → `general`) consistently across the webhook, subscription filter, relayfile bind, and writeback-secret lookup, and `listBindings` now maps relayfile's `pathGlob` field so unsubscribe/replace match correctly.
 - `agent-relay-broker` bootstrap `node.register` no longer advertises a generic `"spawn"` capability. Because the engine does not treat bare `"spawn"` as a placement capability (only `spawn:*`), it materialized a `spawn` action pinned to whichever node bootstrapped first, which then hijacked capability-based spawn placement for the whole workspace — every `spawn` invoke was dispatched to that node, ignoring `cli`/`target_node`/least-loaded routing. The pre-sidecar descriptor now carries no capabilities; real `spawn:*`/action capabilities arrive on the sidecar's `node.register`.

crates/broker/src/relaycast/ws.rs

Lines changed: 15 additions & 1 deletion
@@ -507,7 +507,9 @@ impl RelaycastHttpClient {
     /// via [`AgentClient::reply`] instead of a plain channel post is what
     /// actually creates real thread/conversation grouping on the Relaycast
     /// side, as opposed to passing an opaque value the server doesn't
-    /// interpret as a reply.
+    /// interpret as a reply. `reply` takes no injection mode, so a threaded
+    /// reply is always delivered with Wait semantics — a `Steer` request with
+    /// a `thread_id` is downgraded to a normal reply (logged, not dropped).
     pub async fn send_with_mode(
         &self,
         to: &str,
@@ -523,6 +525,18 @@ impl RelaycastHttpClient {
             MessageInjectionMode::Steer => relaycast::MessageInjectionMode::Steer,
         };
         if let Some(thread_id) = thread_id {
+            // `AgentClient::reply` has no injection-mode parameter, so a
+            // threaded reply is always delivered with Wait semantics.
+            // `Steer` can't be honored on a reply; downgrade rather than
+            // drop the message, but log it so the loss of steer is visible
+            // instead of silent.
+            if matches!(mode, MessageInjectionMode::Steer) {
+                tracing::warn!(
+                    target = "relay_broker::relaycast",
+                    thread_id = %thread_id,
+                    "steer injection mode is not supported on threaded replies; delivering as a normal reply"
+                );
+            }
             agent_client
                 .reply(thread_id, text, None, None)
                 .await
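
The downgrade rule in this hunk can be sketched in isolation. This is a minimal illustration under stated assumptions, not the broker's code: `Mode`, `Delivery`, and `plan` are hypothetical names, and a `Vec<String>` stands in for the `tracing::warn!` call so the behavior is observable.

```rust
// Hypothetical stand-ins for the broker's injection mode and delivery choice.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    Wait,
    Steer,
}

#[derive(Debug, PartialEq)]
enum Delivery<'a> {
    // A threaded reply carries no mode: it is always delivered with Wait semantics.
    Reply { thread_id: &'a str },
    // A plain post keeps whatever mode the caller asked for.
    Post { mode: Mode },
}

// Decide how a send is delivered. A `Steer` request that also names a thread
// is downgraded to a normal reply and a warning is recorded, rather than the
// message being dropped.
fn plan<'a>(mode: Mode, thread_id: Option<&'a str>, warnings: &mut Vec<String>) -> Delivery<'a> {
    match thread_id {
        Some(id) => {
            if mode == Mode::Steer {
                warnings.push(format!(
                    "steer not supported on threaded reply {id}; delivering as a normal reply"
                ));
            }
            Delivery::Reply { thread_id: id }
        }
        None => Delivery::Post { mode },
    }
}
```

In this sketch, `plan(Mode::Steer, Some("msg_1"), &mut warnings)` yields a `Reply` and records exactly one warning, while `plan(Mode::Steer, None, &mut warnings)` keeps `Steer` on a plain post.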

crates/broker/src/runtime/api.rs

Lines changed: 30 additions & 7 deletions
@@ -709,12 +709,17 @@ impl BrokerRuntime {
                 // worse, ROTATE (and thereby invalidate) the live token of
                 // an unrelated, already-registered agent that happens to
                 // share the name. Falling back to the broker's own identity
-                // is always safe; impersonation is not.
-                let publish_from = if workers.has_worker(&delivery_from) {
-                    delivery_from.as_str()
-                } else {
-                    workspace_self_name.as_str()
-                };
+                // is always safe; impersonation is not. The worker must also
+                // belong to the workspace we're publishing into — a worker
+                // attached to another attached workspace is not ours to
+                // impersonate here (it would register/rotate that name in the
+                // wrong Relaycast workspace).
+                let publish_from =
+                    if workers.has_worker_in_workspace(&delivery_from, &selected_workspace_id) {
+                        delivery_from.as_str()
+                    } else {
+                        workspace_self_name.as_str()
+                    };
 
                 record_thread_history_event(
                     recent_thread_messages,
@@ -755,6 +760,24 @@ impl BrokerRuntime {
                     relaycast_timeout_ms = %relaycast_timeout.as_millis(),
                     "publishing to relaycast"
                 );
+                // Only forward `thread_id` to the Relaycast publish when it's a
+                // real message id we can reply to. Broker-minted synthetic ids
+                // (`http_*`) and channel/DM grouping keys (`#general`,
+                // `direct:*`) that a client may echo back from `/api/send` or
+                // `/api/threads` aren't reply targets — Relaycast would reject
+                // the reply and fail the whole send. Fall back to a plain post
+                // (unthreaded) for those, preserving delivery.
+                let reply_thread_id = thread_id
+                    .as_deref()
+                    .filter(|tid| is_relaycast_reply_target(tid));
+                if thread_id.is_some() && reply_thread_id.is_none() {
+                    tracing::debug!(
+                        target = "relay_broker::http_api",
+                        event_id = %event_id,
+                        thread_id = ?thread_id,
+                        "thread_id is not a Relaycast message id; publishing without a thread reply"
+                    );
+                }
                 let relaycast_start = Instant::now();
                 match timeout(
                     relaycast_timeout,
@@ -763,7 +786,7 @@ impl BrokerRuntime {
                         &text,
                         mode.clone(),
                         publish_from,
-                        thread_id.as_deref(),
+                        reply_thread_id,
                     ),
                 )
                 .await

crates/broker/src/runtime/delivery.rs

Lines changed: 44 additions & 0 deletions
@@ -215,6 +215,20 @@ pub(crate) fn delivery_read_ack_is_relaycast_message(event_id: &EventId) -> bool
     synthetic_delivery_read_ack_reason(event_id).is_none()
 }
 
+/// True when `thread_id` is a real Relaycast message id we can `reply()` to,
+/// as opposed to a broker-minted synthetic event id (`http_`/`init_`/… — see
+/// [`synthetic_delivery_read_ack_reason`]) or a channel/DM grouping key
+/// (`#channel`, `direct:*`) that `/api/threads` can surface. Relaycast rejects
+/// a reply to anything that isn't a real message id, so the publish path must
+/// fall back to a plain post for these rather than fail the whole send.
+pub(crate) fn is_relaycast_reply_target(thread_id: &str) -> bool {
+    let id = thread_id.trim();
+    if id.is_empty() || id.starts_with('#') || id.starts_with("direct:") {
+        return false;
+    }
+    synthetic_delivery_read_ack_reason(&EventId::new(id)).is_none()
+}
+
 pub(crate) fn seed_supplied_agent_token(
     relaycast_http: &RelaycastHttpClient,
     agent_name: &str,
@@ -903,3 +917,33 @@ pub(crate) fn clear_pending_delivery_if_event_matches(
     }
     None
 }
+
+#[cfg(test)]
+mod reply_target_tests {
+    use super::is_relaycast_reply_target;
+
+    #[test]
+    fn real_message_ids_are_reply_targets() {
+        assert!(is_relaycast_reply_target("msg_abc123"));
+        assert!(is_relaycast_reply_target("evt_01hxyz"));
+    }
+
+    #[test]
+    fn synthetic_and_grouping_ids_are_not_reply_targets() {
+        for id in [
+            "",
+            " ",
+            "#general",
+            "direct:alice",
+            "http_deadbeef",
+            "init_task",
+            "cont_load_1",
+            "flush_1",
+        ] {
+            assert!(
+                !is_relaycast_reply_target(id),
+                "expected non-target: {id:?}"
+            );
+        }
+    }
+}
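
To see how the classifier and the `.filter(...)` call in `api.rs` combine, here is a self-contained sketch. It makes two assumptions: `ASSUMED_SYNTHETIC_PREFIXES` is a hypothetical stand-in for `synthetic_delivery_read_ack_reason` (whose real prefix list is not shown in this commit, so only the prefixes exercised by the tests above are included), and the `Publish` enum and `route` function are illustrative names that do not exist in the broker.

```rust
// ASSUMPTION: stand-in for the broker's synthetic-event-id classifier.
// Only the prefixes that appear in this commit's tests are listed.
const ASSUMED_SYNTHETIC_PREFIXES: [&str; 4] = ["http_", "init_", "cont_", "flush_"];

// Mirrors the shape of `is_relaycast_reply_target`: reject blank ids,
// channel/DM grouping keys, and synthetic ids; accept everything else.
fn is_reply_target(thread_id: &str) -> bool {
    let id = thread_id.trim();
    if id.is_empty() || id.starts_with('#') || id.starts_with("direct:") {
        return false;
    }
    !ASSUMED_SYNTHETIC_PREFIXES.iter().any(|prefix| id.starts_with(prefix))
}

// Illustrative routing result (not a broker type).
#[derive(Debug, PartialEq)]
enum Publish<'a> {
    Reply(&'a str),
    PlainPost,
}

// A thread id that is not a reply target is dropped from the publish call,
// so the message is still delivered, just unthreaded.
fn route(thread_id: Option<&str>) -> Publish<'_> {
    match thread_id.filter(|tid| is_reply_target(tid)) {
        Some(id) => Publish::Reply(id),
        None => Publish::PlainPost,
    }
}
```

Under these assumptions, `route(Some("msg_abc123"))` takes the reply path, while `route(Some("http_deadbeef"))`, `route(Some("#general"))`, and `route(None)` all fall back to a plain post.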

crates/broker/src/runtime/fleet.rs

Lines changed: 19 additions & 0 deletions
@@ -435,6 +435,25 @@ impl BrokerRuntime {
                             msg_id = %deliver.msg_id,
                             "queued node delivery (manual_flush inbound delivery mode)"
                         );
+                        // Surface the hold as a `delivery_queued` event, as the
+                        // now-removed local send path did. `attach --drive`
+                        // counts these to show pending messages; node delivery
+                        // is the only delivery path now, so this is the only
+                        // place the event can originate. The `name` field is
+                        // what scopes it to the worker on the consumer side.
+                        let _ = send_event(
+                            &self.sdk_out_tx,
+                            json!({
+                                "kind": "delivery_queued",
+                                "name": deliver.agent.as_str(),
+                                "event_id": deliver.msg_id.as_str(),
+                                "delivery_id": deliver.delivery_id.as_str(),
+                                "from": fields.from.as_str(),
+                                "target": fields.target.as_str(),
+                                "reason": "inbound_delivery_manual_flush",
+                            }),
+                        )
+                        .await;
                         Ok(())
                     }
                     InboundQueueOutcome::DrainNow(to_drain) => {

crates/broker/src/worker.rs

Lines changed: 26 additions & 0 deletions
@@ -209,6 +209,25 @@ impl WorkerRegistry {
         self.workers.contains_key(name)
     }
 
+    /// True when a worker named `name` exists and either has no recorded
+    /// workspace or belongs to `workspace_id`. Gates sender impersonation on
+    /// Relaycast publish: a worker attached to workspace A must not be
+    /// impersonated when publishing into workspace B, which would register or
+    /// rotate that name's token in the wrong workspace.
+    pub(crate) fn has_worker_in_workspace(
+        &self,
+        name: &str,
+        workspace_id: &crate::ids::WorkspaceId,
+    ) -> bool {
+        match self.workers.get(name) {
+            Some(handle) => match &handle.workspace_id {
+                Some(worker_ws) => worker_ws == workspace_id,
+                None => true,
+            },
+            None => false,
+        }
+    }
+
     pub(crate) fn worker_pid(&self, name: &str) -> Option<u32> {
         self.workers.get(name).and_then(|h| h.child.id())
     }
@@ -1568,6 +1587,13 @@ mod tests {
         assert!(!reg.has_worker("nonexistent"));
     }
 
+    #[test]
+    fn has_worker_in_workspace_returns_false_for_unknown() {
+        let reg = make_registry(vec![]);
+        let workspace = crate::ids::WorkspaceId::new("ws_1".to_string());
+        assert!(!reg.has_worker_in_workspace("nonexistent", &workspace));
+    }
+
     #[test]
     fn worker_log_path_rejects_path_traversal() {
         let reg = make_registry(vec![]);
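
The test added in this commit covers only the unknown-worker case. The sketch below walks the other three branches of the membership check and the `publish_from` fallback it gates. It is a simplified model under stated assumptions: `Registry` is a hypothetical type that maps a worker name to an optional workspace id, with plain strings standing in for the broker's `WorkspaceId` and worker handle types.

```rust
use std::collections::HashMap;

// Hypothetical, simplified registry: worker name -> optional workspace id.
struct Registry {
    workers: HashMap<String, Option<String>>,
}

impl Registry {
    // Same decision table as `has_worker_in_workspace` in the diff above.
    fn has_worker_in_workspace(&self, name: &str, workspace_id: &str) -> bool {
        match self.workers.get(name) {
            // Worker with a recorded workspace: must match the target workspace.
            Some(Some(worker_ws)) => worker_ws.as_str() == workspace_id,
            // Worker with no recorded workspace: allowed.
            Some(None) => true,
            // Unknown name: never impersonate.
            None => false,
        }
    }

    // Impersonate the sender only when the check passes; otherwise fall back
    // to the broker's own identity in that workspace.
    fn publish_from<'a>(
        &self,
        delivery_from: &'a str,
        workspace_id: &str,
        self_name: &'a str,
    ) -> &'a str {
        if self.has_worker_in_workspace(delivery_from, workspace_id) {
            delivery_from
        } else {
            self_name
        }
    }
}
```

With a registry holding `alice` in `ws_a` and `legacy` with no workspace, `publish_from("alice", "ws_b", "broker")` returns `"broker"`, while `publish_from("alice", "ws_a", "broker")` and `publish_from("legacy", "ws_b", "broker")` return the worker's own name.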

packages/harness-driver/src/client.ts

Lines changed: 17 additions & 5 deletions
@@ -374,15 +374,27 @@ export class HarnessDriverClient {
 
     onStep?.('Waiting for broker session handshake...');
     let session: SessionInfo | undefined;
-    for (let attempt = 0; attempt < 10; attempt++) {
+    // The Relaycast handshake can take many seconds on a cold or slow network,
+    // during which the startup-only API answers 503. Poll for the full startup
+    // budget (`timeoutMs`) rather than a fixed attempt count so a slow-but-
+    // healthy handshake isn't misreported as a spawn failure. The `brokerExited`
+    // race still surfaces a dead broker immediately, so this only extends how
+    // long we wait on a broker that is alive and warming up.
+    const handshakeDeadline = Date.now() + timeoutMs;
+    for (let attempt = 0; ; attempt++) {
       try {
         session = await Promise.race([client.getSession(), brokerExited]);
         break;
       } catch (err) {
-        const message = err instanceof Error ? err.message : String(err);
-        const is503 = message.includes('503') || message.includes('Service Unavailable');
-        if (!is503 || attempt >= 9) throw err;
-        onStep?.(`Broker still starting (handshake attempt ${attempt + 1}/10), retrying in 1s...`);
+        // The broker's startup-only API returns a structured 503
+        // (`http_503`) while it warms up. Prefer the typed fields over the
+        // formatted message, which the broker is free to customize.
+        const is503 =
+          err instanceof HarnessDriverProtocolError
+            ? err.status === 503 || err.code === 'http_503'
+            : /503|Service Unavailable/.test(err instanceof Error ? err.message : String(err));
+        if (!is503 || Date.now() >= handshakeDeadline) throw err;
+        onStep?.(`Broker still starting (handshake attempt ${attempt + 1}), retrying in 1s...`);
         await new Promise((resolve) => setTimeout(resolve, 1000));
       }
     }
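
The change from a fixed attempt count to a deadline can be sketched independently of the harness driver. The example below is written in Rust to match the broker code elsewhere in this commit and is a synchronous approximation under stated assumptions: `PollError` and `poll_until_deadline` are hypothetical names, `PollError::Starting` stands in for the retryable startup 503, and the `brokerExited` race is not modeled.

```rust
use std::time::{Duration, Instant};

// Hypothetical error type: `Starting` models the retryable startup 503,
// `Fatal` models any other failure, which is surfaced immediately.
#[derive(Debug, PartialEq)]
enum PollError {
    Starting,
    Fatal(String),
}

// Retry `attempt` while it reports `Starting` and the budget has not elapsed.
// As in the TypeScript loop, the deadline is checked only after a failed
// attempt, so at least one attempt is always made.
fn poll_until_deadline<T>(
    budget: Duration,
    interval: Duration,
    mut attempt: impl FnMut(u32) -> Result<T, PollError>,
) -> Result<T, PollError> {
    let deadline = Instant::now() + budget;
    let mut n: u32 = 0;
    loop {
        match attempt(n) {
            Ok(value) => return Ok(value),
            Err(PollError::Starting) if Instant::now() < deadline => {
                std::thread::sleep(interval);
                n += 1;
            }
            // Either a non-retryable error, or `Starting` after the deadline.
            Err(err) => return Err(err),
        }
    }
}
```

A handshake that answers `Starting` twice and then succeeds returns `Ok` as long as the budget outlasts the two sleeps. With a zero budget, the first `Starting` is returned to the caller after a single attempt.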
