Skip to content

Commit 7858b1b

Browse files
committed
api: add Allocator interface for response buffers
Previously, response buffer allocation was hardcoded with an internal pooler implementation. Users had no control over memory management for responses, which could be problematic for applications with specific memory requirements or custom allocation strategies. The new Allocator interface allows users to implement custom buffer allocation and reuse strategies: ``` // Allocator is an interface for allocating and deallocating byte // slices. type Allocator interface { // Get returns a pointer to a byte slice of at least the given // length. // The caller should not assume anything about the slice's // capacity. // // If the allocator cannot allocate a buffer (e.g., invalid // length), it returns nil. The caller must handle this case // appropriately. Get(length int) *[]byte // Put returns a byte slice to the allocator for reuse. // After calling Put, the caller must not use the slice. // // The caller must ensure that the slice length remains unchanged // between Get and Put calls. Modifying the slice length before // calling Put may prevent the allocator from properly reusing the // buffer. Put(buf *[]byte) } ``` The PoolAllocator type provides a ready-to-use implementation based on sync.Pool for power-of-two sized byte slices. The Opts.Allocator option enables configuring a custom allocator for a connection. This is useful for applications that need to optimize memory usage or integrate with custom memory management systems. Closes #493
1 parent 7718cac commit 7858b1b

12 files changed

