@@ -913,6 +913,174 @@ public void onFailedEngine(String reason, Exception failure) {
913913 }
914914 }
915915
916+ /**
917+ * Covers the preIndex cooperative-flush path when a writer's {@code flush()} throws.
918+ *
919+ * <p>When a write thread picks a writer from the flushQueue during preIndex and the
920+ * flush fails, the engine must:
921+ * <ul>
922+ * <li>Call {@link DataFormatAwareEngine#failEngine} with the originating cause.</li>
923+ * <li>Close the failing writer via {@code IOUtils.closeWhileHandlingException}.</li>
924+ * <li>Count down the activeFlushLatch so the refresh thread is not stuck waiting.</li>
925+ * <li>Reject subsequent operations with {@link AlreadyClosedException}.</li>
926+ * </ul>
927+ *
928+ * <p>This test injects a failing writer directly into the flushQueue via reflection,
929+ * then triggers preIndex by calling {@code engine.index()}.
930+ */
931+ @ SuppressForbidden (reason = "test needs reflective access to inject a failing writer into flushQueue" )
932+ public void testPreIndexFlushFailureFailsEngine () throws Exception {
933+ AtomicReference <Exception > failedEngineCause = new AtomicReference <>();
934+ Engine .EventListener listener = new Engine .EventListener () {
935+ @ Override
936+ public void onFailedEngine (String reason , Exception failure ) {
937+ failedEngineCause .set (failure );
938+ }
939+ };
940+
941+ // Use a normal (non-failing) engine so we can inject the failure precisely.
942+ MockDataFormatPlugin normalPlugin = new MockDataFormatPlugin (mockDataFormat ) {};
943+ EngineConfig config = buildFailingEngineConfig (normalPlugin , listener );
944+ DataFormatAwareEngine engine = new DataFormatAwareEngine (config );
945+ try {
946+ // Inject a FailingFlushWriter directly into the flushQueue.
947+ java .lang .reflect .Field queueField = DataFormatAwareEngine .class .getDeclaredField ("flushQueue" );
948+ queueField .setAccessible (true );
949+ @ SuppressWarnings ("unchecked" )
950+ java .util .concurrent .ConcurrentLinkedQueue <org .opensearch .index .engine .dataformat .Writer <?>> queue =
951+ (java .util .concurrent .ConcurrentLinkedQueue <org .opensearch .index .engine .dataformat .Writer <?>>) queueField .get (engine );
952+ queue .add (new FailingFlushWriter (99L , mockDataFormat ));
953+
954+ // The next index() call triggers preIndex() which polls the failing writer.
955+ AlreadyClosedException ex = expectThrows (
956+ AlreadyClosedException .class ,
957+ () -> engine .index (indexOp (createParsedDocWithInput ("trigger-preindex" , null )))
958+ );
959+
960+ // failEngine was called with the flush IOException.
961+ assertThat ("event listener must observe the failure" , failedEngineCause .get (), notNullValue ());
962+ assertThat (failedEngineCause .get ().getMessage (), containsString ("simulated flush failure" ));
963+
964+ // Engine is closed for all subsequent operations.
965+ expectThrows (AlreadyClosedException .class , engine ::ensureOpen );
966+ } finally {
967+ try {
968+ engine .close ();
969+ } catch (Exception ignored ) {}
970+ }
971+ }
972+
973+ /**
974+ * Covers the preIndex happy path: a writer in the flushQueue is successfully flushed
975+ * by the write thread during preIndex, producing a pending segment.
976+ */
977+ @ SuppressForbidden (reason = "test needs reflective access to inject a writer into flushQueue and read pendingWritersToClose" )
978+ public void testPreIndexSuccessfulFlushProducesPendingSegment () throws Exception {
979+ DataFormatAwareEngine engine = createDFAEngine (store , createTempDir ());
980+ try {
981+ // Index a doc so the engine is in a valid state.
982+ engine .index (indexOp (createParsedDocWithInput ("0" , null )));
983+
984+ // Inject a writer that returns empty FileInfos on flush (success path, no files).
985+ java .lang .reflect .Field queueField = DataFormatAwareEngine .class .getDeclaredField ("flushQueue" );
986+ queueField .setAccessible (true );
987+ @ SuppressWarnings ("unchecked" )
988+ java .util .concurrent .ConcurrentLinkedQueue <org .opensearch .index .engine .dataformat .Writer <?>> queue =
989+ (java .util .concurrent .ConcurrentLinkedQueue <org .opensearch .index .engine .dataformat .Writer <?>>) queueField .get (engine );
990+
991+ // A writer that succeeds on flush with empty result (no files produced).
992+ SuccessFlushWriter successWriter = new SuccessFlushWriter (42L , mockDataFormat );
993+ queue .add (successWriter );
994+
995+ // Index another doc — this triggers preIndex which flushes the queued writer.
996+ engine .index (indexOp (createParsedDocWithInput ("1" , null )));
997+
998+ // Engine should still be open (flush succeeded).
999+ engine .ensureOpen ();
1000+
1001+ // The writer should have been moved to pendingWritersToClose.
1002+ java .lang .reflect .Field closersField = DataFormatAwareEngine .class .getDeclaredField ("pendingWritersToClose" );
1003+ closersField .setAccessible (true );
1004+ @ SuppressWarnings ("unchecked" )
1005+ java .util .Collection <?> pendingClosers = (java .util .Collection <?>) closersField .get (engine );
1006+ assertTrue ("Writer should be queued for deferred close" , pendingClosers .contains (successWriter ));
1007+ } finally {
1008+ engine .close ();
1009+ }
1010+ }
1011+
1012+ /**
1013+ * Covers the preIndex skip path when check_pending_flush is disabled.
1014+ */
1015+ @ SuppressForbidden (reason = "test needs reflective access to inject a writer into flushQueue" )
1016+ public void testPreIndexSkipsWhenCheckPendingFlushDisabled () throws Exception {
1017+ Path translogPath = createTempDir ();
1018+ String uuid = Translog .createEmptyTranslog (translogPath , SequenceNumbers .NO_OPS_PERFORMED , shardId , primaryTerm .get ());
1019+ bootstrapStoreWithMetadata (store , uuid );
1020+
1021+ IndexSettings indexSettings = IndexSettingsModule .newIndexSettings (
1022+ "test" ,
1023+ Settings .builder ()
1024+ .put (IndexMetadata .SETTING_VERSION_CREATED , Version .CURRENT )
1025+ .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true )
1026+ .put (IndexSettings .PLUGGABLE_DATAFORMAT_ENABLED_SETTING .getKey (), true )
1027+ .put (IndexSettings .PLUGGABLE_DATAFORMAT_VALUE_SETTING .getKey (), mockDataFormat .name ())
1028+ .put ("index.check_pending_flush.enabled" , false )
1029+ .build ()
1030+ );
1031+
1032+ TranslogConfig translogConfig = new TranslogConfig (
1033+ shardId ,
1034+ translogPath ,
1035+ indexSettings ,
1036+ BigArrays .NON_RECYCLING_INSTANCE ,
1037+ "" ,
1038+ false
1039+ );
1040+
1041+ DataFormatRegistry registry = createMockRegistry ();
1042+ MapperService mapperService = mock (MapperService .class );
1043+ when (mapperService .getIndexSettings ()).thenReturn (indexSettings );
1044+
1045+ EngineConfig config = new EngineConfig .Builder ().shardId (shardId )
1046+ .threadPool (threadPool )
1047+ .indexSettings (indexSettings )
1048+ .store (store )
1049+ .mergePolicy (NoMergePolicy .INSTANCE )
1050+ .translogConfig (translogConfig )
1051+ .flushMergesAfter (TimeValue .timeValueMinutes (5 ))
1052+ .externalRefreshListener (List .of ())
1053+ .internalRefreshListener (List .of ())
1054+ .globalCheckpointSupplier (() -> SequenceNumbers .NO_OPS_PERFORMED )
1055+ .retentionLeasesSupplier (() -> RetentionLeases .EMPTY )
1056+ .primaryTermSupplier (primaryTerm ::get )
1057+ .tombstoneDocSupplier (tombstoneDocSupplier ())
1058+ .dataFormatRegistry (registry )
1059+ .committerFactory (c -> new InMemoryCommitter (store ))
1060+ .mapperService (mapperService )
1061+ .build ();
1062+
1063+ DataFormatAwareEngine engine = new DataFormatAwareEngine (config );
1064+ try {
1065+ // Even with a failing writer in the queue, preIndex should skip it.
1066+ java .lang .reflect .Field queueField = DataFormatAwareEngine .class .getDeclaredField ("flushQueue" );
1067+ queueField .setAccessible (true );
1068+ @ SuppressWarnings ("unchecked" )
1069+ java .util .concurrent .ConcurrentLinkedQueue <org .opensearch .index .engine .dataformat .Writer <?>> queue =
1070+ (java .util .concurrent .ConcurrentLinkedQueue <org .opensearch .index .engine .dataformat .Writer <?>>) queueField .get (engine );
1071+ queue .add (new FailingFlushWriter (99L , mockDataFormat ));
1072+
1073+ // Index should succeed — preIndex is disabled so the failing writer is never polled.
1074+ engine .index (indexOp (createParsedDocWithInput ("1" , null )));
1075+ engine .ensureOpen (); // Engine still alive
1076+
1077+ // The failing writer is still in the queue (never polled).
1078+ assertEquals (1 , queue .size ());
1079+ } finally {
1080+ engine .close ();
1081+ }
1082+ }
1083+
9161084 public void testCatalogSnapshotContainsFormatSpecificFiles () throws IOException {
9171085 try (DataFormatAwareEngine engine = createDFAEngine (store , createTempDir ())) {
9181086 int numDocs = randomIntBetween (1 , 5 );
@@ -1971,4 +2139,52 @@ public void updateMappingVersion(long newVersion) {}
19712139 @ Override
19722140 public void close () {}
19732141 }
2142+
2143+ /**
2144+ * A writer that succeeds on flush, returning empty FileInfos. Used to test the
2145+ * preIndex happy path.
2146+ */
2147+ private static final class SuccessFlushWriter implements org .opensearch .index .engine .dataformat .Writer <MockDocumentInput > {
2148+ private final long writerGeneration ;
2149+ private final org .opensearch .index .engine .dataformat .DataFormat dataFormat ;
2150+
2151+ SuccessFlushWriter (long writerGeneration , org .opensearch .index .engine .dataformat .DataFormat dataFormat ) {
2152+ this .writerGeneration = writerGeneration ;
2153+ this .dataFormat = dataFormat ;
2154+ }
2155+
2156+ @ Override
2157+ public org .opensearch .index .engine .dataformat .WriteResult addDoc (MockDocumentInput d ) {
2158+ return new org .opensearch .index .engine .dataformat .WriteResult .Success (1L , 1L , 0L );
2159+ }
2160+
2161+ @ Override
2162+ public org .opensearch .index .engine .dataformat .FileInfos flush (org .opensearch .index .engine .dataformat .FlushInput flushInput ) {
2163+ return org .opensearch .index .engine .dataformat .FileInfos .empty ();
2164+ }
2165+
2166+ @ Override
2167+ public void sync () {}
2168+
2169+ @ Override
2170+ public long generation () {
2171+ return writerGeneration ;
2172+ }
2173+
2174+ @ Override
2175+ public boolean isSchemaMutable () {
2176+ return true ;
2177+ }
2178+
2179+ @ Override
2180+ public long mappingVersion () {
2181+ return 0 ;
2182+ }
2183+
2184+ @ Override
2185+ public void updateMappingVersion (long newVersion ) {}
2186+
2187+ @ Override
2188+ public void close () {}
2189+ }
19742190}
0 commit comments