Skip to content

Commit 8a56bcc

Browse files
update buffer read writer to use []byte array
1 parent 5192dbb commit 8a56bcc

1 file changed

Lines changed: 74 additions & 38 deletions

File tree

lib/store/base/buffer_readwriter.go

Lines changed: 74 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,54 +17,93 @@ package base
1717
import (
1818
"fmt"
1919
"io"
20-
21-
"github.com/aws/aws-sdk-go/aws"
20+
"sync"
21+
"sync/atomic"
2222
)
2323

2424
var _ FileReadWriter = &BufferReadWriter{}
2525

26-
// BufferReadWriter implements FileReadWriter interface for in-memory buffering.
26+
// BufferReadWriter implements FileReadWriter for in-memory buffering.
27+
//
28+
// When created with size > 0, WriteAt takes a fast path using RLock, allowing
29+
// concurrent goroutines writing to non-overlapping byte ranges to proceed in
30+
// parallel with no serialization. The maximum written extent is tracked via an
31+
// atomic so that Bytes(), Size(), Read, and ReadAt return only the data that
32+
// was actually written.
33+
//
34+
// When created with size == 0, every WriteAt that extends the buffer acquires a
35+
// full write lock to grow the backing slice.
36+
//
37+
// Write, Read, ReadAt, and Seek must not be called concurrently with each other
38+
// or with WriteAt.
2739
type BufferReadWriter struct {
28-
buf *aws.WriteAtBuffer
29-
offset int64
40+
mu sync.RWMutex
41+
buf []byte
42+
written atomic.Int64
43+
offset int64
3044
}
3145

32-
// NewBufferReadWriter creates a new BufferReadWriter with an initial capacity of size bytes.
46+
// NewBufferReadWriter creates a new BufferReadWriter pre-allocated to size bytes.
47+
// Pass the exact blob size when known to enable lock-free
48+
// concurrent WriteAt calls for non-overlapping shard ranges.
3349
func NewBufferReadWriter(size uint64) *BufferReadWriter {
34-
bytesSlice := make([]byte, 0, size)
35-
buf := aws.NewWriteAtBuffer(bytesSlice)
36-
// Although this is default, this is explicitly set to notify that we are reserving
37-
// only as much capacity as needed
38-
buf.GrowthCoeff = 1
39-
40-
return &BufferReadWriter{
41-
buf: buf,
42-
offset: 0,
43-
}
50+
return &BufferReadWriter{buf: make([]byte, size)}
4451
}
4552

46-
// Write implements io.Writer by using WriteAt with current write offset.
53+
// Write implements io.Writer using the current sequential write offset.
4754
func (b *BufferReadWriter) Write(p []byte) (n int, err error) {
48-
n, err = b.buf.WriteAt(p, b.offset)
55+
n, err = b.WriteAt(p, b.offset)
4956
b.offset += int64(n)
5057
return n, err
5158
}
5259

53-
// WriteAt implements io.WriterAt for parallel writes.
54-
func (b *BufferReadWriter) WriteAt(p []byte, off int64) (n int, err error) {
60+
// WriteAt implements io.WriterAt.
61+
//
62+
// Fast path (off+len(p) within pre-allocated buffer): multiple goroutines may
63+
// call WriteAt concurrently, provided their byte ranges do not overlap.
64+
// Slow path (write extends beyond current buffer): acquires an exclusive lock
65+
// to grow the buffer, then writes.
66+
func (b *BufferReadWriter) WriteAt(p []byte, off int64) (int, error) {
5567
if off < 0 {
5668
return 0, fmt.Errorf("negative offset")
5769
}
58-
return b.buf.WriteAt(p, off)
70+
end := off + int64(len(p))
71+
72+
b.mu.RLock()
73+
if end <= int64(len(b.buf)) {
74+
n := copy(b.buf[off:], p)
75+
for {
76+
cur := b.written.Load()
77+
if end <= cur || b.written.CompareAndSwap(cur, end) {
78+
break
79+
}
80+
}
81+
b.mu.RUnlock()
82+
return n, nil
83+
}
84+
b.mu.RUnlock()
85+
86+
b.mu.Lock()
87+
defer b.mu.Unlock()
88+
if end > int64(len(b.buf)) {
89+
grown := make([]byte, end)
90+
copy(grown, b.buf)
91+
b.buf = grown
92+
}
93+
n := copy(b.buf[off:], p)
94+
if end > b.written.Load() {
95+
b.written.Store(end)
96+
}
97+
return n, nil
5998
}
6099

61100
// Read implements io.Reader for sequential reads.
62101
func (b *BufferReadWriter) Read(p []byte) (n int, err error) {
63-
bufBytes := b.buf.Bytes()
64-
if b.offset >= int64(len(bufBytes)) {
102+
written := b.written.Load()
103+
if b.offset >= written {
65104
return 0, io.EOF
66105
}
67-
n = copy(p, bufBytes[b.offset:])
106+
n = copy(p, b.buf[b.offset:written])
68107
b.offset += int64(n)
69108
if n < len(p) {
70109
err = io.EOF
@@ -77,11 +116,14 @@ func (b *BufferReadWriter) ReadAt(p []byte, off int64) (n int, err error) {
77116
if off < 0 {
78117
return 0, fmt.Errorf("negative offset")
79118
}
80-
bufBytes := b.buf.Bytes()
81-
if off >= int64(len(bufBytes)) {
119+
b.mu.RLock()
120+
buf := b.buf
121+
written := b.written.Load()
122+
b.mu.RUnlock()
123+
if off >= written {
82124
return 0, io.EOF
83125
}
84-
n = copy(p, bufBytes[off:])
126+
n = copy(p, buf[off:written])
85127
if n < len(p) {
86128
err = io.EOF
87129
}
@@ -91,23 +133,19 @@ func (b *BufferReadWriter) ReadAt(p []byte, off int64) (n int, err error) {
91133
// Seek implements io.Seeker.
92134
func (b *BufferReadWriter) Seek(offset int64, whence int) (int64, error) {
93135
var newOffset int64
94-
bufSize := int64(len(b.buf.Bytes()))
95-
96136
switch whence {
97137
case io.SeekStart:
98138
newOffset = offset
99139
case io.SeekCurrent:
100140
newOffset = b.offset + offset
101141
case io.SeekEnd:
102-
newOffset = bufSize + offset
142+
newOffset = b.written.Load() + offset
103143
default:
104144
return 0, fmt.Errorf("invalid whence: %d", whence)
105145
}
106-
107146
if newOffset < 0 {
108147
return 0, fmt.Errorf("negative position: %d", newOffset)
109148
}
110-
111149
b.offset = newOffset
112150
return newOffset, nil
113151
}
@@ -117,10 +155,8 @@ func (b *BufferReadWriter) Close() error {
117155
return nil
118156
}
119157

120-
// Size returns the size of the buffer
121-
func (b *BufferReadWriter) Size() int64 {
122-
return int64(len(b.buf.Bytes()))
123-
}
158+
// Size returns the largest end offset written so far.
159+
func (b *BufferReadWriter) Size() int64 { return b.written.Load() }
124160

125161
// Cancel is no-op
126162
func (b *BufferReadWriter) Cancel() error {
@@ -132,7 +168,7 @@ func (b *BufferReadWriter) Commit() error {
132168
return nil
133169
}
134170

135-
// Bytes returns the full buffer
171+
// Bytes returns the bytes that have been written so far.
136172
func (b *BufferReadWriter) Bytes() []byte {
137-
return b.buf.Bytes()
173+
return b.buf[:b.written.Load()]
138174
}

0 commit comments

Comments
 (0)