Skip to content

Commit 3b564bb

Browse files
authored
[MINOR] Fix Flaky Tests from Blocking Thread Pools (#2489)
1 parent 17803b1 commit 3b564bb

2 files changed

Lines changed: 6 additions & 10 deletions

File tree

src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
3636
import org.apache.sysds.runtime.matrix.operators.Operator;
3737
import org.apache.sysds.runtime.meta.DataCharacteristics;
38-
import org.apache.sysds.runtime.ooc.stream.StreamContext;
3938
import org.apache.sysds.runtime.util.IndexRange;
4039

4140
import java.util.HashMap;
@@ -119,9 +118,10 @@ public void processInstruction( ExecutionContext ec ) {
119118
});
120119

121120
// global reduce
122-
submitOOCTask(() -> {
123-
IndexedMatrixValue partial;
124-
while ((partial = qLocal.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) {
121+
addOutStream(qOut);
122+
submitOOCTasks(qLocal, callback -> {
123+
IndexedMatrixValue partial = callback.get();
124+
synchronized(aggTracker) {
125125
long idx = aggun.isRowAggregate() ? partial.getIndexes().getRowIndex() : partial.getIndexes()
126126
.getColumnIndex();
127127

@@ -150,8 +150,7 @@ public void processInstruction( ExecutionContext ec ) {
150150
corrs.remove(idx);
151151
}
152152
}
153-
qOut.closeInput();
154-
}, new StreamContext().addOutStream(qOut));
153+
}).thenRun(qOut::closeInput);
155154
}
156155
// full aggregation
157156
else {

src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,7 +1196,7 @@ protected <T> CompletableFuture<Void> submitOOCTasks(OOCStream<T> queue, Consume
11961196
}
11971197

11981198
protected CompletableFuture<Void> submitOOCTask(Runnable r, StreamContext ctx) {
1199-
ExecutorService pool = CommonThreadPool.get();
1199+
ExecutorService pool = CommonThreadPool.getDynamicPool();
12001200
final CompletableFuture<Void> future = new CompletableFuture<>();
12011201
try {
12021202
COMPUTE_IN_FLIGHT.incrementAndGet();
@@ -1220,9 +1220,6 @@ protected CompletableFuture<Void> submitOOCTask(Runnable r, StreamContext ctx) {
12201220
COMPUTE_IN_FLIGHT.decrementAndGet();
12211221
throw new DMLRuntimeException(ex);
12221222
}
1223-
finally {
1224-
pool.shutdown();
1225-
}
12261223

12271224
return future;
12281225
}

0 commit comments

Comments
 (0)