Skip to content

Commit 1e2bb63

Browse files
authored
feat(python/c++): shrink stream buffers after struct deserialization (apache#3453)
## Why? ## What does this PR do? ## Related issues apache#3449 apache#3307 ## Does this PR introduce any user-facing change? - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark
1 parent 9c4510a commit 1e2bb63

14 files changed

Lines changed: 190 additions & 13 deletions

File tree

cpp/fory/serialization/stream_test.cc

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <istream>
2222
#include <map>
2323
#include <memory>
24+
#include <sstream>
2425
#include <streambuf>
2526
#include <string>
2627
#include <utility>
@@ -218,17 +219,20 @@ TEST(StreamSerializationTest, SequentialDeserializeFromSingleStream) {
218219
auto first = fory.deserialize<int32_t>(stream);
219220
ASSERT_TRUE(first.ok()) << first.error().to_string();
220221
EXPECT_EQ(first.value(), 12345);
221-
EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
222+
const uint32_t first_reader_index = stream.get_buffer().reader_index();
223+
EXPECT_GT(first_reader_index, 0U);
222224

223225
auto second = fory.deserialize<std::string>(stream);
224226
ASSERT_TRUE(second.ok()) << second.error().to_string();
225227
EXPECT_EQ(second.value(), "next-value");
226-
EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
228+
const uint32_t second_reader_index = stream.get_buffer().reader_index();
229+
EXPECT_GT(second_reader_index, first_reader_index);
227230

228231
auto third = fory.deserialize<StreamEnvelope>(stream);
229232
ASSERT_TRUE(third.ok()) << third.error().to_string();
230233
EXPECT_EQ(third.value(), envelope);
231-
EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
234+
const uint32_t third_reader_index = stream.get_buffer().reader_index();
235+
EXPECT_GT(third_reader_index, second_reader_index);
232236

233237
EXPECT_EQ(stream.get_buffer().remaining_size(), 0U);
234238
}
@@ -310,6 +314,49 @@ TEST(StreamSerializationTest, SerializeToOStreamOverloadParity) {
310314
EXPECT_EQ(out.data(), expected.value());
311315
}
312316

317+
TEST(StreamSerializationTest,
318+
StructDeserializeFromStreamBackedBufferShrinksAfterEachStruct) {
319+
auto fory = Fory::builder().xlang(true).track_ref(true).build();
320+
register_stream_types(fory);
321+
322+
std::vector<int32_t> first_values;
323+
std::vector<int32_t> second_values;
324+
first_values.reserve(6000);
325+
second_values.reserve(6000);
326+
for (int32_t i = 0; i < 6000; ++i) {
327+
first_values.push_back(i);
328+
second_values.push_back(6000 - i);
329+
}
330+
331+
StreamEnvelope first{
332+
"first", std::move(first_values), {{"a", 11}, {"b", 22}}, {7, 8}, true,
333+
};
334+
StreamEnvelope second{
335+
"second", std::move(second_values), {{"c", 33}, {"d", 44}}, {9, 10},
336+
false,
337+
};
338+
339+
std::vector<uint8_t> bytes;
340+
ASSERT_TRUE(fory.serialize_to(bytes, first).ok());
341+
ASSERT_TRUE(fory.serialize_to(bytes, second).ok());
342+
343+
std::string payload(reinterpret_cast<const char *>(bytes.data()),
344+
bytes.size());
345+
std::istringstream source(payload);
346+
StdInputStream stream(source, 4096);
347+
Buffer &buffer = stream.get_buffer();
348+
349+
auto first_result = fory.deserialize<StreamEnvelope>(buffer);
350+
ASSERT_TRUE(first_result.ok()) << first_result.error().to_string();
351+
EXPECT_EQ(first_result.value(), first);
352+
EXPECT_EQ(buffer.reader_index(), 0U);
353+
354+
auto second_result = fory.deserialize<StreamEnvelope>(buffer);
355+
ASSERT_TRUE(second_result.ok()) << second_result.error().to_string();
356+
EXPECT_EQ(second_result.value(), second);
357+
EXPECT_EQ(buffer.reader_index(), 0U);
358+
}
359+
313360
} // namespace test
314361
} // namespace serialization
315362
} // namespace fory

