Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions src/cloud-resources/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ pub trait ManagedResource: Resource<DynamicType = ()> + Sized {

fn managed_resource_meta(&self, name: String) -> ObjectMeta {
let mut labels = self.default_labels();
if let Some(app) = self.app_name() {
labels.insert("app.kubernetes.io/name".to_owned(), app.to_owned());
}
labels.extend(recommended_k8s_labels(self.app_name()));
ObjectMeta {
namespace: Some(self.meta().namespace.clone().unwrap()),
name: Some(name),
Expand All @@ -92,6 +90,26 @@ pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
}
}

/// Get the recommended Kubernetes labels (app.kubernetes.io/*)
/// WARNING: this is duplicated in src/orchestrator/src/lib.rs and src/orchestratord/src/k8s.rs
pub fn recommended_k8s_labels(app_name: Option<&str>) -> BTreeMap<String, String> {
let mut labels = BTreeMap::new();
labels.insert(
"app.kubernetes.io/managed-by".to_owned(),
"materialize-operator".to_owned(),
);
labels.insert(
"app.kubernetes.io/part-of".to_owned(),
"materialize".to_owned(),
);
if let Some(app) = app_name {
labels.insert("app.kubernetes.io/name".to_owned(), app.to_owned());
// legacy label
labels.insert("app".to_owned(), app.to_owned());
}
labels
}

fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
OwnerReference {
api_version: T::api_version(&()).to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/controller/src/clusters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ impl Controller {
let service = self.orchestrator.ensure_service(
&service_name,
ServiceConfig {
app_name: "clusterd".into(),
image: self.clusterd_image.clone(),
init_container_image: self.init_container_image.clone(),
args: Box::new(move |assigned| {
Expand Down
10 changes: 9 additions & 1 deletion src/orchestrator-kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_orchestrator::{
DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
ServiceProcessMetrics, ServiceStatus, recommended_k8s_labels, scheduling_config::*,
};
use mz_ore::cast::CastInto;
use mz_ore::retry::Retry;
Expand Down Expand Up @@ -556,6 +556,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
&self,
id: &str,
ServiceConfig {
app_name,
image,
init_container_image,
args,
Expand Down Expand Up @@ -600,6 +601,11 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
labels.insert(self.make_label_key(&key), value);
}

let standard_labels = recommended_k8s_labels(app_name);

// Standard Kubernetes labels
labels.extend(standard_labels.clone());

labels.insert(self.make_label_key("scale"), scale.to_string());

for port in &ports_in {
Expand Down Expand Up @@ -645,6 +651,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
let service = K8sService {
metadata: ObjectMeta {
name: Some(name.clone()),
labels: Some(standard_labels.clone()),
..Default::default()
},
spec: Some(ServiceSpec {
Expand Down Expand Up @@ -1236,6 +1243,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
let stateful_set = StatefulSet {
metadata: ObjectMeta {
name: Some(name.clone()),
labels: Some(standard_labels.clone()),
..Default::default()
},
spec: Some(StatefulSetSpec {
Expand Down
20 changes: 20 additions & 0 deletions src/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ pub struct LabelSelector {
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ServiceConfig {
/// Static application name (usually present in labels)
pub app_name: String,
/// An opaque identifier for the executable or container image to run.
///
/// Often names a container on Docker Hub or a path on the local machine.
Expand Down Expand Up @@ -250,6 +252,24 @@ pub struct ServiceConfig {
pub node_selector: BTreeMap<String, String>,
}

/// Get the recommended Kubernetes labels (app.kubernetes.io/*)
/// WARNING: this is duplicated in src/orchestratord/src/k8s.rs and src/cloud-resources/src/crd.rs
pub fn recommended_k8s_labels(app_name: String) -> BTreeMap<String, String> {
BTreeMap::from_iter([
(
"app.kubernetes.io/managed-by".to_owned(),
"materialize-operator".to_owned(),
),
(
"app.kubernetes.io/part-of".to_owned(),
"materialize".to_owned(),
),
("app.kubernetes.io/name".to_owned(), app_name.to_owned()),
// legacy label
("app".to_owned(), app_name.to_owned()),
])
}

/// A named port associated with a service.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServicePort {
Expand Down
7 changes: 5 additions & 2 deletions src/orchestratord/src/controller/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tracing::{trace, warn};

use crate::{
Error,
k8s::{apply_resource, get_resource, replace_resource},
k8s::{apply_resource, get_resource, recommended_k8s_labels, replace_resource},
tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
};
use mz_cloud_resources::crd::{
Expand Down Expand Up @@ -354,10 +354,13 @@ impl Context {
..Default::default()
};

let match_labels = pod_template_labels.clone();
pod_template_labels.extend(recommended_k8s_labels(balancer.app_name()));

let deployment_spec = DeploymentSpec {
replicas: Some(balancer.replicas()),
selector: LabelSelector {
match_labels: Some(pod_template_labels.clone()),
match_labels: Some(match_labels),
..Default::default()
},
strategy: Some(DeploymentStrategy {
Expand Down
7 changes: 5 additions & 2 deletions src/orchestratord/src/controller/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tracing::{trace, warn};

use crate::{
Error,
k8s::{apply_resource, get_resource, replace_resource},
k8s::{apply_resource, get_resource, recommended_k8s_labels, replace_resource},
tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
};
use mz_cloud_resources::crd::{
Expand Down Expand Up @@ -402,10 +402,13 @@ ssl_certificate_key /nginx/tls/tls.key;",
..Default::default()
};

let match_labels = pod_template_labels.clone();
pod_template_labels.extend(recommended_k8s_labels("console".into()));

let deployment_spec = DeploymentSpec {
replicas: Some(console.replicas()),
selector: LabelSelector {
match_labels: Some(pod_template_labels.clone()),
match_labels: Some(match_labels),
..Default::default()
},
template: PodTemplateSpec {
Expand Down
8 changes: 2 additions & 6 deletions src/orchestratord/src/controller/materialize/generation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use super::matching_image_from_environmentd_image_ref;
use crate::k8s::{apply_resource, delete_resource, get_resource};
use crate::tls::issuer_ref_defined;
use mz_cloud_provider::CloudProvider;
use mz_cloud_resources::crd::ManagedResource;
use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
use mz_cloud_resources::crd::{ManagedResource, recommended_k8s_labels};
use mz_ore::instrument;

static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
Expand Down Expand Up @@ -1131,11 +1131,7 @@ fn create_environmentd_statefulset_object(
"materialize.cloud/app".to_owned(),
mz.environmentd_app_name(),
);
pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
pod_template_labels.insert(
"app.kubernetes.io/name".to_owned(),
"environmentd".to_string(),
);
pod_template_labels.extend(recommended_k8s_labels("environmentd".into()));
pod_template_labels.extend(
mz.spec
.pod_labels
Expand Down
16 changes: 16 additions & 0 deletions src/orchestratord/src/k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::time::Duration;

use k8s_openapi::{
Expand Down Expand Up @@ -190,3 +191,18 @@ pub async fn register_crds(

Ok(())
}

/// Get the recommended Kubernetes labels (app.kubernetes.io/*)
/// WARNING: this is duplicated in src/orchestrator/src/lib.rs and src/cloud-resources/src/crd.rs
pub fn recommended_k8s_labels(app_name: String) -> BTreeMap<String, String> {
let mut labels = BTreeMap::new();
labels.insert(
"app.kubernetes.io/managed-by".into(),
"materialize-operator".into(),
);
labels.insert("app.kubernetes.io/part-of".into(), "materialize".into());
labels.insert("app.kubernetes.io/name".into(), app_name.to_owned());
// legacy label
labels.insert("app".into(), app_name.to_owned());
labels
}
49 changes: 49 additions & 0 deletions test/orchestratord/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,55 @@ def check() -> None:
retry(check, 360)


class RecommendedK8sLabels(Modification):
@classmethod
def values(cls, version: MzVersion) -> list[Any]:
return [None]

@classmethod
def default(cls) -> Any:
return None

def modify(self, definition: dict[str, Any]) -> None:
pass

def validate(self, mods: dict[type[Modification], Any]) -> None:
if MzVersion.parse_mz(mods[EnvironmentdImageRef]) < MzVersion.parse_mz(
"v26.24.0"
):
return

def get(kind: str, name: str) -> dict[str, Any]:
return json.loads(
spawn.capture(
[
"kubectl",
"get",
kind,
name,
"-n",
"materialize-environment",
"-o",
"json",
]
)
)

def check() -> None:
pod = get_environmentd_data()["items"][0]
statefulset = get(
"statefulset", pod["metadata"]["labels"]["materialize.cloud/name"]
)
service = get("service", statefulset["spec"]["serviceName"])
for kind, obj in (("statefulset", statefulset), ("service", service)):
actual = obj["metadata"].get("labels", {}).get("app.kubernetes.io/name")
assert (
actual == "environmentd"
), f"Expected app.kubernetes.io/name=environmentd on {kind}/{obj['metadata']['name']}, got {actual!r}"

retry(check, 120)


class AuthenticatorKind(Modification):
@classmethod
def values(cls, version: MzVersion) -> list[Any]:
Expand Down
Loading