Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ private void allTaskReady() {
}
InvocationFuture<?>[] futures = notifyTaskStart();
CompletableFuture.allOf(futures).join();
notifyCompleted(latestCompletedCheckpoint);
if (!notifyCompleted(latestCompletedCheckpoint)) {
return;
}
if (coordinatorConfig.isCheckpointEnable()) {
LOG.info("checkpoint is enabled, start schedule trigger pending checkpoint.");
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
Expand All @@ -369,7 +371,7 @@ private void allTaskReady() {
}

@VisibleForTesting
protected void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
protected boolean notifyCompleted(CompletedCheckpoint completedCheckpoint) {
if (completedCheckpoint != null) {
try {
LOG.info(
Expand All @@ -390,8 +392,10 @@ protected void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
"notify checkpoint completed failed",
e,
CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED);
return false;
}
}
return true;
}

public InvocationFuture<?>[] notifyTaskStart() {
Expand Down Expand Up @@ -490,7 +494,9 @@ protected void restoreCoordinator(boolean alreadyStarted) {
shutdown = false;
if (alreadyStarted) {
isAllTaskReady.set(true);
notifyCompleted(latestCompletedCheckpoint);
if (!notifyCompleted(latestCompletedCheckpoint)) {
return;
}
tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
} else {
isAllTaskReady.set(false);
Expand Down Expand Up @@ -1000,8 +1006,13 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
long stateSize = CheckpointMonitorService.calculateStateSize(completedCheckpoint);
checkpointMonitorService.onCheckpointCompleted(completedCheckpoint, stateSize);
}
notifyCompleted(completedCheckpoint);
pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted();
if (!notifyCompleted(completedCheckpoint)) {
return;
}
PendingCheckpoint pendingCheckpoint = pendingCheckpoints.remove(checkpointId);
if (pendingCheckpoint != null) {
pendingCheckpoint.abortCheckpointTimeoutFutureWhenIsCompleted();
}
pendingCounter.decrementAndGet();

if (isCompleted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.checkpoint.monitor.CheckpointMonitorService;
Expand Down Expand Up @@ -51,11 +52,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
Expand Down Expand Up @@ -371,6 +374,206 @@ void testFilteringClosedTasksAndActions() {

executor.shutdownNow();
}

// ------------------------------------------------------------------
// Regression tests: notifyCompleted returns false → callers bail out
// ------------------------------------------------------------------

/**
 * Creates a bare-bones {@link CheckpointCoordinator} for the regression tests below.
 *
 * <p>The {@link CheckpointManager}, {@link CheckpointStorage}, {@link CheckpointIDCounter} and
 * {@code IMap} collaborators are Mockito mocks, so the coordinator can be driven without real
 * Hazelcast / Hadoop I/O. The checkpoint plan contains exactly one task location, registered
 * both as pipeline subtask and as starting subtask.
 *
 * @param executorService executor handed to the coordinator under test; the caller owns it and
 *     is responsible for shutting it down
 * @return a coordinator wired only to mocks and in-memory configuration
 */
private CheckpointCoordinator buildMinimalCoordinator(ExecutorService executorService) {
    // Collaborators: pure mocks, no behaviour stubbed.
    @SuppressWarnings("unchecked")
    IMap<Object, Object> iMapMock = Mockito.mock(IMap.class);
    CheckpointIDCounter idCounterMock = Mockito.mock(CheckpointIDCounter.class);
    CheckpointStorage storageMock = Mockito.mock(CheckpointStorage.class);
    CheckpointManager managerMock = Mockito.mock(CheckpointManager.class);

    // Default checkpoint configuration with a default storage section attached.
    CheckpointConfig config = new CheckpointConfig();
    config.setStorage(new CheckpointStorageConfig());

    // Single-task plan: the same location is the only pipeline subtask and starting subtask.
    TaskGroupLocation groupLocation = new TaskGroupLocation(1L, 1, 1);
    TaskLocation onlyTask = new TaskLocation(groupLocation, 1, 1);
    CheckpointPlan singleTaskPlan =
            CheckpointPlan.builder()
                    .pipelineId(1)
                    .pipelineSubtasks(Collections.singleton(onlyTask))
                    .startingSubtasks(Collections.singleton(onlyTask))
                    .build();

    // The two null arguments are deliberately left unset, exactly as in the original helper.
    return new CheckpointCoordinator(
            managerMock,
            storageMock,
            config,
            1L,
            singleTaskPlan,
            idCounterMock,
            null,
            executorService,
            iMapMock,
            false,
            null);
}

/**
 * Regression: when {@code notifyCompleted()} fails (returns {@code false}), {@code
 * completePendingCheckpoint} must return immediately without decrementing {@code
 * pendingCounter} or executing any other "success path" logic.
 *
 * <p>Before the fix the chained call on the {@code null} result of {@code
 * pendingCheckpoints.remove()} caused a {@link NullPointerException}.
 *
 * <p>The test additionally verifies that the stubbed {@code notifyCompleted} was really
 * invoked, so it cannot pass merely because the method under test bailed out earlier.
 */
@Test
void testCompletePendingCheckpointShouldReturnEarlyWhenNotifyCompletedFails() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        CheckpointCoordinator spy = Mockito.spy(buildMinimalCoordinator(executorService));

        // Resolve the internal state once, up front, and fail loudly if a field is missing
        // (e.g. after a rename) instead of silently skipping part of the simulation.
        @SuppressWarnings("unchecked")
        ConcurrentHashMap<Long, PendingCheckpoint> pendingCheckpoints =
                (ConcurrentHashMap<Long, PendingCheckpoint>)
                        ReflectionUtils.getField(spy, "pendingCheckpoints")
                                .orElseThrow(
                                        () ->
                                                new IllegalStateException(
                                                        "pendingCheckpoints field not found"));
        AtomicInteger pendingCounter =
                (AtomicInteger)
                        ReflectionUtils.getField(spy, "pendingCounter")
                                .orElseThrow(
                                        () ->
                                                new IllegalStateException(
                                                        "pendingCounter field not found"));

        // Simulate a failed notification: empty pendingCheckpoints (the state the original
        // test attributes to handleCoordinatorError) and report failure to the caller.
        Mockito.doAnswer(
                        invocation -> {
                            pendingCheckpoints.clear();
                            return false;
                        })
                .when(spy)
                .notifyCompleted(Mockito.any());

        long checkpointId = 1L;
        CompletedCheckpoint completedCheckpoint =
                new CompletedCheckpoint(
                        1L,
                        1,
                        checkpointId,
                        System.currentTimeMillis(),
                        CheckpointType.CHECKPOINT_TYPE,
                        System.currentTimeMillis(),
                        new HashMap<>(),
                        new HashMap<>());

        PendingCheckpoint pendingCheckpoint =
                new PendingCheckpoint(
                        1L,
                        1,
                        checkpointId,
                        System.currentTimeMillis(),
                        CheckpointType.CHECKPOINT_TYPE,
                        new HashSet<>(),
                        new HashMap<>(),
                        new HashMap<>());
        pendingCheckpoints.put(checkpointId, pendingCheckpoint);

        // Start from 1 so a decrement on the success path would be observable.
        pendingCounter.set(1);

        // Must not throw, and must not execute the success path.
        Assertions.assertDoesNotThrow(
                () -> spy.completePendingCheckpoint(completedCheckpoint),
                "completePendingCheckpoint must not throw when notifyCompleted fails");

        // Guard against a vacuous pass: the failing notification must have been reached.
        Mockito.verify(spy, Mockito.atLeastOnce()).notifyCompleted(completedCheckpoint);

        // pendingCounter must remain 1 – the success path (decrementAndGet) was skipped.
        Assertions.assertEquals(
                1,
                pendingCounter.get(),
                "pendingCounter must not be decremented when notifyCompleted fails");
    } finally {
        executorService.shutdownNow();
    }
}

/**
 * Regression: when {@code notifyCompleted()} fails inside {@code allTaskReady()}, the method
 * must return immediately and must NOT schedule the next checkpoint trigger.
 *
 * <p>The test also verifies that {@code notifyCompleted} was actually invoked, so the
 * "never scheduled" assertion cannot succeed just because {@code allTaskReady()} returned
 * from one of its guard checks.
 */
@Test
void testAllTaskReadyShouldNotScheduleCheckpointWhenNotifyCompletedFails() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        CheckpointCoordinator coordinator = buildMinimalCoordinator(executorService);
        CheckpointCoordinator spy = Mockito.spy(coordinator);

        // notifyTaskStart() must return a non-null empty array so allOf(...).join() succeeds.
        Mockito.doReturn(new com.hazelcast.spi.impl.operationservice.impl.InvocationFuture[0])
                .when(spy)
                .notifyTaskStart();

        // Simulate notifyCompleted failure.
        Mockito.doReturn(false).when(spy).notifyCompleted(Mockito.any());

        // Set all tasks to READY_START so allTaskReady() passes the guard checks.
        Map<Long, SeaTunnelTaskState> taskStatus = spy.getPipelineTaskStatus();
        CheckpointPlan plan =
                (CheckpointPlan)
                        ReflectionUtils.getField(spy, "plan")
                                .orElseThrow(
                                        () ->
                                                new IllegalStateException(
                                                        "plan field not found"));
        plan.getPipelineSubtasks()
                .forEach(t -> taskStatus.put(t.getTaskID(), SeaTunnelTaskState.READY_START));

        // allTaskReady() is private, so it is invoked through reflection.
        ReflectionUtils.invoke(spy, "allTaskReady");

        // Guard against a vacuous pass: the failing notification must have been reached.
        Mockito.verify(spy, Mockito.atLeastOnce()).notifyCompleted(Mockito.any());

        // scheduleTriggerPendingCheckpoint must NOT have been called.
        Mockito.verify(spy, Mockito.never())
                .scheduleTriggerPendingCheckpoint(
                        Mockito.any(CheckpointType.class), Mockito.anyLong());
    } finally {
        executorService.shutdownNow();
    }
}

