-
Notifications
You must be signed in to change notification settings - Fork 57
fix(broker): recover existing observer token on name-conflict mint #1227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,5 +1,5 @@ | ||||||||||||||||||||
| use super::*; | ||||||||||||||||||||
| use relaycast::{CreateObserverTokenRequest, ObserverScope}; | ||||||||||||||||||||
| use relaycast::{CreateObserverTokenRequest, ObserverScope, ObserverToken, RelayError}; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Default name recorded on observer tokens minted via `/api/observer-token` | ||||||||||||||||||||
| /// when the caller doesn't supply one. | ||||||||||||||||||||
|
|
@@ -23,6 +23,142 @@ pub(crate) fn default_observer_token_scopes() -> Vec<ObserverScope> { | |||||||||||||||||||
| ] | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Outcome of `mint_or_recover_observer_token`: distinguishes a genuinely | ||||||||||||||||||||
| /// new token from one recovered by rotating a pre-existing token under the | ||||||||||||||||||||
| /// same name, purely so the caller can log/report the two cases | ||||||||||||||||||||
| /// differently. Both variants carry a normal, fully-usable `ObserverToken`. | ||||||||||||||||||||
| #[derive(Debug)] | ||||||||||||||||||||
| pub(crate) enum ObserverTokenMintOutcome { | ||||||||||||||||||||
| Created(ObserverToken), | ||||||||||||||||||||
| RecoveredViaRotate(ObserverToken), | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| impl ObserverTokenMintOutcome { | ||||||||||||||||||||
| pub(crate) fn is_recovered_via_rotate(&self) -> bool { | ||||||||||||||||||||
| matches!(self, ObserverTokenMintOutcome::RecoveredViaRotate(_)) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| pub(crate) fn into_token(self) -> ObserverToken { | ||||||||||||||||||||
| match self { | ||||||||||||||||||||
| ObserverTokenMintOutcome::Created(token) => token, | ||||||||||||||||||||
| ObserverTokenMintOutcome::RecoveredViaRotate(token) => token, | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Error from `mint_or_recover_observer_token`, pre-classified so callers | ||||||||||||||||||||
| /// don't need to re-derive "was this a timeout" from string content. | ||||||||||||||||||||
| #[derive(Debug)] | ||||||||||||||||||||
| pub(crate) enum ObserverTokenMintError { | ||||||||||||||||||||
| /// A non-timeout failure; already formatted as a user-facing message. | ||||||||||||||||||||
| Failed(String), | ||||||||||||||||||||
| /// The create call (or, if triggered, the list+rotate fallback) didn't | ||||||||||||||||||||
| /// complete within the caller-supplied timeout. | ||||||||||||||||||||
| TimedOut, | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// True if `error` (as returned by `RelaycastHttpClient::create_observer_token`) | ||||||||||||||||||||
| /// is specifically the API's `observer_token_name_conflict` error (HTTP | ||||||||||||||||||||
| /// 409) — i.e. a token with this name already exists for the workspace — | ||||||||||||||||||||
| /// as opposed to a timeout, network failure, or any other API error. Only | ||||||||||||||||||||
| /// this specific error should trigger the list+rotate fallback; anything | ||||||||||||||||||||
| /// else must still propagate as a failure. | ||||||||||||||||||||
| fn is_observer_token_name_conflict(error: &anyhow::Error) -> bool { | ||||||||||||||||||||
| error | ||||||||||||||||||||
| .downcast_ref::<RelayError>() | ||||||||||||||||||||
| .is_some_and(|relay_error| relay_error.code() == Some("observer_token_name_conflict")) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Mint an observer token named `token_name` for the workspace reachable | ||||||||||||||||||||
| /// via `http_client`, falling back to recovering a pre-existing token if | ||||||||||||||||||||
| /// creation fails because a token under that name already exists | ||||||||||||||||||||
| /// (`observer_token_name_conflict`, HTTP 409). Callers like Pear mint a | ||||||||||||||||||||
| /// token under a fixed default name once per workspace with no way to know | ||||||||||||||||||||
| /// in advance whether a previous mint already claimed that name, so without | ||||||||||||||||||||
| /// this fallback, repeat minting would fail outright forever. | ||||||||||||||||||||
| /// | ||||||||||||||||||||
| /// The initial create call is bounded by `timeout_duration`. If the | ||||||||||||||||||||
| /// list+rotate fallback is triggered, it gets its own fresh | ||||||||||||||||||||
| /// `timeout_duration` window (rather than sharing whatever budget the | ||||||||||||||||||||
| /// create call already spent), so it can't block the caller indefinitely | ||||||||||||||||||||
| /// either. | ||||||||||||||||||||
| pub(crate) async fn mint_or_recover_observer_token( | ||||||||||||||||||||
| http_client: &RelaycastHttpClient, | ||||||||||||||||||||
| token_name: &str, | ||||||||||||||||||||
| timeout_duration: Duration, | ||||||||||||||||||||
| ) -> Result<ObserverTokenMintOutcome, ObserverTokenMintError> { | ||||||||||||||||||||
| match timeout( | ||||||||||||||||||||
| timeout_duration, | ||||||||||||||||||||
| http_client.create_observer_token(CreateObserverTokenRequest { | ||||||||||||||||||||
| name: token_name.to_string(), | ||||||||||||||||||||
| scopes: default_observer_token_scopes(), | ||||||||||||||||||||
| description: None, | ||||||||||||||||||||
| filters: None, | ||||||||||||||||||||
| expires_at: None, | ||||||||||||||||||||
| }), | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| .await | ||||||||||||||||||||
| { | ||||||||||||||||||||
| Ok(Ok(observer_token)) => Ok(ObserverTokenMintOutcome::Created(observer_token)), | ||||||||||||||||||||
| Ok(Err(error)) if is_observer_token_name_conflict(&error) => { | ||||||||||||||||||||
| recover_observer_token_after_name_conflict( | ||||||||||||||||||||
| http_client, | ||||||||||||||||||||
| token_name, | ||||||||||||||||||||
| timeout_duration, | ||||||||||||||||||||
| error, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| .await | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Ok(Err(error)) => Err(ObserverTokenMintError::Failed(format!( | ||||||||||||||||||||
| "Failed to create observer token: {error}" | ||||||||||||||||||||
| ))), | ||||||||||||||||||||
| Err(_) => Err(ObserverTokenMintError::TimedOut), | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Fallback for `create_observer_token` failing with | ||||||||||||||||||||
| /// `observer_token_name_conflict`: list existing observer tokens for the | ||||||||||||||||||||
| /// workspace, find the one named `token_name`, and rotate it to obtain | ||||||||||||||||||||
| /// fresh, usable raw token material. | ||||||||||||||||||||
| /// | ||||||||||||||||||||
| /// **Behavioral note:** the raw token originally minted under this name was | ||||||||||||||||||||
| /// never persisted anywhere the broker can read it back, so rotating is the | ||||||||||||||||||||
| /// only way to recover a usable value — this necessarily invalidates | ||||||||||||||||||||
| /// whatever raw token was previously handed out under this name. This is | ||||||||||||||||||||
| /// acceptable for this endpoint's known caller (Pear's `mintObserverToken`, | ||||||||||||||||||||
| /// which always treats a freshly-returned token as authoritative and | ||||||||||||||||||||
| /// re-caches it), but any *other* holder of the previous raw value for this | ||||||||||||||||||||
| /// name silently loses access when this path is taken. | ||||||||||||||||||||
| /// | ||||||||||||||||||||
| /// If no existing token matches `token_name` despite the conflict error | ||||||||||||||||||||
| /// (e.g. a race with a concurrent revoke), the original conflict error is | ||||||||||||||||||||
| /// propagated as-is rather than panicking or synthesizing a misleading | ||||||||||||||||||||
| /// response. | ||||||||||||||||||||
| async fn recover_observer_token_after_name_conflict( | ||||||||||||||||||||
| http_client: &RelaycastHttpClient, | ||||||||||||||||||||
| token_name: &str, | ||||||||||||||||||||
| timeout_duration: Duration, | ||||||||||||||||||||
| conflict_error: anyhow::Error, | ||||||||||||||||||||
| ) -> Result<ObserverTokenMintOutcome, ObserverTokenMintError> { | ||||||||||||||||||||
| let fallback = timeout(timeout_duration, async move { | ||||||||||||||||||||
| let existing = http_client.list_observer_tokens().await?; | ||||||||||||||||||||
| let matched = existing | ||||||||||||||||||||
| .into_iter() | ||||||||||||||||||||
| .find(|candidate| candidate.name == token_name) | ||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: A repeat mint can return a token with different permissions than Prompt for AI agents
Suggested change
|
||||||||||||||||||||
| .ok_or(conflict_error)?; | ||||||||||||||||||||
| http_client.rotate_observer_token(&matched.id).await | ||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a token with the requested name already exists but was created with different scopes or filters, this fallback rotates and returns that token as-is. The normal create path always requests Useful? React with 👍 / 👎. |
||||||||||||||||||||
| }) | ||||||||||||||||||||
| .await; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| match fallback { | ||||||||||||||||||||
| Ok(Ok(observer_token)) => Ok(ObserverTokenMintOutcome::RecoveredViaRotate(observer_token)), | ||||||||||||||||||||
| Ok(Err(error)) => Err(ObserverTokenMintError::Failed(format!( | ||||||||||||||||||||
| "Failed to create observer token: {error}" | ||||||||||||||||||||
| ))), | ||||||||||||||||||||
| Err(_) => Err(ObserverTokenMintError::TimedOut), | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| impl BrokerRuntime { | ||||||||||||||||||||
| pub(super) async fn handle_api_request(&mut self, req: ListenApiRequest) { | ||||||||||||||||||||
| let req = match req { | ||||||||||||||||||||
|
|
@@ -934,27 +1070,34 @@ impl BrokerRuntime { | |||||||||||||||||||
| // `http_api_relaycast_send_timeout`) so tuning the `/api/send` | ||||||||||||||||||||
| // path can't unintentionally break token minting. | ||||||||||||||||||||
| let relaycast_timeout = http_api_observer_token_timeout(); | ||||||||||||||||||||
| match timeout( | ||||||||||||||||||||
| match mint_or_recover_observer_token( | ||||||||||||||||||||
| &selected_workspace.http_client, | ||||||||||||||||||||
| &token_name, | ||||||||||||||||||||
| relaycast_timeout, | ||||||||||||||||||||
| selected_workspace.http_client.create_observer_token( | ||||||||||||||||||||
| CreateObserverTokenRequest { | ||||||||||||||||||||
| name: token_name, | ||||||||||||||||||||
| scopes: default_observer_token_scopes(), | ||||||||||||||||||||
| description: None, | ||||||||||||||||||||
| filters: None, | ||||||||||||||||||||
| expires_at: None, | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| ), | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| .await | ||||||||||||||||||||
| { | ||||||||||||||||||||
| Ok(Ok(observer_token)) => { | ||||||||||||||||||||
| tracing::info!( | ||||||||||||||||||||
| target = "relay_broker::http_api", | ||||||||||||||||||||
| workspace_id = %selected_workspace_id, | ||||||||||||||||||||
| observer_token_id = %observer_token.id, | ||||||||||||||||||||
| "minted observer token via HTTP API" | ||||||||||||||||||||
| ); | ||||||||||||||||||||
| Ok(outcome) => { | ||||||||||||||||||||
| let recovered_via_rotate = outcome.is_recovered_via_rotate(); | ||||||||||||||||||||
| let observer_token = outcome.into_token(); | ||||||||||||||||||||
| if recovered_via_rotate { | ||||||||||||||||||||
| tracing::info!( | ||||||||||||||||||||
| target = "relay_broker::http_api", | ||||||||||||||||||||
| workspace_id = %selected_workspace_id, | ||||||||||||||||||||
| observer_token_id = %observer_token.id, | ||||||||||||||||||||
| token_name = %token_name, | ||||||||||||||||||||
| "observer token name conflict on mint; recovered existing \ | ||||||||||||||||||||
| token via list+rotate (this invalidates whatever raw token \ | ||||||||||||||||||||
| was previously issued under this name)" | ||||||||||||||||||||
| ); | ||||||||||||||||||||
| } else { | ||||||||||||||||||||
| tracing::info!( | ||||||||||||||||||||
| target = "relay_broker::http_api", | ||||||||||||||||||||
| workspace_id = %selected_workspace_id, | ||||||||||||||||||||
| observer_token_id = %observer_token.id, | ||||||||||||||||||||
| "minted observer token via HTTP API" | ||||||||||||||||||||
| ); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| let _ = reply.send(Ok(json!({ | ||||||||||||||||||||
| "success": true, | ||||||||||||||||||||
| "id": observer_token.id, | ||||||||||||||||||||
|
|
@@ -965,17 +1108,16 @@ impl BrokerRuntime { | |||||||||||||||||||
| "workspace_alias": selected_workspace_alias, | ||||||||||||||||||||
| }))); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Ok(Err(error)) => { | ||||||||||||||||||||
| Err(ObserverTokenMintError::Failed(message)) => { | ||||||||||||||||||||
| tracing::warn!( | ||||||||||||||||||||
| target = "relay_broker::http_api", | ||||||||||||||||||||
| workspace_id = %selected_workspace_id, | ||||||||||||||||||||
| error = %error, | ||||||||||||||||||||
| error = %message, | ||||||||||||||||||||
| "failed to mint observer token via HTTP API" | ||||||||||||||||||||
| ); | ||||||||||||||||||||
| let _ = | ||||||||||||||||||||
| reply.send(Err(format!("Failed to create observer token: {error}"))); | ||||||||||||||||||||
| let _ = reply.send(Err(message)); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Err(_) => { | ||||||||||||||||||||
| Err(ObserverTokenMintError::TimedOut) => { | ||||||||||||||||||||
| tracing::warn!( | ||||||||||||||||||||
| target = "relay_broker::http_api", | ||||||||||||||||||||
| workspace_id = %selected_workspace_id, | ||||||||||||||||||||
|
|
||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the create call returns a name conflict near the default 20s observer-token timeout, passing the full timeout into the recovery path gives list+rotate another 20s;
/api/observer-tokenonly waits 30s forreply_rxinlisten_api_create_observer_token, so a recovery that succeeds after about 31–40s is reported to the caller asbroker request timed outand the broker reply is dropped. Please keep the total create+recover budget under the HTTP handler deadline, or coordinate/raise the outer deadline instead of granting a fresh full window here.Useful? React with 👍 / 👎.