
Commit 041dc0e

Cleanup pass on the singleton-removal migration
Self-review surfaced six items worth fixing before pushing:

* DefaultPlanExecutor now subscribes to the QUERY pool listener and updates coordinatorAllocator on dynamic resize, matching the pattern used by AnalyticsSearchService and FlightTransport. Without this, runtime grows of parquet.native.pool.query.max never reached the coordinator-side allocator.
* DefaultPlanExecutor.coordinatorAllocator is now sized at queryPool.getLimit() instead of Long.MAX_VALUE so the literal matches the actual cap.
* Drop the dead try/catch around removeListener in FlightTransport.stopInternal and AnalyticsSearchService.close. Those catches were holdovers from the singleton era and can't fire now that nativeAllocator is constructor-injected.
* Fix the joined-import line in AnalyticsSearchService that two earlier patches concatenated.
* Expand ArrowBatchResponse Javadoc with a pool-selection decision tree (FLIGHT for transport, INGEST for ingest path, QUERY for query execution) so readers know which pool to pick.
* Update misleading Javadoc in TransportNativeArrowStreamDataAction and DefaultPlanExecutor that referenced non-existent close paths. The pre-existing leak is preserved (out of scope for this PR) but documented honestly with a path forward.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
1 parent ea519e4 commit 041dc0e

6 files changed

Lines changed: 56 additions & 22 deletions

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/NativeAllocatorPluginStats.java

Lines changed: 11 additions & 2 deletions
@@ -30,12 +30,21 @@ public class NativeAllocatorPluginStats implements PluginNodeStats {
 
     private final NativeAllocatorPoolStats poolStats;
 
-    /** Wraps the given snapshot. */
+    /**
+     * Wraps the given snapshot.
+     *
+     * @param poolStats the pool stats snapshot to expose under {@code _nodes/stats}
+     */
     public NativeAllocatorPluginStats(NativeAllocatorPoolStats poolStats) {
         this.poolStats = poolStats;
     }
 
-    /** Reads from the wire. */
+    /**
+     * Reads from the wire.
+     *
+     * @param in the stream input
+     * @throws IOException if reading fails
+     */
     public NativeAllocatorPluginStats(StreamInput in) throws IOException {
         this.poolStats = new NativeAllocatorPoolStats(in);
     }

plugins/arrow-base/src/main/java/org/opensearch/arrow/transport/ArrowBatchResponse.java

Lines changed: 11 additions & 3 deletions
@@ -48,9 +48,17 @@
  * <p><b>Allocator rules:</b>
  * <ul>
  *   <li><b>Send side:</b> Source the allocator from one of the framework's named pools via
- *       {@link ArrowNativeAllocator#getPoolAllocator(String)} (e.g. {@code POOL_FLIGHT},
- *       {@code POOL_INGEST}, {@code POOL_QUERY}). All allocators must share the same root so
- *       zero-copy transfers pass Arrow's {@code AllocationManager} associate check.</li>
+ *       {@link ArrowNativeAllocator#getPoolAllocator(String)}. Pick the pool that matches the
+ *       semantics of the producing component:
+ *       <ul>
+ *         <li>{@code POOL_FLIGHT} — transport-layer producers/consumers (arrow-flight-rpc and
+ *             plugins built on top of {@code StreamTransportService}).</li>
+ *         <li>{@code POOL_INGEST} — ingest-path producers (parquet-data-format VSR allocators).</li>
+ *         <li>{@code POOL_QUERY} — query-execution producers (analytics-engine fragments and
+ *             coordinator-side intermediate batches).</li>
+ *       </ul>
+ *       All allocators must share the same root so zero-copy transfers pass Arrow's
+ *       {@code AllocationManager} associate check.</li>
  *   <li><b>Send side:</b> Allocators must outlive the transport stream — some transports
  *       (e.g., gRPC zero-copy) retain buffer references beyond stream completion. Do not
  *       create and close a child allocator per request.</li>
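
The pool-selection rule added to the Javadoc above can be read as a small lookup from producer type to pool name. The sketch below is illustrative only and is not part of the commit: `ProducerKind`, `PoolSelectionSketch`, and the string values given to the three pool constants are hypothetical stand-ins, since the diff does not show the actual values defined in `NativeAllocatorPoolConfig`.

```java
/** Hypothetical producer categories, mirroring the three Javadoc bullets. */
enum ProducerKind {
    TRANSPORT,        // arrow-flight-rpc and StreamTransportService-based plugins
    INGEST,           // parquet-data-format VSR allocators
    QUERY_EXECUTION   // analytics-engine fragments and coordinator-side batches
}

/** Sketch of the decision tree; the names and values here are assumptions. */
final class PoolSelectionSketch {
    // Placeholder values. The real constants live in NativeAllocatorPoolConfig.
    static final String POOL_FLIGHT = "flight";
    static final String POOL_INGEST = "ingest";
    static final String POOL_QUERY = "query";

    private PoolSelectionSketch() {}

    /** Returns the pool name a producer of the given kind would draw from. */
    static String poolFor(ProducerKind kind) {
        switch (kind) {
            case TRANSPORT:
                return POOL_FLIGHT;
            case INGEST:
                return POOL_INGEST;
            case QUERY_EXECUTION:
                return POOL_QUERY;
            default:
                throw new IllegalArgumentException("unknown producer kind: " + kind);
        }
    }
}
```

In the real code the returned name would presumably be passed to `ArrowNativeAllocator#getPoolAllocator(String)`, which is the call the Javadoc links to.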

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java

Lines changed: 1 addition & 5 deletions
@@ -282,11 +282,7 @@ private List<InetSocketAddress> bindToPort(InetAddress[] hostAddresses) {
     protected void stopInternal() {
         try {
             if (poolListener != null) {
-                try {
-                    nativeAllocator.removeListener(poolListener);
-                } catch (IllegalStateException ignored) {
-                    // Framework already torn down — nothing to detach from.
-                }
+                nativeAllocator.removeListener(poolListener);
                 poolListener = null;
             }
             if (flightServer != null) {

plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportNativeArrowStreamDataAction.java

Lines changed: 7 additions & 1 deletion
@@ -38,12 +38,18 @@
  *
  * <p>Demonstrates the pipelined producer pattern:
  * <ol>
- *   <li>Receive an allocator owned by the plugin (closed in {@link StreamTransportExamplePlugin#close()})</li>
+ *   <li>Receive an allocator sourced from the framework's FLIGHT pool</li>
  *   <li>For each batch, create a {@link VectorSchemaRoot}, populate it, and wrap it in a response</li>
  *   <li>Send via {@code sendResponseBatch()} — the framework zero-copy transfers
  *       the vectors into the Flight stream on the executor thread</li>
  *   <li>Call {@code completeStream()} when done</li>
  * </ol>
+ *
+ * <p><b>Known leak:</b> the action's allocator is not closed on plugin teardown
+ * (the action is Guice-managed and not held by {@link StreamTransportExamplePlugin}).
+ * On node shutdown, ArrowNativeAllocator.close() will warn about this outstanding child.
+ * A clean fix would make this action {@link java.io.Closeable} and have the plugin track
+ * and close it from {@code Plugin#close()}.
  */
 public class TransportNativeArrowStreamDataAction extends TransportAction<NativeArrowStreamDataRequest, NativeArrowStreamDataResponse> {
 
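
The "clean fix" mentioned in the Known leak note (a `Closeable` action that the plugin tracks and closes) might look roughly like the sketch below. It uses only `java.io.Closeable` from the JDK. `ClosableActionSketch` and `TrackingPluginSketch` are hypothetical stand-ins for the action and the plugin, and the sketch does not address how a Guice-instantiated action would be handed to the plugin, which is the part the commit leaves open.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for an action that owns an allocator. */
final class ClosableActionSketch implements Closeable {
    private boolean closed;

    @Override
    public void close() {
        // A real action would release its allocator here.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Hypothetical stand-in for a plugin that tracks resources it must close. */
final class TrackingPluginSketch implements Closeable {
    private final Deque<Closeable> tracked = new ArrayDeque<>();

    /** Registers a resource and returns it so the call can be chained. */
    <T extends Closeable> T track(T resource) {
        tracked.push(resource);
        return resource;
    }

    /** Closes tracked resources in reverse registration order. */
    @Override
    public void close() throws IOException {
        IOException first = null;
        while (!tracked.isEmpty()) {
            try {
                tracked.pop().close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

Closing in reverse order and collecting suppressed exceptions means one failing resource does not prevent the others from being released.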

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java

Lines changed: 3 additions & 7 deletions
@@ -25,7 +25,8 @@
 import org.opensearch.analytics.spi.InstructionNode;
 import org.opensearch.arrow.allocator.ArrowNativeAllocator;
 import org.opensearch.arrow.spi.NativeAllocatorListener;
-import org.opensearch.arrow.spi.NativeAllocatorPoolConfig;import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.arrow.spi.NativeAllocatorPoolConfig;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
 import org.opensearch.core.tasks.TaskCancelledException;
 import org.opensearch.index.engine.exec.IndexReaderProvider;
@@ -113,12 +114,7 @@ public AnalyticsSearchService(
 
     @Override
     public void close() {
-        try {
-            nativeAllocator.removeListener(poolListener);
-        } catch (Exception ignored) {
-            // Best-effort — framework may have already torn down ahead of us in some
-            // shutdown paths.
-        }
+        nativeAllocator.removeListener(poolListener);
         allocator.close();
     }
 

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java

Lines changed: 23 additions & 4 deletions
@@ -33,6 +33,7 @@
 import org.opensearch.analytics.planner.dag.PlanForker;
 import org.opensearch.analytics.planner.dag.QueryDAG;
 import org.opensearch.arrow.allocator.ArrowNativeAllocator;
+import org.opensearch.arrow.spi.NativeAllocatorListener;
 import org.opensearch.arrow.spi.NativeAllocatorPoolConfig;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.inject.Inject;
@@ -76,9 +77,15 @@ public class DefaultPlanExecutor extends HandledTransportAction<ActionRequest, A
     private final Executor searchExecutor;
     private final TaskManager taskManager;
     private final NodeClient client;
-    // TODO: close on shutdown — currently arrow-base's root.close() will warn about this
-    // outstanding child. Consider wrapping in a Guice-bound type owned by AnalyticsPlugin.
+    private final ArrowNativeAllocator nativeAllocator;
+    // Pre-existing leak: DefaultPlanExecutor is Guice-instantiated and not closed by the
+    // framework, so coordinatorAllocator + the registered NativeAllocatorListener leak on
+    // node shutdown. ArrowNativeAllocator.close() will warn about the outstanding child.
+    // Fix would require either (a) making DefaultPlanExecutor LifecycleComponent so Node
+    // closes it, or (b) wrapping in a Guice-bound singleton that AnalyticsPlugin owns and
+    // closes from its own close() method. Out of scope for the SPI-removal migration.
     private final BufferAllocator coordinatorAllocator;
+    private final NativeAllocatorListener poolListener;
     private volatile long perQueryBufferLimit;
 
     @Inject
@@ -102,16 +109,28 @@ public DefaultPlanExecutor(
         this.taskManager = transportService.getTaskManager();
         this.client = client;
         this.scheduler = scheduler;
+        this.nativeAllocator = nativeAllocator;
         // Source the coordinator allocator from the framework's QUERY pool so coordinator-side
         // allocations (held shard responses, intermediate batches during reduce) count against
         // parquet.native.pool.query.max alongside the data-node-side per-fragment allocators in
         // AnalyticsSearchService. Before this, coordinator allocations were root-siblings outside
         // any pool and invisible to _nodes/stats.native_allocator.pools.query.allocated.
-        this.coordinatorAllocator = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY)
-            .newChildAllocator("coordinator", 0, Long.MAX_VALUE);
+        BufferAllocator queryPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY);
+        this.coordinatorAllocator = queryPool.newChildAllocator("coordinator", 0, queryPool.getLimit());
         this.perQueryBufferLimit = AnalyticsPlugin.COORDINATOR_BUFFER_LIMIT.get(clusterService.getSettings());
         clusterService.getClusterSettings()
             .addSettingsUpdateConsumer(AnalyticsPlugin.COORDINATOR_BUFFER_LIMIT, v -> perQueryBufferLimit = v);
+
+        // Subscribe to QUERY pool resize events so a dynamic update to
+        // parquet.native.pool.query.max propagates into this captured child allocator,
+        // matching the listener pattern used by AnalyticsSearchService and FlightTransport.
+        // Without this, runtime grows are silently ignored once a query is in flight.
+        this.poolListener = (poolName, newLimit) -> {
+            if (NativeAllocatorPoolConfig.POOL_QUERY.equals(poolName)) {
+                coordinatorAllocator.setLimit(newLimit);
+            }
+        };
+        nativeAllocator.addListener(poolListener);
     }
 
     @Override
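
The resize-listener pattern this hunk adds can be exercised in isolation with the sketch below. It assumes nothing about Arrow or OpenSearch: `ResizeListenerSketch`, `PoolRegistrySketch`, and `CoordinatorSketch` are hypothetical stand-ins for `NativeAllocatorListener`, `ArrowNativeAllocator`, and `DefaultPlanExecutor`, the pool name `"query"` is an assumed placeholder, and a plain `long` field stands in for the child allocator's limit. It also shows the `removeListener` call on close, which the commit documents as still missing from `DefaultPlanExecutor`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for NativeAllocatorListener. */
interface ResizeListenerSketch {
    void onPoolResized(String poolName, long newLimit);
}

/** Hypothetical stand-in for the framework object that owns named pools. */
final class PoolRegistrySketch {
    private final Map<String, Long> limits = new ConcurrentHashMap<>();
    private final List<ResizeListenerSketch> listeners = new CopyOnWriteArrayList<>();

    long getLimit(String poolName) {
        return limits.getOrDefault(poolName, 0L);
    }

    void addListener(ResizeListenerSketch listener) {
        listeners.add(listener);
    }

    void removeListener(ResizeListenerSketch listener) {
        listeners.remove(listener);
    }

    /** Records the new limit and notifies every registered listener. */
    void resize(String poolName, long newLimit) {
        limits.put(poolName, newLimit);
        for (ResizeListenerSketch listener : listeners) {
            listener.onPoolResized(poolName, newLimit);
        }
    }
}

/** Hypothetical stand-in for a component holding a child allocator of the QUERY pool. */
final class CoordinatorSketch implements AutoCloseable {
    static final String POOL_QUERY = "query"; // assumed placeholder value

    private final PoolRegistrySketch registry;
    private final ResizeListenerSketch listener;
    private volatile long limit; // stands in for coordinatorAllocator's limit

    CoordinatorSketch(PoolRegistrySketch registry) {
        this.registry = registry;
        // Size the child at the pool's current cap rather than Long.MAX_VALUE.
        this.limit = registry.getLimit(POOL_QUERY);
        // Ignore resize events for other pools; follow the QUERY pool's limit.
        this.listener = (poolName, newLimit) -> {
            if (POOL_QUERY.equals(poolName)) {
                limit = newLimit;
            }
        };
        registry.addListener(listener);
    }

    long coordinatorLimit() {
        return limit;
    }

    @Override
    public void close() {
        registry.removeListener(listener);
    }
}
```

Filtering on the pool name matters because, in this sketch as in the diff, a single listener receives events for every pool.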
