Skip to content

Commit f5c8cfe

Browse files
张文领zhangwl9
authored andcommitted
Saving cleanup info in table_process
1 parent cf8ff05 commit f5c8cfe

2 files changed

Lines changed: 214 additions & 50 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java

Lines changed: 150 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,25 @@
2424
import org.apache.amoro.ServerTableIdentifier;
2525
import org.apache.amoro.TableRuntime;
2626
import org.apache.amoro.config.TableConfiguration;
27+
import org.apache.amoro.exception.PersistenceException;
28+
import org.apache.amoro.process.ProcessStatus;
2729
import org.apache.amoro.server.optimizing.OptimizingStatus;
30+
import org.apache.amoro.server.persistence.PersistentBase;
31+
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
32+
import org.apache.amoro.server.process.TableProcessMeta;
2833
import org.apache.amoro.server.table.DefaultTableRuntime;
2934
import org.apache.amoro.server.table.RuntimeHandlerChain;
3035
import org.apache.amoro.server.table.TableService;
3136
import org.apache.amoro.server.table.cleanup.CleanupOperation;
37+
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
38+
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
3239
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
3340
import org.apache.commons.lang3.StringUtils;
3441
import org.slf4j.Logger;
3542
import org.slf4j.LoggerFactory;
3643

3744
import java.util.Collections;
45+
import java.util.HashMap;
3846
import java.util.HashSet;
3947
import java.util.List;
4048
import java.util.Locale;
@@ -48,6 +56,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
4856
protected final Logger logger = LoggerFactory.getLogger(getClass());
4957

5058
private static final long START_DELAY = 10 * 1000L;
59+
private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
60+
private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
61+
private static final String EXTERNAL_PROCESS_IDENTIFIER = "";
62+
private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator();
63+
64+
private final PersistenceHelper persistenceHelper = new PersistenceHelper();
5165

5266
protected final Set<ServerTableIdentifier> scheduledTables =
5367
Collections.synchronizedSet(new HashSet<>());
@@ -123,16 +137,31 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
123137
}
124138

125139
private void executeTask(TableRuntime tableRuntime) {
140+
TableProcessMeta cleanupProcessMeta = null;
141+
CleanupOperation cleanupOperation = null;
142+
Exception executionError = null;
143+
long cleanupEndTime = 0L;
144+
126145
try {
127146
if (isExecutable(tableRuntime)) {
147+
cleanupOperation = getCleanupOperation();
148+
// create and persist cleanup process info
149+
cleanupProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation);
150+
128151
execute(tableRuntime);
152+
129153
// Different tables take different amounts of time to execute the end of execute(),
130154
// so you need to perform the update operation separately for each table.
131-
persistUpdatingCleanupTime(tableRuntime);
155+
cleanupEndTime = System.currentTimeMillis();
156+
persistUpdatingCleanupTime(tableRuntime, cleanupEndTime);
132157
}
133158
} catch (Exception e) {
134159
logger.error("exception when schedule for table: {}", tableRuntime.getTableIdentifier(), e);
160+
executionError = e;
135161
} finally {
162+
// persist cleanup result info.
163+
persistCleanupResult(
164+
tableRuntime, cleanupOperation, cleanupProcessMeta, cleanupEndTime, executionError);
136165
scheduledTables.remove(tableRuntime.getTableIdentifier());
137166
scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
138167
}
@@ -156,14 +185,13 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
156185
return true;
157186
}
158187

159-
private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
188+
private void persistUpdatingCleanupTime(TableRuntime tableRuntime, long currentTime) {
160189
CleanupOperation cleanupOperation = getCleanupOperation();
161190
if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
162191
return;
163192
}
164193

