Skip to content

Commit ac2b2fd

Browse files
authored
Add unified native allocator framework with elastic rebalancing (#21703)
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent 29c5fdb commit ac2b2fd

32 files changed

Lines changed: 1475 additions & 272 deletions

File tree

libs/arrow-spi/build.gradle

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
// Applies the build's 'opensearch.publish' plugin to this library.
apply plugin: 'opensearch.publish'

dependencies {
  // Exposed to consumers of this library (api scope).
  api project(':libs:opensearch-core')
  api project(':libs:opensearch-common')

  // Test-only dependency.
  testImplementation project(':test:framework')
}

// Main and test forbidden-API checks share the same configuration:
// both replace their signature files with 'jdk-signatures'.
['forbiddenApisMain', 'forbiddenApisTest'].each { checkTaskName ->
  tasks.named(checkTaskName).configure {
    replaceSignatureFiles 'jdk-signatures'
  }
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.arrow.spi;
10+
11+
import java.io.Closeable;
12+
13+
/**
14+
* Arrow-agnostic interface for a hierarchical native memory allocator.
15+
*
16+
* <p>The implementation (backed by Arrow's {@code RootAllocator}) is provided by
17+
* a plugin. The SPI allows other subsystems to interact with the allocator
18+
* without depending on Arrow classes.
19+
*
20+
* <p>Plugins that need Arrow allocators obtain the implementation via
21+
* service lookup or plugin extension and call {@link #getOrCreatePool} to
22+
* register their pool.
23+
*
24+
* @opensearch.api
25+
*/
26+
public interface NativeAllocator extends Closeable {
27+
28+
/**
29+
* Returns the named pool, creating it on first access with the given limit.
30+
* Subsequent calls with the same name return the same pool (first-call limit wins).
31+
*
32+
* @param poolName logical pool name (e.g., "query", "flight")
33+
* @param limit maximum bytes this pool can allocate in aggregate
34+
* @return an opaque pool handle
35+
*/
36+
PoolHandle getOrCreatePool(String poolName, long limit);
37+
38+
/**
39+
* Updates the limit of an existing pool.
40+
*
41+
* @param poolName logical pool name
42+
* @param newLimit new maximum bytes for the pool
43+
*/
44+
void setPoolLimit(String poolName, long newLimit);
45+
46+
/**
47+
* Sets the root-level memory limit for the entire allocator.
48+
*
49+
* @param limit new maximum bytes for the root allocator
50+
*/
51+
void setRootLimit(long limit);
52+
53+
/**
54+
* Collects a point-in-time stats snapshot across all pools.
55+
*/
56+
NativeAllocatorPoolStats stats();
57+
58+
/**
59+
* Opaque handle to a memory pool. Plugins downcast to the concrete type
60+
* (e.g., Arrow's {@code BufferAllocator}) in the implementation layer.
61+
*/
62+
interface PoolHandle {
63+
64+
/**
65+
* Creates a named child allocation within this pool.
66+
*
67+
* @param childName name for debugging
68+
* @param childLimit maximum bytes for the child
69+
* @return an opaque child handle (downcast to BufferAllocator in Arrow impl)
70+
*/
71+
PoolHandle newChild(String childName, long childLimit);
72+
73+
/**
74+
* Returns the current allocated bytes for this pool/child.
75+
*/
76+
long allocatedBytes();
77+
78+
/**
79+
* Returns the peak memory allocation.
80+
*/
81+
long peakBytes();
82+
83+
/**
84+
* Returns the configured limit.
85+
*/
86+
long limit();
87+
88+
/**
89+
* Releases this allocation handle.
90+
*/
91+
void close();
92+
}
93+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.arrow.spi;
10+
11+
/**
 * Constants holder for native allocator pool names and their setting keys.
 *
 * <p>Every pool is described by two bounds: a min (guaranteed reservation)
 * and a max (burst limit). The rebalancer ensures each pool can always
 * allocate up to its min, and hands out unused capacity up to each pool's max.
 *
 * <p>Limits are supplied through {@code opensearch.yml} or the cluster settings
 * API, using keys of the form {@code native.allocator.pool.<name>.min} and
 * {@code native.allocator.pool.<name>.max}.
 *
 * <p>This class is Arrow-agnostic — it defines the logical pool topology
 * without referencing any Arrow classes.
 *
 * @opensearch.api
 */
public final class NativeAllocatorPoolConfig {

    /** Not instantiable: this class only carries constants. */
    private NativeAllocatorPoolConfig() {}

    // ---- root allocator ----

    /** Setting key for the root allocator limit. */
    public static final String SETTING_ROOT_LIMIT = "native.allocator.root.limit";

    // ---- Arrow Flight pool ----

    /** Pool name for Arrow Flight RPC memory. */
    public static final String POOL_FLIGHT = "flight";

    /** Setting key for the Flight pool minimum. */
    public static final String SETTING_FLIGHT_MIN = "native.allocator.pool.flight.min";

    /** Setting key for the Flight pool maximum. */
    public static final String SETTING_FLIGHT_MAX = "native.allocator.pool.flight.max";

    // ---- ingest pool ----

    /** Pool name for ingest pipeline memory. */
    public static final String POOL_INGEST = "ingest";

    /** Setting key for the ingest pool minimum. */
    public static final String SETTING_INGEST_MIN = "native.allocator.pool.ingest.min";

    /** Setting key for the ingest pool maximum. */
    public static final String SETTING_INGEST_MAX = "native.allocator.pool.ingest.max";
}
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.arrow.spi;
10+
11+
import org.opensearch.core.common.io.stream.StreamInput;
12+
import org.opensearch.core.common.io.stream.StreamOutput;
13+
import org.opensearch.core.common.io.stream.Writeable;
14+
import org.opensearch.core.common.unit.ByteSizeValue;
15+
import org.opensearch.core.xcontent.ToXContentFragment;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
22+
23+
/**
24+
* Point-in-time snapshot of native memory pool statistics for a node.
25+
*
26+
* <p>Arrow-agnostic — describes pool utilization without referencing
27+
* Arrow classes. The concrete implementation (backed by Arrow's
28+
* {@code BufferAllocator}) lives in the plugin that owns the pools.
29+
*
30+
* @opensearch.api
31+
*/
32+
public class NativeAllocatorPoolStats implements Writeable, ToXContentFragment {
33+
34+
private final long rootAllocatedBytes;
35+
private final long rootPeakBytes;
36+
private final long rootLimitBytes;
37+
private final List<PoolStats> pools;
38+
39+
/**
40+
* Creates a new stats snapshot from the given values.
41+
*
42+
* @param rootAllocatedBytes current bytes allocated by the root
43+
* @param rootPeakBytes peak bytes allocated by the root
44+
* @param rootLimitBytes configured root limit
45+
* @param pools per-pool stats
46+
*/
47+
public NativeAllocatorPoolStats(long rootAllocatedBytes, long rootPeakBytes, long rootLimitBytes, List<PoolStats> pools) {
48+
this.rootAllocatedBytes = rootAllocatedBytes;
49+
this.rootPeakBytes = rootPeakBytes;
50+
this.rootLimitBytes = rootLimitBytes;
51+
this.pools = Collections.unmodifiableList(pools);
52+
}
53+
54+
/**
55+
* Deserializes from stream.
56+
*
57+
* @param in the stream input
58+
*/
59+
public NativeAllocatorPoolStats(StreamInput in) throws IOException {
60+
this.rootAllocatedBytes = in.readVLong();
61+
this.rootPeakBytes = in.readVLong();
62+
this.rootLimitBytes = in.readVLong();
63+
int count = in.readVInt();
64+
List<PoolStats> list = new ArrayList<>(count);
65+
for (int i = 0; i < count; i++) {
66+
list.add(new PoolStats(in));
67+
}
68+
this.pools = Collections.unmodifiableList(list);
69+
}
70+
71+
@Override
72+
public void writeTo(StreamOutput out) throws IOException {
73+
out.writeVLong(rootAllocatedBytes);
74+
out.writeVLong(rootPeakBytes);
75+
out.writeVLong(rootLimitBytes);
76+
out.writeVInt(pools.size());
77+
for (PoolStats pool : pools) {
78+
pool.writeTo(out);
79+
}
80+
}
81+
82+
@Override
83+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
84+
builder.startObject("native_allocator");
85+
86+
builder.startObject("root");
87+
builder.humanReadableField("allocated_bytes", "allocated", new ByteSizeValue(rootAllocatedBytes));
88+
builder.humanReadableField("peak_bytes", "peak", new ByteSizeValue(rootPeakBytes));
89+
builder.humanReadableField("limit_bytes", "limit", new ByteSizeValue(rootLimitBytes));
90+
builder.endObject();
91+
92+
builder.startObject("pools");
93+
for (PoolStats pool : pools) {
94+
pool.toXContent(builder, params);
95+
}
96+
builder.endObject();
97+
98+
builder.endObject();
99+
return builder;
100+
}
101+
102+
/** Returns the root allocator's currently allocated bytes. */
103+
public long getRootAllocatedBytes() {
104+
return rootAllocatedBytes;
105+
}
106+
107+
/** Returns the root allocator's peak allocated bytes. */
108+
public long getRootPeakBytes() {
109+
return rootPeakBytes;
110+
}
111+
112+
/** Returns the root allocator's configured limit in bytes. */
113+
public long getRootLimitBytes() {
114+
return rootLimitBytes;
115+
}
116+
117+
/** Returns the per-pool statistics. */
118+
public List<PoolStats> getPools() {
119+
return pools;
120+
}
121+
122+
/**
123+
* Per-pool statistics snapshot.
124+
*/
125+
public static class PoolStats implements Writeable, ToXContentFragment {
126+
127+
private final String name;
128+
private final long allocatedBytes;
129+
private final long peakBytes;
130+
private final long limitBytes;
131+
private final int childCount;
132+
133+
/**
134+
* Creates a new pool stats snapshot.
135+
*
136+
* @param name pool name
137+
* @param allocatedBytes current allocated bytes
138+
* @param peakBytes peak allocated bytes
139+
* @param limitBytes configured limit
140+
* @param childCount number of child allocators
141+
*/
142+
public PoolStats(String name, long allocatedBytes, long peakBytes, long limitBytes, int childCount) {
143+
this.name = name;
144+
this.allocatedBytes = allocatedBytes;
145+
this.peakBytes = peakBytes;
146+
this.limitBytes = limitBytes;
147+
this.childCount = childCount;
148+
}
149+
150+
/**
151+
* Deserializes from stream.
152+
*
153+
* @param in the stream input
154+
*/
155+
public PoolStats(StreamInput in) throws IOException {
156+
this.name = in.readString();
157+
this.allocatedBytes = in.readVLong();
158+
this.peakBytes = in.readVLong();
159+
this.limitBytes = in.readVLong();
160+
this.childCount = in.readVInt();
161+
}
162+
163+
@Override
164+
public void writeTo(StreamOutput out) throws IOException {
165+
out.writeString(name);
166+
out.writeVLong(allocatedBytes);
167+
out.writeVLong(peakBytes);
168+
out.writeVLong(limitBytes);
169+
out.writeVInt(childCount);
170+
}
171+
172+
@Override
173+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
174+
builder.startObject(name);
175+
builder.humanReadableField("allocated_bytes", "allocated", new ByteSizeValue(allocatedBytes));
176+
builder.humanReadableField("peak_bytes", "peak", new ByteSizeValue(peakBytes));
177+
builder.humanReadableField("limit_bytes", "limit", new ByteSizeValue(limitBytes));
178+
builder.field("child_count", childCount);
179+
builder.endObject();
180+
return builder;
181+
}
182+
183+
/** Returns the pool name. */
184+
public String getName() {
185+
return name;
186+
}
187+
188+
/** Returns the currently allocated bytes. */
189+
public long getAllocatedBytes() {
190+
return allocatedBytes;
191+
}
192+
193+
/** Returns the peak allocated bytes. */
194+
public long getPeakBytes() {
195+
return peakBytes;
196+
}
197+
198+
/** Returns the configured limit in bytes. */
199+
public long getLimitBytes() {
200+
return limitBytes;
201+
}
202+
203+
/** Returns the number of child allocators. */
204+
public int getChildCount() {
205+
return childCount;
206+
}
207+
}
208+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Service Provider Interface for Arrow-agnostic native memory allocation.
11+
*/
12+
package org.opensearch.arrow.spi;

0 commit comments

Comments
 (0)