Skip to content

Commit fc2ec66

Browse files
committed
fix(server): fix some issues of the distributed scheduler
1 parent 5f91d8b commit fc2ec66

19 files changed

Lines changed: 219 additions & 521 deletions

File tree

hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ store=hugegraph
4545
pd.peers=$PD_PEERS_LIST$
4646

4747
# task config
48-
task.scheduler_type=local
4948
task.schedule_period=10
5049
task.retry=0
5150
task.wait_timeout=10

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -556,9 +556,9 @@ public class ServerOptions extends OptionHolder {
556556
public static final ConfigOption<String> SERVER_ID =
557557
new ConfigOption<>(
558558
"server.id",
559-
"The id of hugegraph-server.",
560-
disallowEmpty(),
561-
"server-1"
559+
"The id of hugegraph-server, auto-generated if not specified.",
560+
null,
561+
""
562562
);
563563
public static final ConfigOption<String> SERVER_ROLE =
564564
new ConfigOption<>(

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.Objects;
3535
import java.util.Set;
36+
import java.util.UUID;
3637
import java.util.concurrent.ConcurrentHashMap;
3738
import java.util.concurrent.Future;
3839
import java.util.concurrent.TimeUnit;
@@ -68,6 +69,7 @@
6869
import org.apache.hugegraph.config.TypedOption;
6970
import org.apache.hugegraph.event.EventHub;
7071
import org.apache.hugegraph.exception.ExistedException;
72+
import org.apache.hugegraph.exception.NotFoundException;
7173
import org.apache.hugegraph.exception.NotSupportException;
7274
import org.apache.hugegraph.io.HugeGraphSONModule;
7375
import org.apache.hugegraph.k8s.K8sDriver;
@@ -195,7 +197,17 @@ public final class GraphManager {
195197
public GraphManager(HugeConfig conf, EventHub hub) {
196198
LOG.info("Init graph manager");
197199
E.checkArgumentNotNull(conf, "The config can't be null");
200+
201+
// Auto-generate server.id if not configured.
202+
// Random generation is to prevent duplicate id error reports.This id is currently
203+
// meaningless and needs to be completely removed serverInfoManager in
204+
// the future
198205
String server = conf.get(ServerOptions.SERVER_ID);
206+
if (StringUtils.isEmpty(server)) {
207+
server = "server-" + UUID.randomUUID().toString().substring(0, 8);
208+
LOG.info("Auto-generated server.id: {}", server);
209+
conf.setProperty(ServerOptions.SERVER_ID.name(), server);
210+
}
199211
String role = conf.get(ServerOptions.SERVER_ROLE);
200212

201213
this.config = conf;
@@ -206,10 +218,6 @@ public GraphManager(HugeConfig conf, EventHub hub) {
206218
conf.get(ServerOptions.SERVER_DEPLOY_IN_K8S);
207219
this.startIgnoreSingleGraphError = conf.get(
208220
ServerOptions.SERVER_START_IGNORE_SINGLE_GRAPH_ERROR);
209-
E.checkArgument(server != null && !server.isEmpty(),
210-
"The server name can't be null or empty");
211-
E.checkArgument(role != null && !role.isEmpty(),
212-
"The server role can't be null or empty");
213221
this.graphsDir = conf.get(ServerOptions.GRAPHS);
214222
this.cluster = conf.get(ServerOptions.CLUSTER);
215223
this.graphSpaces = new ConcurrentHashMap<>();
@@ -276,7 +284,7 @@ private static String serviceId(String graphSpace, Service.ServiceType type,
276284
.replace("_", "-").toLowerCase();
277285
}
278286

279-
public boolean isPDEnabled() {
287+
public boolean usePD() {
280288
return this.PDExist;
281289
}
282290

@@ -1227,7 +1235,7 @@ private void dropGraphLocal(HugeGraph graph) {
12271235

12281236
public HugeGraph createGraph(String graphSpace, String name, String creator,
12291237
Map<String, Object> configs, boolean init) {
1230-
if (!isPDEnabled()) {
1238+
if (!usePD()) {
12311239
// Extract nickname from configs
12321240
String nickname;
12331241
if (configs.get("nickname") != null) {
@@ -1557,6 +1565,14 @@ private void loadGraph(String name, String graphConfPath) {
15571565
String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS);
15581566
config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
15591567
raftGroupPeers);
1568+
1569+
// Transfer `pd.peers` from server config to graph config
1570+
// Only inject if not already configured in graph config
1571+
if (!config.containsKey("pd.peers")) {
1572+
String pdPeers = this.conf.get(ServerOptions.PD_PEERS);
1573+
config.addProperty("pd.peers", pdPeers);
1574+
}
1575+
15601576
this.transferRoleWorkerConfig(config);
15611577

15621578
Graph graph = GraphFactory.open(config);
@@ -1637,10 +1653,6 @@ private void checkBackendVersionOrExit(HugeConfig config) {
16371653
private void initNodeRole() {
16381654
String id = config.get(ServerOptions.SERVER_ID);
16391655
String role = config.get(ServerOptions.SERVER_ROLE);
1640-
E.checkArgument(StringUtils.isNotEmpty(id),
1641-
"The server name can't be null or empty");
1642-
E.checkArgument(StringUtils.isNotEmpty(role),
1643-
"The server role can't be null or empty");
16441656

16451657
NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
16461658
boolean supportRoleElection = !nodeRole.computer() &&
@@ -1937,7 +1949,7 @@ public Set<String> getServiceUrls(String graphSpace, String service,
19371949
public HugeGraph graph(String graphSpace, String name) {
19381950
String key = String.join(DELIMITER, graphSpace, name);
19391951
Graph graph = this.graphs.get(key);
1940-
if (graph == null && isPDEnabled()) {
1952+
if (graph == null && usePD()) {
19411953
Map<String, Map<String, Object>> configs =
19421954
this.metaManager.graphConfigs(graphSpace);
19431955
// If current server registered graph space is not DEFAULT, only load graph creation
@@ -1960,7 +1972,7 @@ public HugeGraph graph(String graphSpace, String name) {
19601972
} else if (graph instanceof HugeGraph) {
19611973
return (HugeGraph) graph;
19621974
}
1963-
throw new NotSupportException("graph instance of %s", graph.getClass());
1975+
throw new NotFoundException(String.format("Graph '%s' does not exist", name));
19641976
}
19651977

19661978
public void dropGraphLocal(String name) {
@@ -1981,7 +1993,7 @@ public void dropGraphLocal(String name) {
19811993
}
19821994

19831995
public void dropGraph(String graphSpace, String name, boolean clear) {
1984-
if (!isPDEnabled()) {
1996+
if (!usePD()) {
19851997
dropGraphLocal(name);
19861998
return;
19871999
}
@@ -2086,7 +2098,7 @@ private void checkOptionsUnique(String graphSpace,
20862098
public Set<String> graphs(String graphSpace) {
20872099
Set<String> graphs = new HashSet<>();
20882100

2089-
if (!isPDEnabled()) {
2101+
if (!usePD()) {
20902102
for (String key : this.graphs.keySet()) {
20912103
String[] parts = key.split(DELIMITER);
20922104
if (parts[0].equals(graphSpace)) {
@@ -2103,7 +2115,7 @@ public Set<String> graphs(String graphSpace) {
21032115
}
21042116

21052117
public GraphSpace graphSpace(String name) {
2106-
if (!isPDEnabled()) {
2118+
if (!usePD()) {
21072119
return new GraphSpace("DEFAULT");
21082120
}
21092121
GraphSpace space = this.graphSpaces.get(name);
@@ -2152,7 +2164,7 @@ private MapConfiguration buildConfig(Map<String, Object> configs) {
21522164
public void graphReadMode(String graphSpace, String graphName,
21532165
GraphReadMode readMode) {
21542166

2155-
if (!isPDEnabled()) {
2167+
if (!usePD()) {
21562168
HugeGraph g = this.graph(spaceGraphName(graphSpace, graphName));
21572169
g.readMode(readMode);
21582170
return;

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@ public class StandardHugeGraph implements HugeGraph {
176176
private final BackendStoreProvider storeProvider;
177177
private final TinkerPopTransaction tx;
178178
private final RamTable ramtable;
179-
private final String schedulerType;
180179
private volatile boolean started;
181180
private volatile boolean closed;
182181
private volatile GraphMode mode;
@@ -229,7 +228,6 @@ public StandardHugeGraph(HugeConfig config) {
229228
this.closed = false;
230229
this.mode = GraphMode.NONE;
231230
this.readMode = GraphReadMode.OLTP_ONLY;
232-
this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE);
233231

234232
LockUtil.init(this.spaceGraphName());
235233

@@ -315,6 +313,7 @@ public String backend() {
315313
return this.storeProvider.type();
316314
}
317315

316+
@Override
318317
public BackendStoreInfo backendStoreInfo() {
319318
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
320319
// TODO: pass storeProvider.metaStore()
@@ -465,6 +464,7 @@ public void updateTime(Date updateTime) {
465464
this.updateTime = updateTime;
466465
}
467466

467+
@Override
468468
public void waitStarted() {
469469
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
470470
this.schemaTransaction();
@@ -1629,7 +1629,9 @@ public <T> void submitEphemeralJob(EphemeralJob<T> job) {
16291629

16301630
@Override
16311631
public String schedulerType() {
1632-
return StandardHugeGraph.this.schedulerType;
1632+
// Use distributed scheduler for hstore backend, otherwise use local
1633+
// After the merger of rocksdb and hstore, consider whether to change this logic
1634+
return StandardHugeGraph.this.isHstore() ? "distributed" : "local";
16331635
}
16341636
}
16351637

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,7 @@ public class CoreOptions extends OptionHolder {
303303
rangeInt(1, 500),
304304
1
305305
);
306-
public static final ConfigOption<String> SCHEDULER_TYPE =
307-
new ConfigOption<>(
308-
"task.scheduler_type",
309-
"The type of scheduler used in distribution system.",
310-
allowValues("local", "distributed"),
311-
"local"
312-
);
306+
313307
public static final ConfigOption<Boolean> TASK_SYNC_DELETION =
314308
new ConfigOption<>(
315309
"task.sync_deletion",

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.hugegraph.type.define.NodeRole;
2323
import org.apache.hugegraph.util.E;
2424

25-
// TODO: rename to GlobalNodeRoleInfo
25+
// TODO: We need to completely delete the startup of master-worker
2626
public final class GlobalMasterInfo {
2727

2828
private static final NodeInfo NO_MASTER = new NodeInfo(false, "");

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
package org.apache.hugegraph.masterelection;
1919

20-
import java.util.Objects;
21-
2220
import org.apache.hugegraph.task.TaskManager;
2321
import org.apache.hugegraph.util.Log;
2422
import org.slf4j.Logger;
2523

24+
import java.util.Objects;
25+
2626
public class StandardRoleListener implements RoleListener {
2727

2828
private static final Logger LOG = Log.logger(StandardRoleListener.class);
@@ -36,7 +36,6 @@ public class StandardRoleListener implements RoleListener {
3636
public StandardRoleListener(TaskManager taskManager,
3737
GlobalMasterInfo roleInfo) {
3838
this.taskManager = taskManager;
39-
this.taskManager.enableRoleElection();
4039
this.roleInfo = roleInfo;
4140
this.selfIsMaster = false;
4241
}

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.util.Iterator;
2121
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CancellationException;
2223
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.ExecutionException;
2325
import java.util.concurrent.ExecutorService;
2426
import java.util.concurrent.Executors;
2527
import java.util.concurrent.Future;
@@ -48,6 +50,7 @@
4850
import org.slf4j.Logger;
4951

5052
public class DistributedTaskScheduler extends TaskAndResultScheduler {
53+
5154
private static final Logger LOG = Log.logger(DistributedTaskScheduler.class);
5255
private final long schedulePeriod;
5356
private final ExecutorService taskDbExecutor;
@@ -118,6 +121,11 @@ private static boolean sleep(long ms) {
118121
public void cronSchedule() {
119122
// Perform periodic scheduling tasks
120123

124+
// Check closed flag first to exit early
125+
if (this.closed.get()) {
126+
return;
127+
}
128+
121129
if (!this.graph.started() || this.graph.closed()) {
122130
return;
123131
}
@@ -253,6 +261,10 @@ public <V> Future<?> schedule(HugeTask<V> task) {
253261
return this.ephemeralTaskExecutor.submit(task);
254262
}
255263

264+
// Validate task state before saving to ensure correct exception type
265+
E.checkState(task.type() != null, "Task type can't be null");
266+
E.checkState(task.name() != null, "Task name can't be null");
267+
256268
// Process schema task
257269
// Handle gremlin task
258270
// Handle OLAP calculation tasks
@@ -284,14 +296,41 @@ protected <V> void initTaskParams(HugeTask<V> task) {
284296
}
285297
}
286298

299+
/**
300+
* Note: This method will update the status of the input task.
301+
*
302+
* @param task
303+
* @param <V>
304+
*/
287305
@Override
288306
public <V> void cancel(HugeTask<V> task) {
289-
// Update status to CANCELLING
290-
if (!task.completed()) {
291-
// Task not completed, can only execute status not CANCELLING
292-
this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
307+
E.checkArgumentNotNull(task, "Task can't be null");
308+
309+
if (task.completed() || task.cancelling()) {
310+
return;
311+
}
312+
313+
LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
314+
315+
// Check if task is running locally, cancel it directly if so
316+
HugeTask<?> runningTask = this.runningTasks.get(task.id());
317+
if (runningTask != null) {
318+
boolean cancelled = runningTask.cancel(true);
319+
if (cancelled) {
320+
task.overwriteStatus(TaskStatus.CANCELLED);
321+
}
322+
LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled);
323+
return;
324+
}
325+
326+
// Task not running locally, update status to CANCELLING
327+
// for cronSchedule() or other nodes to handle
328+
TaskStatus currentStatus = task.status();
329+
if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) {
330+
LOG.info("Failed to cancel task '{}', status may have changed from {}",
331+
task.id(), currentStatus);
293332
} else {
294-
LOG.info("cancel task({}) error, task has completed", task.id());
333+
task.overwriteStatus(TaskStatus.CANCELLING);
295334
}
296335
}
297336

@@ -316,14 +355,18 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {
316355

317356
@Override
318357
public <V> HugeTask<V> delete(Id id, boolean force) {
319-
if (!force) {
320-
// Change status to DELETING, perform the deletion operation through automatic
321-
// scheduling.
358+
HugeTask<?> task = this.taskWithoutResult(id);
359+
360+
if (!force && !task.completed()) {
361+
// Check task status: can't delete running tasks without force
322362
this.updateStatus(id, null, TaskStatus.DELETING);
323363
return null;
324-
} else {
325-
return this.deleteFromDB(id);
364+
// Already in DELETING status, delete directly from DB
365+
// Completed tasks can also be deleted directly
326366
}
367+
368+
// Delete from DB directly for completed/DELETING tasks or force=true
369+
return this.deleteFromDB(id);
327370
}
328371

329372
@Override
@@ -353,6 +396,18 @@ public boolean close() {
353396
cronFuture.cancel(false);
354397
}
355398

399+
// Wait for cron task to complete to ensure all transactions are closed
400+
try {
401+
cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
402+
} catch (CancellationException e) {
403+
// Task was cancelled, this is expected
404+
LOG.debug("Cron task was cancelled");
405+
} catch (TimeoutException e) {
406+
LOG.warn("Cron task did not complete in time when closing scheduler");
407+
} catch (ExecutionException | InterruptedException e) {
408+
LOG.warn("Exception while waiting for cron task to complete", e);
409+
}
410+
356411
if (!this.taskDbExecutor.isShutdown()) {
357412
this.call(() -> {
358413
try {
@@ -363,7 +418,10 @@ public boolean close() {
363418
this.graph.closeTx();
364419
});
365420
}
366-
return true;
421+
422+
//todo: serverInfoManager section should be removed in the future.
423+
return this.serverManager().close();
424+
//return true;
367425
}
368426

369427
@Override

0 commit comments

Comments
 (0)