2020import java .io .IOException ;
2121import java .util .List ;
2222import java .util .Set ;
23+ import java .util .concurrent .CountDownLatch ;
24+ import java .util .concurrent .TimeUnit ;
2325import java .util .concurrent .atomic .AtomicBoolean ;
2426
2527import org .apache .hadoop .conf .Configuration ;
@@ -203,6 +205,9 @@ public static class HiveSplitGeneratorSerializerException extends HiveSplitGener
203205 private static final String EXCEPTION_MESSAGE = "Cannot write file to path" ;
204206 private final AtomicBoolean split0Finished = new AtomicBoolean (false );
205207 private final AtomicBoolean split2Finished = new AtomicBoolean (false );
208+ // Ensures split #0's writeSplit() is past the anyTaskFailed check before split #1 throws,
209+ // so the test deterministically exercises the "already running future is not cancelled" path.
210+ private final CountDownLatch split0Started = new CountDownLatch (1 );
206211
207212 class SplitSerializerWithException extends SplitSerializer {
208213 SplitSerializerWithException () throws IOException {
@@ -229,6 +234,7 @@ void writeSplit(int count, MRSplitProto mrSplit, Path filePath) throws IOExcepti
229234 // current implementation of the waitFor doesn't cancel it
230235 LOG .info ("Write split #{}" , count );
231236 if (count == 0 ) {
237+ split0Started .countDown ();
232238 try {
233239 Thread .sleep (1000 );
234240 split0Finished .set (true );
@@ -238,8 +244,17 @@ void writeSplit(int count, MRSplitProto mrSplit, Path filePath) throws IOExcepti
238244 throw new IOException (e );
239245 }
240246 }
241- // writing second split fails
247+ // writing second split fails - but wait until split #0 is already running so the
248+ // anyTaskFailed flag cannot short-circuit it before it gets a chance to call writeSplit().
242249 if (count == 1 ) {
250+ try {
251+ if (!split0Started .await (10 , TimeUnit .SECONDS )) {
252+ throw new IOException ("Timed out waiting for split #0 to start" );
253+ }
254+ } catch (InterruptedException e ) {
255+ Thread .currentThread ().interrupt ();
256+ throw new IOException (e );
257+ }
243258 LOG .info ("Split #1 is about to throw exception" );
244259 throw new IOException (EXCEPTION_MESSAGE + ": " + filePath );
245260 }
0 commit comments