Skip to content

Commit af85bef

Browse files
committed
fix(server): fix some issues of the distributed scheduler
1 parent 6dd52e4 commit af85bef

3 files changed

Lines changed: 27 additions & 24 deletions

File tree

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.slf4j.Logger;
5151

5252
public class DistributedTaskScheduler extends TaskAndResultScheduler {
53+
5354
private static final Logger LOG = Log.logger(DistributedTaskScheduler.class);
5455
private final long schedulePeriod;
5556
private final ExecutorService taskDbExecutor;

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.concurrent.Future;
2929
import java.util.concurrent.TimeoutException;
3030

31-
import com.google.common.collect.ImmutableMap;
3231
import org.apache.hugegraph.HugeException;
3332
import org.apache.hugegraph.HugeGraph;
3433
import org.apache.hugegraph.HugeGraphParams;
@@ -57,6 +56,8 @@
5756
import org.apache.tinkerpop.gremlin.structure.Vertex;
5857
import org.slf4j.Logger;
5958

59+
import com.google.common.collect.ImmutableMap;
60+
6061
public class StandardTaskScheduler implements TaskScheduler {
6162

6263
private static final Logger LOG = Log.logger(StandardTaskScheduler.class);
@@ -266,7 +267,7 @@ public synchronized <V> void cancel(HugeTask<V> task) {
266267
}
267268

268269
throw new HugeException("Can't cancel task '%s' in status %s",
269-
task.id(), task.status());
270+
task.id(), task.status());
270271
}
271272

