From 84ef226a53c08a148423dda6e5f265cc15dc1798 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Mon, 25 May 2026 17:06:52 +1200 Subject: [PATCH 1/6] plan: publish restore errors as canopy events --- docs/plans/event-publishing.md | 97 ++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 docs/plans/event-publishing.md diff --git a/docs/plans/event-publishing.md b/docs/plans/event-publishing.md new file mode 100644 index 0000000..210325d --- /dev/null +++ b/docs/plans/event-publishing.md @@ -0,0 +1,97 @@ +# Publish restore errors as canopy events + +We have a fleet-wide event ingestion API at `https://meta.tamanu.app/api/events` +(the "canopy" public-server). Devices and operators can `POST /events` with a +small payload (`source`, `ref`, `message`, optional `description`, `severity`, +`occurredAt`, `active`) and the server folds repeated events with the same +`(source, ref)` into a single issue. + +We want the postgres-restore-operator to publish to that endpoint whenever a +restore fails, so failures show up as issues in canopy without anyone having to +notice in logs. + +## Authentication + +The `/events` endpoint is `server-device` mTLS: the client presents a TLS +client certificate, and the server's role/identity model decides what `source` +is permitted. We don't get a bearer token or shared secret — we need a real +certificate + private key. + +In Kubernetes the standard way to carry this is a `kubernetes.io/tls` Secret +with two keys: `tls.crt` (PEM-encoded certificate, possibly with a chain) and +`tls.key` (PEM-encoded private key). The user creates this Secret out of band. + +## Spec + +Add an optional section to `PostgresPhysicalReplicaSpec`: + +```yaml +spec: + eventPublisher: + url: https://meta.tamanu.app/api/events + clientCertificateSecretRef: + name: pgro-canopy-client + source: pgro # optional; defaults to "pgro" +``` + +Fields: + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `url` | string | yes | — | Full URL of the events endpoint. | +| `clientCertificateSecretRef` | `SecretReference` | yes | — | Secret holding mTLS client cert + key. Expected keys: `tls.crt`, `tls.key`. | +| `source` | string | no | `"pgro"` | Value placed in `NewEvent.source`. | + +We use `SecretReference` (same type as `kopiaSecretRef`) for consistency. The +section is optional — leaving it unset disables event publishing. + +We do not need a separate "private key secret" and "certificate secret": +Kubernetes' `kubernetes.io/tls` Secret type holds both, and that's what most +issuers (cert-manager, kubelet-served, etc.) produce. The fields `tls.crt` and +`tls.key` are conventional. + +## Event payload + +For a restore failure we emit: + +- `source`: from spec (default `"pgro"`) +- `ref`: `"//restore-failed"` — dedups so a single + replica with repeated failures rolls up under one issue rather than spamming. + Using the replica (not the restore) name as the ref is intentional: the + restore name changes per attempt, but the *condition* "this replica's + restores are failing" is one ongoing issue. +- `severity`: `"error"` +- `description`: `"Restore failed: /"` +- `message`: a few lines containing the restore name, snapshot id, and the + failure reason (e.g. job exit code, schema migration error). For now we use + whatever short reason we have at the `fail_restore` call site; we can + enrich later. +- `occurredAt`: `Timestamp::now()` +- `active`: true + +The endpoint folds duplicates by `(source, ref)`, so we don't need to track +sent state ourselves. + +## Implementation shape + +1. `src/types/replica.rs`: add `EventPublisherConfig { url, client_certificate_secret_ref, source }` and `event_publisher: Option` on the spec. +2. `src/event_publisher.rs` (new module): + - `pub struct EventPublisher` wrapping a `reqwest::Client` configured with mTLS Identity. + - Constructor reads the cert+key from the named Secret, concatenates them, calls `reqwest::Identity::from_pem`, builds a Client with `.identity(id)`. + - `pub async fn publish(&self, event: &NewEvent) -> Result<()>`. + - We rebuild the client on each call site (no caching). Restore failures are rare; a fresh client per publish is fine and avoids stale-cert hassles. +3. `Cargo.toml`: enable `rustls-tls` feature on `reqwest` so `Identity::from_pem` is available. The crate already uses rustls everywhere else (kube is configured with `rustls-tls`). +4. `src/controllers/restore.rs::fail_restore`: after the existing status update / event recorder / metrics block, if the parent replica has `event_publisher` configured, fire-and-log a publish. Errors get logged at `warn`; we never let event publishing fail the reconcile. +5. README: add a new section "EventPublisher" + the row in the spec table. + +## What we deliberately don't do + +- No retry loop or status field for event publish success/failure. Canopy + dedupes by `(source, ref)`; the next failed restore re-publishes anyway. + Adding a `NotificationStatus`-style record would be churn for little gain. +- No per-restore-CRD event publisher config. It lives on the replica because + configuration belongs to the user-facing resource; the restore CRD is + operator-managed. +- No operator-global config. We considered making this an operator-level + setting (one URL for the whole cluster) but that conflicts with the + per-namespace tenancy model the operator already follows. From 47f30506d709b0cefae098e905a9f4ba33ad8f41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Mon, 25 May 2026 17:08:10 +1200 Subject: [PATCH 2/6] feat(replica): add EventPublisher spec --- src/controllers/replica/scheduling.rs | 1 + src/controllers/replica/schema_migration.rs | 1 + src/controllers/restore/tests.rs | 2 ++ src/types/replica.rs | 27 +++++++++++++++++++++ tests/helpers.rs | 1 + 5 files changed, 32 insertions(+) diff --git a/src/controllers/replica/scheduling.rs b/src/controllers/replica/scheduling.rs index db0beb1..72d4e78 100644 --- a/src/controllers/replica/scheduling.rs +++ b/src/controllers/replica/scheduling.rs @@ -188,6 +188,7 @@ mod tests { ), persistent_schemas: None, + event_publisher: None, }, status: Some(PostgresPhysicalReplicaStatus { next_scheduled_restore: next_scheduled.map(Time), diff --git a/src/controllers/replica/schema_migration.rs b/src/controllers/replica/schema_migration.rs index a0b4f87..8e0da6b 100644 --- a/src/controllers/replica/schema_migration.rs +++ b/src/controllers/replica/schema_migration.rs @@ -285,6 +285,7 @@ mod tests { ), persistent_schemas: Some(schemas.into_iter().map(String::from).collect()), + event_publisher: None, }, status: None, } diff --git a/src/controllers/restore/tests.rs b/src/controllers/restore/tests.rs index a710b60..d22d937 100644 --- a/src/controllers/restore/tests.rs +++ b/src/controllers/restore/tests.rs @@ -40,6 +40,7 @@ fn deployment_uses_affinity_not_node_selector() { persistent_schemas: None, storage_size_maximum: Quantity("2Ti".to_string()), + event_publisher: None, }, ); replica.spec.affinity = Some(Affinity { @@ -124,6 +125,7 @@ fn test_restore_and_replica() -> (PostgresPhysicalRestore, PostgresPhysicalRepli persistent_schemas: None, storage_size_maximum: Quantity("2Ti".to_string()), + event_publisher: None, }, ); diff --git a/src/types/replica.rs b/src/types/replica.rs index 3886a6e..a27ccd5 100644 --- a/src/types/replica.rs +++ b/src/types/replica.rs @@ -101,6 +101,33 @@ pub struct PostgresPhysicalReplicaSpec { /// computed size exceeds this limit. Defaults to 2Ti. #[serde(default = "default_storage_size_maximum")] pub storage_size_maximum: Quantity, + + /// Publish restore-failure events to a canopy-style `/events` endpoint + /// (https://meta.tamanu.app/api/events) over mTLS. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_publisher: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct EventPublisherConfig { + /// Full URL of the events endpoint, e.g. + /// `https://meta.tamanu.app/api/events`. + pub url: String, + + /// Secret holding the mTLS client cert + key. Expected keys are + /// `tls.crt` (PEM cert, optionally with chain) and `tls.key` (PEM + /// private key) — the conventional layout of a + /// `kubernetes.io/tls` Secret. + pub client_certificate_secret_ref: SecretReference, + + /// Value placed in `NewEvent.source` on every published event. + #[serde(default = "default_event_source")] + pub source: String, +} + +fn default_event_source() -> String { + "pgro".to_string() } fn default_storage_size_maximum() -> Quantity { diff --git a/tests/helpers.rs b/tests/helpers.rs index 2c919cb..df8dc23 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -160,6 +160,7 @@ pub fn build_replica(name: &str, secret_ref: &str, opts: ReplicaOpts) -> Postgre notifications: vec![], persistent_schemas: None, storage_size_maximum: Quantity("2Ti".to_string()), + event_publisher: None, }, ) } From 1accca6634364dbaa12d0fcf91082dcff2cbc7c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Mon, 25 May 2026 17:10:33 +1200 Subject: [PATCH 3/6] feat: implement mTLS event publisher --- Cargo.lock | 77 +-------------------- Cargo.toml | 2 +- src/error.rs | 3 + src/event_publisher.rs | 153 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 5 files changed, 160 insertions(+), 76 deletions(-) create mode 100644 src/event_publisher.rs diff --git a/Cargo.lock b/Cargo.lock index 76a16c4..256f887 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,16 +370,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation" version = "0.10.1" @@ -619,15 +609,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "encoding_rs" -version = "0.8.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" -dependencies = [ - "cfg-if", -] - [[package]] name = "enum-ordinalize" version = "4.3.2" @@ -1113,11 +1094,9 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2", - "system-configuration", "tokio", "tower-service", "tracing", - "windows-registry", ] [[package]] @@ -2250,7 +2229,6 @@ checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" dependencies = [ "base64", "bytes", - "encoding_rs", "futures-core", "h2", "http", @@ -2261,7 +2239,6 @@ dependencies = [ "hyper-util", "js-sys", "log", - "mime", "percent-encoding", "pin-project-lite", "quinn", @@ -2400,7 +2377,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" dependencies = [ - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "jni", "log", @@ -2517,7 +2494,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d17b898a6d6948c3a8ee4372c17cb384f90d2e6e912ef00895b14fd7ab54ec38" dependencies = [ "bitflags", - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "libc", "security-framework-sys", @@ -2791,27 +2768,6 @@ dependencies = [ "syn 2.0.116", ] -[[package]] -name = "system-configuration" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" -dependencies = [ - "bitflags", - "core-foundation 0.9.4", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tap" version = "1.0.1" @@ -3475,35 +3431,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" -[[package]] -name = "windows-registry" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" -dependencies = [ - "windows-link", - "windows-result", - "windows-strings", -] - -[[package]] -name = "windows-result" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" -dependencies = [ - "windows-link", -] - -[[package]] -name = "windows-strings" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" -dependencies = [ - "windows-link", -] - [[package]] name = "windows-sys" version = "0.45.0" diff --git a/Cargo.toml b/Cargo.toml index de06b18..1e8346e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ kube = { version = "3.1.0", default-features = false, features = ["runtime", "de kube_quantity = "0.9.0" prometheus = "0.14.0" rand = "0.10.1" -reqwest = { version = "0.13.3", features = ["json"] } +reqwest = { version = "0.13.3", default-features = false, features = ["http2", "json", "rustls"] } rust_decimal = "1.41.0" schemars = { version = "1.2.1", features = ["jiff02"] } serde = { version = "1.0.228", features = ["derive"] } diff --git a/src/error.rs b/src/error.rs index bcbfb7e..69435a8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,9 @@ pub enum Error { #[error("Notification error: {0}")] Notification(String), + #[error("Event publisher error: {0}")] + EventPublisher(String), + #[error("Serialization error: {0}")] Serialization(#[from] serde_json::Error), diff --git a/src/event_publisher.rs b/src/event_publisher.rs new file mode 100644 index 0000000..392197c --- /dev/null +++ b/src/event_publisher.rs @@ -0,0 +1,153 @@ +//! Publishes operator events to a canopy-style `/events` endpoint. +//! +//! Canopy is the BES fleet metadata server at . +//! Its event API accepts `(source, ref, message, ...)` tuples authenticated by +//! mTLS client certificate, and folds duplicates with the same `(source, ref)` +//! into a single rolling issue. + +use jiff::Timestamp; +use k8s_openapi::api::core::v1::{Secret, SecretReference}; +use kube::{Api, Client}; +use serde::Serialize; + +use crate::{ + error::{Error, Result}, + types::EventPublisherConfig, +}; + +/// Severity matching canopy's RFC 5424 enum. +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Severity { + Emergency, + Alert, + Critical, + Error, + Warning, + Notice, + Info, + Debug, +} + +/// Payload accepted by `POST /events`. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NewEvent { + pub source: String, + /// Dedup key. Repeated events with the same `(source, ref)` roll up + /// into a single issue server-side. + #[serde(rename = "ref")] + pub ref_: String, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub severity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub occurred_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub active: Option, +} + +/// Send a single event. Builds a fresh mTLS-configured `reqwest::Client` per +/// call — these publishes are rare (one per failed restore) so caching gains +/// nothing and a fresh client always picks up rotated certs. +pub async fn publish(client: &Client, config: &EventPublisherConfig, event: &NewEvent) -> Result<()> { + let identity_pem = load_identity_pem(client, &config.client_certificate_secret_ref).await?; + let identity = reqwest::Identity::from_pem(&identity_pem) + .map_err(|e| Error::EventPublisher(format!("invalid client certificate: {e}")))?; + + let http = reqwest::Client::builder() + .identity(identity) + .build() + .map_err(|e| Error::EventPublisher(format!("failed to build http client: {e}")))?; + + let resp = http.post(&config.url).json(event).send().await?; + let status = resp.status(); + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + return Err(Error::EventPublisher(format!( + "events endpoint returned {status}: {body}" + ))); + } + Ok(()) +} + +/// Pull `tls.crt` and `tls.key` out of the referenced Secret and concatenate +/// them into a single PEM buffer suitable for `Identity::from_pem`. +async fn load_identity_pem(client: &Client, secret_ref: &SecretReference) -> Result> { + let name = secret_ref + .name + .as_deref() + .ok_or_else(|| Error::EventPublisher("clientCertificateSecretRef.name is required".into()))?; + let namespace = secret_ref.namespace.as_deref().ok_or_else(|| { + Error::EventPublisher("clientCertificateSecretRef.namespace is required".into()) + })?; + + let secrets: Api = Api::namespaced(client.clone(), namespace); + let secret = secrets.get(name).await?; + let data = secret + .data + .as_ref() + .ok_or_else(|| Error::EventPublisher(format!("secret {name} has no data")))?; + + let cert = data + .get("tls.crt") + .ok_or_else(|| Error::EventPublisher(format!("secret {name} missing key tls.crt")))?; + let key = data + .get("tls.key") + .ok_or_else(|| Error::EventPublisher(format!("secret {name} missing key tls.key")))?; + + let mut buf = Vec::with_capacity(cert.0.len() + key.0.len() + 1); + buf.extend_from_slice(&cert.0); + if !cert.0.ends_with(b"\n") { + buf.push(b'\n'); + } + buf.extend_from_slice(&key.0); + Ok(buf) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn event_serializes_with_expected_field_names() { + let event = NewEvent { + source: "pgro".into(), + ref_: "ns/replica/restore-failed".into(), + message: "boom".into(), + description: Some("Restore failed".into()), + severity: Some(Severity::Error), + occurred_at: Some(Timestamp::from_second(1_700_000_000).unwrap()), + active: Some(true), + }; + let json = serde_json::to_value(&event).unwrap(); + assert_eq!(json["source"], "pgro"); + assert_eq!(json["ref"], "ns/replica/restore-failed"); + assert_eq!(json["message"], "boom"); + assert_eq!(json["description"], "Restore failed"); + assert_eq!(json["severity"], "error"); + assert_eq!(json["active"], true); + assert!(json.get("occurredAt").is_some()); + } + + #[test] + fn optional_fields_skipped_when_none() { + let event = NewEvent { + source: "pgro".into(), + ref_: "r".into(), + message: "m".into(), + description: None, + severity: None, + occurred_at: None, + active: None, + }; + let json = serde_json::to_value(&event).unwrap(); + let obj = json.as_object().unwrap(); + assert!(!obj.contains_key("description")); + assert!(!obj.contains_key("severity")); + assert!(!obj.contains_key("occurredAt")); + assert!(!obj.contains_key("active")); + } +} diff --git a/src/lib.rs b/src/lib.rs index 740f198..06afe75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod context; pub mod controllers; pub mod error; +pub mod event_publisher; pub mod kopia; pub mod metrics; pub mod notifications; From fe9762abdf766fcaee2f9d39f6d631799bae3c38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Mon, 25 May 2026 17:13:20 +1200 Subject: [PATCH 4/6] feat(restore): publish failure events to canopy --- src/controllers/restore.rs | 35 ++++++++++++++++++++++++++++++++--- src/event_publisher.rs | 13 ++++++++----- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/src/controllers/restore.rs b/src/controllers/restore.rs index e7e8b29..9df6454 100644 --- a/src/controllers/restore.rs +++ b/src/controllers/restore.rs @@ -26,6 +26,7 @@ use super::read_job_termination_message; use crate::{ context::Context, error::{Error, Result}, + event_publisher::{self, NewEvent, Severity}, types::*, }; @@ -145,6 +146,7 @@ async fn fail_restore( namespace: &str, name: &str, replica_name: &str, + reason: &str, status_patch: serde_json::Value, ) -> Result { update_restore_status(&ctx.client, namespace, name, status_patch).await?; @@ -153,9 +155,11 @@ async fn fail_restore( info!(promoted = %promoted_name, "promoted queued restore after failure"); } - // Increment consecutiveRestoreFailures on the parent replica let replicas: Api = Api::namespaced(ctx.client.clone(), namespace); - if let Ok(replica) = replicas.get(replica_name).await { + let replica = replicas.get(replica_name).await.ok(); + + // Increment consecutiveRestoreFailures on the parent replica + if let Some(replica) = &replica { let current = replica .status .as_ref() @@ -200,7 +204,7 @@ async fn fail_restore( &Event { type_: EventType::Warning, reason: "RestoreFailed".into(), - note: Some(format!("Restore {name} failed")), + note: Some(format!("Restore {name} failed: {reason}")), action: "Restore".into(), secondary: None, }, @@ -211,6 +215,27 @@ async fn fail_restore( warn!(replica = replica_name, error = %e, "failed to publish RestoreFailed event"); } + if let Some(replica) = &replica + && let Some(publisher_config) = replica.spec.event_publisher.as_ref() + { + let event = NewEvent { + source: publisher_config.source.clone(), + ref_: format!("{namespace}/{replica_name}/restore-failed"), + message: format!("Restore {name} failed: {reason}"), + description: Some(format!("Restore failed: {namespace}/{replica_name}")), + severity: Some(Severity::Error), + occurred_at: Some(Timestamp::now()), + active: Some(true), + }; + if let Err(e) = event_publisher::publish(&ctx.client, publisher_config, &event).await { + warn!( + replica = replica_name, + error = %e, + "failed to publish restore-failed event to canopy" + ); + } + } + Ok(Action::requeue(Duration::from_secs(300))) } @@ -464,6 +489,7 @@ async fn reconcile_restoring( namespace, name, replica_name, + "restore job exceeded backoff limit", serde_json::json!({ "phase": "Failed", "restoreJob": { @@ -690,6 +716,7 @@ async fn reconcile_ready( namespace, name, replica_name, + "version detection job returned no version", serde_json::json!({ "phase": "Failed" }), ) .await; @@ -709,6 +736,7 @@ async fn reconcile_ready( namespace, name, replica_name, + "version detection job exceeded backoff limit", serde_json::json!({ "phase": "Failed" }), ) .await; @@ -772,6 +800,7 @@ async fn reconcile_ready( namespace, name, replica_name, + "deployment not ready after 10 minutes", serde_json::json!({ "phase": "Failed" }), ) .await; diff --git a/src/event_publisher.rs b/src/event_publisher.rs index 392197c..bf1dcab 100644 --- a/src/event_publisher.rs +++ b/src/event_publisher.rs @@ -52,7 +52,11 @@ pub struct NewEvent { /// Send a single event. Builds a fresh mTLS-configured `reqwest::Client` per /// call — these publishes are rare (one per failed restore) so caching gains /// nothing and a fresh client always picks up rotated certs. -pub async fn publish(client: &Client, config: &EventPublisherConfig, event: &NewEvent) -> Result<()> { +pub async fn publish( + client: &Client, + config: &EventPublisherConfig, + event: &NewEvent, +) -> Result<()> { let identity_pem = load_identity_pem(client, &config.client_certificate_secret_ref).await?; let identity = reqwest::Identity::from_pem(&identity_pem) .map_err(|e| Error::EventPublisher(format!("invalid client certificate: {e}")))?; @@ -76,10 +80,9 @@ pub async fn publish(client: &Client, config: &EventPublisherConfig, event: &New /// Pull `tls.crt` and `tls.key` out of the referenced Secret and concatenate /// them into a single PEM buffer suitable for `Identity::from_pem`. async fn load_identity_pem(client: &Client, secret_ref: &SecretReference) -> Result> { - let name = secret_ref - .name - .as_deref() - .ok_or_else(|| Error::EventPublisher("clientCertificateSecretRef.name is required".into()))?; + let name = secret_ref.name.as_deref().ok_or_else(|| { + Error::EventPublisher("clientCertificateSecretRef.name is required".into()) + })?; let namespace = secret_ref.namespace.as_deref().ok_or_else(|| { Error::EventPublisher("clientCertificateSecretRef.namespace is required".into()) })?; From e830d9f8ac32d1cdc6d84f3efc8012698a7e70ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Mon, 25 May 2026 17:15:05 +1200 Subject: [PATCH 5/6] docs(readme): document eventPublisher spec section --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index cbd8bb2..ecc4c76 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,7 @@ Defines a continuously-refreshed replica of a PostgreSQL database restored from | `postgresExtraConfig` | `string` | No | — | Extra lines appended to `postgresql.conf` (e.g. `shared_preload_libraries`). | | `notifications` | `[]NotificationConfig` | No | `[]` | Notification targets called on restore events. | | `persistentSchemas` | `[]string` | No | — | List of schema names to migrate from the previous restore to the new restore on each switchover. | +| `eventPublisher` | `EventPublisherConfig` | No | — | Publish restore-failure events to a canopy `/events` endpoint over mTLS. | The cron expression is parsed using the [cronexpr](https://docs.rs/cronexpr) crate. It has two interesting features: @@ -147,6 +148,18 @@ Additional fields for `target: graphQL`: | `mutation` | `string` | Yes | GraphQL mutation string. | | `variablesTemplate` | `string` | Yes | Template for the GraphQL variables payload. | +#### EventPublisherConfig + +Publishes restore-failure events to a canopy-style `/events` endpoint over +mTLS. Repeated failures with the same `(source, ref)` are folded into a +single rolling issue server-side. + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `url` | `string` | Yes | — | Full URL of the events endpoint (e.g. `https://meta.tamanu.app/api/events`). | +| `clientCertificateSecretRef` | `SecretReference` | Yes | — | Secret holding the mTLS client cert + key. Expected keys are `tls.crt` and `tls.key` (a standard `kubernetes.io/tls` Secret). | +| `source` | `string` | No | `"pgro"` | Value placed in `NewEvent.source` on every published event. | + #### Status | Field | Type | Description | From 4a0e7aa0fbaed10b3590dfb3188bdab730fe49b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Mon, 25 May 2026 17:16:17 +1200 Subject: [PATCH 6/6] unplan: event-publishing --- docs/plans/event-publishing.md | 97 ---------------------------------- 1 file changed, 97 deletions(-) delete mode 100644 docs/plans/event-publishing.md diff --git a/docs/plans/event-publishing.md b/docs/plans/event-publishing.md deleted file mode 100644 index 210325d..0000000 --- a/docs/plans/event-publishing.md +++ /dev/null @@ -1,97 +0,0 @@ -# Publish restore errors as canopy events - -We have a fleet-wide event ingestion API at `https://meta.tamanu.app/api/events` -(the "canopy" public-server). Devices and operators can `POST /events` with a -small payload (`source`, `ref`, `message`, optional `description`, `severity`, -`occurredAt`, `active`) and the server folds repeated events with the same -`(source, ref)` into a single issue. - -We want the postgres-restore-operator to publish to that endpoint whenever a -restore fails, so failures show up as issues in canopy without anyone having to -notice in logs. - -## Authentication - -The `/events` endpoint is `server-device` mTLS: the client presents a TLS -client certificate, and the server's role/identity model decides what `source` -is permitted. We don't get a bearer token or shared secret — we need a real -certificate + private key. - -In Kubernetes the standard way to carry this is a `kubernetes.io/tls` Secret -with two keys: `tls.crt` (PEM-encoded certificate, possibly with a chain) and -`tls.key` (PEM-encoded private key). The user creates this Secret out of band. - -## Spec - -Add an optional section to `PostgresPhysicalReplicaSpec`: - -```yaml -spec: - eventPublisher: - url: https://meta.tamanu.app/api/events - clientCertificateSecretRef: - name: pgro-canopy-client - source: pgro # optional; defaults to "pgro" -``` - -Fields: - -| Field | Type | Required | Default | Description | -|-------|------|----------|---------|-------------| -| `url` | string | yes | — | Full URL of the events endpoint. | -| `clientCertificateSecretRef` | `SecretReference` | yes | — | Secret holding mTLS client cert + key. Expected keys: `tls.crt`, `tls.key`. | -| `source` | string | no | `"pgro"` | Value placed in `NewEvent.source`. | - -We use `SecretReference` (same type as `kopiaSecretRef`) for consistency. The -section is optional — leaving it unset disables event publishing. - -We do not need a separate "private key secret" and "certificate secret": -Kubernetes' `kubernetes.io/tls` Secret type holds both, and that's what most -issuers (cert-manager, kubelet-served, etc.) produce. The fields `tls.crt` and -`tls.key` are conventional. - -## Event payload - -For a restore failure we emit: - -- `source`: from spec (default `"pgro"`) -- `ref`: `"//restore-failed"` — dedups so a single - replica with repeated failures rolls up under one issue rather than spamming. - Using the replica (not the restore) name as the ref is intentional: the - restore name changes per attempt, but the *condition* "this replica's - restores are failing" is one ongoing issue. -- `severity`: `"error"` -- `description`: `"Restore failed: /"` -- `message`: a few lines containing the restore name, snapshot id, and the - failure reason (e.g. job exit code, schema migration error). For now we use - whatever short reason we have at the `fail_restore` call site; we can - enrich later. -- `occurredAt`: `Timestamp::now()` -- `active`: true - -The endpoint folds duplicates by `(source, ref)`, so we don't need to track -sent state ourselves. - -## Implementation shape - -1. `src/types/replica.rs`: add `EventPublisherConfig { url, client_certificate_secret_ref, source }` and `event_publisher: Option` on the spec. -2. `src/event_publisher.rs` (new module): - - `pub struct EventPublisher` wrapping a `reqwest::Client` configured with mTLS Identity. - - Constructor reads the cert+key from the named Secret, concatenates them, calls `reqwest::Identity::from_pem`, builds a Client with `.identity(id)`. - - `pub async fn publish(&self, event: &NewEvent) -> Result<()>`. - - We rebuild the client on each call site (no caching). Restore failures are rare; a fresh client per publish is fine and avoids stale-cert hassles. -3. `Cargo.toml`: enable `rustls-tls` feature on `reqwest` so `Identity::from_pem` is available. The crate already uses rustls everywhere else (kube is configured with `rustls-tls`). -4. `src/controllers/restore.rs::fail_restore`: after the existing status update / event recorder / metrics block, if the parent replica has `event_publisher` configured, fire-and-log a publish. Errors get logged at `warn`; we never let event publishing fail the reconcile. -5. README: add a new section "EventPublisher" + the row in the spec table. - -## What we deliberately don't do - -- No retry loop or status field for event publish success/failure. Canopy - dedupes by `(source, ref)`; the next failed restore re-publishes anyway. - Adding a `NotificationStatus`-style record would be churn for little gain. -- No per-restore-CRD event publisher config. It lives on the replica because - configuration belongs to the user-facing resource; the restore CRD is - operator-managed. -- No operator-global config. We considered making this an operator-level - setting (one URL for the whole cluster) but that conflicts with the - per-namespace tenancy model the operator already follows.