Skip to content

Commit d8a4c2c

Browse files
committed
Add query builder for splitting up range-insert for move-tables feature
1 parent 2b3fb21 commit d8a4c2c

2 files changed

Lines changed: 365 additions & 0 deletions

File tree

go/sql/builder.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package sql
77

88
import (
99
"fmt"
10+
"slices"
1011
"strconv"
1112
"strings"
1213
)
@@ -329,6 +330,143 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
329330
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait)
330331
}
331332

333+
// MoveTablesCopySelectQueryBuilder holds a range-select statement that is
// rendered once by NewMoveTablesCopySelectQueryBuilder and then reused for
// every BuildQuery call; only the bound arguments change between calls.
type MoveTablesCopySelectQueryBuilder struct {
	// preparedStatement is the SELECT text returned verbatim by BuildQuery.
	preparedStatement string

	// argsMapping has one entry per exploded argument. Each entry is an
	// index into the range-start args followed by the range-end args.
	argsMapping []int

	// argsCount is the combined number of range-start and range-end
	// arguments that BuildQuery expects to receive.
	argsCount int
}
338+
339+
func NewMoveTablesCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, sharedColumns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTablesCopySelectQueryBuilder, error) {
340+
sourceDatabaseName = EscapeName(sourceDatabaseName)
341+
sourceTableName = EscapeName(sourceTableName)
342+
sharedColumnsNames := sharedColumns.Names()
343+
for i := range sharedColumnsNames {
344+
sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i])
345+
}
346+
sharedColumnsListing := strings.Join(sharedColumnsNames, ", ")
347+
uniqueKey = EscapeName(uniqueKey)
348+
var minRangeComparisonSign = GreaterThanComparisonSign
349+
if includeRangeStartValues {
350+
minRangeComparisonSign = GreaterThanOrEqualsComparisonSign
351+
}
352+
rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns)
353+
rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns)
354+
dummyArgs := make([]any, len(uniqueKeyColumns.Columns()))
355+
for i := range dummyArgs {
356+
dummyArgs[i] = i
357+
}
358+
var argsMapping []int
359+
360+
rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, dummyArgs, minRangeComparisonSign)
361+
if err != nil {
362+
return nil, err
363+
}
364+
for _, a := range rangeExplodedArgs {
365+
idx := slices.Index(dummyArgs, a)
366+
if idx == -1 {
367+
return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a)
368+
}
369+
argsMapping = append(argsMapping, idx)
370+
}
371+
372+
rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, dummyArgs, LessThanOrEqualsComparisonSign)
373+
if err != nil {
374+
return nil, err
375+
}
376+
for _, a := range rangeExplodedArgs {
377+
idx := slices.Index(dummyArgs, a)
378+
if idx == -1 {
379+
return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a)
380+
}
381+
argsMapping = append(argsMapping, idx+len(dummyArgs))
382+
}
383+
384+
stmt := fmt.Sprintf(`
385+
select /* gh-ost %s.%s */ %s
386+
from
387+
%s.%s
388+
force index (%s)
389+
where
390+
(%s and %s)
391+
`,
392+
sourceDatabaseName, sourceTableName, sharedColumnsListing,
393+
sourceDatabaseName, sourceTableName,
394+
uniqueKey,
395+
rangeStartComparison, rangeEndComparison,
396+
)
397+
return &MoveTablesCopySelectQueryBuilder{
398+
preparedStatement: stmt,
399+
argsMapping: argsMapping,
400+
argsCount: len(dummyArgs) * 2,
401+
}, nil
402+
}
403+
404+
func (b *MoveTablesCopySelectQueryBuilder) BuildQuery(rangeStartArgs, rangeEndArgs []any) (string, []any, error) {
405+
if len(rangeStartArgs)+len(rangeEndArgs) != b.argsCount {
406+
return "", nil, fmt.Errorf("got %d args but expected %d", len(rangeStartArgs)+len(rangeEndArgs), b.argsCount)
407+
}
408+
explodedArgs := make([]any, 0, len(b.argsMapping))
409+
for _, idx := range b.argsMapping {
410+
if idx < len(rangeStartArgs) {
411+
explodedArgs = append(explodedArgs, rangeStartArgs[idx])
412+
} else {
413+
explodedArgs = append(explodedArgs, rangeEndArgs[idx-len(rangeStartArgs)])
414+
}
415+
}
416+
return b.preparedStatement, explodedArgs, nil
417+
}
418+
419+
// MoveTablesCopyInsertQueryBuilder holds the fixed parts of a multi-row
// insert statement so that BuildQuery only has to append one placeholder
// tuple per row.
type MoveTablesCopyInsertQueryBuilder struct {
	// preparedStatement is the statement text up to and including the
	// "values" keyword.
	preparedStatement string

	// valueListPlaceholder is the parenthesized placeholder tuple appended
	// once per inserted row.
	valueListPlaceholder string

	// valueListSize is the number of column values every row must supply.
	valueListSize int
}
424+
425+
func NewMoveTablesCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, sharedColumns *ColumnList) (*MoveTablesCopyInsertQueryBuilder, error) {
426+
targetDatabaseName = EscapeName(targetDatabaseName)
427+
targetTableName = EscapeName(targetTableName)
428+
sharedColumnsNames := sharedColumns.Names()
429+
for i := range sharedColumnsNames {
430+
sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i])
431+
}
432+
sharedColumnsListing := strings.Join(sharedColumnsNames, ", ")
433+
valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(sharedColumns), ", ") + ")"
434+
valueListSize := len(sharedColumnsNames)
435+
stmt := fmt.Sprintf(`
436+
insert /* gh-ost %s.%s */ ignore
437+
into
438+
%s.%s
439+
(%s)
440+
values
441+
`,
442+
targetDatabaseName, targetTableName,
443+
targetDatabaseName, targetTableName,
444+
sharedColumnsListing,
445+
)
446+
return &MoveTablesCopyInsertQueryBuilder{
447+
preparedStatement: stmt,
448+
valueListPlaceholder: valueListPlaceholder,
449+
valueListSize: valueListSize,
450+
}, nil
451+
}
452+
453+
func (b *MoveTablesCopyInsertQueryBuilder) BuildQuery(values []*ColumnValues) (string, []any, error) {
454+
var explodedArgs []any
455+
var builder strings.Builder
456+
builder.WriteString(b.preparedStatement)
457+
for i, value := range values {
458+
if len(value.AbstractValues()) != b.valueListSize {
459+
return "", nil, fmt.Errorf("got %d column values but expected %d", len(value.AbstractValues()), b.valueListSize)
460+
}
461+
if i > 0 {
462+
builder.WriteString(",\n")
463+
}
464+
builder.WriteString(b.valueListPlaceholder)
465+
explodedArgs = append(explodedArgs, value.AbstractValues()...)
466+
}
467+
return builder.String(), explodedArgs, nil
468+
}
469+
332470
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
333471
if uniqueKeyColumns.Len() == 0 {
334472
return "", explodedArgs, fmt.Errorf("got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")

go/sql/builder_test.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,233 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) {
784784
}
785785
}
786786