272273
@Override

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
* Base class of task & result scheduler
4747
*/
4848
public abstract class TaskAndResultScheduler implements TaskScheduler {
49+
4950
/**
5051
* Which graph the scheduler belongs to
5152
*/
@@ -61,8 +62,8 @@ public abstract class TaskAndResultScheduler implements TaskScheduler {
6162
private final ServerInfoManager serverManager;
6263

6364
public TaskAndResultScheduler(
64-
HugeGraphParams graph,
65-
ExecutorService serverInfoDbExecutor) {
65+
HugeGraphParams graph,
66+
ExecutorService serverInfoDbExecutor) {
6667
E.checkNotNull(graph, "graph");
6768

6869
this.graph = graph;
@@ -90,7 +91,7 @@ public <V> void save(HugeTask<V> task) {
9091
// Save result outcome
9192
if (rawResult != null) {
9293
HugeTaskResult result =
93-
new HugeTaskResult(HugeTaskResult.genId(task.id()));
94+
new HugeTaskResult(HugeTaskResult.genId(task.id()));
9495
result.result(rawResult);
9596

9697
this.call(() -> {
@@ -164,7 +165,7 @@ protected <V> Iterator<HugeTask<V>> queryTask(Map<String, Object> conditions,
164165
}
165166
Iterator<Vertex> vertices = this.tx().queryTaskInfos(query);
166167
Iterator<HugeTask<V>> tasks =
167-
new MapperIterator<>(vertices, HugeTask::fromVertex);
168+
new MapperIterator<>(vertices, HugeTask::fromVertex);
168169
// Convert iterator to list to avoid across thread tx accessed
169170
return QueryResults.toList(tasks);
170171
});
@@ -180,16 +181,16 @@ protected <V> Iterator<HugeTask<V>> queryTask(Map<String, Object> conditions,
180181

181182
protected <V> Iterator<HugeTask<V>> queryTask(List<Id> ids) {
182183
ListIterator<HugeTask<V>> ts = this.call(
183-
() -> {
184-
Object[] idArray = ids.toArray(new Id[ids.size()]);
185-
Iterator<Vertex> vertices = this.tx()
186-
.queryTaskInfos(idArray);
187-
Iterator<HugeTask<V>> tasks =
188-
new MapperIterator<>(vertices,
189-
HugeTask::fromVertex);
190-
// Convert iterator to list to avoid across thread tx accessed
191-
return QueryResults.toList(tasks);
192-
});
184+
() -> {
185+
Object[] idArray = ids.toArray(new Id[ids.size()]);
186+
Iterator<Vertex> vertices = this.tx()
187+
.queryTaskInfos(idArray);
188+
Iterator<HugeTask<V>> tasks =
189+
new MapperIterator<>(vertices,
190+
HugeTask::fromVertex);
191+
// Convert iterator to list to avoid across thread tx accessed
192+
return QueryResults.toList(tasks);
193+
});
193194

194195
Iterator<HugeTaskResult> results = queryTaskResult(ids);
195196

@@ -201,7 +202,7 @@ protected <V> Iterator<HugeTask<V>> queryTask(List<Id> ids) {
201202

202203
return new MapperIterator<>(ts, (task) -> {
203204
HugeTaskResult taskResult =
204-
resultCaches.get(HugeTaskResult.genId(task.id()));
205+
resultCaches.get(HugeTaskResult.genId(task.id()));
205206
if (taskResult != null) {
206207
task.result(taskResult);
207208
}
@@ -231,7 +232,7 @@ protected <V> Iterator<HugeTask<V>> tasksWithoutResult(List<Id> ids) {
231232
Object[] idArray = ids.toArray(new Id[ids.size()]);
232233
Iterator<Vertex> vertices = this.tx().queryTaskInfos(idArray);
233234
Iterator<HugeTask<V>> tasks =
234-
new MapperIterator<>(vertices, HugeTask::fromVertex);
235+
new MapperIterator<>(vertices, HugeTask::fromVertex);
235236
// Convert iterator to list to avoid across thread tx accessed
236237
return QueryResults.toList(tasks);
237238
});
@@ -254,7 +255,7 @@ protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(String key,
254255
}
255256

256257
protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(Map<String,
257-
Object> conditions, long limit, String page) {
258+
Object> conditions, long limit, String page) {
258259
return this.call(() -> {
259260
ConditionQuery query = new ConditionQuery(HugeType.TASK);
260261
if (page != null) {
@@ -272,7 +273,7 @@ protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(Map<String,
272273
}
273274
Iterator<Vertex> vertices = this.tx().queryTaskInfos(query);
274275
Iterator<HugeTask<V>> tasks =
275-
new MapperIterator<>(vertices, HugeTask::fromVertex);
276+
new MapperIterator<>(vertices, HugeTask::fromVertex);
276277
// Convert iterator to list to avoid across thread tx accessed
277278
return QueryResults.toList(tasks);
278279
});
@@ -281,7 +282,7 @@ protected <V> Iterator<HugeTask<V>> queryTaskWithoutResult(Map<String,
281282
protected HugeTaskResult queryTaskResult(Id taskid) {
282283
HugeTaskResult result = this.call(() -> {
283284
Iterator<Vertex> vertices =
284-
this.tx().queryTaskInfos(HugeTaskResult.genId(taskid));
285+
this.tx().queryTaskInfos(HugeTaskResult.genId(taskid));
285286
Vertex vertex = QueryResults.one(vertices);
286287
if (vertex == null) {
287288
return null;
@@ -296,12 +297,12 @@ protected HugeTaskResult queryTaskResult(Id taskid) {
296297
protected Iterator<HugeTaskResult> queryTaskResult(List<Id> taskIds) {
297298
return this.call(() -> {
298299
Object[] idArray =
299-
taskIds.stream().map(HugeTaskResult::genId).toArray();
300+
taskIds.stream().map(HugeTaskResult::genId).toArray();
300301
Iterator<Vertex> vertices = this.tx()
301302
.queryTaskInfos(idArray);
302303
Iterator<HugeTaskResult> tasks =
303-
new MapperIterator<>(vertices,
304-
HugeTaskResult::fromVertex);
304+
new MapperIterator<>(vertices,
305+
HugeTaskResult::fromVertex);
305306
// Convert iterator to list to avoid across thread tx accessed
306307
return QueryResults.toList(tasks);
307308
});

0 commit comments

Comments
 (0)