Lines changed: 794 additions & 225 deletions

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2323
* Resources allocated for a `Future` object created by the `Connection` type
2424
could be released with the `Future.Release()` call.
2525
* Added function String() for type interval (#322).
26+
* New `Allocator` interface for custom allocation of response buffers (#493).
27+
* New `PoolAllocator` type that implements `Allocator` using sync.Pool for
28+
power-of-two sized byte slices (#493).
29+
* New `Opts.Allocator` option to configure a custom allocator for a
30+
connection (#493).
2631

2732
### Changed
2833

alloc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package tarantool
2+
3+
// Allocator is an interface for allocating and deallocating byte slices.
4+
type Allocator interface {
5+
// Get returns a pointer to a byte slice of at least the given length.
6+
// The caller should not assume anything about the slice's capacity.
7+
//
8+
// If the allocator cannot allocate a buffer (e.g., invalid length), it
9+
// returns nil. The caller must handle this case appropriately.
10+
Get(length int) *[]byte
11+
// Put returns a byte slice to the allocator for reuse.
12+
// After calling Put, the caller must not use the slice.
13+
//
14+
// The caller must ensure that the slice length remains unchanged between
15+
// Get and Put calls. Modifying the slice length before calling Put may
16+
// prevent the allocator from properly reusing the buffer.
17+
Put(buf *[]byte)
18+
}

connection.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ type Connection struct {
211211
// requestCnt is a counter of active requests.
212212
requestCnt int64
213213

214-
pool allocator
214+
// alloc is an allocator for response buffers.
215+
alloc Allocator
215216
}
216217

217218
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -329,9 +330,9 @@ type Opts struct {
329330
Handle interface{}
330331
// Logger is user specified logger used for error messages.
331332
Logger Logger
332-
// PoolSizes is a list of pool sizes (power-of-two) for each connection,
333-
// nil value means default -- not using pool inside.
334-
PoolSizes []int
333+
// Allocator is used to allocate and deallocate response buffers.
334+
// If nil, default Go allocation is used.
335+
Allocator Allocator
335336
}
336337

337338
// Connect creates and configures a new Connection.
@@ -378,7 +379,12 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
378379
conn.opts.Logger = defaultLogger{}
379380
}
380381

382+
if conn.opts.Allocator != nil {
383+
conn.alloc = conn.opts.Allocator
384+
}
385+
381386
conn.cond = sync.NewCond(&conn.mutex)
387+
382388
if conn.opts.Reconnect > 0 {
383389
// We don't need these mutex.Lock()/mutex.Unlock() here, but
384390
// runReconnects() expects mutex.Lock() to be set, so it's
@@ -400,10 +406,6 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
400406
go conn.timeouts()
401407
}
402408

403-
if conn.opts.PoolSizes != nil {
404-
conn.pool = newPooler(conn.opts.PoolSizes)
405-
}
406-
407409
// TODO: reload schema after reconnect.
408410
if !conn.opts.SkipSchema {
409411
schema, err := GetSchema(conn)
@@ -891,7 +893,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
891893
for atomic.LoadUint32(&conn.state) != connClosed {
892894
var err error
893895

894-
buf, err = read(r, conn.lenbuf[:], conn.pool)
896+
buf, err = read(r, conn.lenbuf[:], conn.alloc)
895897

896898
if err != nil {
897899
err = ClientError{
@@ -1225,36 +1227,40 @@ func (conn *Connection) timeouts() {
12251227
}
12261228
}
12271229

1228-
// read uses args to allocate slices for responses using sync.Pool.
1229-
// data must be released later using Release.
1230-
func read(r io.Reader, lenbuf []byte, pool allocator) (buf smallBuf, err error) {
1230+
// read reads a response from the reader and allocates a buffer using the
1231+
// allocator. If alloc is nil, a regular Go allocation is used.
1232+
// The returned buffer could be released using Release.
1233+
func read(r io.Reader, lenbuf []byte, alloc Allocator) (smallBuf, error) {
12311234
var length uint64
12321235

1233-
if _, err = io.ReadFull(r, lenbuf); err != nil {
1234-
return
1236+
if _, err := io.ReadFull(r, lenbuf); err != nil {
1237+
return smallBuf{}, fmt.Errorf("failed to read response length: %w", err)
12351238
}
1239+
12361240
if lenbuf[0] != 0xce {
1237-
err = errors.New("wrong response header")
1238-
return
1241+
return smallBuf{}, errors.New("wrong response header")
12391242
}
1243+
12401244
length = (uint64(lenbuf[1]) << 24) +
12411245
(uint64(lenbuf[2]) << 16) +
12421246
(uint64(lenbuf[3]) << 8) +
12431247
uint64(lenbuf[4])
12441248

12451249
switch {
12461250
case length == 0:
1247-
err = errors.New("response should not be 0 length")
1248-
return
1251+
return smallBuf{}, errors.New("response should not be 0 length")
12491252
case length > math.MaxUint32:
1250-
err = errors.New("response is too big")
1251-
return
1253+
return smallBuf{}, errors.New("response is too big")
12521254
}
12531255

1254-
buf = CreateBuf(pool, int(length))
1255-
_, err = io.ReadFull(r, buf.b)
1256+
buf := createBuf(alloc, int(length))
1257+
if _, err := io.ReadFull(r, buf.b); err != nil {
1258+
buf.Release()
12561259

1257-
return
1260+
return smallBuf{}, fmt.Errorf("failed to read response: %w", err)
1261+
}
1262+
1263+
return buf, nil
12581264
}
12591265

12601266
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {

example_custom_request_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func ExampleRequest_Response_manual() {
221221
defer func() { _ = conn.Close() }()
222222

223223
// Insert test data.
224-
_, err := conn.Do(tarantool.NewInsertRequest(spaceNo).
224+
_, err := conn.Do(tarantool.NewReplaceRequest(spaceNo).
225225
Tuple([]interface{}{uint(1111), "hello", "world"}),
226226
).Get()
227227
if err != nil {
@@ -258,7 +258,7 @@ func ExampleRequest_Response_manualDecodeTyped() {
258258
defer func() { _ = conn.Close() }()
259259

260260
// Insert test data.
261-
_, err := conn.Do(tarantool.NewInsertRequest(spaceNo).
261+
_, err := conn.Do(tarantool.NewReplaceRequest(spaceNo).
262262
Tuple([]interface{}{uint(1111), "hello", "world"}),
263263
).Get()
264264
if err != nil {

example_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,3 +1542,50 @@ func ExampleFdDialer() {
15421542
// Output:
15431543
// <nil>
15441544
}
1545+
1546+
// ExamplePoolAllocator demonstrates how to use PoolAllocator for
1547+
// buffer pooling to reduce memory allocations when working with
1548+
// a connection.
1549+
func ExamplePoolAllocator() {
1550+
// Create a pool allocator with pools for 256, 1024, and 4096 byte slices.
1551+
// Exponents must be sorted in ascending order and in range [0, 31].
1552+
allocator, err := tarantool.NewPoolAllocator([]int{8, 10, 12})
1553+
if err != nil {
1554+
fmt.Printf("Failed to create allocator: %s\n", err)
1555+
return
1556+
}
1557+
1558+
// Use the allocator with a connection.
1559+
optsWithAlloc := opts
1560+
optsWithAlloc.Allocator = allocator
1561+
1562+
conn := exampleConnect(dialer, optsWithAlloc)
1563+
defer func() { _ = conn.Close() }()
1564+
1565+
// Insert a tuple.
1566+
fut := conn.Do(tarantool.NewReplaceRequest("test").
1567+
Tuple([]interface{}{uint(1111), "hello", "world"}))
1568+
_, err = fut.Get()
1569+
if err != nil {
1570+
fmt.Printf("Failed to replace: %s\n", err)
1571+
return
1572+
}
1573+
// Release the buffer back to the pool after use.
1574+
fut.Release()
1575+
1576+
// Select the tuple - the allocator will be used for response buffers.
1577+
fut = conn.Do(tarantool.NewSelectRequest("test").
1578+
Index("primary").
1579+
Key(tarantool.UintKey{1111}))
1580+
data, err := fut.Get()
1581+
if err != nil {
1582+
fmt.Printf("Failed to select: %s\n", err)
1583+
return
1584+
}
1585+
fmt.Println(data)
1586+
// Release the buffer back to the pool after use.
1587+
fut.Release()
1588+
1589+
// Output:
1590+
// [[1111 hello world]]
1591+
}

poolalloc.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package tarantool
2+
3+
import (
4+
"errors"
5+
"math/bits"
6+
"slices"
7+
"sync"
8+
)
9+
10+
// PoolAllocator implements the Allocator interface using a set of sync.Pool
11+
// instances for different power-of-two sized byte slices.
12+
//
13+
// The exponents parameter in NewPoolAllocator specifies the powers of two for
14+
// the pool sizes. For example, []int{8, 10, 12} creates pools for 256, 1024,
15+
// and 4096 byte slices.
16+
type PoolAllocator struct {
17+
pool []*sync.Pool
18+
size []int
19+
help []int
20+
}
21+
22+
var _ Allocator = (*PoolAllocator)(nil)
23+
24+
// NewPoolAllocator creates a new PoolAllocator with the given exponents.
25+
// Each exponent represents a power of two pool size. For example, exponent 10
26+
// creates a pool for 1024-byte slices.
27+
//
28+
// Exponents must be sorted in ascending order and each exponent must be
29+
// in range [0, 31].
30+
func NewPoolAllocator(exponents []int) (*PoolAllocator, error) {
31+
hSize := 32
32+
33+
for i, s := range exponents {
34+
if s < 0 || s >= hSize {
35+
return nil, errors.New("exponent must be in range [0, 31]")
36+
}
37+
38+
if i > 0 && exponents[i-1] >= s {
39+
return nil, errors.New("exponents must be sorted in ascending order")
40+
}
41+
}
42+
43+
var p = PoolAllocator{
44+
size: make([]int, len(exponents)),
45+
pool: make([]*sync.Pool, len(exponents)),
46+
help: slices.Repeat([]int{-1}, hSize),
47+
}
48+
49+
for i, s := range exponents {
50+
p.size[i] = 1 << s
51+
p.help[s] = i
52+
p.pool[i] = &sync.Pool{
53+
New: func() interface{} {
54+
buf := make([]byte, p.size[i])
55+
return &buf
56+
},
57+
}
58+
}
59+
60+
for i := hSize - 2; i >= 0; i-- {
61+
if p.help[i] != -1 {
62+
continue
63+
}
64+
65+
p.help[i] = p.help[i+1]
66+
}
67+
68+
return &p, nil
69+
}
70+
71+
func (p *PoolAllocator) getInd(size int) int {
72+
if size <= 0 {
73+
return -1
74+
}
75+
76+
idx := bits.Len(uint(size - 1))
77+
if idx >= len(p.help) {
78+
return -1
79+
}
80+
81+
return p.help[idx]
82+
}
83+
84+
// Get returns a pointer to a byte slice of at least the given length.
85+
// If the requested size fits within one of the pool sizes, it returns a
86+
// slice from the appropriate pool.
87+
//
88+
// It returns nil if:
89+
// - length is less than or equal to zero
90+
// - length exceeds the maximum pool size
91+
func (p *PoolAllocator) Get(length int) *[]byte {
92+
if length <= 0 {
93+
return nil
94+
}
95+
96+
ind := p.getInd(length)
97+
if ind == -1 {
98+
return nil
99+
}
100+
101+
bs := p.pool[ind].Get().(*[]byte)
102+
*bs = (*bs)[:length]
103+
return bs
104+
}
105+
106+
// Put returns a byte slice to the appropriate pool for reuse.
107+
// The slice is cleared before being returned to the pool.
108+
// If the slice size doesn't match any pool, it is discarded.
109+
// If buf is nil, the method does nothing.
110+
func (p *PoolAllocator) Put(buf *[]byte) {
111+
if buf == nil {
112+
return
113+
}
114+
115+
if ind := p.getInd(len(*buf)); ind != -1 {
116+
clear((*buf)[:len(*buf)])
117+
p.pool[ind].Put(buf)
118+
}
119+
}

0 commit comments

Comments
 (0)