Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 2 additions & 75 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ kube = { version = "3.1.0", default-features = false, features = ["runtime", "de
kube_quantity = "0.9.0"
prometheus = "0.14.0"
rand = "0.10.1"
reqwest = { version = "0.13.3", features = ["json"] }
reqwest = { version = "0.13.3", default-features = false, features = ["http2", "json", "rustls"] }
rust_decimal = "1.41.0"
schemars = { version = "1.2.1", features = ["jiff02"] }
serde = { version = "1.0.228", features = ["derive"] }
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Defines a continuously-refreshed replica of a PostgreSQL database restored from
| `postgresExtraConfig` | `string` | No | — | Extra lines appended to `postgresql.conf` (e.g. `shared_preload_libraries`). |
| `notifications` | `[]NotificationConfig` | No | `[]` | Notification targets called on restore events. |
| `persistentSchemas` | `[]string` | No | — | List of schema names to migrate from the previous restore to the new restore on each switchover. |
| `eventPublisher` | `EventPublisherConfig` | No | — | Publish restore-failure events to a canopy `/events` endpoint over mTLS. |

The cron expression is parsed using the [cronexpr](https://docs.rs/cronexpr) crate.
It has two interesting features:
Expand Down Expand Up @@ -147,6 +148,18 @@ Additional fields for `target: graphQL`:
| `mutation` | `string` | Yes | GraphQL mutation string. |
| `variablesTemplate` | `string` | Yes | Template for the GraphQL variables payload. |

#### EventPublisherConfig

Publishes restore-failure events to a canopy-style `/events` endpoint over
mTLS. Repeated failures with the same `(source, ref)` are folded into a
single rolling issue server-side.

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `url` | `string` | Yes | — | Full URL of the events endpoint (e.g. `https://meta.tamanu.app/api/events`). |
| `clientCertificateSecretRef` | `SecretReference` | Yes | — | Secret holding the mTLS client cert + key. Expected keys are `tls.crt` and `tls.key` (a standard `kubernetes.io/tls` Secret). |
| `source` | `string` | No | `"pgro"` | Value placed in `NewEvent.source` on every published event. |

#### Status

| Field | Type | Description |
Expand Down
1 change: 1 addition & 0 deletions src/controllers/replica/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ mod tests {
),

persistent_schemas: None,
event_publisher: None,
},
status: Some(PostgresPhysicalReplicaStatus {
next_scheduled_restore: next_scheduled.map(Time),
Expand Down
1 change: 1 addition & 0 deletions src/controllers/replica/schema_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ mod tests {
),

persistent_schemas: Some(schemas.into_iter().map(String::from).collect()),
event_publisher: None,
},
status: None,
}
Expand Down
35 changes: 32 additions & 3 deletions src/controllers/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::read_job_termination_message;
use crate::{
context::Context,
error::{Error, Result},
event_publisher::{self, NewEvent, Severity},
types::*,
};

Expand Down Expand Up @@ -145,6 +146,7 @@ async fn fail_restore(
namespace: &str,
name: &str,
replica_name: &str,
reason: &str,
status_patch: serde_json::Value,
) -> Result<Action> {
update_restore_status(&ctx.client, namespace, name, status_patch).await?;
Expand All @@ -153,9 +155,11 @@ async fn fail_restore(
info!(promoted = %promoted_name, "promoted queued restore after failure");
}

// Increment consecutiveRestoreFailures on the parent replica
let replicas: Api<PostgresPhysicalReplica> = Api::namespaced(ctx.client.clone(), namespace);
if let Ok(replica) = replicas.get(replica_name).await {
let replica = replicas.get(replica_name).await.ok();

// Increment consecutiveRestoreFailures on the parent replica
if let Some(replica) = &replica {
let current = replica
.status
.as_ref()
Expand Down Expand Up @@ -200,7 +204,7 @@ async fn fail_restore(
&Event {
type_: EventType::Warning,
reason: "RestoreFailed".into(),
note: Some(format!("Restore {name} failed")),
note: Some(format!("Restore {name} failed: {reason}")),
action: "Restore".into(),
secondary: None,
},
Expand All @@ -211,6 +215,27 @@ async fn fail_restore(
warn!(replica = replica_name, error = %e, "failed to publish RestoreFailed event");
}

if let Some(replica) = &replica
&& let Some(publisher_config) = replica.spec.event_publisher.as_ref()
{
let event = NewEvent {
source: publisher_config.source.clone(),
ref_: format!("{namespace}/{replica_name}/restore-failed"),
message: format!("Restore {name} failed: {reason}"),
description: Some(format!("Restore failed: {namespace}/{replica_name}")),
severity: Some(Severity::Error),
occurred_at: Some(Timestamp::now()),
active: Some(true),
};
if let Err(e) = event_publisher::publish(&ctx.client, publisher_config, &event).await {
warn!(
replica = replica_name,
error = %e,
"failed to publish restore-failed event to canopy"
);
}
}

Ok(Action::requeue(Duration::from_secs(300)))
}

Expand Down Expand Up @@ -464,6 +489,7 @@ async fn reconcile_restoring(
namespace,
name,
replica_name,
"restore job exceeded backoff limit",
serde_json::json!({
"phase": "Failed",
"restoreJob": {
Expand Down Expand Up @@ -690,6 +716,7 @@ async fn reconcile_ready(
namespace,
name,
replica_name,
"version detection job returned no version",
serde_json::json!({ "phase": "Failed" }),
)
.await;
Expand All @@ -709,6 +736,7 @@ async fn reconcile_ready(
namespace,
name,
replica_name,
"version detection job exceeded backoff limit",
serde_json::json!({ "phase": "Failed" }),
)
.await;
Expand Down Expand Up @@ -772,6 +800,7 @@ async fn reconcile_ready(
namespace,
name,
replica_name,
"deployment not ready after 10 minutes",
serde_json::json!({ "phase": "Failed" }),
)
.await;
Expand Down
2 changes: 2 additions & 0 deletions src/controllers/restore/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ fn deployment_uses_affinity_not_node_selector() {

persistent_schemas: None,
storage_size_maximum: Quantity("2Ti".to_string()),
event_publisher: None,
},
);
replica.spec.affinity = Some(Affinity {
Expand Down Expand Up @@ -124,6 +125,7 @@ fn test_restore_and_replica() -> (PostgresPhysicalRestore, PostgresPhysicalRepli

persistent_schemas: None,
storage_size_maximum: Quantity("2Ti".to_string()),
event_publisher: None,
},
);

Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum Error {
#[error("Notification error: {0}")]
Notification(String),

#[error("Event publisher error: {0}")]
EventPublisher(String),

#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),

Expand Down
Loading