Skip to content

Commit 132ddcd

Browse files
authored
Merge branch 'master' into iceberg-1.7.0-upgrade
2 parents 49776ce + 878d5b0 commit 132ddcd

56 files changed

Lines changed: 1663 additions & 486 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ Amoro support multiple processing engines for Mixed format as below:
8282

8383
| Processing Engine | Version | Batch Read | Batch Write | Batch Overwrite | Streaming Read | Streaming Write | Create Table | Alter Table |
8484
|-------------------|------------------------|-------------|-------------|-----------------|----------------|-----------------|--------------|-------------|
85-
| Flink | 1.17.x, 1.18.x, 1.19.x | ✔ | ✔ | ✖ | ✔ | ✔ | ✔ | ✖ |
85+
| Flink | 1.18.x, 1.19.x | ✔ | ✔ | ✖ | ✔ | ✔ | ✔ | ✖ |
8686
| Spark | 3.3, 3.4, 3.5 | ✔ | ✔ | ✔ | ✖ | ✖ | ✔ | ✔ |
8787
| Hive | 2.x, 3.x | ✔ | ✖ | ✔ | ✖ | ✖ | ✖ | ✔ |
8888
| Trino | 406 | ✔ | ✖ | ✔ | ✖ | ✖ | ✖ | ✔ |

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,13 @@ public void startOptimizingService() throws Exception {
277277
serviceConfig, catalogManager, defaultRuntimeFactory, haContainer, bucketAssignStore);
278278
processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);
279279
optimizingService =
280-
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
280+
new DefaultOptimizingService(
281+
serviceConfig,
282+
catalogManager,
283+
optimizerManager,
284+
tableService,
285+
bucketAssignStore,
286+
haContainer);
281287

282288
LOG.info("Setting up AMS table executors...");
283289
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);

amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,15 @@ void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
6565
*/
6666
Map<AmsServerInfo, List<String>> getAllAssignments() throws BucketAssignStoreException;
6767

68+
/**
69+
* Get all alive AMS nodes that have bucket assignments. Used by optimizers in master-slave mode
70+
* to discover all AMS optimizing endpoints.
71+
*
72+
* @return List of AmsServerInfo for all nodes with bucket assignments, empty list if none
73+
* @throws BucketAssignStoreException If retrieval operation fails
74+
*/
75+
List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException;
76+
6877
/**
6978
* Get the last update time for a node's assignments.
7079
*

amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ public static BucketAssignStore create(
6363
"Cannot create ZkBucketAssignStore: ZK client not available or invalid container type");
6464

6565
case AmoroManagementConf.HA_TYPE_DATABASE:
66+
long nodeHeartbeatTtlMs = conf.get(AmoroManagementConf.HA_LEASE_TTL).toMillis();
6667
LOG.info("Creating DBBucketAssignStore for cluster: {}", clusterName);
67-
return new DBBucketAssignStore(clusterName);
68+
return new DBBucketAssignStore(clusterName, nodeHeartbeatTtlMs);
6869

6970
default:
7071
throw new IllegalArgumentException(

amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@ public class DBBucketAssignStore extends PersistentBase implements BucketAssignS
4545
new TypeReference<List<String>>() {};
4646

4747
private final String clusterName;
48+
private final long nodeHeartbeatTtlMs;
4849

49-
public DBBucketAssignStore(String clusterName) {
50+
/**
 * Creates a database-backed bucket assignment store.
 *
 * @param clusterName name of the cluster whose assignment rows this store reads
 * @param nodeHeartbeatTtlMs maximum age, in milliseconds, of a node heartbeat for the node to
 *     still be reported by {@link #getAliveNodes()}
 */
public DBBucketAssignStore(String clusterName, long nodeHeartbeatTtlMs) {
  this.nodeHeartbeatTtlMs = nodeHeartbeatTtlMs;
  this.clusterName = clusterName;
}
5254

5355
@Override
@@ -180,6 +182,32 @@ public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStor
180182
}
181183
}
182184

