Skip to content

Commit 0986aed

Browse files
Merge remote-tracking branch 'origin/main' into merge_parquet
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
2 parents d92978f + 968438f commit 0986aed

44 files changed

Lines changed: 3092 additions & 310 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class DataFusionAnalyticsBackendPlugin implements AnalyticsSearchBackendP
4444
SUPPORTED_FIELD_TYPES.addAll(FieldType.keyword());
4545
SUPPORTED_FIELD_TYPES.addAll(FieldType.date());
4646
SUPPORTED_FIELD_TYPES.add(FieldType.BOOLEAN);
47+
SUPPORTED_FIELD_TYPES.add(FieldType.TEXT);
4748
}
4849

4950
private static final Set<FilterOperator> STANDARD_FILTER_OPS = Set.of(

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.common.annotation.ExperimentalApi;
2727
import org.opensearch.index.engine.dataformat.DataFormat;
2828
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
29+
import org.opensearch.index.engine.dataformat.MergeResult;
2930
import org.opensearch.index.engine.dataformat.Merger;
3031
import org.opensearch.index.engine.dataformat.RefreshInput;
3132
import org.opensearch.index.engine.dataformat.RefreshResult;
@@ -278,7 +279,8 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
278279
/** Returns {@code null} — merge scheduling is not yet implemented for the Lucene format. */
279280
@Override
280281
public Merger getMerger() {
281-
return null;
282+
// TODO: Implement merge support as ParquetMerger
283+
return mergeInput -> new MergeResult(Map.of());
282284
}
283285

284286
/**

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/FieldStorageResolver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,11 @@ public List<FieldStorageInfo> resolve(List<String> fieldNames) {
9292
}
9393

9494
private static FieldStorageInfo resolveField(String fieldName, String fieldType, Map<String, Object> fieldProps, String primaryFormat) {
95-
// Doc values: present for all types except text, unless explicitly disabled
96-
boolean hasDocValues = !"text".equals(fieldType) && !Boolean.FALSE.equals(fieldProps.get("doc_values"));
95+
// Doc values: present for all types unless explicitly disabled
96+
boolean hasDocValues = !Boolean.FALSE.equals(fieldProps.get("doc_values"));
9797

98-
// Index: only when explicitly set to true in mapping
99-
boolean isIndexed = Boolean.TRUE.equals(fieldProps.get("index"));
98+
// Index: only when explicitly set to false in mapping - enabled by default.
99+
boolean isIndexed = !Boolean.FALSE.equals(fieldProps.get("index"));
100100

101101
// Stored fields: only when explicitly set to true in mapping
102102
boolean isStored = Boolean.TRUE.equals(fieldProps.get("store"));
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.planner;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.metadata.MappingMetadata;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.core.index.Index;
15+
import org.opensearch.test.OpenSearchTestCase;
16+
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.when;
22+
23+
/**
24+
* Unit tests for {@link FieldStorageResolver} field storage resolution.
25+
*/
26+
public class FieldStorageResolverTests extends OpenSearchTestCase {
27+
28+
public void testTextFieldGetsDocValuesInPrimaryFormat() {
29+
FieldStorageResolver resolver = newResolver("parquet", Map.of("name", Map.of("type", "text")));
30+
31+
FieldStorageInfo info = resolver.resolve(List.of("name")).get(0);
32+
33+
assertEquals("name", info.getFieldName());
34+
assertEquals(List.of("parquet"), info.getDocValueFormats());
35+
assertEquals(List.of("lucene"), info.getIndexFormats());
36+
}
37+
38+
public void testLongFieldGetsDocValuesInPrimaryFormat() {
39+
FieldStorageResolver resolver = newResolver("parquet", Map.of("age", Map.of("type", "long")));
40+
41+
FieldStorageInfo info = resolver.resolve(List.of("age")).get(0);
42+
43+
assertEquals("age", info.getFieldName());
44+
assertEquals(List.of("parquet"), info.getDocValueFormats());
45+
assertEquals(List.of("lucene"), info.getIndexFormats());
46+
}
47+
48+
public void testFieldWithAllStorageDisabledHasNoStorage() {
49+
IllegalStateException ex = expectThrows(
50+
IllegalStateException.class,
51+
() -> newResolver("parquet", Map.of("name", Map.of("type", "text", "doc_values", false, "index", false)))
52+
);
53+
assertTrue("expected 'no storage' error, got: " + ex.getMessage(), ex.getMessage().contains("has no storage in any format"));
54+
}
55+
56+
private static FieldStorageResolver newResolver(String primaryFormat, Map<String, Map<String, Object>> fieldMappings) {
57+
Map<String, Object> mappingSource = Map.of("properties", fieldMappings);
58+
59+
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
60+
when(mappingMetadata.sourceAsMap()).thenReturn(mappingSource);
61+
62+
IndexMetadata indexMetadata = mock(IndexMetadata.class);
63+
when(indexMetadata.getIndex()).thenReturn(new Index("test_index", "uuid"));
64+
when(indexMetadata.getSettings()).thenReturn(Settings.builder().put("index.composite.primary_data_format", primaryFormat).build());
65+
when(indexMetadata.mapping()).thenReturn(mappingMetadata);
66+
67+
return new FieldStorageResolver(indexMetadata);
68+
}
69+
}

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/FilterRuleTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,9 @@ public void testFullTextErrorsWithoutDelegation() {
172172
RexNode condition = makeFullTextCall(FilterOperator.MATCH_PHRASE.toSqlFunction(), 0, "hello world");
173173
LogicalFilter filter = LogicalFilter.create(stubScan(table), condition);
174174

175-
PlannerContext context = buildContext("parquet", Map.of("message", Map.of("type", "keyword")));
175+
// index=false strips the inverted index so no backend can satisfy the full-text predicate
176+
// natively, forcing the "without delegation" code path under test.
177+
PlannerContext context = buildContext("parquet", Map.of("message", Map.of("type", "keyword", "index", false)));
176178

177179
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> runPlanner(filter, context));
178180
assertTrue(exception.getMessage().contains("No backend can evaluate filter predicate"));
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite;
10+
11+
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
12+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
13+
import org.opensearch.action.admin.indices.stats.ShardStats;
14+
import org.opensearch.action.index.IndexResponse;
15+
import org.opensearch.be.lucene.LucenePlugin;
16+
import org.opensearch.cluster.metadata.IndexMetadata;
17+
import org.opensearch.common.SuppressForbidden;
18+
import org.opensearch.common.settings.Settings;
19+
import org.opensearch.common.util.FeatureFlags;
20+
import org.opensearch.core.rest.RestStatus;
21+
import org.opensearch.index.IndexSettings;
22+
import org.opensearch.index.engine.CommitStats;
23+
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
24+
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
25+
import org.opensearch.index.engine.dataformat.ReaderManagerConfig;
26+
import org.opensearch.index.engine.dataformat.stub.MockDataFormat;
27+
import org.opensearch.index.engine.dataformat.stub.MockDataFormatPlugin;
28+
import org.opensearch.index.engine.dataformat.stub.MockReaderManager;
29+
import org.opensearch.index.engine.exec.EngineReaderManager;
30+
import org.opensearch.index.engine.exec.coord.DataformatAwareCatalogSnapshot;
31+
import org.opensearch.index.merge.MergeStats;
32+
import org.opensearch.index.store.PrecomputedChecksumStrategy;
33+
import org.opensearch.plugins.Plugin;
34+
import org.opensearch.plugins.SearchBackEndPlugin;
35+
import org.opensearch.test.OpenSearchIntegTestCase;
36+
37+
import java.io.IOException;
38+
import java.util.Arrays;
39+
import java.util.Collection;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Set;
43+
import java.util.function.Function;
44+
45+
/**
46+
* Integration tests for composite merge operations across single and multiple data format engines.
47+
*
48+
* Requires JDK 25 and sandbox enabled. Run with:
49+
* ./gradlew :sandbox:plugins:composite-engine:internalClusterTest \
50+
* --tests "*.CompositeMergeIT" \
51+
* -Dsandbox.enabled=true
52+
*/
53+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1)
54+
public class CompositeMergeIT extends OpenSearchIntegTestCase {
55+
56+
private static final String INDEX_NAME = "test-composite-merge";
57+
private static final String MERGE_ENABLED_PROPERTY = "opensearch.pluggable.dataformat.merge.enabled";
58+
59+
// ── Mock DataFormatPlugin using test framework stubs ──
60+
61+
public static class MockParquetDataFormatPlugin extends MockDataFormatPlugin implements SearchBackEndPlugin<Object> {
62+
private static final MockDataFormat PARQUET_FORMAT = new MockDataFormat("parquet", 0L, Set.of());
63+
64+
public MockParquetDataFormatPlugin() {
65+
super(PARQUET_FORMAT);
66+
}
67+
68+
@Override
69+
public Map<String, DataFormatDescriptor> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry registry) {
70+
return Map.of("parquet", new DataFormatDescriptor("parquet", new PrecomputedChecksumStrategy()));
71+
}
72+
73+
@Override
74+
public String name() {
75+
return "mock-parquet-backend";
76+
}
77+
78+
@Override
79+
public List<String> getSupportedFormats() {
80+
return List.of("parquet");
81+
}
82+
83+
@Override
84+
public EngineReaderManager<?> createReaderManager(ReaderManagerConfig settings) {
85+
return new MockReaderManager("parquet");
86+
}
87+
}
88+
89+
// ── Test setup ──
90+
91+
@Override
92+
public void setUp() throws Exception {
93+
enableMerge();
94+
super.setUp();
95+
}
96+
97+
@Override
98+
public void tearDown() throws Exception {
99+
try {
100+
client().admin().indices().prepareDelete(INDEX_NAME).get();
101+
} catch (Exception e) {
102+
// index may not exist if test failed before creation
103+
}
104+
super.tearDown();
105+
disableMerge();
106+
}
107+
108+
@SuppressForbidden(reason = "enable pluggable dataformat merge for integration testing")
109+
private static void enableMerge() {
110+
System.setProperty(MERGE_ENABLED_PROPERTY, "true");
111+
}
112+
113+
@SuppressForbidden(reason = "restore pluggable dataformat merge property after test")
114+
private static void disableMerge() {
115+
System.clearProperty(MERGE_ENABLED_PROPERTY);
116+
}
117+
118+
@Override
119+
protected Collection<Class<? extends Plugin>> nodePlugins() {
120+
return Arrays.asList(MockParquetDataFormatPlugin.class, CompositeDataFormatPlugin.class, LucenePlugin.class);
121+
}
122+
123+
@Override
124+
protected Settings nodeSettings(int nodeOrdinal) {
125+
return Settings.builder()
126+
.put(super.nodeSettings(nodeOrdinal))
127+
.put(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG, true)
128+
.build();
129+
}
130+
131+
// ── Tests ──
132+
133+
/**
134+
* Verifies that background merges are triggered automatically after refresh
135+
* when enough segments accumulate to exceed the TieredMergePolicy threshold.
136+
* <p>
137+
* Flow: index docs across many refresh cycles → each refresh calls
138+
* triggerPossibleMerges() → MergeScheduler picks up merge candidates
139+
* asynchronously → segment count decreases.
140+
*/
141+
public void testBackgroundMergeSingleEngine() throws Exception {
142+
createIndex(INDEX_NAME, singleEngineSettings());
143+
ensureGreen(INDEX_NAME);
144+
145+
// Create enough segments to exceed TieredMergePolicy's default threshold (~10)
146+
int totalSegmentsCreated = indexDocsAcrossMultipleRefreshes(15, 5);
147+
148+
// Wait for async background merges to complete
149+
assertBusy(() -> {
150+
flush(INDEX_NAME);
151+
DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
152+
assertTrue(
153+
"Expected merges to reduce segment count below " + totalSegmentsCreated + ", but got: " + snapshot.getSegments().size(),
154+
snapshot.getSegments().size() < totalSegmentsCreated
155+
);
156+
});
157+
158+
MergeStats mergeStats = getMergeStats();
159+
assertTrue("Expected at least one merge to have occurred", mergeStats.getTotal() > 0);
160+
161+
DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
162+
assertEquals(Set.of("parquet"), snapshot.getDataFormats());
163+
}
164+
165+
// ── Helpers ──
166+
167+
private Settings singleEngineSettings() {
168+
return Settings.builder()
169+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
170+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
171+
.put("index.pluggable.dataformat.enabled", true)
172+
.put("index.pluggable.dataformat", "composite")
173+
.put("index.composite.primary_data_format", "parquet")
174+
.putList("index.composite.secondary_data_formats")
175+
.build();
176+
}
177+
178+
private int indexDocsAcrossMultipleRefreshes(int refreshCycles, int docsPerCycle) {
179+
for (int cycle = 0; cycle < refreshCycles; cycle++) {
180+
for (int i = 0; i < docsPerCycle; i++) {
181+
IndexResponse response = client().prepareIndex()
182+
.setIndex(INDEX_NAME)
183+
.setSource("field_text", randomAlphaOfLength(10), "field_number", randomIntBetween(1, 1000))
184+
.get();
185+
assertEquals(RestStatus.CREATED, response.status());
186+
}
187+
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(INDEX_NAME).get();
188+
assertEquals(RestStatus.OK, refreshResponse.getStatus());
189+
}
190+
return refreshCycles;
191+
}
192+
193+
private DataformatAwareCatalogSnapshot getCatalogSnapshot() throws IOException {
194+
IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(INDEX_NAME).clear().setStore(true).get();
195+
ShardStats shardStats = statsResponse.getIndex(INDEX_NAME).getShards()[0];
196+
CommitStats commitStats = shardStats.getCommitStats();
197+
assertNotNull(commitStats);
198+
assertTrue(commitStats.getUserData().containsKey(DataformatAwareCatalogSnapshot.CATALOG_SNAPSHOT_KEY));
199+
return DataformatAwareCatalogSnapshot.deserializeFromString(
200+
commitStats.getUserData().get(DataformatAwareCatalogSnapshot.CATALOG_SNAPSHOT_KEY),
201+
Function.identity()
202+
);
203+
}
204+
205+
private MergeStats getMergeStats() {
206+
IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(INDEX_NAME).clear().setMerge(true).get();
207+
return statsResponse.getIndex(INDEX_NAME).getShards()[0].getStats().getMerge();
208+
}
209+
}

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,25 @@
2626
@ExperimentalApi
2727
public class CompositeDataFormat extends DataFormat {
2828

29+
private final DataFormat primaryDataFormat;
2930
private final List<DataFormat> dataFormats;
3031

3132
/**
32-
* Constructs a CompositeDataFormat from the given list of data formats.
33+
* Constructs a CompositeDataFormat with a designated primary format and a list of all constituent formats.
3334
*
34-
* @param dataFormats the constituent data formats
35+
* @param primaryDataFormat the authoritative data format used for merge operations
36+
* @param dataFormats all constituent data formats (including the primary)
3537
*/
36-
public CompositeDataFormat(List<DataFormat> dataFormats) {
38+
public CompositeDataFormat(DataFormat primaryDataFormat, List<DataFormat> dataFormats) {
39+
this.primaryDataFormat = Objects.requireNonNull(primaryDataFormat, "primaryDataFormat must not be null");
3740
this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null"));
3841
}
3942

4043
/**
4144
* Constructs an empty CompositeDataFormat with no constituent formats.
4245
*/
4346
public CompositeDataFormat() {
47+
this.primaryDataFormat = null;
4448
this.dataFormats = List.of();
4549
}
4650

@@ -53,6 +57,15 @@ public List<DataFormat> getDataFormats() {
5357
return dataFormats;
5458
}
5559

60+
/**
61+
* Returns the primary data format used for merge operations.
62+
*
63+
* @return the primary data format
64+
*/
65+
public DataFormat getPrimaryDataFormat() {
66+
return primaryDataFormat;
67+
}
68+
5669
@Override
5770
public String name() {
5871
return "composite";

0 commit comments

Comments
 (0)