Skip to content

relay#1056 §5: broker delivery read-ack bridge#1062

Open
kjgbot wants to merge 4 commits into
mainfrom
fix/readack-1056
Open

relay#1056 §5: broker delivery read-ack bridge#1062
kjgbot wants to merge 4 commits into
mainfrom
fix/readack-1056

Conversation

@kjgbot
Copy link
Copy Markdown
Contributor

@kjgbot kjgbot commented Jun 8, 2026

Scope

Implements relay#1056 §5 broker delivery read-ack bridge only.

This PR adds broker-side best-effort read-ack plumbing on the confirmed delivery path:

  • Emit typed broker delivery_read_ack telemetry with marked, failed, skipped_synthetic, and suppressed_duplicate statuses.
  • After clear_pending_delivery_if_event_matches(...) confirms a delivery_ack, schedule Relaycast mark_read(event_id) as the recipient agent.
  • Use recipient-scoped seeded/cached agent tokens instead of broker identity.
  • Seed caller/WS-supplied worker tokens so reused broker sessions do not mint replacement recipient tokens for read-acks.
  • Skip synthetic/non-Relaycast ids (blank, http_*, init_*, cont_load_*).
  • Dedupe by (recipient,event_id) to suppress duplicate/replayed delivery read-ack work and telemetry.
  • Add harness-driver protocol/lifecycle typing for delivery_read_ack.

Verification

  • cargo test -p agent-relay-broker read_ack -- --nocapture
  • cargo test -p agent-relay-broker stale_delivery_ack_event_id_does_not_mark_read -- --nocapture
  • cargo test -p agent-relay-broker mark_read_as_agent_uses_seeded_recipient_token_without_respawn -- --nocapture
  • cargo check -p agent-relay-broker --tests
  • git diff --check

relay-worker also reran the same verification set and completed non-author review with no blocking findings.

Known / Accepted Notes

Best-effort by design: read-ack is fire-and-forget with a detached 2s timeout, non-blocking on the delivery hot path. Delivery remains the guarantee; read-ack is a consistency optimization.

Follow-up, not a blocker: dedupe currently suppresses retry even after a failed first mark_read within the dedup window. Consider dedupe-on-success-only plus bounded retry-on-failure so a transient Relaycast blip does not permanently skip the read-ack. Filed separately as #1061.

Fleet Landing Note

Merging this PR lands the code in relay main, but does not make read-acks live in the fleet. Fleet landing needs a follow-up relay publish and sandbox snapshot rebake.

@codeant-ai
Copy link
Copy Markdown

codeant-ai Bot commented Jun 8, 2026

Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 8, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: aed6197f-af5a-4a8c-b6c1-e82c32699ccf

📥 Commits

Reviewing files that changed from the base of the PR and between f0a14f2 and 38d627b.

📒 Files selected for processing (2)
  • crates/broker/src/runtime/delivery.rs
  • crates/broker/src/runtime/tests.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/broker/src/runtime/delivery.rs
  • crates/broker/src/runtime/tests.rs

📝 Walkthrough

Walkthrough

This PR introduces delivery read-ack support to the broker. A new protocol event signals when a delivery message has been read on behalf of a worker; the broker tracks seeded agent tokens, marks messages read asynchronously with dedup caching and timeout protection, and emits telemetry for success, failure, and skipped outcomes.

Changes

Delivery Read-Ack Protocol and System

Layer / File(s) Summary
Protocol contracts for delivery read-ack
crates/broker/src/protocol.rs, packages/harness-driver/src/protocol.ts, packages/harness-driver/src/lifecycle-hooks.ts
DeliveryReadAckStatus enum with marked, failed, skipped_synthetic, suppressed_duplicate outcomes; BrokerEvent::DeliveryReadAck variant with name, delivery_id, event_id, status, and optional reason; TypeScript protocol and driver activity reason updated.
Relaycast HTTP client read-marking capability
crates/broker/src/relaycast/ws.rs
registered_agent_client_as helper creates an AgentClient for arbitrary agent identity; mark_read_as_agent public method marks message read on behalf of recipient, returning JSON and mapping errors; test verifies seeded-token usage and spawn endpoint not called.
Delivery read-ack core implementation
crates/broker/src/runtime/delivery.rs
synthetic_delivery_read_ack_reason classifies trimmed EventId prefixes as synthetic; seed_supplied_agent_token forwards to relaycast client; mark_delivery_read_ack wraps 2-second timeout version; mark_delivery_read_ack_with_timeout deduplicates per (worker, event), skips synthetics, spawns timeout-wrapped mark_read call, emits telemetry for marked/failed/skipped outcomes; emit_delivery_read_ack_telemetry fire-and-forget sends broker event.
Token seeding in spawn request handlers
crates/broker/src/runtime/relaycast_events.rs, crates/broker/src/runtime/api.rs
Before using supplied Relaycast token as worker_relay_key, call seed_supplied_agent_token to seed broker-side identity in Relaycast WS spawn, JSON fallback spawn, and HTTP API spawn paths.
Worker event delivery-ack integration
crates/broker/src/runtime/worker_events.rs
Replace inline delivery_ack confirmation with mark_delivery_read_ack: prepare relaycast_http clone and mutable dedup, extract delivery_id, event_id, and optional CLI hint from worker spec, delegate to core marking logic.
Comprehensive test suite for delivery read-ack
crates/broker/src/runtime/tests.rs
pending_delivery test fixture; unit tests for synthetic classification and JSON encoding; async tests for idempotent confirmed marking, dedup suppression, stale event-id rejection, synthetic skipping with dedup, and timeout non-blocking behavior (confirmation emitted promptly, read-ack failure emitted after timeout).
Import statement reflow
crates/broker/src/runtime/mod.rs
Reflow crate::protocol import across multiple lines for readability.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues

Suggested reviewers

  • willwashburn
  • khaliqgant

Poem

🐰 A message was read, the broker needs to know,
With async and tokens, it starts quite a show.
Dedup caches block the duplicates fast,
While timeouts ensure no blocking will last! 📬✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 57.69% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'relay#1056 §5: broker delivery read-ack bridge' accurately summarizes the main change—implementing broker-side delivery read-ack bridging for relay issue #1056 section 5.
Description check ✅ Passed The PR description provides comprehensive coverage with a Scope section detailing implementation specifics, Verification section with test commands, and Known/Accepted Notes about design trade-offs and follow-ups.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/readack-1056

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for explicit workspace keys and broker instance names across agent-relay-broker and @agent-relay/harness-driver, enabling local and cloud brokers to join the same workspace with stable names. It also implements delivery read-ack tracking to attribute read-acks to the recipient worker's agent identity. The review feedback highlights important edge cases where empty or whitespace-only strings in instance_name and workspace_key could bypass fallback logic, and suggests skipping synthetic flush_ event IDs during read-ack classification along with adding a test case for verification.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +261 to +277
pub(crate) fn resolved_instance_name(&self, fallback: Option<&str>) -> String {
self.instance_name
.clone()
.or_else(|| std::env::var("AGENT_RELAY_BROKER_NAME").ok())
.or_else(|| {
let name = self.name.trim();
if name.is_empty() {
None
} else {
Some(name.to_string())
}
})
.or_else(|| fallback.map(ToOwned::to_owned))
.unwrap_or_default()
.trim()
.to_string()
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If self.instance_name is passed as an empty or whitespace-only string (e.g., Some(" ")), clone() will return Some(" "), which is not None. Consequently, the or_else chains will be skipped, and the fallback to AGENT_RELAY_BROKER_NAME or self.name will never be evaluated. To prevent this, we should filter out empty or whitespace-only strings before chaining or_else.

    pub(crate) fn resolved_instance_name(&self, fallback: Option<&str>) -> String {
        self.instance_name
            .as_ref()
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .or_else(|| {
                std::env::var("AGENT_RELAY_BROKER_NAME")
                    .ok()
                    .map(|s| s.trim().to_string())
                    .filter(|s| !s.is_empty())
            })
            .or_else(|| {
                let name = self.name.trim();
                if name.is_empty() {
                    None
                } else {
                    Some(name.to_string())
                }
            })
            .or_else(|| fallback.map(|s| s.trim().to_string()))
            .unwrap_or_default()
    }

Comment on lines +279 to +285
pub(crate) fn resolved_workspace_key(&self) -> Option<String> {
self.workspace_key
.clone()
.or_else(|| std::env::var("AGENT_RELAY_WORKSPACE_KEY").ok())
.map(|key| key.trim().to_string())
.filter(|key| !key.is_empty())
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If self.workspace_key is passed as an empty or whitespace-only string (e.g., Some(" ")), the or_else chain is skipped because it is Some. Then, the .map() and .filter() operations will evaluate to None, completely ignoring the fallback to AGENT_RELAY_WORKSPACE_KEY. To fix this, we should filter out empty or whitespace-only strings before chaining or_else.

    pub(crate) fn resolved_workspace_key(&self) -> Option<String> {
        self.workspace_key
            .as_ref()
            .map(|key| key.trim().to_string())
            .filter(|key| !key.is_empty())
            .or_else(|| {
                std::env::var("AGENT_RELAY_WORKSPACE_KEY")
                    .ok()
                    .map(|key| key.trim().to_string())
                    .filter(|key| !key.is_empty())
            })
    }

Comment on lines +122 to +137
pub(crate) fn synthetic_delivery_read_ack_reason(event_id: &EventId) -> Option<&'static str> {
let event_id = event_id.as_str().trim();
if event_id.is_empty() {
return Some("blank_event_id");
}
if event_id.starts_with("http_") {
return Some("http_api_synthetic_event_id");
}
if event_id.starts_with("init_") {
return Some("initial_task_synthetic_event_id");
}
if event_id.starts_with("cont_load_") {
return Some("continuity_synthetic_event_id");
}
None
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The broker generates synthetic event IDs starting with flush_ when flushing the queue (e.g., flush_<uuid>). These synthetic event IDs are not real Relaycast message IDs and should be skipped for read-ack purposes to avoid unnecessary failed API calls to Relaycast.

pub(crate) fn synthetic_delivery_read_ack_reason(event_id: &EventId) -> Option<&'static str> {
    let event_id = event_id.as_str().trim();
    if event_id.is_empty() {
        return Some("blank_event_id");
    }
    if event_id.starts_with("http_") {
        return Some("http_api_synthetic_event_id");
    }
    if event_id.starts_with("init_") {
        return Some("initial_task_synthetic_event_id");
    }
    if event_id.starts_with("cont_load_") {
        return Some("continuity_synthetic_event_id");
    }
    if event_id.starts_with("flush_") {
        return Some("flush_synthetic_event_id");
    }
    None
}

Comment on lines +1378 to +1387
fn delivery_read_ack_classification_skips_synthetic_event_ids() {
let cases = [
("", Some("blank_event_id")),
(" ", Some("blank_event_id")),
("http_123", Some("http_api_synthetic_event_id")),
("init_123", Some("initial_task_synthetic_event_id")),
("cont_load_123", Some("continuity_synthetic_event_id")),
("msg_123", None),
("1780911342_317109", None),
];
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Add a test case for the synthetic flush_ event ID to ensure it is correctly classified and skipped.

Suggested change
fn delivery_read_ack_classification_skips_synthetic_event_ids() {
let cases = [
("", Some("blank_event_id")),
(" ", Some("blank_event_id")),
("http_123", Some("http_api_synthetic_event_id")),
("init_123", Some("initial_task_synthetic_event_id")),
("cont_load_123", Some("continuity_synthetic_event_id")),
("msg_123", None),
("1780911342_317109", None),
];
fn delivery_read_ack_classification_skips_synthetic_event_ids() {
let cases = [
("", Some("blank_event_id")),
(" ", Some("blank_event_id")),
("http_123", Some("http_api_synthetic_event_id")),
("init_123", Some("initial_task_synthetic_event_id")),
("cont_load_123", Some("continuity_synthetic_event_id")),
("flush_123", Some("flush_synthetic_event_id")),
("msg_123", None),
("1780911342_317109", None),
];

@kjgbot kjgbot force-pushed the fix/readack-1056 branch from 7f67178 to 152ea97 Compare June 8, 2026 11:39
@codeant-ai
Copy link
Copy Markdown

codeant-ai Bot commented Jun 8, 2026

Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/broker/src/runtime/api.rs (1)

78-107: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Skip auto-registration when agent_token is already supplied.

Lines 79-107 still run preregistration before honoring agent_token. That means a fatal preregistration error aborts the spawn even when the caller already provided a usable worker token, and retry exhaustion can surface a bogus registration_warning for a path that did not need preregistration at all.

Suggested fix
                 let mut preregistration_warning: Option<String> = None;
-                let registration_result =
-                    retry_agent_registration(relaycast_http, &name, Some(&cli)).await;
-                let worker_relay_key = match registration_result {
-                    Ok(token) => Some(token),
-                    Err(RegRetryOutcome::RetryableExhausted(error)) => {
-                        let message = format_worker_preregistration_error(&name, &error);
-                        tracing::warn!(
-                            worker = %name,
-                            error = %error,
-                            "continuing spawn without pre-registration after retries exhausted"
-                        );
-                        preregistration_warning = Some(message);
-                        None
-                    }
-                    Err(RegRetryOutcome::Fatal(error)) => {
-                        let _ = reply.send(Err(format_worker_preregistration_error(&name, &error)));
-                        return;
-                    }
-                };
-
-                // Caller-supplied agent_token overrides auto-registration.
-                // Seed it so broker-side read-acks later act as this exact
-                // recipient identity instead of minting a replacement token.
-                let worker_relay_key = if let Some(token) = agent_token {
+                let worker_relay_key = if let Some(token) = agent_token {
                     seed_supplied_agent_token(relaycast_http, &name, &token);
                     Some(token)
                 } else {
-                    worker_relay_key
+                    match retry_agent_registration(relaycast_http, &name, Some(&cli)).await {
+                        Ok(token) => Some(token),
+                        Err(RegRetryOutcome::RetryableExhausted(error)) => {
+                            let message = format_worker_preregistration_error(&name, &error);
+                            tracing::warn!(
+                                worker = %name,
+                                error = %error,
+                                "continuing spawn without pre-registration after retries exhausted"
+                            );
+                            preregistration_warning = Some(message);
+                            None
+                        }
+                        Err(RegRetryOutcome::Fatal(error)) => {
+                            let _ = reply.send(Err(format_worker_preregistration_error(&name, &error)));
+                            return;
+                        }
+                    }
                 };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/api.rs` around lines 78 - 107, The preregistration
flow runs unconditionally and can abort or set warnings even when a
caller-supplied agent_token exists; change the logic in the spawn path to check
agent_token first and, if present, call
seed_supplied_agent_token(relaycast_http, &name, &token) and set
worker_relay_key = Some(token) immediately, skipping retry_agent_registration,
preregistration_warning, registration_result handling, and any fatal reply send
via format_worker_preregistration_error/RegRetryOutcome; only run
retry_agent_registration(relaycast_http, &name, Some(&cli)).await when
agent_token is None and preserve the existing handling of RetryableExhausted and
Fatal cases, keeping variables name, relaycast_http, cli, reply, and
preregistration_warning consistent.
🧹 Nitpick comments (1)
crates/broker/src/protocol.rs (1)

340-347: ⚡ Quick win

Add a wire-contract round-trip test for delivery_read_ack.

This new broker event shape is part of the SDK contract but isn’t covered in this file’s serde tests yet. Add one round-trip asserting kind, status, and reason omission when None to lock the format.

Also applies to: 424-431

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/protocol.rs` around lines 340 - 347, Add a serde round-trip
test that constructs the broker event for "delivery_read_ack" (use the same
event struct/type used in this file), serializes to JSON, and deserializes back
to assert the wire contract: the top-level "kind" equals "delivery_read_ack",
the enum DeliveryReadAckStatus values round-trip and serialize as snake_case
(test Marked, Failed, SkippedSynthetic, SuppressedDuplicate), and when the
event's optional reason field is None the serialized JSON omits the "reason" key
(and when Some(reason) it round-trips). Add this test to the existing serde
tests in this file (also cover the same checks for the analogous case referenced
at lines ~424-431).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/broker/src/cli/mod.rs`:
- Around line 261-277: In the resolved_instance_name method, whitespace-only
values in self.instance_name and the AGENT_RELAY_BROKER_NAME environment
variable are being treated as valid "Some" values, which short-circuits the
fallback chain and prevents fallback resolution. To fix this, trim and validate
self.instance_name and the environment variable result early in the chain,
converting empty-after-trim results to None before proceeding to the next
or_else clause. Apply the same whitespace-normalization pattern that is already
correctly used for self.name to both the initial instance_name clone and the
environment variable lookup.

In `@crates/broker/src/runtime/worker_events.rs`:
- Around line 9-10: The code currently captures self.relaycast_http and routes
read-acks through the broker's default client (relaycast_http) when handling
PendingDelivery/delivery_ack; instead, use the originating delivery's workspace
client or workspace_id to call mark_read_as_agent so the ack targets the correct
workspace. Locate the handling of PendingDelivery/delivery_ack (references:
PendingDelivery, delivery_ack, relaycast_http, ws_control_tx,
mark_read_as_agent) and replace uses of self.relaycast_http.clone() with the
workspace-specific client from the delivery (e.g., delivery.relaycast_http or
lookup by delivery.workspace_id) and pass that client into mark_read_as_agent;
ensure both the immediate ack path and the looped logic in the 69-100 region
consistently use the delivery's workspace client rather than
self.relaycast_http.

In `@packages/harness-driver/src/client.ts`:
- Around line 456-463: The code assigns workspaceKey from several fallbacks
including RELAY_API_KEY without validating its form, causing non-workspace API
keys to be treated as explicit workspace keys; change the selection logic so
workspaceKey only accepts values that match a workspace key pattern (e.g., start
with "rk_") when picking from options?.workspaceKey,
options?.env.RELAY_WORKSPACE_KEY, options?.env.RELAY_API_KEY,
process.env.RELAY_WORKSPACE_KEY, and process.env.RELAY_API_KEY; leave other API
keys available for auth but do not set workspaceKey from them, and ensure the
same validation is applied where explicit-join inputs are created (the code
around the workspaceKey variable and the explicit join/explicit-join input
construction) so invalid RELAY_API_KEY values no longer trigger explicit-join
behavior.

---

Outside diff comments:
In `@crates/broker/src/runtime/api.rs`:
- Around line 78-107: The preregistration flow runs unconditionally and can
abort or set warnings even when a caller-supplied agent_token exists; change the
logic in the spawn path to check agent_token first and, if present, call
seed_supplied_agent_token(relaycast_http, &name, &token) and set
worker_relay_key = Some(token) immediately, skipping retry_agent_registration,
preregistration_warning, registration_result handling, and any fatal reply send
via format_worker_preregistration_error/RegRetryOutcome; only run
retry_agent_registration(relaycast_http, &name, Some(&cli)).await when
agent_token is None and preserve the existing handling of RetryableExhausted and
Fatal cases, keeping variables name, relaycast_http, cli, reply, and
preregistration_warning consistent.

---

Nitpick comments:
In `@crates/broker/src/protocol.rs`:
- Around line 340-347: Add a serde round-trip test that constructs the broker
event for "delivery_read_ack" (use the same event struct/type used in this
file), serializes to JSON, and deserializes back to assert the wire contract:
the top-level "kind" equals "delivery_read_ack", the enum DeliveryReadAckStatus
values round-trip and serialize as snake_case (test Marked, Failed,
SkippedSynthetic, SuppressedDuplicate), and when the event's optional reason
field is None the serialized JSON omits the "reason" key (and when Some(reason)
it round-trips). Add this test to the existing serde tests in this file (also
cover the same checks for the analogous case referenced at lines ~424-431).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 64e74e77-492a-41bd-9a7c-fba0852b6d32

📥 Commits

Reviewing files that changed from the base of the PR and between 1301a31 and 7f67178.

📒 Files selected for processing (17)
  • .agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json
  • CHANGELOG.md
  • crates/broker/src/cli/mod.rs
  • crates/broker/src/protocol.rs
  • crates/broker/src/relaycast/auth.rs
  • crates/broker/src/relaycast/ws.rs
  • crates/broker/src/runtime/api.rs
  • crates/broker/src/runtime/delivery.rs
  • crates/broker/src/runtime/init.rs
  • crates/broker/src/runtime/mod.rs
  • crates/broker/src/runtime/relaycast_events.rs
  • crates/broker/src/runtime/tests.rs
  • crates/broker/src/runtime/worker_events.rs
  • crates/broker/src/spawner.rs
  • packages/harness-driver/src/client.ts
  • packages/harness-driver/src/lifecycle-hooks.ts
  • packages/harness-driver/src/protocol.ts

Comment thread crates/broker/src/cli/mod.rs
Comment on lines +9 to 10
let relaycast_http = self.relaycast_http.clone();
let ws_control_tx = &self.ws_control_tx;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Use the delivery’s workspace client for mark_read.

Lines 9-10 and Lines 69-100 always route read-acks through self.relaycast_http, but the confirmed PendingDelivery still carries the originating workspace_id. In a multi-workspace broker, a delivery_ack for workspace B will call mark_read_as_agent(...) against the default workspace client instead, so the read-ack either fails or targets the wrong workspace.

Suggested fix
-        let relaycast_http = self.relaycast_http.clone();
+        let workspace_lookup = &self.workspace_lookup;
+        let default_workspace = &self.default_workspace;
...
                             if let Some(pending) = pending_for_confirmation {
                                 let read_ack_delivery_id = pending.delivery.delivery_id.clone();
                                 let read_ack_event_id = pending.delivery.event_id.clone();
+                                let read_ack_http = pending
+                                    .delivery
+                                    .workspace_id
+                                    .as_deref()
+                                    .and_then(|id| workspace_lookup.get(id))
+                                    .map(|workspace| workspace.http_client.clone())
+                                    .unwrap_or_else(|| default_workspace.http_client.clone());
                                 let cli_hint = workers
                                     .workers
                                     .get(&name)
                                     .and_then(|handle| handle.spec.cli.as_deref())
                                     .map(str::to_string);
...
                                 mark_delivery_read_ack(
-                                    &relaycast_http,
+                                    &read_ack_http,
                                     sdk_out_tx,
                                     dedup,
                                     &name,
                                     cli_hint.as_deref(),
                                     &read_ack_delivery_id,
                                     &read_ack_event_id,
                                 );

Also applies to: 69-100

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/broker/src/runtime/worker_events.rs` around lines 9 - 10, The code
currently captures self.relaycast_http and routes read-acks through the broker's
default client (relaycast_http) when handling PendingDelivery/delivery_ack;
instead, use the originating delivery's workspace client or workspace_id to call
mark_read_as_agent so the ack targets the correct workspace. Locate the handling
of PendingDelivery/delivery_ack (references: PendingDelivery, delivery_ack,
relaycast_http, ws_control_tx, mark_read_as_agent) and replace uses of
self.relaycast_http.clone() with the workspace-specific client from the delivery
(e.g., delivery.relaycast_http or lookup by delivery.workspace_id) and pass that
client into mark_read_as_agent; ensure both the immediate ack path and the
looped logic in the 69-100 region consistently use the delivery's workspace
client rather than self.relaycast_http.

Comment thread packages/harness-driver/src/client.ts Outdated
Comment on lines +456 to +463
const workspaceKey =
nonEmptyString(options?.workspaceKey) ??
nonEmptyString(options?.env?.AGENT_RELAY_WORKSPACE_KEY) ??
nonEmptyString(options?.env?.RELAY_WORKSPACE_KEY) ??
nonEmptyString(options?.env?.RELAY_API_KEY) ??
nonEmptyString(process.env.AGENT_RELAY_WORKSPACE_KEY) ??
nonEmptyString(process.env.RELAY_WORKSPACE_KEY) ??
nonEmptyString(process.env.RELAY_API_KEY);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

RELAY_API_KEY fallback is being promoted to strict workspace join without validation.

If RELAY_API_KEY is present but not an rk_ workspace key, Line 460/Line 463 still set workspaceKey, and then Line 490 + Line 478 force explicit-join inputs. Broker startup then hard-fails on invalid explicit workspace key.

Proposed fix
 function nonEmptyString(value: string | undefined): string | undefined {
   const trimmed = value?.trim();
   return trimmed ? trimmed : undefined;
 }

+function nonEmptyWorkspaceKey(value: string | undefined): string | undefined {
+  const trimmed = nonEmptyString(value);
+  return trimmed?.startsWith('rk_') ? trimmed : undefined;
+}
+
 // ...
     const workspaceKey =
       nonEmptyString(options?.workspaceKey) ??
       nonEmptyString(options?.env?.AGENT_RELAY_WORKSPACE_KEY) ??
       nonEmptyString(options?.env?.RELAY_WORKSPACE_KEY) ??
-      nonEmptyString(options?.env?.RELAY_API_KEY) ??
+      nonEmptyWorkspaceKey(options?.env?.RELAY_API_KEY) ??
       nonEmptyString(process.env.AGENT_RELAY_WORKSPACE_KEY) ??
       nonEmptyString(process.env.RELAY_WORKSPACE_KEY) ??
-      nonEmptyString(process.env.RELAY_API_KEY);
+      nonEmptyWorkspaceKey(process.env.RELAY_API_KEY);

Also applies to: 476-490

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/harness-driver/src/client.ts` around lines 456 - 463, The code
assigns workspaceKey from several fallbacks including RELAY_API_KEY without
validating its form, causing non-workspace API keys to be treated as explicit
workspace keys; change the selection logic so workspaceKey only accepts values
that match a workspace key pattern (e.g., start with "rk_") when picking from
options?.workspaceKey, options?.env.RELAY_WORKSPACE_KEY,
options?.env.RELAY_API_KEY, process.env.RELAY_WORKSPACE_KEY, and
process.env.RELAY_API_KEY; leave other API keys available for auth but do not
set workspaceKey from them, and ensure the same validation is applied where
explicit-join inputs are created (the code around the workspaceKey variable and
the explicit join/explicit-join input construction) so invalid RELAY_API_KEY
values no longer trigger explicit-join behavior.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 issues found across 10 files

You’re at about 94% of the monthly reviewed-line limit. You may want to disable incremental reviews to conserve quota. Reviews will continue until that limit is exceeded. If you need help avoiding interruptions, please contact contact@cubic.dev.

Reply with feedback, questions, or to request a fix.

Re-trigger cubic

Comment thread crates/broker/src/runtime/delivery.rs
Comment thread crates/broker/src/runtime/delivery.rs
@agent-relay-code
Copy link
Copy Markdown
Contributor

⚠️ pr-reviewer push failed (exit 128) — fixes were not applied to the PR. The notes below are advisory and were not pushed.

Reviewed and fixed the validated PR issues.

Changes made:

  • crates/broker/src/cli/mod.rs: blank workspace_key now falls through to AGENT_RELAY_WORKSPACE_KEY.
  • crates/broker/src/runtime/delivery.rs: flush_ ids are treated as synthetic, and synthetic classification now happens before dedupe.
  • crates/broker/src/runtime/tests.rs: added coverage for flush_ classification, blank workspace-key fallback, and duplicate synthetic read-acks staying skipped.

Verification run:

  • cargo fmt --check
  • cargo test -p agent-relay-broker delivery_read_ack -- --nocapture
  • cargo test -p agent-relay-broker blank_workspace_key_falls_through_to_env -- --nocapture
  • cargo test -p agent-relay-broker workspace_key_flag_overrides_env -- --nocapture
  • cargo test -p agent-relay-broker mark_read_as_agent_uses_seeded_recipient_token_without_respawn -- --nocapture
  • npm ci
  • npm --workspace packages/sdk run build
  • npm --workspace packages/harness-driver run check

Addressed comments

  • gemini-code-assist: blank instance_name could bypass fallback; stale — current checkout already filters it in crates/broker/src/cli/mod.rs:261.
  • gemini-code-assist: blank workspace_key could bypass env fallback; fixed in crates/broker/src/cli/mod.rs:284.
  • gemini-code-assist: flush_ ids should be skipped as synthetic read-ack ids; fixed in crates/broker/src/runtime/delivery.rs:136.
  • gemini-code-assist: add flush_ classifier test case; fixed in crates/broker/src/runtime/tests.rs:1385.
  • Cubic: synthetic ids were deduped before classification; fixed in crates/broker/src/runtime/delivery.rs:188 and covered in crates/broker/src/runtime/tests.rs:1659.
  • Cubic: failed mark_read dedupe could block retry; no change — invalid for the current confirmation path because there is no retry path after a pending delivery is cleared, and expanding retry behavior is outside this PR’s demonstrated fix.
  • codeant-ai: no actionable review finding; comment only reported PR review limit reached.
  • coderabbitai: no actionable code finding visible; comment only showed processing/configuration text and UI actions.

I did not print READY because I cannot verify GitHub CI completion or mergeability from this sandbox.

@codeant-ai
Copy link
Copy Markdown

codeant-ai Bot commented Jun 8, 2026

Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI.

@agent-relay-code
Copy link
Copy Markdown
Contributor

pr-reviewer applied fixes — committed and pushed 28f59b5 to this PR. The notes below describe what changed.

Fixed the validated PR issues in the current checkout.

Changes made:

  • crates/broker/src/cli/mod.rs: blank --workspace-key now falls through to AGENT_RELAY_WORKSPACE_KEY.
  • crates/broker/src/runtime/delivery.rs: flush_ IDs are treated as synthetic, and synthetic classification happens before read-ack dedupe.
  • crates/broker/src/runtime/tests.rs: added regression coverage for flush_, repeated synthetic read-acks, and workspace-key fallback behavior.

Verification run:

  • cargo fmt --check
  • cargo test -p agent-relay-broker delivery_read_ack -- --nocapture
  • cargo test -p agent-relay-broker blank_workspace_key_falls_through_to_env -- --nocapture
  • cargo test -p agent-relay-broker workspace_key_flag_overrides_env -- --nocapture
  • npm ci
  • npm --workspace packages/sdk run build
  • npm --workspace packages/harness-driver run check

Addressed comments

  • gemini-code-assist: blank instance_name could bypass fallback; stale, already handled in crates/broker/src/cli/mod.rs:261.
  • gemini-code-assist: blank workspace_key could bypass env fallback; fixed in crates/broker/src/cli/mod.rs:284 and covered in crates/broker/src/cli/mod.rs:418.
  • gemini-code-assist: flush_ IDs should be skipped as synthetic read-ack IDs; fixed in crates/broker/src/runtime/delivery.rs:136.
  • gemini-code-assist: add flush_ classifier test case; fixed in crates/broker/src/runtime/tests.rs:1385.
  • Cubic: synthetic IDs were deduped before classification; fixed in crates/broker/src/runtime/delivery.rs:188 and covered in crates/broker/src/runtime/tests.rs:1647.
  • Cubic: failed mark_read dedupe could block retry; no change, invalid in the current confirmation path because the pending delivery is cleared before read-ack scheduling and there is no retry path for that delivery afterward.
  • codeant-ai: no actionable code finding visible in the local PR artifacts; prior summary says it only reported review limit reached.
  • coderabbitai: no actionable code finding visible in the local PR artifacts; prior summary says it only showed processing/configuration text and UI actions.
  • pr-reviewer-push-outcome: advisory fixes were reported as not pushed; validated and applied the still-current issues at crates/broker/src/cli/mod.rs:284, crates/broker/src/runtime/delivery.rs:136, and crates/broker/src/runtime/delivery.rs:188.

I did not print READY because GitHub CI status and mergeability cannot be verified from this sandbox without gh, which you explicitly asked me not to use.

@agent-relay-code
Copy link
Copy Markdown
Contributor

Reviewed PR #1062 against .workforce/pr.diff, changed files, and context. I did not find a current, reproducible defect, so I made no source edits.

Addressed comments

No bot or reviewer comments were present in the provided .workforce artifacts, so there were no comment threads to validate or fix.

Validation run:

  • cargo test -j1 -p agent-relay-broker delivery_read_ack --tests passed
  • cargo test -j1 -p agent-relay-broker mark_read_as_agent_uses_seeded_recipient_token_without_respawn --tests passed
  • cargo test -j1 -p agent-relay-broker clear_pending_delivery --tests passed
  • npm run build:sdk passed
  • npm --prefix packages/harness-driver run build passed

I did not verify GitHub CI status or mergeability because the task prohibited git/gh usage, and no separate CI/mergeability tool was available.

@codeant-ai
Copy link
Copy Markdown

codeant-ai Bot commented Jun 8, 2026

Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI.

@agent-relay-code
Copy link
Copy Markdown
Contributor

⚠️ pr-reviewer push failed (exit 1) — fixes were not applied to the PR. The notes below are advisory and were not pushed.

Reviewed PR #1062 against .workforce/pr.diff, changed files, and context. I did not find a current, reproducible defect, so I made no source edits.

Addressed comments

No bot or reviewer comments were present in the provided .workforce artifacts, so there were no comment threads to validate or fix.

Validation run:

  • cargo test -j1 -p agent-relay-broker delivery_read_ack --tests passed
  • cargo test -j1 -p agent-relay-broker mark_read_as_agent_uses_seeded_recipient_token_without_respawn --tests passed
  • cargo test -j1 -p agent-relay-broker clear_pending_delivery --tests passed
  • npm run build:sdk passed
  • npm --prefix packages/harness-driver run build passed

I did not verify GitHub CI status or mergeability because the task prohibited git/gh usage, and no separate CI/mergeability tool was available.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/summary.md:
- Line 28: Normalize the ambiguous tokens in the decision rationale by replacing
accidental asterisks: change "workspace*key" to a clear form such as
"workspace_key" or a quoted literal "workspace key", and change "flush*
synthetic" to either "flush synthetic" or simply "flush" (or quote as "flush
synthetic") so the sentence is unambiguous; update the summary string that
contains those tokens accordingly (look for the phrase containing workspace*key
and flush* synthetic in the summary.md content and perform the token
replacements).
- Line 38: Remove the duplicated sentence in the Chapter 1 work item by keeping
only one copy of the sentence "events.connect() falls back to the relaycast 2.5
workspace stream when no agent client; fixes relay#1031 so workspace
relay.addListener streams. Bumped `@relaycast/sdk` to ^2.5.1. Also fixed
pre-existing vitest-4 constructor-mock breakage in agent-relay.test.ts (main
'Test' workflow was red)." so the line appears once (delete the redundant
repeated copy after the colon).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: c5eb48d7-7b10-47c0-8939-3f768607eb1e

📥 Commits

Reviewing files that changed from the base of the PR and between 152ea97 and f0a14f2.

📒 Files selected for processing (7)
  • .agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json
  • .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb.trace.json
  • .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/summary.md
  • .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/trajectory.json
  • crates/broker/src/cli/mod.rs
  • crates/broker/src/runtime/delivery.rs
  • crates/broker/src/runtime/tests.rs
💤 Files with no reviewable changes (1)
  • .agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json
✅ Files skipped from review due to trivial changes (1)
  • .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/trajectory.json
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/broker/src/runtime/tests.rs
  • crates/broker/src/runtime/delivery.rs

Comment thread .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/summary.md Outdated

_Agent: default_

- events.connect() falls back to the relaycast 2.5 workspace stream when no agent client; fixes relay#1031 so workspace relay.addListener streams. Bumped @relaycast/sdk to ^2.5.1. Also fixed pre-existing vitest-4 constructor-mock breakage in agent-relay.test.ts (main 'Test' workflow was red).: events.connect() falls back to the relaycast 2.5 workspace stream when no agent client; fixes relay#1031 so workspace relay.addListener streams. Bumped @relaycast/sdk to ^2.5.1. Also fixed pre-existing vitest-4 constructor-mock breakage in agent-relay.test.ts (main 'Test' workflow was red).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove duplicated sentence in Chapter 1 work item.

This bullet repeats the same full sentence on both sides of the colon, which adds noise to the trajectory record. Keep only one copy.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/summary.md
at line 38, Remove the duplicated sentence in the Chapter 1 work item by keeping
only one copy of the sentence "events.connect() falls back to the relaycast 2.5
workspace stream when no agent client; fixes relay#1031 so workspace
relay.addListener streams. Bumped `@relaycast/sdk` to ^2.5.1. Also fixed
pre-existing vitest-4 constructor-mock breakage in agent-relay.test.ts (main
'Test' workflow was red)." so the line appears once (delete the redundant
repeated copy after the colon).

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 issues found across 7 files (changes from recent commits).

You’re at about 95% of the monthly reviewed-line limit. You may want to disable incremental reviews to conserve quota. Reviews will continue until that limit is exceeded. If you need help avoiding interruptions, please contact contact@cubic.dev.

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="crates/broker/src/cli/mod.rs">

<violation number="1" location="crates/broker/src/cli/mod.rs:285">
P3: Duplicated `non_empty_trimmed` inner function in both `resolved_instance_name` and `resolved_workspace_key` — extract to a module-level private helper to avoid the DRY violation and keep future trimming behavior consistent.</violation>
</file>

Tip: Review your code locally with the cubic CLI to iterate faster.

Re-trigger cubic

Comment thread .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/trajectory.json Outdated
Comment thread crates/broker/src/cli/mod.rs Outdated
}

pub(crate) fn resolved_workspace_key(&self) -> Option<String> {
fn non_empty_trimmed(value: &str) -> Option<String> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3: Duplicated non_empty_trimmed inner function in both resolved_instance_name and resolved_workspace_key — extract to a module-level private helper to avoid the DRY violation and keep future trimming behavior consistent.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At crates/broker/src/cli/mod.rs, line 285:

<comment>Duplicated `non_empty_trimmed` inner function in both `resolved_instance_name` and `resolved_workspace_key` — extract to a module-level private helper to avoid the DRY violation and keep future trimming behavior consistent.</comment>

<file context>
@@ -282,11 +282,23 @@ impl InitCommand {
     }
 
     pub(crate) fn resolved_workspace_key(&self) -> Option<String> {
+        fn non_empty_trimmed(value: &str) -> Option<String> {
+            let trimmed = value.trim();
+            if trimmed.is_empty() {
</file context>

Comment thread .agentworkforce/trajectories/completed/2026-06/traj_b1jrutolckfb/summary.md Outdated
@codeant-ai
Copy link
Copy Markdown

codeant-ai Bot commented Jun 8, 2026

Your free trial PR review limit of 300 PRs has been reached. Please upgrade your plan to continue using CodeAnt AI.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 issues found across 7 files (changes from recent commits).

You’re at about 99% of the monthly reviewed-line limit. You may want to disable incremental reviews to conserve quota. Reviews will continue until that limit is exceeded. If you need help avoiding interruptions, please contact contact@cubic.dev.

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="crates/broker/src/runtime/tests.rs">

<violation number="1" location="crates/broker/src/runtime/tests.rs:1657">
P2: Test assertions assume deterministic ordering of events emitted from `tokio::spawn` tasks. Both `mark_delivery_read_ack` calls emit telemetry via spawned tasks with no `.await` between them, making the receive order scheduler-dependent. This can produce a flaky test if the runtime schedules the tasks out of spawn order.</violation>
</file>

<file name=".agentworkforce/trajectories/active/traj_b1jrutolckfb/trajectory.json">

<violation number="1">
P3: The committed trajectory task title appears to describe a different feature than this PR, which can mislead trajectory/automation consumers and review traceability.</violation>
</file>

<file name="crates/broker/src/cli/mod.rs">

<violation number="1">
P2: `resolved_workspace_key` changed precedence so a blank `--workspace-key` no longer falls back to `AGENT_RELAY_WORKSPACE_KEY`, causing unintended loss of env-based workspace selection.</violation>
</file>

Tip: Review your code locally with the cubic CLI to iterate faster.

Re-trigger cubic

&EventId::new("init_123"),
);

let first = tokio::time::timeout(Duration::from_secs(1), rx.recv())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Test assertions assume deterministic ordering of events emitted from tokio::spawn tasks. Both mark_delivery_read_ack calls emit telemetry via spawned tasks with no .await between them, making the receive order scheduler-dependent. This can produce a flaky test if the runtime schedules the tasks out of spawn order.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At crates/broker/src/runtime/tests.rs, line 1657:

<comment>Test assertions assume deterministic ordering of events emitted from `tokio::spawn` tasks. Both `mark_delivery_read_ack` calls emit telemetry via spawned tasks with no `.await` between them, making the receive order scheduler-dependent. This can produce a flaky test if the runtime schedules the tasks out of spawn order.</comment>

<file context>
@@ -1654,15 +1654,21 @@ async fn synthetic_delivery_read_ack_skips_mark_read() {
-        assert_eq!(frame.payload["status"], "skipped_synthetic");
-        assert_eq!(frame.payload["reason"], "initial_task_synthetic_event_id");
-    }
+    let first = tokio::time::timeout(Duration::from_secs(1), rx.recv())
+        .await
+        .expect("synthetic skip telemetry should arrive")
</file context>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant