Skip to content

Commit 55fb785

Browse files
baiyangtxzhangyongxiang.alphaAimezhoujinsongxxubai
authored
[Improvement]: Extract TableProcessStore from AmoroProcess interface. (#4116)
* refactor(process): extract process framework changes * CI * Update amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java Co-authored-by: Xu Bai <tocreationbai@gmail.com> * Update amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java Co-authored-by: Xu Bai <tocreationbai@gmail.com> * Fix comments --------- Co-authored-by: zhangyongxiang.alpha <zhangyongxiang.alpha@bytedance.com> Co-authored-by: Aime <aime@bytedance.com> Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com> Co-authored-by: Xu Bai <tocreationbai@gmail.com>
1 parent 316f5e8 commit 55fb785

24 files changed

Lines changed: 363 additions & 348 deletions

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ public void transitionToFollower() {
236236
}
237237

238238
public void startOptimizingService() throws Exception {
239+
239240
// Load process factories and build action coordinators from default table runtime factory.
240241
TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
241242
tableProcessFactoryManager.initialize();
@@ -246,6 +247,8 @@ public void startOptimizingService() throws Exception {
246247

247248
List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
248249
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
250+
processFactories.forEach(
251+
c -> c.availableExecuteEngines(executeEngineManager.installedPlugins()));
249252

250253
tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory);
251254
processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);

amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,16 +108,10 @@ protected void execute(TableRuntime tableRuntime) {
108108
* @param tableRuntime table runtime
109109
* @param processStore process store
110110
*/
111-
protected void recover(TableRuntime tableRuntime, TableProcessStore processStore) {
112-
TableProcess process = coordinator.recoverTableProcess(tableRuntime, processStore);
113-
processService.recover(tableRuntime, process);
111+
protected TableProcess recover(TableRuntime tableRuntime, TableProcessStore processStore) {
112+
return coordinator.recoverTableProcess(tableRuntime, processStore);
114113
}
115114

116-
/**
117-
* Get executor delay from coordinator.
118-
*
119-
* @return delay in milliseconds
120-
*/
121115
@Override
122116
protected long getExecutorDelay() {
123117
return coordinator.getExecutorDelay();

amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class DefaultTableProcessStore extends PersistentBase implements TablePro
6363
* @param action action type
6464
*/
6565
public DefaultTableProcessStore(TableRuntime tableRuntime, TableProcessMeta meta, Action action) {
66+
this.processId = meta.getProcessId();
6667
this.meta = meta;
6768
this.tableRuntime = tableRuntime;
6869
this.action = action;

0 commit comments

Comments
 (0)