Skip to content

Commit 25f19da

Browse files
wardlicanwardli
andauthored
[AMORO-3921] Add AmsAssignService and ZkBucketAssignStore to implement balanced bucket allocation in master-slave mode (#3922)
* [Subtask]: Add a registration function for table allocation in master-slave mode. #3919 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: Add a registration function for table allocation in master-slave mode. #3919 * [Subtask]: Add a registration function for table allocation in master-slave mode. #3919 * [Subtask]: Replace zk with mocking. #3919 * [Subtask]: Replace zk with mocking. #3919 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: Revised based on CR's comments. #3921 * [Subtask]: Revised based on CR's comments. #3921 * [Subtask]: Revised based on CR's comments. #3921 --------- Co-authored-by: wardli <wardli@tencent.com>
1 parent 55fb785 commit 25f19da

15 files changed

Lines changed: 2964 additions & 10 deletions

amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,27 @@ public class AmoroManagementConf {
286286
.defaultValue(java.time.Duration.ofSeconds(30))
287287
.withDescription("TTL of HA lease.");
288288

289+
public static final ConfigOption<Integer> HA_BUCKET_ID_TOTAL_COUNT =
290+
ConfigOptions.key("ha.bucket-id.total-count")
291+
.intType()
292+
.defaultValue(100)
293+
.withDescription(
294+
"Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value.");
295+
296+
public static final ConfigOption<Duration> HA_NODE_OFFLINE_TIMEOUT =
297+
ConfigOptions.key("ha.node-offline.timeout")
298+
.durationType()
299+
.defaultValue(Duration.ofMinutes(5))
300+
.withDescription(
301+
"Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned.");
302+
303+
public static final ConfigOption<Duration> HA_ASSIGN_INTERVAL =
304+
ConfigOptions.key("ha.bucket-assign.interval")
305+
.durationType()
306+
.defaultValue(Duration.ofSeconds(60))
307+
.withDescription(
308+
"Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");
309+
289310
public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
290311
ConfigOptions.key("thrift-server.table-service.bind-port")
291312
.intType()

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public class AmoroServiceContainer {
126126
private Javalin httpServer;
127127
private AmsServiceMetrics amsServiceMetrics;
128128
private HAState haState = HAState.INITIALIZING;
129+
private AmsAssignService amsAssignService;
129130

130131
public AmoroServiceContainer() throws Exception {
131132
initConfig();
@@ -244,6 +245,20 @@ public void startOptimizingService() throws Exception {
244245

245246
DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
246247
defaultRuntimeFactory.initialize(processFactories);
248+
// In master-slave mode, create AmsAssignService for bucket assignment
249+
if (IS_MASTER_SLAVE_MODE && haContainer != null) {
250+
try {
251+
// Create and start AmsAssignService for bucket assignment
252+
// The factory will handle different HA types (ZK, database, etc.)
253+
amsAssignService = new AmsAssignService(haContainer, serviceConfig);
254+
amsAssignService.start();
255+
LOG.info("AmsAssignService started for master-slave mode");
256+
} catch (UnsupportedOperationException e) {
257+
LOG.info("Skip AmsAssignService: {}", e.getMessage());
258+
} catch (Exception e) {
259+
LOG.error("Failed to start AmsAssignService", e);
260+
}
261+
}
247262

248263
List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
249264
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
@@ -293,6 +308,11 @@ public void disposeOptimizingService() {
293308
LOG.info("Stopping optimizing server[serving:{}] ...", optimizingServiceServer.isServing());
294309
optimizingServiceServer.stop();
295310
}
311+
if (amsAssignService != null) {
312+
LOG.info("Stopping AmsAssignService...");
313+
amsAssignService.stop();
314+
amsAssignService = null;
315+
}
296316
if (tableService != null) {
297317
LOG.info("Stopping table service...");
298318
tableService.dispose();

0 commit comments

Comments
 (0)