Skip to content

Commit f6b1e68

Browse files
wardlican and wardli
authored
[AMORO-3928] Modify the optimizer to support obtaining tasks from each AMS node for processing. (#3950)
* [Subtask]: Use a new configuration item to control whether master & slave mode is enabled. #3845 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: Modify DefaultTableService to be compatible with master-slave mode #3923 * [Subtask]: In master-slave mode, each AMS should automatically sense the optimizer. #3929 * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS node for processing. #3928 * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS node for processing. #3928 * [Subtask]: Optimize the logic for retrieving the AMS list from ZooKeeper in master-slave mode. #3928 * This addresses the conflict issues with the latest main branch and introduces a new solution for storing allocation information based on a database. * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS node for processing. #3928 * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS node for processing. #3928 * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS node for processing. #3928 * [Subtask]: Fixed a legacy bug that could cause unit tests to fail during compilation. #3928 * [Subtask]: Optimized based on CR feedback. #3928 * [Subtask]: Fixed a legacy bug that could cause unit tests to fail during compilation. #3928 * Revert "[Subtask]: Fixed a legacy bug that could cause unit tests to fail during compilation. #3928" This reverts commit d54c126. * [Subtask]: Optimized based on CR feedback. #3928 * [Subtask]: Optimized based on CR feedback. #3928 * [Subtask]: Optimized based on CR feedback. #3928 --------- Co-authored-by: wardli <wardli@tencent.com>
1 parent fa44c91 commit f6b1e68

33 files changed

Lines changed: 1390 additions & 106 deletions

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ public void startOptimizingService() throws Exception {
277277
serviceConfig, catalogManager, defaultRuntimeFactory, haContainer, bucketAssignStore);
278278
processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);
279279
optimizingService =
280-
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
280+
new DefaultOptimizingService(
281+
serviceConfig, catalogManager, optimizerManager, tableService, bucketAssignStore);
281282

282283
LOG.info("Setting up AMS table executors...");
283284
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);

amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,15 @@ void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
6565
*/
6666
Map<AmsServerInfo, List<String>> getAllAssignments() throws BucketAssignStoreException;
6767

