Skip to content

Commit a698ef8

Browse files
ermyaroleg-jukovec
authored andcommitted
internal: add Release method for Execute and Prepare Responses
Now, ExecuteResponse.Release() PrepareResponse.Release() are freeing Response resources by using sync.Pool. Closes #558
1 parent 7858b1b commit a698ef8

5 files changed

Lines changed: 174 additions & 3 deletions

File tree

prepared.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,10 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
9090
if err != nil {
9191
return nil, err
9292
}
93+
resp := preparesPool.Get().(*PrepareResponse)
94+
resp.baseResponse = baseResp
9395

94-
return &PrepareResponse{baseResponse: baseResp}, nil
96+
return resp, nil
9597
}
9698

9799
// UnprepareRequest helps you to create an unprepare request object for
@@ -205,6 +207,8 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
205207
if err != nil {
206208
return nil, err
207209
}
210+
exec := executesPool.Get().(*ExecuteResponse)
211+
exec.baseResponse = baseResp
208212

209-
return &ExecuteResponse{baseResponse: baseResp}, nil
213+
return exec, nil
210214
}

request.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1160,14 +1160,23 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
11601160
return req
11611161
}
11621162

1163+
var executesPool = sync.Pool{
1164+
New: func() interface{} {
1165+
return &ExecuteResponse{}
1166+
},
1167+
}
1168+
11631169
// Response creates a response for the ExecuteRequest.
11641170
func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, error) {
11651171
baseResp, err := createBaseResponse(header, body)
11661172
if err != nil {
11671173
return nil, err
11681174
}
11691175

1170-
return &ExecuteResponse{baseResponse: baseResp}, nil
1176+
exec := executesPool.Get().(*ExecuteResponse)
1177+
exec.baseResponse = baseResp
1178+
1179+
return exec, nil
11711180
}
11721181

11731182
// WatchOnceRequest synchronously fetches the value currently associated with a

response.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"fmt"
55
"io"
6+
"sync"
67

78
"github.com/tarantool/go-iproto"
89
"github.com/vmihailenco/msgpack/v5"
@@ -79,6 +80,12 @@ type SelectResponse struct {
7980
// You need to cast to PrepareResponse a response from PrepareRequest.
8081
type PrepareResponse ExecuteResponse
8182

83+
var preparesPool = sync.Pool{
84+
New: func() interface{} {
85+
return &PrepareResponse{}
86+
},
87+
}
88+
8289
// ExecuteResponse is used for the execute requests.
8390
// It might contain meta-data and sql info.
8491
//
@@ -699,6 +706,20 @@ func (resp *SelectResponse) Pos() ([]byte, error) {
699706
return resp.pos, resp.err
700707
}
701708

709+
func (resp *ExecuteResponse) Release() {
710+
resp.baseResponse.Release()
711+
*resp = ExecuteResponse{}
712+
713+
executesPool.Put(resp)
714+
}
715+
716+
func (resp *PrepareResponse) Release() {
717+
resp.baseResponse.Release()
718+
*resp = PrepareResponse{}
719+
720+
preparesPool.Put(resp)
721+
}
722+
702723
// MetaData returns ExecuteResponse meta-data.
703724
// If the response was not decoded, this method will call Decode().
704725
func (resp *ExecuteResponse) MetaData() ([]ColumnMetaData, error) {

response_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,89 @@ func TestSelectResponseReleaseMultipleObjects(t *testing.T) {
154154
require.Equal(t, []interface{}{buf3}, data3)
155155
require.Equal(t, header3, resp3.Header())
156156
}
157+
158+
func TestExecuteResponseRelease(t *testing.T) {
159+
req := tarantool.NewExecuteRequest(insertQuery)
160+
161+
header := tarantool.Header{RequestId: 123}
162+
buf := []byte{'v', '3'}
163+
resp, err := req.Response(header, encodeResponseData(t, buf))
164+
165+
require.NoError(t, err)
166+
require.NotNil(t, resp)
167+
168+
resp.Release()
169+
170+
execResp, ok := resp.(*tarantool.ExecuteResponse)
171+
require.True(t, ok)
172+
require.Equal(t, tarantool.ExecuteResponse{}, *execResp)
173+
}
174+
175+
func TestExecuteResponseReleaseReuse(t *testing.T) {
176+
req := tarantool.NewExecuteRequest(insertQuery)
177+
178+
header1 := tarantool.Header{RequestId: 100}
179+
buf1 := []byte{'d', 'a', 't', 'a', '1'}
180+
resp1, err := req.Response(header1, encodeResponseData(t, buf1))
181+
require.NoError(t, err)
182+
183+
data1, err := resp1.Decode()
184+
require.NoError(t, err)
185+
require.Equal(t, []interface{}{buf1}, data1)
186+
187+
resp1.Release()
188+
189+
header2 := tarantool.Header{RequestId: 200}
190+
buf2 := []byte{'d', 'a', 't', 'a', '2'}
191+
resp2, err := req.Response(header2, encodeResponseData(t, buf2))
192+
require.NoError(t, err)
193+
194+
data2, err := resp2.Decode()
195+
require.NoError(t, err)
196+
require.Equal(t, []interface{}{buf2}, data2)
197+
require.Equal(t, header2, resp2.Header())
198+
require.Equal(t, []interface{}{buf2}, data2)
199+
}
200+
201+
func TestPrepareResponseRelease(t *testing.T) {
202+
req := tarantool.NewPrepareRequest(insertQuery)
203+
204+
header := tarantool.Header{RequestId: 123}
205+
buf := []byte{'v', '3'}
206+
resp, err := req.Response(header, encodeResponseData(t, buf))
207+
208+
require.NoError(t, err)
209+
require.NotNil(t, resp)
210+
211+
resp.Release()
212+
213+
prepResp, ok := resp.(*tarantool.PrepareResponse)
214+
require.True(t, ok)
215+
require.Equal(t, tarantool.PrepareResponse{}, *prepResp)
216+
}
217+
218+
func TestPrepareResponseReleaseReuse(t *testing.T) {
219+
req := tarantool.NewPrepareRequest(insertQuery)
220+
221+
header1 := tarantool.Header{RequestId: 100}
222+
buf1 := []byte{'d', 'a', 't', 'a', '1'}
223+
resp1, err := req.Response(header1, encodeResponseData(t, buf1))
224+
require.NoError(t, err)
225+
226+
data1, err := resp1.Decode()
227+
require.NoError(t, err)
228+
require.Equal(t, []interface{}{buf1}, data1)
229+
230+
resp1.Release()
231+
232+
header2 := tarantool.Header{RequestId: 200}
233+
buf2 := []byte{'d', 'a', 't', 'a', '2'}
234+
resp2, err := req.Response(header2, encodeResponseData(t, buf2))
235+
require.NoError(t, err)
236+
237+
data2, err := resp2.Decode()
238+
require.NoError(t, err)
239+
require.Equal(t, []interface{}{buf2}, data2)
240+
require.Equal(t, header2, resp2.Header())
241+
require.Equal(t, []interface{}{buf2}, data2)
242+
}

tarantool_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,33 @@ func BenchmarkSync_naive_with_custom_type_with_Release(b *testing.B) {
271271
}
272272
}
273273

274+
func BenchmarkSync_execute_with_Release(b *testing.B) {
275+
test_helpers.SkipIfSQLUnsupported(b)
276+
277+
conn := test_helpers.ConnectWithValidation(b, dialer, opts)
278+
defer func() { _ = conn.Close() }()
279+
280+
var mem []Member
281+
282+
req := NewExecuteRequest(selectTypedQuery).Args(
283+
[]interface{}{1},
284+
)
285+
286+
b.ResetTimer()
287+
288+
for b.Loop() {
289+
fut := conn.Do(req)
290+
if err := fut.GetTyped(&mem); err != nil {
291+
b.Errorf("request error: %s", err)
292+
}
293+
294+
if len(mem) != 1 || mem[0].Name != "test" {
295+
b.Errorf("invalid result, got: %v", mem)
296+
}
297+
fut.Release()
298+
}
299+
}
300+
274301
func BenchmarkSync_multithread(b *testing.B) {
275302
var err error
276303

@@ -1536,6 +1563,30 @@ func TestNewPrepared(t *testing.T) {
15361563
}
15371564
}
15381565

1566+
func TestExecutePrepareResponseRelease(t *testing.T) {
1567+
test_helpers.SkipIfSQLUnsupported(t)
1568+
1569+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
1570+
1571+
stmt, err := conn.NewPrepared(selectNamedQuery2)
1572+
if err != nil {
1573+
t.Errorf("failed to prepare: %v", err)
1574+
}
1575+
1576+
req := NewExecutePreparedRequest(stmt)
1577+
1578+
resp, err := conn.Do(req.Args([]interface{}{1, "test"})).GetResponse()
1579+
if err != nil {
1580+
t.Errorf("failed to execute prepared: %v", err)
1581+
}
1582+
1583+
resp.Release()
1584+
1585+
exPrepResp, ok := resp.(*ExecuteResponse)
1586+
require.True(t, ok, "got wrong response type")
1587+
require.Equal(t, ExecuteResponse{}, *exPrepResp)
1588+
}
1589+
15391590
func TestConnection_DoWithStrangerConn(t *testing.T) {
15401591
expectedErr := fmt.Errorf("the passed connected request doesn't belong to the current" +
15411592
" connection or connection pool")

0 commit comments

Comments
 (0)