185+
@Override
186+
public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException {
187+
try {
188+
long cutoff = System.currentTimeMillis() - nodeHeartbeatTtlMs;
189+
List<BucketAssignmentMeta> rows =
190+
getAs(BucketAssignMapper.class, mapper -> mapper.selectAllByCluster(clusterName));
191+
List<AmsServerInfo> nodes = new ArrayList<>();
192+
for (BucketAssignmentMeta meta : rows) {
193+
Long heartbeatTs = meta.getNodeHeartbeatTs();
194+
if (heartbeatTs == null || heartbeatTs < cutoff) {
195+
LOG.debug(
196+
"Skipping stale node key={}, node_heartbeat_ts={}", meta.getNodeKey(), heartbeatTs);
197+
continue;
198+
}
199+
AmsServerInfo nodeInfo = parseNodeInfo(meta);
200+
if (nodeInfo.getThriftBindPort() != null && nodeInfo.getThriftBindPort() > 0) {
201+
nodes.add(nodeInfo);
202+
}
203+
}
204+
return nodes;
205+
} catch (Exception e) {
206+
LOG.error("Failed to get alive nodes", e);
207+
throw new BucketAssignStoreException("Failed to get alive nodes", e);
208+
}
209+
}
210+
183211
private static String getNodeKey(AmsServerInfo nodeInfo) {
184212
return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
185213
}

amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java

Lines changed: 152 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.amoro.api.OptimizingTask;
2727
import org.apache.amoro.api.OptimizingTaskId;
2828
import org.apache.amoro.api.OptimizingTaskResult;
29+
import org.apache.amoro.client.AmsServerInfo;
2930
import org.apache.amoro.config.Configurations;
3031
import org.apache.amoro.config.TableConfiguration;
3132
import org.apache.amoro.exception.ForbiddenException;
@@ -38,6 +39,7 @@
3839
import org.apache.amoro.resource.ResourceType;
3940
import org.apache.amoro.server.catalog.CatalogManager;
4041
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
42+
import org.apache.amoro.server.ha.HighAvailabilityContainer;
4143
import org.apache.amoro.server.manager.AbstractOptimizerContainer;
4244
import org.apache.amoro.server.optimizing.OptimizingProcess;
4345
import org.apache.amoro.server.optimizing.OptimizingQueue;
@@ -66,6 +68,9 @@
6668

6769
import java.time.Duration;
6870
import java.util.ArrayList;
71+
import java.util.Collections;
72+
import java.util.HashMap;
73+
import java.util.HashSet;
6974
import java.util.List;
7075
import java.util.Map;
7176
import java.util.Objects;
@@ -116,12 +121,17 @@ public class DefaultOptimizingService extends StatedPersistentBase
116121
private final TableService tableService;
117122
private final RuntimeHandlerChain tableHandlerChain;
118123
private final ExecutorService planExecutor;
124+
private final BucketAssignStore bucketAssignStore;
125+
private final HighAvailabilityContainer haContainer;
126+
private final boolean isMasterSlaveMode;
119127

