Skip to content

Commit 075e365

Browse files
Merge pull request #212 from kaleido-io/large-inserts-support
[dbsql] InsertMany Row Batching
2 parents 70619c6 + 058ac72 commit 075e365

6 files changed

Lines changed: 130 additions & 24 deletions

File tree

pkg/dbsql/crud.go

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,42 @@ func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartia
497497
}
498498
defer c.DB.RollbackTx(ctx, tx, autoCommit)
499499
if c.DB.Features().MultiRowInsert {
500-
insert := sq.Insert(c.Table).Columns(c.Columns...)
500+
if err := c.insertManyMultiRow(ctx, tx, instances, allowPartialSuccess); err != nil {
501+
return err
502+
}
503+
} else {
501504
for _, inst := range instances {
505+
err := c.attemptInsert(ctx, tx, inst, false)
506+
if err != nil {
507+
return err
508+
}
509+
}
510+
}
511+
512+
for _, hook := range hooks {
513+
tx.AddPostCommitHook(hook)
514+
}
515+
516+
return c.DB.CommitTx(ctx, tx, autoCommit)
517+
518+
}
519+
520+
func (c *CrudBase[T]) insertManyMultiRow(ctx context.Context, tx *TXWrapper, instances []T, allowPartialSuccess bool) error {
521+
chunkSize := len(instances)
522+
if maxPlaceholders := c.DB.Features().MaxPlaceholders; maxPlaceholders > 0 && len(c.Columns) > 0 {
523+
chunkSize = maxPlaceholders / len(c.Columns)
524+
}
525+
526+
allSequences := make([]int64, len(instances))
527+
for offset := 0; offset < len(instances); offset += chunkSize {
528+
end := offset + chunkSize
529+
if end > len(instances) {
530+
end = len(instances)
531+
}
532+
chunk := instances[offset:end]
533+
534+
insert := sq.Insert(c.Table).Columns(c.Columns...)
535+
for _, inst := range chunk {
502536
c.setInsertTimestamps(inst, fftypes.Now())
503537
values := make([]interface{}, len(c.Columns))
504538
for i, col := range c.Columns {
@@ -507,10 +541,9 @@ func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartia
507541
insert = insert.Values(values...)
508542
}
509543

510-
// Use a single multi-row insert
511-
sequences := make([]int64, len(instances))
544+
sequences := allSequences[offset:end]
512545
err := c.DB.InsertTxRows(ctx, c.Table, tx, insert, func() {
513-
for _, inst := range instances {
546+
for _, inst := range chunk {
514547
if c.EventHandler != nil {
515548
c.EventHandler(inst.GetID(), Created)
516549
}
@@ -519,27 +552,12 @@ func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartia
519552
if err != nil {
520553
return err
521554
}
522-
if len(sequences) == len(instances) {
523-
for i, seq := range sequences {
524-
c.attemptSetSequence(instances[i], seq)
525-
}
526-
}
527-
} else {
528-
// Fall back to individual inserts grouped in a TX
529-
for _, inst := range instances {
530-
err := c.attemptInsert(ctx, tx, inst, false)
531-
if err != nil {
532-
return err
533-
}
534-
}
535555
}
536556

537-
for _, hook := range hooks {
538-
tx.AddPostCommitHook(hook)
557+
for i, seq := range allSequences {
558+
c.attemptSetSequence(instances[i], seq)
539559
}
540-
541-
return c.DB.CommitTx(ctx, tx, autoCommit)
542-
560+
return nil
543561
}
544562

545563
func (c *CrudBase[T]) Insert(ctx context.Context, inst T, hooks ...PostCompletionHook) (err error) {

pkg/dbsql/crud_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,80 @@ func TestInsertManyMultiRowFail(t *testing.T) {
10131013
assert.NoError(t, mock.ExpectationsWereMet())
10141014
}
10151015

1016+
func TestInsertManyMultiRowChunkedOK(t *testing.T) {
1017+
mp := NewMockProvider()
1018+
mp.MultiRowInsert = true
1019+
mp.MaxPlaceholders = 22 // 11 columns per row, so 2 rows per chunk
1020+
db, mock := mp.UTInit()
1021+
db.FakePSQLInsert = true
1022+
tc := newCRUDCollection(&db.Database, "ns1")
1023+
mock.ExpectBegin()
1024+
// First chunk: 2 rows
1025+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\),\(.*\)`).WillReturnRows(
1026+
sqlmock.NewRows([]string{db.sequenceColumn}).
1027+
AddRow(101).
1028+
AddRow(102),
1029+
)
1030+
// Second chunk: 1 row
1031+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\)`).WillReturnRows(
1032+
sqlmock.NewRows([]string{db.sequenceColumn}).
1033+
AddRow(103),
1034+
)
1035+
mock.ExpectCommit()
1036+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1037+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1038+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1039+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1040+
}, false)
1041+
assert.NoError(t, err)
1042+
assert.NoError(t, mock.ExpectationsWereMet())
1043+
}
1044+
1045+
func TestInsertManyMultiRowChunkedFailSecondChunk(t *testing.T) {
1046+
mp := NewMockProvider()
1047+
mp.MultiRowInsert = true
1048+
mp.MaxPlaceholders = 22
1049+
db, mock := mp.UTInit()
1050+
db.FakePSQLInsert = true
1051+
tc := newCRUDCollection(&db.Database, "ns1")
1052+
mock.ExpectBegin()
1053+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\),\(.*\)`).WillReturnRows(
1054+
sqlmock.NewRows([]string{db.sequenceColumn}).
1055+
AddRow(101).
1056+
AddRow(102),
1057+
)
1058+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\)`).WillReturnError(fmt.Errorf("pop"))
1059+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1060+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1061+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1062+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1063+
}, false)
1064+
assert.Regexp(t, "FF00177", err)
1065+
assert.NoError(t, mock.ExpectationsWereMet())
1066+
}
1067+
1068+
func TestInsertManyMultiRowNoChunkingWhenUnderLimit(t *testing.T) {
1069+
mp := NewMockProvider()
1070+
mp.MultiRowInsert = true
1071+
mp.MaxPlaceholders = 100 // 11 columns * 2 rows = 22, well under 100
1072+
db, mock := mp.UTInit()
1073+
db.FakePSQLInsert = true
1074+
tc := newCRUDCollection(&db.Database, "ns1")
1075+
mock.ExpectBegin()
1076+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\),\(.*\)`).WillReturnRows(
1077+
sqlmock.NewRows([]string{db.sequenceColumn}).
1078+
AddRow(201).
1079+
AddRow(202),
1080+
)
1081+
mock.ExpectCommit()
1082+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1083+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1084+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1085+
}, false)
1086+
assert.NoError(t, err)
1087+
assert.NoError(t, mock.ExpectationsWereMet())
1088+
}
1089+
10161090
func TestInsertManyFallbackSingleRowFail(t *testing.T) {
10171091
db, mock := NewMockProvider().UTInit()
10181092
tc := newCRUDCollection(&db.Database, "ns1")

pkg/dbsql/database_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,15 @@ func TestInitDatabaseFeatures(t *testing.T) {
7878
assert.Equal(t, false, s.Features().MultiRowInsert)
7979
}
8080

81+
func TestInitDatabaseMaxPlaceholdersProviderDefault(t *testing.T) {
82+
mp := NewMockProvider()
83+
mp.MaxPlaceholders = 65535
84+
mp.Database.InitConfig(mp, mp.config)
85+
err := mp.Database.Init(context.Background(), mp, mp.config)
86+
assert.NoError(t, err)
87+
assert.Equal(t, 65535, mp.Database.Features().MaxPlaceholders)
88+
}
89+
8190
func TestInitDatabaseOpenFailed(t *testing.T) {
8291
mp := NewMockProvider()
8392
mp.OpenError = fmt.Errorf("pop")

pkg/dbsql/mock_provider.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type MockProviderConfig struct {
5050
GetMigrationDriverError error
5151
IndividualSort bool
5252
MultiRowInsert bool
53+
MaxPlaceholders int
5354
FakePSQLUpsertOptimization bool
5455
}
5556

@@ -92,6 +93,7 @@ func (mp *MockProvider) Features() SQLFeatures {
9293
return fmt.Sprintf(`<acquire lock %s>`, lockName)
9394
}
9495
features.MultiRowInsert = mp.MultiRowInsert
96+
features.MaxPlaceholders = mp.MaxPlaceholders
9597
if mp.FakePSQLUpsertOptimization {
9698
features.DBOptimizedUpsertBuilder = BuildPostgreSQLOptimizedUpsert
9799
}

pkg/dbsql/postgres_helpers.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/hyperledger/firefly-common/pkg/i18n"
2727
)
2828

29-
// PostgreSQL helper to avoid implementing this lots of times in child packages
29+
// BuildPostgreSQLOptimizedUpsert is a PostgreSQL helper to avoid implementing this lots of times in child packages
3030
func BuildPostgreSQLOptimizedUpsert(ctx context.Context, table string, idColumn string, insertCols, updateCols []string, returnCol string, values map[string]driver.Value) (insert sq.InsertBuilder, err error) {
3131
insertValues := make([]interface{}, 0, len(insertCols))
3232
for _, c := range insertCols {
@@ -43,5 +43,4 @@ func BuildPostgreSQLOptimizedUpsert(ctx context.Context, table string, idColumn
4343
return insert, i18n.NewError(ctx, i18n.MsgDBErrorBuildingStatement, err)
4444
}
4545
return insert.Suffix(fmt.Sprintf("ON CONFLICT (%s) DO UPDATE", idColumn)).SuffixExpr(sq.Expr(updateSQL, updateValues...)).Suffix("RETURNING " + returnCol), nil
46-
4746
}

pkg/dbsql/provider.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ type SQLFeatures struct {
3434
// DB specific query builder for RDBMS-side optimized upsert, returning the requested column from the query
3535
// (the CRUD layer will request the create time column to detect if the record was new or not)
3636
DBOptimizedUpsertBuilder func(ctx context.Context, table string, idColumn string, insertCols, updateCols []string, returnCol string, values map[string]driver.Value) (sq.InsertBuilder, error)
37+
// MaxPlaceholders caps the number of SQL placeholders ($N) in a single statement.
38+
// When >0 and MultiRowInsert is true, InsertMany will chunk rows so that
39+
// rows*columns never exceeds this limit. PostgreSQL's wire protocol limit is 65535.
40+
MaxPlaceholders int
3741
}
3842

3943
func DefaultSQLProviderFeatures() SQLFeatures {

0 commit comments

Comments
 (0)