68+
/**
69+
* Get all alive AMS nodes that have bucket assignments. Used by optimizers in master-slave mode
70+
* to discover all AMS optimizing endpoints.
71+
*
72+
* @return List of AmsServerInfo for all nodes with bucket assignments, empty list if none
73+
* @throws BucketAssignStoreException If retrieval operation fails
74+
*/
75+
List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException;
76+
6877
/**
6978
* Get the last update time for a node's assignments.
7079
*

amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ public static BucketAssignStore create(
6363
"Cannot create ZkBucketAssignStore: ZK client not available or invalid container type");
6464

6565
case AmoroManagementConf.HA_TYPE_DATABASE:
66+
long nodeHeartbeatTtlMs = conf.get(AmoroManagementConf.HA_LEASE_TTL).toMillis();
6667
LOG.info("Creating DBBucketAssignStore for cluster: {}", clusterName);
67-
return new DBBucketAssignStore(clusterName);
68+
return new DBBucketAssignStore(clusterName, nodeHeartbeatTtlMs);
6869

6970
default:
7071
throw new IllegalArgumentException(

amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@ public class DBBucketAssignStore extends PersistentBase implements BucketAssignS
4545
new TypeReference<List<String>>() {};
4646

4747
private final String clusterName;
48+
private final long nodeHeartbeatTtlMs;
4849

49-
public DBBucketAssignStore(String clusterName) {
50+
public DBBucketAssignStore(String clusterName, long nodeHeartbeatTtlMs) {
5051
this.clusterName = clusterName;
52+
this.nodeHeartbeatTtlMs = nodeHeartbeatTtlMs;
5153
}
5254

5355
@Override
@@ -180,6 +182,32 @@ public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStor
180182
}
181183
}
182184

185+
@Override
186+
public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException {
187+
try {
188+
long cutoff = System.currentTimeMillis() - nodeHeartbeatTtlMs;
189+
List<BucketAssignmentMeta> rows =
190+
getAs(BucketAssignMapper.class, mapper -> mapper.selectAllByCluster(clusterName));
191+
List<AmsServerInfo> nodes = new ArrayList<>();
192+
for (BucketAssignmentMeta meta : rows) {
193+
Long heartbeatTs = meta.getNodeHeartbeatTs();
194+
if (heartbeatTs == null || heartbeatTs < cutoff) {
195+
LOG.debug(
196+
"Skipping stale node key={}, node_heartbeat_ts={}", meta.getNodeKey(), heartbeatTs);
197+
continue;
198+
}
199+
AmsServerInfo nodeInfo = parseNodeInfo(meta);
200+
if (nodeInfo.getThriftBindPort() != null && nodeInfo.getThriftBindPort() > 0) {
201+
nodes.add(nodeInfo);
202+
}
203+
}
204+
return nodes;
205+
} catch (Exception e) {
206+
LOG.error("Failed to get alive nodes", e);
207+
throw new BucketAssignStoreException("Failed to get alive nodes", e);
208+
}
209+
}
210+
183211
private static String getNodeKey(AmsServerInfo nodeInfo) {
184212
return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
185213
}

amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.amoro.api.OptimizingTask;
2727
import org.apache.amoro.api.OptimizingTaskId;
2828
import org.apache.amoro.api.OptimizingTaskResult;
29+
import org.apache.amoro.client.AmsServerInfo;
2930
import org.apache.amoro.config.Configurations;
3031
import org.apache.amoro.config.TableConfiguration;
3132
import org.apache.amoro.exception.ForbiddenException;
@@ -66,6 +67,7 @@
6667

6768
import java.time.Duration;
6869
import java.util.ArrayList;
70+
import java.util.Collections;
6971
import java.util.List;
7072
import java.util.Map;
7173
import java.util.Objects;
@@ -116,12 +118,22 @@ public class DefaultOptimizingService extends StatedPersistentBase
116118
private final TableService tableService;
117119
private final RuntimeHandlerChain tableHandlerChain;
118120
private final ExecutorService planExecutor;
121+
private final BucketAssignStore bucketAssignStore;
119122

120123
public DefaultOptimizingService(
121124
Configurations serviceConfig,
122125
CatalogManager catalogManager,
123126
OptimizerManager optimizerManager,
124127
TableService tableService) {
128+
this(serviceConfig, catalogManager, optimizerManager, tableService, null);
129+
}
130+
131+
public DefaultOptimizingService(
132+
Configurations serviceConfig,
133+
CatalogManager catalogManager,
134+
OptimizerManager optimizerManager,
135+
TableService tableService,
136+
BucketAssignStore bucketAssignStore) {
125137
this.optimizerTouchTimeout =
126138
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
127139
this.taskAckTimeout =
@@ -144,6 +156,7 @@ public DefaultOptimizingService(
144156
this.tableService = tableService;
145157
this.catalogManager = catalogManager;
146158
this.optimizerManager = optimizerManager;
159+
this.bucketAssignStore = bucketAssignStore;
147160
this.tableHandlerChain = new TableRuntimeHandlerImpl();
148161
this.planExecutor =
149162
Executors.newCachedThreadPool(
@@ -322,6 +335,28 @@ public boolean cancelProcess(long processId) {
322335
return true;
323336
}
324337

338+
@Override
339+
public List<String> getOptimizingNodeUrls() {
340+
if (bucketAssignStore == null) {
341+
return Collections.emptyList();
342+
}
343+
try {
344+
List<AmsServerInfo> nodes = bucketAssignStore.getAliveNodes();
345+
List<String> urls = new ArrayList<>(nodes.size());
346+
for (AmsServerInfo node : nodes) {
347+
if (node.getHost() != null
348+
&& node.getThriftBindPort() != null
349+
&& node.getThriftBindPort() > 0) {
350+
urls.add(String.format("thrift://%s:%d", node.getHost(), node.getThriftBindPort()));
351+
}
352+
}
353+
return urls;
354+
} catch (Exception e) {
355+
LOG.warn("Failed to get optimizing node URLs from bucket assign store", e);
356+
return Collections.emptyList();
357+
}
358+
}
359+
325360
/**
326361
* Get optimizing queue.
327362
*

amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,35 @@ public Map<AmsServerInfo, List<String>> getAllAssignments() throws BucketAssignS
170170
return allAssignments;
171171
}
172172

173+
@Override
174+
public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException {
175+
List<AmsServerInfo> nodes = new ArrayList<>();
176+
try {
177+
if (zkClient.checkExists().forPath(assignmentsBasePath) == null) {
178+
return nodes;
179+
}
180+
List<String> nodeKeys = zkClient.getChildren().forPath(assignmentsBasePath);
181+
for (String nodeKey : nodeKeys) {
182+
try {
183+
AmsServerInfo nodeInfo = parseNodeKey(nodeKey);
184+
if (nodeInfo.getThriftBindPort() != null && nodeInfo.getThriftBindPort() > 0) {
185+
nodes.add(nodeInfo);
186+
}
187+
} catch (Exception e) {
188+
LOG.warn("Failed to parse node key: {}", nodeKey, e);
189+
}
190+
}
191+
} catch (KeeperException.NoNodeException e) {
192+
// path doesn't exist
193+
} catch (BucketAssignStoreException e) {
194+
throw e;
195+
} catch (Exception e) {
196+
LOG.error("Failed to get alive nodes", e);
197+
throw new BucketAssignStoreException("Failed to get alive nodes", e);
198+
}
199+
return nodes;
200+
}
201+
173202
@Override
174203
public long getLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStoreException {
175204
String nodeKey = getNodeKey(nodeInfo);

amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java

Lines changed: 93 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import org.apache.amoro.client.AmsServerInfo;
2222
import org.apache.amoro.config.Configurations;
2323
import org.apache.amoro.server.AmoroManagementConf;
24+
import org.apache.amoro.server.persistence.BucketAssignmentMeta;
2425
import org.apache.amoro.server.persistence.HaLeaseMeta;
2526
import org.apache.amoro.server.persistence.PersistentBase;
27+
import org.apache.amoro.server.persistence.mapper.BucketAssignMapper;
2628
import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
2729
import org.apache.amoro.utils.JacksonUtil;
2830
import org.slf4j.Logger;
@@ -144,29 +146,50 @@ public void registerAndElect() throws Exception {
144146
LOG.debug("Master-slave mode is not enabled, skip node registration");
145147
return;
146148
}
147-
// In master-slave mode, register node to database by writing OPTIMIZING_SERVICE info
148-
// This is similar to ZK mode registering ephemeral nodes
149+
// Register this node in bucket_assignments so that all nodes can be discovered via
150+
// getAliveNodes(). ha_lease has PK (cluster_name, service_name) and cannot store multiple
151+
// nodes for the same service, so we use the per-node bucket_assignments table instead.
152+
upsertNodeHeartbeat();
153+
LOG.info(
154+
"Registered AMS node to bucket_assignments: nodeKey={}, optimizingService={}",
155+
getNodeKey(),
156+
optimizingServiceServerInfo);
157+
}
158+
159+
/** Returns nodeKey used as the bucket_assignments row identifier: host:optimizingPort. */
160+
private String getNodeKey() {
161+
return optimizingServiceServerInfo.getHost()
162+
+ ":"
163+
+ optimizingServiceServerInfo.getThriftBindPort();
164+
}
165+
166+
/**
167+
* Upsert this node's heartbeat row into bucket_assignments. Only updates node_heartbeat_ts (and
168+
* server_info_json on first insert); never touches assignments_json so the leader's assignments
169+
* are preserved.
170+
*/
171+
private void upsertNodeHeartbeat() {
149172
long now = System.currentTimeMillis();
150-
String optimizingInfoJson = JacksonUtil.toJSONString(optimizingServiceServerInfo);
173+
String nodeKey = getNodeKey();
174+
String serverInfoJson = JacksonUtil.toJSONString(optimizingServiceServerInfo);
151175
try {
152-
doAsIgnoreError(
153-
HaLeaseMapper.class,
154-
mapper -> {
155-
int updated =
156-
mapper.updateServerInfo(
157-
clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
158-
if (updated == 0) {
159-
mapper.insertServerInfoIfAbsent(
160-
clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
161-
}
162-
});
163-
LOG.info(
164-
"Registered AMS node to database: nodeId={}, optimizingService={}",
165-
nodeId,
166-
optimizingServiceServerInfo);
176+
int updated =
177+
updateAs(
178+
BucketAssignMapper.class,
179+
mapper -> mapper.updateNodeHeartbeat(clusterName, nodeKey, now))
180+
.intValue();
181+
if (updated == 0) {
182+
// First registration: insert a new row with empty assignments
183+
doAsIgnoreError(
184+
BucketAssignMapper.class,
185+
mapper ->
186+
mapper.insert(
187+
new BucketAssignmentMeta(
188+
clusterName, nodeKey, serverInfoJson, null, now, now)));
189+
}
167190
} catch (Exception e) {
168-
LOG.error("Failed to register node to database", e);
169-
throw e;
191+
LOG.error("Failed to upsert node heartbeat for nodeKey={}", nodeKey, e);
192+
throw new RuntimeException("Failed to register node in bucket_assignments", e);
170193
}
171194
}
172195

@@ -180,7 +203,12 @@ public AmsServerInfo getTableServiceServerInfo() {
180203
return tableServiceServerInfo;
181204
}
182205

183-
/** Closes the heartbeat executor safely. */
206+
@Override
207+
public AmsServerInfo getOptimizingServiceServerInfo() {
208+
return optimizingServiceServerInfo;
209+
}
210+
211+
/** Closes the heartbeat executor safely and removes this node's registration row. */
184212
@Override
185213
public void close() {
186214
try {
@@ -190,6 +218,18 @@ public void close() {
190218
} catch (Exception e) {
191219
LOG.error("Close Database HighAvailabilityContainer failed", e);
192220
}
221+
// Remove this node from bucket_assignments so the leader immediately stops seeing it
222+
// as alive without waiting for the heartbeat TTL to expire.
223+
boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
224+
if (isMasterSlaveMode) {
225+
try {
226+
String nodeKey = getNodeKey();
227+
doAs(BucketAssignMapper.class, mapper -> mapper.deleteByNode(clusterName, nodeKey));
228+
LOG.info("Unregistered AMS node from bucket_assignments: nodeKey={}", nodeKey);
229+
} catch (Exception e) {
230+
LOG.warn("Failed to unregister node from bucket_assignments on close", e);
231+
}
232+
}
193233
}
194234

195235
private class HeartbeatRunnable implements Runnable {
@@ -204,6 +244,14 @@ public void run() {
204244
return;
205245
}
206246

247+
// Each node independently refreshes its own heartbeat in bucket_assignments so that
248+
// getAliveNodes() can correctly detect liveness without relying on ha_lease.
249+
try {
250+
upsertNodeHeartbeat();
251+
} catch (Exception e) {
252+
LOG.warn("Failed to refresh node heartbeat in bucket_assignments", e);
253+
}
254+
207255
if (!isLeader.get()) {
208256
// First attempt to acquire the lease (similar to candidate/await)
209257
boolean success = tryAcquireLease(newExpireTs, now);
@@ -353,29 +401,35 @@ public List<AmsServerInfo> getAliveNodes() {
353401
LOG.warn("Only leader node can get alive nodes list");
354402
return aliveNodes;
355403
}
404+
// Read alive nodes from bucket_assignments keyed by node_heartbeat_ts. ha_lease has
405+
// PK (cluster_name, service_name) which only allows one row per service and cannot
406+
// represent multiple AMS nodes. bucket_assignments has PK (cluster_name, node_key) and
407+
// stores one row per node; node_heartbeat_ts is updated exclusively by the owning node
408+
// so the leader's refreshLastUpdateTime calls cannot mask a dead node's staleness.
356409
try {
357-
long currentTime = System.currentTimeMillis();
358-
List<HaLeaseMeta> leases =
359-
getAs(
360-
HaLeaseMapper.class,
361-
mapper -> mapper.selectLeasesByService(clusterName, OPTIMIZING_SERVICE));
362-
for (HaLeaseMeta lease : leases) {
363-
// Only include nodes with valid (non-expired) leases
364-
if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > currentTime) {
365-
if (lease.getServerInfoJson() != null && !lease.getServerInfoJson().isEmpty()) {
366-
try {
367-
AmsServerInfo nodeInfo =
368-
JacksonUtil.parseObject(lease.getServerInfoJson(), AmsServerInfo.class);
369-
aliveNodes.add(nodeInfo);
370-
} catch (Exception e) {
371-
LOG.warn("Failed to parse server info for node {}", lease.getNodeId(), e);
372-
}
410+
long cutoff = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlSeconds);
411+
List<BucketAssignmentMeta> rows =
412+
getAs(BucketAssignMapper.class, mapper -> mapper.selectAllByCluster(clusterName));
413+
for (BucketAssignmentMeta meta : rows) {
414+
Long heartbeatTs = meta.getNodeHeartbeatTs();
415+
if (heartbeatTs == null || heartbeatTs < cutoff) {
416+
LOG.debug(
417+
"Skipping stale node key={}, node_heartbeat_ts={}", meta.getNodeKey(), heartbeatTs);
418+
continue;
419+
}
420+
if (meta.getServerInfoJson() != null && !meta.getServerInfoJson().isEmpty()) {
421+
try {
422+
AmsServerInfo nodeInfo =
423+
JacksonUtil.parseObject(meta.getServerInfoJson(), AmsServerInfo.class);
424+
aliveNodes.add(nodeInfo);
425+
} catch (Exception e) {
426+
LOG.warn("Failed to parse server_info_json for node {}", meta.getNodeKey(), e);
373427
}
374428
}
375429
}
376430
} catch (Exception e) {
377-
LOG.error("Failed to get alive nodes from database", e);
378-
throw e;
431+
LOG.error("Failed to get alive nodes from bucket_assignments", e);
432+
throw new RuntimeException("Failed to get alive nodes", e);
379433
}
380434
return aliveNodes;
381435
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,18 @@ public interface HighAvailabilityContainer {
7070
boolean hasLeadership();
7171

7272
/**
73-
* Get current AMS node information.
73+
* Get current AMS node's table service server info (host + table-service Thrift port).
7474
*
7575
* @return {@link AmsServerInfo}
7676
*/
7777
AmsServerInfo getTableServiceServerInfo();
78+
79+
/**
80+
* Get current AMS node's optimizing service server info (host + optimizing Thrift port). This
81+
* must be consistent with the {@link AmsServerInfo} written into {@link
82+
* org.apache.amoro.server.BucketAssignStore} so that bucket lookups can match the stored nodeKey.
83+
*
84+
* @return {@link AmsServerInfo}
85+
*/
86+
AmsServerInfo getOptimizingServiceServerInfo();
7887
}

0 commit comments

Comments
 (0)