787+
func TestMoveTablesCopySelectQueryBuilder(t *testing.T) {
788+
t.Run("single column unique key", func(t *testing.T) {
789+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
790+
uniqueKeyColumns := NewColumnList([]string{"id"})
791+
792+
builder, err := NewMoveTablesCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
793+
require.NoError(t, err)
794+
795+
query, args, err := builder.BuildQuery([]any{3}, []any{103})
796+
require.NoError(t, err)
797+
798+
expected := `
799+
select /* gh-ost mydb.tbl */ id, name, position
800+
from
801+
mydb.tbl
802+
force index (PRIMARY)
803+
where
804+
(((id > ?) or ((id = ?))) and ((id < ?) or ((id = ?))))
805+
`
806+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
807+
require.Equal(t, []any{3, 3, 103, 103}, args)
808+
})
809+
810+
t.Run("single column unique key without range start", func(t *testing.T) {
811+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
812+
uniqueKeyColumns := NewColumnList([]string{"id"})
813+
814+
builder, err := NewMoveTablesCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, false)
815+
require.NoError(t, err)
816+
817+
query, args, err := builder.BuildQuery([]any{3}, []any{103})
818+
require.NoError(t, err)
819+
820+
expected := `
821+
select /* gh-ost mydb.tbl */ id, name, position
822+
from
823+
mydb.tbl
824+
force index (PRIMARY)
825+
where
826+
(((id > ?)) and ((id < ?) or ((id = ?))))
827+
`
828+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
829+
require.Equal(t, []any{3, 103, 103}, args)
830+
})
831+
832+
t.Run("compound unique key", func(t *testing.T) {
833+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
834+
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
835+
836+
builder, err := NewMoveTablesCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true)
837+
require.NoError(t, err)
838+
839+
query, args, err := builder.BuildQuery([]any{3, 17}, []any{103, 117})
840+
require.NoError(t, err)
841+
842+
expected := `
843+
select /* gh-ost mydb.tbl */ id, name, position
844+
from
845+
mydb.tbl
846+
force index (name_position_uidx)
847+
where
848+
(((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?)))
849+
and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?))))
850+
`
851+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
852+
require.Equal(t, []any{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, args)
853+
})
854+
855+
t.Run("reuses prepared statement across calls", func(t *testing.T) {
856+
sharedColumns := NewColumnList([]string{"id", "name"})
857+
uniqueKeyColumns := NewColumnList([]string{"id"})
858+
859+
builder, err := NewMoveTablesCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
860+
require.NoError(t, err)
861+
862+
query1, args1, err := builder.BuildQuery([]any{1}, []any{10})
863+
require.NoError(t, err)
864+
query2, args2, err := builder.BuildQuery([]any{11}, []any{20})
865+
require.NoError(t, err)
866+
867+
require.Equal(t, query1, query2)
868+
require.Equal(t, []any{1, 1, 10, 10}, args1)
869+
require.Equal(t, []any{11, 11, 20, 20}, args2)
870+
})
871+
872+
t.Run("wrong args count", func(t *testing.T) {
873+
sharedColumns := NewColumnList([]string{"id", "name"})
874+
uniqueKeyColumns := NewColumnList([]string{"id"})
875+
876+
builder, err := NewMoveTablesCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
877+
require.NoError(t, err)
878+
879+
_, _, err = builder.BuildQuery([]any{1, 2}, []any{10})
880+
require.Error(t, err)
881+
})
882+
}
883+
884+
func BenchmarkMoveTablesCopySelectQueryBuilderBuildQuery(b *testing.B) {
885+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
886+
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
887+
888+
builder, err := NewMoveTablesCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true)
889+
if err != nil {
890+
b.Fatal(err)
891+
}
892+
893+
rangeStartArgs := []any{3, 17}
894+
rangeEndArgs := []any{103, 117}
895+
896+
b.ResetTimer()
897+
for i := 0; i < b.N; i++ {
898+
_, _, err := builder.BuildQuery(rangeStartArgs, rangeEndArgs)
899+
if err != nil {
900+
b.Fatal(err)
901+
}
902+
}
903+
}
904+
905+
func TestMoveTablesCopyInsertQueryBuilder(t *testing.T) {
906+
t.Run("single row", func(t *testing.T) {
907+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
908+
909+
builder, err := NewMoveTablesCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
910+
require.NoError(t, err)
911+
912+
values := []*ColumnValues{
913+
ToColumnValues([]interface{}{1, "alice", 10}),
914+
}
915+
query, args, err := builder.BuildQuery(values)
916+
require.NoError(t, err)
917+
918+
expected := `
919+
insert /* gh-ost mydb.ghost */ ignore
920+
into
921+
mydb.ghost
922+
(id, name, position)
923+
values
924+
(?, ?, ?)
925+
`
926+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
927+
require.Equal(t, []any{1, "alice", 10}, args)
928+
})
929+
930+
t.Run("multiple rows", func(t *testing.T) {
931+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
932+
933+
builder, err := NewMoveTablesCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
934+
require.NoError(t, err)
935+
936+
values := []*ColumnValues{
937+
ToColumnValues([]interface{}{1, "alice", 10}),
938+
ToColumnValues([]interface{}{2, "bob", 20}),
939+
ToColumnValues([]interface{}{3, "carol", 30}),
940+
}
941+
query, args, err := builder.BuildQuery(values)
942+
require.NoError(t, err)
943+
944+
expected := `
945+
insert /* gh-ost mydb.ghost */ ignore
946+
into
947+
mydb.ghost
948+
(id, name, position)
949+
values
950+
(?, ?, ?),
951+
(?, ?, ?),
952+
(?, ?, ?)
953+
`
954+
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
955+
require.Equal(t, []any{1, "alice", 10, 2, "bob", 20, 3, "carol", 30}, args)
956+
})
957+
958+
t.Run("wrong column count", func(t *testing.T) {
959+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
960+
961+
builder, err := NewMoveTablesCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
962+
require.NoError(t, err)
963+
964+
values := []*ColumnValues{
965+
ToColumnValues([]interface{}{1, "alice"}),
966+
}
967+
_, _, err = builder.BuildQuery(values)
968+
require.Error(t, err)
969+
})
970+
971+
t.Run("reuses prepared statement", func(t *testing.T) {
972+
sharedColumns := NewColumnList([]string{"id", "name"})
973+
974+
builder, err := NewMoveTablesCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
975+
require.NoError(t, err)
976+
977+
values1 := []*ColumnValues{ToColumnValues([]interface{}{1, "a"})}
978+
values2 := []*ColumnValues{ToColumnValues([]interface{}{2, "b"})}
979+
980+
query1, args1, err := builder.BuildQuery(values1)
981+
require.NoError(t, err)
982+
query2, args2, err := builder.BuildQuery(values2)
983+
require.NoError(t, err)
984+
985+
require.Equal(t, query1, query2)
986+
require.Equal(t, []any{1, "a"}, args1)
987+
require.Equal(t, []any{2, "b"}, args2)
988+
})
989+
}
990+
991+
func BenchmarkMoveTablesCopyInsertQueryBuilderBuildQuery(b *testing.B) {
992+
sharedColumns := NewColumnList([]string{"id", "name", "position"})
993+
994+
builder, err := NewMoveTablesCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
995+
if err != nil {
996+
b.Fatal(err)
997+
}
998+
999+
values := []*ColumnValues{
1000+
ToColumnValues([]interface{}{1, "alice", 10}),
1001+
ToColumnValues([]interface{}{2, "bob", 20}),
1002+
ToColumnValues([]interface{}{3, "carol", 30}),
1003+
}
1004+
1005+
b.ResetTimer()
1006+
for i := 0; i < b.N; i++ {
1007+
_, _, err := builder.BuildQuery(values)
1008+
if err != nil {
1009+
b.Fatal(err)
1010+
}
1011+
}
1012+
}
1013+
7871014
func TestCheckpointQueryBuilder(t *testing.T) {
7881015
databaseName := "mydb"
7891016
tableName := "_tbl_ghk"

0 commit comments

Comments
 (0)