Skip to content

Commit efe1210

Browse files
Add configurable coordinator buffer limit for per-query Arrow allocator (#21726)
* Add configurable coordinator buffer limit for per-query Arrow allocator

  Replace the hardcoded 256 MB per-query Arrow allocator limit with the dynamic cluster setting `analytics.coordinator.buffer_limit`. Add debug/warn logging at allocator close to detect memory leaks.

  Signed-off-by: Bowen Lan <bowenlan@amazon.com>
  Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Refactor coordinator allocator lifecycle and address review feedback

  - Move coordinator allocator creation to AnalyticsPlugin (proper lifecycle, closed on plugin shutdown)
  - DefaultPlanExecutor receives it via Guice injection
  - QueryContext receives a ready-to-use allocator + ownsAllocator flag (no more lazy init or branching inside QueryContext)
  - Setting value 0 = use coordinator allocator directly (no per-query child)
  - Guard per-query allocator against leak if QueryContext construction fails
  - Test factories create per-test child allocators for proper leak isolation

  Signed-off-by: Bowen Lan <bowenlan@amazon.com>
  Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

---------

Signed-off-by: Bowen Lan <bowenlan@amazon.com>
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 70cf46f commit efe1210

4 files changed

Lines changed: 79 additions & 65 deletions

File tree

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/AnalyticsPlugin.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.cluster.service.ClusterService;
3131
import org.opensearch.common.inject.Module;
3232
import org.opensearch.common.inject.TypeLiteral;
33+
import org.opensearch.common.settings.Setting;
3334
import org.opensearch.core.action.ActionResponse;
3435
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
3536
import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -62,6 +63,14 @@ public class AnalyticsPlugin extends Plugin implements ExtensiblePlugin, ActionP
6263

6364
private static final Logger logger = LogManager.getLogger(AnalyticsPlugin.class);
6465

66+
public static final Setting<Long> COORDINATOR_BUFFER_LIMIT = Setting.longSetting(
67+
"analytics.coordinator.buffer_limit",
68+
256L * 1024 * 1024,
69+
0L,
70+
Setting.Property.NodeScope,
71+
Setting.Property.Dynamic
72+
);
73+
6574
/**
6675
* Creates a new analytics engine hub plugin.
6776
*/
@@ -129,6 +138,11 @@ public Collection<Module> createGuiceModules() {
129138
return List.of(new ActionHandler<>(AnalyticsQueryAction.INSTANCE, DefaultPlanExecutor.class));
130139
}
131140

141+
@Override
142+
public List<Setting<?>> getSettings() {
143+
return List.of(COORDINATOR_BUFFER_LIMIT);
144+
}
145+
132146
@Override
133147
public void close() {
134148
if (searchService != null) {

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.analytics.exec;
1010

11+
import org.apache.arrow.memory.BufferAllocator;
1112
import org.apache.arrow.vector.VectorSchemaRoot;
1213
import org.apache.calcite.rel.RelNode;
1314
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
@@ -18,6 +19,7 @@
1819
import org.opensearch.action.support.ActionFilters;
1920
import org.opensearch.action.support.HandledTransportAction;
2021
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
22+
import org.opensearch.analytics.AnalyticsPlugin;
2123
import org.opensearch.analytics.EngineContext;
2224
import org.opensearch.analytics.exec.action.AnalyticsQueryAction;
2325
import org.opensearch.analytics.exec.task.AnalyticsQueryTask;
@@ -73,7 +75,10 @@ public class DefaultPlanExecutor extends HandledTransportAction<ActionRequest, A
7375
private final Executor searchExecutor;
7476
private final TaskManager taskManager;
7577
private final NodeClient client;
76-
private final ArrowAllocatorService allocatorService;
78+
// TODO: close on shutdown — currently arrow-base's root.close() will warn about this
79+
// outstanding child. Consider wrapping in a Guice-bound type owned by AnalyticsPlugin.
80+
private final BufferAllocator coordinatorAllocator;
81+
private volatile long perQueryBufferLimit;
7782

7883
@Inject
7984
public DefaultPlanExecutor(
@@ -96,7 +101,10 @@ public DefaultPlanExecutor(
96101
this.taskManager = transportService.getTaskManager();
97102
this.client = client;
98103
this.scheduler = scheduler;
99-
this.allocatorService = allocatorService;
104+
this.coordinatorAllocator = allocatorService.newChildAllocator("coordinator", Long.MAX_VALUE);
105+
this.perQueryBufferLimit = AnalyticsPlugin.COORDINATOR_BUFFER_LIMIT.get(clusterService.getSettings());
106+
clusterService.getClusterSettings()
107+
.addSettingsUpdateConsumer(AnalyticsPlugin.COORDINATOR_BUFFER_LIMIT, v -> perQueryBufferLimit = v);
100108
}
101109

102110
@Override
@@ -144,7 +152,23 @@ private void executeInternal(RelNode logicalFragment, ActionListener<Iterable<Ob
144152
"analytics_query",
145153
new AnalyticsQueryTaskRequest(dag.queryId(), null)
146154
);
147-
final QueryContext context = new QueryContext(dag, searchExecutor, queryTask, allocatorService);
155+
final BufferAllocator queryAllocator;
156+
final boolean ownsAllocator;
157+
if (perQueryBufferLimit <= 0) {
158+
queryAllocator = coordinatorAllocator;
159+
ownsAllocator = false;
160+
} else {
161+
queryAllocator = coordinatorAllocator.newChildAllocator("query-" + dag.queryId(), 0, perQueryBufferLimit);
162+
ownsAllocator = true;
163+
}
164+
logger.debug("[query-{}] Arrow allocator created, limit={}B", dag.queryId(), perQueryBufferLimit);
165+
final QueryContext context;
166+
try {
167+
context = new QueryContext(dag, searchExecutor, queryTask, queryAllocator, ownsAllocator);
168+
} catch (Exception e) {
169+
if (ownsAllocator) queryAllocator.close();
170+
throw e;
171+
}
148172

149173
ActionListener<Iterable<VectorSchemaRoot>> batchesListener = ActionListener.runAfter(
150174
ActionListener.wrap(batches -> listener.onResponse(batchesToRows(batches)), listener::onFailure),

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/QueryContext.java

Lines changed: 25 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.opensearch.analytics.backend.AnalyticsOperationListener;
1414
import org.opensearch.analytics.exec.task.AnalyticsQueryTask;
1515
import org.opensearch.analytics.planner.dag.QueryDAG;
16-
import org.opensearch.arrow.memory.ArrowAllocatorService;
1716

1817
import java.util.List;
1918
import java.util.concurrent.Executor;
@@ -31,30 +30,24 @@ public class QueryContext {
3130
// TODO: make configurable via cluster setting (like search.max_concurrent_shard_requests)
3231
private static final int DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS = 5;
3332

34-
/** Default per-query memory limit for Arrow allocations (256 MB). */
35-
private static final long DEFAULT_PER_QUERY_MEMORY_LIMIT = 256L * 1024 * 1024;
36-
3733
private final QueryDAG dag;
3834
private final Executor searchExecutor;
3935
private final AnalyticsQueryTask parentTask;
4036
private final int maxConcurrentShardRequests;
41-
private final long perQueryMemoryLimit;
4237
private final List<AnalyticsOperationListener> operationListeners;
43-
private final ArrowAllocatorService allocatorService;
44-
private volatile BufferAllocator bufferAllocator;
38+
private final BufferAllocator allocator;
39+
private final boolean ownsAllocator;
4540
private volatile ExecutorService localTaskExecutor;
4641
private boolean closed; // guarded by `this`
4742

48-
public QueryContext(QueryDAG dag, Executor searchExecutor, AnalyticsQueryTask parentTask, ArrowAllocatorService allocatorService) {
49-
this(
50-
dag,
51-
searchExecutor,
52-
parentTask,
53-
DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS,
54-
DEFAULT_PER_QUERY_MEMORY_LIMIT,
55-
List.of(),
56-
allocatorService
57-
);
43+
public QueryContext(
44+
QueryDAG dag,
45+
Executor searchExecutor,
46+
AnalyticsQueryTask parentTask,
47+
BufferAllocator allocator,
48+
boolean ownsAllocator
49+
) {
50+
this(dag, searchExecutor, parentTask, DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS, List.of(), allocator, ownsAllocator);
5851
}
5952

6053
/** Full-parameter constructor. Private; tests use {@link #forTest} factories. */
@@ -63,17 +56,17 @@ private QueryContext(
6356
Executor searchExecutor,
6457
AnalyticsQueryTask parentTask,
6558
int maxConcurrentShardRequests,
66-
long perQueryMemoryLimit,
6759
List<AnalyticsOperationListener> operationListeners,
68-
ArrowAllocatorService allocatorService
60+
BufferAllocator allocator,
61+
boolean ownsAllocator
6962
) {
7063
this.dag = dag;
7164
this.searchExecutor = searchExecutor;
7265
this.parentTask = parentTask;
7366
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
74-
this.perQueryMemoryLimit = perQueryMemoryLimit;
7567
this.operationListeners = operationListeners;
76-
this.allocatorService = allocatorService;
68+
this.allocator = allocator;
69+
this.ownsAllocator = ownsAllocator;
7770
}
7871

7972
public QueryDAG dag() {
@@ -101,22 +94,8 @@ public List<AnalyticsOperationListener> operationListeners() {
10194
return operationListeners;
10295
}
10396

104-
/** Lazy per-query allocator (child of shared root) with {@link #perQueryMemoryLimit}. */
10597
public BufferAllocator bufferAllocator() {
106-
BufferAllocator alloc = bufferAllocator;
107-
if (alloc == null) {
108-
synchronized (this) {
109-
alloc = bufferAllocator;
110-
if (alloc == null) {
111-
if (closed) {
112-
throw new IllegalStateException("QueryContext closed for query " + dag.queryId());
113-
}
114-
alloc = allocatorService.newChildAllocator("query-" + dag.queryId(), perQueryMemoryLimit);
115-
bufferAllocator = alloc;
116-
}
117-
}
118-
}
119-
return alloc;
98+
return allocator;
12099
}
121100

122101
/** Lazy per-query virtual-thread executor for LOCAL tasks. */
@@ -139,14 +118,17 @@ public ExecutorService localTaskExecutor() {
139118
return exec;
140119
}
141120

121+
boolean ownsAllocator() {
122+
return ownsAllocator;
123+
}
124+
142125
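/** Idempotent. Serialised with lazy-init accessors; post-close lazy-init accessors throw, but {@link #bufferAllocator()} is a plain getter and does not. Closes the allocator only when {@code ownsAllocator} is true. */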
143126
public void close() {
144127
synchronized (this) {
145128
if (closed) return;
146129
closed = true;
147-
if (bufferAllocator != null) {
148-
bufferAllocator.close();
149-
bufferAllocator = null;
130+
if (ownsAllocator) {
131+
allocator.close();
150132
}
151133
if (localTaskExecutor != null) {
152134
localTaskExecutor.shutdown();
@@ -157,27 +139,7 @@ public void close() {
157139

158140
// ─── Test factories ────────────────────────────────────────────────
159141

160-
/** Test-only: wraps a fresh {@link RootAllocator} as an {@link ArrowAllocatorService}. */
161-
private static ArrowAllocatorService testAllocatorService() {
162-
return new ArrowAllocatorService() {
163-
private final RootAllocator root = new RootAllocator(Long.MAX_VALUE);
164-
165-
@Override
166-
public BufferAllocator newChildAllocator(String name, long limit) {
167-
return root.newChildAllocator(name, 0, limit);
168-
}
169-
170-
@Override
171-
public long getAllocatedMemory() {
172-
return root.getAllocatedMemory();
173-
}
174-
175-
@Override
176-
public long getPeakMemoryAllocation() {
177-
return root.getPeakMemoryAllocation();
178-
}
179-
};
180-
}
142+
private static final RootAllocator TEST_ROOT = new RootAllocator(Long.MAX_VALUE);
181143

182144
/** Creates a test context with a synchronous executor. */
183145
public static QueryContext forTest(QueryDAG dag, AnalyticsQueryTask parentTask) {
@@ -186,14 +148,15 @@ public static QueryContext forTest(QueryDAG dag, AnalyticsQueryTask parentTask)
186148

187149
/** Creates a test context with a synchronous executor and the supplied operation listeners. */
188150
public static QueryContext forTest(QueryDAG dag, AnalyticsQueryTask parentTask, List<AnalyticsOperationListener> operationListeners) {
151+
BufferAllocator testAllocator = TEST_ROOT.newChildAllocator("test-" + dag.queryId(), 0, Long.MAX_VALUE);
189152
return new QueryContext(
190153
dag,
191154
Runnable::run,
192155
parentTask,
193156
DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS,
194-
Long.MAX_VALUE,
195157
operationListeners,
196-
testAllocatorService()
158+
testAllocator,
159+
true
197160
);
198161
}
199162
}

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/QueryExecution.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.analytics.exec;
1010

11+
import org.apache.arrow.memory.BufferAllocator;
1112
import org.apache.arrow.vector.VectorSchemaRoot;
1213
import org.apache.logging.log4j.LogManager;
1314
import org.apache.logging.log4j.Logger;
@@ -118,9 +119,21 @@ public void close() {
118119
if (closed.compareAndSet(false, true) == false) return;
119120
runQuietly("terminal sink close", this::closeTerminalSink);
120121
// TODO: Re-evaluate this per query child allocator
122+
logAllocatorState();
121123
runQuietly("query context close", config::close);
122124
}
123125

126+
private void logAllocatorState() {
127+
if (!config.ownsAllocator()) return;
128+
BufferAllocator allocator = config.bufferAllocator();
129+
long allocated = allocator.getAllocatedMemory();
130+
if (allocated > 0) {
131+
logger.warn("[query-{}] Arrow allocator closing with {}B still allocated — potential leak", config.queryId(), allocated);
132+
} else {
133+
logger.debug("[query-{}] Arrow allocator closed cleanly", config.queryId());
134+
}
135+
}
136+
124137
// ─── Internal: query-level state machine ─────────────────────────────
125138

126139
/** On terminal transition: fires user listener exactly once + runs {@link #close()}. */

0 commit comments

Comments
 (0)