Skip to content

Commit 373e712

Browse files
feat(memory): add customizable MemoryAllocator interface (#2467)
## What does this PR do? This PR introduces a new `MemoryAllocator` interface that allows customisation of memory allocation strategies in `MemoryBuffer`. This enables users to implement custom allocation policies. ## Related issues - Closes #2459 - #2457 - Closes #2350. ## Does this PR introduce any user-facing change? <!-- If any user-facing interface changes, please [open an issue](https://github.com/apache/fory/issues/new/choose) describing the need to do so and update the document if necessary. --> - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? --------- Co-authored-by: Shawn Yang <chaokunyang@apache.org>
1 parent 461d5f8 commit 373e712

4 files changed

Lines changed: 331 additions & 13 deletions

File tree

docs/guide/java_serialization_guide.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,6 +1089,84 @@ Note that when implementing custom map or collection serializers:
10891089

10901090
Besides registering serializes, one can also implement `java.io.Externalizable` for a class to customize serialization logic, such type will be serialized by fory `ExternalizableSerializer`.
10911091

1092+
### Memory Allocation Customization
1093+
1094+
Fory provides a `MemoryAllocator` interface that allows you to customize how memory buffers are allocated and grown during serialization operations. This can be useful for performance optimization, memory pooling, or debugging memory usage.
1095+
1096+
#### MemoryAllocator Interface
1097+
1098+
The `MemoryAllocator` interface defines two key methods:
1099+
1100+
```java
1101+
public interface MemoryAllocator {
1102+
/**
1103+
* Allocates a new MemoryBuffer with the specified initial capacity.
1104+
*/
1105+
MemoryBuffer allocate(int initialCapacity);
1106+
1107+
/**
1108+
* Grows an existing buffer to accommodate the new capacity.
1109+
* The implementation must grow the buffer in-place by modifying
1110+
* the existing buffer instance.
1111+
*/
1112+
MemoryBuffer grow(MemoryBuffer buffer, int newCapacity);
1113+
}
1114+
```
1115+
1116+
#### Using Custom Memory Allocators
1117+
1118+
You can set a global memory allocator that will be used by all `MemoryBuffer` instances:
1119+
1120+
```java
1121+
// Create a custom allocator
1122+
MemoryAllocator customAllocator = new MemoryAllocator() {
1123+
@Override
1124+
public MemoryBuffer allocate(int initialCapacity) {
1125+
// Add extra capacity for debugging or pooling
1126+
return MemoryBuffer.fromByteArray(new byte[initialCapacity + 100]);
1127+
}
1128+
1129+
@Override
1130+
public MemoryBuffer grow(MemoryBuffer buffer, int newCapacity) {
1131+
if (newCapacity <= buffer.size()) {
1132+
return buffer;
1133+
}
1134+
1135+
// Custom growth strategy - add 100% extra capacity
1136+
int newSize = (int) (newCapacity * 2);
1137+
byte[] data = new byte[newSize];
1138+
buffer.copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, buffer.size());
1139+
buffer.initHeapBuffer(data, 0, data.length);
1140+
return buffer;
1141+
}
1142+
};
1143+
1144+
// Set the custom allocator globally
1145+
MemoryBuffer.setGlobalAllocator(customAllocator);
1146+
1147+
// All subsequent MemoryBuffer allocations will use your custom allocator
1148+
Fory fory = Fory.builder().withLanguage(Language.JAVA).build();
1149+
byte[] bytes = fory.serialize(someObject); // Uses custom allocator
1150+
```
1151+
1152+
#### Default Memory Allocator Behavior
1153+
1154+
The default allocator uses the following growth strategy:
1155+
1156+
- For buffers smaller than `BUFFER_GROW_STEP_THRESHOLD` (100MB): multiply capacity by 2
1157+
- For larger buffers: multiply capacity by 1.5 (capped at `Integer.MAX_VALUE - 8`)
1158+
1159+
This provides a balance between avoiding frequent reallocations and preventing excessive memory usage.
1160+
1161+
#### Use Cases
1162+
1163+
Custom memory allocators are useful for:
1164+
1165+
- **Memory Pooling**: Reuse allocated buffers to reduce GC pressure
1166+
- **Performance Tuning**: Use different growth strategies based on your workload
1167+
- **Debugging**: Add logging or tracking to monitor memory usage
1168+
- **Off-heap Memory**: Integrate with off-heap memory management systems
1169+
10921170
### Security & Class Registration
10931171

10941172
`ForyBuilder#requireClassRegistration` can be used to disable class registration, this will allow to deserialize objects
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fory.memory;
21+
22+
/** Interface for customizing memory allocation strategies in MemoryBuffer. */
23+
public interface MemoryAllocator {
24+
/**
25+
* Allocates a new MemoryBuffer with the specified initial capacity.
26+
*
27+
* @param initialCapacity the initial capacity for the buffer
28+
* @return a new MemoryBuffer instance
29+
*/
30+
MemoryBuffer allocate(int initialCapacity);
31+
32+
/**
33+
* Grows an existing buffer to accommodate the new capacity. The implementation must grow the
34+
* buffer in-place by modifying the existing buffer instance.
35+
*
36+
* @param buffer the existing buffer to grow
37+
* @param newCapacity the required new capacity
38+
* @return the same MemoryBuffer instance with at least the new capacity
39+
*/
40+
MemoryBuffer grow(MemoryBuffer buffer, int newCapacity);
41+
}

java/fory-core/src/main/java/org/apache/fory/memory/MemoryBuffer.java

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public final class MemoryBuffer {
6464
private static final Unsafe UNSAFE = Platform.UNSAFE;
6565
private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
6666

67+
// Global allocator instance that can be customized
68+
private static volatile MemoryAllocator globalAllocator = new DefaultMemoryAllocator();
69+
6770
// If the data in on the heap, `heapMemory` will be non-null, and its' the object relative to
6871
// which we access the memory.
6972
// If we have this buffer, we must never void this reference, or the memory buffer will point
@@ -1233,27 +1236,17 @@ public void writePrimitiveArray(Object arr, int offset, int numBytes) {
12331236
public void grow(int neededSize) {
12341237
int length = writerIndex + neededSize;
12351238
if (length > size) {
1236-
growBuffer(length);
1239+
globalAllocator.grow(this, length);
12371240
}
12381241
}
12391242

12401243
/** For off-heap buffer, this will make a heap buffer internally. */
12411244
public void ensure(int length) {
12421245
if (length > size) {
1243-
growBuffer(length);
1246+
globalAllocator.grow(this, length);
12441247
}
12451248
}
12461249

1247-
private void growBuffer(int length) {
1248-
int newSize =
1249-
length < BUFFER_GROW_STEP_THRESHOLD
1250-
? length << 2
1251-
: (int) Math.min(length * 1.5d, Integer.MAX_VALUE - 8);
1252-
byte[] data = new byte[newSize];
1253-
copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, size());
1254-
initHeapBuffer(data, 0, data.length);
1255-
}
1256-
12571250
// -------------------------------------------------------------------------
12581251
// Read Methods
12591252
// -------------------------------------------------------------------------
@@ -2607,6 +2600,59 @@ public String toString() {
26072600
+ '}';
26082601
}
26092602

2603+
// ------------------------------------------------------------------------
2604+
// Memory Allocator Support
2605+
// ------------------------------------------------------------------------
2606+
2607+
/** Default memory allocator that uses the original heap-based allocation strategy. */
2608+
private static final class DefaultMemoryAllocator implements MemoryAllocator {
2609+
@Override
2610+
public MemoryBuffer allocate(int initialSize) {
2611+
return fromByteArray(new byte[initialSize]);
2612+
}
2613+
2614+
@Override
2615+
public MemoryBuffer grow(MemoryBuffer buffer, int newCapacity) {
2616+
if (newCapacity <= buffer.size()) {
2617+
return buffer;
2618+
}
2619+
2620+
int newSize =
2621+
newCapacity < BUFFER_GROW_STEP_THRESHOLD
2622+
? newCapacity << 1
2623+
: (int) Math.min(newCapacity * 1.5d, Integer.MAX_VALUE - 8);
2624+
2625+
byte[] data = new byte[newSize];
2626+
buffer.copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, buffer.size());
2627+
buffer.initHeapBuffer(data, 0, data.length);
2628+
2629+
return buffer;
2630+
}
2631+
}
2632+
2633+
/**
2634+
* Sets the global memory allocator. This affects all new MemoryBuffer allocations and growth
2635+
* operations.
2636+
*
2637+
* @param allocator the new global allocator to use
2638+
* @throws NullPointerException if allocator is null
2639+
*/
2640+
public static void setGlobalAllocator(MemoryAllocator allocator) {
2641+
if (allocator == null) {
2642+
throw new NullPointerException("Memory allocator cannot be null");
2643+
}
2644+
globalAllocator = allocator;
2645+
}
2646+
2647+
/**
2648+
* Gets the current global memory allocator.
2649+
*
2650+
* @return the current global allocator
2651+
*/
2652+
public static MemoryAllocator getGlobalAllocator() {
2653+
return globalAllocator;
2654+
}
2655+
26102656
/** Point this buffer to a new byte array. */
26112657
public void pointTo(byte[] buffer, int offset, int length) {
26122658
initHeapBuffer(buffer, offset, length);
@@ -2663,6 +2709,6 @@ public static MemoryBuffer fromNativeAddress(long address, int size) {
26632709
* enough.
26642710
*/
26652711
public static MemoryBuffer newHeapBuffer(int initialSize) {
2666-
return fromByteArray(new byte[initialSize]);
2712+
return globalAllocator.allocate(initialSize);
26672713
}
26682714
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fory.memory;
21+
22+
import static org.testng.Assert.assertEquals;
23+
import static org.testng.Assert.assertFalse;
24+
import static org.testng.Assert.assertSame;
25+
import static org.testng.Assert.assertTrue;
26+
27+
import org.testng.annotations.AfterMethod;
28+
import org.testng.annotations.BeforeMethod;
29+
import org.testng.annotations.Test;
30+
31+
public class MemoryAllocatorTest {
32+
33+
private MemoryAllocator originalAllocator;
34+
35+
@BeforeMethod
36+
public void setUp() {
37+
// Save the original allocator before each test
38+
originalAllocator = MemoryBuffer.getGlobalAllocator();
39+
}
40+
41+
@AfterMethod
42+
public void tearDown() {
43+
// Restore the original allocator after each test
44+
MemoryBuffer.setGlobalAllocator(originalAllocator);
45+
}
46+
47+
@Test
48+
public void testDefaultMemoryAllocator() {
49+
MemoryAllocator defaultAllocator = MemoryBuffer.getGlobalAllocator();
50+
51+
MemoryBuffer buffer = defaultAllocator.allocate(100);
52+
assertEquals(buffer.size(), 100);
53+
assertFalse(buffer.isOffHeap());
54+
55+
// Test growth below BUFFER_GROW_STEP_THRESHOLD (should multiply by 2)
56+
defaultAllocator.grow(buffer, 200);
57+
assertEquals(buffer.size(), 200 << 1);
58+
59+
// Test growth above BUFFER_GROW_STEP_THRESHOLD
60+
buffer = defaultAllocator.allocate(100);
61+
int largeCapacity = MemoryBuffer.BUFFER_GROW_STEP_THRESHOLD + 1000;
62+
defaultAllocator.grow(buffer, largeCapacity);
63+
int expectedSize = (int) Math.min(largeCapacity * 1.5d, Integer.MAX_VALUE - 8);
64+
assertEquals(buffer.size(), expectedSize);
65+
}
66+
67+
@Test
68+
public void testDefaultMemoryAllocatorDataPreservation() {
69+
MemoryAllocator defaultAllocator = MemoryBuffer.getGlobalAllocator();
70+
MemoryBuffer buffer = defaultAllocator.allocate(100);
71+
72+
// Write some test data
73+
byte[] testData = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
74+
buffer.writeBytes(testData);
75+
buffer.writeInt32(42);
76+
buffer.writeInt64(123456789L);
77+
78+
int writerIndexBeforeGrowth = buffer.writerIndex();
79+
80+
// Grow the buffer
81+
defaultAllocator.grow(buffer, 500);
82+
83+
// Verify data is preserved
84+
buffer.readerIndex(0);
85+
byte[] readData = new byte[testData.length];
86+
buffer.readBytes(readData);
87+
for (int i = 0; i < testData.length; i++) {
88+
assertEquals(readData[i], testData[i]);
89+
}
90+
91+
assertEquals(buffer.readInt32(), 42);
92+
assertEquals(buffer.readInt64(), 123456789L);
93+
assertEquals(buffer.writerIndex(), writerIndexBeforeGrowth);
94+
}
95+
96+
@Test
97+
public void testDefaultMemoryAllocatorGrowthSameInstance() {
98+
MemoryAllocator defaultAllocator = MemoryBuffer.getGlobalAllocator();
99+
MemoryBuffer buffer = defaultAllocator.allocate(100);
100+
101+
// Growth should return the same instance
102+
MemoryBuffer grownBuffer = defaultAllocator.grow(buffer, 200);
103+
assertSame(buffer, grownBuffer);
104+
}
105+
106+
@Test
107+
public void testCustomAllocator() {
108+
// Create a custom allocator that adds a marker
109+
MemoryAllocator customAllocator =
110+
new MemoryAllocator() {
111+
@Override
112+
public MemoryBuffer allocate(int initialCapacity) {
113+
// Use larger capacity as a marker
114+
return MemoryBuffer.fromByteArray(new byte[initialCapacity + 10]);
115+
}
116+
117+
@Override
118+
public MemoryBuffer grow(MemoryBuffer buffer, int newCapacity) {
119+
if (newCapacity <= buffer.size()) {
120+
return buffer;
121+
}
122+
123+
// Use default grow logic but with custom marker
124+
int newSize = newCapacity + 10; // Add 10 as marker
125+
byte[] data = new byte[newSize];
126+
buffer.copyToUnsafe(0, data, Platform.BYTE_ARRAY_OFFSET, buffer.size());
127+
buffer.initHeapBuffer(data, 0, data.length);
128+
return buffer;
129+
}
130+
};
131+
132+
// Set the custom allocator
133+
MemoryBuffer.setGlobalAllocator(customAllocator);
134+
assertSame(MemoryBuffer.getGlobalAllocator(), customAllocator);
135+
136+
// Test allocation
137+
MemoryBuffer buffer = MemoryBuffer.newHeapBuffer(100);
138+
assertEquals(buffer.size(), 110); // 100 + 10 marker
139+
140+
// Test growth
141+
buffer.writerIndex(50);
142+
buffer.readerIndex(10);
143+
buffer.ensure(200); // This should trigger growth
144+
assertEquals(buffer.writerIndex(), 50);
145+
assertEquals(buffer.readerIndex(), 10);
146+
assertTrue(buffer.size() >= 210); // Should be at least 200 + 10 marker
147+
}
148+
149+
@Test(expectedExceptions = NullPointerException.class)
150+
public void testSetNullAllocator() {
151+
MemoryBuffer.setGlobalAllocator(null);
152+
}
153+
}

0 commit comments

Comments
 (0)