Commit 688aecd

Fix race in parallel composite decode breaking dummycode+recode
The multi-threaded DecoderComposite.decode submitted one task per decoder per row block, running all decoders concurrently. This broke the ordering dependency between decoders: recode-on-output reads the category indexes written by the dummycode decoder, so when the recode task raced ahead it read unwritten cells and produced null or the raw index instead of the original value.

Parallelize over row blocks instead, running all decoders in order within each block via the sequential block decode. Also short-circuit to the single-threaded path when k <= 1.

Fixes order-dependent failures in TransformFrameEncodeDecodeTest and TransformFrameEncodeColmapTest (dummycode single-node/hybrid) that surfaced once transformdecode started using the parallel decode path.
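The ordering constraint described in the message can be illustrated with a minimal, self-contained sketch. It is not SystemDS code: `BlockParallelSketch`, `Stage`, `applyAll`, and `run` are hypothetical names, a plain `int[]` stands in for the output frame, and `Executors.newFixedThreadPool` stands in for `CommonThreadPool`. The second stage reads what the first stage wrote, which loosely mirrors recode-on-output reading the indexes produced by dummycode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a decoder chain; illustrative only.
class BlockParallelSketch {
	// A stage rewrites rows [start, end) of the shared buffer in place.
	interface Stage {
		void apply(int[] data, int start, int end);
	}

	// Runs every stage, in list order, on a single row block.
	static void applyAll(List<Stage> stages, int[] data, int start, int end) {
		for(Stage s : stages)
			s.apply(data, start, end);
	}

	// Submits one task per row block; stage order is preserved inside each task.
	static int[] run(List<Stage> stages, int[] data, int k, int minBlock) {
		if(k <= 1) { // single-threaded short-circuit
			applyAll(stages, data, 0, data.length);
			return data;
		}
		ExecutorService pool = Executors.newFixedThreadPool(k);
		try {
			List<Future<?>> tasks = new ArrayList<>();
			int blz = Math.max(data.length / k, minBlock);
			for(int i = 0; i < data.length; i += blz) {
				final int start = i;
				final int end = Math.min(data.length, i + blz);
				tasks.add(pool.submit(() -> applyAll(stages, data, start, end)));
			}
			for(Future<?> f : tasks)
				f.get(); // waits for the block and surfaces any task failure
		}
		catch(InterruptedException | ExecutionException e) {
			throw new RuntimeException(e);
		}
		finally {
			pool.shutdown();
		}
		return data;
	}

	public static void main(String[] args) {
		// Stage 1 writes an index per row; stage 2 maps that index to a value.
		Stage writeIndex = (d, s, e) -> { for(int r = s; r < e; r++) d[r] = r % 3; };
		Stage mapIndex = (d, s, e) -> { for(int r = s; r < e; r++) d[r] = 100 + d[r]; };
		int[] out = run(Arrays.asList(writeIndex, mapIndex), new int[10], 4, 3);
		System.out.println(Arrays.toString(out));
	}
}
```

Because the blocks cover disjoint row ranges, tasks never touch each other's rows, and the dependency between stages is satisfied by ordinary sequential execution inside each task. Submitting one task per stage per block, as the old code did, gives no such guarantee.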
1 parent 73b622d commit 688aecd

1 file changed

Lines changed: 9 additions & 6 deletions

src/main/java/org/apache/sysds/runtime/transform/decode/DecoderComposite.java

@@ -62,17 +62,20 @@ public FrameBlock decode(MatrixBlock in, FrameBlock out) {
 
 	@Override
 	public FrameBlock decode(final MatrixBlock in, final FrameBlock out, final int k) {
+		if(k <= 1)
+			return decode(in, out);
 		final ExecutorService pool = CommonThreadPool.get(k);
 		out.ensureAllocatedColumns(in.getNumRows());
 		try {
 			final List<Future<?>> tasks = new ArrayList<>();
 			int blz = Math.max(in.getNumRows() / k, 1000);
-			for(Decoder decoder : _decoders){
-				for(int i = 0; i < in.getNumRows(); i += blz){
-					final int start = i;
-					final int end = Math.min(in.getNumRows(), i + blz);
-					tasks.add(pool.submit(() -> decoder.decode(in, out, start, end)));
-				}
+			// Parallelize over row blocks (not over decoders): all decoders must
+			// run in order within a block, e.g. recode-on-output depends on the
+			// category indexes produced by the preceding dummycode decoder.
+			for(int i = 0; i < in.getNumRows(); i += blz){
+				final int start = i;
+				final int end = Math.min(in.getNumRows(), i + blz);
+				tasks.add(pool.submit(() -> decode(in, out, start, end)));
 			}
 			for(Future<?> f : tasks)
 				f.get();
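To make the row-block arithmetic in the hunk concrete, the following sketch reproduces only the block-size rule `Math.max(numRows / k, 1000)` and the loop bounds. `RowBlocks` and `split` are hypothetical names introduced for illustration; nothing here comes from the SystemDS API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the loop bounds used in the diff.
class RowBlocks {
	// Returns [start, end) pairs that cover rows 0..numRows without overlap.
	static List<int[]> split(int numRows, int k) {
		// Integer division, with a floor of 1000 rows per block.
		int blz = Math.max(numRows / k, 1000);
		List<int[]> blocks = new ArrayList<>();
		for(int i = 0; i < numRows; i += blz)
			blocks.add(new int[] {i, Math.min(numRows, i + blz)});
		return blocks;
	}

	public static void main(String[] args) {
		System.out.println(split(500, 8).size());   // small input: a single block
		System.out.println(split(5000, 8).size());  // floor applies: blocks of 1000 rows
		System.out.println(split(20001, 8).size()); // remainder rows form an extra block
	}
}
```

Under this rule a small input yields a single task regardless of `k`, and because `numRows / k` truncates, an input that does not divide evenly can yield one more block than `k`, with the last block holding only the remainder rows.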
