Skip to content

Commit 40a1461

Browse files
authored
fix(datafusion): tighten reduce-sink drain + sender lifecycle (opensearch-project#21671)
Lifecycle and correctness fixes on the coordinator reduce-sink path: locked sender_close, deterministic FFI release on drain/send races, shared drain + session teardown in AbstractDatafusionReduceSink, and moves the drain off SEARCH onto a virtual-thread executor. Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 6ffefb4 commit 40a1461

7 files changed

Lines changed: 169 additions & 199 deletions

File tree

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/AbstractDatafusionReduceSink.java

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,12 @@
88

99
package org.opensearch.be.datafusion;
1010

11-
import org.apache.arrow.c.ArrowArray;
12-
import org.apache.arrow.c.ArrowSchema;
1311
import org.apache.arrow.c.CDataDictionaryProvider;
14-
import org.apache.arrow.c.Data;
1512
import org.apache.arrow.memory.BufferAllocator;
1613
import org.apache.arrow.vector.VectorSchemaRoot;
17-
import org.apache.arrow.vector.types.pojo.Field;
1814
import org.apache.arrow.vector.types.pojo.Schema;
1915
import org.opensearch.analytics.spi.ExchangeSink;
2016
import org.opensearch.analytics.spi.ExchangeSinkContext;
21-
import org.opensearch.be.datafusion.nativelib.NativeBridge;
2217
import org.opensearch.be.datafusion.nativelib.StreamHandle;
2318
import org.opensearch.core.action.ActionListener;
2419

@@ -28,8 +23,6 @@
2823
import java.util.concurrent.ExecutionException;
2924
import java.util.function.Consumer;
3025

31-
import static org.apache.arrow.c.Data.importField;
32-
3326
/**
3427
* Shared lifecycle skeleton for coordinator-side {@link ExchangeSink}s backed by a native
3528
* DataFusion local session. Subclasses customise per-batch handling and the close-time
@@ -41,8 +34,9 @@
4134
* and always closes the supplied {@link VectorSchemaRoot} in {@code finally} regardless
4235
* of whether {@link #feedBatchUnderLock} succeeds.</li>
4336
* <li>{@link #close} flips {@link #closed} once under {@link #feedLock}, runs the
44-
* subclass-specific {@link #closeUnderLock} hook, and unconditionally closes
45-
* {@link #session} in {@code finally}, accumulating any failures and rethrowing.</li>
37+
* subclass-specific {@link #closeUnderLock} hook, and rethrows any accumulated
38+
* failure. Subclasses must close {@link #session} themselves inside
39+
* {@link #closeUnderLock} (typically last, after any owned native streams).</li>
4640
* <li>The downstream from {@link ExchangeSinkContext#downstream()} is intentionally NOT
4741
* closed here — it accumulates drained results consumed by the walker after the
4842
* sink is done.</li>
@@ -179,38 +173,30 @@ public final void close() {
179173
protected abstract void feedBatchUnderLock(VectorSchemaRoot batch);
180174

181175
/**
182-
* Subclass-specific shutdown. Runs after {@link #closed} is set and before
183-
* {@link #session} is closed. Implementations should close their owned native resources
184-
* (sender, output stream, accumulated FFI structs, …) and drain any pending output.
176+
* Subclass-specific shutdown. Runs after {@link #closed} is set. Implementations must
177+
* close all owned native resources including {@link #session} close owned streams
178+
* before the session.
185179
*
186180
* @return the first failure encountered (use {@link #accumulate(Throwable, Throwable)}
187181
* when multiple steps may fail), or {@code null} on clean shutdown.
188182
*/
189183
protected abstract Throwable closeUnderLock();
190184

191185
/**
192-
* Drains a native output stream into {@link ExchangeSinkContext#downstream()}, importing
193-
* each {@link ArrowArray} into a fresh {@link VectorSchemaRoot} on the Java side.
186+
* Drains a native output stream into {@link ExchangeSinkContext#downstream()},
187+
* importing each native batch into a fresh {@link VectorSchemaRoot}.
188+
*
189+
* <p>Uses {@link DatafusionResultStream.BatchIterator} directly (instead of
190+
* {@link DatafusionResultStream}) so the caller retains ownership of {@code outStream} —
191+
* the iterator manages schema, dictionary provider, and per-batch allocation, but
192+
* does not close the underlying stream handle.
194193
*/
195194
protected final void drainOutputIntoDownstream(StreamHandle outStream) {
196195
BufferAllocator alloc = ctx.allocator();
197196
try (CDataDictionaryProvider dictProvider = new CDataDictionaryProvider()) {
198-
long schemaAddr = asyncCall(listener -> NativeBridge.streamGetSchema(outStream.getPointer(), listener));
199-
Schema outSchema;
200-
try (ArrowSchema arrowSchema = ArrowSchema.wrap(schemaAddr)) {
201-
Field structField = importField(alloc, arrowSchema, dictProvider);
202-
outSchema = new Schema(structField.getChildren(), structField.getMetadata());
203-
}
204-
while (true) {
205-
long arrayAddr = asyncCall(listener -> NativeBridge.streamNext(runtimeHandle.get(), outStream.getPointer(), listener));
206-
if (arrayAddr == 0) {
207-
break;
208-
}
209-
VectorSchemaRoot vsr = VectorSchemaRoot.create(outSchema, alloc);
210-
try (ArrowArray arrowArray = ArrowArray.wrap(arrayAddr)) {
211-
Data.importIntoVectorSchemaRoot(alloc, arrowArray, vsr, dictProvider);
212-
}
213-
ctx.downstream().feed(vsr);
197+
DatafusionResultStream.BatchIterator it = new DatafusionResultStream.BatchIterator(outStream, alloc, dictProvider);
198+
while (it.hasNext()) {
199+
ctx.downstream().feed(it.next().getArrowRoot());
214200
}
215201
}
216202
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ public ExchangeSinkProvider getExchangeSinkProvider() {
613613
if ("memtable".equals(mode) && ctx.childInputs().size() == 1 && preparedState == null) {
614614
return new DatafusionMemtableReduceSink(ctx, svc.getNativeRuntime());
615615
}
616-
return new DatafusionReduceSink(ctx, svc.getNativeRuntime(), preparedState);
616+
return new DatafusionReduceSink(ctx, svc.getNativeRuntime(), svc.getDrainExecutor(), preparedState);
617617
};
618618
}
619619

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import java.io.IOException;
2323
import java.util.Collection;
24+
import java.util.concurrent.Executor;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
2427
import java.util.concurrent.atomic.AtomicLong;
2528

2629
/**
@@ -42,6 +45,15 @@ public class DataFusionService extends AbstractLifecycleComponent {
4245
private final int cpuThreads;
4346
private final ClusterSettings clusterSettings;
4447

48+
/**
49+
* Virtual-thread-per-task executor used for {@link DatafusionReduceSink}'s drain
50+
* loop. Drain tasks park on {@code streamNext().join()} while the native side
51+
* produces the next batch; on a virtual thread the park unmounts from its carrier
52+
* so concurrent queries don't consume fixed platform-pool slots for the full
53+
* query lifetime.
54+
*/
55+
private volatile ExecutorService drainExecutor;
56+
4557
/** Handle to the native DataFusion global runtime (memory pool + cache). */
4658
private volatile NativeRuntimeHandle runtimeHandle;
4759

@@ -62,6 +74,20 @@ private DataFusionService(Builder builder) {
6274
this.clusterSettings = builder.clusterSettings;
6375
}
6476

77+
/**
78+
* Returns the virtual-thread-per-task executor the coordinator-reduce sink uses for
79+
* its drain loop. Tasks spawned here park on native futures; virtual threads unmount
80+
* from their carriers during the park so concurrent queries don't consume fixed
81+
* platform-pool slots for the query's full lifetime.
82+
*/
83+
public Executor getDrainExecutor() {
84+
ExecutorService exec = drainExecutor;
85+
if (exec == null) {
86+
throw new IllegalStateException("DataFusionService has not been started");
87+
}
88+
return exec;
89+
}
90+
6591
/** Creates a new builder. */
6692
public static Builder builder() {
6793
return new Builder();
@@ -73,6 +99,8 @@ protected void doStart() {
7399
NativeBridge.initTokioRuntimeManager(cpuThreads);
74100
logger.debug("Tokio runtime manager initialized with {} CPU threads", cpuThreads);
75101

102+
this.drainExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("analytics-reduce-drain-", 0).factory());
103+
76104
long cacheManagerPtr = 0L;
77105
if (clusterSettings != null) {
78106
cacheManagerPtr = CacheUtils.createCacheConfig(clusterSettings);
@@ -101,7 +129,15 @@ protected void doStop() {
101129
rootAllocator = null;
102130
}
103131
} finally {
104-
NativeBridge.shutdownTokioRuntimeManager();
132+
try {
133+
ExecutorService exec = drainExecutor;
134+
if (exec != null) {
135+
exec.shutdown();
136+
drainExecutor = null;
137+
}
138+
} finally {
139+
NativeBridge.shutdownTokioRuntimeManager();
140+
}
105141
}
106142
}
107143
logger.debug("DataFusion service stopped");

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionMemtableReduceSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ protected Throwable closeUnderLock() {
154154
if (streamPtr != 0) {
155155
NativeBridge.streamClose(streamPtr);
156156
}
157+
session.close();
157158
}
158159
return failure;
159160
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionPartitionSender.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public void send(long arrayAddr, long schemaAddr) {
4343
public void close() {
4444
lifecycle.writeLock().lock();
4545
try {
46+
assert lifecycle.isWriteLockedByCurrentThread() : "close must hold the write lock across super.close()";
4647
super.close();
4748
} finally {
4849
lifecycle.writeLock().unlock();

0 commit comments

Comments
 (0)