cpp/fory/serialization/struct_serializer.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2773,7 +2773,7 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
27732773
// Note: varint bounds checking is done per-byte during reading since
27742774
// varint lengths are variable (actual size << max possible size)
27752775
if constexpr (varint_count > 0) {
2776-
if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
2776+
if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
27772777
// Stream-backed buffers may not have all varint bytes materialized yet.
27782778
// Fall back to per-field readers that propagate stream read errors.
27792779
read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
@@ -2828,7 +2828,7 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,
28282828

28292829
// Phase 2: Read consecutive varint primitives (int32, int64) if any
28302830
if constexpr (varint_count > 0) {
2831-
if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
2831+
if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
28322832
// Stream-backed buffers may not have all varint bytes materialized yet.
28332833
// Fall back to per-field readers that propagate stream read errors.
28342834
read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
@@ -3270,6 +3270,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
32703270
if (FORY_PREDICT_FALSE(ctx.has_error())) {
32713271
return T{};
32723272
}
3273+
ctx.buffer().shrink_input_buffer();
32733274
return obj;
32743275
}
32753276

@@ -3279,6 +3280,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
32793280
if (FORY_PREDICT_FALSE(ctx.has_error())) {
32803281
return T{};
32813282
}
3283+
ctx.buffer().shrink_input_buffer();
32823284
return obj;
32833285
}
32843286

@@ -3290,6 +3292,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
32903292
return T{};
32913293
}
32923294

3295+
ctx.buffer().shrink_input_buffer();
32933296
return obj;
32943297
}
32953298

@@ -3333,6 +3336,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
33333336
return T{};
33343337
}
33353338

3339+
ctx.buffer().shrink_input_buffer();
33363340
return obj;
33373341
}
33383342

