Skip to content

Commit ee2caac

Browse files
authored
Inline merge at refresh for composite engine: produce one segment per… (opensearch-project#21831)
* Merge on refresh for composite engine with inline merge Implements merge-on-refresh: when multiple writers flush in the same refresh cycle, consolidate all formats into a single segment. Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent d41992f commit ee2caac

11 files changed

Lines changed: 1184 additions & 18 deletions

File tree

β€Žsandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DataFusionNativeBridgeTests.javaβ€Ž

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.opensearch.be.datafusion;
1010

11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
12+
1113
import org.opensearch.analytics.backend.jni.NativeHandle;
1214
import org.opensearch.be.datafusion.nativelib.NativeBridge;
1315
import org.opensearch.be.datafusion.nativelib.ReaderHandle;
@@ -28,6 +30,9 @@
2830
* Smoke test for the DataFusion JNI bridge.
2931
* Verifies native library loading, runtime creation, and reader lifecycle.
3032
*/
33+
// The Tokio IO runtime thread is a process-lifetime singleton spawned by the native Rust library.
34+
// It persists after tests complete and cannot be interrupted (empty Java stack, RUNNABLE state).
35+
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
3136
public class DataFusionNativeBridgeTests extends OpenSearchTestCase {
3237

3338
// Note: initTokioRuntimeManager uses OnceLock and can only be initialized once per JVM.

β€Žsandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeConcurrentIndexingIT.javaβ€Ž

Lines changed: 633 additions & 1 deletion
Large diffs are not rendered by default.

β€Žsandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeMergeIT.javaβ€Ž

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,94 @@ public void testSortedParquetPrimaryLuceneSecondaryMerge() throws Exception {
252252
verifyCrossFormatConsistency(snapshot);
253253
}
254254

255+
/**
256+
* Validates inline consolidation at refresh with fileMappings:
257+
* When multiple writers flush in the same refresh cycle, the primary (Parquet) merges
258+
* them and produces a RowIdMapping via fileMappings. The secondary (Lucene) then applies
259+
* the same mapping. This test uses concurrent indexing to fill multiple writers, then
260+
* a single refresh to trigger consolidation.
261+
*
262+
* Correctness criteria:
263+
* <ol>
264+
* <li>After refresh, catalog has 1 segment (consolidated) instead of N per-writer segments</li>
265+
* <li>Both parquet and lucene formats are present</li>
266+
* <li>Lucene __row_id__ values are sequential (RowIdMapping correctly applied)</li>
267+
* <li>Cross-format consistency: parquet row data matches lucene data at same row_id</li>
268+
* </ol>
269+
*/
270+
public void testMergeOnRefresh() throws Exception {
271+
client().admin()
272+
.indices()
273+
.prepareCreate(INDEX_NAME)
274+
.setSettings(sortedParquetPrimaryLuceneSecondarySettings())
275+
.setMapping("name", "type=keyword", "age", "type=integer")
276+
.get();
277+
ensureGreen(INDEX_NAME);
278+
279+
// Use concurrent threads to fill multiple writers in a single refresh cycle.
280+
// With refresh_interval=-1 and no explicit refresh, all docs land in the writer pool.
281+
int numThreads = 8;
282+
int docsPerThread = 5;
283+
int totalDocs = numThreads * docsPerThread;
284+
java.util.concurrent.CountDownLatch startLatch = new java.util.concurrent.CountDownLatch(1);
285+
java.util.concurrent.atomic.AtomicReference<Exception> error = new java.util.concurrent.atomic.AtomicReference<>();
286+
287+
Thread[] threads = new Thread[numThreads];
288+
for (int t = 0; t < numThreads; t++) {
289+
int threadId = t;
290+
threads[t] = new Thread(() -> {
291+
try {
292+
startLatch.await();
293+
for (int i = 0; i < docsPerThread; i++) {
294+
int docId = threadId * docsPerThread + i;
295+
client().prepareIndex(INDEX_NAME)
296+
.setId(String.valueOf(docId))
297+
.setSource("name", "doc_" + docId, "age", randomIntBetween(0, 100))
298+
.get();
299+
}
300+
} catch (Exception e) {
301+
error.compareAndSet(null, e);
302+
}
303+
});
304+
threads[t].start();
305+
}
306+
startLatch.countDown();
307+
for (Thread t : threads) {
308+
t.join();
309+
}
310+
if (error.get() != null) {
311+
throw error.get();
312+
}
313+
314+
// Single refresh flushes all writers β€” triggers inline consolidation via fileMappings
315+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
316+
client().admin().indices().prepareFlush(INDEX_NAME).setForce(true).setWaitIfOngoing(true).get();
317+
318+
DataformatAwareCatalogSnapshot snapshot = getCatalogSnapshot();
319+
320+
// Merge on refresh must have consolidated: with N writers (N>1) flushing,
321+
// the catalog should have exactly 1 segment (merged) instead of N.
322+
// If this assertion fails, consolidation didn't trigger (all docs landed in 1 writer).
323+
assertEquals("Inline merge on refresh should produce exactly 1 segment from multiple writers", 1, snapshot.getSegments().size());
324+
325+
// Both formats must be present in the single consolidated segment
326+
Set<String> formats = snapshot.getDataFormats();
327+
assertTrue("Catalog should contain 'parquet'", formats.contains("parquet"));
328+
assertTrue("Catalog should contain 'lucene'", formats.contains("lucene"));
329+
330+
// Verify total row count
331+
verifyRowCount(snapshot, totalDocs);
332+
333+
// Verify lucene doc count
334+
verifyLuceneDocCount(totalDocs);
335+
336+
// RowIdMapping correctness: __row_id__ must be sequential within each segment
337+
verifyLuceneRowIdSequential();
338+
339+
// Cross-format field value consistency
340+
verifyCrossFormatConsistency(snapshot);
341+
}
342+
255343
// ══════════════════════════════════════════════════════════════════════
256344
// Private helpers: merge feature flag
257345
// ══════════════════════════════════════════════════════════════════════
@@ -908,13 +996,19 @@ public void testMergeStatsViaApi() throws Exception {
908996

909997
private DataformatAwareCatalogSnapshot waitForMerge(int refreshCycles) throws Exception {
910998
flush(INDEX_NAME);
999+
// With inline merge at refresh, each cycle already produces 1 consolidated segment.
1000+
// Background merge may further reduce, but for small segments it may not fire.
1001+
// Accept: segment count <= refreshCycles (inline consolidation working correctly).
9111002
assertBusy(() -> {
9121003
DataformatAwareCatalogSnapshot snap = getCatalogSnapshot();
9131004
assertTrue(
914-
"Expected merges to reduce segment count below " + refreshCycles + ", but got: " + snap.getSegments().size(),
915-
snap.getSegments().size() < refreshCycles
1005+
"Expected segment count <= "
1006+
+ refreshCycles
1007+
+ " (inline consolidation or background merge), but got: "
1008+
+ snap.getSegments().size(),
1009+
snap.getSegments().size() <= refreshCycles
9161010
);
917-
});
1011+
}, 30, java.util.concurrent.TimeUnit.SECONDS);
9181012
return getCatalogSnapshot();
9191013
}
9201014

0 commit comments

Comments
Β (0)