120128
public DefaultOptimizingService(
121129
Configurations serviceConfig,
122130
CatalogManager catalogManager,
123131
OptimizerManager optimizerManager,
124-
TableService tableService) {
132+
TableService tableService,
133+
BucketAssignStore bucketAssignStore,
134+
HighAvailabilityContainer haContainer) {
125135
this.optimizerTouchTimeout =
126136
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
127137
this.taskAckTimeout =
@@ -144,6 +154,10 @@ public DefaultOptimizingService(
144154
this.tableService = tableService;
145155
this.catalogManager = catalogManager;
146156
this.optimizerManager = optimizerManager;
157+
this.bucketAssignStore = bucketAssignStore;
158+
this.haContainer = haContainer;
159+
this.isMasterSlaveMode =
160+
haContainer != null && serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
147161
this.tableHandlerChain = new TableRuntimeHandlerImpl();
148162
this.planExecutor =
149163
Executors.newCachedThreadPool(
@@ -322,6 +336,28 @@ public boolean cancelProcess(long processId) {
322336
return true;
323337
}
324338

339+
@Override
340+
public List<String> getOptimizingNodeUrls() {
341+
if (bucketAssignStore == null) {
342+
return Collections.emptyList();
343+
}
344+
try {
345+
List<AmsServerInfo> nodes = bucketAssignStore.getAliveNodes();
346+
List<String> urls = new ArrayList<>(nodes.size());
347+
for (AmsServerInfo node : nodes) {
348+
if (node.getHost() != null
349+
&& node.getThriftBindPort() != null
350+
&& node.getThriftBindPort() > 0) {
351+
urls.add(String.format("thrift://%s:%d", node.getHost(), node.getThriftBindPort()));
352+
}
353+
}
354+
return urls;
355+
} catch (Exception e) {
356+
LOG.warn("Failed to get optimizing node URLs from bucket assign store", e);
357+
return Collections.emptyList();
358+
}
359+
}
360+
325361
/**
326362
* Get optimizing queue.
327363
*
@@ -530,10 +566,27 @@ public void dispose() {
530566

531567
@Override
532568
public void run() {
569+
// Use 1/4 of optimizerTouchTimeout as sync interval (default ~30 seconds), used for
570+
// master-slave follower sync.
571+
long syncInterval = Math.max(5000, optimizerTouchTimeout / 4);
572+
// In non-master-slave mode, this node is always the leader.
573+
boolean wasLeader = !isMasterSlaveMode;
533574
while (!stopped) {
534575
try {
535-
T keepingTask = suspendingQueue.take();
536-
this.processTask(keepingTask);
576+
boolean isLeader = !isMasterSlaveMode || haContainer.hasLeadership();
577+
if (!wasLeader && isLeader) {
578+
// Follower → Leader transition: subclass takes over monitoring of inherited optimizers.
579+
onBecomeLeader();
580+
}
581+
wasLeader = isLeader;
582+
583+
if (isLeader) {
584+
T keepingTask = suspendingQueue.take();
585+
this.processTask(keepingTask);
586+
} else {
587+
// Not leader: let subclass handle follower state (e.g. sync optimizer list from DB)
588+
onFollowerTick(syncInterval);
589+
}
537590
} catch (InterruptedException ignored) {
538591
} catch (Throwable t) {
539592
LOG.error("{} has encountered a problem.", this.getClass().getSimpleName(), t);
@@ -542,6 +595,12 @@ public void run() {
542595
}
543596

544597
protected abstract void processTask(T task) throws Exception;
598+
599+
protected void onFollowerTick(long syncInterval) throws InterruptedException {
600+
Thread.sleep(syncInterval);
601+
}
602+
603+
/**
 * Hook invoked by {@code run()} when this node switches from follower to leader. Does nothing by
 * default; subclasses override it to take over leader-only duties.
 */
protected void onBecomeLeader() {
  // Intentionally empty.
}
545604
}
546605

547606
private class OptimizerKeeper extends AbstractKeeper<OptimizerKeepingTask> {
@@ -575,6 +634,96 @@ protected void processTask(OptimizerKeepingTask keepingTask) {
575634
}
576635
}
577636

637+
@Override
638+
protected void onFollowerTick(long syncInterval) throws InterruptedException {
639+
loadOptimizersFromDatabase();
640+
Thread.sleep(syncInterval);
641+
}
642+
643+
@Override
644+
protected void onBecomeLeader() {
645+
LOG.info(
646+
"Became leader, starting heartbeat monitoring for {} inherited optimizers",
647+
authOptimizers.size());
648+
// All optimizers in authOptimizers were loaded from DB by the follower sync loop.
649+
// Their touchTime reflects the latest DB-persisted heartbeat, which is the correct
650+
// baseline for the new leader's expiry detection.
651+
authOptimizers.values().forEach(this::keepInTouch);
652+
}
653+
654+
/**
655+
* Load optimizer information from database. This is used in master-slave mode for follower
656+
* nodes to sync optimizer state from database. This method performs incremental updates by
657+
* comparing database state with local authOptimizers, only adding new optimizers and removing
658+
* missing ones.
659+
*/
660+
private void loadOptimizersFromDatabase() {
661+
try {
662+
List<OptimizerInstance> dbOptimizers =
663+
getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
664+
665+
Map<String, OptimizerInstance> dbOptimizersByToken = new HashMap<>();
666+
for (OptimizerInstance optimizer : dbOptimizers) {
667+
String token = optimizer.getToken();
668+
if (token != null) {
669+
dbOptimizersByToken.put(token, optimizer);
670+
}
671+
}
672+
673+
Set<String> localTokens = new HashSet<>(authOptimizers.keySet());
674+
Set<String> dbTokens = new HashSet<>(dbOptimizersByToken.keySet());
675+
Set<String> tokensToAdd = new HashSet<>(dbTokens);
676+
tokensToAdd.removeAll(localTokens);
677+
678+
Set<String> tokensToRemove = new HashSet<>(localTokens);
679+
tokensToRemove.removeAll(dbTokens);
680+
681+
for (String token : tokensToAdd) {
682+
OptimizerInstance optimizer = dbOptimizersByToken.get(token);
683+
if (optimizer != null) {
684+
registerOptimizerWithoutPersist(optimizer);
685+
LOG.debug("Added optimizer {} from database", token);
686+
}
687+
}
688+
689+
for (String token : tokensToRemove) {
690+
removeOptimizerFromLocal(token);
691+
LOG.debug("Removed optimizer {} (not in database)", token);
692+
}
693+
694+
LOG.debug(
695+
"Synced optimizers from database: total={}, added={}, removed={}, current={}",
696+
dbOptimizersByToken.size(),
697+
tokensToAdd.size(),
698+
tokensToRemove.size(),
699+
authOptimizers.size());
700+
} catch (Exception e) {
701+
LOG.error("Failed to load optimizers from database", e);
702+
}
703+
}
704+
705+
private void registerOptimizerWithoutPersist(OptimizerInstance optimizer) {
706+
OptimizingQueue optimizingQueue = optimizingQueueByGroup.get(optimizer.getGroupName());
707+
if (optimizingQueue == null) {
708+
LOG.warn(
709+
"Cannot register optimizer {}: optimizing queue for group {} not found",
710+
optimizer.getToken(),
711+
optimizer.getGroupName());
712+
return;
713+
}
714+
optimizingQueue.addOptimizer(optimizer);
715+
authOptimizers.put(optimizer.getToken(), optimizer);
716+
optimizingQueueByToken.put(optimizer.getToken(), optimizingQueue);
717+
}
718+
719+
private void removeOptimizerFromLocal(String token) {
720+
OptimizingQueue optimizingQueue = optimizingQueueByToken.remove(token);
721+
OptimizerInstance optimizer = authOptimizers.remove(token);
722+
if (optimizingQueue != null && optimizer != null) {
723+
optimizingQueue.removeOptimizer(optimizer);
724+
}
725+
}
726+
578727
private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
579728
if (isTaskExecTimeout(task)) {
580729
LOG.warn(

amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,35 @@ public Map<AmsServerInfo, List<String>> getAllAssignments() throws BucketAssignS
170170
return allAssignments;
171171
}
172172

173+
@Override
174+
public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException {
175+
List<AmsServerInfo> nodes = new ArrayList<>();
176+
try {
177+
if (zkClient.checkExists().forPath(assignmentsBasePath) == null) {
178+
return nodes;
179+
}
180+
List<String> nodeKeys = zkClient.getChildren().forPath(assignmentsBasePath);
181+
for (String nodeKey : nodeKeys) {
182+
try {
183+
AmsServerInfo nodeInfo = parseNodeKey(nodeKey);
184+
if (nodeInfo.getThriftBindPort() != null && nodeInfo.getThriftBindPort() > 0) {
185+
nodes.add(nodeInfo);
186+
}
187+
} catch (Exception e) {
188+
LOG.warn("Failed to parse node key: {}", nodeKey, e);
189+
}
190+
}
191+
} catch (KeeperException.NoNodeException e) {
192+
// path doesn't exist
193+
} catch (BucketAssignStoreException e) {
194+
throw e;
195+
} catch (Exception e) {
196+
LOG.error("Failed to get alive nodes", e);
197+
throw new BucketAssignStoreException("Failed to get alive nodes", e);
198+
}
199+
return nodes;
200+
}
201+
173202
@Override
174203
public long getLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStoreException {
175204
String nodeKey = getNodeKey(nodeInfo);

0 commit comments

Comments
 (0)