Skip to content

Commit f92c142

Browse files
Merge branch 'master' into starrocks-source
2 parents 188bc48 + 2ee693b commit f92c142

160 files changed

Lines changed: 11320 additions & 2312 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BuildIndicesConfig.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
import com.linkedin.gms.factory.config.ConfigurationProvider;
77
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
88
import com.linkedin.metadata.entity.AspectDao;
9+
import com.linkedin.metadata.entity.EntityService;
910
import com.linkedin.metadata.graph.GraphService;
1011
import com.linkedin.metadata.search.EntitySearchService;
1112
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
1213
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
14+
import com.linkedin.metadata.version.GitVersion;
15+
import io.datahubproject.metadata.context.OperationContext;
16+
import org.springframework.beans.factory.annotation.Qualifier;
1317
import org.springframework.context.annotation.Bean;
1418
import org.springframework.context.annotation.Conditional;
1519
import org.springframework.context.annotation.Configuration;
@@ -28,7 +32,11 @@ public BlockingSystemUpgrade buildIndices(
2832
final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents
2933
baseElasticSearchComponents,
3034
final ConfigurationProvider configurationProvider,
31-
final AspectDao aspectDao) {
35+
final AspectDao aspectDao,
36+
@Qualifier("systemOperationContext") final OperationContext opContext,
37+
final EntityService<?> entityService,
38+
final GitVersion gitVersion,
39+
@Qualifier("revision") final String revision) {
3240

3341
return new BuildIndices(
3442
systemMetadataService,
@@ -37,6 +45,10 @@ public BlockingSystemUpgrade buildIndices(
3745
graphService,
3846
baseElasticSearchComponents,
3947
configurationProvider,
40-
aspectDao);
48+
aspectDao,
49+
opContext,
50+
entityService,
51+
gitVersion,
52+
revision);
4153
}
4254
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.linkedin.datahub.upgrade.config;
2+
3+
import com.linkedin.datahub.upgrade.conditions.SystemUpdateCondition;
4+
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
5+
import com.linkedin.datahub.upgrade.system.elasticsearch.IncrementalReindex;
6+
import com.linkedin.gms.factory.config.ConfigurationProvider;
7+
import com.linkedin.metadata.entity.AspectDao;
8+
import com.linkedin.metadata.entity.EntityService;
9+
import com.linkedin.metadata.graph.GraphService;
10+
import com.linkedin.metadata.search.EntitySearchService;
11+
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
12+
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
13+
import com.linkedin.metadata.version.GitVersion;
14+
import io.datahubproject.metadata.context.OperationContext;
15+
import org.springframework.beans.factory.annotation.Qualifier;
16+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
17+
import org.springframework.context.annotation.Bean;
18+
import org.springframework.context.annotation.Conditional;
19+
import org.springframework.context.annotation.Configuration;
20+
21+
@Configuration
22+
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
23+
@ConditionalOnProperty(
24+
name = "elasticsearch.buildIndices.incrementalReindexEnabled",
25+
havingValue = "true")
26+
public class IncrementalReindexConfig {
27+
28+
@Bean
29+
public NonBlockingSystemUpgrade incrementalReindexNonBlocking(
30+
final SystemMetadataService systemMetadataService,
31+
final TimeseriesAspectService timeseriesAspectService,
32+
final EntitySearchService entitySearchService,
33+
final GraphService graphService,
34+
final ConfigurationProvider configurationProvider,
35+
final AspectDao aspectDao,
36+
@Qualifier("systemOperationContext") final OperationContext opContext,
37+
final EntityService<?> entityService,
38+
final GitVersion gitVersion,
39+
@Qualifier("revision") final String revision) {
40+
41+
String upgradeVersion = String.format("%s-%s", gitVersion.getVersion(), revision);
42+
return new IncrementalReindex(
43+
systemMetadataService,
44+
timeseriesAspectService,
45+
entitySearchService,
46+
graphService,
47+
configurationProvider,
48+
aspectDao,
49+
opContext,
50+
entityService,
51+
upgradeVersion);
52+
}
53+
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/BuildIndices.java

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.linkedin.datahub.upgrade.UpgradeStep;
66
import com.linkedin.datahub.upgrade.shared.ElasticSearchUpgradeUtils;
77
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
8+
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesIncrementalStep;
89
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesPostStep;
910
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesPreStep;
1011
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesStep;
@@ -14,14 +15,17 @@
1415
import com.linkedin.gms.factory.config.ConfigurationProvider;
1516
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
1617
import com.linkedin.metadata.entity.AspectDao;
18+
import com.linkedin.metadata.entity.EntityService;
1719
import com.linkedin.metadata.graph.GraphService;
1820
import com.linkedin.metadata.search.EntitySearchService;
1921
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
2022
import com.linkedin.metadata.shared.ElasticSearchIndexed;
2123
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
2224
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
25+
import com.linkedin.metadata.version.GitVersion;
2326
import com.linkedin.structured.StructuredPropertyDefinition;
2427
import com.linkedin.util.Pair;
28+
import io.datahubproject.metadata.context.OperationContext;
2529
import java.io.IOException;
2630
import java.util.ArrayList;
2731
import java.util.List;
@@ -34,6 +38,7 @@ public class BuildIndices implements BlockingSystemUpgrade {
3438
private final List<UpgradeStep> _steps;
3539
private final List<ElasticSearchIndexed> _indexedServices;
3640
private final Set<Pair<Urn, StructuredPropertyDefinition>> _structuredProperties;
41+
private final boolean _incrementalReindexEnabled;
3742

3843
public BuildIndices(
3944
final SystemMetadataService systemMetadataService,
@@ -43,7 +48,11 @@ public BuildIndices(
4348
final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents
4449
baseElasticSearchComponents,
4550
final ConfigurationProvider configurationProvider,
46-
final AspectDao aspectDao) {
51+
final AspectDao aspectDao,
52+
final OperationContext opContext,
53+
final EntityService<?> entityService,
54+
final GitVersion gitVersion,
55+
final String revision) {
4756

4857
_indexedServices =
4958
ElasticSearchUpgradeUtils.createElasticSearchIndexedServices(
@@ -56,12 +65,30 @@ public BuildIndices(
5665
_structuredProperties = Set.of();
5766
}
5867

68+
_incrementalReindexEnabled =
69+
configurationProvider.getElasticSearch().getBuildIndices() != null
70+
&& configurationProvider
71+
.getElasticSearch()
72+
.getBuildIndices()
73+
.isIncrementalReindexEnabled();
74+
5975
_steps =
60-
buildSteps(_indexedServices, baseElasticSearchComponents, configurationProvider, aspectDao);
76+
buildSteps(
77+
_indexedServices,
78+
baseElasticSearchComponents,
79+
configurationProvider,
80+
aspectDao,
81+
opContext,
82+
entityService,
83+
String.format("%s-%s", gitVersion.getVersion(), revision));
6184
}
6285

6386
@Override
6487
public boolean requiresK8ScaleDown(UpgradeContext context) {
88+
if (_incrementalReindexEnabled) {
89+
// Incremental reindex never requires consumer scale-down
90+
return false;
91+
}
6592
try {
6693
List<ReindexConfig> configs =
6794
IndexUtils.getAllReindexConfigs(
@@ -88,27 +115,36 @@ private List<UpgradeStep> buildSteps(
88115
final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents
89116
baseElasticSearchComponents,
90117
final ConfigurationProvider configurationProvider,
91-
final AspectDao aspectDao) {
118+
final AspectDao aspectDao,
119+
final OperationContext opContext,
120+
final EntityService<?> entityService,
121+
final String upgradeVersion) {
92122

93123
final List<UpgradeStep> steps = new ArrayList<>();
94124
// Setup Elasticsearch users and roles (if enabled)
95125
steps.add(new CreateUserStep(baseElasticSearchComponents, configurationProvider));
96126
// Setup usage event indices and policies
97127
steps.add(new CreateUsageEventIndicesStep(baseElasticSearchComponents, configurationProvider));
98-
// Disable ES write mode/change refresh rate and clone indices
99-
steps.add(
100-
new BuildIndicesPreStep(
101-
baseElasticSearchComponents,
102-
indexedServices,
103-
configurationProvider,
104-
_structuredProperties));
105-
// Configure graphService, entitySearchService, systemMetadataService, timeseriesAspectService
106-
steps.add(new BuildIndicesStep(indexedServices, _structuredProperties));
107-
// Reset configuration (and delete clones? Or just do this regularly? Or delete clone in
108-
// pre-configure step if it already exists?
109-
steps.add(
110-
new BuildIndicesPostStep(
111-
baseElasticSearchComponents, indexedServices, _structuredProperties));
128+
129+
if (_incrementalReindexEnabled) {
130+
// Incremental path: create next indices + _reindex without blocking writes or swapping
131+
// aliases
132+
steps.add(
133+
new BuildIndicesIncrementalStep(
134+
opContext, indexedServices, _structuredProperties, entityService, upgradeVersion));
135+
} else {
136+
// Legacy path: block writes, reindex in-place, swap aliases, unblock writes
137+
steps.add(
138+
new BuildIndicesPreStep(
139+
baseElasticSearchComponents,
140+
indexedServices,
141+
configurationProvider,
142+
_structuredProperties));
143+
steps.add(new BuildIndicesStep(indexedServices, _structuredProperties));
144+
steps.add(
145+
new BuildIndicesPostStep(
146+
baseElasticSearchComponents, indexedServices, _structuredProperties));
147+
}
112148
return steps;
113149
}
114150
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.linkedin.datahub.upgrade.system.elasticsearch;
2+
3+
import com.linkedin.common.urn.Urn;
4+
import com.linkedin.datahub.upgrade.UpgradeStep;
5+
import com.linkedin.datahub.upgrade.shared.ElasticSearchUpgradeUtils;
6+
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
7+
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.IncrementalReindexCatchUpStep;
8+
import com.linkedin.gms.factory.config.ConfigurationProvider;
9+
import com.linkedin.metadata.entity.AspectDao;
10+
import com.linkedin.metadata.entity.EntityService;
11+
import com.linkedin.metadata.graph.GraphService;
12+
import com.linkedin.metadata.search.EntitySearchService;
13+
import com.linkedin.metadata.shared.ElasticSearchIndexed;
14+
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
15+
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
16+
import com.linkedin.structured.StructuredPropertyDefinition;
17+
import com.linkedin.util.Pair;
18+
import io.datahubproject.metadata.context.OperationContext;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Set;
22+
23+
/**
24+
* Ordered post–Phase-1 work for incremental reindex. Exposed as a single non-blocking upgrade so
25+
* steps always run in sequence.
26+
*/
27+
public class IncrementalReindex implements NonBlockingSystemUpgrade {
28+
29+
public static final String ID = "IncrementalReindex";
30+
31+
private final List<UpgradeStep> steps;
32+
33+
public IncrementalReindex(
34+
SystemMetadataService systemMetadataService,
35+
TimeseriesAspectService timeseriesAspectService,
36+
EntitySearchService entitySearchService,
37+
GraphService graphService,
38+
ConfigurationProvider configurationProvider,
39+
AspectDao aspectDao,
40+
OperationContext opContext,
41+
EntityService<?> entityService,
42+
String upgradeVersion) {
43+
44+
Set<Pair<Urn, StructuredPropertyDefinition>> structuredProperties;
45+
if (configurationProvider.getStructuredProperties().isSystemUpdateEnabled()) {
46+
structuredProperties =
47+
ElasticSearchUpgradeUtils.getActiveStructuredPropertiesDefinitions(aspectDao);
48+
} else {
49+
structuredProperties = Set.of();
50+
}
51+
52+
boolean rollbackDualWriteEnabled =
53+
configurationProvider.getElasticSearch().getBuildIndices() != null
54+
&& configurationProvider
55+
.getElasticSearch()
56+
.getBuildIndices()
57+
.isRollbackDualWriteEnabled();
58+
59+
List<ElasticSearchIndexed> indexedServices =
60+
ElasticSearchUpgradeUtils.createElasticSearchIndexedServices(
61+
graphService, entitySearchService, systemMetadataService, timeseriesAspectService);
62+
63+
steps = new ArrayList<>();
64+
steps.add(
65+
new IncrementalReindexCatchUpStep(
66+
opContext,
67+
entityService,
68+
aspectDao,
69+
indexedServices,
70+
structuredProperties,
71+
upgradeVersion,
72+
rollbackDualWriteEnabled));
73+
}
74+
75+
@Override
76+
public String id() {
77+
return ID;
78+
}
79+
80+
@Override
81+
public List<UpgradeStep> steps() {
82+
return steps;
83+
}
84+
}

0 commit comments

Comments
 (0)