/**
 * Regression: when {@code notifyCompleted()} fails inside {@code restoreCoordinator(true)}, the
 * method must return immediately and must NOT call {@code tryTriggerPendingCheckpoint}.
 *
 * <p>The test also verifies that {@code notifyCompleted} was actually invoked, which pins the
 * early return to the failed notification rather than to some unrelated exit.
 */
@Test
void testRestoreCoordinatorShouldNotTriggerCheckpointWhenNotifyCompletedFails() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        CheckpointCoordinator coordinator = buildMinimalCoordinator(executorService);
        CheckpointCoordinator spy = Mockito.spy(coordinator);

        // Simulate notifyCompleted failure.
        Mockito.doReturn(false).when(spy).notifyCompleted(Mockito.any());

        // alreadyStarted=true is the branch that calls notifyCompleted.
        spy.restoreCoordinator(true);

        // Guard against a vacuous pass: the failing notification must have been reached.
        Mockito.verify(spy, Mockito.atLeastOnce()).notifyCompleted(Mockito.any());

        // tryTriggerPendingCheckpoint must NOT have been called.
        Mockito.verify(spy, Mockito.never())
                .tryTriggerPendingCheckpoint(Mockito.any(CheckpointType.class));
    } finally {
        executorService.shutdownNow();
    }
}
}

class TestCheckpointManager extends CheckpointManager {
Expand Down
Loading