Skip to content

Commit 5050f42

Browse files
committed
pool: unify memory between multipart and asyncreader to use one pool
Before this the multipart code and asyncreader used separate pools which is inefficient on memory use.
1 parent fcbcdea commit 5050f42

4 files changed

Lines changed: 46 additions & 52 deletions

File tree

fs/asyncreader/asyncreader.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"errors"
88
"io"
99
"sync"
10-
"time"
1110

1211
"github.com/rclone/rclone/fs"
1312
"github.com/rclone/rclone/lib/pool"
@@ -16,10 +15,8 @@ import (
1615

1716
const (
1817
// BufferSize is the default size of the async buffer
19-
BufferSize = 1024 * 1024
20-
softStartInitial = 4 * 1024
21-
bufferCacheSize = 64 // max number of buffers to keep in cache
22-
bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
18+
BufferSize = pool.BufferSize
19+
softStartInitial = 4 * 1024
2320
)
2421

2522
// ErrorStreamAbandoned is returned when the input is closed before the end of the stream
@@ -42,6 +39,7 @@ type AsyncReader struct {
4239
closed bool // whether we have closed the underlying stream
4340
mu sync.Mutex // lock for Read/WriteTo/Abandon/Close
4441
ci *fs.ConfigInfo // for reading config
42+
pool *pool.Pool // pool to get memory from
4543
}
4644

4745
// New returns a reader that will asynchronously read from
@@ -58,7 +56,8 @@ func New(ctx context.Context, rd io.ReadCloser, buffers int) (*AsyncReader, erro
5856
return nil, errors.New("nil reader supplied")
5957
}
6058
a := &AsyncReader{
61-
ci: fs.GetConfig(ctx),
59+
ci: fs.GetConfig(ctx),
60+
pool: pool.Global(),
6261
}
6362
a.init(rd, buffers)
6463
return a, nil
@@ -104,24 +103,16 @@ func (a *AsyncReader) init(rd io.ReadCloser, buffers int) {
104103
}()
105104
}
106105

107-
// bufferPool is a global pool of buffers
108-
var bufferPool *pool.Pool
109-
var bufferPoolOnce sync.Once
110-
111106
// return the buffer to the pool (clearing it)
112107
func (a *AsyncReader) putBuffer(b *buffer) {
113-
bufferPool.Put(b.buf)
108+
a.pool.Put(b.buf)
114109
b.buf = nil
115110
}
116111

117112
// get a buffer from the pool
118113
func (a *AsyncReader) getBuffer() *buffer {
119-
bufferPoolOnce.Do(func() {
120-
// Initialise the buffer pool when used
121-
bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, a.ci.UseMmap)
122-
})
123114
return &buffer{
124-
buf: bufferPool.Get(),
115+
buf: a.pool.Get(),
125116
}
126117
}
127118

lib/multipart/multipart.go

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"context"
66
"fmt"
77
"io"
8-
"sync"
9-
"time"
108

119
"github.com/rclone/rclone/fs"
1210
"github.com/rclone/rclone/fs/accounting"
@@ -18,30 +16,12 @@ import (
1816

1917
const (
2018
// BufferSize is the default size of the pages used in the reader
21-
BufferSize = 1024 * 1024
22-
bufferCacheSize = 64 // max number of buffers to keep in cache
23-
bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
19+
BufferSize = pool.BufferSize
2420
)
2521

26-
// bufferPool is a global pool of buffers
27-
var (
28-
bufferPool *pool.Pool
29-
bufferPoolOnce sync.Once
30-
)
31-
32-
// get a buffer pool
33-
func getPool() *pool.Pool {
34-
bufferPoolOnce.Do(func() {
35-
ci := fs.GetConfig(context.Background())
36-
// Initialise the buffer pool when used
37-
bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, ci.UseMmap)
38-
})
39-
return bufferPool
40-
}
41-
42-
// NewRW gets a pool.RW using the multipart pool
22+
// NewRW gets a pool.RW using the global pool
4323
func NewRW() *pool.RW {
44-
return pool.NewRW(getPool())
24+
return pool.NewRW(pool.Global())
4525
}
4626

4727
// UploadMultipartOptions options for the generic multipart upload

lib/pool/pool.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ import (
1313
"golang.org/x/sync/semaphore"
1414
)
1515

16+
const (
17+
// BufferSize is the page size of the Global() pool
18+
BufferSize = 1024 * 1024
19+
// BufferCacheSize is the max number of buffers to keep in the cache for the Global() pool
20+
BufferCacheSize = 64
21+
// BufferCacheFlushTime is the max time to keep buffers in the Global() pool
22+
BufferCacheFlushTime = 5 * time.Second
23+
)
24+
1625
// Pool of internal buffers
1726
//
1827
// We hold buffers in cache. Every time we Get or Put we update
@@ -67,6 +76,17 @@ func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool
6776
return nil
6877
}
6978
}
79+
80+
// Initialise total memory limit if required
81+
totalMemoryInit.Do(func() {
82+
ci := fs.GetConfig(context.Background())
83+
84+
// Set max buffer memory limiter
85+
if ci.MaxBufferMemory > 0 {
86+
totalMemory = semaphore.NewWeighted(int64(ci.MaxBufferMemory))
87+
}
88+
})
89+
7090
bp.timer = time.AfterFunc(flushTime, bp.flushAged)
7191
return bp
7292
}
@@ -157,20 +177,10 @@ func (bp *Pool) updateMinFill() {
157177

158178
// acquire mem bytes of memory
159179
func (bp *Pool) acquire(mem int64) error {
160-
ctx := context.Background()
161-
162-
totalMemoryInit.Do(func() {
163-
ci := fs.GetConfig(ctx)
164-
165-
// Set max buffer memory limiter
166-
if ci.MaxBufferMemory > 0 {
167-
totalMemory = semaphore.NewWeighted(int64(ci.MaxBufferMemory))
168-
}
169-
})
170-
171180
if totalMemory == nil {
172181
return nil
173182
}
183+
ctx := context.Background()
174184
return totalMemory.Acquire(ctx, mem)
175185
}
176186

@@ -248,3 +258,17 @@ func (bp *Pool) Put(buf []byte) {
248258
bp.updateMinFill()
249259
bp.kickFlusher()
250260
}
261+
262+
// bufferPool is a global pool of buffers
263+
var bufferPool *Pool
264+
var bufferPoolOnce sync.Once
265+
266+
// Global gets a global pool of BufferSize, BufferCacheSize, BufferCacheFlushTime.
267+
func Global() *Pool {
268+
bufferPoolOnce.Do(func() {
269+
// Initialise the buffer pool when used
270+
ci := fs.GetConfig(context.Background())
271+
bufferPool = New(BufferCacheFlushTime, BufferSize, BufferCacheSize, ci.UseMmap)
272+
})
273+
return bufferPool
274+
}

lib/pool/pool_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,10 @@ func TestPoolMaxBufferMemory(t *testing.T) {
240240
totalMemoryInit = sync.Once{} // reset the sync.Once as it likely has been used
241241
totalMemory = nil
242242
bp := New(60*time.Second, 4096, 2, true)
243+
assert.NotNil(t, totalMemory)
243244

244245
assert.Equal(t, bp.alloced, 0)
245-
assert.Nil(t, totalMemory)
246246
buf := bp.Get()
247-
assert.NotNil(t, totalMemory)
248247
bp.Put(buf)
249248
assert.Equal(t, bp.alloced, 1)
250249

0 commit comments

Comments
 (0)