Skip to content

Commit a09135f

Browse files
committed
[dbsql] Insert Batching and Transposed Array Inserts for Compatible SQL Implementations
Signed-off-by: hfuss <hayden.fuss@kaleido.io>
1 parent 10dbe1a commit a09135f

9 files changed

Lines changed: 369 additions & 21 deletions

File tree

pkg/dbsql/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ const (
3737
SQLConfMaxIdleConns = "maxIdleConns"
3838
// SQLConfMaxConnLifetime maximum lifetime of a connection to the database
3939
SQLConfMaxConnLifetime = "maxConnLifetime"
40+
// SQLConfMaxPlaceholders overrides the provider's max SQL placeholders per statement (0 = use provider default)
41+
SQLConfMaxPlaceholders = "maxPlaceholders"
4042
)
4143

4244
const (
@@ -51,4 +53,5 @@ func (s *Database) InitConfig(provider Provider, config config.Section) {
5153
config.AddKnownKey(SQLConfMaxConnIdleTime, "1m")
5254
config.AddKnownKey(SQLConfMaxIdleConns) // defaults to the max connections
5355
config.AddKnownKey(SQLConfMaxConnLifetime)
56+
config.AddKnownKey(SQLConfMaxPlaceholders) // overrides provider default for max SQL placeholders
5457
}

pkg/dbsql/crud.go

Lines changed: 94 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"fmt"
2424
"reflect"
2525
"strings"
26+
"time"
2627

2728
sq "github.com/Masterminds/squirrel"
2829
"github.com/hyperledger/firefly-common/pkg/ffapi"
@@ -496,9 +497,50 @@ func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartia
496497
return err
497498
}
498499
defer c.DB.RollbackTx(ctx, tx, autoCommit)
499-
if c.DB.Features().MultiRowInsert {
500-
insert := sq.Insert(c.Table).Columns(c.Columns...)
500+
if c.DB.Features().ArrayInsertBuilder != nil {
501+
if err := c.insertManyArray(ctx, tx, instances); err != nil {
502+
return err
503+
}
504+
} else if c.DB.Features().MultiRowInsert {
505+
if err := c.insertManyMultiRow(ctx, tx, instances, allowPartialSuccess); err != nil {
506+
return err
507+
}
508+
} else {
501509
for _, inst := range instances {
510+
err := c.attemptInsert(ctx, tx, inst, false)
511+
if err != nil {
512+
return err
513+
}
514+
}
515+
}
516+
517+
for _, hook := range hooks {
518+
tx.AddPostCommitHook(hook)
519+
}
520+
521+
return c.DB.CommitTx(ctx, tx, autoCommit)
522+
523+
}
524+
525+
func (c *CrudBase[T]) insertManyMultiRow(ctx context.Context, tx *TXWrapper, instances []T, allowPartialSuccess bool) error {
526+
chunkSize := len(instances)
527+
if maxPlaceholders := c.DB.Features().MaxPlaceholders; maxPlaceholders > 0 && len(c.Columns) > 0 {
528+
chunkSize = maxPlaceholders / len(c.Columns)
529+
if chunkSize < 1 {
530+
chunkSize = 1
531+
}
532+
}
533+
534+
allSequences := make([]int64, len(instances))
535+
for offset := 0; offset < len(instances); offset += chunkSize {
536+
end := offset + chunkSize
537+
if end > len(instances) {
538+
end = len(instances)
539+
}
540+
chunk := instances[offset:end]
541+
542+
insert := sq.Insert(c.Table).Columns(c.Columns...)
543+
for _, inst := range chunk {
502544
c.setInsertTimestamps(inst, fftypes.Now())
503545
values := make([]interface{}, len(c.Columns))
504546
for i, col := range c.Columns {
@@ -507,10 +549,9 @@ func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartia
507549
insert = insert.Values(values...)
508550
}
509551

510-
// Use a single multi-row insert
511-
sequences := make([]int64, len(instances))
552+
sequences := allSequences[offset:end]
512553
err := c.DB.InsertTxRows(ctx, c.Table, tx, insert, func() {
513-
for _, inst := range instances {
554+
for _, inst := range chunk {
514555
if c.EventHandler != nil {
515556
c.EventHandler(inst.GetID(), Created)
516557
}
@@ -519,27 +560,60 @@ func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartia
519560
if err != nil {
520561
return err
521562
}
522-
if len(sequences) == len(instances) {
523-
for i, seq := range sequences {
524-
c.attemptSetSequence(instances[i], seq)
525-
}
526-
}
527-
} else {
528-
// Fall back to individual inserts grouped in a TX
529-
for _, inst := range instances {
530-
err := c.attemptInsert(ctx, tx, inst, false)
531-
if err != nil {
532-
return err
533-
}
563+
}
564+
565+
for i, seq := range allSequences {
566+
c.attemptSetSequence(instances[i], seq)
567+
}
568+
return nil
569+
}
570+
571+
func (c *CrudBase[T]) insertManyArray(ctx context.Context, tx *TXWrapper, instances []T) error {
572+
l := log.L(ctx)
573+
builder := c.DB.Features().ArrayInsertBuilder
574+
575+
columnValues := make([][]interface{}, len(c.Columns))
576+
for i := range c.Columns {
577+
columnValues[i] = make([]interface{}, len(instances))
578+
}
579+
580+
for rowIdx, inst := range instances {
581+
c.setInsertTimestamps(inst, fftypes.Now())
582+
for colIdx, col := range c.Columns {
583+
columnValues[colIdx][rowIdx] = c.getFieldValue(inst, col)
534584
}
535585
}
536586

537-
for _, hook := range hooks {
538-
tx.AddPostCommitHook(hook)
587+
sqlQuery, args, err := builder(ctx, c.Table, c.Columns, columnValues, c.DB.SequenceColumn())
588+
if err != nil {
589+
return i18n.WrapError(ctx, err, i18n.MsgDBQueryBuildFailed)
539590
}
540591

541-
return c.DB.CommitTx(ctx, tx, autoCommit)
592+
before := time.Now()
593+
l.Tracef(`SQL-> array insert: %s (args: %d arrays)`, sqlQuery, len(args))
594+
rows, err := tx.sqlTX.QueryContext(ctx, sqlQuery, args...)
595+
if err != nil {
596+
l.Errorf(`SQL array insert failed: %s sql=[ %s ]: %s`, err, sqlQuery, err)
597+
return i18n.WrapError(ctx, err, i18n.MsgDBInsertFailed)
598+
}
599+
defer rows.Close()
600+
601+
for i := 0; i < len(instances) && rows.Next(); i++ {
602+
var seq int64
603+
if err := rows.Scan(&seq); err != nil {
604+
return i18n.WrapError(ctx, err, i18n.MsgDBReadErr, c.Table)
605+
}
606+
c.attemptSetSequence(instances[i], seq)
607+
}
608+
609+
for _, inst := range instances {
610+
if c.EventHandler != nil {
611+
c.EventHandler(inst.GetID(), Created)
612+
}
613+
}
542614

615+
l.Debugf(`SQL<- array insert %s rows=%d (%.2fms)`, c.Table, len(instances), floatMillisSince(before))
616+
return nil
543617
}
544618

545619
func (c *CrudBase[T]) Insert(ctx context.Context, inst T, hooks ...PostCompletionHook) (err error) {

pkg/dbsql/crud_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,157 @@ func TestInsertManyMultiRowFail(t *testing.T) {
10131013
assert.NoError(t, mock.ExpectationsWereMet())
10141014
}
10151015

1016+
func TestInsertManyMultiRowChunkedOK(t *testing.T) {
1017+
mp := NewMockProvider()
1018+
mp.MultiRowInsert = true
1019+
mp.MaxPlaceholders = 22 // 11 columns per row, so 2 rows per chunk
1020+
db, mock := mp.UTInit()
1021+
db.FakePSQLInsert = true
1022+
tc := newCRUDCollection(&db.Database, "ns1")
1023+
mock.ExpectBegin()
1024+
// First chunk: 2 rows
1025+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\),\(.*\)`).WillReturnRows(
1026+
sqlmock.NewRows([]string{db.sequenceColumn}).
1027+
AddRow(101).
1028+
AddRow(102),
1029+
)
1030+
// Second chunk: 1 row
1031+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\)`).WillReturnRows(
1032+
sqlmock.NewRows([]string{db.sequenceColumn}).
1033+
AddRow(103),
1034+
)
1035+
mock.ExpectCommit()
1036+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1037+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1038+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1039+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1040+
}, false)
1041+
assert.NoError(t, err)
1042+
assert.NoError(t, mock.ExpectationsWereMet())
1043+
}
1044+
1045+
func TestInsertManyMultiRowChunkedFailSecondChunk(t *testing.T) {
1046+
mp := NewMockProvider()
1047+
mp.MultiRowInsert = true
1048+
mp.MaxPlaceholders = 22
1049+
db, mock := mp.UTInit()
1050+
db.FakePSQLInsert = true
1051+
tc := newCRUDCollection(&db.Database, "ns1")
1052+
mock.ExpectBegin()
1053+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\),\(.*\)`).WillReturnRows(
1054+
sqlmock.NewRows([]string{db.sequenceColumn}).
1055+
AddRow(101).
1056+
AddRow(102),
1057+
)
1058+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\)`).WillReturnError(fmt.Errorf("pop"))
1059+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1060+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1061+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1062+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1063+
}, false)
1064+
assert.Regexp(t, "FF00177", err)
1065+
assert.NoError(t, mock.ExpectationsWereMet())
1066+
}
1067+
1068+
func TestInsertManyMultiRowNoChunkingWhenUnderLimit(t *testing.T) {
1069+
mp := NewMockProvider()
1070+
mp.MultiRowInsert = true
1071+
mp.MaxPlaceholders = 100 // 11 columns * 2 rows = 22, well under 100
1072+
db, mock := mp.UTInit()
1073+
db.FakePSQLInsert = true
1074+
tc := newCRUDCollection(&db.Database, "ns1")
1075+
mock.ExpectBegin()
1076+
mock.ExpectQuery(`INSERT INTO crudables.*VALUES \(.*\),\(.*\)`).WillReturnRows(
1077+
sqlmock.NewRows([]string{db.sequenceColumn}).
1078+
AddRow(201).
1079+
AddRow(202),
1080+
)
1081+
mock.ExpectCommit()
1082+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1083+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1084+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1085+
}, false)
1086+
assert.NoError(t, err)
1087+
assert.NoError(t, mock.ExpectationsWereMet())
1088+
}
1089+
1090+
func TestInsertManyArrayInsertOK(t *testing.T) {
1091+
mp := NewMockProvider()
1092+
mp.FakePSQLArrayInsert = true
1093+
db, mock := mp.UTInit()
1094+
tc := newCRUDCollection(&db.Database, "ns1")
1095+
mock.ExpectBegin()
1096+
mock.ExpectQuery(`INSERT INTO crudables .* SELECT UNNEST.*RETURNING seq`).WillReturnRows(
1097+
sqlmock.NewRows([]string{db.sequenceColumn}).
1098+
AddRow(301).
1099+
AddRow(302),
1100+
)
1101+
mock.ExpectCommit()
1102+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1103+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1104+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1105+
}, false)
1106+
assert.NoError(t, err)
1107+
assert.NoError(t, mock.ExpectationsWereMet())
1108+
}
1109+
1110+
func TestInsertManyArrayInsertFail(t *testing.T) {
1111+
mp := NewMockProvider()
1112+
mp.FakePSQLArrayInsert = true
1113+
db, mock := mp.UTInit()
1114+
tc := newCRUDCollection(&db.Database, "ns1")
1115+
mock.ExpectBegin()
1116+
mock.ExpectQuery(`INSERT INTO crudables .* SELECT UNNEST.*`).WillReturnError(fmt.Errorf("pop"))
1117+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1118+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1119+
}, false)
1120+
assert.Regexp(t, "FF00177", err)
1121+
assert.NoError(t, mock.ExpectationsWereMet())
1122+
}
1123+
1124+
func TestInsertManyArrayInsertPrefersOverMultiRow(t *testing.T) {
1125+
mp := NewMockProvider()
1126+
mp.FakePSQLArrayInsert = true
1127+
mp.MultiRowInsert = true
1128+
db, mock := mp.UTInit()
1129+
tc := newCRUDCollection(&db.Database, "ns1")
1130+
mock.ExpectBegin()
1131+
// Should use UNNEST, not multi-row VALUES
1132+
mock.ExpectQuery(`INSERT INTO crudables .* SELECT UNNEST.*RETURNING seq`).WillReturnRows(
1133+
sqlmock.NewRows([]string{db.sequenceColumn}).
1134+
AddRow(401),
1135+
)
1136+
mock.ExpectCommit()
1137+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1138+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1139+
}, false)
1140+
assert.NoError(t, err)
1141+
assert.NoError(t, mock.ExpectationsWereMet())
1142+
}
1143+
1144+
func TestInsertManyArrayInsertWithEvents(t *testing.T) {
1145+
mp := NewMockProvider()
1146+
mp.FakePSQLArrayInsert = true
1147+
db, mock := mp.UTInit()
1148+
tc := newCRUDCollection(&db.Database, "ns1")
1149+
mock.ExpectBegin()
1150+
mock.ExpectQuery(`INSERT INTO crudables .* SELECT UNNEST.*RETURNING seq`).WillReturnRows(
1151+
sqlmock.NewRows([]string{db.sequenceColumn}).
1152+
AddRow(501).
1153+
AddRow(502),
1154+
)
1155+
mock.ExpectCommit()
1156+
err := tc.InsertMany(context.Background(), []*TestCRUDable{
1157+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1158+
{ResourceBase: ResourceBase{ID: fftypes.NewUUID()}},
1159+
}, false)
1160+
assert.NoError(t, err)
1161+
assert.Equal(t, 2, len(tc.events))
1162+
assert.Equal(t, Created, tc.events[0])
1163+
assert.Equal(t, Created, tc.events[1])
1164+
assert.NoError(t, mock.ExpectationsWereMet())
1165+
}
1166+
10161167
func TestInsertManyFallbackSingleRowFail(t *testing.T) {
10171168
db, mock := NewMockProvider().UTInit()
10181169
tc := newCRUDCollection(&db.Database, "ns1")

pkg/dbsql/database.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ func (s *Database) Init(ctx context.Context, provider Provider, config config.Se
7979
s.features = s.provider.Features()
8080
s.sequenceColumn = s.provider.SequenceColumn()
8181

82+
if maxPlaceholders := config.GetInt(SQLConfMaxPlaceholders); maxPlaceholders > 0 {
83+
s.features.MaxPlaceholders = maxPlaceholders
84+
}
85+
8286
if config.GetString(SQLConfDatasourceURL) == "" {
8387
return i18n.NewError(ctx, i18n.MsgMissingConfig, "url", fmt.Sprintf("database.%s", s.provider.Name()))
8488
}

pkg/dbsql/database_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,25 @@ func TestInitDatabaseFeatures(t *testing.T) {
7878
assert.Equal(t, false, s.Features().MultiRowInsert)
7979
}
8080

81+
func TestInitDatabaseMaxPlaceholdersConfig(t *testing.T) {
82+
s := &Database{}
83+
tp := NewMockProvider()
84+
s.InitConfig(tp, tp.config)
85+
tp.config.Set(SQLConfMaxPlaceholders, 1000)
86+
err := s.Init(context.Background(), tp, tp.config)
87+
assert.NoError(t, err)
88+
assert.Equal(t, 1000, s.Features().MaxPlaceholders)
89+
}
90+
91+
func TestInitDatabaseMaxPlaceholdersProviderDefault(t *testing.T) {
92+
mp := NewMockProvider()
93+
mp.MaxPlaceholders = 65535
94+
mp.Database.InitConfig(mp, mp.config)
95+
err := mp.Database.Init(context.Background(), mp, mp.config)
96+
assert.NoError(t, err)
97+
assert.Equal(t, 65535, mp.Database.Features().MaxPlaceholders)
98+
}
99+
81100
func TestInitDatabaseOpenFailed(t *testing.T) {
82101
mp := NewMockProvider()
83102
mp.OpenError = fmt.Errorf("pop")

pkg/dbsql/mock_provider.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ type MockProviderConfig struct {
4646
GetMigrationDriverError error
4747
IndividualSort bool
4848
MultiRowInsert bool
49+
MaxPlaceholders int
4950
FakePSQLUpsertOptimization bool
51+
FakePSQLArrayInsert bool
5052
}
5153

5254
func NewMockProvider() *MockProvider {
@@ -88,6 +90,10 @@ func (mp *MockProvider) Features() SQLFeatures {
8890
return fmt.Sprintf(`<acquire lock %s>`, lockName)
8991
}
9092
features.MultiRowInsert = mp.MultiRowInsert
93+
features.MaxPlaceholders = mp.MaxPlaceholders
94+
if mp.FakePSQLArrayInsert {
95+
features.ArrayInsertBuilder = BuildPostgreSQLArrayInsert
96+
}
9197
if mp.FakePSQLUpsertOptimization {
9298
features.DBOptimizedUpsertBuilder = BuildPostgreSQLOptimizedUpsert
9399
}

0 commit comments

Comments
 (0)