diff --git a/Cargo.lock b/Cargo.lock index 76a16c4..256f887 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,16 +370,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation" version = "0.10.1" @@ -619,15 +609,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "encoding_rs" -version = "0.8.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" -dependencies = [ - "cfg-if", -] - [[package]] name = "enum-ordinalize" version = "4.3.2" @@ -1113,11 +1094,9 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2", - "system-configuration", "tokio", "tower-service", "tracing", - "windows-registry", ] [[package]] @@ -2250,7 +2229,6 @@ checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" dependencies = [ "base64", "bytes", - "encoding_rs", "futures-core", "h2", "http", @@ -2261,7 +2239,6 @@ dependencies = [ "hyper-util", "js-sys", "log", - "mime", "percent-encoding", "pin-project-lite", "quinn", @@ -2400,7 +2377,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" dependencies = [ - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "jni", "log", @@ -2517,7 +2494,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d17b898a6d6948c3a8ee4372c17cb384f90d2e6e912ef00895b14fd7ab54ec38" dependencies = [ "bitflags", - "core-foundation 0.10.1", + "core-foundation", 
"core-foundation-sys", "libc", "security-framework-sys", @@ -2791,27 +2768,6 @@ dependencies = [ "syn 2.0.116", ] -[[package]] -name = "system-configuration" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" -dependencies = [ - "bitflags", - "core-foundation 0.9.4", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tap" version = "1.0.1" @@ -3475,35 +3431,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" -[[package]] -name = "windows-registry" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" -dependencies = [ - "windows-link", - "windows-result", - "windows-strings", -] - -[[package]] -name = "windows-result" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" -dependencies = [ - "windows-link", -] - -[[package]] -name = "windows-strings" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" -dependencies = [ - "windows-link", -] - [[package]] name = "windows-sys" version = "0.45.0" diff --git a/Cargo.toml b/Cargo.toml index de06b18..1e8346e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ kube = { version = "3.1.0", default-features = false, features = ["runtime", "de kube_quantity = "0.9.0" prometheus = "0.14.0" rand = "0.10.1" 
-reqwest = { version = "0.13.3", features = ["json"] } +reqwest = { version = "0.13.3", default-features = false, features = ["http2", "json", "rustls"] } rust_decimal = "1.41.0" schemars = { version = "1.2.1", features = ["jiff02"] } serde = { version = "1.0.228", features = ["derive"] } diff --git a/README.md b/README.md index cbd8bb2..ecc4c76 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,7 @@ Defines a continuously-refreshed replica of a PostgreSQL database restored from | `postgresExtraConfig` | `string` | No | — | Extra lines appended to `postgresql.conf` (e.g. `shared_preload_libraries`). | | `notifications` | `[]NotificationConfig` | No | `[]` | Notification targets called on restore events. | | `persistentSchemas` | `[]string` | No | — | List of schema names to migrate from the previous restore to the new restore on each switchover. | +| `eventPublisher` | `EventPublisherConfig` | No | — | Publish restore-failure events to a canopy `/events` endpoint over mTLS. | The cron expression is parsed using the [cronexpr](https://docs.rs/cronexpr) crate. It has two interesting features: @@ -147,6 +148,18 @@ Additional fields for `target: graphQL`: | `mutation` | `string` | Yes | GraphQL mutation string. | | `variablesTemplate` | `string` | Yes | Template for the GraphQL variables payload. | +#### EventPublisherConfig + +Publishes restore-failure events to a canopy-style `/events` endpoint over +mTLS. Repeated failures with the same `(source, ref)` are folded into a +single rolling issue server-side. + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `url` | `string` | Yes | — | Full URL of the events endpoint (e.g. `https://meta.tamanu.app/api/events`). | +| `clientCertificateSecretRef` | `SecretReference` | Yes | — | Secret holding the mTLS client cert + key. Expected keys are `tls.crt` and `tls.key` (a standard `kubernetes.io/tls` Secret). 
| +| `source` | `string` | No | `"pgro"` | Value placed in `NewEvent.source` on every published event. | + #### Status | Field | Type | Description | diff --git a/src/controllers/replica/scheduling.rs b/src/controllers/replica/scheduling.rs index db0beb1..72d4e78 100644 --- a/src/controllers/replica/scheduling.rs +++ b/src/controllers/replica/scheduling.rs @@ -188,6 +188,7 @@ mod tests { ), persistent_schemas: None, + event_publisher: None, }, status: Some(PostgresPhysicalReplicaStatus { next_scheduled_restore: next_scheduled.map(Time), diff --git a/src/controllers/replica/schema_migration.rs b/src/controllers/replica/schema_migration.rs index a0b4f87..8e0da6b 100644 --- a/src/controllers/replica/schema_migration.rs +++ b/src/controllers/replica/schema_migration.rs @@ -285,6 +285,7 @@ mod tests { ), persistent_schemas: Some(schemas.into_iter().map(String::from).collect()), + event_publisher: None, }, status: None, } diff --git a/src/controllers/restore.rs b/src/controllers/restore.rs index e7e8b29..9df6454 100644 --- a/src/controllers/restore.rs +++ b/src/controllers/restore.rs @@ -26,6 +26,7 @@ use super::read_job_termination_message; use crate::{ context::Context, error::{Error, Result}, + event_publisher::{self, NewEvent, Severity}, types::*, }; @@ -145,6 +146,7 @@ async fn fail_restore( namespace: &str, name: &str, replica_name: &str, + reason: &str, status_patch: serde_json::Value, ) -> Result { update_restore_status(&ctx.client, namespace, name, status_patch).await?; @@ -153,9 +155,11 @@ async fn fail_restore( info!(promoted = %promoted_name, "promoted queued restore after failure"); } - // Increment consecutiveRestoreFailures on the parent replica let replicas: Api = Api::namespaced(ctx.client.clone(), namespace); - if let Ok(replica) = replicas.get(replica_name).await { + let replica = replicas.get(replica_name).await.ok(); + + // Increment consecutiveRestoreFailures on the parent replica + if let Some(replica) = &replica { let current = replica .status 
.as_ref() @@ -200,7 +204,7 @@ async fn fail_restore( &Event { type_: EventType::Warning, reason: "RestoreFailed".into(), - note: Some(format!("Restore {name} failed")), + note: Some(format!("Restore {name} failed: {reason}")), action: "Restore".into(), secondary: None, }, @@ -211,6 +215,27 @@ async fn fail_restore( warn!(replica = replica_name, error = %e, "failed to publish RestoreFailed event"); } + if let Some(replica) = &replica + && let Some(publisher_config) = replica.spec.event_publisher.as_ref() + { + let event = NewEvent { + source: publisher_config.source.clone(), + ref_: format!("{namespace}/{replica_name}/restore-failed"), + message: format!("Restore {name} failed: {reason}"), + description: Some(format!("Restore failed: {namespace}/{replica_name}")), + severity: Some(Severity::Error), + occurred_at: Some(Timestamp::now()), + active: Some(true), + }; + if let Err(e) = event_publisher::publish(&ctx.client, publisher_config, &event).await { + warn!( + replica = replica_name, + error = %e, + "failed to publish restore-failed event to canopy" + ); + } + } + Ok(Action::requeue(Duration::from_secs(300))) } @@ -464,6 +489,7 @@ async fn reconcile_restoring( namespace, name, replica_name, + "restore job exceeded backoff limit", serde_json::json!({ "phase": "Failed", "restoreJob": { @@ -690,6 +716,7 @@ async fn reconcile_ready( namespace, name, replica_name, + "version detection job returned no version", serde_json::json!({ "phase": "Failed" }), ) .await; @@ -709,6 +736,7 @@ async fn reconcile_ready( namespace, name, replica_name, + "version detection job exceeded backoff limit", serde_json::json!({ "phase": "Failed" }), ) .await; @@ -772,6 +800,7 @@ async fn reconcile_ready( namespace, name, replica_name, + "deployment not ready after 10 minutes", serde_json::json!({ "phase": "Failed" }), ) .await; diff --git a/src/controllers/restore/tests.rs b/src/controllers/restore/tests.rs index a710b60..d22d937 100644 --- a/src/controllers/restore/tests.rs +++ 
b/src/controllers/restore/tests.rs @@ -40,6 +40,7 @@ fn deployment_uses_affinity_not_node_selector() { persistent_schemas: None, storage_size_maximum: Quantity("2Ti".to_string()), + event_publisher: None, }, ); replica.spec.affinity = Some(Affinity { @@ -124,6 +125,7 @@ fn test_restore_and_replica() -> (PostgresPhysicalRestore, PostgresPhysicalRepli persistent_schemas: None, storage_size_maximum: Quantity("2Ti".to_string()), + event_publisher: None, }, ); diff --git a/src/error.rs b/src/error.rs index bcbfb7e..69435a8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,9 @@ pub enum Error { #[error("Notification error: {0}")] Notification(String), + #[error("Event publisher error: {0}")] + EventPublisher(String), + #[error("Serialization error: {0}")] Serialization(#[from] serde_json::Error), diff --git a/src/event_publisher.rs b/src/event_publisher.rs new file mode 100644 index 0000000..bf1dcab --- /dev/null +++ b/src/event_publisher.rs @@ -0,0 +1,156 @@ +//! Publishes operator events to a canopy-style `/events` endpoint. +//! +//! Canopy is the BES fleet metadata server at . +//! Its event API accepts `(source, ref, message, ...)` tuples authenticated by +//! mTLS client certificate, and folds duplicates with the same `(source, ref)` +//! into a single rolling issue. + +use jiff::Timestamp; +use k8s_openapi::api::core::v1::{Secret, SecretReference}; +use kube::{Api, Client}; +use serde::Serialize; + +use crate::{ + error::{Error, Result}, + types::EventPublisherConfig, +}; + +/// Severity matching canopy's RFC 5424 enum. +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Severity { + Emergency, + Alert, + Critical, + Error, + Warning, + Notice, + Info, + Debug, +} + +/// Payload accepted by `POST /events`. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NewEvent { + pub source: String, + /// Dedup key. 
Repeated events with the same `(source, ref)` roll up + /// into a single issue server-side. + #[serde(rename = "ref")] + pub ref_: String, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub severity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub occurred_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub active: Option, +} + +/// Send a single event. Builds a fresh mTLS-configured `reqwest::Client` per +/// call — these publishes are rare (one per failed restore) so caching gains +/// nothing and a fresh client always picks up rotated certs. +pub async fn publish( + client: &Client, + config: &EventPublisherConfig, + event: &NewEvent, +) -> Result<()> { + let identity_pem = load_identity_pem(client, &config.client_certificate_secret_ref).await?; + let identity = reqwest::Identity::from_pem(&identity_pem) + .map_err(|e| Error::EventPublisher(format!("invalid client certificate: {e}")))?; + + let http = reqwest::Client::builder() + .identity(identity) + .build() + .map_err(|e| Error::EventPublisher(format!("failed to build http client: {e}")))?; + + let resp = http.post(&config.url).json(event).send().await?; + let status = resp.status(); + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + return Err(Error::EventPublisher(format!( + "events endpoint returned {status}: {body}" + ))); + } + Ok(()) +} + +/// Pull `tls.crt` and `tls.key` out of the referenced Secret and concatenate +/// them into a single PEM buffer suitable for `Identity::from_pem`. 
+async fn load_identity_pem(client: &Client, secret_ref: &SecretReference) -> Result> { + let name = secret_ref.name.as_deref().ok_or_else(|| { + Error::EventPublisher("clientCertificateSecretRef.name is required".into()) + })?; + let namespace = secret_ref.namespace.as_deref().ok_or_else(|| { + Error::EventPublisher("clientCertificateSecretRef.namespace is required".into()) + })?; + + let secrets: Api = Api::namespaced(client.clone(), namespace); + let secret = secrets.get(name).await?; + let data = secret + .data + .as_ref() + .ok_or_else(|| Error::EventPublisher(format!("secret {name} has no data")))?; + + let cert = data + .get("tls.crt") + .ok_or_else(|| Error::EventPublisher(format!("secret {name} missing key tls.crt")))?; + let key = data + .get("tls.key") + .ok_or_else(|| Error::EventPublisher(format!("secret {name} missing key tls.key")))?; + + let mut buf = Vec::with_capacity(cert.0.len() + key.0.len() + 1); + buf.extend_from_slice(&cert.0); + if !cert.0.ends_with(b"\n") { + buf.push(b'\n'); + } + buf.extend_from_slice(&key.0); + Ok(buf) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn event_serializes_with_expected_field_names() { + let event = NewEvent { + source: "pgro".into(), + ref_: "ns/replica/restore-failed".into(), + message: "boom".into(), + description: Some("Restore failed".into()), + severity: Some(Severity::Error), + occurred_at: Some(Timestamp::from_second(1_700_000_000).unwrap()), + active: Some(true), + }; + let json = serde_json::to_value(&event).unwrap(); + assert_eq!(json["source"], "pgro"); + assert_eq!(json["ref"], "ns/replica/restore-failed"); + assert_eq!(json["message"], "boom"); + assert_eq!(json["description"], "Restore failed"); + assert_eq!(json["severity"], "error"); + assert_eq!(json["active"], true); + assert!(json.get("occurredAt").is_some()); + } + + #[test] + fn optional_fields_skipped_when_none() { + let event = NewEvent { + source: "pgro".into(), + ref_: "r".into(), + message: "m".into(), + description: 
None, + severity: None, + occurred_at: None, + active: None, + }; + let json = serde_json::to_value(&event).unwrap(); + let obj = json.as_object().unwrap(); + assert!(!obj.contains_key("description")); + assert!(!obj.contains_key("severity")); + assert!(!obj.contains_key("occurredAt")); + assert!(!obj.contains_key("active")); + } +} diff --git a/src/lib.rs b/src/lib.rs index 740f198..06afe75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod context; pub mod controllers; pub mod error; +pub mod event_publisher; pub mod kopia; pub mod metrics; pub mod notifications; diff --git a/src/types/replica.rs b/src/types/replica.rs index 3886a6e..a27ccd5 100644 --- a/src/types/replica.rs +++ b/src/types/replica.rs @@ -101,6 +101,33 @@ pub struct PostgresPhysicalReplicaSpec { /// computed size exceeds this limit. Defaults to 2Ti. #[serde(default = "default_storage_size_maximum")] pub storage_size_maximum: Quantity, + + /// Publish restore-failure events to a canopy-style `/events` endpoint + /// (https://meta.tamanu.app/api/events) over mTLS. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_publisher: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct EventPublisherConfig { + /// Full URL of the events endpoint, e.g. + /// `https://meta.tamanu.app/api/events`. + pub url: String, + + /// Secret holding the mTLS client cert + key. Expected keys are + /// `tls.crt` (PEM cert, optionally with chain) and `tls.key` (PEM + /// private key) — the conventional layout of a + /// `kubernetes.io/tls` Secret. + pub client_certificate_secret_ref: SecretReference, + + /// Value placed in `NewEvent.source` on every published event. 
/// Serde default for the `source` field of `EventPublisherConfig`: events
/// are attributed to `"pgro"` unless the spec overrides it.
fn default_event_source() -> String {
    String::from("pgro")
}