Skip to content

Commit 40f2c9d

Browse files
authored
Adding virtual memory pools (opensearch-project#21885)
* Adding virtual memory pools Signed-off-by: rayshrey <rayshrey@amazon.com> * Node stats BWC and other fixes Signed-off-by: rayshrey <rayshrey@amazon.com> --------- Signed-off-by: rayshrey <rayshrey@amazon.com>
1 parent c78f251 commit 40f2c9d

59 files changed

Lines changed: 2790 additions & 1649 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

libs/arrow-spi/build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ apply plugin: 'opensearch.publish'
1111
dependencies {
1212
api project(':libs:opensearch-core')
1313
api project(':libs:opensearch-common')
14-
testImplementation project(':test:framework')
14+
testImplementation(project(':test:framework')) {
15+
exclude group: 'org.opensearch', module: 'opensearch-arrow-spi'
16+
}
1517
}
1618

1719
tasks.named('forbiddenApisMain').configure {

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java

Lines changed: 88 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,52 +9,114 @@
99
package org.opensearch.arrow.spi;
1010

1111
import java.io.Closeable;
12+
import java.util.Set;
13+
import java.util.function.Consumer;
14+
import java.util.function.Supplier;
1215

1316
/**
14-
* Arrow-agnostic interface for a hierarchical native memory allocator.
17+
* Unified native memory allocator interface.
1518
*
16-
* <p>The implementation (backed by Arrow's {@code RootAllocator}) is provided by
17-
* a plugin. The SPI allows other subsystems to interact with the allocator
18-
* without depending on Arrow classes.
19-
*
20-
* <p>Plugins that need Arrow allocators obtain the implementation via
21-
* service lookup or plugin extension and call {@link #getOrCreatePool} to
22-
* register their pool.
19+
* <p>Manages memory pools under a shared budget. Each pool has a minimum
20+
* guaranteed allocation and a maximum burst limit. Implementations may
21+
* redistribute unused capacity across pools.
2322
*
2423
* @opensearch.api
2524
*/
2625
public interface NativeAllocator extends Closeable {
2726

2827
/**
29-
* Returns the named pool, creating it on first access with the given limit.
30-
* Subsequent calls with the same name return the same pool (first-call limit wins).
28+
* Returns the named pool, creating it on first access.
29+
* Subsequent calls with the same name return the existing pool (first-call config wins).
3130
*
32-
* @param poolName logical pool name (e.g., "query", "flight")
33-
* @param limit maximum bytes this pool can allocate in aggregate
31+
* @param poolName logical pool name
32+
* @param min minimum guaranteed bytes
33+
* @param max maximum bytes this pool can allocate
34+
* @param group the group this pool belongs to for aggregated stats, or null
3435
* @return an opaque pool handle
3536
*/
36-
PoolHandle getOrCreatePool(String poolName, long limit);
37+
PoolHandle getOrCreatePool(String poolName, long min, long max, PoolGroup group);
3738

3839
/**
39-
* Updates the limit of an existing pool. Children of the pool allocator
40-
* inherit the change automatically via Arrow's parent-cap check at
41-
* allocation time — no notification SPI is needed.
40+
* Updates the effective limit of an existing pool.
4241
*
4342
* @param poolName logical pool name
4443
* @param newLimit new maximum bytes for the pool
4544
*/
4645
void setPoolLimit(String poolName, long newLimit);
4746

4847
/**
49-
* Sets the root-level memory limit for the entire allocator.
48+
* Registers a virtual pool with initial min/max and a callback
49+
* invoked when the pool's limit changes.
50+
*
51+
* @param poolName logical pool name
52+
* @param min minimum guaranteed bytes
53+
* @param max initial maximum bytes (the pool's starting limit)
54+
* @param group the group this pool belongs to for aggregated stats
55+
* @param limitSetter callback invoked when the pool limit changes
56+
* @return a handle to update stats from the native layer
57+
*/
58+
VirtualPoolHandle registerVirtualPool(String poolName, long min, long max, PoolGroup group, Consumer<Long> limitSetter);
59+
60+
/**
61+
* Updates the minimum guaranteed bytes for a pool.
62+
*
63+
* @param poolName logical pool name
64+
* @param newMin new minimum bytes
65+
*/
66+
void setPoolMin(String poolName, long newMin);
67+
68+
/**
69+
* Returns all registered pool names.
70+
*/
71+
Set<String> getAllPoolNames();
72+
73+
/**
74+
* Adds a callback invoked before stats collection to refresh pool usage data.
75+
*
76+
* @param refresher runnable that updates pool stats
77+
*/
78+
void addStatsRefresher(Runnable refresher);
79+
80+
/**
81+
* Sets the supplier for process-wide native memory stats.
82+
*
83+
* @param supplier returns [allocatedBytes, residentBytes]
84+
*/
85+
void setNativeMemoryStatsSupplier(Supplier<long[]> supplier);
86+
87+
/**
88+
* Registers a listener invoked when the effective limit of a pool group changes.
89+
* The consumer receives the new total effective limit (sum of all pools in the group).
5090
*
51-
* @param limit new maximum bytes for the root allocator
91+
* @param group the pool group to listen on
92+
* @param listener consumer that receives the new grouped limit in bytes
93+
*/
94+
void addPoolGroupLimitListener(PoolGroup group, Consumer<Long> listener);
95+
96+
/**
97+
* Handle for a virtual pool. Plugins update stats via this handle.
5298
*/
53-
void setRootLimit(long limit);
99+
interface VirtualPoolHandle {
100+
/**
101+
* Update the current usage stats.
102+
*
103+
* @param allocatedBytes current allocated bytes
104+
* @param peakBytes peak allocated bytes
105+
*/
106+
void updateStats(long allocatedBytes, long peakBytes);
107+
108+
/** Returns current allocated bytes. */
109+
long allocatedBytes();
110+
111+
/** Returns peak allocated bytes. */
112+
long peakBytes();
113+
114+
/** Returns current limit. */
115+
long limit();
116+
}
54117

55118
/**
56-
* Opaque handle to a memory pool. Plugins downcast to the concrete type
57-
* (e.g., Arrow's {@code BufferAllocator}) in the implementation layer.
119+
* Opaque handle to a memory pool.
58120
*/
59121
interface PoolHandle {
60122

@@ -63,28 +125,20 @@ interface PoolHandle {
63125
*
64126
* @param childName name for debugging
65127
* @param childLimit maximum bytes for the child
66-
* @return an opaque child handle (downcast to BufferAllocator in Arrow impl)
128+
* @return a child handle
67129
*/
68130
PoolHandle newChild(String childName, long childLimit);
69131

70-
/**
71-
* Returns the current allocated bytes for this pool/child.
72-
*/
132+
/** Returns the current allocated bytes. */
73133
long allocatedBytes();
74134

75-
/**
76-
* Returns the peak memory allocation.
77-
*/
135+
/** Returns the peak memory allocation. */
78136
long peakBytes();
79137

80-
/**
81-
* Returns the configured limit.
82-
*/
138+
/** Returns the configured limit. */
83139
long limit();
84140

85-
/**
86-
* Releases this allocation handle.
87-
*/
141+
/** Releases this allocation handle. */
88142
void close();
89143
}
90144
}

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocatorPoolConfig.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ public final class NativeAllocatorPoolConfig {
3333
/** Pool name for query-execution memory (analytics-engine fragments and per-query allocators). */
3434
public static final String POOL_QUERY = "query";
3535

36-
/** Setting key for the root allocator limit. */
37-
public static final String SETTING_ROOT_LIMIT = "native.allocator.root.limit";
38-
3936
/** Setting key for the Flight pool minimum. */
4037
public static final String SETTING_FLIGHT_MIN = "native.allocator.pool.flight.min";
4138
/** Setting key for the Flight pool maximum. */
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.arrow.spi;
10+
11+
/**
12+
* Groups that memory pools belong to for aggregated customer-facing stats.
13+
* Each pool is assigned to exactly one group at registration time.
14+
*
15+
* @opensearch.api
16+
*/
17+
public enum PoolGroup {
18+
/** Arrow Flight transport pool group. */
19+
TRANSPORT("transport"),
20+
/** Query and analytics execution pool group. */
21+
SEARCH("search"),
22+
/** Ingest and write path pool group. */
23+
INDEXING("indexing"),
24+
/** Background merge operations pool group. */
25+
MERGE("merge");
26+
27+
private final String name;
28+
29+
PoolGroup(String name) {
30+
this.name = name;
31+
}
32+
33+
/** Returns the group name used in stats output. */
34+
public String getName() {
35+
return name;
36+
}
37+
}

libs/arrow-spi/src/test/java/org/opensearch/arrow/spi/NativeAllocatorPoolConfigTests.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,4 @@ public void testSettingKeys() {
2626
assertEquals("native.allocator.pool.query.min", NativeAllocatorPoolConfig.SETTING_QUERY_MIN);
2727
assertEquals("native.allocator.pool.query.max", NativeAllocatorPoolConfig.SETTING_QUERY_MAX);
2828
}
29-
30-
public void testRootSettingKey() {
31-
assertEquals("native.allocator.root.limit", NativeAllocatorPoolConfig.SETTING_ROOT_LIMIT);
32-
}
3329
}

0 commit comments

Comments
 (0)