1313import org .apache .lucene .analysis .Analyzer ;
1414import org .apache .lucene .codecs .Codec ;
1515import org .apache .lucene .document .Document ;
16+ import org .apache .lucene .index .CodecReader ;
1617import org .apache .lucene .index .IndexWriter ;
1718import org .apache .lucene .index .IndexWriterConfig ;
19+ import org .apache .lucene .index .MergePolicy ;
20+ import org .apache .lucene .index .MergeTrigger ;
1821import org .apache .lucene .index .SegmentCommitInfo ;
22+ import org .apache .lucene .index .SegmentInfo ;
1923import org .apache .lucene .index .SegmentInfos ;
24+ import org .apache .lucene .index .Sorter ;
2025import org .apache .lucene .search .Sort ;
26+ import org .apache .lucene .search .SortField ;
27+ import org .apache .lucene .search .SortedNumericSortField ;
2128import org .apache .lucene .store .Directory ;
29+ import org .apache .lucene .store .IOContext ;
2230import org .apache .lucene .store .MMapDirectory ;
2331import org .opensearch .be .lucene .LuceneDataFormat ;
2432import org .opensearch .common .annotation .ExperimentalApi ;
2533import org .opensearch .common .util .io .IOUtils ;
2634import org .opensearch .index .engine .dataformat .FileInfos ;
35+ import org .opensearch .index .engine .dataformat .FlushInput ;
36+ import org .opensearch .index .engine .dataformat .RowIdMapping ;
2737import org .opensearch .index .engine .dataformat .WriteResult ;
2838import org .opensearch .index .engine .dataformat .Writer ;
2939import org .opensearch .index .engine .exec .WriterFileSet ;
3040
3141import java .io .IOException ;
3242import java .nio .file .Files ;
3343import java .nio .file .Path ;
44+ import java .util .ArrayList ;
3445import java .util .Arrays ;
46+ import java .util .List ;
47+ import java .util .Map ;
48+ import java .util .concurrent .Executor ;
49+ import java .util .concurrent .locks .ReentrantLock ;
3550
3651/**
3752 * Per-generation Lucene writer that creates segments in an isolated temporary directory.
3853 *
3954 * Each instance owns its own {@link IndexWriter} and {@link Directory}. Documents are
40- * added via {@link #addDoc(LuceneDocumentInput)}, and on {@link #flush()}, the writer
55+ * added via {@link #addDoc(LuceneDocumentInput)}, and on {@link #flush(FlushInput )}, the writer
4156 * performs a force merge to exactly 1 segment to maintain a 1:1 mapping between the
4257 * Lucene segment and the corresponding Parquet file for the same writer generation.
4358 *
@@ -82,7 +97,7 @@ public class LuceneWriter implements Writer<LuceneDocumentInput> {
8297 * @param baseDirectory the base directory under which to create the temp directory
8398 * @param analyzer the analyzer to use for tokenized fields, or null for default
8499 * @param codec the codec to use, or null for default
85- * @param indexSort the index sort to apply to segments, or null for no sort
100+ * @param indexSort the index sort to apply (null when Lucene is secondary format)
86101 * @throws IOException if directory creation or IndexWriter opening fails
87102 */
88103 public LuceneWriter (
@@ -106,6 +121,10 @@ public LuceneWriter(
106121 IndexWriterConfig iwc = analyzer != null ? new IndexWriterConfig (analyzer ) : new IndexWriterConfig ();
107122 iwc .setOpenMode (IndexWriterConfig .OpenMode .CREATE );
108123 iwc .setRAMBufferSizeMB (RAM_BUFFER_SIZE_MB );
124+ // When Lucene is primary, apply the customer's IndexSort so segments
125+ // are natively sorted and compatible with the shared writer's IndexSort.
126+ // When Lucene is secondary, no IndexSort — reorder is done via
127+ // ReorderingOneMerge.reorder() in configureSortedMerge().
109128 if (indexSort != null ) {
110129 iwc .setIndexSort (indexSort );
111130 }
@@ -147,27 +166,51 @@ public WriteResult addDoc(LuceneDocumentInput input) throws IOException {
147166 * on disk for {@link LuceneIndexingExecutionEngine#refresh} to incorporate via
148167 * {@code addIndexes}.
149168 *
169+ * <p>If the {@link FlushInput} carries a sort permutation from the primary data format
170+ * (e.g., Parquet sort-on-close), the Lucene segment is reordered using Lucene's IndexSort
171+ * mechanism with a custom SortField that remaps {@code ___row_id} values through the
172+ * permutation. This ensures the Lucene doc order matches the sorted Parquet row order.
173+ *
174+ * @param flushInput optional context; if it carries a sort permutation, the segment is sorted
150175 * @return file infos containing the temp directory path and segment file names,
151176 * or {@link FileInfos#empty()} if no documents were added
152177 * @throws IOException if force merge, commit, or file listing fails
153178 */
154179 @ Override
155- public FileInfos flush () throws IOException {
180+ public FileInfos flush (FlushInput flushInput ) throws IOException {
156181 if (docCount == 0 ) {
157182 return FileInfos .empty ();
158183 }
159184
160- // Force merge to exactly 1 segment to maintain 1:1 mapping with other formats.
185+ // If sort permutation is provided, configure the reorder merge policy
186+ if (flushInput .hasRowIdMapping ()) {
187+ configureSortedMerge (flushInput .rowIdMapping ());
188+ }
189+
190+ // Common path: forceMerge to 1 segment, commit, build FileInfos
161191 indexWriter .forceMerge (1 , true );
162192 indexWriter .commit ();
163193
194+ // Close the IndexWriter before rewriting segment metadata.
195+ // This prevents IndexFileDeleter from removing our rewritten segments_N
196+ // file (which it wouldn't recognize as its own commit).
197+ indexWriter .close ();
198+
164199 // Verify the invariant: exactly 1 segment with docCount documents
165200 SegmentInfos segmentInfos = SegmentInfos .readLatestCommit (directory );
166201 assert segmentInfos .size () == 1 : "Expected exactly 1 segment after force merge, got " + segmentInfos .size ();
167202
168203 SegmentCommitInfo segmentInfo = segmentInfos .info (0 );
169204 assert segmentInfo .info .maxDoc () == docCount : "Expected " + docCount + " docs in segment, got " + segmentInfo .info .maxDoc ();
170205
206+ // Stamp the IndexSort on the segment metadata post-commit so that
207+ // addIndexes(Directory...) on the shared writer sees matching sort.
208+ // The segment is always sorted by __row_id__ — either naturally (docs
209+ // written sequentially) or via OneMerge.reorder() + row ID rewrite.
210+ if (segmentInfo .info .getIndexSort () == null ) {
211+ rewriteSegmentInfoWithSort (segmentInfos , segmentInfo );
212+ }
213+
171214 // Build the WriterFileSet pointing to the temp directory
172215 WriterFileSet .Builder wfsBuilder = WriterFileSet .builder ()
173216 .directory (tempDirectory )
@@ -181,12 +224,165 @@ public FileInfos flush() throws IOException {
181224 }
182225 }
183226
184- // Since flush is once only, close the IndexWriter but keep directory open for close()
185- indexWriter .close ();
186227
187228 return FileInfos .builder ().putWriterFileSet (dataFormat , wfsBuilder .build ()).build ();
188229 }
189230
231+ /**
232+ * Configures the child writer for sorted flush: sets a ReorderingMergePolicy
233+ * that physically reorders docs via OneMerge.reorder(), and enables sequential
234+ * __row_id__ rewrite on the codec so the merge writes 0..N in one pass.
235+ */
236+ private void configureSortedMerge (RowIdMapping mapping ) {
237+ indexWriter .getConfig ().setMergePolicy (new ReorderingMergePolicy (mapping ));
238+ Codec currentCodec = indexWriter .getConfig ().getCodec ();
239+ if (currentCodec instanceof LuceneWriterCodec lwc ) {
240+ lwc .enableRowIdRewrite ();
241+ }
242+ }
243+
244+ /**
245+ * Rewrites the segment's .si file and segments_N commit to declare the IndexSort.
246+ * <p>
247+ * After the child writer commits, the segment on disk has no IndexSort metadata
248+ * (because the writer operates without IndexSort to allow OneMerge.reorder()).
249+ * However, the segment is logically sorted by __row_id__ (either naturally sequential
250+ * or via reorder + row ID rewrite). This method reconstructs the SegmentInfo with
251+ * the expected sort, rewrites the .si file, and re-commits the SegmentInfos so that
252+ * addIndexes(Directory...) on the shared writer sees matching sort metadata.
253+ *
254+ * @param segmentInfos the current committed SegmentInfos
255+ * @param segmentCommitInfo the single segment's commit info
256+ * @throws IOException if rewriting fails
257+ */
258+ private void rewriteSegmentInfoWithSort (SegmentInfos segmentInfos , SegmentCommitInfo segmentCommitInfo ) throws IOException {
259+ SegmentInfo originalInfo = segmentCommitInfo .info ;
260+ Sort sort = new Sort (new SortedNumericSortField (LuceneDocumentInput .ROW_ID_FIELD , SortField .Type .LONG ));
261+
262+ // Reconstruct SegmentInfo with the IndexSort declared
263+ SegmentInfo sortedInfo = new SegmentInfo (
264+ originalInfo .dir ,
265+ originalInfo .getVersion (),
266+ originalInfo .getMinVersion (),
267+ originalInfo .name ,
268+ originalInfo .maxDoc (),
269+ originalInfo .getUseCompoundFile (),
270+ originalInfo .getHasBlocks (),
271+ originalInfo .getCodec (),
272+ originalInfo .getDiagnostics (),
273+ originalInfo .getId (),
274+ originalInfo .getAttributes (),
275+ sort
276+ );
277+ sortedInfo .setFiles (originalInfo .files ());
278+
279+ // Delete the existing .si file before rewriting — Lucene's createOutput
280+ // does not overwrite existing files.
281+ String siFileName = originalInfo .name + ".si" ;
282+ directory .deleteFile (siFileName );
283+
284+ // Rewrite the .si file with sort metadata
285+ originalInfo .getCodec ().segmentInfoFormat ().write (directory , sortedInfo , IOContext .DEFAULT );
286+
287+ // Replace the segment in SegmentInfos and re-commit so segments_N is consistent
288+ SegmentCommitInfo newCommitInfo = new SegmentCommitInfo (
289+ sortedInfo ,
290+ segmentCommitInfo .getDelCount (),
291+ segmentCommitInfo .getSoftDelCount (),
292+ segmentCommitInfo .getDelGen (),
293+ segmentCommitInfo .getFieldInfosGen (),
294+ segmentCommitInfo .getDocValuesGen (),
295+ segmentCommitInfo .getId ()
296+ );
297+ segmentInfos .clear ();
298+ segmentInfos .add (newCommitInfo );
299+ segmentInfos .commit (directory );
300+ }
301+
302+ /**
303+ * MergePolicy that wraps the standard merge selection but returns
304+ * ReorderingOneMerge instances that override reorder() with our DocMap.
305+ */
306+ static class ReorderingMergePolicy extends MergePolicy {
307+ private final RowIdMapping mapping ;
308+ private volatile boolean reorderDone = false ;
309+
310+ ReorderingMergePolicy (RowIdMapping mapping ) {
311+ this .mapping = mapping ;
312+ }
313+
314+ @ Override
315+ public MergeSpecification findMerges (MergeTrigger mergeTrigger , SegmentInfos segmentInfos , MergeContext mergeContext ) {
316+ return null ; // no automatic merges
317+ }
318+
319+ @ Override
320+ public MergeSpecification findForcedMerges (SegmentInfos segmentInfos , int maxSegmentCount , Map <SegmentCommitInfo , Boolean > segmentsToMerge , MergeContext mergeContext ) {
321+ if (reorderDone ) {
322+ return null ; // already reordered, stop the loop
323+ }
324+ reorderDone = true ;
325+
326+ List <SegmentCommitInfo > segments = new ArrayList <>();
327+ for (int i = 0 ; i < segmentInfos .size (); i ++) {
328+ segments .add (segmentInfos .info (i ));
329+ }
330+ if (segments .isEmpty ()) {
331+ return null ;
332+ }
333+ MergeSpecification spec = new MergeSpecification ();
334+ spec .add (new ReorderingOneMerge (segments , mapping ));
335+ return spec ;
336+ }
337+
338+ @ Override
339+ public MergeSpecification findForcedDeletesMerges (SegmentInfos segmentInfos , MergeContext mergeContext ) {
340+ return null ;
341+ }
342+ }
343+
344+ /**
345+ * Custom OneMerge that overrides {@code reorder()} to provide the sort permutation
346+ * as a {@link Sorter.DocMap}. This causes Lucene to physically reorder docs during
347+ * the merge according to the Parquet sort order.
348+ */
349+ static class ReorderingOneMerge extends MergePolicy .OneMerge {
350+ private final RowIdMapping mapping ;
351+
352+ ReorderingOneMerge (List <SegmentCommitInfo > segments , RowIdMapping mapping ) {
353+ super (segments );
354+ this .mapping = mapping ;
355+ }
356+
357+ @ Override
358+ public Sorter .DocMap reorder (CodecReader reader , Directory dir , Executor executor ) throws IOException {
359+ return new Sorter .DocMap () {
360+ @ Override
361+ public int oldToNew (int docID ) {
362+ return mapping .oldToNew (docID );
363+ }
364+
365+ @ Override
366+ public int newToOld (int docID ) {
367+ return mapping .newToOld (docID );
368+ }
369+
370+ @ Override
371+ public int size () {
372+ return mapping .size ();
373+ }
374+ };
375+ }
376+
377+ @ Override
378+ public void setMergeInfo (SegmentCommitInfo info ) {
379+ super .setMergeInfo (info );
380+ if (info != null ) {
381+ info .info .putAttribute (WRITER_GENERATION_ATTRIBUTE , String .valueOf (0 ));
382+ }
383+ }
384+ }
385+
190386 /**
191387 * Syncs all files in the temp directory to durable storage.
192388 *
0 commit comments