diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java index d0c3f9f7c1..68dc34bf83 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java @@ -33,12 +33,16 @@ import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Set; public class TaskRuntime> extends StatedPersistentBase { + private static final Logger LOG = LoggerFactory.getLogger(TaskRuntime.class); + private final SimpleFuture future = new SimpleFuture(); private final TaskStatusMachine statusMachine = new TaskStatusMachine(); private OptimizingTaskId taskId; @@ -82,9 +86,22 @@ public SimpleFuture getCompletedFuture() { } void complete(OptimizerThread thread, OptimizingTaskResult result) { + // A completion for an already reset/rescheduled task is stale: the execution it reports belongs + // to a round that the OptimizerKeeper has already torn down. Absorb it gracefully instead of + // failing the (now illegal) state transition; the task will be re-executed in its current + // round. + if (isStaleResponse(thread) || status != Status.ACKED) { + LOG.warn( + "Ignoring stale completion for task {} from optimizer thread {}, current status {} and " + + "owner {}. The task was likely reset by the OptimizerKeeper and will be re-executed.", + taskId, + thread, + status, + getResourceDesc()); + return; + } invokeConsistency( () -> { - validThread(thread); if (result.getErrorMessage() != null) { statusMachine.accept(Status.FAILED); failReason = result.getErrorMessage(); @@ -133,9 +150,21 @@ void schedule(OptimizerThread thread) { } void ack(OptimizerThread thread) { + // If the task has been reset/rescheduled, reject the stale ack so the optimizer skips executing + // this obsolete round (OptimizerExecutor treats the failure as "skip"). Thrown outside the + // transaction to avoid a misleading "failed to commit transaction" error for this expected + // case. + if (isStaleResponse(thread) || status != Status.SCHEDULED) { + LOG.warn( + "Rejecting stale ack for task {} from optimizer thread {}: current status {}, owner {}.", + taskId, + thread, + status, + getResourceDesc()); + throw new TaskRuntimeException("Task has been reset or not yet scheduled, taskId:%s", taskId); + } invokeConsistency( () -> { - validThread(thread); statusMachine.accept(Status.ACKED); persistTaskRuntime(); }); @@ -247,15 +276,14 @@ public String toString() { .toString(); } - private void validThread(OptimizerThread thread) { - if (token == null) { - throw new TaskRuntimeException("Task has been reset or not yet scheduled, taskId:%s", taskId); - } - if (!thread.getToken().equals(getToken()) || thread.getThreadId() != threadId) { - throw new TaskRuntimeException( - "The optimizer thread does not match, the thread in the task is OptimizerThread(token=%s, threadId=%s), and the thread in the request is OptimizerThread(token=%s, threadId=%s).", - getToken(), threadId, thread.getToken(), thread.getThreadId()); - } + /** + * Detects a response from an optimizer round the task no longer belongs to: it was reset (token + * cleared) or has since been rescheduled to a different owner. Reads token/threadId outside + * {@link #invokeConsistency}, so callers must hold the owning TableOptimizingProcess lock, under + * which OptimizingQueue serializes ack/complete/reset/poll. + */ + private boolean isStaleResponse(OptimizerThread thread) { + return token == null || !thread.getToken().equals(token) || thread.getThreadId() != threadId; } private void persistTaskRuntime() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 417150b390..591010683b 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -35,8 +35,8 @@ import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.config.TableConfiguration; -import org.apache.amoro.exception.IllegalTaskStateException; import org.apache.amoro.exception.PluginRetryAuthException; +import org.apache.amoro.exception.TaskRuntimeException; import org.apache.amoro.io.MixedDataTestHelpers; import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.optimizing.TableOptimizing; @@ -302,9 +302,12 @@ public void testRebootAndPoll() throws InterruptedException { public void testAckAndCompleteTask() { OptimizingTask task = optimizingService().pollTask(token, THREAD_ID); Assertions.assertNotNull(task); - Assertions.assertThrows( - IllegalTaskStateException.class, - () -> optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId()))); + // Completing before ack is now treated as a stale response and absorbed silently (see + // TaskRuntime#complete): the result cannot be told apart from a stale completion for a task + // that + // was reset and re-scheduled to the same thread, so the task simply stays SCHEDULED. + optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())); + assertTaskStatus(TaskRuntime.Status.SCHEDULED); optimizingService().ackTask(token, THREAD_ID, task.getTaskId()); @@ -314,6 +317,31 @@ public void testAckAndCompleteTask() { assertTaskCompleted(taskRuntime); } + // Reproduces the EXACT path of issue #4235 end-to-end with the real OptimizerKeeper: a live + // optimizer (the Toucher keeps heartbeating) polls a task but its ack is delayed past + // OPTIMIZER_TASK_ACK_TIMEOUT (30s). The keeper, via the SCHEDULED + ackTimeout branch of + // buildSuspendingPredication, resets the still-owned task to PLANNED. The late ack then arrives + // and is rejected -- this is the "Task has been reset or not yet scheduled" from the issue log, + // produced without any artificial retryTask() call. + @Test + public void testAckTimeoutResetThenLateAckRejected() throws InterruptedException { + OptimizingTask task = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertNotNull(task); + assertTaskStatus(TaskRuntime.Status.SCHEDULED); // polled but NOT acked + + // ack-timeout is 30s; the optimizer stays alive (Toucher touches every 300ms), so this hits the + // SCHEDULED + ackTimeout branch rather than the optimizer-expired branch. + Thread.sleep(35000); + + assertTaskStatus( + TaskRuntime.Status.PLANNED); // keeper reset it out from under the live optimizer + + // the delayed ack arrives for the now-reset task -> rejected, exactly like the issue + Assertions.assertThrows( + TaskRuntimeException.class, + () -> optimizingService().ackTask(token, THREAD_ID, task.getTaskId())); + } + @Test public void testExecuteTaskTimeOutAndRetry() throws InterruptedException { OptimizingTask task = optimizingService().pollTask(token, THREAD_ID); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 014189f64e..95db05f4f0 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -39,6 +39,7 @@ import org.apache.amoro.api.OptimizingTaskResult; import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.exception.TaskRuntimeException; import org.apache.amoro.io.MixedDataTestHelpers; import org.apache.amoro.metrics.Gauge; import org.apache.amoro.metrics.MetricKey; @@ -364,6 +365,74 @@ public void testRetryTask() { queue.dispose(); } + // Issue #4235 fix: when the same optimizer thread polls again while one of its tasks is still + // ACKED, pollTask -> resetStaleTasksForThread resets that task. With more than one task in the + // process, the repoll schedules a *different* task, leaving the original PLANNED with token == + // null. The optimizer's in-flight ack for the reset task is rejected on purpose so the optimizer + // skips executing this obsolete round; the task stays PLANNED to be re-polled by another thread. + @Test + public void testStaleAckAfterRepollIsRejected() { + DefaultTableRuntime tableRuntime = initTableWithPartitionedFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); + + // 1. poll + ack task A -> ACKED (optimizer started executing it) + TaskRuntime taskA = queue.pollTask(optimizerThread, MAX_POLLING_TIME); + Assert.assertNotNull(taskA); + queue.ackTask(taskA.getTaskId(), optimizerThread); + Assert.assertEquals(TaskRuntime.Status.ACKED, taskA.getStatus()); + + // 2. the same thread polls again; resetStaleTasksForThread resets the still-executing task A + // and + // the repoll schedules a different task B. Task A is left PLANNED with no token. + TaskRuntime taskB = queue.pollTask(optimizerThread, MAX_POLLING_TIME); + Assert.assertNotNull(taskB); + Assert.assertNotEquals(taskA.getTaskId(), taskB.getTaskId()); + Assert.assertEquals(TaskRuntime.Status.PLANNED, taskA.getStatus()); + Assert.assertNull(taskA.getToken()); + + // 3. task A's in-flight ack is rejected so the optimizer skips this stale round (no duplicate + // execution). The exception still carries the message the optimizer client recognizes. + TaskRuntimeException e = + Assert.assertThrows( + TaskRuntimeException.class, () -> queue.ackTask(taskA.getTaskId(), optimizerThread)); + Assert.assertTrue(e.getMessage().contains("has been reset or not yet scheduled")); + + // task A remains PLANNED, still waiting to be re-polled and re-executed + Assert.assertEquals(TaskRuntime.Status.PLANNED, taskA.getStatus()); + queue.dispose(); + } + + // Issue #4235 fix, single-task variant: the repoll re-schedules the *same* task (its token is + // restored), so the in-flight SUCCESS result for the previous run must be recognized as stale and + // ignored -- not blow up the SCHEDULED -> SUCCESS transition. The freshly re-scheduled round is + // left intact to be acked and completed normally. + @Test + public void testStaleCompleteAfterRepollIsIgnored() { + DefaultTableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); + + TaskRuntime task = queue.pollTask(optimizerThread, MAX_POLLING_TIME); + Assert.assertNotNull(task); + queue.ackTask(task.getTaskId(), optimizerThread); + Assert.assertEquals(TaskRuntime.Status.ACKED, task.getStatus()); + + // same thread polls again -> resetStaleTasksForThread resets the executing task, then the + // single + // task is re-scheduled back to the same thread (now SCHEDULED again, awaiting its own ack). + TaskRuntime repolled = queue.pollTask(optimizerThread, MAX_POLLING_TIME); + Assert.assertSame(task, repolled); + Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus()); + + // the previous run's SUCCESS result arrives; it is stale and must be ignored (no exception) + queue.completeTask( + optimizerThread, + buildOptimizingTaskResult(task.getTaskId(), optimizerThread.getThreadId())); + + // the current re-scheduled round is untouched: still SCHEDULED, awaiting its own ack + Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus()); + queue.dispose(); + } + @Test public void testCommitTask() { DefaultTableRuntime tableRuntime = initTableWithFiles();