diff --git a/hoptimator-api/build.gradle b/hoptimator-api/build.gradle
index 571a1964..d73eca13 100644
--- a/hoptimator-api/build.gradle
+++ b/hoptimator-api/build.gradle
@@ -4,7 +4,8 @@ plugins {
}
dependencies {
- // plz keep it this way
+ // This package should have minimal dependencies
+ compileOnly 'com.google.code.findbugs:jsr305:3.0.2'
}
publishing {
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/PendingDelete.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/PendingDelete.java
new file mode 100644
index 00000000..5c330aa8
--- /dev/null
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/PendingDelete.java
@@ -0,0 +1,55 @@
+package com.linkedin.hoptimator;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+
+/**
+ * A type-tagged wrapper signaling that {@code target} is about to be deleted. Pre-delete
+ * validators (e.g. dependency guards that block DROP TABLE when a pipeline still references
+ * the resource) key off this wrapper rather than the raw target type — so an unrelated future
+ * caller of {@code ValidationService.validateOrThrow(source, connection)} doesn't accidentally
+ * trigger delete-intent checks.
+ *
+ *
An optional {@code (selfOwnerKind, selfOwnerName)} lets the caller declare an "umbrella"
+ * K8s resource whose owned objects should be excluded from the dependent set — e.g. a
+ * LogicalTable CRD, so its child Pipeline CRDs (which reference tier sources by SQL) don't
+ * self-block the drop.
+ */
+public final class PendingDelete {
+
+ private final T target;
+ private final String selfOwnerKind;
+ private final String selfOwnerName;
+
+ public PendingDelete(T target) {
+ this(target, null, null);
+ }
+
+ public PendingDelete(T target, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) {
+ this.target = Objects.requireNonNull(target, "target");
+ this.selfOwnerKind = selfOwnerKind;
+ this.selfOwnerName = selfOwnerName;
+ }
+
+ public T target() {
+ return target;
+ }
+
+ /** Kind of the K8s resource whose owned objects should be excluded from the dependent set. */
+ public @Nullable String selfOwnerKind() {
+ return selfOwnerKind;
+ }
+
+ /** Name of the K8s resource whose owned objects should be excluded from the dependent set. */
+ public @Nullable String selfOwnerName() {
+ return selfOwnerName;
+ }
+
+ @Override
+ public String toString() {
+ String self = (selfOwnerKind != null && selfOwnerName != null)
+ ? ", self=" + selfOwnerKind + "/" + selfOwnerName : "";
+ return "PendingDelete[" + target + self + "]";
+ }
+}
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validated.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validated.java
index fae19bcf..7c32c148 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validated.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validated.java
@@ -1,6 +1,14 @@
package com.linkedin.hoptimator;
+import java.sql.Connection;
+
+
public interface Validated {
- void validate(Validator.Issues issues);
+ /**
+ * Validates {@code this}, recording any problems in {@code issues}. The connection is always
+ * supplied so validators can run lookups against external systems (e.g. pre-delete dependency
+ * checks).
+ */
+ void validate(Validator.Issues issues, Connection connection);
}
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
index 82e72831..9db6a279 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
@@ -1,5 +1,6 @@
package com.linkedin.hoptimator;
+import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -52,8 +53,8 @@ public DefaultValidator(T t) {
}
@Override
- public void validate(Issues issues) {
- t.validate(issues.child(t.getClass().getSimpleName()));
+ public void validate(Issues issues, Connection connection) {
+ t.validate(issues.child(t.getClass().getSimpleName()), connection);
}
}
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java
index 55b60a44..d4729e4a 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java
@@ -1,9 +1,13 @@
package com.linkedin.hoptimator;
+import java.sql.Connection;
import java.util.Collection;
public interface ValidatorProvider {
- Collection validators(T obj);
+ /**
+ * Returns validators that should be applied to {@code obj}.
+ */
+ Collection validators(T obj, Connection connection);
}
diff --git a/hoptimator-api/src/test/java/com/linkedin/hoptimator/PendingDeleteTest.java b/hoptimator-api/src/test/java/com/linkedin/hoptimator/PendingDeleteTest.java
new file mode 100644
index 00000000..4b5b9144
--- /dev/null
+++ b/hoptimator-api/src/test/java/com/linkedin/hoptimator/PendingDeleteTest.java
@@ -0,0 +1,85 @@
+package com.linkedin.hoptimator;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+class PendingDeleteTest {
+
+ @Test
+ void singleArgConstructorLeavesSelfOwnerNull() {
+ Object target = new Object();
+ PendingDelete pd = new PendingDelete<>(target);
+
+ assertSame(target, pd.target());
+ assertNull(pd.selfOwnerKind());
+ assertNull(pd.selfOwnerName());
+ }
+
+ @Test
+ void threeArgConstructorStoresSelfOwner() {
+ Object target = new Object();
+ PendingDelete pd = new PendingDelete<>(target, "LogicalTable", "my-table");
+
+ assertSame(target, pd.target());
+ assertEquals("LogicalTable", pd.selfOwnerKind());
+ assertEquals("my-table", pd.selfOwnerName());
+ }
+
+ @Test
+ void threeArgConstructorAcceptsNullSelfOwner() {
+ Object target = new Object();
+ PendingDelete pd = new PendingDelete<>(target, null, null);
+
+ assertNull(pd.selfOwnerKind());
+ assertNull(pd.selfOwnerName());
+ }
+
+ @Test
+ void nullTargetThrows() {
+ assertThrows(NullPointerException.class, () -> new PendingDelete<>(null));
+ assertThrows(NullPointerException.class,
+ () -> new PendingDelete<>(null, "LogicalTable", "my-table"));
+ }
+
+ @Test
+ void toStringIncludesTargetAndSelfOwnerWhenPresent() {
+ PendingDelete pd = new PendingDelete<>("the-target", "LogicalTable", "my-table");
+ String s = pd.toString();
+ assertTrue(s.contains("the-target"), "toString should include target: " + s);
+ assertTrue(s.contains("LogicalTable/my-table"), "toString should include kind/name: " + s);
+ }
+
+ @Test
+ void toStringOmitsSelfOwnerWhenNull() {
+ PendingDelete pd = new PendingDelete<>("the-target");
+ String s = pd.toString();
+ assertTrue(s.contains("the-target"));
+ assertFalse(s.contains("self="), "toString should not include self= when not set: " + s);
+ }
+
+ @Test
+ void toStringOmitsSelfOwnerWhenOnlyOneFieldSet() {
+ // The toString contract says self= appears only when both kind and name are non-null.
+ // (The K8sPipelineDependencyChecker.isSelfOwned guard also requires both.)
+ PendingDelete kindOnly = new PendingDelete<>("t", "LogicalTable", null);
+ assertFalse(kindOnly.toString().contains("self="));
+
+ PendingDelete nameOnly = new PendingDelete<>("t", null, "my-table");
+ assertFalse(nameOnly.toString().contains("self="));
+ }
+
+ @Test
+ void targetGenericTypeIsPreserved() {
+ Source source = new Source("db", java.util.List.of("schema", "tbl"), java.util.Map.of());
+ PendingDelete pd = new PendingDelete<>(source);
+ Source unwrapped = pd.target();
+ assertEquals("tbl", unwrapped.table());
+ }
+}
diff --git a/hoptimator-api/src/test/java/com/linkedin/hoptimator/ValidatorTest.java b/hoptimator-api/src/test/java/com/linkedin/hoptimator/ValidatorTest.java
index ddd98438..ab1f4719 100644
--- a/hoptimator-api/src/test/java/com/linkedin/hoptimator/ValidatorTest.java
+++ b/hoptimator-api/src/test/java/com/linkedin/hoptimator/ValidatorTest.java
@@ -184,10 +184,10 @@ void testCheckClosedMessageContainsPath() {
@Test
void testDefaultValidatorDelegatesToTarget() {
- Validated target = issues -> issues.error("target error");
+ Validated target = (issues, conn) -> issues.error("target error");
Validator.DefaultValidator validator = new Validator.DefaultValidator<>(target);
Validator.Issues issues = new Validator.Issues("root");
- validator.validate(issues);
+ validator.validate(issues, null);
assertFalse(issues.valid());
}
diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
index 8d9f45eb..57d35fed 100644
--- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
+++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
@@ -17,6 +17,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.sql.Connection;
/** Validates that tables follow Avro schema evolution rules. */
@@ -29,7 +30,7 @@ class AvroTableValidator implements Validator {
}
@Override
- public void validate(Issues issues) {
+ public void validate(Issues issues, Connection connection) {
try {
CalciteSchema originalSchema = schema.unwrap(CalciteSchema.class);
if (originalSchema == null || originalSchema.schema == null) {
diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java
index 0f4c3ef7..de5a986b 100644
--- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java
+++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java
@@ -4,6 +4,7 @@
import com.linkedin.hoptimator.ValidatorProvider;
import org.apache.calcite.schema.SchemaPlus;
+import java.sql.Connection;
import java.util.Collection;
import java.util.Collections;
@@ -12,7 +13,7 @@
public class AvroValidatorProvider implements ValidatorProvider {
@Override
- public Collection validators(T obj) {
+ public Collection validators(T obj, Connection connection) {
if (obj instanceof SchemaPlus) {
return Collections.singletonList(new AvroTableValidator((SchemaPlus) obj));
} else {
diff --git a/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroTableValidatorTest.java b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroTableValidatorTest.java
index 0d10c71e..b843bf96 100644
--- a/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroTableValidatorTest.java
+++ b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroTableValidatorTest.java
@@ -41,7 +41,7 @@ void testValidateCatchesClassCastExceptionSilently() {
AvroTableValidator validator = new AvroTableValidator(schema);
Validator.Issues issues = new Validator.Issues("test");
- validator.validate(issues);
+ validator.validate(issues, null);
assertTrue(issues.valid(), "ClassCastException should be silently caught");
}
@@ -53,7 +53,7 @@ void testValidateThrowsForNullOriginalSchema() {
AvroTableValidator validator = new AvroTableValidator(schema);
Validator.Issues issues = new Validator.Issues("test");
- assertThrows(IllegalArgumentException.class, () -> validator.validate(issues));
+ assertThrows(IllegalArgumentException.class, () -> validator.validate(issues, null));
}
@Test
@@ -99,7 +99,7 @@ protected Map getTableMap() {
AvroTableValidator validator = new AvroTableValidator(schema);
Validator.Issues issues = new Validator.Issues("root");
- validator.validate(issues);
+ validator.validate(issues, null);
assertFalse(issues.valid(),
"Incompatible schema (INT→VARCHAR) should produce validation errors");
@@ -137,7 +137,7 @@ protected Map getTableMap() {
AvroTableValidator validator = new AvroTableValidator(schema);
Validator.Issues issues = new Validator.Issues("root");
- validator.validate(issues);
+ validator.validate(issues, null);
assertTrue(issues.valid(), "Compatible schemas should pass validation without errors");
}
diff --git a/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroValidatorProviderTest.java b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroValidatorProviderTest.java
index 6ce8ba25..0bb41746 100644
--- a/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroValidatorProviderTest.java
+++ b/hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroValidatorProviderTest.java
@@ -24,7 +24,7 @@ class AvroValidatorProviderTest {
void testValidatorsReturnsAvroTableValidatorForSchemaPlus() {
AvroValidatorProvider provider = new AvroValidatorProvider();
- Collection validators = provider.validators(schemaPlus);
+ Collection validators = provider.validators(schemaPlus, null);
assertEquals(1, validators.size());
assertInstanceOf(AvroTableValidator.class, validators.iterator().next());
@@ -34,7 +34,7 @@ void testValidatorsReturnsAvroTableValidatorForSchemaPlus() {
void testValidatorsReturnsEmptyForNonSchemaPlus() {
AvroValidatorProvider provider = new AvroValidatorProvider();
- Collection validators = provider.validators("not-a-schema");
+ Collection validators = provider.validators("not-a-schema", null);
assertTrue(validators.isEmpty());
}
@@ -43,7 +43,7 @@ void testValidatorsReturnsEmptyForNonSchemaPlus() {
void testValidatorsReturnsEmptyForNull() {
AvroValidatorProvider provider = new AvroValidatorProvider();
- Collection validators = provider.validators(null);
+ Collection validators = provider.validators(null, null);
assertTrue(validators.isEmpty());
}
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBase.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBase.java
index 44a1de00..c6a8b12f 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBase.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBase.java
@@ -6,6 +6,8 @@
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.lookup.LikePattern;
+import java.sql.Connection;
+
/** Base class for shared schema evolution validators. */
abstract class CompatibilityValidatorBase implements Validator {
@@ -17,7 +19,7 @@ abstract class CompatibilityValidatorBase implements Validator {
}
@Override
- public void validate(Issues issues) {
+ public void validate(Issues issues, Connection connection) {
try {
CalciteSchema originalSchema = schema.unwrap(CalciteSchema.class);
if (originalSchema == null || originalSchema.schema == null) {
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java
index 46579b8b..32588782 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProvider.java
@@ -4,6 +4,7 @@
import com.linkedin.hoptimator.ValidatorProvider;
import org.apache.calcite.schema.SchemaPlus;
+import java.sql.Connection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -13,7 +14,7 @@
public class CompatibilityValidatorProvider implements ValidatorProvider {
@Override
- public Collection validators(T obj) {
+ public Collection validators(T obj, Connection connection) {
if (obj instanceof SchemaPlus) {
return Arrays.asList(new Validator[]{new BackwardCompatibilityValidator((SchemaPlus) obj),
new ForwardCompatibilityValidator((SchemaPlus) obj)});
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProvider.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProvider.java
index 8461b26e..fb21346e 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProvider.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProvider.java
@@ -4,6 +4,7 @@
import com.linkedin.hoptimator.Validator;
import com.linkedin.hoptimator.ValidatorProvider;
+import java.sql.Connection;
import java.util.Collection;
import java.util.Collections;
@@ -12,7 +13,7 @@
public class DefaultValidatorProvider implements ValidatorProvider {
@Override
- public Collection validators(T obj) {
+ public Collection validators(T obj, Connection connection) {
if (obj instanceof Validated) {
return Collections.singletonList(new Validator.DefaultValidator<>((Validated) obj));
} else {
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
index 678de93b..18e229e7 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
@@ -20,6 +20,7 @@
package com.linkedin.hoptimator.jdbc;
import com.linkedin.hoptimator.Deployer;
+import com.linkedin.hoptimator.PendingDelete;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.Trigger;
import com.linkedin.hoptimator.UserJob;
@@ -107,7 +108,7 @@ public DdlExecutor getDdlExecutor() {
public void execute(SqlCreateView create, CalcitePrepare.Context context) {
logger.info("Validating statement: {}", create);
try {
- ValidationService.validateOrThrow(create);
+ ValidationService.validateOrThrow(create, connection);
} catch (SQLException e) {
throw new DdlException(create, e.getMessage(), e);
}
@@ -145,9 +146,9 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
Collection deployers = null;
try {
logger.info("Validating deployable resources for view {}", viewName);
- ValidationService.validateOrThrow(viewTable);
+ ValidationService.validateOrThrow(viewTable, connection);
deployers = DeploymentService.deployers(view, connection);
- ValidationService.validateOrThrow(deployers);
+ ValidationService.validateOrThrow(deployers, connection);
logger.info("Validated view {}", viewName);
if (create.getReplace()) {
logger.info("Deploying update view {}", viewName);
@@ -195,7 +196,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) {
logger.info("Validating statement: {}", create);
try {
- ValidationService.validateOrThrow(create);
+ ValidationService.validateOrThrow(create, connection);
} catch (SQLException e) {
throw new DdlException(create, e.getMessage(), e);
}
@@ -233,9 +234,9 @@ public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) {
Collection deployers = null;
try {
logger.info("Validating trigger {} with deployers", name);
- ValidationService.validateOrThrow(trigger);
+ ValidationService.validateOrThrow(trigger, connection);
deployers = DeploymentService.deployers(trigger, connection);
- ValidationService.validateOrThrow(deployers);
+ ValidationService.validateOrThrow(deployers, connection);
logger.info("Validated trigger {}", name);
if (create.getReplace()) {
logger.info("Updating trigger {}", name);
@@ -296,7 +297,7 @@ public void execute(SqlResumeTrigger resume, CalcitePrepare.Context context) {
public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) {
logger.info("Validating statement: {}", drop);
try {
- ValidationService.validateOrThrow(drop);
+ ValidationService.validateOrThrow(drop, connection);
} catch (SQLException e) {
throw new DdlException(drop, e.getMessage(), e);
}
@@ -331,7 +332,7 @@ public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) {
private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName, boolean paused) {
logger.info("Validating statement: {}", sqlNode);
try {
- ValidationService.validateOrThrow(sqlNode);
+ ValidationService.validateOrThrow(sqlNode, connection);
} catch (SQLException e) {
throw new DdlException(sqlNode, e.getMessage(), e);
}
@@ -366,7 +367,7 @@ private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName
public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
logger.info("Validating statement: {}", drop);
try {
- ValidationService.validateOrThrow(drop);
+ ValidationService.validateOrThrow(drop, connection);
} catch (SQLException e) {
throw new DdlException(drop, e.getMessage(), e);
}
@@ -435,6 +436,10 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
TemporaryTable temporaryTable = (TemporaryTable) table;
source = new Source(temporaryTable.databaseName(), tablePath, Collections.emptyMap());
}
+ // Pre-delete dependency guard. PendingDelete is the explicit "delete intent" signal
+ // — only validators that key off it (the K8s dep checker) fire here. The check throws
+ // before any deployer-level state change.
+ ValidationService.validateOrThrow(new PendingDelete<>(source), connection);
deployers = DeploymentService.deployers(source, connection);
logger.info("Deleting table {}", tableName);
DeploymentService.delete(deployers);
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java
index 6ff1e551..5e29f047 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java
@@ -296,7 +296,7 @@ static SpecifyResult processCreateMaterializedView(CalcitePrepare.Context ctx,
HoptimatorConnection.HoptimatorConnectionDualLogger logger = conn.getLogger(HoptimatorDdlUtils.class);
// Validate the DDL statement.
logger.info("Validating statement: {}", create);
- ValidationService.validateOrThrow(create);
+ ValidationService.validateOrThrow(create, conn);
// Extract query SQL (rename columns if a column list was provided) and plan the query.
// This is done first — before schema/conflict checks — so that:
@@ -374,10 +374,10 @@ static SpecifyResult processCreateMaterializedView(CalcitePrepare.Context ctx,
// Validate the hook and its deployers.
logger.info("Validating materialized view {}", viewName);
- ValidationService.validateOrThrow(hook);
+ ValidationService.validateOrThrow(hook, conn);
deployers = DeploymentService.deployers(hook, conn);
logger.info("Validating deployable resources for materialized view {}", viewName);
- ValidationService.validateOrThrow(deployers);
+ ValidationService.validateOrThrow(deployers, conn);
logger.info("Validated materialized view {}", viewName);
// Execute (create/update) or collect specs (specify).
@@ -442,7 +442,7 @@ static SpecifyResult processCreateTable(CalcitePrepare.Context ctx, HoptimatorCo
HoptimatorConnection.HoptimatorConnectionDualLogger logger = conn.getLogger(HoptimatorDdlUtils.class);
logger.info("Validating statement: {}", create);
- ValidationService.validateOrThrow(create);
+ ValidationService.validateOrThrow(create, conn);
// TODO: Add support for populating new tables from a query as a one-time operation.
if (create.query != null) {
@@ -573,10 +573,10 @@ public RexNode newColumnDefaultValue(RelOptTable table, int iColumn,
boolean success = false;
try {
logger.info("Validating new table {}", source);
- ValidationService.validateOrThrow(source);
+ ValidationService.validateOrThrow(source, conn);
deployers = DeploymentService.deployers(source, conn);
logger.info("Validating deployable resources for table {}", tableName);
- ValidationService.validateOrThrow(deployers);
+ ValidationService.validateOrThrow(deployers, conn);
if (mode == DdlMode.UPDATE) {
logger.info("Deploying update table {}", source);
@@ -638,7 +638,7 @@ static SpecifyResult processCreateDatabase(HoptimatorConnection conn,
HoptimatorConnection.HoptimatorConnectionDualLogger logger = conn.getLogger(HoptimatorDdlUtils.class);
logger.info("Validating statement: {}", create);
- ValidationService.validateOrThrow(create);
+ ValidationService.validateOrThrow(create, conn);
if (create.name.names.size() > 1) {
throw new SQLException("Database names cannot be compound identifiers.");
@@ -651,9 +651,9 @@ static SpecifyResult processCreateDatabase(HoptimatorConnection conn,
Collection deployers = null;
try {
logger.info("Validating database {}", name);
- ValidationService.validateOrThrow(database);
+ ValidationService.validateOrThrow(database, conn);
deployers = DeploymentService.deployers(database, conn);
- ValidationService.validateOrThrow(deployers);
+ ValidationService.validateOrThrow(deployers, conn);
List specs = mode.executeDeployers(deployers, conn);
if (mode.mutable()) {
@@ -892,4 +892,37 @@ public static Runnable registerTemporaryTableInSchema(HoptimatorConnection conn,
return registerTemporaryTable(rootSchema, tableName, rowType, databaseName);
}
}
+
+ /**
+ * Removes {@code tableName} from the tier schema identified by {@code (catalog, schema)}, the
+ * inverse of {@link #registerTemporaryTableInSchema}. Silent no-op if the catalog, schema or
+ * table entry does not exist (e.g. the connection was opened after the table was already gone)
+ * — remove must never fail a user-visible DROP that has already succeeded at the backend.
+ */
+ public static void removeTableFromSchema(HoptimatorConnection conn,
+ @Nullable String catalog, @Nullable String schema, String tableName) {
+ if (conn == null) {
+ return;
+ }
+ SchemaPlus rootSchema = conn.calciteConnection().getRootSchema();
+ SchemaPlus target;
+ if (catalog != null) {
+ SchemaPlus catalogSchemaPlus = rootSchema.subSchemas().get(catalog);
+ if (catalogSchemaPlus == null) {
+ return;
+ }
+ if (schema == null) {
+ target = catalogSchemaPlus;
+ } else {
+ target = catalogSchemaPlus.subSchemas().get(schema);
+ }
+ } else if (schema != null) {
+ target = rootSchema.subSchemas().get(schema);
+ } else {
+ target = rootSchema;
+ }
+ if (target != null && target.tables().get(tableName) != null) {
+ target.removeTable(tableName);
+ }
+ }
}
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java
index 72fa2e0b..ccd6ceb5 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java
@@ -28,40 +28,40 @@ public static Validator.Issues validate(Connection connection) {
}
CalciteConnection conn = (CalciteConnection) connection;
Validator.Issues issues = new Validator.Issues("");
- walk(conn.getRootSchema(), issues);
+ walk(conn.getRootSchema(), issues, connection);
return issues;
}
- private static void walk(SchemaPlus schema, Validator.Issues issues) {
- validate(schema, issues);
+ private static void walk(SchemaPlus schema, Validator.Issues issues, Connection connection) {
+ validate(schema, issues, connection);
for (String x : schema.subSchemas().getNames(LikePattern.any())) {
- walk(schema.subSchemas().get(x), issues.child(x));
+ walk(schema.subSchemas().get(x), issues.child(x), connection);
}
for (String x : schema.tables().getNames(LikePattern.any())) {
- walk(schema.tables().get(x), issues.child(x));
+ walk(schema.tables().get(x), issues.child(x), connection);
}
}
- private static void walk(Table table, Validator.Issues issues) {
- validate(table, issues);
+ private static void walk(Table table, Validator.Issues issues, Connection connection) {
+ validate(table, issues, connection);
}
- public static void validate(T obj, Validator.Issues issues) {
- validators(obj).forEach(x -> x.validate(issues));
+ public static void validate(T obj, Validator.Issues issues, Connection connection) {
+ validators(obj, connection).forEach(x -> x.validate(issues, connection));
}
- public static void validateOrThrow(T obj) throws SQLException {
+ public static void validateOrThrow(T obj, Connection connection) throws SQLException {
Validator.Issues issues = new Validator.Issues("");
- validate(obj, issues);
+ validate(obj, issues, connection);
if (!issues.valid()) {
throw new SQLDataException("Failed validation:\n" + issues);
}
}
- public static void validateOrThrow(Collection objs) throws SQLException {
+ public static void validateOrThrow(Collection objs, Connection connection) throws SQLException {
Validator.Issues issues = new Validator.Issues("");
for (T obj : objs) {
- validate(obj, issues);
+ validate(obj, issues, connection);
if (!issues.valid()) {
throw new SQLDataException("Failed validation:\n" + issues);
}
@@ -75,7 +75,7 @@ public static Collection providers() {
return providers;
}
- public static Collection validators(T obj) {
- return providers().stream().flatMap(x -> x.validators(obj).stream()).collect(Collectors.toList());
+ public static Collection validators(T obj, Connection connection) {
+ return providers().stream().flatMap(x -> x.validators(obj, connection).stream()).collect(Collectors.toList());
}
}
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBaseTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBaseTest.java
index 07dd51b4..b8ec8c23 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBaseTest.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorBaseTest.java
@@ -38,7 +38,7 @@ protected void validate(Table table, Table originalTable, Validator.Issues issue
};
Validator.Issues issues = new Validator.Issues("test");
- validator.validate(issues);
+ validator.validate(issues, null);
assertTrue(issues.valid());
}
@@ -57,7 +57,7 @@ protected void validate(Table table, Table originalTable, Validator.Issues issue
Validator.Issues issues = new Validator.Issues("test");
try {
- validator.validate(issues);
+ validator.validate(issues, null);
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Null original schema"));
}
@@ -99,7 +99,7 @@ protected void validate(Table table, Table originalTable, Validator.Issues issue
};
Validator.Issues issues = new Validator.Issues("test");
- validator.validate(issues);
+ validator.validate(issues, null);
assertTrue(issues.valid());
}
@@ -125,7 +125,7 @@ protected void validate(Table table, Table originalTable, Validator.Issues issue
};
Validator.Issues issues = new Validator.Issues("test");
- validator.validate(issues);
+ validator.validate(issues, null);
// brandNewTable won't have an original, so validate should be skipped
assertTrue(issues.valid());
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProviderTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProviderTest.java
index db2f3627..2fc0b8a4 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProviderTest.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/CompatibilityValidatorProviderTest.java
@@ -23,7 +23,7 @@ class CompatibilityValidatorProviderTest {
void testValidatorsReturnsTwoValidatorsForSchemaPlus() {
CompatibilityValidatorProvider provider = new CompatibilityValidatorProvider();
- Collection validators = provider.validators(mockSchema);
+ Collection validators = provider.validators(mockSchema, null);
assertEquals(2, validators.size());
}
@@ -32,7 +32,7 @@ void testValidatorsReturnsTwoValidatorsForSchemaPlus() {
void testValidatorsReturnsEmptyForNonSchemaPlus() {
CompatibilityValidatorProvider provider = new CompatibilityValidatorProvider();
- Collection validators = provider.validators("not-a-schema");
+ Collection validators = provider.validators("not-a-schema", null);
assertTrue(validators.isEmpty());
}
@@ -41,7 +41,7 @@ void testValidatorsReturnsEmptyForNonSchemaPlus() {
void testValidatorsReturnsEmptyForNull() {
CompatibilityValidatorProvider provider = new CompatibilityValidatorProvider();
- Collection validators = provider.validators(null);
+ Collection validators = provider.validators(null, null);
assertTrue(validators.isEmpty());
}
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProviderTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProviderTest.java
index ece661c8..f838f51f 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProviderTest.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/DefaultValidatorProviderTest.java
@@ -15,9 +15,9 @@ class DefaultValidatorProviderTest {
@Test
void testValidatorsReturnsValidatorForValidatedObject() {
DefaultValidatorProvider provider = new DefaultValidatorProvider();
- Validated validated = issues -> issues.error("test error");
+ Validated validated = (issues, conn) -> issues.error("test error");
- Collection validators = provider.validators(validated);
+ Collection validators = provider.validators(validated, null);
assertEquals(1, validators.size());
}
@@ -26,7 +26,7 @@ void testValidatorsReturnsValidatorForValidatedObject() {
void testValidatorsReturnsEmptyForNonValidatedObject() {
DefaultValidatorProvider provider = new DefaultValidatorProvider();
- Collection validators = provider.validators("not-validated");
+ Collection validators = provider.validators("not-validated", null);
assertTrue(validators.isEmpty());
}
@@ -35,7 +35,7 @@ void testValidatorsReturnsEmptyForNonValidatedObject() {
void testValidatorsReturnsEmptyForNull() {
DefaultValidatorProvider provider = new DefaultValidatorProvider();
- Collection validators = provider.validators(null);
+ Collection validators = provider.validators(null, null);
assertTrue(validators.isEmpty());
}
@@ -43,11 +43,11 @@ void testValidatorsReturnsEmptyForNull() {
@Test
void testReturnedValidatorDelegates() {
DefaultValidatorProvider provider = new DefaultValidatorProvider();
- Validated validated = issues -> issues.error("validation failed");
+ Validated validated = (issues, conn) -> issues.error("validation failed");
- Collection validators = provider.validators(validated);
+ Collection validators = provider.validators(validated, null);
Validator.Issues issues = new Validator.Issues("test");
- validators.iterator().next().validate(issues);
+ validators.iterator().next().validate(issues, null);
assertTrue(issues.toString().contains("validation failed"));
}
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutorTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutorTest.java
index 4ac1507f..a902cccf 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutorTest.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutorTest.java
@@ -1221,10 +1221,10 @@ private void addMaterializedViewToDefaultSchema(CalcitePrepare.Context context,
private void stubValidationToFail() {
// Casting to Object in the lambda forces Java's overload resolution to pick the T-object
// overload (erased to Object) rather than the Collection overload.
- mockValidationService.when(() -> ValidationService.validateOrThrow(any(Object.class)))
+ mockValidationService.when(() -> ValidationService.validateOrThrow(any(Object.class), any()))
.thenThrow(new SQLException("validation failed"));
// Also stub the Collection overload for callers that pass deployer collections.
- mockValidationService.when(() -> ValidationService.validateOrThrow(any(Collection.class)))
+ mockValidationService.when(() -> ValidationService.validateOrThrow(any(Collection.class), any()))
.thenThrow(new SQLException("validation failed"));
}
}
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtilsTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtilsTest.java
index 4013e367..29a11973 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtilsTest.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtilsTest.java
@@ -44,6 +44,7 @@
import java.util.Map;
import java.util.Properties;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -1604,4 +1605,96 @@ public String databaseName() {
return name;
}
}
+
+ // ---- removeTableFromSchema tests ----
+
+ @Test
+ void removeTableFromSchemaToleratesNullConnection() {
+ // Must not NPE — this overload is called from code paths that may lack a connection.
+ assertDoesNotThrow(() -> HoptimatorDdlUtils.removeTableFromSchema(null, null, null, "ANY"));
+ }
+
+ @Test
+ void removeTableFromSchemaRemovesFromRootWhenNoSchema() throws SQLException {
+ HoptimatorDriver driver = new HoptimatorDriver();
+ try (HoptimatorConnection connection =
+ (HoptimatorConnection) driver.connect("jdbc:hoptimator://catalogs=util", new Properties())) {
+ RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ RelDataType rowType = typeFactory.createStructType(Collections.emptyList(), Collections.emptyList());
+ HoptimatorDdlUtils.registerTemporaryTableInSchema(
+ connection, null, null, "ROOT_TMP", rowType, "db");
+ SchemaPlus rootSchema = connection.calciteConnection().getRootSchema();
+ assertNotNull(rootSchema.tables().get("ROOT_TMP"));
+
+ HoptimatorDdlUtils.removeTableFromSchema(connection, null, null, "ROOT_TMP");
+
+ assertEquals(null, rootSchema.tables().get("ROOT_TMP"));
+ }
+ }
+
+ @Test
+ void removeTableFromSchemaRemovesFromTierSchema() throws SQLException {
+ HoptimatorDriver driver = new HoptimatorDriver();
+ try (HoptimatorConnection connection =
+ (HoptimatorConnection) driver.connect("jdbc:hoptimator://catalogs=util", new Properties())) {
+ SchemaPlus rootSchema = connection.calciteConnection().getRootSchema();
+ rootSchema.add("KAFKA", new AbstractSchema());
+ RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ RelDataType rowType = typeFactory.createStructType(Collections.emptyList(), Collections.emptyList());
+ HoptimatorDdlUtils.registerTemporaryTableInSchema(
+ connection, null, "KAFKA", "my_topic", rowType, "kafka-db");
+ SchemaPlus tierSchema = rootSchema.subSchemas().get("KAFKA");
+ assertNotNull(tierSchema);
+ assertNotNull(tierSchema.tables().get("my_topic"));
+
+ HoptimatorDdlUtils.removeTableFromSchema(connection, null, "KAFKA", "my_topic");
+
+ assertEquals(null, tierSchema.tables().get("my_topic"));
+ }
+ }
+
+ @Test
+ void removeTableFromSchemaRemovesFromCatalogAndSchema() throws SQLException {
+ HoptimatorDriver driver = new HoptimatorDriver();
+ try (HoptimatorConnection connection =
+ (HoptimatorConnection) driver.connect("jdbc:hoptimator://catalogs=util", new Properties())) {
+ SchemaPlus rootSchema = connection.calciteConnection().getRootSchema();
+ SchemaPlus catalogSchema = rootSchema.add("CAT", new AbstractSchema());
+ SchemaPlus dbSchema = catalogSchema.add("DB", new AbstractSchema());
+ RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ RelDataType rowType = typeFactory.createStructType(Collections.emptyList(), Collections.emptyList());
+ HoptimatorDdlUtils.registerTemporaryTableInSchema(
+ connection, "CAT", "DB", "t", rowType, "mysql-db");
+ assertNotNull(dbSchema.tables().get("t"));
+
+ HoptimatorDdlUtils.removeTableFromSchema(connection, "CAT", "DB", "t");
+
+ assertEquals(null, dbSchema.tables().get("t"));
+ }
+ }
+
+ @Test
+ void removeTableFromSchemaIsNoOpWhenEntriesMissing() throws SQLException {
+ HoptimatorDriver driver = new HoptimatorDriver();
+ try (HoptimatorConnection connection =
+ (HoptimatorConnection) driver.connect("jdbc:hoptimator://catalogs=util", new Properties())) {
+ // Missing catalog, missing schema, missing table — each path must be silent.
+ SchemaPlus rootSchema = connection.calciteConnection().getRootSchema();
+
+ assertDoesNotThrow(() ->
+ HoptimatorDdlUtils.removeTableFromSchema(connection, "MISSING_CATALOG", "S", "t"));
+ assertDoesNotThrow(() ->
+ HoptimatorDdlUtils.removeTableFromSchema(connection, null, "MISSING_SCHEMA", "t"));
+ assertDoesNotThrow(() ->
+ HoptimatorDdlUtils.removeTableFromSchema(connection, null, null, "MISSING_TABLE"));
+
+ // None of those calls should have accidentally created the missing catalog / schema.
+ assertNull(rootSchema.subSchemas().get("MISSING_CATALOG"),
+ "missing-catalog call must not create the catalog");
+ assertNull(rootSchema.subSchemas().get("MISSING_SCHEMA"),
+ "missing-schema call must not create the schema");
+ assertNull(rootSchema.tables().get("MISSING_TABLE"),
+ "missing-table call must not create the table");
+ }
+ }
}
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java
index 9c417e65..d735543d 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java
@@ -13,6 +13,7 @@
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.sql.Connection;
import java.util.Collection;
import java.util.List;
@@ -76,7 +77,7 @@ private void useTestValidators() throws IOException {
public static class CreateViewValidatorProvider implements ValidatorProvider {
@Override
- public Collection validators(T obj) {
+ public Collection validators(T obj, Connection connection) {
if (obj instanceof SqlCreateView || obj instanceof SqlCreateMaterializedView) {
return List.of(new SqlCreateViewValidator());
}
@@ -88,7 +89,7 @@ static class SqlCreateViewValidator implements Validator {
static final String ERROR_MESSAGE = "Create view is not allowed in this test.";
@Override
- public void validate(Issues issues) {
+ public void validate(Issues issues, Connection connection) {
issues.error(ERROR_MESSAGE);
}
}
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidationServiceTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidationServiceTest.java
index 52383b04..af2bead6 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidationServiceTest.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidationServiceTest.java
@@ -103,7 +103,7 @@ void testValidateConnectionCallsWalk() throws SQLException {
void testValidateObjIssuesInvokesValidators() {
ValidatorProviderTest.enableErrors();
Validator.Issues issues = new Validator.Issues("test");
- ValidationService.validate("any-object", issues);
+ ValidationService.validate("any-object", issues, null);
assertFalse(issues.valid(),
"validate(obj, issues) must invoke validators; if forEach is removed no error fires");
assertTrue(issues.toString().contains("injected error"),
@@ -115,7 +115,7 @@ void testValidateObjIssuesInvokesValidators() {
void testValidateOrThrowSingleObjectThrowsWhenErrorRecorded() {
ValidatorProviderTest.enableErrors();
assertThrows(SQLException.class,
- () -> ValidationService.validateOrThrow("any-object"),
+ () -> ValidationService.validateOrThrow("any-object", null),
"validateOrThrow must throw when a provider records an error");
}
@@ -125,27 +125,27 @@ void testValidateOrThrowSingleObjectThrowsWhenErrorRecorded() {
@Test
void testValidateOrThrowSingleObjectPassesWhenValid() throws SQLException {
// ValidatorProviderTest is in no-error mode (reset in setUp)
- ValidationService.validateOrThrow("test-object");
+ ValidationService.validateOrThrow("test-object", null);
}
@Test
void testValidateOrThrowCollectionThrowsWhenErrorRecorded() {
ValidatorProviderTest.enableErrors();
assertThrows(SQLException.class,
- () -> ValidationService.validateOrThrow(Arrays.asList("obj1", "obj2")),
+ () -> ValidationService.validateOrThrow(Arrays.asList("obj1", "obj2"), null),
"validateOrThrow(Collection) must throw when provider records an error");
}
@Test
void testValidateOrThrowCollectionPassesWhenValid() throws SQLException {
Collection validObjects = Arrays.asList("obj1", "obj2");
- ValidationService.validateOrThrow(validObjects);
+ ValidationService.validateOrThrow(validObjects, null);
}
@Test
void testValidateOrThrowCollectionPassesWithEmptyCollection() throws SQLException {
Collection emptyCollection = Collections.emptyList();
- ValidationService.validateOrThrow(emptyCollection);
+ ValidationService.validateOrThrow(emptyCollection, null);
}
// ValidatorProviderTest is registered via META-INF/services so ServiceLoader must find it.
@@ -161,7 +161,7 @@ void testProvidersReturnsAtLeastOneRegisteredProvider() {
@Test
void testValidatorsReturnsValidatorsFromRegisteredProvider() {
ValidatorProviderTest.enableErrors();
- Collection validators = ValidationService.validators("any-object");
+ Collection validators = ValidationService.validators("any-object", null);
assertNotNull(validators);
assertFalse(validators.isEmpty(),
"validators() must return the non-empty list provided by ValidatorProviderTest");
@@ -175,14 +175,14 @@ void testProvidersReturnsCollection() {
@Test
void testValidatorsReturnsCollection() {
- Collection validators = ValidationService.validators("test-object");
+ Collection validators = ValidationService.validators("test-object", null);
assertNotNull(validators);
}
@Test
void testValidatePopulatesIssues() {
Validator.Issues issues = new Validator.Issues("test");
- ValidationService.validate("test-object", issues);
+ ValidationService.validate("test-object", issues, null);
assertNotNull(issues);
}
diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidatorProviderTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidatorProviderTest.java
index c8319bc0..9537c4fb 100644
--- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidatorProviderTest.java
+++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/ValidatorProviderTest.java
@@ -3,6 +3,7 @@
import com.linkedin.hoptimator.Validator;
import com.linkedin.hoptimator.ValidatorProvider;
+import java.sql.Connection;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,10 +40,10 @@ static Object lastSeen() {
}
@Override
- public Collection validators(T obj) {
+ public Collection validators(T obj, Connection connection) {
LAST_SEEN.set(obj);
if (SHOULD_ERROR.get()) {
- return Collections.singletonList(issues -> issues.error("ValidatorProviderTest injected error"));
+ return Collections.singletonList((issues, conn) -> issues.error("ValidatorProviderTest injected error"));
}
return Collections.emptyList();
}
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java
index 0f19919e..23ee79ef 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java
@@ -154,7 +154,7 @@ public void update(T obj) throws SQLException {
final KubernetesApiResponse resp;
if (existing.isSuccess()) {
- // Ensure labels are additive.
+ // Ensure labels, annotations, and owners are additive.
Map labels = new HashMap<>();
if (existing.getObject().getMetadata().getLabels() != null) {
labels.putAll(existing.getObject().getMetadata().getLabels());
@@ -164,6 +164,15 @@ public void update(T obj) throws SQLException {
}
obj.getMetadata().setLabels(labels);
+ Map annotations = new HashMap<>();
+ if (existing.getObject().getMetadata().getAnnotations() != null) {
+ annotations.putAll(existing.getObject().getMetadata().getAnnotations());
+ }
+ if (obj.getMetadata().getAnnotations() != null) {
+ annotations.putAll(obj.getMetadata().getAnnotations());
+ }
+ obj.getMetadata().setAnnotations(annotations);
+
List owners = new LinkedList<>();
if (existing.getObject().getMetadata().getOwnerReferences() != null) {
owners.addAll(existing.getObject().getMetadata().getOwnerReferences());
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java
index 60d657b9..76967411 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java
@@ -2,6 +2,7 @@
import com.linkedin.hoptimator.Deployer;
import com.linkedin.hoptimator.MaterializedView;
+import com.linkedin.hoptimator.Sink;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.SqlDialect;
import com.linkedin.hoptimator.util.DeploymentService;
@@ -9,6 +10,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
@@ -34,8 +36,8 @@ K8sViewDeployer createViewDeployer(MaterializedView view, K8sContext context) {
}
K8sPipelineBundle createPipelineBundle(String name, List pipelineSpecs, String sql,
- K8sContext viewContext) {
- return new K8sPipelineBundle(name, pipelineSpecs, sql, viewContext);
+ Collection sources, Sink sink, K8sContext viewContext) {
+ return new K8sPipelineBundle(name, pipelineSpecs, sql, sources, sink, viewContext);
}
@Override
@@ -45,7 +47,8 @@ public void create() throws SQLException {
List pipelineSpecs = pipelineSpecs();
V1OwnerReference viewRef = viewDeployer.createAndReference();
K8sContext viewContext = context.withOwner(viewRef);
- K8sPipelineBundle bundle = createPipelineBundle(name, pipelineSpecs, sql(), viewContext);
+ K8sPipelineBundle bundle = createPipelineBundle(name, pipelineSpecs, sql(),
+ view.pipeline().sources(), view.pipeline().sink(), viewContext);
deployers.add(bundle);
bundle.create();
}
@@ -58,7 +61,8 @@ public void update() throws SQLException {
List pipelineSpecs = pipelineSpecs();
V1OwnerReference viewRef = viewDeployer.updateAndReference();
K8sContext viewContext = context.withOwner(viewRef);
- K8sPipelineBundle bundle = createPipelineBundle(name, pipelineSpecs, sql(), viewContext);
+ K8sPipelineBundle bundle = createPipelineBundle(name, pipelineSpecs, sql(),
+ view.pipeline().sources(), view.pipeline().sink(), viewContext);
deployers.add(bundle);
bundle.update();
}
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java
index e2b1606b..8fe455d2 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineBundle.java
@@ -2,11 +2,14 @@
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import io.kubernetes.client.openapi.models.V1OwnerReference;
import com.linkedin.hoptimator.Deployer;
+import com.linkedin.hoptimator.Sink;
+import com.linkedin.hoptimator.Source;
/**
@@ -25,16 +28,22 @@ public class K8sPipelineBundle implements Deployer {
private final K8sPipelineDeployer pipelineDeployer;
private final List deployers = new ArrayList<>();
+ /**
+ * {@code sources} and {@code sink} are stamped as {@code depends-on-*}
+ * labels on the Pipeline CRD so the delete-time guard in {@link PipelineDependencyChecker}
+ * can find this pipeline by label selector.
+ */
public K8sPipelineBundle(String name, List pipelineSpecs, String sql,
- K8sContext ownerContext) {
+ Collection sources, Sink sink, K8sContext ownerContext) {
this.name = name;
this.pipelineSpecs = pipelineSpecs;
this.ownerContext = ownerContext;
- this.pipelineDeployer = createPipelineDeployer(name, pipelineSpecs, sql, ownerContext);
+ this.pipelineDeployer = createPipelineDeployer(name, pipelineSpecs, sql, sources, sink, ownerContext);
}
- K8sPipelineDeployer createPipelineDeployer(String n, List specs, String sql, K8sContext ctx) {
- return new K8sPipelineDeployer(n, specs, sql, ctx);
+ K8sPipelineDeployer createPipelineDeployer(String n, List specs, String sql,
+ Collection sources, Sink sink, K8sContext ctx) {
+ return new K8sPipelineDeployer(n, specs, sql, sources, sink, ctx);
}
K8sYamlDeployerImpl createYamlDeployerImpl(K8sContext pipelineContext, List specs) {
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidator.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidator.java
new file mode 100644
index 00000000..ff06a3be
--- /dev/null
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidator.java
@@ -0,0 +1,39 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import javax.annotation.Nullable;
+
+import com.linkedin.hoptimator.Source;
+import com.linkedin.hoptimator.Validator;
+
+
+/**
+ * Pre-delete dependency check, run by the validator framework when a {@link Source} is wrapped
+ * in {@link com.linkedin.hoptimator.PendingDelete}. Delegates to the existing
+ * {@link PipelineDependencyChecker} (label-selector + annotation collision-guard + self-owner
+ * exclusion) and surfaces any blocking pipeline as a validation error.
+ */
+final class K8sPipelineDependencyValidator implements Validator {
+
+ private final Source source;
+ private final String selfOwnerKind;
+ private final String selfOwnerName;
+
+ K8sPipelineDependencyValidator(Source source, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) {
+ this.source = source;
+ this.selfOwnerKind = selfOwnerKind;
+ this.selfOwnerName = selfOwnerName;
+ }
+
+ @Override
+ public void validate(Issues issues, Connection connection) {
+ try {
+ PipelineDependencyChecker.assertNoExternalDependents(
+ K8sContext.create(connection), source.database(), source.path(),
+ selfOwnerKind, selfOwnerName);
+ } catch (SQLException e) {
+ issues.error(e.getMessage());
+ }
+ }
+}
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java
index 5e76bb19..0a9cfedf 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java
@@ -1,33 +1,64 @@
package com.linkedin.hoptimator.k8s;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+
+import com.linkedin.hoptimator.Sink;
+import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineSpec;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
-
-import java.sql.SQLException;
-import java.util.List;
-/** Deploys a Pipeline object. */
+/**
+ * Deploys a Pipeline object. Stamps {@code depends-on-*} labels and a {@code depends-on}
+ * collision-guard annotation describing which sources/sink the pipeline references, so
+ * {@link PipelineDependencyChecker} can look up dependents by label selector at delete time.
+ *
+ * {@link K8sApi#update} merges labels additively, so stale {@code depends-on-*} labels from
+ * a previous version of the pipeline's SQL can linger. Correctness is preserved by the
+ * annotation, which is rewritten in full on every update: the checker rejects any label-only
+ * match whose annotation doesn't list the target identifier. In return, we avoid the extra
+ * round trip that in-place label stripping would require.
+ */
class K8sPipelineDeployer extends K8sDeployer {
private final String name;
private final String yaml;
private final String sql;
+ private final Collection sources;
+ private final Sink sink;
- K8sPipelineDeployer(String name, List specs, String sql, K8sContext context) {
+ K8sPipelineDeployer(String name, List specs, String sql,
+ Collection sources, Sink sink, K8sContext context) {
super(context, K8sApiEndpoints.PIPELINES);
this.name = name;
this.yaml = String.join("\n---\n", specs);
this.sql = sql;
+ this.sources = sources == null ? Collections.emptyList() : sources;
+ this.sink = sink;
}
@Override
protected V1alpha1Pipeline toK8sObject() throws SQLException {
+ V1ObjectMeta meta = new V1ObjectMeta().name(name);
+ Map labels = PipelineDependencyLabels.labelsFor(sources, sink);
+ if (!labels.isEmpty()) {
+ meta.setLabels(new HashMap<>(labels));
+ Map annotations = new HashMap<>();
+ annotations.put(PipelineDependencyLabels.ANNOTATION_KEY,
+ PipelineDependencyLabels.annotationFor(sources, sink));
+ meta.setAnnotations(annotations);
+ }
return new V1alpha1Pipeline().kind(K8sApiEndpoints.PIPELINES.kind())
.apiVersion(K8sApiEndpoints.PIPELINES.apiVersion())
- .metadata(new V1ObjectMeta().name(name))
+ .metadata(meta)
.spec(new V1alpha1PipelineSpec().sql(sql).yaml(yaml));
}
}
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sValidatorProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sValidatorProvider.java
new file mode 100644
index 00000000..3cc45e4b
--- /dev/null
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sValidatorProvider.java
@@ -0,0 +1,34 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.sql.Connection;
+import java.util.Collection;
+import java.util.Collections;
+
+import com.linkedin.hoptimator.PendingDelete;
+import com.linkedin.hoptimator.Source;
+import com.linkedin.hoptimator.Validator;
+import com.linkedin.hoptimator.ValidatorProvider;
+
+
+/**
+ * Returns a Kubernetes-backed dependency-guard validator for any {@link Source} wrapped in a
+ * {@link PendingDelete} — i.e. when a DROP is issued. Keying off
+ * {@code PendingDelete} (not raw {@code Source}) makes the guard explicitly delete-time:
+ * other callers of {@code ValidationService.validateOrThrow(source, connection)} won't trigger
+ * a pre-delete lookup against K8s.
+ */
+public class K8sValidatorProvider implements ValidatorProvider {
+
+ @Override
+ public Collection validators(T obj, Connection connection) {
+ if (obj instanceof PendingDelete) {
+ PendingDelete> pd = (PendingDelete>) obj;
+ Object target = pd.target();
+ if (target instanceof Source) {
+ return Collections.singletonList(
+ new K8sPipelineDependencyValidator((Source) target, pd.selfOwnerKind(), pd.selfOwnerName()));
+ }
+ }
+ return Collections.emptyList();
+ }
+}
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sViewTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sViewTable.java
index 401f6544..19a55611 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sViewTable.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sViewTable.java
@@ -16,6 +16,7 @@
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.ViewTable;
+import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -157,7 +158,7 @@ public Schema.TableType getJdbcTableType() {
}
@Override
- public void validate(Validator.Issues issues) {
+ public void validate(Validator.Issues issues, Connection connection) {
for (Row row : rows()) {
Validator.Issues issues2 = issues.child(row.toString());
Validator.validateSubdomainName(row.NAME, issues2.child("NAME"));
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java
index bc14dc6d..76fa7857 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sYamlApi.java
@@ -115,7 +115,7 @@ public void update(DynamicKubernetesObject obj) throws SQLException {
final KubernetesApiResponse resp;
if (existing.isSuccess()) {
- // Ensure labels are additive. Existing values are kept.
+ // Ensure labels, annotations are additive. Existing values are kept.
Map labels = new HashMap<>();
if (obj.getMetadata().getLabels() != null) {
labels.putAll(obj.getMetadata().getLabels());
@@ -125,6 +125,15 @@ public void update(DynamicKubernetesObject obj) throws SQLException {
}
existing.getObject().getMetadata().setLabels(labels);
+ Map annotations = new HashMap<>();
+ if (obj.getMetadata().getAnnotations() != null) {
+ annotations.putAll(obj.getMetadata().getAnnotations());
+ }
+ if (existing.getObject().getMetadata().getAnnotations() != null) {
+ annotations.putAll(existing.getObject().getMetadata().getAnnotations());
+ }
+ existing.getObject().getMetadata().setAnnotations(annotations);
+
obj.setMetadata(existing.getObject().getMetadata());
resp = context.dynamic(obj.getApiVersion(), K8sUtils.guessPlural(obj)).update(obj);
} else {
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyChecker.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyChecker.java
new file mode 100644
index 00000000..ee9a6b64
--- /dev/null
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyChecker.java
@@ -0,0 +1,116 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1OwnerReference;
+
+import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
+import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Checks whether any Pipeline CRDs still depend on a resource a {@link com.linkedin.hoptimator.Deployer}
+ * is about to delete.
+ *
+ * The lookup is a label-selector list against the Pipeline CRD group, so it is O(matches) on
+ * the wire — not a full scan. Each candidate is then cross-checked against the
+ * {@link PipelineDependencyLabels#ANNOTATION_KEY} annotation to rule out the (rare) case of a
+ * hash collision in the label slug.
+ *
+ *
Pipelines owned (directly) by {@code (selfOwnerKind, selfOwnerName)} are excluded from the
+ * blocker list: those pipelines will be cascade-deleted alongside the parent resource, so counting
+ * them as external dependents would make composite deletes (e.g. {@code LogicalTableDeployer.delete()})
+ * impossible.
+ */
+public final class PipelineDependencyChecker {
+
+ private PipelineDependencyChecker() {
+ }
+
+ public static void assertNoExternalDependents(K8sContext context, String database,
+ List path, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) throws SQLException {
+ assertNoExternalDependents(new K8sApi<>(context, K8sApiEndpoints.PIPELINES),
+ database, path, selfOwnerKind, selfOwnerName);
+ }
+
+ /** Variant that takes a pre-built {@link K8sApi} — used by tests to inject mocks. */
+ static void assertNoExternalDependents(K8sApi api,
+ String database, List path, @Nullable String selfOwnerKind,
+ @Nullable String selfOwnerName) throws SQLException {
+
+ String labelKey = PipelineDependencyLabels.labelKey(database, path);
+ String identifier = PipelineDependencyLabels.identifier(database, path);
+
+ Collection matches = api.select(labelKey);
+
+ List blockers = new ArrayList<>();
+ for (V1alpha1Pipeline p : matches) {
+ if (isSelfOwned(p, selfOwnerKind, selfOwnerName)) {
+ continue;
+ }
+ if (!annotationConfirms(p, identifier)) {
+ // Label matched but annotation doesn't — this is a slug collision or a stale label, skip it.
+ continue;
+ }
+ blockers.add(describeBlocker(p));
+ }
+
+ if (!blockers.isEmpty()) {
+ throw new SQLException(String.format(
+ "Cannot delete %s — %d active pipeline(s) depend on it: %s",
+ identifier, blockers.size(), String.join(", ", blockers)));
+ }
+ }
+
+ private static boolean isSelfOwned(V1alpha1Pipeline pipeline, @Nullable String selfOwnerKind,
+ @Nullable String selfOwnerName) {
+ if (selfOwnerKind == null || selfOwnerName == null) {
+ return false;
+ }
+ V1ObjectMeta meta = pipeline.getMetadata();
+ if (meta == null || meta.getOwnerReferences() == null) {
+ return false;
+ }
+ for (V1OwnerReference owner : meta.getOwnerReferences()) {
+ if (selfOwnerKind.equals(owner.getKind()) && selfOwnerName.equals(owner.getName())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean annotationConfirms(V1alpha1Pipeline pipeline, String identifier) {
+ V1ObjectMeta meta = pipeline.getMetadata();
+ if (meta == null || meta.getAnnotations() == null) {
+ return true; // pre-labeling pipeline — conservatively trust the label match
+ }
+ String annotation = meta.getAnnotations().get(PipelineDependencyLabels.ANNOTATION_KEY);
+ if (annotation == null) {
+ return true; // same — no annotation to cross-check against
+ }
+ Set listed = PipelineDependencyLabels.parseAnnotation(annotation);
+ return listed.contains(identifier);
+ }
+
+ /**
+ * Builds a human-readable blocker description: the pipeline name, plus (when present) the top
+ * ownerReference's {@code kind/name} so the user knows which higher-level resource owns it.
+ */
+ private static String describeBlocker(V1alpha1Pipeline pipeline) {
+ V1ObjectMeta meta = pipeline.getMetadata();
+ String name = meta == null ? "" : meta.getName();
+ String ownerSuffix = "";
+ if (meta != null && meta.getOwnerReferences() != null && !meta.getOwnerReferences().isEmpty()) {
+ V1OwnerReference owner = meta.getOwnerReferences().get(0);
+ ownerSuffix = " (owned by " + owner.getKind() + "/" + owner.getName() + ")";
+ }
+ return name + ownerSuffix;
+ }
+}
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabels.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabels.java
new file mode 100644
index 00000000..d9662337
--- /dev/null
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabels.java
@@ -0,0 +1,123 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.linkedin.hoptimator.Sink;
+import com.linkedin.hoptimator.Source;
+
+
+/**
+ * Computes the labels and annotation that encode a Pipeline CRD's dependency edges.
+ *
+ * Every source and sink a pipeline references is recorded as a label:
+ * {@code hoptimator.linkedin.com/depends-on-: "_"} where
+ * {@code } is a deterministic hash derived from {@code database + "_" + pathString}.
+ * The hash keeps label keys within Kubernetes's 63-character name limit for arbitrary paths,
+ * and lets {@code K8sApi.select} filter pipelines by dependency on the server.
+ *
+ * A collision-guard annotation ({@code ANNOTATION_KEY}) lists all logical identifiers verbatim,
+ * so the delete-time check can distinguish a real dependency match from a rare hash collision.
+ */
+public final class PipelineDependencyLabels {
+
+ static final String LABEL_PREFIX = "hoptimator.linkedin.com/depends-on-";
+ public static final String ANNOTATION_KEY = "hoptimator.linkedin.com/depends-on";
+
+ private static final int SLUG_LENGTH = 16; // 64 bits of SHA-256 → ~1 in 1.8e19 collisions
+ private static final int MAX_LABEL_VALUE = 63;
+
+ private PipelineDependencyLabels() {
+ }
+
+ /**
+ * Canonical logical identifier for a resource: {@code _}.
+ */
+ public static String identifier(String database, Iterable path) {
+ return database + "_" + String.join(".", path);
+ }
+
+ /** Hex slug derived from the full identifier; same identifier always produces the same slug. */
+ public static String slug(String database, Iterable path) {
+ byte[] digest = sha256(identifier(database, path).getBytes(StandardCharsets.UTF_8));
+ StringBuilder sb = new StringBuilder(SLUG_LENGTH);
+ for (int i = 0; i < SLUG_LENGTH / 2; i++) {
+ sb.append(String.format("%02x", digest[i]));
+ }
+ return sb.toString();
+ }
+
+ /** Label key a Pipeline carries if it depends on the given resource. */
+ public static String labelKey(String database, Iterable path) {
+ return LABEL_PREFIX + slug(database, path);
+ }
+
+ /**
+ * Labels to stamp on a Pipeline CRD — one entry per source and the sink. Both edges
+ * matter to the guard: dropping a source orphans pipelines that read from it; dropping a sink
+ * orphans pipelines that write to it.
+ *
+ * Keys are the same as {@link #labelKey}. Values are the readable identifier, truncated
+ * to 63 chars if necessary (the annotation preserves the untruncated form). Values are
+ * for debugging purposes only.
+ */
+ public static Map labelsFor(Collection sources, Sink sink) {
+ Map labels = new LinkedHashMap<>();
+ for (Source src : sources) {
+ labels.put(labelKey(src.database(), src.path()), truncate(identifier(src.database(), src.path())));
+ }
+ if (sink != null) {
+ labels.put(labelKey(sink.database(), sink.path()), truncate(identifier(sink.database(), sink.path())));
+ }
+ return labels;
+ }
+
+ /**
+ * Collision-guard annotation value — comma-separated list of full source and sink identifiers,
+ * deduplicated and not truncated. The delete-time check cross-references this annotation after
+ * the label selector narrows the candidate set.
+ */
+ public static String annotationFor(Collection sources, Sink sink) {
+ Set ids = new LinkedHashSet<>();
+ for (Source src : sources) {
+ ids.add(identifier(src.database(), src.path()));
+ }
+ if (sink != null) {
+ ids.add(identifier(sink.database(), sink.path()));
+ }
+ return String.join(",", ids);
+ }
+
+ /** Parses the collision-guard annotation back into the set of identifiers it encoded. */
+ public static Set parseAnnotation(String annotation) {
+ Set out = new LinkedHashSet<>();
+ if (annotation == null || annotation.isEmpty()) {
+ return out;
+ }
+ for (String id : annotation.split(",")) {
+ String trimmed = id.trim();
+ if (!trimmed.isEmpty()) {
+ out.add(trimmed);
+ }
+ }
+ return out;
+ }
+
+ private static String truncate(String value) {
+ return value.length() <= MAX_LABEL_VALUE ? value : value.substring(0, MAX_LABEL_VALUE);
+ }
+
+ private static byte[] sha256(byte[] input) {
+ try {
+ return MessageDigest.getInstance("SHA-256").digest(input);
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException("SHA-256 unavailable", e);
+ }
+ }
+}
diff --git a/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.ValidatorProvider b/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.ValidatorProvider
new file mode 100644
index 00000000..5fda4b5c
--- /dev/null
+++ b/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.ValidatorProvider
@@ -0,0 +1 @@
+com.linkedin.hoptimator.k8s.K8sValidatorProvider
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiErrorResponseTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiErrorResponseTest.java
index 1d753c42..ff9cd7f9 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiErrorResponseTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiErrorResponseTest.java
@@ -78,6 +78,7 @@ void deleteThrowsWhenResponseIsErrorStatus() throws ApiException {
}
@Test
+ @SuppressWarnings("unchecked")
void updateThrowsWhenResponseIsErrorStatusOnFinalUpdate() throws ApiException {
V1alpha1Pipeline pipeline = makePipeline("bad-pipeline", "test-ns");
V1alpha1Pipeline existing = makePipeline("bad-pipeline", "test-ns");
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiTest.java
index 92414370..f1e52c26 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sApiTest.java
@@ -276,6 +276,58 @@ void updateMergesLabelsFromExisting() throws SQLException {
assertEquals("new-val", mergedLabels.get("new-key"));
}
+ @Test
+ void updateMergesAnnotationsFromExisting() throws SQLException {
+ V1alpha1Pipeline pipeline = makePipeline("existing", "test-ns");
+ Map newAnnotations = new HashMap<>();
+ newAnnotations.put("new-key", "new-val");
+ pipeline.getMetadata().setAnnotations(newAnnotations);
+
+ V1alpha1Pipeline existing = makePipeline("existing", "test-ns");
+ Map existingAnnotations = new HashMap<>();
+ existingAnnotations.put("existing-key", "existing-val");
+ existing.getMetadata().setAnnotations(existingAnnotations);
+ existing.getMetadata().setResourceVersion("rv2");
+
+ when(mockGenericApi.get(eq("test-ns"), eq("existing"))).thenReturn(mockSingleResponse);
+ when(mockSingleResponse.isSuccess()).thenReturn(true);
+ when(mockSingleResponse.getObject()).thenReturn(existing);
+ when(mockGenericApi.update(any(V1alpha1Pipeline.class))).thenReturn(mockSingleResponse);
+
+ api.update(pipeline);
+
+ Map mergedAnnotations = pipeline.getMetadata().getAnnotations();
+ assertEquals("existing-val", mergedAnnotations.get("existing-key"));
+ assertEquals("new-val", mergedAnnotations.get("new-key"));
+ }
+
+ @Test
+ void updateLocalAnnotationWinsOnSharedKey() throws SQLException {
+ // Locks in the freshness guarantee the dependency-guard relies on: when the local object
+ // sets the same annotation key the cluster's existing object had, the local value wins.
+ // Without this, the depends-on annotation would never refresh on CREATE OR REPLACE and stale
+ // labels could no longer be disambiguated by the collision-guard.
+ V1alpha1Pipeline pipeline = makePipeline("existing", "test-ns");
+ Map newAnnotations = new HashMap<>();
+ newAnnotations.put("shared-key", "fresh-val");
+ pipeline.getMetadata().setAnnotations(newAnnotations);
+
+ V1alpha1Pipeline existing = makePipeline("existing", "test-ns");
+ Map existingAnnotations = new HashMap<>();
+ existingAnnotations.put("shared-key", "stale-val");
+ existing.getMetadata().setAnnotations(existingAnnotations);
+ existing.getMetadata().setResourceVersion("rv2");
+
+ when(mockGenericApi.get(eq("test-ns"), eq("existing"))).thenReturn(mockSingleResponse);
+ when(mockSingleResponse.isSuccess()).thenReturn(true);
+ when(mockSingleResponse.getObject()).thenReturn(existing);
+ when(mockGenericApi.update(any(V1alpha1Pipeline.class))).thenReturn(mockSingleResponse);
+
+ api.update(pipeline);
+
+ assertEquals("fresh-val", pipeline.getMetadata().getAnnotations().get("shared-key"));
+ }
+
@Test
void updateWhenObjectNotExistsCallsCreate() throws SQLException {
V1alpha1Pipeline pipeline = makePipeline("new-pipeline", "test-ns");
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployerTest.java
index 2acb55e0..5b987339 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployerTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployerTest.java
@@ -8,7 +8,6 @@
import com.linkedin.hoptimator.SqlDialect;
import com.linkedin.hoptimator.ThrowingFunction;
import com.linkedin.hoptimator.util.DeploymentService;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.kubernetes.client.openapi.models.V1OwnerReference;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -19,6 +18,7 @@
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -33,8 +33,6 @@
@ExtendWith(MockitoExtension.class)
-@SuppressFBWarnings(value = {"OBL_UNSATISFIED_OBLIGATION"},
- justification = "Mock objects created in stubbing setup don't need resource management")
class K8sMaterializedViewDeployerTest {
@Mock
@@ -75,7 +73,7 @@ K8sViewDeployer createViewDeployer(MaterializedView v, K8sContext ctx) {
@Override
K8sPipelineBundle createPipelineBundle(String name, List pipelineSpecs, String sql,
- K8sContext viewContext) {
+ Collection sources, Sink sink, K8sContext viewContext) {
return capturedBundle;
}
};
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineBundleTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineBundleTest.java
index 396271fe..dc84bb62 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineBundleTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineBundleTest.java
@@ -9,9 +9,13 @@
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import com.linkedin.hoptimator.Sink;
+import com.linkedin.hoptimator.Source;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
@@ -42,9 +46,10 @@ void setUp() {
private K8sPipelineBundle makeBundleWithMocks(String name, List pipelineSpecs) {
K8sPipelineDeployer capturedPipelineDeployer = pipelineDeployer;
K8sYamlDeployerImpl capturedYamlDeployer = yamlDeployer;
- return new K8sPipelineBundle(name, pipelineSpecs, "INSERT INTO sink SELECT * FROM source", context) {
+ return new K8sPipelineBundle(name, pipelineSpecs, "INSERT INTO sink SELECT * FROM source", Collections.emptyList(), null, context) {
@Override
- K8sPipelineDeployer createPipelineDeployer(String n, List specs, String sql, K8sContext ctx) {
+ K8sPipelineDeployer createPipelineDeployer(String n, List specs, String sql,
+ Collection sources, Sink sink, K8sContext ctx) {
return capturedPipelineDeployer;
}
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidatorTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidatorTest.java
new file mode 100644
index 00000000..ac94636d
--- /dev/null
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDependencyValidatorTest.java
@@ -0,0 +1,119 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.linkedin.hoptimator.Source;
+import com.linkedin.hoptimator.Validator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
+
+
+/**
+ * Unit tests for {@link K8sPipelineDependencyValidator}. The validator is a thin adapter that
+ * forwards to {@link PipelineDependencyChecker#assertNoExternalDependents} — these tests use
+ * {@link MockedStatic} on both K8sContext and PipelineDependencyChecker to verify that source
+ * fields and self-owner fields are forwarded correctly, and that thrown SQLException becomes a
+ * validation issue rather than propagating.
+ */
+@ExtendWith(MockitoExtension.class)
+class K8sPipelineDependencyValidatorTest {
+
+ @Mock
+ private MockedStatic contextStatic;
+
+ @Mock
+ private MockedStatic checkerStatic;
+
+ @Mock
+ private Connection connection;
+
+ @Mock
+ private K8sContext context;
+
+ private static Source source() {
+ return new Source("kafka1", List.of("KAFKA", "my-topic"), Map.of());
+ }
+
+ @Test
+ void validateForwardsSourceFieldsAndSelfOwnerToChecker() {
+ contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context);
+
+ K8sPipelineDependencyValidator validator =
+ new K8sPipelineDependencyValidator(source(), "LogicalTable", "my-table");
+ Validator.Issues issues = new Validator.Issues("test");
+
+ validator.validate(issues, connection);
+
+ ArgumentCaptor dbCaptor = ArgumentCaptor.forClass(String.class);
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor> pathCaptor = ArgumentCaptor.forClass(List.class);
+ ArgumentCaptor kindCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class);
+
+ checkerStatic.verify(() -> PipelineDependencyChecker.assertNoExternalDependents(
+ eq(context), dbCaptor.capture(), pathCaptor.capture(),
+ kindCaptor.capture(), nameCaptor.capture()));
+
+ assertEquals("kafka1", dbCaptor.getValue());
+ assertEquals(List.of("KAFKA", "my-topic"), pathCaptor.getValue());
+ assertEquals("LogicalTable", kindCaptor.getValue());
+ assertEquals("my-table", nameCaptor.getValue());
+ assertTrue(issues.valid());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void validatePassesNullSelfOwnerWhenUnset() {
+ contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context);
+
+ K8sPipelineDependencyValidator validator =
+ new K8sPipelineDependencyValidator(source(), null, null);
+ Validator.Issues issues = new Validator.Issues("test");
+
+ validator.validate(issues, connection);
+
+ ArgumentCaptor kindCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class);
+ checkerStatic.verify(() -> PipelineDependencyChecker.assertNoExternalDependents(
+ eq(context), nullable(String.class), nullable(List.class),
+ kindCaptor.capture(), nameCaptor.capture()));
+
+ assertNull(kindCaptor.getValue());
+ assertNull(nameCaptor.getValue());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void validateRecordsCheckerSqlExceptionAsIssue() {
+ contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context);
+ checkerStatic.when(() -> PipelineDependencyChecker.assertNoExternalDependents(
+ nullable(K8sContext.class), nullable(String.class), nullable(List.class),
+ nullable(String.class), nullable(String.class)))
+ .thenThrow(new SQLException("3 active pipeline(s) depend on it: p1, p2, p3"));
+
+ K8sPipelineDependencyValidator validator =
+ new K8sPipelineDependencyValidator(source(), null, null);
+ Validator.Issues issues = new Validator.Issues("test");
+
+ validator.validate(issues, connection);
+
+ assertFalse(issues.valid(), "blocking pipelines should surface as a validation error");
+ assertTrue(issues.toString().contains("3 active pipeline"),
+ "issue message should include the SQLException's text: " + issues);
+ }
+}
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java
index c7ef7469..a18122ed 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployerTest.java
@@ -1,22 +1,35 @@
package com.linkedin.hoptimator.k8s;
+import com.linkedin.hoptimator.Sink;
+import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
import org.junit.jupiter.api.Test;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class K8sPipelineDeployerTest {
+ private static Source src(String db, String path) {
+ return new Source(db, Collections.singletonList(path), Collections.emptyMap());
+ }
+
+ private static Sink sink(String db, String path) {
+ return new Sink(db, Collections.singletonList(path), Collections.emptyMap());
+ }
+
@Test
void toK8sObjectSetsPipelineFields() throws SQLException {
- K8sPipelineDeployer deployer = new K8sPipelineDeployer(
- "my-pipeline", Arrays.asList("spec1", "spec2"), "SELECT 1", null);
+ K8sPipelineDeployer deployer = new K8sPipelineDeployer("my-pipeline", Arrays.asList("spec1", "spec2"),
+ "SELECT 1", Collections.emptyList(), null, null);
V1alpha1Pipeline pipeline = deployer.toK8sObject();
@@ -29,11 +42,46 @@ void toK8sObjectSetsPipelineFields() throws SQLException {
@Test
void toK8sObjectWithSingleSpec() throws SQLException {
- K8sPipelineDeployer deployer = new K8sPipelineDeployer(
- "single", List.of("only-spec"), "SELECT 1", null);
+ K8sPipelineDeployer deployer = new K8sPipelineDeployer("single", List.of("only-spec"),
+ "SELECT 1", Collections.emptyList(), null, null);
V1alpha1Pipeline pipeline = deployer.toK8sObject();
assertEquals("only-spec", pipeline.getSpec().getYaml());
}
+
+ @Test
+ void stampsDependencyLabelsForSourcesAndSink() throws SQLException {
+ K8sPipelineDeployer deployer = new K8sPipelineDeployer(
+ "p1", List.of("spec"), "SELECT 1",
+ Arrays.asList(src("kafka1", "topic-a"), src("kafka2", "topic-b")),
+ sink("mysql", "outbox"), null);
+
+ V1alpha1Pipeline pipeline = deployer.toK8sObject();
+ Map labels = pipeline.getMetadata().getLabels();
+
+ assertEquals(3, labels.size(), "should have one label per source + one for the sink");
+ assertTrue(labels.containsKey(
+ PipelineDependencyLabels.labelKey("kafka1", Collections.singletonList("topic-a"))));
+ assertTrue(labels.containsKey(
+ PipelineDependencyLabels.labelKey("kafka2", Collections.singletonList("topic-b"))));
+ assertTrue(labels.containsKey(
+ PipelineDependencyLabels.labelKey("mysql", Collections.singletonList("outbox"))));
+ }
+
+ @Test
+ void stampsCollisionGuardAnnotation() throws SQLException {
+ K8sPipelineDeployer deployer = new K8sPipelineDeployer(
+ "p1", List.of("spec"), "SELECT 1",
+ Collections.singletonList(src("kafka", "topic")),
+ sink("mysql", "outbox"), null);
+
+ V1alpha1Pipeline pipeline = deployer.toK8sObject();
+ Map annotations = pipeline.getMetadata().getAnnotations();
+
+ String annotation = annotations.get(PipelineDependencyLabels.ANNOTATION_KEY);
+ assertNotNull(annotation);
+ assertTrue(annotation.contains("kafka_topic"));
+ assertTrue(annotation.contains("mysql_outbox"));
+ }
}
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sSourceDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sSourceDeployerTest.java
index 4535a359..9bc94560 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sSourceDeployerTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sSourceDeployerTest.java
@@ -5,7 +5,6 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateSpec;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -26,13 +25,10 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-@SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"},
- justification = "Mockito doReturn().when() stubs — framework captures the return value")
class K8sSourceDeployerTest {
@Mock
@@ -58,7 +54,12 @@ K8sYamlApi createYamlApi(K8sContext context) {
return fakeYamlApi;
}
};
+ }
+
+ /** Wires up {@code mockContext.connection()} for tests that exercise specify()'s template path. */
+ private void stubConnection() {
when(mockContext.connection()).thenReturn(connection);
+ when(connection.connectionProperties()).thenReturn(new Properties());
}
private K8sSourceDeployer makeDeployer(Source source) {
@@ -85,7 +86,7 @@ K8sSnapshot createSnapshot(K8sContext context) {
@Test
void specifyWithNoTemplatesReturnsEmpty() throws SQLException {
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
Source source = new Source("testdb", Arrays.asList("schema", "table"),
Collections.emptyMap());
@@ -100,7 +101,7 @@ void specifyWithNoTemplatesReturnsEmpty() throws SQLException {
@Test
void specifyRendersMatchingTemplate() throws SQLException {
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
templates.add(new V1alpha1TableTemplate()
.metadata(new V1ObjectMeta().name("template1"))
@@ -121,7 +122,7 @@ void specifyRendersMatchingTemplate() throws SQLException {
@Test
void specifyFiltersOutNonMatchingDatabases() throws SQLException {
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
templates.add(new V1alpha1TableTemplate()
.metadata(new V1ObjectMeta().name("template1"))
@@ -141,7 +142,7 @@ void specifyFiltersOutNonMatchingDatabases() throws SQLException {
@Test
void specifyWithJobPropertiesInOptions() throws SQLException {
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
templates.add(new V1alpha1TableTemplate()
.metadata(new V1ObjectMeta().name("template1"))
@@ -161,7 +162,7 @@ void specifyWithJobPropertiesInOptions() throws SQLException {
@Test
void specifyWithNullDatabasesMatchesAll() throws SQLException {
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
templates.add(new V1alpha1TableTemplate()
.metadata(new V1ObjectMeta().name("template1"))
@@ -182,7 +183,7 @@ void specifyWithNullDatabasesMatchesAll() throws SQLException {
@Test
void specifyRendersNonEmptyYamlWithSourceContent() throws SQLException {
// Verify fields are non-empty.
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
templates.add(new V1alpha1TableTemplate()
.metadata(new V1ObjectMeta().name("template1"))
@@ -206,7 +207,7 @@ void specifyRendersNonEmptyYamlWithSourceContent() throws SQLException {
@Test
void getJobPropertiesFromOptionsMapsCorrectKeys() throws SQLException {
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
// Template uses {{job.properties}} prefix variable to expose job properties
templates.add(new V1alpha1TableTemplate()
@@ -233,7 +234,7 @@ void getJobPropertiesFromOptionsMapsCorrectKeys() throws SQLException {
@Test
void getJobPropertiesFromOptionsFiltersNonMatchingKeys() throws SQLException {
// Ensures the filter actually filters — only job.properties.* keys should be mapped
- doReturn(new Properties()).when(connection).connectionProperties();
+ stubConnection();
templates.add(new V1alpha1TableTemplate()
.metadata(new V1ObjectMeta().name("template1"))
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sValidatorProviderTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sValidatorProviderTest.java
new file mode 100644
index 00000000..a4cf0189
--- /dev/null
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sValidatorProviderTest.java
@@ -0,0 +1,77 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import com.linkedin.hoptimator.PendingDelete;
+import com.linkedin.hoptimator.Source;
+import com.linkedin.hoptimator.Validator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+class K8sValidatorProviderTest {
+
+ private static Source testSource() {
+ return new Source("kafka1", java.util.List.of("KAFKA", "my-topic"), Map.of());
+ }
+
+ @Test
+ void returnsDependencyValidatorForPendingDeleteOfSource() {
+ K8sValidatorProvider provider = new K8sValidatorProvider();
+ PendingDelete pd = new PendingDelete<>(testSource());
+
+ Collection validators = provider.validators(pd, null);
+
+ assertEquals(1, validators.size());
+ assertInstanceOf(K8sPipelineDependencyValidator.class, validators.iterator().next());
+ }
+
+ @Test
+ void returnsDependencyValidatorWhenSelfOwnerIsSet() {
+ // Self-owner fields are stored on the validator; this exercises the (kind, name) plumbing
+ // through the provider boundary.
+ K8sValidatorProvider provider = new K8sValidatorProvider();
+ PendingDelete pd = new PendingDelete<>(testSource(), "LogicalTable", "my-table");
+
+ Collection validators = provider.validators(pd, null);
+
+ assertEquals(1, validators.size());
+ assertInstanceOf(K8sPipelineDependencyValidator.class, validators.iterator().next());
+ }
+
+ @Test
+ void returnsEmptyForRawSourceWithoutPendingDeleteWrapper() {
+ // The validator only fires for a delete-intent signal — not for a plain Source. Other callers
+ // of ValidationService.validate(source, ...) must NOT trigger the K8s pipeline lookup.
+ K8sValidatorProvider provider = new K8sValidatorProvider();
+
+ Collection validators = provider.validators(testSource(), null);
+
+ assertTrue(validators.isEmpty());
+ }
+
+ @Test
+ void returnsEmptyForPendingDeleteOfNonSourceTarget() {
+ K8sValidatorProvider provider = new K8sValidatorProvider();
+ PendingDelete pd = new PendingDelete<>("not-a-source");
+
+ Collection validators = provider.validators(pd, null);
+
+ assertTrue(validators.isEmpty());
+ }
+
+ @Test
+ void returnsEmptyForUnrelatedTypes() {
+ K8sValidatorProvider provider = new K8sValidatorProvider();
+
+ assertTrue(provider.validators("just-a-string", null).isEmpty());
+ assertTrue(provider.validators(42, null).isEmpty());
+ assertTrue(provider.validators(Collections.emptyList(), null).isEmpty());
+ }
+}
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sViewTableTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sViewTableTest.java
index a7a50130..8e817d15 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sViewTableTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sViewTableTest.java
@@ -198,7 +198,7 @@ void validateWithValidRows() {
stubRows();
Validator.Issues issues = new Validator.Issues("test");
- tableWithApi.validate(issues);
+ tableWithApi.validate(issues, null);
assertNotNull(issues);
}
@@ -213,7 +213,7 @@ void validateDetectsDuplicateNames() {
stubRows();
Validator.Issues issues = new Validator.Issues("test");
- tableWithApi.validate(issues);
+ tableWithApi.validate(issues, null);
assertNotNull(issues);
}
@@ -402,7 +402,7 @@ void validateWithDuplicateNameRecordsError() {
stubRows();
Validator.Issues issues = new Validator.Issues("test");
- tableWithApi.validate(issues);
+ tableWithApi.validate(issues, null);
assertFalse(issues.valid(),
"Duplicate view name must record an error, making issues invalid");
@@ -419,7 +419,7 @@ void validateWithUniqueValidNamesIsValid() {
stubRows();
Validator.Issues issues = new Validator.Issues("test");
- tableWithApi.validate(issues);
+ tableWithApi.validate(issues, null);
assertTrue(issues.valid(), "unique valid names must not produce errors");
}
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sYamlApiTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sYamlApiTest.java
index 8f8a623a..d640d7da 100644
--- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sYamlApiTest.java
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sYamlApiTest.java
@@ -849,6 +849,75 @@ void updateExistingObjectWithNewLabelsOnlyPreservesAll() throws SQLException {
assertNotNull(capturedLabels);
assertTrue(capturedLabels.containsKey("existing"), "Should contain existing label");
}
+
+ @Test
+ void updateExistingPreservesExistingAnnotationsOverNewOnes() throws SQLException {
+ // obj.setMetadata(existing.getObject().getMetadata()) — existing metadata replaces obj's.
+ // After update, existing labels are present on the object passed to dynApi.update().
+ DynamicKubernetesObject existingObj = new DynamicKubernetesObject();
+ existingObj.setApiVersion("v1");
+ existingObj.setKind("ConfigMap");
+ Map existingAnnotations = new HashMap<>();
+ existingAnnotations.put("key", "existing-value");
+ existingObj.setMetadata(new V1ObjectMeta().name("target").namespace("ns").annotations(existingAnnotations));
+
+ DynamicKubernetesApi dynApi = mock(DynamicKubernetesApi.class);
+ doReturn(dynApi).when(mockContext).dynamic(anyString(), anyString());
+ doReturn(successResponse(existingObj)).when(dynApi).get(anyString(), anyString());
+ doReturn(successResponse(existingObj)).when(dynApi).update(any(DynamicKubernetesObject.class));
+
+ K8sYamlApi api = new K8sYamlApi(mockContext);
+ DynamicKubernetesObject obj = new DynamicKubernetesObject();
+ obj.setApiVersion("v1");
+ obj.setKind("ConfigMap");
+ // obj starts with different annotation value for same key
+ Map newAnnotations = new HashMap<>();
+ newAnnotations.put("key", "new-value");
+ obj.setMetadata(new V1ObjectMeta().name("target").namespace("ns").annotations(newAnnotations));
+
+ api.update(obj);
+
+ // After update, obj's metadata = existingObj's metadata
+ // The existing label value should be present since existing metadata replaces obj's metadata
+ ArgumentCaptor captor = ArgumentCaptor.forClass(DynamicKubernetesObject.class);
+ verify(dynApi, times(1)).update(captor.capture());
+ Map capturedAnnotations = captor.getValue().getMetadata().getAnnotations();
+ assertNotNull(capturedAnnotations);
+ assertEquals("existing-value", capturedAnnotations.get("key"));
+ }
+
+ @Test
+ void updateExistingWithDisjointAnnotationsKeepsExisting() throws SQLException {
+ // Mirrors updateExistingObjectWithNewLabelsOnlyPreservesAll for annotations: when local and
+ // existing carry different keys, existing's annotation ends up on the captured update payload.
+ DynamicKubernetesObject existingObj = new DynamicKubernetesObject();
+ existingObj.setApiVersion("v1");
+ existingObj.setKind("ConfigMap");
+ Map existingAnnotations = new HashMap<>();
+ existingAnnotations.put("from-cluster", "e-val");
+ existingObj.setMetadata(new V1ObjectMeta().name("target").namespace("ns").annotations(existingAnnotations));
+
+ DynamicKubernetesApi dynApi = mock(DynamicKubernetesApi.class);
+ doReturn(dynApi).when(mockContext).dynamic(anyString(), anyString());
+ doReturn(successResponse(existingObj)).when(dynApi).get(anyString(), anyString());
+ doReturn(successResponse(existingObj)).when(dynApi).update(any(DynamicKubernetesObject.class));
+
+ K8sYamlApi api = new K8sYamlApi(mockContext);
+ DynamicKubernetesObject obj = new DynamicKubernetesObject();
+ obj.setApiVersion("v1");
+ obj.setKind("ConfigMap");
+ Map newAnnotations = new HashMap<>();
+ newAnnotations.put("from-local", "a-val");
+ obj.setMetadata(new V1ObjectMeta().name("target").namespace("ns").annotations(newAnnotations));
+
+ api.update(obj);
+
+ ArgumentCaptor captor = ArgumentCaptor.forClass(DynamicKubernetesObject.class);
+ verify(dynApi, times(1)).update(captor.capture());
+ Map capturedAnnotations = captor.getValue().getMetadata().getAnnotations();
+ assertNotNull(capturedAnnotations);
+ assertTrue(capturedAnnotations.containsKey("from-cluster"), "Should contain existing annotation");
+ }
}
private K8sYamlApi createRealApi() {
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyCheckerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyCheckerTest.java
new file mode 100644
index 00000000..25cf6a40
--- /dev/null
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyCheckerTest.java
@@ -0,0 +1,140 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1OwnerReference;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
+import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+
+@ExtendWith(MockitoExtension.class)
+class PipelineDependencyCheckerTest {
+
+ @Mock
+ private K8sApi api;
+
+ private static final String DB = "kafka1";
+ private static final List PATH = Collections.singletonList("my-topic");
+ private static final String IDENTIFIER = "kafka1_my-topic";
+
+ private static V1alpha1Pipeline pipeline(String name, String ownerKind, String ownerName,
+ String annotationValue) {
+ V1ObjectMeta meta = new V1ObjectMeta().name(name);
+ if (ownerKind != null && ownerName != null) {
+ meta.addOwnerReferencesItem(new V1OwnerReference().kind(ownerKind).name(ownerName));
+ }
+ if (annotationValue != null) {
+ Map annotations = new HashMap<>();
+ annotations.put(PipelineDependencyLabels.ANNOTATION_KEY, annotationValue);
+ meta.setAnnotations(annotations);
+ }
+ return new V1alpha1Pipeline().metadata(meta);
+ }
+
+ @Test
+ void passesWhenNoPipelinesMatch() throws SQLException {
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))).thenReturn(Collections.emptyList());
+
+ assertDoesNotThrow(() -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null));
+ }
+
+ @Test
+ void blocksOnExternalPipeline() throws SQLException {
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH)))
+ .thenReturn(Collections.singletonList(pipeline("ext-pipe", "View", "owner", IDENTIFIER)));
+
+ SQLException ex = assertThrows(SQLException.class,
+ () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null));
+ assertTrue(ex.getMessage().contains("ext-pipe"));
+ assertTrue(ex.getMessage().contains(IDENTIFIER));
+ }
+
+ @Test
+ void skipsSelfOwnedPipeline() throws SQLException {
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH)))
+ .thenReturn(Collections.singletonList(pipeline("owned-pipe", "LogicalTable", "self-name", IDENTIFIER)));
+
+ assertDoesNotThrow(() -> PipelineDependencyChecker.assertNoExternalDependents(
+ api, DB, PATH, "LogicalTable", "self-name"));
+ }
+
+ @Test
+ void blocksOnExternalWhenSomeAreSelfOwned() throws SQLException {
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))).thenReturn(Arrays.asList(
+ pipeline("owned-pipe", "LogicalTable", "self-name", IDENTIFIER),
+ pipeline("external-pipe", "View", "other-owner", IDENTIFIER)));
+
+ SQLException ex = assertThrows(SQLException.class,
+ () -> PipelineDependencyChecker.assertNoExternalDependents(
+ api, DB, PATH, "LogicalTable", "self-name"));
+ assertTrue(ex.getMessage().contains("external-pipe"));
+ assertFalse(ex.getMessage().contains("owned-pipe"), "self-owned pipeline must not be listed");
+ }
+
+ @Test
+ void rejectsSlugCollisionViaAnnotation() throws SQLException {
+ // Pipeline labels collide on the slug (which is what api.select matched on) but the
+ // annotation reveals the actual identifier is different — so this should NOT block.
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH)))
+ .thenReturn(Collections.singletonList(pipeline("colliding-pipe", "View", "owner",
+ "some-other-database/some-other-path")));
+
+ assertDoesNotThrow(() -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null));
+ }
+
+ @Test
+ void treatsMissingAnnotationAsTrusted() throws SQLException {
+ // A pipeline with the matching label but no depends-on annotation (pre-labeling migration
+ // case, or future code path that didn't write the annotation) is still treated as a blocker.
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH)))
+ .thenReturn(Collections.singletonList(pipeline("legacy-pipe", "View", "owner", null)));
+
+ SQLException ex = assertThrows(SQLException.class,
+ () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null));
+ assertTrue(ex.getMessage().contains("legacy-pipe"));
+ }
+
+ @Test
+ void errorMessageIncludesOwnerKindAndName() throws SQLException {
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH)))
+ .thenReturn(Collections.singletonList(pipeline("ext-pipe", "View", "owner", IDENTIFIER)));
+
+ SQLException ex = assertThrows(SQLException.class,
+ () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null));
+ assertTrue(ex.getMessage().contains("View/owner"),
+ "error should name the owning View so the user knows what to unhook: " + ex.getMessage());
+ }
+
+ @Test
+ void errorMessageListsAllBlockers() throws SQLException {
+ when(api.select(PipelineDependencyLabels.labelKey(DB, PATH))).thenReturn(Arrays.asList(
+ pipeline("p1", "View", "owner1", IDENTIFIER),
+ pipeline("p2", "View", "owner2", IDENTIFIER),
+ pipeline("p3", "View", "owner3", IDENTIFIER)));
+
+ SQLException ex = assertThrows(SQLException.class,
+ () -> PipelineDependencyChecker.assertNoExternalDependents(api, DB, PATH, null, null));
+ assertTrue(ex.getMessage().contains("p1"));
+ assertTrue(ex.getMessage().contains("p2"));
+ assertTrue(ex.getMessage().contains("p3"));
+ assertTrue(ex.getMessage().contains("3 active pipeline"));
+ }
+}
diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabelsTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabelsTest.java
new file mode 100644
index 00000000..b5cd6afb
--- /dev/null
+++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineDependencyLabelsTest.java
@@ -0,0 +1,159 @@
+package com.linkedin.hoptimator.k8s;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.jupiter.api.Test;
+
+import com.linkedin.hoptimator.Sink;
+import com.linkedin.hoptimator.Source;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+class PipelineDependencyLabelsTest {
+
+ private static Source src(String db, String... path) {
+ return new Source(db, Arrays.asList(path), Collections.emptyMap());
+ }
+
+ private static Sink sink(String db, String... path) {
+ return new Sink(db, Arrays.asList(path), Collections.emptyMap());
+ }
+
+ @Test
+ void identifierJoinsDatabaseAndPath() {
+ // Separator is "_" so the identifier is also a valid K8s label value out of the box.
+ assertEquals("mydb_a.b.c", PipelineDependencyLabels.identifier("mydb", Arrays.asList("a", "b", "c")));
+ }
+
+ @Test
+ void slugIsDeterministic() {
+ String s1 = PipelineDependencyLabels.slug("db", Arrays.asList("foo", "bar"));
+ String s2 = PipelineDependencyLabels.slug("db", Arrays.asList("foo", "bar"));
+ assertEquals(s1, s2);
+ }
+
+ @Test
+ void slugVariesByDatabase() {
+ String a = PipelineDependencyLabels.slug("db1", Collections.singletonList("t"));
+ String b = PipelineDependencyLabels.slug("db2", Collections.singletonList("t"));
+ assertNotEquals(a, b);
+ }
+
+ @Test
+ void slugVariesByPath() {
+ String a = PipelineDependencyLabels.slug("db", Arrays.asList("schema", "t"));
+ String b = PipelineDependencyLabels.slug("db", Arrays.asList("schema", "u"));
+ assertNotEquals(a, b);
+ }
+
+ @Test
+ void labelKeyFitsKubernetesNameLimit() {
+ // Long path stressing the slug — name portion (after the /) must be ≤ 63 chars.
+ String key = PipelineDependencyLabels.labelKey(
+ "a-really-long-database-name",
+ Arrays.asList("catalog", "schema", "a_very_long_table_name_that_exceeds_sixty_three_chars"));
+ String namePortion = key.substring(key.indexOf('/') + 1);
+ assertTrue(namePortion.length() <= 63, "name portion must be ≤63 chars, got " + namePortion.length());
+ assertTrue(namePortion.matches("[a-z0-9]([-a-z0-9_.]*[a-z0-9])?"),
+ "name portion must match K8s label-name regex, got: " + namePortion);
+ }
+
+ @Test
+ void labelsForIncludesSourcesAndSink() {
+ // Both edges matter: dropping a source orphans readers; dropping a sink orphans writers.
+ Source s1 = src("kafka1", "events");
+ Source s2 = src("venice1", "store");
+ Sink sink = sink("mysql1", "outbox");
+ Map labels = PipelineDependencyLabels.labelsFor(Arrays.asList(s1, s2), sink);
+
+ assertEquals(3, labels.size());
+ assertTrue(labels.containsKey(PipelineDependencyLabels.labelKey("kafka1", Collections.singletonList("events"))));
+ assertTrue(labels.containsKey(PipelineDependencyLabels.labelKey("venice1", Collections.singletonList("store"))));
+ assertTrue(labels.containsKey(PipelineDependencyLabels.labelKey("mysql1", Collections.singletonList("outbox"))));
+ }
+
+ @Test
+ void labelsForHandlesNullSink() {
+ Map labels = PipelineDependencyLabels.labelsFor(
+ Collections.singletonList(src("db", "t")), null);
+ assertEquals(1, labels.size());
+ }
+
+ @Test
+ void labelsForCollapsesSelfLoopIntoOneEntry() {
+ // Self-loop pipeline: source and sink share a slug, so the map collapses to one entry
+ // rather than producing duplicate keys.
+ Source s = src("db", "t");
+ Sink k = sink("db", "t");
+ Map labels = PipelineDependencyLabels.labelsFor(Collections.singletonList(s), k);
+ assertEquals(1, labels.size());
+ }
+
+ @Test
+ void labelValueTruncatedAtSixtyThreeChars() {
+ String longPath = "this_is_a_really_long_table_name_that_exceeds_sixty_three_chars_by_a_lot";
+ Map labels = PipelineDependencyLabels.labelsFor(
+ Collections.singletonList(src("db", longPath)), null);
+ String value = labels.values().iterator().next();
+ assertTrue(value.length() <= 63);
+ }
+
+ @Test
+ void labelValueIsKubernetesLabelValueCompliant() {
+ // K8s label values must match (([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?
+ // — the identifier separator is "_" precisely so this holds out of the box for typical
+ // (database, path) shapes seen in production.
+ Map labels = PipelineDependencyLabels.labelsFor(
+ Collections.singletonList(src("ads-database", "ADS", "PAGE_VIEWS")), null);
+ String value = labels.values().iterator().next();
+
+ assertTrue(value.length() <= 63);
+ assertTrue(value.matches("(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?"),
+ "value must satisfy K8s label-value regex, got: " + value);
+ assertFalse(value.contains("/"), "no '/' separator should leak into the label value");
+ }
+
+ @Test
+ void annotationForListsAllIdentifiers() {
+ // Sources and the sink — both edges are recorded so the delete-time check can disambiguate
+ // a real dependency from a hash collision regardless of which side matched.
+ String annotation = PipelineDependencyLabels.annotationFor(
+ Arrays.asList(src("kafka", "a"), src("venice", "b")),
+ sink("mysql", "c"));
+ assertTrue(annotation.contains("kafka_a"));
+ assertTrue(annotation.contains("venice_b"));
+ assertTrue(annotation.contains("mysql_c"));
+ }
+
+ @Test
+ void annotationForDeduplicatesAndOmitsNullSink() {
+ String annotation = PipelineDependencyLabels.annotationFor(
+ Arrays.asList(src("db", "t"), src("db", "t")), null);
+ assertEquals("db_t", annotation);
+ }
+
+ @Test
+ void parseAnnotationRoundtrip() {
+ String annotation = PipelineDependencyLabels.annotationFor(
+ Arrays.asList(src("a", "1"), src("b", "2")), sink("c", "3"));
+ Set parsed = PipelineDependencyLabels.parseAnnotation(annotation);
+ assertEquals(3, parsed.size());
+ assertTrue(parsed.contains("a_1"));
+ assertTrue(parsed.contains("b_2"));
+ assertTrue(parsed.contains("c_3"));
+ }
+
+ @Test
+ void parseAnnotationHandlesNullAndEmpty() {
+ assertTrue(PipelineDependencyLabels.parseAnnotation(null).isEmpty());
+ assertTrue(PipelineDependencyLabels.parseAnnotation("").isEmpty());
+ assertTrue(PipelineDependencyLabels.parseAnnotation(" , ").isEmpty());
+ }
+}
diff --git a/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaDeployer.java b/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaDeployer.java
index 6b9e8fc5..3a269d1e 100644
--- a/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaDeployer.java
+++ b/hoptimator-kafka/src/main/java/com/linkedin/hoptimator/kafka/KafkaDeployer.java
@@ -19,6 +19,7 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
@@ -53,7 +54,7 @@ public KafkaDeployer(Source source, Properties properties) {
}
@Override
- public void validate(Validator.Issues issues) {
+ public void validate(Validator.Issues issues, Connection connection) {
String topicName = source.table();
// null default = option was not specified by user, skip validation for that option
Integer partitions = DeployerUtils.parseIntOption(source.options(), "partitions", null);
diff --git a/hoptimator-kafka/src/test/java/com/linkedin/hoptimator/kafka/KafkaDeployerTest.java b/hoptimator-kafka/src/test/java/com/linkedin/hoptimator/kafka/KafkaDeployerTest.java
index 8758bcbf..42848533 100644
--- a/hoptimator-kafka/src/test/java/com/linkedin/hoptimator/kafka/KafkaDeployerTest.java
+++ b/hoptimator-kafka/src/test/java/com/linkedin/hoptimator/kafka/KafkaDeployerTest.java
@@ -568,7 +568,7 @@ private TopicDescription mockTopicWithPartitions(int numPartitions) {
private Validator.Issues collectIssues(KafkaDeployer deployer) {
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
return issues;
}
diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id b/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id
index 32efc51a..38304532 100644
--- a/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id
+++ b/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id
@@ -84,7 +84,41 @@ spec:
segment.bytes: 1073741824
!specify create-table-test
-# Clean up - drop table
+# ─────────────────────────────────────────────────────────────────────────────
+# Dependency guard: a Kafka topic referenced by an MV's pipeline — as either a
+# source or a sink — cannot be dropped until the dependent MV is removed.
+# Exercises the label-based check that protects Kafka topic drops that would
+# orphan an active downstream pipeline.
+#
+# A single MV exercises both edges. The MV is a partial view
+# (KAFKA."create-table-test$guard") whose implicit sink falls back to
+# create-table-test, and reads from existing-topic-2.
+# ─────────────────────────────────────────────────────────────────────────────
+
+# Pipeline reads from existing-topic-2 (source) and writes to create-table-test
+# (partial-view sink). Its Pipeline CRD gets a depends-on-* label per edge.
+create or replace materialized view KAFKA."create-table-test$guard" as select * from KAFKA."existing-topic-2";
+(0 rows modified)
+
+!update
+
+# Source-side guard: existing-topic-2 is a source of the MV's pipeline.
+drop table KAFKA."existing-topic-2";
+active pipeline(s) depend on it
+!error
+
+# Sink-side guard: create-table-test is the partial-view sink of the MV's pipeline.
+drop table KAFKA."create-table-test";
+active pipeline(s) depend on it
+!error
+
+# Drop the dependent MV first; its pipeline (and both labels) go away.
+drop materialized view KAFKA."create-table-test$guard";
+(0 rows modified)
+
+!update
+
+# Now sink drop should succeed
drop table KAFKA."create-table-test";
(0 rows modified)
diff --git a/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java b/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java
index 59258791..dbaf7ba3 100644
--- a/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java
+++ b/hoptimator-logical/src/main/java/com/linkedin/hoptimator/logical/LogicalTableDeployer.java
@@ -1,7 +1,7 @@
package com.linkedin.hoptimator.logical;
+import java.sql.Connection;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.Collection;
@@ -27,6 +27,7 @@
import io.kubernetes.client.openapi.models.V1OwnerReference;
import com.linkedin.hoptimator.Deployer;
+import com.linkedin.hoptimator.PendingDelete;
import com.linkedin.hoptimator.Trigger;
import com.linkedin.hoptimator.UserJob;
import com.linkedin.hoptimator.Validated;
@@ -47,6 +48,7 @@
import com.linkedin.hoptimator.jdbc.HoptimatorConnection;
import com.linkedin.hoptimator.jdbc.HoptimatorDriver;
import com.linkedin.hoptimator.jdbc.HoptimatorDdlUtils;
+import com.linkedin.hoptimator.jdbc.ValidationService;
/**
@@ -86,7 +88,7 @@ public class LogicalTableDeployer implements Deployer, Validated {
this(source, tierProps, context, new K8sApi<>(context, K8sApiEndpoints.DATABASES));
}
- /** Package-private constructor for testing — accepts an injectable database API. */
+ /** Package-private constructor for testing — accepts an injectable Database K8s API. */
LogicalTableDeployer(Source source, Properties tierProps, K8sContext context,
K8sApi databasesApi) {
this.source = source;
@@ -113,7 +115,7 @@ K8sLogicalTableDeployer createLogicalTableDeployer(
* Called by {@link com.linkedin.hoptimator.jdbc.ValidationService} before deployment.
*/
@Override
- public void validate(Validator.Issues issues) {
+ public void validate(Validator.Issues issues, Connection connection) {
try {
// Pre-register the row type in tier schemas so deployers (e.g. VeniceDeployer) can
// call HoptimatorDriver.rowType() during their own validate() calls.
@@ -124,7 +126,7 @@ public void validate(Validator.Issues issues) {
Collection deployers = DeploymentService.deployers(tierSource, context.connection());
for (Deployer deployer : deployers) {
if (deployer instanceof Validated) {
- ((Validated) deployer).validate(issues);
+ ((Validated) deployer).validate(issues, connection);
}
}
}
@@ -206,15 +208,58 @@ private void deployAll(boolean update) throws SQLException {
}
}
+ /**
+ * Deletes a logical table.
+ *
+ * A logical DROP is structurally equivalent to running DROP TABLE on each tier plus
+ * deleting the LogicalTable CRD. We mirror that shape exactly: each tier goes through the
+ * same {@code validateOrThrow → DeploymentService.delete} pipeline a standalone DROP would.
+ * The {@link PendingDelete}'s {@code (selfOwnerKind, selfOwnerName)} pair identifies the
+ * LogicalTable CRD so the implicit inter-tier pipelines (owned by the CRD, cascade-deleted
+ * with it) are excluded from the dependent set — only external pipelines block.
+ *
+ *
+ * Per-tier dep check via the validator framework. Any active external pipeline blocks.
+ * Delete the {@code LogicalTable} CRD. K8s owner-ref cascade removes its implicit
+ * inter-tier Pipelines and their Flink/YAML children. Must succeed .
+ * Per-tier physical cleanup (Kafka topic, Venice store, ...). Best effort — a
+ * stranded tier resource is recoverable; aborting mid-DROP isn't.
+ * Per-tier schema cleanup (deregister the {@code TemporaryTable} in tier schemas).
+ *
+ */
@Override
public void delete() throws SQLException {
- // TODO: Implement safe logical table deletion.
- // Deletion is blocked until we can verify no active pipelines depend on this table.
- // See: LogicalTableDeployer.delete()
- throw new SQLFeatureNotSupportedException(
- "Logical table deletion is not yet supported. "
- + "Cannot safely delete physical tier resources without verifying no active pipelines "
- + "depend on this table.");
+ Map tierSources = buildTierSources();
+ HoptimatorConnection conn = context.connection();
+ String selfName = K8sUtils.canonicalizeName(source.path());
+
+ // 1. Per-tier pre-flight dep check.
+ for (Source tierSource : tierSources.values()) {
+ ValidationService.validateOrThrow(
+ new PendingDelete<>(tierSource, "LogicalTable", selfName), conn);
+ }
+
+ // 2. Delete the LogicalTable CRD (cascades owned pipelines/triggers).
+ createLogicalTableDeployer(selfName, source.database(), buildTierMap()).delete();
+
+ // 3. Per-tier physical cleanup. Best-effort: only deregister a tier's schema entry when its
+ // physical delete succeeded; failed tiers keep their entries so the user can retry.
+ for (Source tierSource : tierSources.values()) {
+ boolean tierSucceeded = true;
+ for (Deployer deployer : DeploymentService.deployers(tierSource, conn)) {
+ try {
+ deployer.delete();
+ } catch (Exception e) {
+ tierSucceeded = false;
+ log.warn("Tier cleanup failed for {} (continuing): {}",
+ tierSource.pathString(), e.getMessage(), e);
+ }
+ }
+ if (tierSucceeded) {
+ HoptimatorDdlUtils.removeTableFromSchema(conn,
+ tierSource.catalog(), tierSource.schema(), tierSource.table());
+ }
+ }
}
@Override
@@ -445,7 +490,8 @@ void deployPipelineBundle(String fromTier, String toTier, Map hoptimatorDriverMock;
+ @Mock
+ MockedStatic validationServiceMock;
+
@Mock
K8sLogicalTableDeployer mockCrdDeployer;
@@ -108,9 +117,9 @@ private static Properties twoTierProps(String nearlineDb, String offlineDb) {
private static K8sContext mockContext() {
K8sContext ctx = mock(K8sContext.class);
- when(ctx.namespace()).thenReturn("default");
- when(ctx.withOwner(any())).thenReturn(ctx);
- when(ctx.withLabel(anyString(), anyString())).thenReturn(ctx);
+ lenient().when(ctx.namespace()).thenReturn("default");
+ lenient().when(ctx.withOwner(any())).thenReturn(ctx);
+ lenient().when(ctx.withLabel(anyString(), anyString())).thenReturn(ctx);
return ctx;
}
@@ -228,17 +237,148 @@ void pipelineNameNearlineToOnline() {
LogicalTableDeployer.pipelineName("events", "nearline", "online"));
}
- // delete() / specify() tests
+ // delete() / DependencyGuarded / specify() tests
+
+ /** Builds a 2-tier deployer with mocked CRD deployer and a pre-populated fake Database API. */
+ private LogicalTableDeployer deployerWithApis(Properties props, List dbs) {
+ FakeK8sApi dbApi = new FakeK8sApi<>(new ArrayList<>(dbs));
+ return new LogicalTableDeployer(testSource(), props, mockContext(), dbApi) {
+ @Override
+ K8sLogicalTableDeployer createLogicalTableDeployer(
+ String crdName, String databaseLabel, Map tierMap) {
+ return mockCrdDeployer;
+ }
+ };
+ }
@Test
- void deleteThrowsSQLFeatureNotSupportedException() {
- Properties props = new Properties();
- props.setProperty("nearline", "kafka-db");
- props.setProperty("online", "venice-db");
+ void deleteThrowsWhenCrdDeleteFails() throws SQLException {
+ LogicalTableDeployer deployer = deployerWithApis(
+ twoTierProps("kafka-db", "venice-db"),
+ Arrays.asList(makeDb("kafka-db", "KAFKA"), makeDb("venice-db", "VENICE")));
+
+ doThrow(new SQLException("crd gone")).when(mockCrdDeployer).delete();
+
+ SQLException ex = assertThrows(SQLException.class, deployer::delete);
+ assertTrue(ex.getMessage().contains("crd gone"));
+ verify(mockCrdDeployer).delete();
+ // Tier deployers must not run when the CRD delete itself fails.
+ deploymentServiceMock.verify(
+ () -> DeploymentService.deployers(any(), any()), never());
+ }
+
+ @Test
+ void deleteSwallowsTierDeleteFailuresAndContinues() throws SQLException {
+ LogicalTableDeployer deployer = deployerWithApis(
+ twoTierProps("kafka-db", "venice-db"),
+ Arrays.asList(makeDb("kafka-db", "KAFKA"), makeDb("venice-db", "VENICE")));
+
+ // Two tier deployers: first throws, second succeeds. The overall delete must return cleanly.
+ Deployer failing = mock(Deployer.class);
+ Deployer succeeding = mock(Deployer.class);
+ doThrow(new SQLException("kafka delete failed")).when(failing).delete();
+
+ deploymentServiceMock.when(() -> DeploymentService.deployers(any(), any()))
+ .thenReturn(Collections.singletonList(failing), Collections.singletonList(succeeding));
+
+ // Must NOT throw despite the failing tier.
+ deployer.delete();
+
+ verify(mockCrdDeployer).delete();
+ verify(failing).delete();
+ verify(succeeding).delete();
+ }
+
+ /**
+ * Executes {@code body} with {@link HoptimatorDdlUtils} statics stubbed, then gives the
+ * supplied verifier a handle on the mock to assert side effects.
+ *
+ * Uses try-with-resources rather than the project-standard {@code @Mock MockedStatic}
+ * field because sibling tests in this class rely on the real static methods of
+ * {@code HoptimatorDdlUtils} (via {@code ensureTierRowTypesRegistered} and friends); a
+ * class-level mock would intercept them and break unrelated tests.
+ */
+ private void withMockedDdlUtils(Runnable body, Consumer> verifier) {
+ try (MockedStatic utilsMock = mockStatic(HoptimatorDdlUtils.class)) {
+ body.run();
+ verifier.accept(utilsMock);
+ }
+ }
+
+ @Test
+ void deleteRemovesTierEntriesFromConnectionSchema() throws SQLException {
+ LogicalTableDeployer deployer = deployerWithApis(
+ twoTierProps("kafka-db", "venice-db"),
+ Arrays.asList(makeDb("kafka-db", "KAFKA"), makeDb("venice-db", "VENICE")));
+
+ Deployer tierDeployer = mock(Deployer.class);
+ deploymentServiceMock.when(() -> DeploymentService.deployers(any(), any()))
+ .thenReturn(Collections.singletonList(tierDeployer));
+
+ withMockedDdlUtils(() -> {
+ try {
+ deployer.delete();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }, utilsMock -> {
+ // Inverse of the registerTemporaryTableInSchema calls that ran at create time — one
+ // call per tier source (null catalog, non-null schema = tier "KAFKA" / "VENICE").
+ utilsMock.verify(() -> HoptimatorDdlUtils.removeTableFromSchema(
+ any(), any(), eq("KAFKA"), any()));
+ utilsMock.verify(() -> HoptimatorDdlUtils.removeTableFromSchema(
+ any(), any(), eq("VENICE"), any()));
+ });
+ }
+
+ @Test
+ void deleteKeepsSchemaEntryForTierWhoseDeleteFailed() throws SQLException {
+ // Two tiers: the first (kafka-db → KAFKA) fails to delete; the second succeeds.
+ // The failed tier's schema entry must NOT be removed; the succeeded tier's must be.
+ LogicalTableDeployer deployer = deployerWithApis(
+ twoTierProps("kafka-db", "venice-db"),
+ Arrays.asList(makeDb("kafka-db", "KAFKA"), makeDb("venice-db", "VENICE")));
+
+ Deployer failingTier = mock(Deployer.class);
+ Deployer succeedingTier = mock(Deployer.class);
+ doThrow(new SQLException("kafka delete failed")).when(failingTier).delete();
+
+ // DeploymentService.deployers is invoked once per tier — order follows tierSources.
+ deploymentServiceMock.when(() -> DeploymentService.deployers(any(), any()))
+ .thenReturn(Collections.singletonList(failingTier),
+ Collections.singletonList(succeedingTier));
+
+ withMockedDdlUtils(() -> {
+ try {
+ deployer.delete();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }, utilsMock -> {
+ // KAFKA failed → its schema entry must NOT be removed.
+ utilsMock.verify(() -> HoptimatorDdlUtils.removeTableFromSchema(
+ any(), any(), eq("KAFKA"), any()), never());
+ // VENICE succeeded → its schema entry is removed.
+ utilsMock.verify(() -> HoptimatorDdlUtils.removeTableFromSchema(
+ any(), any(), eq("VENICE"), any()));
+ });
+ }
+
+ @Test
+ void deleteRunsCrdDeleteBeforeTierDeletes() throws SQLException {
+ LogicalTableDeployer deployer = deployerWithApis(
+ twoTierProps("kafka-db", "venice-db"),
+ Arrays.asList(makeDb("kafka-db", "KAFKA"), makeDb("venice-db", "VENICE")));
+
+ Deployer tierDeployer = mock(Deployer.class);
+ deploymentServiceMock.when(() -> DeploymentService.deployers(any(), any()))
+ .thenReturn(Collections.singletonList(tierDeployer));
+
+ deployer.delete();
- LogicalTableDeployer deployer = new LogicalTableDeployer(makeSource("mydb", "myTable"), props, null);
- SQLFeatureNotSupportedException e = assertThrows(SQLFeatureNotSupportedException.class, deployer::delete);
- assertTrue(e.getMessage().contains("Logical table deletion is not yet supported"));
+ InOrder inOrder = inOrder(mockCrdDeployer, tierDeployer);
+ inOrder.verify(mockCrdDeployer).delete();
+ inOrder.verify(tierDeployer, atLeastOnce()).delete();
}
// CRD model construction tests
@@ -516,7 +656,7 @@ void validateSucceedsWithValidTierConfiguration() throws Exception {
Validator.Issues issues = new Validator.Issues("test");
new LogicalTableDeployer(
testSource(), twoTierProps("nearline-db", "offline-db"), mockContext(), dbApi)
- .validate(issues);
+ .validate(issues, null);
assertTrue(issues.valid());
}
@@ -527,7 +667,7 @@ void validateReportsIssueWhenDatabaseNotFound() throws Exception {
new LogicalTableDeployer(
testSource(), twoTierProps("missing-db", "also-missing"),
mockContext(), new FakeK8sApi<>(new ArrayList<>()))
- .validate(issues);
+ .validate(issues, null);
assertFalse(issues.valid());
}
@@ -548,9 +688,9 @@ void validateCallsValidatedDeployersWhenTiersExist() throws Exception {
Validator.Issues issues = new Validator.Issues("test");
new LogicalTableDeployer(
makeSource("logical", "testevent"), oneTierProps, ctx, dbApi)
- .validate(issues);
+ .validate(issues, null);
- verify(mockValidatedDeployer).validate(issues);
+ verify(mockValidatedDeployer).validate(issues, null);
assertTrue(issues.valid());
}
@@ -594,7 +734,7 @@ void ensureTierRowTypesRegisteredWithConnectionRecordsRowTypeError() throws Exce
Validator.Issues issues = new Validator.Issues("test");
new LogicalTableDeployer(
makeSource("logical", "testevent"), oneTierProps, ctx, dbApi)
- .validate(issues);
+ .validate(issues, null);
assertFalse(issues.valid());
}
diff --git a/hoptimator-logical/src/test/resources/logical-ddl.id b/hoptimator-logical/src/test/resources/logical-ddl.id
index 7284ae72..a8ad290d 100644
--- a/hoptimator-logical/src/test/resources/logical-ddl.id
+++ b/hoptimator-logical/src/test/resources/logical-ddl.id
@@ -168,14 +168,7 @@ Failed to generate key schema for Venice store testevent
!describe "VENICE"."testevent"
# ─────────────────────────────────────────────────────────────────────────────
-# Test 7: DROP TABLE — disabled; should fail with helpful error message
-# ─────────────────────────────────────────────────────────────────────────────
-drop table "LOGICAL"."testevent";
-Logical table deletion is not yet supported
-!error
-
-# ─────────────────────────────────────────────────────────────────────────────
-# Test 8: CREATE TABLE against non-existent schema is rejected
+# Test 7: CREATE TABLE against non-existent schema is rejected
# ─────────────────────────────────────────────────────────────────────────────
create table "LOGICAL-NONEXISTENT"."testevent" ("KEY" varchar, "id" bigint);
Schema for LOGICAL-NONEXISTENT.testevent not found.
@@ -220,8 +213,63 @@ spec:
!specify create-table-test
# ─────────────────────────────────────────────────────────────────────────────
-# Clean up (deletion not yet supported — verified in Test 6 above)
+# Test 9: Dependency guard — An underlying tier cannot be dropped if there is
+# an active logical table. Likewise, a MV created against any tier will block
+# the composite logical table drop
# ─────────────────────────────────────────────────────────────────────────────
-drop table "LOGICAL"."pageview";
-Logical table deletion is not yet supported
+
+# Drop is blocked — logical table inner pipeline depends on KAFKA/testevent
+drop table "KAFKA"."testevent";
+active pipeline(s) depend on it
!error
+
+create or replace materialized view VENICE."testevent$guard" as select "KEY" as "KEY", "VALUE" as "memberId" from KAFKA."existing-topic-1";
+(0 rows modified)
+
+!update
+
+# Drop is blocked — testevent$guard's pipeline depends on VENICE/testevent,
+# which is LOGICAL.testevent's online tier.
+drop table "LOGICAL"."testevent";
+active pipeline(s) depend on it
+!error
+
+# Drop the dependent MV first to release the label.
+drop materialized view VENICE."testevent$guard";
+(0 rows modified)
+
+!update
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Test 10: DROP TABLE — cascades to the LogicalTable CRD and its implicit
+# inter-tier pipeline; tier resources are best-effort cleaned up afterward.
+# ─────────────────────────────────────────────────────────────────────────────
+drop table "LOGICAL"."testevent";
+(0 rows modified)
+
+!update
+
+# The owner-ref cascade removes the implicit nearline→online pipeline.
+select name from "k8s".pipelines where name = 'logical-testevent-nearline-to-online';
++------+
+| NAME |
++------+
++------+
+(0 rows)
+
+!ok
+
+drop table "LOGICAL"."pageview";
+(0 rows modified)
+
+!update
+
+# Verify the pageview pipeline is gone too.
+select name from "k8s".pipelines where name = 'logical-pageview-nearline-to-online';
++------+
+| NAME |
++------+
++------+
+(0 rows)
+
+!ok
diff --git a/hoptimator-logical/src/test/resources/logical-offline-ddl.id b/hoptimator-logical/src/test/resources/logical-offline-ddl.id
index 82fcd538..9e154ba2 100644
--- a/hoptimator-logical/src/test/resources/logical-offline-ddl.id
+++ b/hoptimator-logical/src/test/resources/logical-offline-ddl.id
@@ -77,3 +77,8 @@ select paused from "k8s".table_triggers where name = 'logical-members-offline-tr
(1 row)
!ok
+
+drop table "LOGICAL_OFFLINE"."MEMBERS";
+(0 rows modified)
+
+!update
\ No newline at end of file
diff --git a/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java b/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java
index c2d996d4..227600bb 100644
--- a/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java
+++ b/hoptimator-mysql/src/main/java/com/linkedin/hoptimator/mysql/MySqlDeployer.java
@@ -31,8 +31,8 @@
/**
* Deployer for MySQL tables. Creates tables in the synchronous DDL hot path.
*
- * Implements {@link Validated} to pre-check table constraints
- * before any deployment side effects.
+ *
Implements {@link Validated} to pre-check table constraints before any deployment side
+ * effects.
*/
public class MySqlDeployer implements Deployer, Validated {
@@ -99,7 +99,7 @@ private String toMySqlType(RelDataTypeField field) {
}
@Override
- public void validate(Validator.Issues issues) {
+ public void validate(Validator.Issues issues, Connection connection) {
String tableName = source.table();
String database = source.schema();
@@ -170,11 +170,11 @@ public void validate(Validator.Issues issues) {
if (columnName.equals(keyField)) {
String newType = toMySqlType(field);
String existingType = existingCol.type;
-
+
// Normalize types for comparison (remove size for basic comparison)
String normalizedNew = newType.replaceAll("\\(.*?\\)", "");
String normalizedExisting = existingType.replaceAll("\\(.*?\\)", "");
-
+
if (!normalizedNew.equalsIgnoreCase(normalizedExisting)) {
issues.error("Cannot modify KEY field type for table " + tableName
+ ". KEY field '" + keyField + "' has existing type " + existingType
diff --git a/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDeployerTest.java b/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDeployerTest.java
index 660cabac..08f352cf 100644
--- a/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDeployerTest.java
+++ b/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDeployerTest.java
@@ -1,10 +1,10 @@
package com.linkedin.hoptimator.mysql;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.Validator;
import com.linkedin.hoptimator.jdbc.HoptimatorConnection;
import com.linkedin.hoptimator.jdbc.HoptimatorDriver;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -116,7 +116,7 @@ private MySqlDeployer createDeployer(Source source) {
private Validator.Issues collectIssues(MySqlDeployer deployer) {
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
return issues;
}
@@ -406,7 +406,7 @@ void testValidateFailsWithNullDatabase() {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Database & table names are required"));
@@ -418,7 +418,7 @@ void testValidateFailsWithInvalidDatabaseName() {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Invalid database name"));
@@ -430,7 +430,7 @@ void testValidateFailsWithInvalidTableName() {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Invalid table name"));
@@ -442,7 +442,7 @@ void testValidateFailsWithEmptyDatabaseName() {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Invalid database name"));
@@ -469,7 +469,7 @@ void testValidateFailsNoKeyFields() throws SQLException {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("No KEY_ fields found"));
@@ -504,7 +504,7 @@ void testValidateFailsWhenPrimaryKeysChange() throws SQLException {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Cannot modify KEY fields"));
@@ -531,7 +531,7 @@ void testValidateFailsWithInvalidColumnName() throws SQLException {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Invalid column name"));
@@ -547,7 +547,7 @@ void testValidateFailsWhenRowTypeThrowsException() throws SQLException {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Failed to get schema for table"));
@@ -568,7 +568,7 @@ void testValidatePassesWithMaxLength64Identifier() throws SQLException {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertTrue(issues.valid(), "Expected 64-char identifier to be valid");
}
@@ -581,7 +581,7 @@ void testValidateFailsWithTooLongIdentifier() {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid(), "Expected 65-char identifier to be invalid");
assertTrue(issues.toString().contains("Invalid table name"),
@@ -601,7 +601,7 @@ void testValidatePassesWhenAllConditionsGood() throws SQLException {
MySqlDeployer deployer = new MySqlDeployer(source, PROPERTIES, mockHoptimatorConnection);
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertTrue(issues.valid(), "Expected no errors for valid new table, got: " + issues);
}
@@ -1311,4 +1311,5 @@ void testEnsureDatabaseExistsSqlContainsCreateDatabase() throws Exception {
assertTrue(createDbSql.contains("`test_db`"),
"Expected backtick-escaped db name in CREATE DATABASE SQL, got: " + createDbSql);
}
+
}
diff --git a/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDriverTest.java b/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDriverTest.java
index 8f434bf4..ad9534c1 100644
--- a/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDriverTest.java
+++ b/hoptimator-mysql/src/test/java/com/linkedin/hoptimator/mysql/MySqlDriverTest.java
@@ -1,6 +1,5 @@
package com.linkedin.hoptimator.mysql;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.impl.AbstractSchema;
@@ -25,8 +24,6 @@
@ExtendWith(MockitoExtension.class)
-@SuppressFBWarnings(value = {"OBL_UNSATISFIED_OBLIGATION", "ODR_OPEN_DATABASE_RESOURCE", "DMI_EMPTY_DB_PASSWORD"},
- justification = "Mock objects do not hold real resources")
class MySqlDriverTest {
/** Returns a driver whose {@code createMySqlRootSchema()} yields a no-op schema. */
diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java
index 0220f98b..554b71df 100644
--- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java
+++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java
@@ -23,6 +23,7 @@
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
+
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
@@ -181,7 +182,6 @@ void parseHintsMixedEncodedAndNonEncoded() {
assertEquals("test", mixed.get("another"));
}
-
@Test
void testCreateCallsCreateOnAllDeployers() throws SQLException {
List deployers = Arrays.asList(mockDeployer1, mockDeployer2);
diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployer.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployer.java
index 28f34361..ee61a1b3 100644
--- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployer.java
+++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDeployer.java
@@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.Collections;
@@ -55,7 +56,7 @@ public VeniceDeployer(Source source, Properties properties, HoptimatorConnection
}
@Override
- public void validate(Validator.Issues issues) {
+ public void validate(Validator.Issues issues, Connection connection) {
String storeName = source.table();
// Validate Venice configuration
diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java
index f033b5d1..c5b62272 100644
--- a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java
+++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/ClusterSchemaTest.java
@@ -6,7 +6,6 @@
import com.linkedin.venice.controllerapi.MultiStoreResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.security.SSLFactory;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
@@ -31,8 +30,6 @@
@ExtendWith(MockitoExtension.class)
-@SuppressFBWarnings(value = {"OBL_UNSATISFIED_OBLIGATION", "ODR_OPEN_DATABASE_RESOURCE"},
- justification = "Mock objects created in stubbing setup don't need resource management")
class ClusterSchemaTest {
@Mock
@@ -197,7 +194,7 @@ void testCreateControllerClientWithNonLocalhostUrl() {
@Override
protected ControllerClient createControllerClient(String cluster, Optional sslFactory) {
// verify the non-localhost branch would be reached (url does not contain localhost)
- assertTrue(!properties.getProperty("router.url").contains("localhost"));
+ assertFalse(properties.getProperty("router.url").contains("localhost"));
return mockControllerClient;
}
};
diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceDeployerTest.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceDeployerTest.java
index b364cc29..9a16cea9 100644
--- a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceDeployerTest.java
+++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/VeniceDeployerTest.java
@@ -10,7 +10,6 @@
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.meta.StoreInfo;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.avro.Schema;
import org.apache.calcite.util.Pair;
import org.junit.jupiter.api.BeforeEach;
@@ -40,8 +39,6 @@
* Tests for VeniceDeployer using mocks.
*/
@ExtendWith(MockitoExtension.class)
-@SuppressFBWarnings(value = {"OBL_UNSATISFIED_OBLIGATION", "ODR_OPEN_DATABASE_RESOURCE"},
- justification = "Mock objects created in stubbing setup don't need resource management")
class VeniceDeployerTest {
private static final String TEST_STORE = "test_store";
@@ -315,7 +312,7 @@ protected Pair getKeyPayloadSchema() throws SQLException {
};
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertTrue(issues.valid(), "Expected no validation errors for new store. Issues: " + issues);
}
@@ -348,7 +345,7 @@ protected Pair getKeyPayloadSchema() throws SQLException {
};
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertTrue(issues.valid(), "Expected no validation errors when key schema unchanged. Issues: " + issues);
}
@@ -381,7 +378,7 @@ protected Pair getKeyPayloadSchema() throws SQLException {
};
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid(), "Expected validation error for key schema change");
assertTrue(issues.toString().contains("Key schema evolution is not supported"),
@@ -400,7 +397,7 @@ protected Pair getKeyPayloadSchema() {
};
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Failed to generate key schema"));
@@ -418,7 +415,7 @@ protected Pair getKeyPayloadSchema() {
};
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Failed to generate value schema"));
@@ -436,10 +433,11 @@ protected Pair getKeyPayloadSchema() {
};
Validator.Issues issues = new Validator.Issues("test");
- deployer.validate(issues);
+ deployer.validate(issues, null);
assertFalse(issues.valid());
assertTrue(issues.toString().contains("Failed to generate key schema"));
assertTrue(issues.toString().contains("Failed to generate value schema"));
}
+
}