Skip to content

Commit 70cf46f

Browse files
authored
analytics-engine: drive reduce through stage tasks (#21723)
* analytics-engine: drive reduce through stage tasks + wire cancellation Two related changes: 1. Drive reduce as a task. Introduces ReducingExchangeSink with a reduce/close split (reduce() owns the work, close() owns cleanup); collapses three reduce-execution variants into one ReduceStageExecution driven by the sink's supportsEagerScheduling(). Streaming sink anchors native cleanup on the reduce() caller via a SinkState machine. 2. Wire coordinator-side cancellation. Plumbs AnalyticsQueryTask.id through ExchangeSinkContext, registers a cancellation token in the Rust QUERY_REGISTRY, and has close() fire cancel_query for in-flight drains -- coordinator reduce now participates in task-level cancellation alongside data-node scans. Adds before-/after-first-batch cancellation tests in DatafusionReduceSinkTests. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * analytics-engine: fix race in DatafusionReduceSink.close() close()'s prior get()+compareAndSet pattern had a window where reduce() could transition READY→REDUCING between the read and the CAS, leaving cancel_query unfired and the parked drain hung. Replaced with a single compareAndExchange whose return value tells us which branch to take. Also collapsed the close() override into closeImpl, removed the base's auto-session-close (every concrete subclass already closed session, so this was a double-close masked by idempotency), and gated session.close on preparedState == null in both subclasses. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * spotless Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * fix javadoc Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * analytics-engine: fix outStream/session leak on cancel-during-REDUCING External close() arriving while reduce() was parked set the base class's `closed` flag and ran closeImpl() — which only fired cancel_query and deferred teardown. When reduce()'s finally then called super.close() to do the teardown, the base's `closed` short-circuit blocked re-entry and closeImpl() never ran a second time. outStream/session leaked. Have reduce()'s finally call closeImpl() directly, bypassing the base's closed-gate. Add a `torndown` AtomicBoolean so concurrent close paths (external + reduce-finally) can't both run the teardown body. Regression test: testCancelBeforeFirstBatchUnwindsDrain now asserts sink.torndown is set after the cancel — fails on the prior code. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 8f2d058 commit 70cf46f

40 files changed

Lines changed: 1307 additions & 782 deletions

File tree

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.spi;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
13+
/**
14+
* {@link ExchangeSink} that runs reduce work (execute plan, drain into downstream).
15+
* The reduce-stage execution invokes {@link #reduce} from its task; {@link #close} runs
16+
* separately on every terminal transition for resource cleanup — split so cancel-before-reduce
17+
* paths can release resources without re-running the work.
18+
*
19+
* @opensearch.experimental
20+
*/
21+
public interface ReducingExchangeSink extends ExchangeSink {
22+
23+
/**
24+
* Execute the reduce plan and drain output into the configured downstream; fire
25+
* {@code listener} on completion. Single-shot. Must not release resources — that's
26+
* {@link #close()}'s job.
27+
*/
28+
void reduce(ActionListener<Void> listener);
29+
30+
/**
31+
* {@code true} (default) — {@link #reduce} can run concurrently with producer
32+
* {@link #feed} calls. Buffered sinks that need the complete input set first
33+
* (e.g. memtable-backed) must override to {@code false}.
34+
*/
35+
default boolean supportsEagerScheduling() {
36+
return true;
37+
}
38+
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/AbstractDatafusionReduceSink.java

Lines changed: 43 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -12,91 +12,60 @@
1212
import org.apache.arrow.memory.BufferAllocator;
1313
import org.apache.arrow.vector.VectorSchemaRoot;
1414
import org.apache.arrow.vector.types.pojo.Schema;
15-
import org.opensearch.analytics.spi.ExchangeSink;
1615
import org.opensearch.analytics.spi.ExchangeSinkContext;
16+
import org.opensearch.analytics.spi.ReducingExchangeSink;
1717
import org.opensearch.be.datafusion.nativelib.StreamHandle;
18-
import org.opensearch.core.action.ActionListener;
1918

2019
import java.util.LinkedHashMap;
2120
import java.util.Map;
22-
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.ExecutionException;
24-
import java.util.function.Consumer;
2521

2622
/**
27-
* Shared lifecycle skeleton for coordinator-side {@link ExchangeSink}s backed by a native
28-
* DataFusion local session. Subclasses customise per-batch handling and the close-time
29-
* native handoff via {@link #feedBatchUnderLock} and {@link #closeUnderLock}.
23+
* Shared lifecycle skeleton for coordinator-side {@link ReducingExchangeSink}s backed by a
24+
* native DataFusion local session. Subclasses customise per-batch handling, the reduce verb,
25+
* and native cleanup via {@link #feedBatchUnderLock}, {@link #reduce}, {@link #closeImpl}.
3026
*
31-
* <p>Lifecycle invariants enforced by this base:
32-
* <ul>
33-
* <li>{@link #feed} synchronises on {@link #feedLock}, short-circuits when {@link #closed},
34-
* and always closes the supplied {@link VectorSchemaRoot} in {@code finally} regardless
35-
* of whether {@link #feedBatchUnderLock} succeeds.</li>
36-
* <li>{@link #close} flips {@link #closed} once under {@link #feedLock}, runs the
37-
* subclass-specific {@link #closeUnderLock} hook, and rethrows any accumulated
38-
* failure. Subclasses must close {@link #session} themselves inside
39-
* {@link #closeUnderLock} (typically last, after any owned native streams).</li>
40-
* <li>The downstream from {@link ExchangeSinkContext#downstream()} is intentionally NOT
41-
* closed here — it accumulates drained results consumed by the walker after the
42-
* sink is done.</li>
43-
* </ul>
27+
* <p>Invariants: {@link #feed} runs under {@link #feedLock} and always closes the batch in
28+
* {@code finally}; {@link #close} flips {@link #closed} under the lock then delegates to
29+
* {@link #closeImpl}. Subclasses own the full teardown sequence in {@link #closeImpl} —
30+
* including {@link #session} and any sender/stream handles. The base used to auto-close
31+
* {@code session} after {@code closeImpl} returned, but that double-closed (every concrete
32+
* subclass already closed it) and tangled session ownership with the close-state machine
33+
* the streaming subclass needs to manage.
34+
* The {@link ExchangeSinkContext#downstream()} sink is NOT closed here — its lifecycle is the
35+
* walker's.
4436
*
45-
* <p>Multi-input shapes (Union, future Join) are supported at this base by exposing
46-
* {@link #childInputs} (childStageId → producer-side plan bytes) for subclasses to register
47-
* one native partition per child stage. The native call returns the IPC-encoded schema the
48-
* session settled on after lowering; subclasses populate {@link #childSchemas} from those
49-
* returns so the {@code typesMatch} tripwire validates batches against the same schema the
50-
* native session is registered with. The {@link #INPUT_ID} constant remains as the
51-
* conventional name for the single-input case (childStageId=0); the per-child id is
52-
* computed via {@link #inputIdFor(int)}.
37+
* <p>Multi-input is exposed via {@link #childInputs} (childStageId → producer plan bytes) and
38+
* {@link #childSchemas} (lazily populated by subclasses from native registration replies).
39+
* Use {@link #inputIdFor(int)} for the per-child table id; {@link #INPUT_ID} is the
40+
* single-input shortcut.
5341
*
5442
* @opensearch.internal
5543
*/
56-
abstract class AbstractDatafusionReduceSink implements ExchangeSink {
44+
abstract class AbstractDatafusionReduceSink implements ReducingExchangeSink {
5745

58-
/**
59-
* Substrait/DataFusion table name used for the single-input case (childStageId=0).
60-
* For multi-input shapes use {@link #inputIdFor(int)} instead.
61-
*/
46+
/** Single-input shortcut for the per-child table id; multi-input uses {@link #inputIdFor(int)}. */
6247
static final String INPUT_ID = "input-0";
6348

6449
protected final ExchangeSinkContext ctx;
6550
protected final NativeRuntimeHandle runtimeHandle;
6651
protected final DatafusionLocalSession session;
52+
6753
/**
68-
* Non-null when this sink was constructed with a pre-prepared FINAL-aggregate plan
69-
* from the FinalAggregateInstructionHandler. When present, the handler already created
70-
* the session, registered the input partitions, and called {@code prepareFinalPlan} on
71-
* the Rust side; the sink only needs to drive {@code executeLocalPreparedPlan} and feed
72-
* batches. When null, the sink falls back to the legacy path (create its own session,
73-
* register its own partitions, call {@code executeLocalPlan}).
74-
*
75-
* <p>Close ownership: when {@code preparedState != null} the state owns session +
76-
* senders and {@link #close} skips re-closing them (avoids double-close on the native
77-
* side). When {@code preparedState == null} the base class closes the session itself.
54+
* Non-null when constructed from a pre-prepared plan (FinalAggregateInstructionHandler):
55+
* session + senders are owned by the state and {@link #close} skips re-closing them.
7856
*/
7957
protected final DataFusionReduceState preparedState;
80-
/**
81-
* Per-child producer-side plan bytes, keyed by childStageId. Iteration order matches
82-
* the order of {@code ctx.childInputs()} so subclasses get deterministic registration.
83-
* Subclasses pass each entry to the native registration call, which lowers the plan
84-
* and returns the schema the session settled on.
85-
*/
58+
59+
/** Per-child producer plan bytes (childStageId → bytes), iteration order = ctx.childInputs(). */
8660
protected final Map<Integer, byte[]> childInputs;
8761

88-
/**
89-
* Declared Arrow {@link org.apache.arrow.vector.types.pojo.Schema} per childStageId.
90-
* Populated lazily by subclasses from the IPC bytes the native registration call
91-
* returns — i.e. the schema the native session itself derived from the producer plan.
92-
* Used by sinks to validate incoming batches via the {@code typesMatch} tripwire.
93-
*/
62+
/** Per-child declared schema, populated lazily from native registration replies. */
9463
protected final Map<Integer, Schema> childSchemas;
9564

96-
/** Guards {@link #closed} and serialises {@link #feed}/{@link #close} against producers. */
65+
/** Serialises {@link #feed}/{@link #close} against producers. */
9766
protected final Object feedLock = new Object();
9867

99-
/** Set once in {@link #close} under {@link #feedLock}. Visible to all threads via volatile. */
68+
/** Set once under {@link #feedLock} in {@link #close}. */
10069
protected volatile boolean closed;
10170

10271
protected AbstractDatafusionReduceSink(ExchangeSinkContext ctx, NativeRuntimeHandle runtimeHandle) {
@@ -132,67 +101,46 @@ public void feed(VectorSchemaRoot batch) {
132101
batch.close();
133102
return;
134103
}
135-
try {
104+
try (batch) {
136105
feedBatchUnderLock(batch);
137-
} finally {
138-
batch.close();
139106
}
140107
}
141108
}
142109

143110
@Override
144-
public final void close() {
111+
public void close() {
145112
synchronized (feedLock) {
146113
if (closed) {
147114
return;
148115
}
149116
closed = true;
150117
}
151-
Throwable failure = null;
118+
Exception failure = null;
152119
try {
153-
failure = closeUnderLock();
154-
} catch (Throwable t) {
120+
failure = closeImpl();
121+
} catch (Exception t) {
155122
failure = accumulate(failure, t);
156-
} finally {
157-
// If a preparedState owns the session/senders, let the state's close handle
158-
// them (invoked by the orchestrator). Otherwise close the session we created.
159-
if (preparedState == null) {
160-
try {
161-
session.close();
162-
} catch (Throwable t) {
163-
failure = accumulate(failure, t);
164-
}
165-
}
166123
}
167124
rethrow(failure);
168125
}
169126

170127
/**
171-
* Per-batch hook. Called inside {@code synchronized(feedLock)} after {@code closed} is
172-
* verified false. Implementations export and hand off (or buffer) {@code batch} via the
173-
* native bridge. Implementations MUST NOT close {@code batch} — the base class does that
174-
* in {@code finally}.
128+
* Per-batch hook. Called under {@link #feedLock} after the closed-check. MUST NOT
129+
* close {@code batch} — the base does it in {@code finally}.
175130
*/
176131
protected abstract void feedBatchUnderLock(VectorSchemaRoot batch);
177132

178133
/**
179-
* Subclass-specific shutdown. Runs after {@link #closed} is set. Implementations must
180-
* close all owned native resources including {@link #session} — close owned streams
181-
* before the session.
182-
*
183-
* @return the first failure encountered (use {@link #accumulate(Throwable, Throwable)}
184-
* when multiple steps may fail), or {@code null} on clean shutdown.
134+
* Subclass shutdown. Despite the historical name, this is NOT called under {@link #feedLock} —
135+
* the lock is released after {@link #closed} is flipped. Subclasses own the full teardown
136+
* including {@link #session} (gate on {@link #preparedState} == null when the session is
137+
* owned externally). Return the first failure, or null.
185138
*/
186-
protected abstract Throwable closeUnderLock();
139+
protected abstract Exception closeImpl();
187140

188141
/**
189-
* Drains a native output stream into {@link ExchangeSinkContext#downstream()},
190-
* importing each native batch into a fresh {@link VectorSchemaRoot}.
191-
*
192-
* <p>Uses {@link DatafusionResultStream.BatchIterator} directly (instead of
193-
* {@link DatafusionResultStream}) so the caller retains ownership of {@code outStream} —
194-
* the iterator manages schema, dictionary provider, and per-batch allocation, but
195-
* does not close the underlying stream handle.
142+
* Imports each batch from {@code outStream} into a fresh {@link VectorSchemaRoot} and
143+
* feeds it downstream. Caller retains ownership of {@code outStream}.
196144
*/
197145
protected final void drainOutputIntoDownstream(StreamHandle outStream) {
198146
BufferAllocator alloc = ctx.allocator();
@@ -204,48 +152,22 @@ protected final void drainOutputIntoDownstream(StreamHandle outStream) {
204152
}
205153
}
206154

207-
/**
208-
* Synchronously awaits the result of an async native call expressed as a
209-
* {@code Consumer<ActionListener<Long>>}. Restores interrupt state on
210-
* {@link InterruptedException} and unwraps {@link ExecutionException} to surface the
211-
* original cause.
212-
*/
213-
protected static long asyncCall(Consumer<ActionListener<Long>> call) {
214-
CompletableFuture<Long> future = new CompletableFuture<>();
215-
call.accept(ActionListener.wrap(future::complete, future::completeExceptionally));
216-
try {
217-
return future.get();
218-
} catch (InterruptedException e) {
219-
Thread.currentThread().interrupt();
220-
throw new RuntimeException(e);
221-
} catch (ExecutionException e) {
222-
Throwable cause = e.getCause();
223-
if (cause instanceof RuntimeException re) {
224-
throw re;
225-
}
226-
throw new RuntimeException(cause);
227-
}
228-
}
229-
230155
/** Returns {@code t} if {@code acc} is null; otherwise adds {@code t} as a suppressed of {@code acc}. */
231-
protected static Throwable accumulate(Throwable acc, Throwable t) {
156+
protected static Exception accumulate(Exception acc, Exception t) {
232157
if (acc == null) {
233158
return t;
234159
}
235160
acc.addSuppressed(t);
236161
return acc;
237162
}
238163

239-
private static void rethrow(Throwable failure) {
164+
private static void rethrow(Exception failure) {
240165
if (failure == null) {
241166
return;
242167
}
243168
if (failure instanceof RuntimeException re) {
244169
throw re;
245170
}
246-
if (failure instanceof Error err) {
247-
throw err;
248-
}
249171
throw new RuntimeException(failure);
250172
}
251173
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ public ExchangeSink createSink(ExchangeSinkContext ctx, BackendExecutionContext
725725
if ("memtable".equals(mode) && ctx.childInputs().size() == 1 && preparedState == null) {
726726
return new DatafusionMemtableReduceSink(ctx, svc.getNativeRuntime());
727727
}
728-
return new DatafusionReduceSink(ctx, svc.getNativeRuntime(), svc.getDrainExecutor(), preparedState);
728+
return new DatafusionReduceSink(ctx, svc.getNativeRuntime(), preparedState);
729729
}
730730
};
731731
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionService.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919

2020
import java.io.IOException;
2121
import java.util.Collection;
22-
import java.util.concurrent.Executor;
23-
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2522

2623
/**
2724
* Node-level service managing the DataFusion native runtime lifecycle.
@@ -42,8 +39,6 @@ public class DataFusionService extends AbstractLifecycleComponent {
4239
private final int cpuThreads;
4340
private final ClusterSettings clusterSettings;
4441

45-
private volatile ExecutorService drainExecutor;
46-
4742
/** Handle to the native DataFusion global runtime (memory pool + cache). */
4843
private volatile NativeRuntimeHandle runtimeHandle;
4944

@@ -58,14 +53,6 @@ private DataFusionService(Builder builder) {
5853
this.clusterSettings = builder.clusterSettings;
5954
}
6055

61-
public Executor getDrainExecutor() {
62-
ExecutorService exec = drainExecutor;
63-
if (exec == null) {
64-
throw new IllegalStateException("DataFusionService has not been started");
65-
}
66-
return exec;
67-
}
68-
6956
/** Creates a new builder. */
7057
public static Builder builder() {
7158
return new Builder();
@@ -75,7 +62,6 @@ public static Builder builder() {
7562
protected void doStart() {
7663
logger.debug("Starting DataFusion service");
7764
NativeBridge.initTokioRuntimeManager(cpuThreads);
78-
this.drainExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("analytics-reduce-drain-", 0).factory());
7965
logger.debug("Tokio runtime manager initialized with {} CPU threads", cpuThreads);
8066

8167
long cacheManagerPtr = 0L;
@@ -99,16 +85,9 @@ protected void doStop() {
9985
try {
10086
releaseRuntime();
10187
} finally {
102-
try {
103-
ExecutorService exec = drainExecutor;
104-
if (exec != null) {
105-
exec.shutdown();
106-
drainExecutor = null;
107-
}
108-
} finally {
109-
NativeBridge.shutdownTokioRuntimeManager();
110-
}
88+
NativeBridge.shutdownTokioRuntimeManager();
11189
}
90+
11291
logger.debug("DataFusion service stopped");
11392
}
11493

0 commit comments

Comments
 (0)