Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Set;

public class TaskRuntime<T extends StagedTaskDescriptor<?, ?, ?>> extends StatedPersistentBase {

private static final Logger LOG = LoggerFactory.getLogger(TaskRuntime.class);

private final SimpleFuture future = new SimpleFuture();
private final TaskStatusMachine statusMachine = new TaskStatusMachine();
private OptimizingTaskId taskId;
Expand Down Expand Up @@ -82,9 +86,22 @@ public SimpleFuture getCompletedFuture() {
}

void complete(OptimizerThread thread, OptimizingTaskResult result) {
// A completion for an already reset/rescheduled task is stale: the execution it reports belongs
// to a round that the OptimizerKeeper has already torn down. Absorb it gracefully instead of
// failing the (now illegal) state transition; the task will be re-executed in its current
// round.
if (isStaleResponse(thread) || status != Status.ACKED) {
LOG.warn(
"Ignoring stale completion for task {} from optimizer thread {}, current status {} and "
+ "owner {}. The task was likely reset by the OptimizerKeeper and will be re-executed.",
taskId,
thread,
status,
getResourceDesc());
return;
}
invokeConsistency(
() -> {
validThread(thread);
if (result.getErrorMessage() != null) {
statusMachine.accept(Status.FAILED);
failReason = result.getErrorMessage();
Expand Down Expand Up @@ -133,9 +150,21 @@ void schedule(OptimizerThread thread) {
}

void ack(OptimizerThread thread) {
// If the task has been reset/rescheduled, reject the stale ack so the optimizer skips executing
// this obsolete round (OptimizerExecutor treats the failure as "skip"). Thrown outside the
// transaction to avoid a misleading "failed to commit transaction" error for this expected
// case.
if (isStaleResponse(thread) || status != Status.SCHEDULED) {
LOG.warn(
"Rejecting stale ack for task {} from optimizer thread {}: current status {}, owner {}.",
taskId,
thread,
status,
getResourceDesc());
throw new TaskRuntimeException("Task has been reset or not yet scheduled, taskId:%s", taskId);
}
invokeConsistency(
() -> {
validThread(thread);
statusMachine.accept(Status.ACKED);
persistTaskRuntime();
});
Expand Down Expand Up @@ -247,15 +276,14 @@ public String toString() {
.toString();
}

private void validThread(OptimizerThread thread) {
if (token == null) {
throw new TaskRuntimeException("Task has been reset or not yet scheduled, taskId:%s", taskId);
}
if (!thread.getToken().equals(getToken()) || thread.getThreadId() != threadId) {
throw new TaskRuntimeException(
"The optimizer thread does not match, the thread in the task is OptimizerThread(token=%s, threadId=%s), and the thread in the request is OptimizerThread(token=%s, threadId=%s).",
getToken(), threadId, thread.getToken(), thread.getThreadId());
}
/**
* Detects a response from an optimizer round the task no longer belongs to: it was reset (token
* cleared) or has since been rescheduled to a different owner. Reads token/threadId outside
* {@link #invokeConsistency}, so callers must hold the owning TableOptimizingProcess lock, under
* which OptimizingQueue serializes ack/complete/reset/poll.
*/
private boolean isStaleResponse(OptimizerThread thread) {
return token == null || !thread.getToken().equals(token) || thread.getThreadId() != threadId;
}

private void persistTaskRuntime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.exception.TaskRuntimeException;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
Expand Down Expand Up @@ -302,9 +302,12 @@ public void testRebootAndPoll() throws InterruptedException {
public void testAckAndCompleteTask() {
OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
Assertions.assertNotNull(task);
Assertions.assertThrows(
IllegalTaskStateException.class,
() -> optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())));
// Completing before ack is now treated as a stale response and absorbed silently (see
// TaskRuntime#complete): the result cannot be told apart from a stale completion for a task
// that
// was reset and re-scheduled to the same thread, so the task simply stays SCHEDULED.
optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId()));
assertTaskStatus(TaskRuntime.Status.SCHEDULED);

optimizingService().ackTask(token, THREAD_ID, task.getTaskId());

Expand All @@ -314,6 +317,31 @@ public void testAckAndCompleteTask() {
assertTaskCompleted(taskRuntime);
}

// Reproduces the EXACT path of issue #4235 end-to-end with the real OptimizerKeeper: a live
// optimizer (the Toucher keeps heartbeating) polls a task but its ack is delayed past
// OPTIMIZER_TASK_ACK_TIMEOUT (30s). The keeper, via the SCHEDULED + ackTimeout branch of
// buildSuspendingPredication, resets the still-owned task to PLANNED. The late ack then arrives
// and is rejected -- this is the "Task has been reset or not yet scheduled" from the issue log,
// produced without any artificial retryTask() call.
@Test
public void testAckTimeoutResetThenLateAckRejected() throws InterruptedException {
OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
Assertions.assertNotNull(task);
assertTaskStatus(TaskRuntime.Status.SCHEDULED); // polled but NOT acked

// ack-timeout is 30s; the optimizer stays alive (Toucher touches every 300ms), so this hits the
// SCHEDULED + ackTimeout branch rather than the optimizer-expired branch.
Thread.sleep(35000);

assertTaskStatus(
TaskRuntime.Status.PLANNED); // keeper reset it out from under the live optimizer

// the delayed ack arrives for the now-reset task -> rejected, exactly like the issue
Assertions.assertThrows(
TaskRuntimeException.class,
() -> optimizingService().ackTask(token, THREAD_ID, task.getTaskId()));
}

@Test
public void testExecuteTaskTimeOutAndRetry() throws InterruptedException {
OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.exception.TaskRuntimeException;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.metrics.Gauge;
import org.apache.amoro.metrics.MetricKey;
Expand Down Expand Up @@ -364,6 +365,74 @@ public void testRetryTask() {
queue.dispose();
}

// Issue #4235 fix: when the same optimizer thread polls again while one of its tasks is still
// ACKED, pollTask -> resetStaleTasksForThread resets that task. With more than one task in the
// process, the repoll schedules a *different* task, leaving the original PLANNED with token ==
// null. The optimizer's in-flight ack for the reset task is rejected on purpose so the optimizer
// skips executing this obsolete round; the task stays PLANNED to be re-polled by another thread.
@Test
public void testStaleAckAfterRepollIsRejected() {
DefaultTableRuntime tableRuntime = initTableWithPartitionedFiles();
OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);

// 1. poll + ack task A -> ACKED (optimizer started executing it)
TaskRuntime<?> taskA = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(taskA);
queue.ackTask(taskA.getTaskId(), optimizerThread);
Assert.assertEquals(TaskRuntime.Status.ACKED, taskA.getStatus());

// 2. the same thread polls again; resetStaleTasksForThread resets the still-executing task A
// and
// the repoll schedules a different task B. Task A is left PLANNED with no token.
TaskRuntime<?> taskB = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(taskB);
Assert.assertNotEquals(taskA.getTaskId(), taskB.getTaskId());
Assert.assertEquals(TaskRuntime.Status.PLANNED, taskA.getStatus());
Assert.assertNull(taskA.getToken());

// 3. task A's in-flight ack is rejected so the optimizer skips this stale round (no duplicate
// execution). The exception still carries the message the optimizer client recognizes.
TaskRuntimeException e =
Assert.assertThrows(
TaskRuntimeException.class, () -> queue.ackTask(taskA.getTaskId(), optimizerThread));
Assert.assertTrue(e.getMessage().contains("has been reset or not yet scheduled"));

// task A remains PLANNED, still waiting to be re-polled and re-executed
Assert.assertEquals(TaskRuntime.Status.PLANNED, taskA.getStatus());
queue.dispose();
}

// Issue #4235 fix, single-task variant: the repoll re-schedules the *same* task (its token is
// restored), so the in-flight SUCCESS result for the previous run must be recognized as stale and
// ignored -- not blow up the SCHEDULED -> SUCCESS transition. The freshly re-scheduled round is
// left intact to be acked and completed normally.
@Test
public void testStaleCompleteAfterRepollIsIgnored() {
DefaultTableRuntime tableRuntime = initTableWithFiles();
OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);

TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task);
queue.ackTask(task.getTaskId(), optimizerThread);
Assert.assertEquals(TaskRuntime.Status.ACKED, task.getStatus());

// same thread polls again -> resetStaleTasksForThread resets the executing task, then the
// single
// task is re-scheduled back to the same thread (now SCHEDULED again, awaiting its own ack).
TaskRuntime<?> repolled = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertSame(task, repolled);
Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());

// the previous run's SUCCESS result arrives; it is stale and must be ignored (no exception)
queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(), optimizerThread.getThreadId()));

// the current re-scheduled round is untouched: still SCHEDULED, awaiting its own ack
Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());
queue.dispose();
}

@Test
public void testCommitTask() {
DefaultTableRuntime tableRuntime = initTableWithFiles();
Expand Down
Loading