Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
package com.linkedin.hoptimator;

import java.util.List;
import javax.annotation.Nullable;
import java.util.Map;


public class Trigger implements Deployable {

public static final String PAUSED_OPTION = "paused";

private final String name;
private final UserJob job;
private final List<String> path;
private final String cronSchedule;
private final Map<String, String> options;
private final Source source;
private final Sink sink;

public Trigger(String name, UserJob job, List<String> path, String cronSchedule,
Map<String, String> options) {
/**
* Contains an optional downstream sink for triggers that operate between a source
* sink (think ETL/rETL).
* TODO: need to collapse the "job.properties.online.table.name" logic into a sink for adhoc triggers
*/
public Trigger(String name, UserJob job, String cronSchedule, Map<String, String> options,
Source source, @Nullable Sink sink) {
this.name = name;
this.job = job;
this.path = path;
this.cronSchedule = cronSchedule;
this.options = options;
this.source = source;
this.sink = sink;
}

public String name() {
return name;
}

public List<String> path() {
return path;
}

public UserJob job() {
return job;
}
Expand All @@ -38,27 +42,24 @@ public String cronSchedule() {
return cronSchedule;
}

public String table() {
return path.get(path.size() - 1);
}

/**
* Returns the schema name if present.
*/
public String schema() {
return path.size() >= 2 ? path.get(path.size() - 2) : null;
}

public Map<String, String> options() {
return options;
}

private String pathString() {
return String.join(".", path);
/** Upstream source the trigger fires on, or {@code null} when only the name is known
* (e.g. during DROP TRIGGER / PAUSE / RESUME, which only need to look up the existing CRD). */
public Source source() {
return source;
}

/** Downstream sink the trigger's job writes to, or {@code null} when the trigger has no declared sink. */
public @Nullable Sink sink() {
return sink;
}

@Override
public String toString() {
return "Trigger[" + name() + ", " + pathString() + "]";
String path = source == null ? "<unbound>" : String.join(".", source.path());
return "Trigger[" + name + ", " + path + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void toStringOmitsSelfOwnerWhenNull() {
@Test
void toStringOmitsSelfOwnerWhenOnlyOneFieldSet() {
// The toString contract says self= appears only when both kind and name are non-null.
// (The K8sPipelineDependencyChecker.isSelfOwned guard also requires both.)
// (The DependencyChecker.isSelfOwned guard also requires both.)
PendingDelete<String> kindOnly = new PendingDelete<>("t", "LogicalTable", null);
assertFalse(kindOnly.toString().contains("self="));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;


Expand All @@ -15,26 +17,37 @@ class TriggerTest {
void testAccessors() {
UserJob userJob = new UserJob("ns", "jobName");
Map<String, String> options = Map.of("paused", "true");
Trigger trigger = new Trigger("myTrigger", userJob, List.of("schema", "table"), "0 * * * *", options);
Source source = new Source("db", List.of("schema", "table"), Collections.emptyMap());
Trigger trigger = new Trigger("myTrigger", userJob, "0 * * * *", options, source, null);

assertEquals("myTrigger", trigger.name());
assertEquals(userJob, trigger.job());
assertEquals(List.of("schema", "table"), trigger.path());
assertEquals("0 * * * *", trigger.cronSchedule());
assertEquals(options, trigger.options());
assertEquals("table", trigger.table());
assertEquals("schema", trigger.schema());
assertNotNull(trigger.source());
assertEquals(List.of("schema", "table"), trigger.source().path());
assertEquals("table", trigger.source().table());
assertEquals("schema", trigger.source().schema());
assertNull(trigger.sink());
}

@Test
void testSchemaReturnsNullForSingleElementPath() {
Trigger trigger = new Trigger("t", null, List.of("table"), null, Map.of());
assertNull(trigger.schema());
void testNullSourceAccessor() {
Trigger trigger = new Trigger("t", null, null, Map.of(), null, null);
assertNull(trigger.source());
assertNull(trigger.sink());
}

@Test
void testToString() {
Trigger trigger = new Trigger("myTrig", null, List.of("a", "b"), null, Map.of());
Source source = new Source("db", List.of("a", "b"), Collections.emptyMap());
Trigger trigger = new Trigger("myTrig", null, null, Map.of(), source, null);
assertEquals("Trigger[myTrig, a.b]", trigger.toString());
}

@Test
void testToStringWithoutSource() {
Trigger trigger = new Trigger("myTrig", null, null, Map.of(), null, null);
assertEquals("Trigger[myTrig, <unbound>]", trigger.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) {
String cronSchedule = create.cron != null
? ((SqlLiteral) create.cron).getValueAs(String.class) : null;
UserJob job = new UserJob(jobNamespace, jobName);
Trigger trigger = new Trigger(name, job, targetPath, cronSchedule, options);
Source source = new Source(databaseOf(target), targetPath, Collections.emptyMap());
Trigger trigger = new Trigger(name, job, cronSchedule, options, source, null);

Collection<Deployer> deployers = null;
try {
Expand All @@ -255,6 +256,23 @@ public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) {
}
}

/**
* Best-effort lookup of the database name that owns a Calcite Table. Mirrors what the
* {@code DROP TABLE} path does so triggers and tables agree on the same
* {@code (database, path)} identifier.
*/
private static String databaseOf(Table target) {
if (target instanceof HoptimatorJdbcTable) {
HoptimatorJdbcSchema jdbcSchema =
(HoptimatorJdbcSchema) ((HoptimatorJdbcTable) target).jdbcTable().jdbcSchema;
return jdbcSchema.databaseName();
}
if (target instanceof TemporaryTable) {
return ((TemporaryTable) target).databaseName();
}
return null;
}

// N.B. originally copy-pasted from Apache Calcite

/** Executes a {@code CREATE TABLE} command. */
Expand Down Expand Up @@ -307,7 +325,7 @@ public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) {
}
String name = drop.name.names.get(0);

Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, new HashMap<>());
Trigger trigger = new Trigger(name, null, null, new HashMap<>(), null, null);

Collection<Deployer> deployers = null;
try {
Expand Down Expand Up @@ -344,7 +362,7 @@ private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName

Map<String, String> options = new HashMap<>();
options.put(Trigger.PAUSED_OPTION, String.valueOf(paused));
Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, options);
Trigger trigger = new Trigger(name, null, null, options, null, null);

Collection<Deployer> deployers = null;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package com.linkedin.hoptimator.k8s;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1OwnerReference;

import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList;

import javax.annotation.Nullable;


/**
* Checks whether any Pipeline or TableTrigger CRDs still depend on a resource a
* {@link com.linkedin.hoptimator.Deployer} is about to delete.
*
* <p>Both CRDs carry the same {@code depends-on-<slug>} label and {@code depends-on-sources}/
* {@code depends-on-sinks} annotations (stamped by {@link K8sPipelineDeployer} and
* {@link K8sTriggerDeployer}), so the same lookup works for either: a label-selector list against
* the CRD group is O(matches) on the wire, then each candidate is cross-checked against the union
* of the source + sink annotations to rule out hash collisions in the slug and stale labels left
* over from a prior version of the resource ({@link K8sApi#update}'s additive label merge can leak
* old {@code depends-on-*} keys).
*
* <p>Resources owned (directly) by {@code (selfOwnerKind, selfOwnerName)} are excluded from the
* blocker list: those will be cascade-deleted alongside the parent resource, so counting them as
* external dependents would make composite deletes (e.g. {@code LogicalTableDeployer.delete()})
* impossible.
*/
public final class DependencyChecker {

private DependencyChecker() {
}

public static void assertNoExternalDependents(K8sContext context, String database,
List<String> path, @Nullable String selfOwnerKind, @Nullable String selfOwnerName) throws SQLException {
assertNoExternalDependents(
new K8sApi<>(context, K8sApiEndpoints.PIPELINES),
new K8sApi<>(context, K8sApiEndpoints.TABLE_TRIGGERS),
database, path, selfOwnerKind, selfOwnerName);
}

/** Variant that takes pre-built {@link K8sApi}s — used by tests to inject mocks. */
static void assertNoExternalDependents(K8sApi<V1alpha1Pipeline, V1alpha1PipelineList> pipelineApi,
K8sApi<V1alpha1TableTrigger, V1alpha1TableTriggerList> triggerApi,
String database, List<String> path, @Nullable String selfOwnerKind,
@Nullable String selfOwnerName) throws SQLException {

String labelKey = DependencyLabels.labelKey(database, path);
String identifier = DependencyLabels.identifier(database, path);

List<String> blockers = new ArrayList<>();
blockers.addAll(findBlockers(pipelineApi, labelKey, identifier, "pipeline",
selfOwnerKind, selfOwnerName));
blockers.addAll(findBlockers(triggerApi, labelKey, identifier, "trigger",
selfOwnerKind, selfOwnerName));

if (!blockers.isEmpty()) {
throw new SQLException(String.format(
"Cannot delete %s — %d active dependent(s): %s",
identifier, blockers.size(), String.join(", ", blockers)));
}
}

/**
* Generic blocker enumeration: list resources of type {@code T} that carry the given
* {@code labelKey}, confirm via the depends-on annotations, and exclude self-owned children.
* The {@code kindLabel} is prefixed onto each blocker description so a unified error message
* can attribute each entry to its CRD kind.
*/
private static <T extends KubernetesObject> List<String> findBlockers(K8sApi<T, ?> api,
String labelKey, String identifier, String kindLabel,
@Nullable String selfOwnerKind, @Nullable String selfOwnerName) throws SQLException {
Collection<T> matches = api.select(labelKey);
List<String> blockers = new ArrayList<>();
for (T obj : matches) {
V1ObjectMeta meta = obj.getMetadata();
if (isSelfOwned(meta, selfOwnerKind, selfOwnerName)) {
continue;
}
if (!annotationConfirms(meta, identifier)) {
// Label matched but annotation doesn't — slug collision or stale label, skip it.
continue;
}
blockers.add(kindLabel + "/" + describeBlocker(meta));
}
return blockers;
}

private static boolean isSelfOwned(V1ObjectMeta meta, @Nullable String selfOwnerKind,
@Nullable String selfOwnerName) {
if (selfOwnerKind == null || selfOwnerName == null) {
return false;
}
if (meta == null || meta.getOwnerReferences() == null) {
return false;
}
for (V1OwnerReference owner : meta.getOwnerReferences()) {
if (selfOwnerKind.equals(owner.getKind()) && selfOwnerName.equals(owner.getName())) {
return true;
}
}
return false;
}

private static boolean annotationConfirms(V1ObjectMeta meta, String identifier) {
if (meta == null || meta.getAnnotations() == null) {
return true; // pre-labeling resource — conservatively trust the label match
}
String sourcesAnno = meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SOURCES);
String sinksAnno = meta.getAnnotations().get(DependencyLabels.ANNOTATION_KEY_SINKS);
if (sourcesAnno == null && sinksAnno == null) {
return true; // same — no annotations to cross-check against
}
if (sourcesAnno != null && DependencyLabels.parseAnnotation(sourcesAnno).contains(identifier)) {
return true;
}
return sinksAnno != null && DependencyLabels.parseAnnotation(sinksAnno).contains(identifier);
}

/**
* Builds a human-readable blocker description: the resource name, plus (when present) the top
* ownerReference's {@code kind/name} so the user knows which higher-level resource owns it.
*/
private static String describeBlocker(V1ObjectMeta meta) {
String name = meta == null ? "<unknown>" : meta.getName();
String ownerSuffix = "";
if (meta != null && meta.getOwnerReferences() != null && !meta.getOwnerReferences().isEmpty()) {
V1OwnerReference owner = meta.getOwnerReferences().get(0);
ownerSuffix = " (owned by " + owner.getKind() + "/" + owner.getName() + ")";
}
return name + ownerSuffix;
}
}
Loading
Loading