cpp/fory/util/buffer.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ class Buffer {
106106

107107
FORY_ALWAYS_INLINE bool own_data() const { return own_data_; }
108108

109-
FORY_ALWAYS_INLINE bool is_stream_backed() const {
109+
FORY_ALWAYS_INLINE bool has_input_stream() const {
110110
return input_stream_ != nullptr;
111111
}
112112

113-
FORY_ALWAYS_INLINE bool is_output_stream_backed() const {
113+
FORY_ALWAYS_INLINE bool has_output_stream() const {
114114
return output_stream_ != nullptr;
115115
}
116116

@@ -135,8 +135,12 @@ class Buffer {
135135
output_stream_ = nullptr;
136136
}
137137

138-
FORY_ALWAYS_INLINE void shrink_stream_buffer() {
139-
if (input_stream_ != nullptr) {
138+
// Best-effort stream buffer compaction entry point.
139+
// Stage 1 guard: avoid calling into stream shrinking for very small progress.
140+
// Stage 2 guard lives in InputStream::shrink_buffer(), which can decide based
141+
// on stream-specific configured buffer size.
142+
FORY_ALWAYS_INLINE void shrink_input_buffer() {
143+
if (FORY_PREDICT_FALSE(input_stream_ != nullptr && reader_index_ > 4096)) {
140144
input_stream_->shrink_buffer();
141145
}
142146
}

cpp/fory/util/buffer_test.cc

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include <iostream>
2121
#include <limits>
22+
#include <sstream>
23+
#include <string>
2224
#include <utility>
2325
#include <vector>
2426

@@ -330,6 +332,33 @@ TEST(Buffer, StreamSkipAndUnread) {
330332
EXPECT_EQ(view.reader_index(), 2U);
331333
}
332334

335+
TEST(Buffer, StreamShrinkBufferBestEffortUsesConfiguredBufferSize) {
336+
constexpr uint32_t kConfiguredBufferSize = 32768;
337+
constexpr uint32_t kPayloadSize = kConfiguredBufferSize * 2;
338+
std::string payload(kPayloadSize, '\x7');
339+
std::istringstream source(payload);
340+
StdInputStream stream(source, kConfiguredBufferSize);
341+
Buffer reader(stream);
342+
Error error;
343+
344+
reader.skip(5000, error);
345+
ASSERT_TRUE(error.ok()) << error.to_string();
346+
EXPECT_EQ(reader.reader_index(), 5000U);
347+
348+
// Below configured input buffer size, shrink should be a no-op.
349+
reader.shrink_input_buffer();
350+
EXPECT_EQ(reader.reader_index(), 5000U);
351+
352+
reader.skip(kConfiguredBufferSize, error);
353+
ASSERT_TRUE(error.ok()) << error.to_string();
354+
ASSERT_GT(reader.reader_index(), kConfiguredBufferSize);
355+
356+
const uint32_t remaining_before = reader.remaining_size();
357+
reader.shrink_input_buffer();
358+
EXPECT_EQ(reader.reader_index(), 0U);
359+
EXPECT_EQ(reader.size(), remaining_before);
360+
}
361+
333362
TEST(Buffer, StreamReadErrorWhenInsufficientData) {
334363
std::vector<uint8_t> raw{0x01, 0x02, 0x03};
335364
OneByteIStream one_byte_stream(raw);
@@ -398,8 +427,8 @@ TEST(Buffer, OutputStreamRebindDetachesPreviousBufferBacklink) {
398427

399428
first->bind_output_stream(&writer);
400429
second->bind_output_stream(&writer);
401-
EXPECT_FALSE(first->is_output_stream_backed());
402-
EXPECT_TRUE(second->is_output_stream_backed());
430+
EXPECT_FALSE(first->has_output_stream());
431+
EXPECT_TRUE(second->has_output_stream());
403432

404433
writer.enter_flush_barrier();
405434
std::vector<uint8_t> second_payload(5000, 7);

cpp/fory/util/stream.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,14 @@ void StdInputStream::shrink_buffer() {
204204
}
205205

206206
const uint32_t read_pos = buffer_->reader_index_;
207+
// Best-effort policy:
208+
// 1) keep a hard 4096-byte floor to avoid tiny frequent compactions;
209+
// 2) for larger configured input buffers, require at least one full initial
210+
// buffer worth of consumed bytes before moving unread data.
211+
if (FORY_PREDICT_TRUE(read_pos <= 4096 || read_pos < initial_buffer_size_)) {
212+
return;
213+
}
214+
207215
const uint32_t remaining = remaining_size();
208216
if (read_pos > 0) {
209217
if (remaining > 0) {

cpp/fory/util/stream.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ class InputStream : public std::enable_shared_from_this<InputStream> {
133133

134134
virtual Result<void, Error> unread(uint32_t size) = 0;
135135

136+
// Best-effort input-buffer compaction/reclaim hook. Callers may invoke this
137+
// frequently; implementations should return quickly unless configured
138+
// compaction thresholds are met.
136139
virtual void shrink_buffer() = 0;
137140

138141
virtual Buffer &get_buffer() = 0;

python/pyfory/_fory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,8 @@ def write_buffer_object(self, buffer, buffer_object: BufferObject):
698698
def read_buffer_object(self, buffer) -> Buffer:
699699
if not self.is_peer_out_of_band_enabled:
700700
size = buffer.read_var_uint32()
701+
if buffer.has_input_stream():
702+
return buffer.read_bytes(size)
701703
reader_index = buffer.get_reader_index()
702704
buf = buffer.slice(reader_index, size)
703705
buffer.set_reader_index(reader_index + size)
@@ -707,6 +709,8 @@ def read_buffer_object(self, buffer) -> Buffer:
707709
assert self._buffers is not None
708710
return next(self._buffers)
709711
size = buffer.read_var_uint32()
712+
if buffer.has_input_stream():
713+
return buffer.read_bytes(size)
710714
reader_index = buffer.get_reader_index()
711715
buf = buffer.slice(reader_index, size)
712716
buffer.set_reader_index(reader_index + size)

python/pyfory/buffer.pxi

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,12 @@ cdef class Buffer:
244244
raise ValueError("writer_index must be >= 0")
245245
self.c_buffer.writer_index(<uint32_t>value)
246246

247+
cpdef inline void shrink_input_buffer(self):
248+
self.c_buffer.shrink_input_buffer()
249+
250+
cpdef inline c_bool has_input_stream(self):
251+
return self.c_buffer.has_input_stream()
252+
247253
cpdef c_bool own_data(self):
248254
return self.c_buffer.own_data()
249255

python/pyfory/cpp/pyfory.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,14 @@ class PyInputStream final : public InputStream {
366366
}
367367

368368
const uint32_t read_pos = buffer_->reader_index_;
369+
// Keep Python-backed InputStream shrink behavior aligned with C++:
370+
// best-effort compaction only after both the global floor (4096) and the
371+
// configured stream buffer size threshold are crossed.
372+
if (FORY_PREDICT_TRUE(read_pos <= 4096 ||
373+
read_pos < initial_buffer_size_)) {
374+
return;
375+
}
376+
369377
const uint32_t remaining = remaining_size();
370378
if (read_pos > 0) {
371379
if (remaining > 0) {
@@ -1491,13 +1499,13 @@ int Fory_PyPrimitiveCollectionReadFromBuffer(PyObject *collection,
14911499
"tuple collection size is smaller than requested read size");
14921500
return -1;
14931501
}
1494-
if (!buffer->is_stream_backed() && kind == PythonCollectionKind::List) {
1502+
if (!buffer->has_input_stream() && kind == PythonCollectionKind::List) {
14951503
return read_primitive_sequence_indexed(
14961504
buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
14971505
PyList_SET_ITEM(collection, i, item);
14981506
});
14991507
}
1500-
if (!buffer->is_stream_backed() && kind == PythonCollectionKind::Tuple) {
1508+
if (!buffer->has_input_stream() && kind == PythonCollectionKind::Tuple) {
15011509
return read_primitive_sequence_indexed(
15021510
buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
15031511
PyTuple_SET_ITEM(collection, i, item);

python/pyfory/includes/libutil.pxd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil:
205205

206206
void skip(uint32_t length, CError& error)
207207

208+
c_bool has_input_stream() const
209+
210+
void shrink_input_buffer()
211+
208212
void copy(uint32_t start, uint32_t nbytes,
209213
uint8_t* out, uint32_t offset) const
210214

0 commit comments

Comments
 (0)