diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java index 1eabf09a..a24d4b91 100644 --- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java @@ -1,35 +1,39 @@ package com.linkedin.hoptimator; -import java.util.List; +import javax.annotation.Nullable; import java.util.Map; public class Trigger implements Deployable { public static final String PAUSED_OPTION = "paused"; + private final String name; private final UserJob job; - private final List path; private final String cronSchedule; private final Map options; + private final Source source; + private final Sink sink; - public Trigger(String name, UserJob job, List path, String cronSchedule, - Map options) { + /** + * Contains an optional downstream sink for triggers that operate between a source + * sink (think ETL/rETL). + * TODO: need to collapse the "job.properties.online.table.name" logic into a sink for adhoc triggers + */ + public Trigger(String name, UserJob job, String cronSchedule, Map options, + Source source, @Nullable Sink sink) { this.name = name; this.job = job; - this.path = path; this.cronSchedule = cronSchedule; this.options = options; + this.source = source; + this.sink = sink; } public String name() { return name; } - public List path() { - return path; - } - public UserJob job() { return job; } @@ -38,27 +42,24 @@ public String cronSchedule() { return cronSchedule; } - public String table() { - return path.get(path.size() - 1); - } - - /** - * Returns the schema name if present. - */ - public String schema() { - return path.size() >= 2 ? path.get(path.size() - 2) : null; - } - public Map options() { return options; } - private String pathString() { - return String.join(".", path); + /** Upstream source the trigger fires on, or {@code null} when only the name is known + * (e.g. 
during DROP TRIGGER / PAUSE / RESUME, which only need to look up the existing CRD). */ + public Source source() { + return source; + } + + /** Downstream sink the trigger's job writes to, or {@code null} when the trigger has no declared sink. */ + public @Nullable Sink sink() { + return sink; } @Override public String toString() { - return "Trigger[" + name() + ", " + pathString() + "]"; + String path = source == null ? "" : String.join(".", source.path()); + return "Trigger[" + name + ", " + path + "]"; } } diff --git a/hoptimator-api/src/test/java/com/linkedin/hoptimator/PendingDeleteTest.java b/hoptimator-api/src/test/java/com/linkedin/hoptimator/PendingDeleteTest.java index 4b5b9144..99abc023 100644 --- a/hoptimator-api/src/test/java/com/linkedin/hoptimator/PendingDeleteTest.java +++ b/hoptimator-api/src/test/java/com/linkedin/hoptimator/PendingDeleteTest.java @@ -67,7 +67,7 @@ void toStringOmitsSelfOwnerWhenNull() { @Test void toStringOmitsSelfOwnerWhenOnlyOneFieldSet() { // The toString contract says self= appears only when both kind and name are non-null. - // (The K8sPipelineDependencyChecker.isSelfOwned guard also requires both.) + // (The DependencyChecker.isSelfOwned guard also requires both.) 
PendingDelete kindOnly = new PendingDelete<>("t", "LogicalTable", null); assertFalse(kindOnly.toString().contains("self=")); diff --git a/hoptimator-api/src/test/java/com/linkedin/hoptimator/TriggerTest.java b/hoptimator-api/src/test/java/com/linkedin/hoptimator/TriggerTest.java index 29558823..4f901499 100644 --- a/hoptimator-api/src/test/java/com/linkedin/hoptimator/TriggerTest.java +++ b/hoptimator-api/src/test/java/com/linkedin/hoptimator/TriggerTest.java @@ -2,10 +2,12 @@ import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -15,26 +17,37 @@ class TriggerTest { void testAccessors() { UserJob userJob = new UserJob("ns", "jobName"); Map options = Map.of("paused", "true"); - Trigger trigger = new Trigger("myTrigger", userJob, List.of("schema", "table"), "0 * * * *", options); + Source source = new Source("db", List.of("schema", "table"), Collections.emptyMap()); + Trigger trigger = new Trigger("myTrigger", userJob, "0 * * * *", options, source, null); assertEquals("myTrigger", trigger.name()); assertEquals(userJob, trigger.job()); - assertEquals(List.of("schema", "table"), trigger.path()); assertEquals("0 * * * *", trigger.cronSchedule()); assertEquals(options, trigger.options()); - assertEquals("table", trigger.table()); - assertEquals("schema", trigger.schema()); + assertNotNull(trigger.source()); + assertEquals(List.of("schema", "table"), trigger.source().path()); + assertEquals("table", trigger.source().table()); + assertEquals("schema", trigger.source().schema()); + assertNull(trigger.sink()); } @Test - void testSchemaReturnsNullForSingleElementPath() { - Trigger trigger = new Trigger("t", null, List.of("table"), null, Map.of()); - assertNull(trigger.schema()); + void testNullSourceAccessor() { + Trigger trigger = new 
Trigger("t", null, null, Map.of(), null, null); + assertNull(trigger.source()); + assertNull(trigger.sink()); } @Test void testToString() { - Trigger trigger = new Trigger("myTrig", null, List.of("a", "b"), null, Map.of()); + Source source = new Source("db", List.of("a", "b"), Collections.emptyMap()); + Trigger trigger = new Trigger("myTrig", null, null, Map.of(), source, null); assertEquals("Trigger[myTrig, a.b]", trigger.toString()); } + + @Test + void testToStringWithoutSource() { + Trigger trigger = new Trigger("myTrig", null, null, Map.of(), null, null); + assertEquals("Trigger[myTrig, ]", trigger.toString()); + } } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index 18e229e7..47b3f871 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -229,7 +229,8 @@ public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) { String cronSchedule = create.cron != null ? ((SqlLiteral) create.cron).getValueAs(String.class) : null; UserJob job = new UserJob(jobNamespace, jobName); - Trigger trigger = new Trigger(name, job, targetPath, cronSchedule, options); + Source source = new Source(databaseOf(target), targetPath, Collections.emptyMap()); + Trigger trigger = new Trigger(name, job, cronSchedule, options, source, null); Collection deployers = null; try { @@ -255,6 +256,23 @@ public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) { } } + /** + * Best-effort lookup of the database name that owns a Calcite Table. Mirrors what the + * {@code DROP TABLE} path does so triggers and tables agree on the same + * {@code (database, path)} identifier. 
+ */ + private static String databaseOf(Table target) { + if (target instanceof HoptimatorJdbcTable) { + HoptimatorJdbcSchema jdbcSchema = + (HoptimatorJdbcSchema) ((HoptimatorJdbcTable) target).jdbcTable().jdbcSchema; + return jdbcSchema.databaseName(); + } + if (target instanceof TemporaryTable) { + return ((TemporaryTable) target).databaseName(); + } + return null; + } + // N.B. originally copy-pasted from Apache Calcite /** Executes a {@code CREATE TABLE} command. */ @@ -307,7 +325,7 @@ public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) { } String name = drop.name.names.get(0); - Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, new HashMap<>()); + Trigger trigger = new Trigger(name, null, null, new HashMap<>(), null, null); Collection deployers = null; try { @@ -344,7 +362,7 @@ private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, String.valueOf(paused)); - Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, options); + Trigger trigger = new Trigger(name, null, null, options, null, null); Collection deployers = null; try { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/DependencyChecker.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/DependencyChecker.java new file mode 100644 index 00000000..b539b8e2 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/DependencyChecker.java @@ -0,0 +1,141 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import io.kubernetes.client.common.KubernetesObject; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; + +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import 
com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; + +import javax.annotation.Nullable; + + +/** + * Checks whether any Pipeline or TableTrigger CRDs still depend on a resource a + * {@link com.linkedin.hoptimator.Deployer} is about to delete. + * + *

Both CRDs carry the same {@code depends-on-<slug>} label and {@code depends-on-sources}/ + * {@code depends-on-sinks} annotations (stamped by {@link K8sPipelineDeployer} and + * {@link K8sTriggerDeployer}), so the same lookup works for either: a label-selector list against + * the CRD group is O(matches) on the wire, then each candidate is cross-checked against the union + * of the source + sink annotations to rule out hash collisions in the slug and stale labels left + * over from a prior version of the resource ({@link K8sApi#update}'s additive label merge can leak + * old {@code depends-on-*} keys). + * + *

Resources owned (directly) by {@code (selfOwnerKind, selfOwnerName)} are excluded from the + * blocker list: those will be cascade-deleted alongside the parent resource, so counting them as + * external dependents would make composite deletes (e.g. {@code LogicalTableDeployer.delete()}) + * impossible. + */ +public final class DependencyChecker { + + private DependencyChecker() { + } + + public static void assertNoExternalDependents(K8sContext context, String database, + List path, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) throws SQLException { + assertNoExternalDependents( + new K8sApi<>(context, K8sApiEndpoints.PIPELINES), + new K8sApi<>(context, K8sApiEndpoints.TABLE_TRIGGERS), + database, path, selfOwnerKind, selfOwnerName); + } + + /** Variant that takes pre-built {@link K8sApi}s — used by tests to inject mocks. */ + static void assertNoExternalDependents(K8sApi pipelineApi, + K8sApi triggerApi, + String database, List path, @Nullable String selfOwnerKind, + @Nullable String selfOwnerName) throws SQLException { + + String labelKey = DependencyLabels.labelKey(database, path); + String identifier = DependencyLabels.identifier(database, path); + + List blockers = new ArrayList<>(); + blockers.addAll(findBlockers(pipelineApi, labelKey, identifier, "pipeline", + selfOwnerKind, selfOwnerName)); + blockers.addAll(findBlockers(triggerApi, labelKey, identifier, "trigger", + selfOwnerKind, selfOwnerName)); + + if (!blockers.isEmpty()) { + throw new SQLException(String.format( + "Cannot delete %s — %d active dependent(s): %s", + identifier, blockers.size(), String.join(", ", blockers))); + } + } + + /** + * Generic blocker enumeration: list resources of type {@code T} that carry the given + * {@code labelKey}, confirm via the depends-on annotations, and exclude self-owned children. + * The {@code kindLabel} is prefixed onto each blocker description so a unified error message + * can attribute each entry to its CRD kind. 
+ */ + private static List findBlockers(K8sApi api, + String labelKey, String identifier, String kindLabel, + @Nullable String selfOwnerKind, @Nullable String selfOwnerName) throws SQLException { + Collection matches = api.select(labelKey); + List blockers = new ArrayList<>(); + for (T obj : matches) { + V1ObjectMeta meta = obj.getMetadata(); + if (isSelfOwned(meta, selfOwnerKind, selfOwnerName)) { + continue; + } + if (!annotationConfirms(meta, identifier)) { + // Label matched but annotation doesn't — slug collision or stale label, skip it. + continue; + } + blockers.add(kindLabel + "/" + describeBlocker(meta)); + } + return blockers; + } + + private static boolean isSelfOwned(V1ObjectMeta meta, @Nullable String selfOwnerKind, + @Nullable String selfOwnerName) { + if (selfOwnerKind == null || selfOwnerName == null) { + return false; + } + if (meta == null || meta.getOwnerReferences() == null) { + return false; + } + for (V1OwnerReference owner : meta.getOwnerReferences()) { + if (selfOwnerKind.equals(owner.getKind()) && selfOwnerName.equals(owner.getName())) { + return true; + } + } + return false; + } + + private static boolean annotationConfirms(V1ObjectMeta meta, String identifier) { + if (meta == null || meta.getAnnotations() == null) { + return true; // pre-labeling resource — conservatively trust the label match + } + String sourcesAnno = meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SOURCES); + String sinksAnno = meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SINKS); + if (sourcesAnno == null && sinksAnno == null) { + return true; // same — no annotations to cross-check against + } + if (sourcesAnno != null && DependencyLabels.parseAnnotation(sourcesAnno).contains(identifier)) { + return true; + } + return sinksAnno != null && DependencyLabels.parseAnnotation(sinksAnno).contains(identifier); + } + + /** + * Builds a human-readable blocker description: the resource name, plus (when present) the top + * ownerReference's {@code 
kind/name} so the user knows which higher-level resource owns it. + */ + private static String describeBlocker(V1ObjectMeta meta) { + String name = meta == null ? "" : meta.getName(); + String ownerSuffix = ""; + if (meta != null && meta.getOwnerReferences() != null && !meta.getOwnerReferences().isEmpty()) { + V1OwnerReference owner = meta.getOwnerReferences().get(0); + ownerSuffix = " (owned by " + owner.getKind() + "/" + owner.getName() + ")"; + } + return name + ownerSuffix; + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/DependencyLabels.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/DependencyLabels.java new file mode 100644 index 00000000..93d3ff70 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/DependencyLabels.java @@ -0,0 +1,146 @@ +package com.linkedin.hoptimator.k8s; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; + +import com.linkedin.hoptimator.Sink; +import com.linkedin.hoptimator.Source; + + +/** + * Encodes a resource's dependency edges as K8s labels + annotations. Used by both + * {@link K8sPipelineDeployer} and {@link K8sTriggerDeployer} so the dep-guard can + * find dependents with a single label-selector query. + * + *

Every source and sink the resource references becomes a label: + * {@code hoptimator.linkedin.com/depends-on-<slug>: "<database>_<path>"} where + * {@code <slug>} is a deterministic hash derived from {@code database + "_" + pathString}. + * The hash keeps label keys within Kubernetes's 63-character name limit for arbitrary paths, + * and lets {@code K8sApi.select} filter by dependency on the server. + * + *

Two annotations preserve the directional information the labels lose: + *

<ul> + *   <li>{@link #ANNOTATION_KEY_SOURCES} — comma-separated list of source identifiers verbatim.</li> + *   <li>{@link #ANNOTATION_KEY_SINKS} — comma-separated list of sink identifiers verbatim.</li> + * </ul> + * Together they serve two purposes: + * <ol> + *   <li>Collision guard for the delete-time dependency checker — a label match is only + * trusted when the resource appears in either annotation, so slug collisions and stale + * labels (from {@link K8sApi#update}'s additive label merge) can't produce false positives.</li> + *   <li>Direction recovery for visualization — the renderer draws source → resource → sink + * arrows from the split.</li> + * </ol>
+ */ +public final class DependencyLabels { + + static final String LABEL_PREFIX = "hoptimator.linkedin.com/depends-on-"; + /** Annotation listing source identifiers, comma-separated. */ + public static final String ANNOTATION_KEY_SOURCES = "hoptimator.linkedin.com/depends-on-sources"; + /** Annotation listing sink identifiers, comma-separated. */ + public static final String ANNOTATION_KEY_SINKS = "hoptimator.linkedin.com/depends-on-sinks"; + + private static final int SLUG_LENGTH = 16; // 64 bits of SHA-256 → ~1 in 1.8e19 collisions + private static final int MAX_LABEL_VALUE = 63; + + private DependencyLabels() { + } + + /** + * Canonical logical identifier for a resource: {@code _}. + */ + public static String identifier(String database, Iterable path) { + return database + "_" + String.join(".", path); + } + + /** Hex slug derived from the full identifier; same identifier always produces the same slug. */ + public static String slug(String database, Iterable path) { + byte[] digest = sha256(identifier(database, path).getBytes(StandardCharsets.UTF_8)); + StringBuilder sb = new StringBuilder(SLUG_LENGTH); + for (int i = 0; i < SLUG_LENGTH / 2; i++) { + sb.append(String.format("%02x", digest[i])); + } + return sb.toString(); + } + + /** Label key a resource carries if it depends on the given source/sink. */ + public static String labelKey(String database, Iterable path) { + return LABEL_PREFIX + slug(database, path); + } + + /** + * Stamps the depends-on labels and directional annotations onto {@code meta}, merging with any + * existing labels/annotations on the object. Both edges matter to the guard: dropping a source + * orphans resources that read from it; dropping a sink orphans resources that write to it. + * + *

Sources and sinks whose {@code database()} is null are skipped — they don't have a stable + * identifier. Callers with a single sink should pass {@link Collections#singletonList}. + */ + public static void stamp(V1ObjectMeta meta, Collection sources, Collection sinks) { + Map labels = meta.getLabels() != null ? meta.getLabels() : new HashMap<>(); + Map annotations = meta.getAnnotations() != null ? meta.getAnnotations() : new HashMap<>(); + Set sourceIds = collectIdentifiers(sources, labels); + Set sinkIds = collectIdentifiers(sinks, labels); + if (!sourceIds.isEmpty()) { + annotations.put(ANNOTATION_KEY_SOURCES, String.join(",", sourceIds)); + } + if (!sinkIds.isEmpty()) { + annotations.put(ANNOTATION_KEY_SINKS, String.join(",", sinkIds)); + } + meta.setLabels(labels); + meta.setAnnotations(annotations); + } + + private static Set collectIdentifiers(Collection deps, + Map labels) { + Set ids = new LinkedHashSet<>(); + if (deps == null) { + return ids; + } + for (Source dep : deps) { + if (dep == null || dep.database() == null) { + continue; + } + String id = identifier(dep.database(), dep.path()); + labels.put(labelKey(dep.database(), dep.path()), truncate(id)); + ids.add(id); + } + return ids; + } + + /** Parses the collision-guard annotation back into the set of identifiers it encoded. */ + public static Set parseAnnotation(String annotation) { + Set out = new LinkedHashSet<>(); + if (annotation == null || annotation.isEmpty()) { + return out; + } + for (String id : annotation.split(",")) { + String trimmed = id.trim(); + if (!trimmed.isEmpty()) { + out.add(trimmed); + } + } + return out; + } + + private static String truncate(String value) { + return value.length() <= MAX_LABEL_VALUE ? 
value : value.substring(0, MAX_LABEL_VALUE); + } + + private static byte[] sha256(byte[] input) { + try { + return MessageDigest.getInstance("SHA-256").digest(input); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 unavailable", e); + } + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidator.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDependencyValidator.java similarity index 74% rename from hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidator.java rename to hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDependencyValidator.java index ff06a3be..5a746206 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidator.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDependencyValidator.java @@ -11,16 +11,16 @@ /** * Pre-delete dependency check, run by the validator framework when a {@link Source} is wrapped * in {@link com.linkedin.hoptimator.PendingDelete}. Delegates to the existing - * {@link PipelineDependencyChecker} (label-selector + annotation collision-guard + self-owner + * {@link DependencyChecker} (label-selector + annotation collision-guard + self-owner * exclusion) and surfaces any blocking pipeline as a validation error. 
*/ -final class K8sPipelineDependencyValidator implements Validator { +final class K8sDependencyValidator implements Validator { private final Source source; private final String selfOwnerKind; private final String selfOwnerName; - K8sPipelineDependencyValidator(Source source, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) { + K8sDependencyValidator(Source source, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) { this.source = source; this.selfOwnerKind = selfOwnerKind; this.selfOwnerName = selfOwnerName; @@ -29,7 +29,7 @@ final class K8sPipelineDependencyValidator implements Validator { @Override public void validate(Issues issues, Connection connection) { try { - PipelineDependencyChecker.assertNoExternalDependents( + DependencyChecker.assertNoExternalDependents( K8sContext.create(connection), source.database(), source.path(), selfOwnerKind, selfOwnerName); } catch (SQLException e) { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java index 8fe455d2..5a09a6da 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java @@ -30,7 +30,7 @@ public class K8sPipelineBundle implements Deployer { /** * {@code sources} and {@code sink} are stamped as {@code depends-on-*} - * labels on the Pipeline CRD so the delete-time guard in {@link PipelineDependencyChecker} + * labels on the Pipeline CRD so the delete-time guard in {@link DependencyChecker} * can find this pipeline by label selector. 
*/ public K8sPipelineBundle(String name, List pipelineSpecs, String sql, diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java index 0a9cfedf..4f2be404 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java @@ -3,9 +3,7 @@ import java.sql.SQLException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import io.kubernetes.client.openapi.models.V1ObjectMeta; @@ -19,7 +17,7 @@ /** * Deploys a Pipeline object. Stamps {@code depends-on-*} labels and a {@code depends-on} * collision-guard annotation describing which sources/sink the pipeline references, so - * {@link PipelineDependencyChecker} can look up dependents by label selector at delete time. + * {@link DependencyChecker} can look up dependents by label selector at delete time. * *

{@link K8sApi#update} merges labels additively, so stale {@code depends-on-*} labels from * a previous version of the pipeline's SQL can linger. Correctness is preserved by the @@ -48,14 +46,8 @@ class K8sPipelineDeployer extends K8sDeployer labels = PipelineDependencyLabels.labelsFor(sources, sink); - if (!labels.isEmpty()) { - meta.setLabels(new HashMap<>(labels)); - Map annotations = new HashMap<>(); - annotations.put(PipelineDependencyLabels.ANNOTATION_KEY, - PipelineDependencyLabels.annotationFor(sources, sink)); - meta.setAnnotations(annotations); - } + DependencyLabels.stamp(meta, sources, + sink == null ? Collections.emptyList() : Collections.singletonList(sink)); return new V1alpha1Pipeline().kind(K8sApiEndpoints.PIPELINES.kind()) .apiVersion(K8sApiEndpoints.PIPELINES.apiVersion()) .metadata(meta) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java index 29289593..6929b421 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java @@ -1,5 +1,6 @@ package com.linkedin.hoptimator.k8s; +import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.Trigger; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList; @@ -10,6 +11,7 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta; import java.sql.SQLException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -60,12 +62,28 @@ public void update() throws SQLException { existingTrigger.spec(spec); } spec.setPaused(targetPaused); + // Refresh dependency-tracking labels and annotation here too — without this, the partial + // update path (used when the LogicalTable is re-applied) would leave triggers with stale + // or missing 
depends-on metadata. + stampDependencyMetadata(existingTrigger); triggerApi.update(existingTrigger); return; } super.update(); } + private void stampDependencyMetadata(V1alpha1TableTrigger target) { + V1ObjectMeta meta = target.getMetadata(); + if (meta == null) { + meta = new V1ObjectMeta(); + target.metadata(meta); + } + Source source = trigger.source(); + DependencyLabels.stamp(meta, + source != null ? Collections.singletonList(source) : Collections.emptyList(), + trigger.sink() != null ? Collections.singletonList(trigger.sink()) : Collections.emptyList()); + } + @Override public void delete() throws SQLException { String canonicalName = K8sUtils.canonicalizeName(trigger.name()); @@ -78,9 +96,10 @@ public void delete() throws SQLException { @Override protected V1alpha1TableTrigger toK8sObject() throws SQLException { + Source source = trigger.source(); String name = K8sUtils.canonicalizeName(trigger.name(), trigger.job().name()); String triggerName = K8sUtils.canonicalizeName(trigger.name()); - String viewName = K8sUtils.canonicalizeName(trigger.path()); + String viewName = source != null ? K8sUtils.canonicalizeName(source.path()) : triggerName; String jobName = K8sUtils.canonicalizeName(trigger.job().name()); String jobNamespace = trigger.job().namespace() != null ? trigger.job().namespace() : context.namespace(); @@ -91,12 +110,20 @@ protected V1alpha1TableTrigger toK8sObject() throws SQLException { .with("trigger", triggerName) .with("job", jobName) .with("schedule", trigger.cronSchedule()) - .with("table", trigger.table()) - .with("schema", trigger.schema()) + .with("table", source != null ? source.table() : null) + .with("schema", source != null ? source.schema() : null) .with(properties); V1alpha1JobTemplate jobTemplate = jobTemplateApi.get(jobNamespace, jobName); + V1ObjectMeta meta = new V1ObjectMeta().name(triggerName); Map labels = new HashMap<>(); labels.put("view", viewName); // a corresponding view object may or may not exist. 
+ meta.setLabels(labels); + // Stamp depends-on labels so the dep-guard can find triggers via + // label selector. The Trigger's source is the upstream table the job reads from. When the + // trigger carries a Sink, we additionally stamp the sink. + DependencyLabels.stamp(meta, + source != null ? Collections.singletonList(source) : Collections.emptyList(), + trigger.sink() != null ? Collections.singletonList(trigger.sink()) : Collections.emptyList()); String template = jobTemplate.getSpec().getYaml(); String rendered = new Template.SimpleTemplate(template).render(env); Map jobProps = new HashMap<>(); @@ -106,8 +133,8 @@ protected V1alpha1TableTrigger toK8sObject() throws SQLException { } }); V1alpha1TableTriggerSpec spec = new V1alpha1TableTriggerSpec() - .schema(trigger.schema()) - .table(trigger.table()) + .schema(source != null ? source.schema() : null) + .table(source != null ? source.table() : null) .schedule(trigger.cronSchedule()) .yaml(rendered); if (!jobProps.isEmpty()) { @@ -119,7 +146,7 @@ protected V1alpha1TableTrigger toK8sObject() throws SQLException { return new V1alpha1TableTrigger() .kind(K8sApiEndpoints.TABLE_TRIGGERS.kind()) .apiVersion(K8sApiEndpoints.TABLE_TRIGGERS.apiVersion()) - .metadata(new V1ObjectMeta().name(triggerName).labels(labels)) + .metadata(meta) .spec(spec); } } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sValidatorProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sValidatorProvider.java index 3cc45e4b..7cf14d20 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sValidatorProvider.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sValidatorProvider.java @@ -26,7 +26,7 @@ public Collection validators(T obj, Connection connection) { Object target = pd.target(); if (target instanceof Source) { return Collections.singletonList( - new K8sPipelineDependencyValidator((Source) target, pd.selfOwnerKind(), pd.selfOwnerName())); + new 
K8sDependencyValidator((Source) target, pd.selfOwnerKind(), pd.selfOwnerName())); } } return Collections.emptyList(); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyChecker.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyChecker.java deleted file mode 100644 index ee9a6b64..00000000 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyChecker.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.linkedin.hoptimator.k8s; - -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1OwnerReference; - -import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; -import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; - -import javax.annotation.Nullable; - - -/** - * Checks whether any Pipeline CRDs still depend on a resource a {@link com.linkedin.hoptimator.Deployer} - * is about to delete. - * - *

The lookup is a label-selector list against the Pipeline CRD group, so it is O(matches) on - * the wire — not a full scan. Each candidate is then cross-checked against the - * {@link PipelineDependencyLabels#ANNOTATION_KEY} annotation to rule out the (rare) case of a - * hash collision in the label slug. - * - *

Pipelines owned (directly) by {@code (selfOwnerKind, selfOwnerName)} are excluded from the - * blocker list: those pipelines will be cascade-deleted alongside the parent resource, so counting - * them as external dependents would make composite deletes (e.g. {@code LogicalTableDeployer.delete()}) - * impossible. - */ -public final class PipelineDependencyChecker { - - private PipelineDependencyChecker() { - } - - public static void assertNoExternalDependents(K8sContext context, String database, - List path, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) throws SQLException { - assertNoExternalDependents(new K8sApi<>(context, K8sApiEndpoints.PIPELINES), - database, path, selfOwnerKind, selfOwnerName); - } - - /** Variant that takes a pre-built {@link K8sApi} — used by tests to inject mocks. */ - static void assertNoExternalDependents(K8sApi api, - String database, List path, @Nullable String selfOwnerKind, - @Nullable String selfOwnerName) throws SQLException { - - String labelKey = PipelineDependencyLabels.labelKey(database, path); - String identifier = PipelineDependencyLabels.identifier(database, path); - - Collection matches = api.select(labelKey); - - List blockers = new ArrayList<>(); - for (V1alpha1Pipeline p : matches) { - if (isSelfOwned(p, selfOwnerKind, selfOwnerName)) { - continue; - } - if (!annotationConfirms(p, identifier)) { - // Label matched but annotation doesn't — this is a slug collision or a stale label, skip it. 
- continue; - } - blockers.add(describeBlocker(p)); - } - - if (!blockers.isEmpty()) { - throw new SQLException(String.format( - "Cannot delete %s — %d active pipeline(s) depend on it: %s", - identifier, blockers.size(), String.join(", ", blockers))); - } - } - - private static boolean isSelfOwned(V1alpha1Pipeline pipeline, @Nullable String selfOwnerKind, - @Nullable String selfOwnerName) { - if (selfOwnerKind == null || selfOwnerName == null) { - return false; - } - V1ObjectMeta meta = pipeline.getMetadata(); - if (meta == null || meta.getOwnerReferences() == null) { - return false; - } - for (V1OwnerReference owner : meta.getOwnerReferences()) { - if (selfOwnerKind.equals(owner.getKind()) && selfOwnerName.equals(owner.getName())) { - return true; - } - } - return false; - } - - private static boolean annotationConfirms(V1alpha1Pipeline pipeline, String identifier) { - V1ObjectMeta meta = pipeline.getMetadata(); - if (meta == null || meta.getAnnotations() == null) { - return true; // pre-labeling pipeline — conservatively trust the label match - } - String annotation = meta.getAnnotations().get(PipelineDependencyLabels.ANNOTATION_KEY); - if (annotation == null) { - return true; // same — no annotation to cross-check against - } - Set listed = PipelineDependencyLabels.parseAnnotation(annotation); - return listed.contains(identifier); - } - - /** - * Builds a human-readable blocker description: the pipeline name, plus (when present) the top - * ownerReference's {@code kind/name} so the user knows which higher-level resource owns it. - */ - private static String describeBlocker(V1alpha1Pipeline pipeline) { - V1ObjectMeta meta = pipeline.getMetadata(); - String name = meta == null ? 
"" : meta.getName(); - String ownerSuffix = ""; - if (meta != null && meta.getOwnerReferences() != null && !meta.getOwnerReferences().isEmpty()) { - V1OwnerReference owner = meta.getOwnerReferences().get(0); - ownerSuffix = " (owned by " + owner.getKind() + "/" + owner.getName() + ")"; - } - return name + ownerSuffix; - } -} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabels.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabels.java deleted file mode 100644 index d9662337..00000000 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabels.java +++ /dev/null @@ -1,123 +0,0 @@ -package com.linkedin.hoptimator.k8s; - -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; - -import com.linkedin.hoptimator.Sink; -import com.linkedin.hoptimator.Source; - - -/** - * Computes the labels and annotation that encode a Pipeline CRD's dependency edges. - * - *

Every source and sink a pipeline references is recorded as a label: - * {@code hoptimator.linkedin.com/depends-on-: "_"} where - * {@code } is a deterministic hash derived from {@code database + "_" + pathString}. - * The hash keeps label keys within Kubernetes's 63-character name limit for arbitrary paths, - * and lets {@code K8sApi.select} filter pipelines by dependency on the server. - * - *

A collision-guard annotation ({@code ANNOTATION_KEY}) lists all logical identifiers verbatim, - * so the delete-time check can distinguish a real dependency match from a rare hash collision. - */ -public final class PipelineDependencyLabels { - - static final String LABEL_PREFIX = "hoptimator.linkedin.com/depends-on-"; - public static final String ANNOTATION_KEY = "hoptimator.linkedin.com/depends-on"; - - private static final int SLUG_LENGTH = 16; // 64 bits of SHA-256 → ~1 in 1.8e19 collisions - private static final int MAX_LABEL_VALUE = 63; - - private PipelineDependencyLabels() { - } - - /** - * Canonical logical identifier for a resource: {@code _}. - */ - public static String identifier(String database, Iterable path) { - return database + "_" + String.join(".", path); - } - - /** Hex slug derived from the full identifier; same identifier always produces the same slug. */ - public static String slug(String database, Iterable path) { - byte[] digest = sha256(identifier(database, path).getBytes(StandardCharsets.UTF_8)); - StringBuilder sb = new StringBuilder(SLUG_LENGTH); - for (int i = 0; i < SLUG_LENGTH / 2; i++) { - sb.append(String.format("%02x", digest[i])); - } - return sb.toString(); - } - - /** Label key a Pipeline carries if it depends on the given resource. */ - public static String labelKey(String database, Iterable path) { - return LABEL_PREFIX + slug(database, path); - } - - /** - * Labels to stamp on a Pipeline CRD — one entry per source and the sink. Both edges - * matter to the guard: dropping a source orphans pipelines that read from it; dropping a sink - * orphans pipelines that write to it. - * - *

Keys are the same as {@link #labelKey}. Values are the readable identifier, truncated - * to 63 chars if necessary (the annotation preserves the untruncated form). Values are - * for debugging purposes only. - */ - public static Map labelsFor(Collection sources, Sink sink) { - Map labels = new LinkedHashMap<>(); - for (Source src : sources) { - labels.put(labelKey(src.database(), src.path()), truncate(identifier(src.database(), src.path()))); - } - if (sink != null) { - labels.put(labelKey(sink.database(), sink.path()), truncate(identifier(sink.database(), sink.path()))); - } - return labels; - } - - /** - * Collision-guard annotation value — comma-separated list of full source and sink identifiers, - * deduplicated and not truncated. The delete-time check cross-references this annotation after - * the label selector narrows the candidate set. - */ - public static String annotationFor(Collection sources, Sink sink) { - Set ids = new LinkedHashSet<>(); - for (Source src : sources) { - ids.add(identifier(src.database(), src.path())); - } - if (sink != null) { - ids.add(identifier(sink.database(), sink.path())); - } - return String.join(",", ids); - } - - /** Parses the collision-guard annotation back into the set of identifiers it encoded. */ - public static Set parseAnnotation(String annotation) { - Set out = new LinkedHashSet<>(); - if (annotation == null || annotation.isEmpty()) { - return out; - } - for (String id : annotation.split(",")) { - String trimmed = id.trim(); - if (!trimmed.isEmpty()) { - out.add(trimmed); - } - } - return out; - } - - private static String truncate(String value) { - return value.length() <= MAX_LABEL_VALUE ? 
value : value.substring(0, MAX_LABEL_VALUE); - } - - private static byte[] sha256(byte[] input) { - try { - return MessageDigest.getInstance("SHA-256").digest(input); - } catch (NoSuchAlgorithmException e) { - throw new IllegalStateException("SHA-256 unavailable", e); - } - } -} diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/DependencyCheckerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/DependencyCheckerTest.java new file mode 100644 index 00000000..535c3987 --- /dev/null +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/DependencyCheckerTest.java @@ -0,0 +1,206 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +class DependencyCheckerTest { + + @Mock + private K8sApi pipelineApi; + @Mock + private K8sApi triggerApi; + + private static final String DB = "kafka1"; + private static final List PATH = Collections.singletonList("my-topic"); + 
private static final String IDENTIFIER = "kafka1_my-topic"; + + private static V1alpha1Pipeline pipeline(String name, String ownerKind, String ownerName, + String annotationValue) { + return new V1alpha1Pipeline().metadata(metadata(name, ownerKind, ownerName, annotationValue)); + } + + private static V1alpha1TableTrigger trigger(String name, String ownerKind, String ownerName, + String sourcesAnnotation) { + return new V1alpha1TableTrigger().metadata(metadata(name, ownerKind, ownerName, sourcesAnnotation)); + } + + private static V1ObjectMeta metadata(String name, String ownerKind, String ownerName, + String annotationValue) { + V1ObjectMeta meta = new V1ObjectMeta().name(name); + if (ownerKind != null && ownerName != null) { + meta.addOwnerReferencesItem(new V1OwnerReference().kind(ownerKind).name(ownerName)); + } + if (annotationValue != null) { + Map annotations = new HashMap<>(); + annotations.put(DependencyLabels.ANNOTATION_KEY_SOURCES, annotationValue); + meta.setAnnotations(annotations); + } + return meta; + } + + /** Default both APIs to empty so each test only stubs the side it cares about. 
*/ + private void emptyByDefault() throws SQLException { + String labelKey = DependencyLabels.labelKey(DB, PATH); + lenient().when(pipelineApi.select(labelKey)).thenReturn(Collections.emptyList()); + lenient().when(triggerApi.select(labelKey)).thenReturn(Collections.emptyList()); + } + + @Test + void passesWhenNoMatches() throws SQLException { + emptyByDefault(); + + assertDoesNotThrow(() -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, null, null)); + } + + @Test + void blocksOnExternalPipeline() throws SQLException { + emptyByDefault(); + when(pipelineApi.select(DependencyLabels.labelKey(DB, PATH))) + .thenReturn(Collections.singletonList(pipeline("ext-pipe", "View", "owner", IDENTIFIER))); + + SQLException ex = assertThrows(SQLException.class, + () -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, null, null)); + assertTrue(ex.getMessage().contains("ext-pipe")); + assertTrue(ex.getMessage().contains(IDENTIFIER)); + assertTrue(ex.getMessage().contains("pipeline/ext-pipe"), + "blocker should be tagged with its CRD kind: " + ex.getMessage()); + } + + @Test + void blocksOnExternalTrigger() throws SQLException { + emptyByDefault(); + when(triggerApi.select(DependencyLabels.labelKey(DB, PATH))) + .thenReturn(Collections.singletonList(trigger("backfill", "LogicalTable", "lt-name", IDENTIFIER))); + + SQLException ex = assertThrows(SQLException.class, + () -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, null, null)); + assertTrue(ex.getMessage().contains("trigger/backfill"), + "blocker should be tagged with its CRD kind so the user knows what to look at: " + + ex.getMessage()); + assertTrue(ex.getMessage().contains("LogicalTable/lt-name"), + "owner reference should appear in the blocker description: " + ex.getMessage()); + } + + @Test + void skipsSelfOwnedPipeline() throws SQLException { + emptyByDefault(); + when(pipelineApi.select(DependencyLabels.labelKey(DB, 
PATH))) + .thenReturn(Collections.singletonList(pipeline("owned-pipe", "LogicalTable", "self-name", IDENTIFIER))); + + assertDoesNotThrow(() -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, "LogicalTable", "self-name")); + } + + @Test + void skipsSelfOwnedTrigger() throws SQLException { + emptyByDefault(); + when(triggerApi.select(DependencyLabels.labelKey(DB, PATH))) + .thenReturn(Collections.singletonList(trigger("owned-trigger", "LogicalTable", "self-name", IDENTIFIER))); + + // Triggers spawned by the same LogicalTable cascade-delete with the parent — they must not + // count as external dependents or you can't drop the LT at all. + assertDoesNotThrow(() -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, "LogicalTable", "self-name")); + } + + @Test + void blocksOnExternalWhenSomeAreSelfOwned() throws SQLException { + emptyByDefault(); + when(pipelineApi.select(DependencyLabels.labelKey(DB, PATH))).thenReturn(Arrays.asList( + pipeline("owned-pipe", "LogicalTable", "self-name", IDENTIFIER), + pipeline("external-pipe", "View", "other-owner", IDENTIFIER))); + + SQLException ex = assertThrows(SQLException.class, + () -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, "LogicalTable", "self-name")); + assertTrue(ex.getMessage().contains("external-pipe")); + assertFalse(ex.getMessage().contains("owned-pipe"), "self-owned pipeline must not be listed"); + } + + @Test + void rejectsSlugCollisionViaAnnotation() throws SQLException { + emptyByDefault(); + // Pipeline labels collide on the slug (which is what api.select matched on) but the + // annotation reveals the actual identifier is different — so this should NOT block. 
+ when(pipelineApi.select(DependencyLabels.labelKey(DB, PATH))) + .thenReturn(Collections.singletonList(pipeline("colliding-pipe", "View", "owner", + "some-other-database/some-other-path"))); + + assertDoesNotThrow(() -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, null, null)); + } + + @Test + void treatsMissingAnnotationAsTrusted() throws SQLException { + emptyByDefault(); + // A pipeline with the matching label but no depends-on annotation (pre-labeling migration + // case, or future code path that didn't write the annotation) is still treated as a blocker. + when(pipelineApi.select(DependencyLabels.labelKey(DB, PATH))) + .thenReturn(Collections.singletonList(pipeline("legacy-pipe", "View", "owner", null))); + + SQLException ex = assertThrows(SQLException.class, + () -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, null, null)); + assertTrue(ex.getMessage().contains("legacy-pipe")); + } + + @Test + void errorMessageIncludesOwnerKindAndName() throws SQLException { + emptyByDefault(); + when(pipelineApi.select(DependencyLabels.labelKey(DB, PATH))) + .thenReturn(Collections.singletonList(pipeline("ext-pipe", "View", "owner", IDENTIFIER))); + + SQLException ex = assertThrows(SQLException.class, + () -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, null, null)); + assertTrue(ex.getMessage().contains("View/owner"), + "error should name the owning View so the user knows what to unhook: " + ex.getMessage()); + } + + @Test + void errorMessageListsAllBlockersAcrossKinds() throws SQLException { + emptyByDefault(); + when(pipelineApi.select(DependencyLabels.labelKey(DB, PATH))).thenReturn(Arrays.asList( + pipeline("p1", "View", "owner1", IDENTIFIER), + pipeline("p2", "View", "owner2", IDENTIFIER))); + when(triggerApi.select(DependencyLabels.labelKey(DB, PATH))).thenReturn( + Collections.singletonList(trigger("t1", "LogicalTable", "lt-name", IDENTIFIER))); + + 
SQLException ex = assertThrows(SQLException.class, + () -> DependencyChecker.assertNoExternalDependents( + pipelineApi, triggerApi, DB, PATH, null, null)); + assertTrue(ex.getMessage().contains("pipeline/p1")); + assertTrue(ex.getMessage().contains("pipeline/p2")); + assertTrue(ex.getMessage().contains("trigger/t1")); + assertTrue(ex.getMessage().contains("3 active dependent")); + } +} diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/DependencyLabelsTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/DependencyLabelsTest.java new file mode 100644 index 00000000..eea31ef3 --- /dev/null +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/DependencyLabelsTest.java @@ -0,0 +1,179 @@ +package com.linkedin.hoptimator.k8s; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import org.junit.jupiter.api.Test; + +import com.linkedin.hoptimator.Sink; +import com.linkedin.hoptimator.Source; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +class DependencyLabelsTest { + + private static Source src(String db, String... path) { + return new Source(db, Arrays.asList(path), Collections.emptyMap()); + } + + private static Sink sink(String db, String... path) { + return new Sink(db, Arrays.asList(path), Collections.emptyMap()); + } + + @Test + void identifierJoinsDatabaseAndPath() { + // Separator is "_" so the identifier is also a valid K8s label value out of the box. 
+ assertEquals("mydb_a.b.c", DependencyLabels.identifier("mydb", Arrays.asList("a", "b", "c"))); + } + + @Test + void slugIsDeterministic() { + String s1 = DependencyLabels.slug("db", Arrays.asList("foo", "bar")); + String s2 = DependencyLabels.slug("db", Arrays.asList("foo", "bar")); + assertEquals(s1, s2); + } + + @Test + void slugVariesByDatabase() { + String a = DependencyLabels.slug("db1", Collections.singletonList("t")); + String b = DependencyLabels.slug("db2", Collections.singletonList("t")); + assertNotEquals(a, b); + } + + @Test + void slugVariesByPath() { + String a = DependencyLabels.slug("db", Arrays.asList("schema", "t")); + String b = DependencyLabels.slug("db", Arrays.asList("schema", "u")); + assertNotEquals(a, b); + } + + @Test + void labelKeyFitsKubernetesNameLimit() { + // Long path stressing the slug — name portion (after the /) must be ≤ 63 chars. + String key = DependencyLabels.labelKey( + "a-really-long-database-name", + Arrays.asList("catalog", "schema", "a_very_long_table_name_that_exceeds_sixty_three_chars")); + String namePortion = key.substring(key.indexOf('/') + 1); + assertTrue(namePortion.length() <= 63, "name portion must be ≤63 chars, got " + namePortion.length()); + assertTrue(namePortion.matches("[a-z0-9]([-a-z0-9_.]*[a-z0-9])?"), + "name portion must match K8s label-name regex, got: " + namePortion); + } + + private static V1ObjectMeta stamp(java.util.List sources, Sink sink) { + V1ObjectMeta meta = new V1ObjectMeta(); + DependencyLabels.stamp(meta, sources, + sink == null ? Collections.emptyList() : Collections.singletonList(sink)); + return meta; + } + + @Test + void stampIncludesSourcesAndSink() { + // Both edges matter: dropping a source orphans readers; dropping a sink orphans writers. 
+ V1ObjectMeta meta = stamp( + Arrays.asList(src("kafka1", "events"), src("venice1", "store")), + sink("mysql1", "outbox")); + + Map labels = meta.getLabels(); + assertEquals(3, labels.size()); + assertTrue(labels.containsKey(DependencyLabels.labelKey("kafka1", Collections.singletonList("events")))); + assertTrue(labels.containsKey(DependencyLabels.labelKey("venice1", Collections.singletonList("store")))); + assertTrue(labels.containsKey(DependencyLabels.labelKey("mysql1", Collections.singletonList("outbox")))); + } + + @Test + void stampHandlesNullSink() { + V1ObjectMeta meta = stamp(Collections.singletonList(src("db", "t")), null); + assertEquals(1, meta.getLabels().size()); + assertNull(meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SINKS)); + } + + @Test + void stampCollapsesSelfLoopIntoOneLabel() { + // Self-loop pipeline: source and sink share a slug, so the map collapses to one entry + // rather than producing duplicate keys. + V1ObjectMeta meta = stamp(Collections.singletonList(src("db", "t")), sink("db", "t")); + assertEquals(1, meta.getLabels().size()); + } + + @Test + void stampLabelValueTruncatedAtSixtyThreeChars() { + String longPath = "this_is_a_really_long_table_name_that_exceeds_sixty_three_chars_by_a_lot"; + V1ObjectMeta meta = stamp(Collections.singletonList(src("db", longPath)), null); + String value = meta.getLabels().values().iterator().next(); + assertTrue(value.length() <= 63); + } + + @Test + void stampLabelValueIsKubernetesLabelValueCompliant() { + // K8s label values must match (([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])? + // — the identifier separator is "_" precisely so this holds out of the box for typical + // (database, path) shapes seen in production. 
+ V1ObjectMeta meta = stamp( + Collections.singletonList(src("ads-database", "ADS", "PAGE_VIEWS")), null); + String value = meta.getLabels().values().iterator().next(); + + assertTrue(value.length() <= 63); + assertTrue(value.matches("(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?"), + "value must satisfy K8s label-value regex, got: " + value); + assertFalse(value.contains("/"), "no '/' separator should leak into the label value"); + } + + @Test + void stampSourcesAnnotationListsOnlySources() { + V1ObjectMeta meta = stamp( + Arrays.asList(src("kafka", "a"), src("venice", "b")), sink("mysql", "c")); + String annotation = meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SOURCES); + assertTrue(annotation.contains("kafka_a")); + assertTrue(annotation.contains("venice_b")); + assertFalse(annotation.contains("mysql_c"), "sinks must not appear in sources annotation"); + } + + @Test + void stampSourcesAnnotationDeduplicatesIdenticalSources() { + V1ObjectMeta meta = stamp(Arrays.asList(src("db", "t"), src("db", "t")), null); + assertEquals("db_t", meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SOURCES)); + } + + @Test + void stampSinkAnnotationCarriesSinkIdentifier() { + V1ObjectMeta meta = stamp(Collections.emptyList(), sink("mysql", "c")); + assertEquals("mysql_c", meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SINKS)); + } + + @Test + void stampSkipsNullDatabaseSource() { + // Triggers may carry a source with a null database (e.g. table-name-only DROP); the stamp + // helper drops those rather than producing a "null_" identifier. + V1ObjectMeta meta = stamp( + Collections.singletonList(src(null, "t")), null); + assertTrue(meta.getLabels() == null || meta.getLabels().isEmpty(), + "no depends-on labels expected when source has null database"); + assertNull(meta.getAnnotations() == null ? 
null + : meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SOURCES)); + } + + @Test + void parseAnnotationRoundtripFromSourcesValue() { + V1ObjectMeta meta = stamp(Arrays.asList(src("a", "1"), src("b", "2")), null); + String annotation = meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SOURCES); + Set parsed = DependencyLabels.parseAnnotation(annotation); + assertEquals(2, parsed.size()); + assertTrue(parsed.contains("a_1")); + assertTrue(parsed.contains("b_2")); + } + + @Test + void parseAnnotationHandlesNullAndEmpty() { + assertTrue(DependencyLabels.parseAnnotation(null).isEmpty()); + assertTrue(DependencyLabels.parseAnnotation("").isEmpty()); + assertTrue(DependencyLabels.parseAnnotation(" , ").isEmpty()); + } +} diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidatorTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sDependencyValidatorTest.java similarity index 78% rename from hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidatorTest.java rename to hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sDependencyValidatorTest.java index ac94636d..f5e4ab92 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidatorTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sDependencyValidatorTest.java @@ -24,20 +24,20 @@ /** - * Unit tests for {@link K8sPipelineDependencyValidator}. The validator is a thin adapter that - * forwards to {@link PipelineDependencyChecker#assertNoExternalDependents} — these tests use - * {@link MockedStatic} on both K8sContext and PipelineDependencyChecker to verify that source + * Unit tests for {@link K8sDependencyValidator}. 
The validator is a thin adapter that + * forwards to {@link DependencyChecker#assertNoExternalDependents} — these tests use + * {@link MockedStatic} on both K8sContext and DependencyChecker to verify that source * fields and self-owner fields are forwarded correctly, and that thrown SQLException becomes a * validation issue rather than propagating. */ @ExtendWith(MockitoExtension.class) -class K8sPipelineDependencyValidatorTest { +class K8sDependencyValidatorTest { @Mock private MockedStatic contextStatic; @Mock - private MockedStatic checkerStatic; + private MockedStatic checkerStatic; @Mock private Connection connection; @@ -53,8 +53,8 @@ private static Source source() { void validateForwardsSourceFieldsAndSelfOwnerToChecker() { contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); - K8sPipelineDependencyValidator validator = - new K8sPipelineDependencyValidator(source(), "LogicalTable", "my-table"); + K8sDependencyValidator validator = + new K8sDependencyValidator(source(), "LogicalTable", "my-table"); Validator.Issues issues = new Validator.Issues("test"); validator.validate(issues, connection); @@ -65,7 +65,7 @@ void validateForwardsSourceFieldsAndSelfOwnerToChecker() { ArgumentCaptor kindCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); - checkerStatic.verify(() -> PipelineDependencyChecker.assertNoExternalDependents( + checkerStatic.verify(() -> DependencyChecker.assertNoExternalDependents( eq(context), dbCaptor.capture(), pathCaptor.capture(), kindCaptor.capture(), nameCaptor.capture())); @@ -81,15 +81,15 @@ void validateForwardsSourceFieldsAndSelfOwnerToChecker() { void validatePassesNullSelfOwnerWhenUnset() { contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); - K8sPipelineDependencyValidator validator = - new K8sPipelineDependencyValidator(source(), null, null); + K8sDependencyValidator validator = + new K8sDependencyValidator(source(), null, 
null); Validator.Issues issues = new Validator.Issues("test"); validator.validate(issues, connection); ArgumentCaptor kindCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); - checkerStatic.verify(() -> PipelineDependencyChecker.assertNoExternalDependents( + checkerStatic.verify(() -> DependencyChecker.assertNoExternalDependents( eq(context), nullable(String.class), nullable(List.class), kindCaptor.capture(), nameCaptor.capture())); @@ -101,13 +101,13 @@ void validatePassesNullSelfOwnerWhenUnset() { @SuppressWarnings("unchecked") void validateRecordsCheckerSqlExceptionAsIssue() { contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); - checkerStatic.when(() -> PipelineDependencyChecker.assertNoExternalDependents( + checkerStatic.when(() -> DependencyChecker.assertNoExternalDependents( nullable(K8sContext.class), nullable(String.class), nullable(List.class), nullable(String.class), nullable(String.class))) .thenThrow(new SQLException("3 active pipeline(s) depend on it: p1, p2, p3")); - K8sPipelineDependencyValidator validator = - new K8sPipelineDependencyValidator(source(), null, null); + K8sDependencyValidator validator = + new K8sDependencyValidator(source(), null, null); Validator.Issues issues = new Validator.Issues("test"); validator.validate(issues, connection); diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java index a18122ed..1eebc401 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java @@ -62,15 +62,15 @@ void stampsDependencyLabelsForSourcesAndSink() throws SQLException { assertEquals(3, labels.size(), "should have one label per source + one for the sink"); assertTrue(labels.containsKey( - 
PipelineDependencyLabels.labelKey("kafka1", Collections.singletonList("topic-a")))); + DependencyLabels.labelKey("kafka1", Collections.singletonList("topic-a")))); assertTrue(labels.containsKey( - PipelineDependencyLabels.labelKey("kafka2", Collections.singletonList("topic-b")))); + DependencyLabels.labelKey("kafka2", Collections.singletonList("topic-b")))); assertTrue(labels.containsKey( - PipelineDependencyLabels.labelKey("mysql", Collections.singletonList("outbox")))); + DependencyLabels.labelKey("mysql", Collections.singletonList("outbox")))); } @Test - void stampsCollisionGuardAnnotation() throws SQLException { + void stampsDirectionalAnnotations() throws SQLException { K8sPipelineDeployer deployer = new K8sPipelineDeployer( "p1", List.of("spec"), "SELECT 1", Collections.singletonList(src("kafka", "topic")), @@ -79,9 +79,12 @@ void stampsCollisionGuardAnnotation() throws SQLException { V1alpha1Pipeline pipeline = deployer.toK8sObject(); Map annotations = pipeline.getMetadata().getAnnotations(); - String annotation = annotations.get(PipelineDependencyLabels.ANNOTATION_KEY); - assertNotNull(annotation); - assertTrue(annotation.contains("kafka_topic")); - assertTrue(annotation.contains("mysql_outbox")); + String sources = annotations.get(DependencyLabels.ANNOTATION_KEY_SOURCES); + String sink = annotations.get(DependencyLabels.ANNOTATION_KEY_SINKS); + assertNotNull(sources); + assertNotNull(sink); + assertTrue(sources.contains("kafka_topic"), + "sources annotation should list the kafka source: " + sources); + assertEquals("mysql_outbox", sink, "sink annotation holds the single identifier verbatim"); } } diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java index 56e0c10d..b2c60ab7 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java +++ 
b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java @@ -1,5 +1,7 @@ package com.linkedin.hoptimator.k8s; +import com.linkedin.hoptimator.Sink; +import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.Trigger; import com.linkedin.hoptimator.UserJob; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; @@ -98,8 +100,8 @@ void updateWithPausedOptionPausesTrigger() throws SQLException { Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, "true"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), "0 * * * *", options, + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -117,8 +119,8 @@ void updateWithPausedOptionUnpausesTrigger() throws SQLException { Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, "false"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), "0 * * * *", options, + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -136,8 +138,8 @@ void updateWithPausedOptionCreatesSpecIfNull() throws SQLException { Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, "true"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), "0 * * * *", options, + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -147,12 
+149,78 @@ void updateWithPausedOptionCreatesSpecIfNull() throws SQLException { assertTrue(existing.getSpec().getPaused()); } + @Test + void updateWithPausedOptionAlsoStampsDependsOnLabelsWhenDatabaseSet() throws SQLException { + // The partial-update path (paused-only) used to skip toK8sObject and never refresh the + // depends-on metadata. Pin down that re-applying the LogicalTable through this path now + // stamps the depends-on label and the sources annotation. + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(new V1alpha1TableTriggerSpec().paused(true)); + triggers.add(existing); + + Map options = new HashMap<>(); + options.put(Trigger.PAUSED_OPTION, "true"); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), null, options, + new Source("mysql-db", Arrays.asList("MYSQL", "testdb", "events"), Collections.emptyMap()), null); + + K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); + + deployer.update(); + + String expectedLabel = DependencyLabels.labelKey( + "mysql-db", Arrays.asList("MYSQL", "testdb", "events")); + String expectedIdentifier = DependencyLabels.identifier( + "mysql-db", Arrays.asList("MYSQL", "testdb", "events")); + assertNotNull(existing.getMetadata().getLabels(), "labels must be set after partial update"); + assertTrue(existing.getMetadata().getLabels().containsKey(expectedLabel), + "depends-on label must be stamped on the partial-update path: " + existing.getMetadata().getLabels()); + assertNotNull(existing.getMetadata().getAnnotations(), "annotations must be set"); + assertEquals(expectedIdentifier, + existing.getMetadata().getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SOURCES)); + } + + @Test + void updateStampsSinkLabelWhenTriggerCarriesASink() throws SQLException { + // Bridging-tier triggers (LogicalTableDeployer's offline→online flow) carry a Sink in + // addition to their source. 
Pin that the partial-update path stamps both source and sink + // depends-on labels so the dep-guard reverse lookup finds the trigger from either end. + V1alpha1TableTrigger existing = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("mytrigger")) + .spec(new V1alpha1TableTriggerSpec().paused(true)); + triggers.add(existing); + + Map options = new HashMap<>(); + options.put(Trigger.PAUSED_OPTION, "true"); + Source source = new Source("hdfs-db", Arrays.asList("HDFS", "events"), Collections.emptyMap()); + Sink sink = new Sink("venice-db", Arrays.asList("VENICE", "events"), Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), null, options, source, sink); + + K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); + deployer.update(); + + String sourceLabel = DependencyLabels.labelKey( + "hdfs-db", Arrays.asList("HDFS", "events")); + String sinkLabel = DependencyLabels.labelKey( + "venice-db", Arrays.asList("VENICE", "events")); + String sinkIdentifier = DependencyLabels.identifier( + "venice-db", Arrays.asList("VENICE", "events")); + + assertTrue(existing.getMetadata().getLabels().containsKey(sourceLabel), + "source-side depends-on label must be stamped: " + existing.getMetadata().getLabels()); + assertTrue(existing.getMetadata().getLabels().containsKey(sinkLabel), + "sink-side depends-on label must be stamped: " + existing.getMetadata().getLabels()); + assertEquals(sinkIdentifier, + existing.getMetadata().getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SINKS), + "depends-on-sink annotation must record the sink identifier verbatim"); + } + @Test void updateWithPausedOptionThrowsWhenTriggerNotFound() { Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, "true"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), "0 * * * *", 
options, + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -166,8 +234,8 @@ void deleteRemovesTrigger() throws SQLException { .spec(new V1alpha1TableTriggerSpec()); triggers.add(existing); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -178,8 +246,8 @@ void deleteRemovesTrigger() throws SQLException { @Test void deleteThrowsWhenTriggerNotFound() { - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -194,8 +262,8 @@ void toK8sObjectBuildsCorrectTrigger() throws SQLException { + "metadata:\n name: {{name}}")); jobTemplates.add(jobTemplate); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -213,8 +281,8 @@ void toK8sObjectWithNoJobPropertiesHasNullJobProperties() throws SQLException { jobTemplates.add(jobTemplate); // No job.properties.* options — spec should NOT have jobProperties 
set - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); List specs = deployer.specify(); @@ -234,8 +302,8 @@ void toK8sObjectWithJobPropertiesIncludesThemInSpec() throws SQLException { Map options = new HashMap<>(); options.put("job.properties.parallelism", "4"); options.put("job.properties.restart-strategy", "never"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", options, + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); List specs = deployer.specify(); @@ -258,8 +326,8 @@ void toK8sObjectOptionsPutAllIncludedInEnvironment() throws SQLException { Map options = new HashMap<>(); options.put("someKey", "someValue"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "MY_TABLE"), "5 4 * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "5 4 * * *", options, + new Source(null, Arrays.asList("SCHEMA", "MY_TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); List specs = deployer.specify(); @@ -282,8 +350,8 @@ void toK8sObjectForEachAppliesJobPropertiesFilter() throws SQLException { options.put("job.properties.key1", "val1"); options.put("job.properties.key2", "val2"); options.put("other.option", "ignored"); - Trigger trigger = new Trigger("MY_TRIGGER", new 
UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", options, + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); List specs = deployer.specify(); @@ -305,8 +373,8 @@ void updateWithPausedOptionCallsApiUpdate() throws SQLException { Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, "true"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob(null, "myjob"), "0 * * * *", options, + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); deployer.update(); @@ -326,8 +394,8 @@ void updateWithChangedSpecCallsSuperUpdate() throws SQLException { jobTemplates.add(jobTemplate); // No PAUSED_OPTION → should fall through to super.update() - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); // super.update() calls api.update() via K8sDeployer; FakeK8sApi.update adds to list @@ -339,8 +407,8 @@ void updateWithChangedSpecCallsSuperUpdate() throws SQLException { @Test void deleteOnNonExistingTriggerThrowsSqlException() { // Graceful handling when trigger not found - Trigger trigger = new Trigger("NONEXISTENT", new UserJob(null, "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new 
Trigger("NONEXISTENT", new UserJob(null, "myjob"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); @@ -360,8 +428,8 @@ void updatePreservesPausedWhenOptionsHaveNoPausedOption() throws SQLException { .spec(new V1alpha1TableTriggerSpec().paused(true)); triggers.add(existing); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "myjob"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); deployer.update(); @@ -378,8 +446,8 @@ void updatePreservesUnpausedWhenOptionsHaveNoPausedOption() throws SQLException .spec(new V1alpha1TableTriggerSpec().paused(false)); triggers.add(existing); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "myjob"), - Arrays.asList("schema", "table"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "myjob"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("schema", "table"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); deployer.update(); @@ -399,7 +467,7 @@ void updateWithPausedOptionFalseUnpausesAlreadyPausedTrigger() throws SQLExcepti Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, "false"); - Trigger trigger = new Trigger("MY_TRIGGER", null, new ArrayList<>(), null, options); + Trigger trigger = new Trigger("MY_TRIGGER", null, null, options, new Source(null, new ArrayList<>(), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); deployer.update(); @@ -418,8 +486,8 @@ void 
updateFallsThroughToSuperUpdateWhenNoExistingAndNoPausedOption() throws SQL .spec(new V1alpha1JobTemplateSpec().yaml("template: {{name}}")); jobTemplates.add(jobTemplate); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); deployer.update(); @@ -443,8 +511,8 @@ void updateFallsThroughToSuperUpdateWhenExistingHasNullPaused() throws SQLExcept .spec(new V1alpha1JobTemplateSpec().yaml("template: {{name}}")); jobTemplates.add(jobTemplate); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", Collections.emptyMap()); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", Collections.emptyMap(), + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); deployer.update(); @@ -466,8 +534,8 @@ void toK8sObjectSetsSpecPausedWhenPausedOptionTrue() throws SQLException { Map options = new HashMap<>(); options.put(Trigger.PAUSED_OPTION, "true"); - Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), - Arrays.asList("SCHEMA", "TABLE"), "0 * * * *", options); + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", options, + new Source(null, Arrays.asList("SCHEMA", "TABLE"), Collections.emptyMap()), null); K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); List specs = deployer.specify(); diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sValidatorProviderTest.java 
b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sValidatorProviderTest.java index a4cf0189..52372e6b 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sValidatorProviderTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sValidatorProviderTest.java @@ -29,7 +29,7 @@ void returnsDependencyValidatorForPendingDeleteOfSource() { Collection validators = provider.validators(pd, null); assertEquals(1, validators.size()); - assertInstanceOf(K8sPipelineDependencyValidator.class, validators.iterator().next()); + assertInstanceOf(K8sDependencyValidator.class, validators.iterator().next()); } @Test @@ -42,7 +42,7 @@ void returnsDependencyValidatorWhenSelfOwnerIsSet() { Collection validators = provider.validators(pd, null); assertEquals(1, validators.size()); - assertInstanceOf(K8sPipelineDependencyValidator.class, validators.iterator().next()); + assertInstanceOf(K8sDependencyValidator.class, validators.iterator().next()); } @Test diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyCheckerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyCheckerTest.java deleted file mode 100644 index 25cf6a40..00000000 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyCheckerTest.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.linkedin.hoptimator.k8s; - -import java.sql.SQLException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1OwnerReference; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; -import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; - -import 
static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.when; - - -@ExtendWith(MockitoExtension.class) -class PipelineDependencyCheckerTest { - - @Mock - private K8sApi api; - - private static final String DB = "kafka1"; - private static final List PATH = Collections.singletonList("my-topic"); - private static final String IDENTIFIER = "kafka1_my-topic"; - - private static V1alpha1Pipeline pipeline(String name, String ownerKind, String ownerName, - String annotationValue) { - V1ObjectMeta meta = new V1ObjectMeta().name(name); - if (ownerKind != null && ownerName != null) { - meta.addOwnerReferencesItem(new V1OwnerReference().kind(ownerKind).name(ownerName)); - } - if (annotationValue != null) { - Map annotations = new HashMap<>(); - annotations.put(PipelineDependencyLabels.ANNOTATION_KEY, annotationValue); - meta.setAnnotations(annotations); - } - return new V1alpha1Pipeline().metadata(meta); - } - - @Test - void passesWhenNoPipelinesMatch() throws SQLException { - when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))).thenReturn(Collections.emptyList()); - - assertDoesNotThrow(() -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null)); - } - - @Test - void blocksOnExternalPipeline() throws SQLException { - when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))) - .thenReturn(Collections.singletonList(pipeline("ext-pipe", "View", "owner", IDENTIFIER))); - - SQLException ex = assertThrows(SQLException.class, - () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null)); - assertTrue(ex.getMessage().contains("ext-pipe")); - assertTrue(ex.getMessage().contains(IDENTIFIER)); - } - - @Test - void skipsSelfOwnedPipeline() throws SQLException { - 
when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))) - .thenReturn(Collections.singletonList(pipeline("owned-pipe", "LogicalTable", "self-name", IDENTIFIER))); - - assertDoesNotThrow(() -> PipelineDependencyChecker.assertNoExternalDependents( - api, DB, PATH, "LogicalTable", "self-name")); - } - - @Test - void blocksOnExternalWhenSomeAreSelfOwned() throws SQLException { - when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))).thenReturn(Arrays.asList( - pipeline("owned-pipe", "LogicalTable", "self-name", IDENTIFIER), - pipeline("external-pipe", "View", "other-owner", IDENTIFIER))); - - SQLException ex = assertThrows(SQLException.class, - () -> PipelineDependencyChecker.assertNoExternalDependents( - api, DB, PATH, "LogicalTable", "self-name")); - assertTrue(ex.getMessage().contains("external-pipe")); - assertFalse(ex.getMessage().contains("owned-pipe"), "self-owned pipeline must not be listed"); - } - - @Test - void rejectsSlugCollisionViaAnnotation() throws SQLException { - // Pipeline labels collide on the slug (which is what api.select matched on) but the - // annotation reveals the actual identifier is different — so this should NOT block. - when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))) - .thenReturn(Collections.singletonList(pipeline("colliding-pipe", "View", "owner", - "some-other-database/some-other-path"))); - - assertDoesNotThrow(() -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null)); - } - - @Test - void treatsMissingAnnotationAsTrusted() throws SQLException { - // A pipeline with the matching label but no depends-on annotation (pre-labeling migration - // case, or future code path that didn't write the annotation) is still treated as a blocker. 
- when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))) - .thenReturn(Collections.singletonList(pipeline("legacy-pipe", "View", "owner", null))); - - SQLException ex = assertThrows(SQLException.class, - () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null)); - assertTrue(ex.getMessage().contains("legacy-pipe")); - } - - @Test - void errorMessageIncludesOwnerKindAndName() throws SQLException { - when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))) - .thenReturn(Collections.singletonList(pipeline("ext-pipe", "View", "owner", IDENTIFIER))); - - SQLException ex = assertThrows(SQLException.class, - () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null)); - assertTrue(ex.getMessage().contains("View/owner"), - "error should name the owning View so the user knows what to unhook: " + ex.getMessage()); - } - - @Test - void errorMessageListsAllBlockers() throws SQLException { - when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))).thenReturn(Arrays.asList( - pipeline("p1", "View", "owner1", IDENTIFIER), - pipeline("p2", "View", "owner2", IDENTIFIER), - pipeline("p3", "View", "owner3", IDENTIFIER))); - - SQLException ex = assertThrows(SQLException.class, - () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null)); - assertTrue(ex.getMessage().contains("p1")); - assertTrue(ex.getMessage().contains("p2")); - assertTrue(ex.getMessage().contains("p3")); - assertTrue(ex.getMessage().contains("3 active pipeline")); - } -} diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabelsTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabelsTest.java deleted file mode 100644 index b5cd6afb..00000000 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabelsTest.java +++ /dev/null @@ -1,159 +0,0 @@ -package com.linkedin.hoptimator.k8s; - -import java.util.Arrays; -import 
java.util.Collections; -import java.util.Map; -import java.util.Set; - -import org.junit.jupiter.api.Test; - -import com.linkedin.hoptimator.Sink; -import com.linkedin.hoptimator.Source; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - - -class PipelineDependencyLabelsTest { - - private static Source src(String db, String... path) { - return new Source(db, Arrays.asList(path), Collections.emptyMap()); - } - - private static Sink sink(String db, String... path) { - return new Sink(db, Arrays.asList(path), Collections.emptyMap()); - } - - @Test - void identifierJoinsDatabaseAndPath() { - // Separator is "_" so the identifier is also a valid K8s label value out of the box. - assertEquals("mydb_a.b.c", PipelineDependencyLabels.identifier("mydb", Arrays.asList("a", "b", "c"))); - } - - @Test - void slugIsDeterministic() { - String s1 = PipelineDependencyLabels.slug("db", Arrays.asList("foo", "bar")); - String s2 = PipelineDependencyLabels.slug("db", Arrays.asList("foo", "bar")); - assertEquals(s1, s2); - } - - @Test - void slugVariesByDatabase() { - String a = PipelineDependencyLabels.slug("db1", Collections.singletonList("t")); - String b = PipelineDependencyLabels.slug("db2", Collections.singletonList("t")); - assertNotEquals(a, b); - } - - @Test - void slugVariesByPath() { - String a = PipelineDependencyLabels.slug("db", Arrays.asList("schema", "t")); - String b = PipelineDependencyLabels.slug("db", Arrays.asList("schema", "u")); - assertNotEquals(a, b); - } - - @Test - void labelKeyFitsKubernetesNameLimit() { - // Long path stressing the slug — name portion (after the /) must be ≤ 63 chars. 
- String key = PipelineDependencyLabels.labelKey( - "a-really-long-database-name", - Arrays.asList("catalog", "schema", "a_very_long_table_name_that_exceeds_sixty_three_chars")); - String namePortion = key.substring(key.indexOf('/') + 1); - assertTrue(namePortion.length() <= 63, "name portion must be ≤63 chars, got " + namePortion.length()); - assertTrue(namePortion.matches("[a-z0-9]([-a-z0-9_.]*[a-z0-9])?"), - "name portion must match K8s label-name regex, got: " + namePortion); - } - - @Test - void labelsForIncludesSourcesAndSink() { - // Both edges matter: dropping a source orphans readers; dropping a sink orphans writers. - Source s1 = src("kafka1", "events"); - Source s2 = src("venice1", "store"); - Sink sink = sink("mysql1", "outbox"); - Map labels = PipelineDependencyLabels.labelsFor(Arrays.asList(s1, s2), sink); - - assertEquals(3, labels.size()); - assertTrue(labels.containsKey(PipelineDependencyLabels.labelKey("kafka1", Collections.singletonList("events")))); - assertTrue(labels.containsKey(PipelineDependencyLabels.labelKey("venice1", Collections.singletonList("store")))); - assertTrue(labels.containsKey(PipelineDependencyLabels.labelKey("mysql1", Collections.singletonList("outbox")))); - } - - @Test - void labelsForHandlesNullSink() { - Map labels = PipelineDependencyLabels.labelsFor( - Collections.singletonList(src("db", "t")), null); - assertEquals(1, labels.size()); - } - - @Test - void labelsForCollapsesSelfLoopIntoOneEntry() { - // Self-loop pipeline: source and sink share a slug, so the map collapses to one entry - // rather than producing duplicate keys. 
- Source s = src("db", "t"); - Sink k = sink("db", "t"); - Map labels = PipelineDependencyLabels.labelsFor(Collections.singletonList(s), k); - assertEquals(1, labels.size()); - } - - @Test - void labelValueTruncatedAtSixtyThreeChars() { - String longPath = "this_is_a_really_long_table_name_that_exceeds_sixty_three_chars_by_a_lot"; - Map labels = PipelineDependencyLabels.labelsFor( - Collections.singletonList(src("db", longPath)), null); - String value = labels.values().iterator().next(); - assertTrue(value.length() <= 63); - } - - @Test - void labelValueIsKubernetesLabelValueCompliant() { - // K8s label values must match (([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])? - // — the identifier separator is "_" precisely so this holds out of the box for typical - // (database, path) shapes seen in production. - Map labels = PipelineDependencyLabels.labelsFor( - Collections.singletonList(src("ads-database", "ADS", "PAGE_VIEWS")), null); - String value = labels.values().iterator().next(); - - assertTrue(value.length() <= 63); - assertTrue(value.matches("(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?"), - "value must satisfy K8s label-value regex, got: " + value); - assertFalse(value.contains("/"), "no '/' separator should leak into the label value"); - } - - @Test - void annotationForListsAllIdentifiers() { - // Sources and the sink — both edges are recorded so the delete-time check can disambiguate - // a real dependency from a hash collision regardless of which side matched. 
- String annotation = PipelineDependencyLabels.annotationFor( - Arrays.asList(src("kafka", "a"), src("venice", "b")), - sink("mysql", "c")); - assertTrue(annotation.contains("kafka_a")); - assertTrue(annotation.contains("venice_b")); - assertTrue(annotation.contains("mysql_c")); - } - - @Test - void annotationForDeduplicatesAndOmitsNullSink() { - String annotation = PipelineDependencyLabels.annotationFor( - Arrays.asList(src("db", "t"), src("db", "t")), null); - assertEquals("db_t", annotation); - } - - @Test - void parseAnnotationRoundtrip() { - String annotation = PipelineDependencyLabels.annotationFor( - Arrays.asList(src("a", "1"), src("b", "2")), sink("c", "3")); - Set parsed = PipelineDependencyLabels.parseAnnotation(annotation); - assertEquals(3, parsed.size()); - assertTrue(parsed.contains("a_1")); - assertTrue(parsed.contains("b_2")); - assertTrue(parsed.contains("c_3")); - } - - @Test - void parseAnnotationHandlesNullAndEmpty() { - assertTrue(PipelineDependencyLabels.parseAnnotation(null).isEmpty()); - assertTrue(PipelineDependencyLabels.parseAnnotation("").isEmpty()); - assertTrue(PipelineDependencyLabels.parseAnnotation(" , ").isEmpty()); - } -} diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java index 4114448e..b8a61406 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java @@ -1,9 +1,10 @@ package com.linkedin.hoptimator.k8s; -import com.linkedin.hoptimator.jdbc.QuidemTestBase; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import com.linkedin.hoptimator.jdbc.QuidemTestBase; + @Tag("integration") public class TestSqlScripts extends QuidemTestBase { @@ -43,6 +44,11 @@ public void k8sTriggerOptions() throws Exception { run("k8s-trigger-options.id"); } + @Test + public void k8sTriggerValidation() 
throws Exception { + run("k8s-trigger-validation.id"); + } + @Test public void k8sDdlCreateDatabase() throws Exception { run("k8s-ddl-create-database.id"); diff --git a/hoptimator-k8s/src/test/resources/k8s-trigger-validation.id b/hoptimator-k8s/src/test/resources/k8s-trigger-validation.id new file mode 100644 index 00000000..24993f41 --- /dev/null +++ b/hoptimator-k8s/src/test/resources/k8s-trigger-validation.id @@ -0,0 +1,32 @@ +!set outputformat mysql +!use k8s + +# Pre-delete dependency guard for triggers. A trigger carries the same +# `depends-on-` label that pipelines do (stamped from `create trigger`'s +# target identifier), so dropping a source still referenced by a live trigger +# is blocked with the same "active dependent(s)" message a pipeline would +# produce. The script then drops the trigger to clear the guard and retries the drop. + +create or replace table ads."triggertable" ("i" int, "s" VARCHAR); +(0 rows modified) + +!update + +create trigger blocks_drop on ads."triggertable" as 'my-app' in 'my-mp' with (key 'value'); +(0 rows modified) + +!update + +drop table ads."triggertable"; +active dependent +!error + +drop trigger blocks_drop; +(0 rows modified) + +!update + +drop table ads."triggertable"; +(0 rows modified) + +!update diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id b/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id index 38304532..e57ec7f7 100644 --- a/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id +++ b/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id @@ -104,12 +104,12 @@ create or replace materialized view KAFKA."create-table-test$guard" as select * # Source-side guard: existing-topic-2 is a source of the MV's pipeline. drop table KAFKA."existing-topic-2"; -active pipeline(s) depend on it +active dependent !error # Sink-side guard: create-table-test is the partial-view sink of the MV's pipeline. 
drop table KAFKA."create-table-test"; -active pipeline(s) depend on it +active dependent !error # Drop the dependent MV first; its pipeline (and both labels) go away. diff --git a/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java b/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java index dbaf7ba3..6cfc0004 100644 --- a/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java +++ b/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java @@ -28,6 +28,7 @@ import com.linkedin.hoptimator.Deployer; import com.linkedin.hoptimator.PendingDelete; +import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Trigger; import com.linkedin.hoptimator.UserJob; import com.linkedin.hoptimator.Validated; @@ -517,6 +518,8 @@ private void deployImplicitTrigger(Map tierMap, Map tierMap, Map hoptimatorDriverMock; @Mock - MockedStatic validationServiceMock; + MockedStatic validationServiceMock; @Mock K8sLogicalTableDeployer mockCrdDeployer; @@ -144,7 +146,7 @@ K8sLogicalTableDeployer createLogicalTableDeployer( @Override void deployPipelineBundle(String fromTier, String toTier, - Map tierDatabases, + Map tierDatabases, Map tierSources, K8sContext ownerContext, boolean update) { // No-op in CRD-focused tests — pipeline deployment is tested separately. @@ -973,10 +975,10 @@ void implicitTriggerPathUsesOfflineTierPhysicalPath() throws Exception { // testSource() has path [logical, testevent]; offline tier's Database has schema=OFFLINE. // Expect the trigger to target the offline physical schema, not the logical one. 
- assertEquals("OFFLINE", capture.trigger.schema(), - "Trigger schema must point at the offline tier's physical schema"); - assertEquals("testevent", capture.trigger.table(), - "Trigger table must be the logical table name"); + assertEquals("OFFLINE", capture.trigger.source().schema(), + "Trigger source must point at the offline tier's physical schema"); + assertEquals("testevent", capture.trigger.source().table(), + "Trigger source table must be the logical table name"); } @Test @@ -1054,7 +1056,7 @@ void implicitTriggerForwardsSourceOptionsToTriggerOptions() throws Exception { doReturn(new V1OwnerReference()).when(mockCrdDeployer).createAndReference(); // CREATE TABLE WITH ("ab.cd" "customValue", "job.properties.online.name" "testevent-online") - Map sourceOptions = new java.util.HashMap<>(); + Map sourceOptions = new HashMap<>(); sourceOptions.put("ab.cd", "customValue"); sourceOptions.put("job.properties.online.name", "testevent-online"); Source src = new Source("logical", Arrays.asList("logical", "testevent"), sourceOptions); diff --git a/hoptimator-logical/src/test/resources/logical-ddl.id b/hoptimator-logical/src/test/resources/logical-ddl.id index a8ad290d..90b180e8 100644 --- a/hoptimator-logical/src/test/resources/logical-ddl.id +++ b/hoptimator-logical/src/test/resources/logical-ddl.id @@ -220,7 +220,7 @@ spec: # Drop is blocked — logical table inner pipeline depends on KAFKA/testevent drop table "KAFKA"."testevent"; -active pipeline(s) depend on it +active dependent !error create or replace materialized view VENICE."testevent$guard" as select "KEY" as "KEY", "VALUE" as "memberId" from KAFKA."existing-topic-1"; @@ -231,7 +231,7 @@ create or replace materialized view VENICE."testevent$guard" as select "KEY" as # Drop is blocked — testevent$guard's pipeline depends on VENICE/testevent, # which is LOGICAL.testevent's online tier. 
drop table "LOGICAL"."testevent"; -active pipeline(s) depend on it +active dependent !error # Drop the dependent MV first to release the label.