165194
try {
166-
long currentTime = System.currentTimeMillis();
167195
((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);
168196

169197
logger.debug(
@@ -178,6 +206,125 @@ private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
178206
}
179207
}
180208

209+
@VisibleForTesting
210+
public TableProcessMeta createCleanupProcessInfo(
211+
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
212+
if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
213+
return null;
214+
}
215+
216+
TableProcessMeta cleanupProcessMeta = buildCleanupProcessMeta(tableRuntime, cleanupOperation);
217+
persistenceHelper.beginAndPersistCleanupProcess(cleanupProcessMeta);
218+
logger.debug(
219+
"Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
220+
cleanupProcessMeta.getProcessId(),
221+
cleanupProcessMeta.getTableId(),
222+
cleanupProcessMeta.getProcessType());
223+
224+
return cleanupProcessMeta;
225+
}
226+
227+
private TableProcessMeta buildCleanupProcessMeta(
228+
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
229+
TableProcessMeta cleanupProcessMeta = new TableProcessMeta();
230+
231+
cleanupProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId());
232+
cleanupProcessMeta.setProcessId(ID_GENERATOR.generateId());
233+
cleanupProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);
234+
cleanupProcessMeta.setStatus(ProcessStatus.RUNNING);
235+
cleanupProcessMeta.setProcessType(cleanupOperation.name());
236+
cleanupProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE);
237+
cleanupProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);
238+
cleanupProcessMeta.setRetryNumber(0);
239+
cleanupProcessMeta.setFinishTime(0);
240+
cleanupProcessMeta.setFailMessage("");
241+
cleanupProcessMeta.setCreateTime(System.currentTimeMillis());
242+
cleanupProcessMeta.setProcessParameters(new HashMap<>());
243+
cleanupProcessMeta.setSummary(new HashMap<>());
244+
245+
return cleanupProcessMeta;
246+
}
247+
248+
@VisibleForTesting
249+
public void persistCleanupResult(
250+
TableRuntime tableRuntime,
251+
CleanupOperation cleanupOperation,
252+
TableProcessMeta cleanupProcessMeta,
253+
long cleanupEndTime,
254+
Exception executionError) {
255+
256+
if (cleanupOperation == null
257+
|| cleanupProcessMeta == null
258+
|| shouldSkipOperation(tableRuntime, cleanupOperation)) {
259+
return;
260+
}
261+
262+
cleanupProcessMeta.setFinishTime(cleanupEndTime);
263+
if (executionError != null) {
264+
cleanupProcessMeta.setStatus(ProcessStatus.FAILED);
265+
cleanupProcessMeta.setFailMessage(executionError.getMessage());
266+
} else {
267+
cleanupProcessMeta.setStatus(ProcessStatus.SUCCESS);
268+
}
269+
270+
try {
271+
persistenceHelper.updateAndPersistCleanupProcess(cleanupProcessMeta);
272+
} catch (PersistenceException e) {
273+
logger.error(
274+
"Failed to persist cleanup process result [processId={}, tableId={}, processType={}]",
275+
cleanupProcessMeta.getProcessId(),
276+
cleanupProcessMeta.getTableId(),
277+
cleanupProcessMeta.getProcessType(),
278+
e);
279+
}
280+
281+
logger.debug(
282+
"Successfully updated lastCleanTime and cleanupProcess for table {} with processId={}, cleanup operation {}",
283+
tableRuntime.getTableIdentifier().getTableName(),
284+
cleanupProcessMeta.getProcessId(),
285+
cleanupOperation);
286+
}
287+
288+
private static class PersistenceHelper extends PersistentBase {
289+
290+
public PersistenceHelper() {}
291+
292+
private void beginAndPersistCleanupProcess(TableProcessMeta meta) {
293+
doAs(
294+
TableProcessMapper.class,
295+
mapper ->
296+
mapper.insertProcess(
297+
meta.getTableId(),
298+
meta.getProcessId(),
299+
meta.getExternalProcessIdentifier(),
300+
meta.getStatus(),
301+
meta.getProcessType(),
302+
meta.getProcessStage(),
303+
meta.getExecutionEngine(),
304+
meta.getRetryNumber(),
305+
meta.getCreateTime(),
306+
meta.getProcessParameters(),
307+
meta.getSummary()));
308+
}
309+
310+
private void updateAndPersistCleanupProcess(TableProcessMeta meta) {
311+
doAs(
312+
TableProcessMapper.class,
313+
mapper ->
314+
mapper.updateProcess(
315+
meta.getTableId(),
316+
meta.getProcessId(),
317+
meta.getExternalProcessIdentifier(),
318+
meta.getStatus(),
319+
meta.getProcessStage(),
320+
meta.getRetryNumber(),
321+
meta.getFinishTime(),
322+
meta.getFailMessage(),
323+
meta.getProcessParameters(),
324+
meta.getSummary()));
325+
}
326+
}
327+
181328
/**
182329
* Get cleanup operation. Default is NONE, subclasses should override this method to provide
183330
* specific operation.

amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java

Lines changed: 64 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020

2121
import org.apache.amoro.ServerTableIdentifier;
2222
import org.apache.amoro.TableFormat;
23-
import org.apache.amoro.TableRuntime;
24-
import org.apache.amoro.config.TableConfiguration;
23+
import org.apache.amoro.process.ProcessStatus;
2524
import org.apache.amoro.server.persistence.PersistentBase;
2625
import org.apache.amoro.server.persistence.TableRuntimeMeta;
2726
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
27+
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
2828
import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
29+
import org.apache.amoro.server.process.TableProcessMeta;
2930
import org.apache.amoro.server.table.DefaultTableRuntime;
3031
import org.apache.amoro.server.table.DefaultTableRuntimeStore;
31-
import org.apache.amoro.server.table.TableRuntimeHandler;
3232
import org.apache.amoro.server.table.cleanup.CleanupOperation;
3333
import org.apache.amoro.table.TableRuntimeStore;
3434
import org.apache.amoro.table.TableSummary;
@@ -48,6 +48,12 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase {
4848
private static final String TEST_CATALOG = "test_catalog";
4949
private static final String TEST_DB = "test_db";
5050
private static final String TEST_TABLE = "test_table";
51+
private static final List<CleanupOperation> CLEANUP_OPERATIONS =
52+
Arrays.asList(
53+
CleanupOperation.ORPHAN_FILES_CLEANING,
54+
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
55+
CleanupOperation.DATA_EXPIRING,
56+
CleanupOperation.SNAPSHOTS_EXPIRING);
5157

5258
static {
5359
try {
@@ -57,18 +63,6 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase {
5763
}
5864
}
5965

60-
private static final TableRuntimeHandler TEST_HANDLER =
61-
new TableRuntimeHandler() {
62-
@Override
63-
public void handleTableChanged(
64-
TableRuntime tableRuntime,
65-
org.apache.amoro.server.optimizing.OptimizingStatus originalStatus) {}
66-
67-
@Override
68-
public void handleTableChanged(
69-
TableRuntime tableRuntime, TableConfiguration originalConfig) {}
70-
};
71-
7266
/**
7367
* Create a test server table identifier with the given ID
7468
*
@@ -103,7 +97,7 @@ private DefaultTableRuntime createDefaultTableRuntime(ServerTableIdentifier iden
10397
return new DefaultTableRuntime(store, () -> null);
10498
}
10599

106-
private void cleanUpTableRuntimeData(List<Long> tableIds) {
100+
private void cleanupTableRuntimeData(List<Long> tableIds) {
107101
doAs(
108102
TableRuntimeMapper.class,
109103
mapper -> {
@@ -135,7 +129,7 @@ private void cleanUpTableRuntimeData(List<Long> tableIds) {
135129
* @param testTableIds list of table IDs to clean up
136130
*/
137131
private void prepareTestEnvironment(List<Long> testTableIds) {
138-
cleanUpTableRuntimeData(testTableIds);
132+
cleanupTableRuntimeData(testTableIds);
139133
}
140134

