-
Notifications
You must be signed in to change notification settings - Fork 97
Expand file tree
/
Copy pathOrchestrator.java
More file actions
116 lines (93 loc) · 3.86 KB
/
Orchestrator.java
File metadata and controls
116 lines (93 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package org.dataloader.orchestration;
import org.dataloader.DataLoader;
import org.dataloader.impl.Assertions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
/**
 * Records a sequence of {@link Step}s that begin at one starting {@link DataLoader} and, when
 * executed, chains them so that each step's code receives the value produced by the step before it.
 *
 * @param <K> the key type of the starting data loader
 * @param <V> the value type of the starting data loader
 */
public class Orchestrator<K, V> {

    private final Executor executor;
    private final Tracker tracker;
    private final DataLoader<K, V> startingDL;
    // steps in the order they were recorded; execute() runs them in this order
    private final List<Step<?, ?>> steps = new ArrayList<>();

    /**
     * Creates an {@link Orchestrator} that starts at the given data loader and uses
     * {@link ImmediateExecutor#INSTANCE} as its executor.
     *
     * @param dataLoader the data loader to start with
     * @param <K>        the key type
     * @param <V>        the value type
     * @return a new {@link Orchestrator}
     */
    public static <K, V> Orchestrator<K, V> orchestrate(DataLoader<K, V> dataLoader) {
        return orchestrate(dataLoader, ImmediateExecutor.INSTANCE);
    }

    // TODO - make this a builder

    /**
     * Creates an {@link Orchestrator} that starts at the given data loader and holds the supplied
     * executor. A fresh {@link Tracker} is created for every orchestrator.
     *
     * @param dataLoader the data loader to start with
     * @param executor   the executor made available via {@link #getExecutor()}
     * @param <K>        the key type
     * @param <V>        the value type
     * @return a new {@link Orchestrator}
     */
    public static <K, V> Orchestrator<K, V> orchestrate(DataLoader<K, V> dataLoader, Executor executor) {
        return new Orchestrator<>(new Tracker(), dataLoader, executor);
    }

    private Orchestrator(Tracker tracker, DataLoader<K, V> dataLoader, Executor executor) {
        this.executor = executor;
        this.startingDL = dataLoader;
        this.tracker = tracker;
    }

    /**
     * @return the tracker created together with this orchestrator
     */
    public Tracker getTracker() {
        return this.tracker;
    }

    /**
     * @return the executor this orchestrator was created with
     */
    public Executor getExecutor() {
        return this.executor;
    }

    /**
     * Starts a chain by loading a key from the starting data loader, with no key context.
     *
     * @param key the key to load
     * @return the step representing that load
     */
    public Step<K, V> load(K key) {
        return load(key, null);
    }

    /**
     * Starts a chain by loading a key from the starting data loader.
     *
     * @param key        the key to load
     * @param keyContext an optional context object passed along with the key (may be null)
     * @return the step representing that load
     */
    public Step<K, V> load(K key, Object keyContext) {
        return Step.loadImpl(this, castAs(startingDL), key, keyContext);
    }

    /**
     * Unchecked cast helper; the target type is whatever the call site infers.
     *
     * @param o   the object to cast
     * @param <T> the type the caller wants
     * @return the same object, viewed as {@code T}
     */
    static <T> T castAs(Object o) {
        //noinspection unchecked
        return (T) o;
    }

    /**
     * Appends a step to the list of steps to run and bumps the tracker's step count.
     *
     * @param step the step to remember
     * @param <KT> the step's key type
     * @param <VT> the step's value type
     */
    <KT, VT> void record(Step<KT, VT> step) {
        steps.add(step);
        tracker.incrementStepCount();
    }

    /**
     * This is the callback point that starts the recorded steps running.
     * <p>
     * The first step's code is invoked with {@code null} (it relies on variable capture); every later
     * step is composed onto the future of the step before it and receives that step's value.
     * <p>
     * The value type of the returned future depends on the last step, so it cannot be expressed
     * generically here; the result is an unchecked cast to whatever type the caller asks for.
     *
     * @param <VT> the value type
     * @return the future of the last step in the chain
     */
    <VT> CompletableFuture<VT> execute() {
        Assertions.assertState(!steps.isEmpty(), () -> "How can the steps to run be empty??");

        // seed the chain: first load uses variable capture, hence the null argument
        Step<?, ?> head = steps.get(0);
        CompletableFuture<Object> chain = castAs(head.codeToRun().apply(null));
        whenComplete(0, head, chain);

        // the list size is re-read on every pass, exactly as an index-based for loop would
        int position = 1;
        while (position < steps.size()) {
            Step<?, ?> step = steps.get(position);
            Function<Object, CompletableFuture<?>> stepCode = castAs(step.codeToRun());
            chain = chain.thenCompose(previous -> castAs(stepCode.apply(previous)));
            // side effect when this step is complete
            whenComplete(position, step, chain);
            position++;
        }
        return castAs(chain);
    }

    /**
     * Attaches a completion callback to a step's future: it tells the tracker the load call for the
     * step's data loader has completed and then prints the outcome to standard out.
     *
     * @param index the position of the step in the chain
     * @param step  the step whose future is being observed
     * @param cf    the future to observe
     */
    private void whenComplete(int index, Step<?, ?> step, CompletableFuture<Object> cf) {
        cf.whenComplete((value, failure) -> {
            tracker.loadCallComplete(step.dataLoader());
            // replace with instrumentation code
            if (failure == null) {
                System.out.println("step " + index + " returned : " + value);
                return;
            }
            // TODO - should we be cancelling future steps here - no need for dispatch tracking if they will never run
            System.out.println("A throwable has been thrown on step " + index + ": " + failure.getMessage());
            failure.printStackTrace(System.out);
        });
    }
}