Skip to content

Commit 5fc7395

Browse files
pedro93claude
andauthored
feat(upgrade): add Cleanup upgrade for Helm pre-delete infrastructure teardown (datahub-project#16619)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent bf580cd commit 5fc7395

17 files changed

Lines changed: 1664 additions & 5 deletions

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCli.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.datahub.upgrade;
22

3+
import com.linkedin.datahub.upgrade.cleanup.Cleanup;
34
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
45
import com.linkedin.datahub.upgrade.loadindices.LoadIndices;
56
import com.linkedin.datahub.upgrade.removeunknownaspects.RemoveUnknownAspects;
@@ -86,6 +87,10 @@ private static final class Args {
8687
@Named("reindexDebug")
8788
private ReindexDebug reindexDebug;
8889

90+
@Autowired(required = false)
91+
@Named("cleanup")
92+
private Cleanup cleanup;
93+
8994
@Override
9095
public void run(String... cmdLineArgs) {
9196
// Register upgrades with null checks and warnings
@@ -149,6 +154,12 @@ public void run(String... cmdLineArgs) {
149154
log.info("ReindexDebug upgrade not available - bean not found");
150155
}
151156

157+
if (cleanup != null) {
158+
_upgradeManager.register(cleanup);
159+
} else {
160+
log.info("Cleanup upgrade not available - bean not found");
161+
}
162+
152163
final Args args = new Args();
153164
new CommandLine(args).setCaseInsensitiveEnumValuesAllowed(true).parseArgs(cmdLineArgs);
154165

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeConfigurationSelector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.linkedin.datahub.upgrade;
22

3+
import com.linkedin.datahub.upgrade.cleanup.CleanupCondition;
4+
import com.linkedin.datahub.upgrade.cleanup.CleanupUpgradeConfig;
35
import com.linkedin.datahub.upgrade.conditions.GeneralUpgradeCondition;
46
import com.linkedin.datahub.upgrade.conditions.LoadIndicesCondition;
57
import com.linkedin.datahub.upgrade.conditions.SqlSetupCondition;
@@ -34,4 +36,10 @@ public static class SqlSetupConfiguration {}
3436
@Conditional(GeneralUpgradeCondition.class)
3537
@Import(GeneralUpgradeConfiguration.class)
3638
public static class GeneralConfiguration {}
39+
40+
/** Configuration for Cleanup upgrade - teardown of all DataHub infrastructure resources */
41+
@Configuration
42+
@Conditional(CleanupCondition.class)
43+
@Import(CleanupUpgradeConfig.class)
44+
public static class CleanupConfiguration {}
3745
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.linkedin.datahub.upgrade.cleanup;
2+
3+
import com.linkedin.datahub.upgrade.Upgrade;
4+
import com.linkedin.datahub.upgrade.UpgradeStep;
5+
import java.util.List;
6+
7+
/**
8+
* Upgrade that tears down all infrastructure resources created by DataHub setup jobs. Intended to
9+
* run as a Helm pre-delete hook so that {@code helm uninstall} leaves no DataHub-specific state in
10+
* shared infrastructure (Elasticsearch, Kafka, SQL).
11+
*
12+
* <p>Execution order: Elasticsearch → Kafka → SQL. Elasticsearch is cleaned first so that indices
13+
* are not queried while the database is being dropped.
14+
*/
15+
public class Cleanup implements Upgrade {
16+
17+
private final List<UpgradeStep> steps;
18+
19+
public Cleanup(List<UpgradeStep> steps) {
20+
this.steps = steps;
21+
}
22+
23+
@Override
24+
public String id() {
25+
return "Cleanup";
26+
}
27+
28+
@Override
29+
public List<UpgradeStep> steps() {
30+
return steps;
31+
}
32+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.linkedin.datahub.upgrade.cleanup;
2+
3+
import java.util.List;
4+
import java.util.Objects;
5+
import java.util.Set;
6+
import org.springframework.boot.ApplicationArguments;
7+
import org.springframework.context.annotation.Condition;
8+
import org.springframework.context.annotation.ConditionContext;
9+
import org.springframework.core.type.AnnotatedTypeMetadata;
10+
11+
/**
12+
* Spring condition that matches when the CLI arguments contain {@code Cleanup}. This ensures the
13+
* cleanup-specific Spring configuration is only loaded for the cleanup upgrade path.
14+
*/
15+
public class CleanupCondition implements Condition {
16+
public static final String CLEANUP_ARG = "Cleanup";
17+
public static final Set<String> CLEANUP_ARGS = Set.of(CLEANUP_ARG);
18+
19+
@Override
20+
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
21+
List<String> nonOptionArgs =
22+
context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs();
23+
if (nonOptionArgs == null) {
24+
return false;
25+
}
26+
return nonOptionArgs.stream().filter(Objects::nonNull).anyMatch(CLEANUP_ARGS::contains);
27+
}
28+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package com.linkedin.datahub.upgrade.cleanup;
2+
3+
import com.linkedin.datahub.upgrade.UpgradeStep;
4+
import com.linkedin.datahub.upgrade.config.OpenTelemetryConfig;
5+
import com.linkedin.datahub.upgrade.sqlsetup.SqlSetupArgs;
6+
import com.linkedin.datahub.upgrade.sqlsetup.config.SqlSetupConfig;
7+
import com.linkedin.datahub.upgrade.sqlsetup.config.SqlSetupEbeanFactory;
8+
import com.linkedin.gms.factory.config.ConfigurationProvider;
9+
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
10+
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
11+
import com.linkedin.metadata.utils.EnvironmentUtils;
12+
import io.ebean.Database;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import javax.annotation.Nonnull;
16+
import lombok.extern.slf4j.Slf4j;
17+
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.beans.factory.annotation.Qualifier;
19+
import org.springframework.boot.actuate.autoconfigure.metrics.MetricsAutoConfiguration;
20+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.ComponentScan;
23+
import org.springframework.context.annotation.Configuration;
24+
import org.springframework.context.annotation.Import;
25+
26+
/**
27+
* Spring configuration for the Cleanup upgrade. Loads the minimal set of beans needed to tear down
28+
* Elasticsearch indices, Kafka topics, and the SQL database.
29+
*/
30+
@Slf4j
31+
@Configuration
32+
@Import({
33+
MetricsAutoConfiguration.class,
34+
OpenTelemetryConfig.class,
35+
SqlSetupConfig.class,
36+
SqlSetupEbeanFactory.class
37+
})
38+
@ComponentScan(
39+
basePackages = {
40+
"com.linkedin.gms.factory.config",
41+
"com.linkedin.gms.factory.common",
42+
"com.linkedin.gms.factory.entity",
43+
"com.linkedin.gms.factory.entityclient",
44+
"com.linkedin.gms.factory.plugins",
45+
"com.linkedin.gms.factory.entityregistry",
46+
"com.linkedin.gms.factory.search",
47+
"com.linkedin.gms.factory.timeseries",
48+
"com.linkedin.gms.factory.context",
49+
"com.linkedin.gms.factory.system_telemetry"
50+
})
51+
public class CleanupUpgradeConfig {
52+
53+
@Autowired(required = false)
54+
private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents;
55+
56+
@Autowired(required = false)
57+
private ConfigurationProvider configurationProvider;
58+
59+
@Autowired(required = false)
60+
private KafkaProperties kafkaProperties;
61+
62+
@Autowired(required = false)
63+
@Qualifier("ebeanServer")
64+
private Database ebeanServer;
65+
66+
@Autowired(required = false)
67+
@Qualifier("sqlSetupArgs")
68+
private SqlSetupArgs sqlSetupArgs;
69+
70+
@Bean(name = "cleanup")
71+
@Nonnull
72+
public Cleanup createCleanup() {
73+
boolean esEnabled = EnvironmentUtils.getBoolean("CLEANUP_ELASTICSEARCH_ENABLED", true);
74+
boolean kafkaEnabled = EnvironmentUtils.getBoolean("CLEANUP_KAFKA_ENABLED", true);
75+
boolean sqlEnabled = EnvironmentUtils.getBoolean("CLEANUP_SQL_ENABLED", true);
76+
77+
List<UpgradeStep> steps = new ArrayList<>();
78+
79+
// Order: ES first (so indices aren't queried during DB drop), then Kafka, then SQL
80+
if (esEnabled && esComponents != null) {
81+
steps.add(new DeleteElasticsearchIndicesStep(esComponents));
82+
log.info("Elasticsearch cleanup step enabled");
83+
} else if (esEnabled) {
84+
log.warn("Elasticsearch cleanup requested but ES components not available — skipping");
85+
}
86+
87+
KafkaConfiguration kafkaConfig =
88+
configurationProvider != null ? configurationProvider.getKafka() : null;
89+
if (kafkaEnabled && kafkaConfig != null && kafkaProperties != null) {
90+
steps.add(new DeleteKafkaTopicsStep(kafkaConfig, kafkaProperties));
91+
log.info("Kafka cleanup step enabled");
92+
} else if (kafkaEnabled) {
93+
log.warn("Kafka cleanup requested but Kafka config not available — skipping");
94+
}
95+
96+
if (sqlEnabled && ebeanServer != null && sqlSetupArgs != null) {
97+
steps.add(new DropDatabaseStep(ebeanServer, sqlSetupArgs));
98+
log.info("SQL cleanup step enabled");
99+
} else if (sqlEnabled) {
100+
log.warn("SQL cleanup requested but database not available — skipping");
101+
}
102+
103+
return new Cleanup(steps);
104+
}
105+
}

0 commit comments

Comments
 (0)