Skip to content

Commit e0e40bf

Browse files
resolve comments
1 parent 7c1f476 commit e0e40bf

2 files changed

Lines changed: 45 additions & 28 deletions

File tree

lib/store/base/buffer_readwriter.go

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,12 @@ var _ FileReadWriter = &BufferReadWriter{}
2525

2626
// BufferReadWriter implements FileReadWriter for in-memory buffering.
2727
//
28-
// When created with size > 0, WriteAt takes a fast path using RLock, allowing
29-
// concurrent goroutines writing to non-overlapping byte ranges to proceed in
30-
// parallel with no serialization. The maximum written extent is tracked via an
31-
// atomic so that Bytes(), Size(), Read, and ReadAt return only the data that
32-
// was actually written.
28+
// Pre-sizing (size > 0) allows concurrent WriteAt calls to non-overlapping
29+
// ranges to run in parallel. Without pre-sizing, each write that grows the
30+
// buffer is serialized.
3331
//
34-
// When created with size == 0, every WriteAt that extends the buffer acquires a
35-
// full write lock to grow the backing slice.
36-
//
37-
// Write, Read, ReadAt, and Seek must not be called concurrently with each other
38-
// or with WriteAt.
32+
// Bytes, Size, Write, Read, ReadAt, and Seek must not be called concurrently
33+
// with each other or with WriteAt.
3934
type BufferReadWriter struct {
4035
mu sync.RWMutex
4136
buf []byte
@@ -44,8 +39,8 @@ type BufferReadWriter struct {
4439
}
4540

4641
// NewBufferReadWriter creates a new BufferReadWriter pre-allocated to size bytes.
47-
// Pass the exact blob size when known to enable lock-free
48-
// concurrent WriteAt calls for non-overlapping shard ranges.
42+
// Pass the exact blob size when known so concurrent WriteAt calls for
43+
// non-overlapping shard ranges can run in parallel without writer serialization.
4944
func NewBufferReadWriter(size uint64) *BufferReadWriter {
5045
return &BufferReadWriter{buf: make([]byte, size)}
5146
}
@@ -67,25 +62,20 @@ func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) {
6762
if off < 0 {
6863
return 0, fmt.Errorf("negative offset")
6964
}
65+
if len(p) == 0 {
66+
return 0, nil
67+
}
7068
end := off + int64(len(p))
7169
if end < off {
7270
return 0, fmt.Errorf("write at offset %d length %d overflows int64", off, len(p))
7371
}
7472

75-
b.mu.RLock()
76-
if end <= int64(len(b.buf)) {
77-
n := copy(b.buf[off:], p)
78-
for {
79-
cur := b.written.Load()
80-
if end <= cur || b.written.CompareAndSwap(cur, end) {
81-
break
82-
}
83-
}
84-
b.mu.RUnlock()
73+
// fast path
74+
if n, shouldGrowThebuffer := b.updateBuffer(p, off, end); shouldGrowThebuffer == false {
8575
return n, nil
8676
}
87-
b.mu.RUnlock()
8877

78+
// slow path
8979
b.mu.Lock()
9080
defer b.mu.Unlock()
9181
if end > int64(len(b.buf)) {
@@ -100,6 +90,24 @@ func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) {
10090
return n, nil
10191
}
10292

93+
// updateBuffer copies p into [off, end).
94+
// Returns -1, false if end exceeds the buffer, which cause WriteAt to take the slow path
95+
func (b *BufferReadWriter) updateBuffer(p []byte, off, end int64) (int, bool) {
96+
b.mu.RLock()
97+
defer b.mu.RUnlock()
98+
if end <= int64(len(b.buf)) {
99+
n := copy(b.buf[off:], p)
100+
for {
101+
cur := b.written.Load()
102+
if end <= cur || b.written.CompareAndSwap(cur, end) {
103+
break
104+
}
105+
}
106+
return n, false
107+
}
108+
return -1, true
109+
}
110+
103111
// Read implements io.Reader for sequential reads.
104112
func (b *BufferReadWriter) Read(p []byte) (n int, err error) {
105113
written := b.written.Load()
@@ -171,7 +179,8 @@ func (b *BufferReadWriter) Commit() error {
171179
return nil
172180
}
173181

174-
// Bytes returns the bytes that have been written so far.
182+
// Bytes returns the buffer up to the highest offset written so far.
183+
// Any gaps between writes are zero-filled.
175184
func (b *BufferReadWriter) Bytes() []byte {
176185
return b.buf[:b.written.Load()]
177186
}

lib/store/base/buffer_readwriter_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,15 @@ func TestBufferReadWriter_ConcurrentWriteAt(t *testing.T) {
343343
assert.Equal(t, data, buf.Bytes())
344344
}
345345

346+
func TestBufferReadWriter_WriteAtEmpty(t *testing.T) {
347+
buf := NewBufferReadWriter(0)
348+
n, err := buf.WriteAt(nil, 1<<30)
349+
require.NoError(t, err)
350+
assert.Equal(t, 0, n)
351+
assert.Equal(t, int64(0), buf.Size())
352+
assert.Empty(t, buf.Bytes())
353+
}
354+
346355
func totalMutexContentions() int64 {
347356
size := 1024
348357
for {
@@ -380,14 +389,13 @@ func benchmarkWriteAt(b *testing.B, numShards int, initSize uint64) {
380389

381390
prev := runtime.SetMutexProfileFraction(1)
382391
defer runtime.SetMutexProfileFraction(prev)
383-
b.ResetTimer()
392+
startContentions := totalMutexContentions()
384393
b.SetBytes(int64(totalSize))
385394
b.ReportAllocs()
386-
387-
startContentions := totalMutexContentions()
395+
b.ResetTimer()
388396

389397
errs := make([]error, numShards)
390-
for i := 0; i < b.N; i++ {
398+
for b.Loop() {
391399
buf := NewBufferReadWriter(initSize)
392400
var wg sync.WaitGroup
393401
for shard := 0; shard < numShards; shard++ {

0 commit comments

Comments
 (0)