Skip to content

Commit 1113520

Browse files
committed
fix(server): fix some issues of the distributed scheduler
1 parent 68b906a commit 1113520

4 files changed

Lines changed: 142 additions & 71 deletions

File tree

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java

Lines changed: 76 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.util.Iterator;
2121
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CancellationException;
2223
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.ExecutionException;
2325
import java.util.concurrent.ExecutorService;
2426
import java.util.concurrent.Executors;
2527
import java.util.concurrent.Future;
@@ -118,6 +120,11 @@ private static boolean sleep(long ms) {
118120
public void cronSchedule() {
119121
// Perform periodic scheduling tasks
120122

123+
// Check closed flag first to exit early
124+
if (this.closed.get()) {
125+
return;
126+
}
127+
121128
if (!this.graph.started() || this.graph.closed()) {
122129
return;
123130
}
@@ -253,6 +260,10 @@ public <V> Future<?> schedule(HugeTask<V> task) {
253260
return this.ephemeralTaskExecutor.submit(task);
254261
}
255262

263+
// Validate task state before saving to ensure correct exception type
264+
E.checkState(task.type() != null, "Task type can't be null");
265+
E.checkState(task.name() != null, "Task name can't be null");
266+
256267
// Process schema task
257268
// Handle gremlin task
258269
// Handle OLAP calculation tasks
@@ -286,13 +297,25 @@ protected <V> void initTaskParams(HugeTask<V> task) {
286297

287298
@Override
288299
public <V> void cancel(HugeTask<V> task) {
289-
// Update status to CANCELLING
290-
if (!task.completed()) {
291-
// Task not completed, can only execute status not CANCELLING
292-
this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
293-
} else {
294-
LOG.info("cancel task({}) error, task has completed", task.id());
300+
E.checkArgumentNotNull(task, "Task can't be null");
301+
302+
if (task.completed() || task.cancelling()) {
303+
return;
295304
}
305+
306+
LOG.info("Cancel task '{}' in status {}", task.id(), task.status());
307+
308+
// Check if task is running locally, cancel it directly if so
309+
HugeTask<?> runningTask = this.runningTasks.get(task.id());
310+
if (runningTask != null) {
311+
boolean cancelled = runningTask.cancel(true);
312+
LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled);
313+
return;
314+
}
315+
316+
// Task not running locally, update status to CANCELLING
317+
// for cronSchedule() or other nodes to handle
318+
this.updateStatus(task.id(), null, TaskStatus.CANCELLING);
296319
}
297320

298321
@Override
@@ -316,14 +339,25 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {
316339

317340
@Override
318341
public <V> HugeTask<V> delete(Id id, boolean force) {
319-
if (!force) {
320-
// Change status to DELETING, perform the deletion operation through automatic
321-
// scheduling.
322-
this.updateStatus(id, null, TaskStatus.DELETING);
342+
HugeTask<?> task = this.taskWithoutResult(id);
343+
if (task == null) {
323344
return null;
324-
} else {
325-
return this.deleteFromDB(id);
326345
}
346+
347+
if (!force) {
348+
// Check task status: can't delete running tasks without force
349+
if (!task.completed() && task.status() != TaskStatus.DELETING) {
350+
throw new IllegalArgumentException(
351+
String.format("Can't delete incomplete task '%s' in status %s, " +
352+
"Please try to cancel the task first",
353+
id, task.status()));
354+
}
355+
// Already in DELETING status, delete directly from DB
356+
// Completed tasks can also be deleted directly
357+
}
358+
359+
// Delete from DB directly for completed/DELETING tasks or force=true
360+
return this.deleteFromDB(id);
327361
}
328362

329363
@Override
@@ -353,6 +387,18 @@ public boolean close() {
353387
cronFuture.cancel(false);
354388
}
355389

390+
// Wait for cron task to complete to ensure all transactions are closed
391+
try {
392+
cronFuture.get(schedulePeriod + 5, TimeUnit.SECONDS);
393+
} catch (CancellationException e) {
394+
// Task was cancelled, this is expected
395+
LOG.debug("Cron task was cancelled");
396+
} catch (TimeoutException e) {
397+
LOG.warn("Cron task did not complete in time when closing scheduler");
398+
} catch (ExecutionException | InterruptedException e) {
399+
LOG.warn("Exception while waiting for cron task to complete", e);
400+
}
401+
356402
if (!this.taskDbExecutor.isShutdown()) {
357403
this.call(() -> {
358404
try {
@@ -363,7 +409,10 @@ public boolean close() {
363409
this.graph.closeTx();
364410
});
365411
}
366-
return true;
412+
413+
//todo: serverInfoManager section should be removed in the future.
414+
return this.serverManager().close();
415+
//return true;
367416
}
368417

369418
@Override
@@ -387,15 +436,17 @@ private <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds,
387436
long passes = seconds * 1000 / intervalMs;
388437
HugeTask<V> task = null;
389438
for (long pass = 0; ; pass++) {
390-
try {
391-
task = this.taskWithoutResult(id);
392-
} catch (NotFoundException e) {
393-
if (task != null && task.completed()) {
394-
assert task.id().asLong() < 0L : task.id();
439+
HugeTask<V> previousTask = task;
440+
task = this.taskWithoutResult(id);
441+
if (task == null) {
442+
// Task not found in DB
443+
if (previousTask != null && previousTask.completed()) {
444+
// Task was completed and then deleted (ephemeral task case)
445+
assert previousTask.id().asLong() < 0L : previousTask.id();
395446
sleep(intervalMs);
396-
return task;
447+
return previousTask;
397448
}
398-
throw e;
449+
throw new NotFoundException("Can't find task with id '%s'", id);
399450
}
400451
if (task.completed()) {
401452
// Wait for task result being set after status is completed
@@ -466,6 +517,11 @@ private <V> V call(Callable<V> callable, ExecutorService executor) {
466517
protected boolean updateStatus(Id id, TaskStatus prestatus,
467518
TaskStatus status) {
468519
HugeTask<Object> task = this.taskWithoutResult(id);
520+
if (task == null) {
521+
// Task was already deleted by cronSchedule or another thread
522+
LOG.info("Task '{}' not found, may have been deleted", id);
523+
return false;
524+
}
469525
initTaskParams(task);
470526
if (prestatus == null || task.status() == prestatus) {
471527
task.overwriteStatus(status);

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -371,9 +371,7 @@ protected void done() {
371371
protected void set(V v) {
372372
String result = JsonUtil.toJson(v);
373373
checkPropertySize(result, P.RESULT);
374-
if (!this.result(TaskStatus.SUCCESS, result)) {
375-
assert this.completed();
376-
}
374+
assert this.result(TaskStatus.SUCCESS, result) || this.completed();
377375
// Will call done() and may cause to save to store
378376
super.set(v);
379377
}
@@ -742,19 +740,26 @@ private void checkPropertySize(int propertyLength, String propertyName) {
742740

743741
public void syncWait() {
744742
// This method is just called by tests
743+
/*
744+
* For ephemeral tasks (negative ID), directly wait on the Future.
745+
* Ephemeral tasks are not saved to DB, so we can't query them by ID.
746+
* Since HugeTask extends FutureTask, we can directly wait for completion.
747+
*/
748+
if (this.id().asLong() < 0) {
749+
try {
750+
this.get();
751+
} catch (Exception e) {
752+
throw new HugeException("Failed to wait for task '%s' completed",
753+
e, this.id);
754+
}
755+
return;
756+
}
757+
758+
// For normal tasks, wait through scheduler
745759
HugeTask<?> task = null;
746760
try {
747761
task = this.scheduler().waitUntilTaskCompleted(this.id());
748762
} catch (Throwable e) {
749-
if (this.callable() instanceof EphemeralJob &&
750-
e.getClass() == NotFoundException.class &&
751-
e.getMessage().contains("Can't find task with id")) {
752-
/*
753-
* The task with EphemeralJob won't saved in backends and
754-
* will be removed from memory when completed
755-
*/
756-
return;
757-
}
758763
throw new HugeException("Failed to wait for task '%s' completed",
759764
e, this.id);
760765
}

hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public void testCreateGraphsWithInvalidNames() {
248248

249249
@Test
250250
public void testCreateGraphsWithSameName() {
251-
List<HugeGraph> graphs = openGraphs("g", "g", "G");
251+
List<HugeGraph> graphs = openGraphs("gg", "gg", "GG");
252252
HugeGraph g1 = graphs.get(0);
253253
HugeGraph g2 = graphs.get(1);
254254
HugeGraph g3 = graphs.get(2);

hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,18 @@ public Object execute() throws Exception {
192192
Assert.assertEquals("test", task.type());
193193
Assert.assertFalse(task.completed());
194194

195-
HugeTask<?> task2 = scheduler.waitUntilTaskCompleted(task.id(), 10);
195+
// Ephemeral tasks are node-local and not persisted to DB.
196+
// Use Future.get() to wait for completion instead of ID-based lookup.
197+
try {
198+
task.get(10, java.util.concurrent.TimeUnit.SECONDS);
199+
} catch (Exception e) {
200+
throw new RuntimeException("Ephemeral task execution failed", e);
201+
}
202+
196203
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
197204
Assert.assertEquals("{\"k1\":13579,\"k2\":\"24680\"}", task.result());
198205

199-
Assert.assertEquals(TaskStatus.SUCCESS, task2.status());
200-
Assert.assertEquals("{\"k1\":13579,\"k2\":\"24680\"}", task2.result());
201-
206+
// Ephemeral tasks are not stored in DB, so these should throw NotFoundException
202207
Assert.assertThrows(NotFoundException.class, () -> {
203208
scheduler.waitUntilTaskCompleted(task.id(), 10);
204209
});
@@ -553,10 +558,12 @@ public void testGremlinJobAndCancel() throws TimeoutException {
553558
scheduler.cancel(task);
554559

555560
task = scheduler.task(task.id());
556-
System.out.println(scheduler.getClass());
557-
if (scheduler.getClass().equals(DistributedTaskScheduler.class)) {
558-
Assert.assertEquals(TaskStatus.CANCELLING, task.status());
559-
}
561+
// For DistributedTaskScheduler, local cancel may result in CANCELLED directly
562+
// (task thread updates status after being interrupted)
563+
// or CANCELLING (if task hasn't processed the interrupt yet)
564+
Assert.assertTrue("Task status should be CANCELLING or CANCELLED, but was " + task.status(),
565+
task.status() == TaskStatus.CANCELLING ||
566+
task.status() == TaskStatus.CANCELLED);
560567

561568
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
562569
Assert.assertEquals(TaskStatus.CANCELLED, task.status());
@@ -628,9 +635,9 @@ public void testGremlinJobAndRestore() throws Exception {
628635
scheduler.cancel(task);
629636

630637
task = scheduler.task(task.id());
631-
if (scheduler.getClass().equals(DistributedTaskScheduler.class)) {
632-
Assert.assertEquals(TaskStatus.CANCELLING, task.status());
633-
}
638+
Assert.assertTrue("Task status should be CANCELLING or CANCELLED, but was " + task.status(),
639+
task.status() == TaskStatus.CANCELLING ||
640+
task.status() == TaskStatus.CANCELLED);
634641

635642
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
636643
Assert.assertEquals(TaskStatus.CANCELLED, task.status());
@@ -640,36 +647,39 @@ public void testGremlinJobAndRestore() throws Exception {
640647
Assert.assertNull(task.result());
641648

642649
HugeTask<Object> finalTask = task;
643-
Assert.assertThrows(IllegalArgumentException.class, () -> {
644-
Whitebox.invoke(scheduler.getClass(), "restore", scheduler,
645-
finalTask);
646-
}, e -> {
647-
Assert.assertContains("No need to restore completed task",
648-
e.getMessage());
649-
});
650650

651-
HugeTask<Object> task2 = scheduler.task(task.id());
652-
Assert.assertThrows(IllegalArgumentException.class, () -> {
651+
// because Distributed do nothing in restore, so only test StandardTaskScheduler here
652+
if (scheduler.getClass().equals(StandardTaskScheduler.class)) {
653+
Assert.assertThrows(IllegalArgumentException.class, () -> {
654+
Whitebox.invoke(scheduler.getClass(), "restore", scheduler,
655+
finalTask);
656+
}, e -> {
657+
Assert.assertContains("No need to restore completed task",
658+
e.getMessage());
659+
});
660+
661+
HugeTask<Object> task2 = scheduler.task(task.id());
662+
Assert.assertThrows(IllegalArgumentException.class, () -> {
663+
Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2);
664+
}, e -> {
665+
Assert.assertContains("No need to restore completed task",
666+
e.getMessage());
667+
});
668+
669+
Whitebox.setInternalState(task2, "status", TaskStatus.RUNNING);
653670
Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2);
654-
}, e -> {
655-
Assert.assertContains("No need to restore completed task",
656-
e.getMessage());
657-
});
658-
659-
Whitebox.setInternalState(task2, "status", TaskStatus.RUNNING);
660-
Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2);
661671

662-
Assert.assertThrows(IllegalArgumentException.class, () -> {
663-
Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2);
664-
}, e -> {
665-
Assert.assertContains("is already in the queue", e.getMessage());
666-
});
667-
668-
scheduler.waitUntilTaskCompleted(task2.id(), 10);
669-
sleepAWhile(500);
670-
Assert.assertEquals(10, task2.progress());
671-
Assert.assertEquals(1, task2.retries());
672-
Assert.assertEquals("100", task2.result());
672+
Assert.assertThrows(IllegalArgumentException.class, () -> {
673+
Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2);
674+
}, e -> {
675+
Assert.assertContains("is already in the queue", e.getMessage());
676+
});
677+
scheduler.waitUntilTaskCompleted(task2.id(), 10);
678+
sleepAWhile(500);
679+
Assert.assertEquals(10, task2.progress());
680+
Assert.assertEquals(1, task2.retries());
681+
Assert.assertEquals("100", task2.result());
682+
}
673683
}
674684

675685
private HugeTask<Object> runGremlinJob(String gremlin) {

0 commit comments

Comments
 (0)