141135
/**
@@ -165,16 +159,8 @@ private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation clean
165159
*/
166160
@Test
167161
public void testShouldExecuteTaskWithNoPreviousCleanup() {
168-
List<CleanupOperation> operations =
169-
Arrays.asList(
170-
CleanupOperation.ORPHAN_FILES_CLEANING,
171-
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
172-
CleanupOperation.DATA_EXPIRING,
173-
CleanupOperation.SNAPSHOTS_EXPIRING);
174-
175-
for (CleanupOperation operation : operations) {
176-
List<Long> testTableIds = Collections.singletonList(1L);
177-
prepareTestEnvironment(testTableIds);
162+
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
163+
prepareTestEnvironment(Collections.singletonList(1L));
178164

179165
PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
180166
ServerTableIdentifier identifier = createTableIdentifier(1L);
@@ -190,16 +176,8 @@ public void testShouldExecuteTaskWithNoPreviousCleanup() {
190176
/** Test should not execute task with recent cleanup */
191177
@Test
192178
public void testShouldNotExecuteTaskWithRecentCleanup() {
193-
List<CleanupOperation> operations =
194-
Arrays.asList(
195-
CleanupOperation.ORPHAN_FILES_CLEANING,
196-
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
197-
CleanupOperation.DATA_EXPIRING,
198-
CleanupOperation.SNAPSHOTS_EXPIRING);
199-
200-
for (CleanupOperation operation : operations) {
201-
List<Long> testTableIds = Collections.singletonList(1L);
202-
cleanUpTableRuntimeData(testTableIds);
179+
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
180+
cleanupTableRuntimeData(Collections.singletonList(1L));
203181

204182
PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
205183

@@ -220,16 +198,8 @@ public void testShouldNotExecuteTaskWithRecentCleanup() {
220198
/** Test should execute task with old cleanup */
221199
@Test
222200
public void testShouldExecuteTaskWithOldCleanup() {
223-
List<CleanupOperation> operations =
224-
Arrays.asList(
225-
CleanupOperation.ORPHAN_FILES_CLEANING,
226-
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
227-
CleanupOperation.DATA_EXPIRING,
228-
CleanupOperation.SNAPSHOTS_EXPIRING);
229-
230-
for (CleanupOperation operation : operations) {
231-
List<Long> testTableIds = Collections.singletonList(1L);
232-
cleanUpTableRuntimeData(testTableIds);
201+
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
202+
cleanupTableRuntimeData(Collections.singletonList(1L));
233203

234204
PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
235205

@@ -262,4 +232,51 @@ public void testShouldExecuteTaskWithNoneOperation() {
262232
boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE);
263233
Assert.assertTrue("Should always execute with NONE operation", shouldExecute);
264234
}
235+
236+
/** Test cleanup process info is persisted with SUCCESS status for each cleanup operation */
237+
@Test
238+
public void testCleanupProcessPersistedOnSuccess() {
239+
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
240+
assertCleanupProcessPersisted(operation, null, ProcessStatus.SUCCESS, null);
241+
}
242+
}
243+
244+
/** Test cleanup process info is persisted with FAILED status when execution fails */
245+
@Test
246+
public void testCleanupProcessPersistedOnFailure() {
247+
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
248+
Exception error = new RuntimeException("Simulated cleanup failure for " + operation);
249+
assertCleanupProcessPersisted(operation, error, ProcessStatus.FAILED, error.getMessage());
250+
}
251+
}
252+
253+
/** Assert cleanup process is persisted with expected status and failMessage */
254+
private void assertCleanupProcessPersisted(
255+
CleanupOperation operation,
256+
Exception executionError,
257+
ProcessStatus expectedStatus,
258+
String expectedFailMessage) {
259+
prepareTestEnvironment(Collections.singletonList(1L));
260+
261+
PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
262+
DefaultTableRuntime tableRuntime = createDefaultTableRuntime(createTableIdentifier(1L));
263+
264+
// 1、Create cleanup process info and persist result
265+
TableProcessMeta meta = executor.createCleanupProcessInfo(tableRuntime, operation);
266+
267+
// 2、Update cleanup result with execution outcome
268+
long cleanupEndTime = System.currentTimeMillis();
269+
executor.persistCleanupResult(tableRuntime, operation, meta, cleanupEndTime, executionError);
270+
271+
// 3、Verify persisted process info in database
272+
TableProcessMeta persisted =
273+
getAs(TableProcessMapper.class, mapper -> mapper.getProcessMeta(meta.getProcessId()));
274+
275+
Assert.assertEquals(expectedStatus, persisted.getStatus());
276+
Assert.assertEquals(operation.name(), persisted.getProcessType());
277+
Assert.assertEquals(cleanupEndTime, persisted.getFinishTime());
278+
if (expectedFailMessage != null) {
279+
Assert.assertEquals(expectedFailMessage, persisted.getFailMessage());
280+
}
281+
}
265282
}

0 commit comments

Comments
 (0)