From 5192dbbb7a56392594c8b473b9e98404417cb6b1 Mon Sep 17 00:00:00 2001 From: Sambhav Jain Date: Fri, 8 May 2026 13:38:14 +0000 Subject: [PATCH 1/6] add benchmark for the buffer read writer --- lib/store/base/buffer_readwriter_test.go | 117 +++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/lib/store/base/buffer_readwriter_test.go b/lib/store/base/buffer_readwriter_test.go index cfcf7c5bf..b55aa4b0f 100644 --- a/lib/store/base/buffer_readwriter_test.go +++ b/lib/store/base/buffer_readwriter_test.go @@ -15,7 +15,10 @@ package base import ( + "fmt" "io" + "runtime" + "sync" "testing" "testing/iotest" @@ -308,3 +311,117 @@ func TestBufferReadWriter_TestReader(t *testing.T) { err = iotest.TestReader(buf, content) require.NoError(t, err) } + +// TestBufferReadWriter_ConcurrentWriteAt validates that concurrent writes to +// non-overlapping byte ranges on a pre-sized buffer produce correct results. +// Run with -race to confirm no data races. +func TestBufferReadWriter_ConcurrentWriteAt(t *testing.T) { + const numShards, shardSize = 10, 1024 + data := make([]byte, numShards*shardSize) + for i := range data { + data[i] = byte(i % 256) + } + + buf := NewBufferReadWriter(numShards * shardSize) + var wg sync.WaitGroup + for i := 0; i < numShards; i++ { + wg.Add(1) + go func(shard int) { + defer wg.Done() + off := int64(shard * shardSize) + _, err := buf.WriteAt(data[off:off+shardSize], off) + require.NoError(t, err) + }(i) + } + wg.Wait() + assert.Equal(t, data, buf.Bytes()) +} + +// totalMutexContentions returns the total number of mutex contention events +// recorded in the runtime mutex profile since profiling was enabled. +func totalMutexContentions() int64 { + records := make([]runtime.BlockProfileRecord, 10000) + n, _ := runtime.MutexProfile(records) + var total int64 + for i := 0; i < n; i++ { + total += records[i].Count + } + return total +} + +// benchmarkWriteAt is the shared helper for all WriteAt benchmarks. +// numShards goroutines each write a non-overlapping 4 MiB shard concurrently, +// matching the transfermanager production workload. +// initSize controls the buffer's initial allocation: +// - initSize == totalSize: pre-sized fast path (production case) +// - initSize == 0: dynamic growth (sequential / wrong-size case) +// - initSize == totalSize/2: partial pre-allocation, triggers growth mid-download +func benchmarkWriteAt(b *testing.B, numShards int, initSize uint64) { + b.Helper() + const shardSize = 4 * 1024 * 1024 + totalSize := uint64(numShards) * shardSize + + shards := make([][]byte, numShards) + for i := range shards { + shards[i] = make([]byte, shardSize) + for j := range shards[i] { + shards[i][j] = byte(i) + } + } + + runtime.SetMutexProfileFraction(1) + b.ResetTimer() + b.SetBytes(int64(totalSize)) + b.ReportAllocs() + + startContentions := totalMutexContentions() + + for i := 0; i < b.N; i++ { + buf := NewBufferReadWriter(initSize) + var wg sync.WaitGroup + for shard := 0; shard < numShards; shard++ { + wg.Add(1) + go func(s int) { + defer wg.Done() + _, err := buf.WriteAt(shards[s], int64(s)*shardSize) + require.NoError(b, err) + }(shard) + } + wg.Wait() + } + + b.StopTimer() + if b.N > 0 { + b.ReportMetric(float64(totalMutexContentions()-startContentions)/float64(b.N), "mutex-contentions/op") + } +} + +// BenchmarkBufferReadWriter_WriteAt exercises three buffer initialisation +// strategies × three shard counts, giving a full picture of throughput and +// mutex contention under varying concurrency and pre-allocation. +// +// Run before and after the implementation change, then compare with: +// +// benchstat bench-results/before.txt bench-results/after.txt +func BenchmarkBufferReadWriter_WriteAt(b *testing.B) { + const shardSize = 4 * 1024 * 1024 + cases := []struct { + label string + initFunc func(total uint64) uint64 + }{ + {"presized", func(total uint64) uint64 { return total }}, // production fast path + {"half_presized", func(total uint64) uint64 { return total / 2 }}, // partial pre-alloc, growth needed + {"dynamic", func(total uint64) uint64 { return 0 }}, // no pre-alloc, always grows + } + shardCounts := []int{1, 4, 10} + + for _, tc := range cases { + for _, numShards := range shardCounts { + totalSize := uint64(numShards) * shardSize + initSize := tc.initFunc(totalSize) + b.Run(fmt.Sprintf("%s_%d_shards", tc.label, numShards), func(b *testing.B) { + benchmarkWriteAt(b, numShards, initSize) + }) + } + } +} From f7eb51af9f773ed320b7bc471e9d3d58decf27e2 Mon Sep 17 00:00:00 2001 From: Sambhav Jain Date: Fri, 8 May 2026 13:46:52 +0000 Subject: [PATCH 2/6] update buffer read writer to use []byte array --- lib/store/base/buffer_readwriter.go | 115 +++++++++++++++-------- lib/store/base/buffer_readwriter_test.go | 50 +++++----- 2 files changed, 104 insertions(+), 61 deletions(-) diff --git a/lib/store/base/buffer_readwriter.go b/lib/store/base/buffer_readwriter.go index 67aa5191e..7e76fd409 100644 --- a/lib/store/base/buffer_readwriter.go +++ b/lib/store/base/buffer_readwriter.go @@ -17,54 +17,96 @@ package base import ( "fmt" "io" - - "github.com/aws/aws-sdk-go/aws" + "sync" + "sync/atomic" ) var _ FileReadWriter = &BufferReadWriter{} -// BufferReadWriter implements FileReadWriter interface for in-memory buffering. +// BufferReadWriter implements FileReadWriter for in-memory buffering. +// +// When created with size > 0, WriteAt takes a fast path using RLock, allowing +// concurrent goroutines writing to non-overlapping byte ranges to proceed in +// parallel with no serialization. The maximum written extent is tracked via an +// atomic so that Bytes(), Size(), Read, and ReadAt return only the data that +// was actually written. +// +// When created with size == 0, every WriteAt that extends the buffer acquires a +// full write lock to grow the backing slice. +// +// Write, Read, ReadAt, and Seek must not be called concurrently with each other +// or with WriteAt. type BufferReadWriter struct { - buf *aws.WriteAtBuffer - offset int64 + mu sync.RWMutex + buf []byte + written atomic.Int64 + offset int64 } -// NewBufferReadWriter creates a new BufferReadWriter with an initial capacity of size bytes. +// NewBufferReadWriter creates a new BufferReadWriter pre-allocated to size bytes. +// Pass the exact blob size when known to enable lock-free +// concurrent WriteAt calls for non-overlapping shard ranges. func NewBufferReadWriter(size uint64) *BufferReadWriter { - bytesSlice := make([]byte, 0, size) - buf := aws.NewWriteAtBuffer(bytesSlice) - // Although this is default, this is explicitly set to notify that we are reserving - // only as much capacity as needed - buf.GrowthCoeff = 1 - - return &BufferReadWriter{ - buf: buf, - offset: 0, - } + return &BufferReadWriter{buf: make([]byte, size)} } -// Write implements io.Writer by using WriteAt with current write offset. +// Write implements io.Writer using the current sequential write offset. func (b *BufferReadWriter) Write(p []byte) (n int, err error) { - n, err = b.buf.WriteAt(p, b.offset) + n, err = b.WriteAt(p, b.offset) b.offset += int64(n) return n, err } -// WriteAt implements io.WriterAt for parallel writes. -func (b *BufferReadWriter) WriteAt(p []byte, off int64) (n int, err error) { +// WriteAt implements io.WriterAt. +// +// Fast path (off+len(p) within pre-allocated buffer): multiple goroutines may +// call WriteAt concurrently, provided their byte ranges do not overlap. +// Slow path (write extends beyond current buffer): acquires an exclusive lock +// to grow the buffer, then writes. +func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) { if off < 0 { return 0, fmt.Errorf("negative offset") } - return b.buf.WriteAt(p, off) + end := off + int64(len(p)) + if end < off { + return 0, fmt.Errorf("write at offset %d length %d overflows int64", off, len(p)) + } + + b.mu.RLock() + if end <= int64(len(b.buf)) { + n := copy(b.buf[off:], p) + for { + cur := b.written.Load() + if end <= cur || b.written.CompareAndSwap(cur, end) { + break + } + } + b.mu.RUnlock() + return n, nil + } + b.mu.RUnlock() + + b.mu.Lock() + defer b.mu.Unlock() + if end > int64(len(b.buf)) { + grown := make([]byte, end) + copy(grown, b.buf) + b.buf = grown + } + n := copy(b.buf[off:], p) + if end > b.written.Load() { + b.written.Store(end) + } + return n, nil } // Read implements io.Reader for sequential reads. func (b *BufferReadWriter) Read(p []byte) (n int, err error) { - bufBytes := b.buf.Bytes() - if b.offset >= int64(len(bufBytes)) { + written := b.written.Load() + if b.offset >= written { return 0, io.EOF } - n = copy(p, bufBytes[b.offset:]) + n = copy(p, b.buf[b.offset:written]) b.offset += int64(n) if n < len(p) { err = io.EOF @@ -77,11 +119,14 @@ func (b *BufferReadWriter) ReadAt(p []byte, off int64) (n int, err error) { if off < 0 { return 0, fmt.Errorf("negative offset") } - bufBytes := b.buf.Bytes() - if off >= int64(len(bufBytes)) { + b.mu.RLock() + buf := b.buf + written := b.written.Load() + b.mu.RUnlock() + if off >= written { return 0, io.EOF } - n = copy(p, bufBytes[off:]) + n = copy(p, buf[off:written]) if n < len(p) { err = io.EOF } @@ -91,23 +136,19 @@ func (b *BufferReadWriter) ReadAt(p []byte, off int64) (n int, err error) { // Seek implements io.Seeker. func (b *BufferReadWriter) Seek(offset int64, whence int) (int64, error) { var newOffset int64 - bufSize := int64(len(b.buf.Bytes())) - switch whence { case io.SeekStart: newOffset = offset case io.SeekCurrent: newOffset = b.offset + offset case io.SeekEnd: - newOffset = bufSize + offset + newOffset = b.written.Load() + offset default: return 0, fmt.Errorf("invalid whence: %d", whence) } - if newOffset < 0 { return 0, fmt.Errorf("negative position: %d", newOffset) } - b.offset = newOffset return newOffset, nil } @@ -117,10 +158,8 @@ func (b *BufferReadWriter) Close() error { return nil } -// Size returns the size of the buffer -func (b *BufferReadWriter) Size() int64 { - return int64(len(b.buf.Bytes())) -} +// Size returns the largest end offset written so far. +func (b *BufferReadWriter) Size() int64 { return b.written.Load() } // Cancel is no-op func (b *BufferReadWriter) Cancel() error { @@ -132,7 +171,7 @@ func (b *BufferReadWriter) Commit() error { return nil } -// Bytes returns the full buffer +// Bytes returns the bytes that have been written so far. func (b *BufferReadWriter) Bytes() []byte { - return b.buf.Bytes() + return b.buf[:b.written.Load()] } diff --git a/lib/store/base/buffer_readwriter_test.go b/lib/store/base/buffer_readwriter_test.go index b55aa4b0f..e516aa635 100644 --- a/lib/store/base/buffer_readwriter_test.go +++ b/lib/store/base/buffer_readwriter_test.go @@ -15,6 +15,7 @@ package base import ( + "errors" "fmt" "io" "runtime" @@ -314,7 +315,6 @@ func TestBufferReadWriter_TestReader(t *testing.T) { // TestBufferReadWriter_ConcurrentWriteAt validates that concurrent writes to // non-overlapping byte ranges on a pre-sized buffer produce correct results. -// Run with -race to confirm no data races. func TestBufferReadWriter_ConcurrentWriteAt(t *testing.T) { const numShards, shardSize = 10, 1024 data := make([]byte, numShards*shardSize) @@ -323,35 +323,40 @@ func TestBufferReadWriter_ConcurrentWriteAt(t *testing.T) { } buf := NewBufferReadWriter(numShards * shardSize) + errs := make([]error, numShards) var wg sync.WaitGroup for i := 0; i < numShards; i++ { wg.Add(1) go func(shard int) { defer wg.Done() - off := int64(shard * shardSize) - _, err := buf.WriteAt(data[off:off+shardSize], off) - require.NoError(t, err) + off := shard * shardSize + _, errs[shard] = buf.WriteAt(data[off:off+shardSize], int64(off)) }(i) } wg.Wait() + + require.NoError(t, errors.Join(errs...)) assert.Equal(t, data, buf.Bytes()) } -// totalMutexContentions returns the total number of mutex contention events -// recorded in the runtime mutex profile since profiling was enabled. func totalMutexContentions() int64 { - records := make([]runtime.BlockProfileRecord, 10000) - n, _ := runtime.MutexProfile(records) - var total int64 - for i := 0; i < n; i++ { - total += records[i].Count + size := 1024 + for { + records := make([]runtime.BlockProfileRecord, size) + n, ok := runtime.MutexProfile(records) + if ok { + var total int64 + for i := 0; i < n; i++ { + total += records[i].Count + } + return total + } + size = n + 64 } - return total } // benchmarkWriteAt is the shared helper for all WriteAt benchmarks. // numShards goroutines each write a non-overlapping 4 MiB shard concurrently, -// matching the transfermanager production workload. // initSize controls the buffer's initial allocation: // - initSize == totalSize: pre-sized fast path (production case) // - initSize == 0: dynamic growth (sequential / wrong-size case) @@ -369,13 +374,15 @@ func benchmarkWriteAt(b *testing.B, numShards int, initSize uint64) { } } - runtime.SetMutexProfileFraction(1) + prev := runtime.SetMutexProfileFraction(1) + defer runtime.SetMutexProfileFraction(prev) b.ResetTimer() b.SetBytes(int64(totalSize)) b.ReportAllocs() startContentions := totalMutexContentions() + errs := make([]error, numShards) for i := 0; i < b.N; i++ { buf := NewBufferReadWriter(initSize) var wg sync.WaitGroup @@ -383,11 +390,13 @@ func benchmarkWriteAt(b *testing.B, numShards int, initSize uint64) { wg.Add(1) go func(s int) { defer wg.Done() - _, err := buf.WriteAt(shards[s], int64(s)*shardSize) - require.NoError(b, err) + _, errs[s] = buf.WriteAt(shards[s], int64(s)*shardSize) }(shard) } wg.Wait() + if err := errors.Join(errs...); err != nil { + b.Fatal(err) + } } b.StopTimer() @@ -396,13 +405,8 @@ func benchmarkWriteAt(b *testing.B, numShards int, initSize uint64) { } } -// BenchmarkBufferReadWriter_WriteAt exercises three buffer initialisation -// strategies × three shard counts, giving a full picture of throughput and -// mutex contention under varying concurrency and pre-allocation. -// -// Run before and after the implementation change, then compare with: -// -// benchstat bench-results/before.txt bench-results/after.txt +// BenchmarkBufferReadWriter_WriteAt exercises three buffer initialization +// strategies × three shard counts. func BenchmarkBufferReadWriter_WriteAt(b *testing.B) { const shardSize = 4 * 1024 * 1024 cases := []struct { From 7c1f47655a432d46afc158e60047e768f0785cf6 Mon Sep 17 00:00:00 2001 From: Sambhav Jain Date: Mon, 11 May 2026 13:21:42 +0000 Subject: [PATCH 3/6] add new test case for checking actual values --- lib/store/base/buffer_readwriter_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/store/base/buffer_readwriter_test.go b/lib/store/base/buffer_readwriter_test.go index e516aa635..768e7fc46 100644 --- a/lib/store/base/buffer_readwriter_test.go +++ b/lib/store/base/buffer_readwriter_test.go @@ -53,6 +53,7 @@ func TestBufferReadWriter_Write(t *testing.T) { } assert.Equal(t, tt.expectedSize, buf.Size()) + assert.Equal(t, tt.expectedResult, buf.Bytes()) }) } } @@ -115,6 +116,9 @@ func TestBufferReadWriter_WriteAt(t *testing.T) { } assert.Equal(t, tt.expectedSize, buf.Size()) + if tt.expectedResult != nil { + assert.Equal(t, tt.expectedResult, buf.Bytes()) + } }) } } From f3785c7143487c3f554a5ff031db34f6d1e0b358 Mon Sep 17 00:00:00 2001 From: Sambhav Jain Date: Mon, 18 May 2026 11:04:25 +0000 Subject: [PATCH 4/6] resolve comments --- lib/store/base/buffer_readwriter.go | 57 ++++++++++++++---------- lib/store/base/buffer_readwriter_test.go | 16 +++++-- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/lib/store/base/buffer_readwriter.go b/lib/store/base/buffer_readwriter.go index 7e76fd409..f5d4f8e51 100644 --- a/lib/store/base/buffer_readwriter.go +++ b/lib/store/base/buffer_readwriter.go @@ -25,17 +25,12 @@ var _ FileReadWriter = &BufferReadWriter{} // BufferReadWriter implements FileReadWriter for in-memory buffering. // -// When created with size > 0, WriteAt takes a fast path using RLock, allowing -// concurrent goroutines writing to non-overlapping byte ranges to proceed in -// parallel with no serialization. The maximum written extent is tracked via an -// atomic so that Bytes(), Size(), Read, and ReadAt return only the data that -// was actually written. +// Pre-sizing (size > 0) allows concurrent WriteAt calls to non-overlapping +// ranges to run in parallel. Without pre-sizing, each write that grows the +// buffer is serialized. // -// When created with size == 0, every WriteAt that extends the buffer acquires a -// full write lock to grow the backing slice. -// -// Write, Read, ReadAt, and Seek must not be called concurrently with each other -// or with WriteAt. +// Bytes, Size, Write, Read, ReadAt, and Seek must not be called concurrently +// with each other or with WriteAt. type BufferReadWriter struct { mu sync.RWMutex buf []byte @@ -44,8 +39,8 @@ type BufferReadWriter struct { } // NewBufferReadWriter creates a new BufferReadWriter pre-allocated to size bytes. -// Pass the exact blob size when known to enable lock-free -// concurrent WriteAt calls for non-overlapping shard ranges. +// Pass the exact blob size when known so concurrent WriteAt calls for +// non-overlapping shard ranges can run in parallel without writer serialization. func NewBufferReadWriter(size uint64) *BufferReadWriter { return &BufferReadWriter{buf: make([]byte, size)} } @@ -67,25 +62,20 @@ func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) { if off < 0 { return 0, fmt.Errorf("negative offset") } + if len(p) == 0 { + return 0, nil + } end := off + int64(len(p)) if end < off { return 0, fmt.Errorf("write at offset %d length %d overflows int64", off, len(p)) } - b.mu.RLock() - if end <= int64(len(b.buf)) { - n := copy(b.buf[off:], p) - for { - cur := b.written.Load() - if end <= cur || b.written.CompareAndSwap(cur, end) { - break - } - } - b.mu.RUnlock() + // fast path + if n, shouldGrowBuffer := b.updateBuffer(p, off, end); !shouldGrowBuffer { return n, nil } - b.mu.RUnlock() + // slow path b.mu.Lock() defer b.mu.Unlock() if end > int64(len(b.buf)) { @@ -100,6 +90,24 @@ func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) { return n, nil } +// updateBuffer copies p into [off, end). +// Returns -1, false if end exceeds the buffer, which cause WriteAt to take the slow path +func (b *BufferReadWriter) updateBuffer(p []byte, off, end int64) (int, bool) { + b.mu.RLock() + defer b.mu.RUnlock() + if end <= int64(len(b.buf)) { + n := copy(b.buf[off:], p) + for { + cur := b.written.Load() + if end <= cur || b.written.CompareAndSwap(cur, end) { + break + } + } + return n, false + } + return -1, true +} + // Read implements io.Reader for sequential reads. func (b *BufferReadWriter) Read(p []byte) (n int, err error) { written := b.written.Load() @@ -171,7 +179,8 @@ func (b *BufferReadWriter) Commit() error { return nil } -// Bytes returns the bytes that have been written so far. +// Bytes returns the buffer up to the highest offset written so far. +// Any gaps between writes are zero-filled. func (b *BufferReadWriter) Bytes() []byte { return b.buf[:b.written.Load()] } diff --git a/lib/store/base/buffer_readwriter_test.go b/lib/store/base/buffer_readwriter_test.go index 768e7fc46..b9e41cad5 100644 --- a/lib/store/base/buffer_readwriter_test.go +++ b/lib/store/base/buffer_readwriter_test.go @@ -343,6 +343,15 @@ func TestBufferReadWriter_ConcurrentWriteAt(t *testing.T) { assert.Equal(t, data, buf.Bytes()) } +func TestBufferReadWriter_WriteAtEmpty(t *testing.T) { + buf := NewBufferReadWriter(0) + n, err := buf.WriteAt(nil, 1<<30) + require.NoError(t, err) + assert.Equal(t, 0, n) + assert.Equal(t, int64(0), buf.Size()) + assert.Empty(t, buf.Bytes()) +} + func totalMutexContentions() int64 { size := 1024 for { @@ -380,14 +389,13 @@ func benchmarkWriteAt(b *testing.B, numShards int, initSize uint64) { prev := runtime.SetMutexProfileFraction(1) defer runtime.SetMutexProfileFraction(prev) - b.ResetTimer() + startContentions := totalMutexContentions() b.SetBytes(int64(totalSize)) b.ReportAllocs() - - startContentions := totalMutexContentions() + b.ResetTimer() errs := make([]error, numShards) - for i := 0; i < b.N; i++ { + for b.Loop() { buf := NewBufferReadWriter(initSize) var wg sync.WaitGroup for shard := 0; shard < numShards; shard++ { From cc5ac871b57ffce3bc070760a2b6df11889e2854 Mon Sep 17 00:00:00 2001 From: Sambhav Jain <67923444+sambhav-jain-16@users.noreply.github.com> Date: Fri, 22 May 2026 10:39:20 +0200 Subject: [PATCH 5/6] fix docs Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- lib/store/base/buffer_readwriter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/store/base/buffer_readwriter.go b/lib/store/base/buffer_readwriter.go index f5d4f8e51..e33b649b0 100644 --- a/lib/store/base/buffer_readwriter.go +++ b/lib/store/base/buffer_readwriter.go @@ -91,7 +91,7 @@ func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) { } // updateBuffer copies p into [off, end). -// Returns -1, false if end exceeds the buffer, which cause WriteAt to take the slow path +// Returns (-1, true) if end exceeds the buffer, causing WriteAt to take the slow path. func (b *BufferReadWriter) updateBuffer(p []byte, off, end int64) (int, bool) { b.mu.RLock() defer b.mu.RUnlock() From 190c3fb99feb4b29090a3db74e76a8b43eda44c8 Mon Sep 17 00:00:00 2001 From: Sambhav Jain Date: Fri, 22 May 2026 09:49:35 +0000 Subject: [PATCH 6/6] add test and update WriteAt --- lib/store/base/buffer_readwriter.go | 28 ++++++++----- lib/store/base/buffer_readwriter_test.go | 51 +++++++++++++++--------- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/lib/store/base/buffer_readwriter.go b/lib/store/base/buffer_readwriter.go index e33b649b0..1b009f994 100644 --- a/lib/store/base/buffer_readwriter.go +++ b/lib/store/base/buffer_readwriter.go @@ -56,8 +56,10 @@ func (b *BufferReadWriter) Write(p []byte) (n int, err error) { // // Fast path (off+len(p) within pre-allocated buffer): multiple goroutines may // call WriteAt concurrently, provided their byte ranges do not overlap. +// // Slow path (write extends beyond current buffer): acquires an exclusive lock -// to grow the buffer, then writes. +// only to grow the buffer, then copies under a shared read lock so other +// writers can proceed in parallel. func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) { if off < 0 { return 0, fmt.Errorf("negative offset") @@ -75,19 +77,23 @@ func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) { return n, nil } - // slow path + // slow path: grow, then call updateBuffer again + b.growBuffer(end) + n, _ := b.updateBuffer(p, off, end) + return n, nil +} + +// growBuffer expands b.buf to at least size bytes. It is a no-op when the +// buffer is already large enough. +func (b *BufferReadWriter) growBuffer(size int64) { b.mu.Lock() defer b.mu.Unlock() - if end > int64(len(b.buf)) { - grown := make([]byte, end) - copy(grown, b.buf) - b.buf = grown - } - n := copy(b.buf[off:], p) - if end > b.written.Load() { - b.written.Store(end) + if size <= int64(len(b.buf)) { + return } - return n, nil + grown := make([]byte, size) + copy(grown, b.buf) + b.buf = grown } // updateBuffer copies p into [off, end). diff --git a/lib/store/base/buffer_readwriter_test.go b/lib/store/base/buffer_readwriter_test.go index b9e41cad5..7316290ff 100644 --- a/lib/store/base/buffer_readwriter_test.go +++ b/lib/store/base/buffer_readwriter_test.go @@ -317,30 +317,43 @@ func TestBufferReadWriter_TestReader(t *testing.T) { require.NoError(t, err) } -// TestBufferReadWriter_ConcurrentWriteAt validates that concurrent writes to -// non-overlapping byte ranges on a pre-sized buffer produce correct results. func TestBufferReadWriter_ConcurrentWriteAt(t *testing.T) { const numShards, shardSize = 10, 1024 - data := make([]byte, numShards*shardSize) - for i := range data { - data[i] = byte(i % 256) - } + totalSize := uint64(numShards * shardSize) - buf := NewBufferReadWriter(numShards * shardSize) - errs := make([]error, numShards) - var wg sync.WaitGroup - for i := 0; i < numShards; i++ { - wg.Add(1) - go func(shard int) { - defer wg.Done() - off := shard * shardSize - _, errs[shard] = buf.WriteAt(data[off:off+shardSize], int64(off)) - }(i) + tests := []struct { + name string + initSize uint64 + }{ + {"presized", totalSize}, + {"half_presized", totalSize / 2}, + {"dynamic", 0}, } - wg.Wait() - require.NoError(t, errors.Join(errs...)) - assert.Equal(t, data, buf.Bytes()) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data := make([]byte, totalSize) + for i := range data { + data[i] = byte(i % 256) + } + + buf := NewBufferReadWriter(tt.initSize) + errs := make([]error, numShards) + var wg sync.WaitGroup + for i := 0; i < numShards; i++ { + wg.Add(1) + go func(shard int) { + defer wg.Done() + off := shard * shardSize + _, errs[shard] = buf.WriteAt(data[off:off+shardSize], int64(off)) + }(i) + } + wg.Wait() + + require.NoError(t, errors.Join(errs...)) + assert.Equal(t, data, buf.Bytes()) + }) + } } func TestBufferReadWriter_WriteAtEmpty(t *testing.T) {