Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion crates/broker/src/relaycast/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ impl RelaycastHttpClient {
/// `rk_live_...` workspace key, which grants full read/write/spawn
/// access. The raw token material is only present on this response (and
/// on `rotate_observer_token`'s), never on subsequent reads.
///
/// Uses `anyhow::Error::from` (rather than formatting the SDK error into
/// a fresh string-only error) so callers can `downcast_ref::<RelayError>`
/// on the returned error to branch on the structured API error code
/// (e.g. `observer_token_name_conflict`) instead of string-matching the
/// `Display` output.
pub async fn create_observer_token(
&self,
request: CreateObserverTokenRequest,
Expand All @@ -207,7 +213,36 @@ impl RelaycastHttpClient {
relay
.create_observer_token(request)
.await
.map_err(|error| anyhow::anyhow!("{error}"))
.map_err(anyhow::Error::from)
}

/// List observer tokens for this workspace. Metadata only — no raw token
/// material is ever included, per the SDK's own doc comment on this
/// method. Used to recover the id of an existing token by name when
/// `create_observer_token` fails with `observer_token_name_conflict`.
pub async fn list_observer_tokens(&self) -> Result<Vec<ObserverToken>> {
let relay = self
.relay_client()
.context("SDK relay client not initialized")?;
relay
.list_observer_tokens()
.await
.map_err(anyhow::Error::from)
}

/// Rotate an observer token, returning fresh raw token material. Used as
/// a fallback when `create_observer_token` fails with
/// `observer_token_name_conflict`: since the original raw token was
/// never persisted anywhere, rotating the existing token under that name
/// is the only way to hand the caller a usable `ot_live_...` value.
pub async fn rotate_observer_token(&self, id: &str) -> Result<ObserverToken> {
let relay = self
.relay_client()
.context("SDK relay client not initialized")?;
relay
.rotate_observer_token(id)
.await
.map_err(anyhow::Error::from)
}

/// Fetch a single action invocation, including its `input`. The
Expand Down
188 changes: 165 additions & 23 deletions crates/broker/src/runtime/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::*;
use relaycast::{CreateObserverTokenRequest, ObserverScope};
use relaycast::{CreateObserverTokenRequest, ObserverScope, ObserverToken, RelayError};

/// Default name recorded on observer tokens minted via `/api/observer-token`
/// when the caller doesn't supply one.
Expand All @@ -23,6 +23,142 @@ pub(crate) fn default_observer_token_scopes() -> Vec<ObserverScope> {
]
}

/// Outcome of `mint_or_recover_observer_token`: distinguishes a genuinely
/// new token from one recovered by rotating a pre-existing token under the
/// same name, purely so the caller can log/report the two cases
/// differently. Both variants carry a normal, fully-usable `ObserverToken`.
#[derive(Debug)]
pub(crate) enum ObserverTokenMintOutcome {
Created(ObserverToken),
RecoveredViaRotate(ObserverToken),
}

impl ObserverTokenMintOutcome {
pub(crate) fn is_recovered_via_rotate(&self) -> bool {
matches!(self, ObserverTokenMintOutcome::RecoveredViaRotate(_))
}

pub(crate) fn into_token(self) -> ObserverToken {
match self {
ObserverTokenMintOutcome::Created(token) => token,
ObserverTokenMintOutcome::RecoveredViaRotate(token) => token,
}
}
}

/// Error from `mint_or_recover_observer_token`, pre-classified so callers
/// don't need to re-derive "was this a timeout" from string content.
#[derive(Debug)]
pub(crate) enum ObserverTokenMintError {
/// A non-timeout failure; already formatted as a user-facing message.
Failed(String),
/// The create call (or, if triggered, the list+rotate fallback) didn't
/// complete within the caller-supplied timeout.
TimedOut,
}

/// True if `error` (as returned by `RelaycastHttpClient::create_observer_token`)
/// is specifically the API's `observer_token_name_conflict` error (HTTP
/// 409) — i.e. a token with this name already exists for the workspace —
/// as opposed to a timeout, network failure, or any other API error. Only
/// this specific error should trigger the list+rotate fallback; anything
/// else must still propagate as a failure.
fn is_observer_token_name_conflict(error: &anyhow::Error) -> bool {
error
.downcast_ref::<RelayError>()
.is_some_and(|relay_error| relay_error.code() == Some("observer_token_name_conflict"))
}

/// Mint an observer token named `token_name` for the workspace reachable
/// via `http_client`, falling back to recovering a pre-existing token if
/// creation fails because a token under that name already exists
/// (`observer_token_name_conflict`, HTTP 409). Callers like Pear mint a
/// token under a fixed default name once per workspace with no way to know
/// in advance whether a previous mint already claimed that name, so without
/// this fallback, repeat minting would fail outright forever.
///
/// The initial create call is bounded by `timeout_duration`. If the
/// list+rotate fallback is triggered, it gets its own fresh
/// `timeout_duration` window (rather than sharing whatever budget the
/// create call already spent), so it can't block the caller indefinitely
/// either.
pub(crate) async fn mint_or_recover_observer_token(
http_client: &RelaycastHttpClient,
token_name: &str,
timeout_duration: Duration,
) -> Result<ObserverTokenMintOutcome, ObserverTokenMintError> {
match timeout(
timeout_duration,
http_client.create_observer_token(CreateObserverTokenRequest {
name: token_name.to_string(),
scopes: default_observer_token_scopes(),
description: None,
filters: None,
expires_at: None,
}),
)
.await
{
Ok(Ok(observer_token)) => Ok(ObserverTokenMintOutcome::Created(observer_token)),
Ok(Err(error)) if is_observer_token_name_conflict(&error) => {
recover_observer_token_after_name_conflict(
http_client,
token_name,
timeout_duration,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep observer-token recovery within the HTTP deadline

When the create call returns a name conflict near the default 20s observer-token timeout, passing the full timeout into the recovery path gives list+rotate another 20s; /api/observer-token only waits 30s for reply_rx in listen_api_create_observer_token, so a recovery that succeeds after about 31–40s is reported to the caller as broker request timed out and the broker reply is dropped. Please keep the total create+recover budget under the HTTP handler deadline, or coordinate/raise the outer deadline instead of granting a fresh full window here.

Useful? React with 👍 / 👎.

error,
)
.await
}
Ok(Err(error)) => Err(ObserverTokenMintError::Failed(format!(
"Failed to create observer token: {error}"
))),
Err(_) => Err(ObserverTokenMintError::TimedOut),
}
}

/// Fallback for `create_observer_token` failing with
/// `observer_token_name_conflict`: list existing observer tokens for the
/// workspace, find the one named `token_name`, and rotate it to obtain
/// fresh, usable raw token material.
///
/// **Behavioral note:** the raw token originally minted under this name was
/// never persisted anywhere the broker can read it back, so rotating is the
/// only way to recover a usable value — this necessarily invalidates
/// whatever raw token was previously handed out under this name. This is
/// acceptable for this endpoint's known caller (Pear's `mintObserverToken`,
/// which always treats a freshly-returned token as authoritative and
/// re-caches it), but any *other* holder of the previous raw value for this
/// name silently loses access when this path is taken.
///
/// If no existing token matches `token_name` despite the conflict error
/// (e.g. a race with a concurrent revoke), the original conflict error is
/// propagated as-is rather than panicking or synthesizing a misleading
/// response.
async fn recover_observer_token_after_name_conflict(
http_client: &RelaycastHttpClient,
token_name: &str,
timeout_duration: Duration,
conflict_error: anyhow::Error,
) -> Result<ObserverTokenMintOutcome, ObserverTokenMintError> {
let fallback = timeout(timeout_duration, async move {
let existing = http_client.list_observer_tokens().await?;
let matched = existing
.into_iter()
.find(|candidate| candidate.name == token_name)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: A repeat mint can return a token with different permissions than /api/observer-token requested: the recovery path rotates the first listed token whose name matches, without checking candidate.scopes against default_observer_token_scopes(). Consider requiring the exact expected scope set before rotating, and propagating the original conflict when the existing token doesn't match.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At crates/broker/src/runtime/api.rs, line 147:

<comment>A repeat mint can return a token with different permissions than `/api/observer-token` requested: the recovery path rotates the first listed token whose name matches, without checking `candidate.scopes` against `default_observer_token_scopes()`. Consider requiring the exact expected scope set before rotating, and propagating the original conflict when the existing token doesn't match.</comment>

<file context>
@@ -23,6 +23,142 @@ pub(crate) fn default_observer_token_scopes() -> Vec<ObserverScope> {
+        let existing = http_client.list_observer_tokens().await?;
+        let matched = existing
+            .into_iter()
+            .find(|candidate| candidate.name == token_name)
+            .ok_or(conflict_error)?;
+        http_client.rotate_observer_token(&matched.id).await
</file context>
Suggested change
.find(|candidate| candidate.name == token_name)
.find(|candidate| {
let expected_scopes = default_observer_token_scopes();
candidate.name == token_name
&& candidate.scopes.len() == expected_scopes.len()
&& expected_scopes
.iter()
.all(|scope| candidate.scopes.contains(scope))
})

.ok_or(conflict_error)?;
http_client.rotate_observer_token(&matched.id).await

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reconcile scopes before rotating an existing observer token

When a token with the requested name already exists but was created with different scopes or filters, this fallback rotates and returns that token as-is. The normal create path always requests default_observer_token_scopes() with no filters, so a manual or older pear-dashboard-observer token that includes extra scopes such as files:read (or restrictive filters) would make /api/observer-token hand out credentials with unexpected access or visibility; update/verify the matched token metadata before rotating, or fail if it does not match the endpoint contract.

Useful? React with 👍 / 👎.

})
.await;

match fallback {
Ok(Ok(observer_token)) => Ok(ObserverTokenMintOutcome::RecoveredViaRotate(observer_token)),
Ok(Err(error)) => Err(ObserverTokenMintError::Failed(format!(
"Failed to create observer token: {error}"
))),
Err(_) => Err(ObserverTokenMintError::TimedOut),
}
}

impl BrokerRuntime {
pub(super) async fn handle_api_request(&mut self, req: ListenApiRequest) {
let req = match req {
Expand Down Expand Up @@ -934,27 +1070,34 @@ impl BrokerRuntime {
// `http_api_relaycast_send_timeout`) so tuning the `/api/send`
// path can't unintentionally break token minting.
let relaycast_timeout = http_api_observer_token_timeout();
match timeout(
match mint_or_recover_observer_token(
&selected_workspace.http_client,
&token_name,
relaycast_timeout,
selected_workspace.http_client.create_observer_token(
CreateObserverTokenRequest {
name: token_name,
scopes: default_observer_token_scopes(),
description: None,
filters: None,
expires_at: None,
},
),
)
.await
{
Ok(Ok(observer_token)) => {
tracing::info!(
target = "relay_broker::http_api",
workspace_id = %selected_workspace_id,
observer_token_id = %observer_token.id,
"minted observer token via HTTP API"
);
Ok(outcome) => {
let recovered_via_rotate = outcome.is_recovered_via_rotate();
let observer_token = outcome.into_token();
if recovered_via_rotate {
tracing::info!(
target = "relay_broker::http_api",
workspace_id = %selected_workspace_id,
observer_token_id = %observer_token.id,
token_name = %token_name,
"observer token name conflict on mint; recovered existing \
token via list+rotate (this invalidates whatever raw token \
was previously issued under this name)"
);
} else {
tracing::info!(
target = "relay_broker::http_api",
workspace_id = %selected_workspace_id,
observer_token_id = %observer_token.id,
"minted observer token via HTTP API"
);
}
let _ = reply.send(Ok(json!({
"success": true,
"id": observer_token.id,
Expand All @@ -965,17 +1108,16 @@ impl BrokerRuntime {
"workspace_alias": selected_workspace_alias,
})));
}
Ok(Err(error)) => {
Err(ObserverTokenMintError::Failed(message)) => {
tracing::warn!(
target = "relay_broker::http_api",
workspace_id = %selected_workspace_id,
error = %error,
error = %message,
"failed to mint observer token via HTTP API"
);
let _ =
reply.send(Err(format!("Failed to create observer token: {error}")));
let _ = reply.send(Err(message));
}
Err(_) => {
Err(ObserverTokenMintError::TimedOut) => {
tracing::warn!(
target = "relay_broker::http_api",
workspace_id = %selected_workspace_id,
Expand Down
5 changes: 4 additions & 1 deletion crates/broker/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ mod util;
mod worker_events;

#[cfg(test)]
pub(crate) use api::{default_observer_token_scopes, resolve_workspace};
pub(crate) use api::{
default_observer_token_scopes, mint_or_recover_observer_token, resolve_workspace,
ObserverTokenMintError, ObserverTokenMintOutcome,
};
pub(crate) use app_server::*;
pub(crate) use connection::*;
pub(crate) use delivery::*;
Expand Down
Loading
Loading