Skip to content

Commit f3e03d5

Browse files
committed
Execute big batches in chunks
Fixes tursodatabase/turso-cli#839 Refs tursodatabase/turso-cli#735 Signed-off-by: Piotr Jastrzebski <piotr@chiselstrike.com>
1 parent b0e77e6 commit f3e03d5

1 file changed

Lines changed: 109 additions & 4 deletions

File tree

libsql/internal/http/hranaV2/hranaV2.go

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/json"
99
"errors"
1010
"fmt"
11+
"github.com/tursodatabase/libsql-client-go/sqliteparserutils"
1112
"io"
1213
"net/http"
1314
net_url "net/url"
@@ -303,10 +304,114 @@ func (h *hranaV2Conn) executeMsg(ctx context.Context, msg *hrana.PipelineRequest
303304
return result, nil
304305
}
305306

307+
type chunker struct {
308+
chunk []string
309+
iterator *sqliteparserutils.StatementIterator
310+
limit int
311+
}
312+
313+
func newChunker(iterator *sqliteparserutils.StatementIterator, limit int) *chunker {
314+
return &chunker{iterator: iterator, chunk: make([]string, 0, limit), limit: limit}
315+
}
316+
317+
func isTransactionStatement(stmt string) bool {
318+
patterns := [][]byte{[]byte("begin"), []byte("commit"), []byte("end"), []byte("rollback")}
319+
for _, p := range patterns {
320+
if len(stmt) >= len(p) && bytes.Equal(bytes.ToLower([]byte(stmt[0:len(p)])), p) {
321+
return true
322+
}
323+
}
324+
return false
325+
}
326+
327+
func (c *chunker) Next() (chunk []string, isEOF bool) {
328+
c.chunk = c.chunk[:0]
329+
var stmt string
330+
for !isEOF && len(c.chunk) < c.limit {
331+
stmt, _, isEOF = c.iterator.Next()
332+
// We need to skip transaction statements. Chunks run in a transaction by default.
333+
if stmt != "" && !isTransactionStatement(stmt) {
334+
c.chunk = append(c.chunk, stmt)
335+
}
336+
}
337+
return c.chunk, isEOF
338+
}
339+
340+
func (h *hranaV2Conn) executeSingleStmt(ctx context.Context, stmt string, wantRows bool) (*hrana.PipelineResponse, error) {
341+
msg := &hrana.PipelineRequest{}
342+
executeStream, err := hrana.ExecuteStream(stmt, nil, wantRows)
343+
if err != nil {
344+
return nil, fmt.Errorf("failed to execute SQL: %s\n%w", stmt, err)
345+
}
346+
msg.Add(*executeStream)
347+
res, err := h.executeMsg(ctx, msg)
348+
if err != nil {
349+
return nil, fmt.Errorf("failed to execute SQL: %s\n%w", stmt, err)
350+
}
351+
return res, nil
352+
}
353+
354+
func (h *hranaV2Conn) executeInChunks(ctx context.Context, query string, wantRows bool) (*hrana.PipelineResponse, error) {
355+
const chunkSize = 4096
356+
iterator := sqliteparserutils.CreateStatementIterator(query)
357+
chunker := newChunker(iterator, chunkSize)
358+
359+
chunk, isEOF := chunker.Next()
360+
if isEOF && len(chunk) == 1 {
361+
return h.executeSingleStmt(ctx, chunk[0], wantRows)
362+
}
363+
364+
_, err := h.executeSingleStmt(ctx, "BEGIN", false)
365+
if err != nil {
366+
return nil, err
367+
}
368+
369+
batch := &hrana.Batch{Steps: make([]hrana.BatchStep, chunkSize)}
370+
msg := &hrana.PipelineRequest{}
371+
msg.Add(hrana.StreamRequest{Type: "batch", Batch: batch})
372+
for idx := range batch.Steps {
373+
batch.Steps[idx].Stmt.WantRows = wantRows
374+
}
375+
376+
result := &hrana.PipelineResponse{}
377+
for {
378+
for idx := range chunk {
379+
batch.Steps[idx].Stmt.Sql = &chunk[idx]
380+
}
381+
if len(chunk) < chunkSize {
382+
// We can trim batch.Steps because this is the last chunk anyway.
383+
// isEOF has to be true at this point.
384+
batch.Steps = batch.Steps[:len(chunk)]
385+
}
386+
res, err := h.executeMsg(ctx, msg)
387+
if err != nil {
388+
h.closeStream()
389+
return nil, fmt.Errorf("failed to execute SQL:\n%w", err)
390+
}
391+
result.Baton = res.Baton
392+
result.BaseUrl = res.BaseUrl
393+
result.Results = append(result.Results, res.Results...)
394+
if isEOF {
395+
break
396+
}
397+
chunk, isEOF = chunker.Next()
398+
}
399+
_, err = h.executeSingleStmt(ctx, "COMMIT", false)
400+
if err != nil {
401+
h.closeStream()
402+
return nil, err
403+
}
404+
return result, nil
405+
}
406+
306407
func (h *hranaV2Conn) executeStmt(ctx context.Context, query string, args []driver.NamedValue, wantRows bool) (*hrana.PipelineResponse, error) {
408+
const querySizeLimitForChunking = 20 * 1024 * 1024
409+
if len(args) == 0 && len(query) > querySizeLimitForChunking && !h.schemaDb {
410+
return h.executeInChunks(ctx, query, wantRows)
411+
}
307412
stmts, params, err := shared.ParseStatementAndArgs(query, args)
308413
if err != nil {
309-
return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err)
414+
return nil, fmt.Errorf("failed to execute SQL:\n%w", err)
310415
}
311416
msg := &hrana.PipelineRequest{}
312417
if len(stmts) == 1 {
@@ -316,20 +421,20 @@ func (h *hranaV2Conn) executeStmt(ctx context.Context, query string, args []driv
316421
}
317422
executeStream, err := hrana.ExecuteStream(stmts[0], p, wantRows)
318423
if err != nil {
319-
return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err)
424+
return nil, fmt.Errorf("failed to execute SQL:\n%w", err)
320425
}
321426
msg.Add(*executeStream)
322427
} else {
323428
batchStream, err := hrana.BatchStream(stmts, params, wantRows, !h.schemaDb)
324429
if err != nil {
325-
return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err)
430+
return nil, fmt.Errorf("failed to execute SQL:\n%w", err)
326431
}
327432
msg.Add(*batchStream)
328433
}
329434

330435
resp, err := h.executeMsg(ctx, msg)
331436
if err != nil {
332-
return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err)
437+
return nil, fmt.Errorf("failed to execute SQL:\n%w", err)
333438
}
334439
return resp, nil
335440
}

0 commit comments

Comments
 (0)