@@ -6,7 +6,7 @@ use k8s_openapi::{
66 api:: {
77 batch:: v1:: { Job , JobSpec } ,
88 core:: v1:: {
9- Container , EnvVar , LocalObjectReference , PodSpec , PodTemplateSpec ,
9+ Container , EnvVar , LocalObjectReference , Pod , PodSpec , PodTemplateSpec ,
1010 ResourceRequirements , Secret , Service , ServicePort , ServiceSpec ,
1111 } ,
1212 } ,
@@ -415,13 +415,60 @@ impl PostgresPhysicalReplica {
415415 }
416416}
417417
/// Pod label key marking a restore's postgres pod as safe for external traffic.
///
/// The operator applies this label (with the value `"true"`) only after
/// schema migration, and any other work that must happen before handover,
/// has finished. The per-replica Service selector requires the label, so a
/// restore that is still in `Switching` is unreachable through the Service.
/// That lets operator-side work (DROP SCHEMA, the `pg_dump | psql` migration
/// Job, etc.) run without external clients racing to take locks on the
/// schemas being touched.
pub const READY_FOR_TRAFFIC_LABEL: &str = "pgro.bes.au/ready-for-traffic";
426+
418427impl PostgresPhysicalRestore {
428+ /// Patch this restore's postgres pod to add the [`READY_FOR_TRAFFIC_LABEL`].
429+ /// Idempotent and resilient to the pod not existing yet (the restore's
430+ /// deployment may be mid-rollout); callers that need the label to be
431+ /// present should retry on the next reconcile pass.
432+ pub async fn mark_pod_ready_for_traffic ( & self , client : & Client ) -> Result < ( ) > {
433+ let pods: Api < Pod > = Api :: namespaced ( client. clone ( ) , & self . ns ( ) ) ;
434+ let selector = format ! ( "pgro.bes.au/restore={}" , self . name_any( ) ) ;
435+ let list = pods. list ( & ListParams :: default ( ) . labels ( & selector) ) . await ?;
436+ let pod = list
437+ . items
438+ . into_iter ( )
439+ . find ( |p| p. status . as_ref ( ) . and_then ( |s| s. phase . as_deref ( ) ) == Some ( "Running" ) ) ;
440+ let Some ( pod) = pod else {
441+ warn ! (
442+ restore = self . name_any( ) ,
443+ "no running pod for restore yet; will retry next reconcile"
444+ ) ;
445+ return Ok ( ( ) ) ;
446+ } ;
447+ let pod_name = pod. name_any ( ) ;
448+ let patch = serde_json:: json!( {
449+ "metadata" : {
450+ "labels" : {
451+ READY_FOR_TRAFFIC_LABEL : "true" ,
452+ }
453+ }
454+ } ) ;
455+ pods. patch ( & pod_name, & PatchParams :: default ( ) , & Patch :: Merge ( & patch) )
456+ . await ?;
457+ info ! (
458+ restore = self . name_any( ) ,
459+ pod = pod_name,
460+ "marked restore pod ready for traffic"
461+ ) ;
462+ Ok ( ( ) )
463+ }
464+
419465 pub async fn update_service_selector ( & self , client : & Client , service_name : & str ) -> Result < ( ) > {
420466 let services: Api < Service > = Api :: namespaced ( client. clone ( ) , & self . ns ( ) ) ;
421467 let patch = serde_json:: json!( {
422468 "spec" : {
423469 "selector" : {
424470 "pgro.bes.au/restore" : self . name_any( ) ,
471+ READY_FOR_TRAFFIC_LABEL : "true" ,
425472 }
426473 }
427474 } ) ;
0 commit comments