Skip to content

Commit 30744a6

Browse files
committed
Merge remote-tracking branch 'upstream/develop' into develop
2 parents 1bfd8fa + ea860ac commit 30744a6

7 files changed

Lines changed: 144 additions & 38 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## v4.6.6
9+
10+
### `substreams-sink-sql run`
11+
12+
* Fixed a potential ordering issue if some tables are seen before others but rows are still dependent of each other.
13+
814
## v4.6.5
915

1016
### `substreams-sink-sql run`

db_changes/db/dialect_postgres.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package db
22

33
import (
4+
"cmp"
45
"context"
56
"database/sql"
67
"encoding/json"
78
"fmt"
89
"reflect"
10+
"slices"
911
"sort"
1012
"strconv"
1113
"strings"
@@ -89,31 +91,43 @@ func (d PostgresDialect) Revert(tx Tx, ctx context.Context, l *Loader, lastValid
8991
}
9092

9193
func (d PostgresDialect) Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error) {
92-
var rowCount int
94+
var totalRows int
9395
for entriesPair := l.entries.Oldest(); entriesPair != nil; entriesPair = entriesPair.Next() {
94-
tableName := entriesPair.Key
9596
entries := entriesPair.Value
97+
totalRows += entries.Len()
9698

9799
if l.tracer.Enabled() {
98-
l.logger.Debug("flushing table rows", zap.String("table_name", tableName), zap.Int("row_count", entries.Len()))
100+
l.logger.Debug("flushing table rows", zap.String("table_name", entriesPair.Key), zap.Int("row_count", entries.Len()))
99101
}
102+
}
103+
104+
allOperations := make([]*Operation, 0, totalRows)
105+
for entriesPair := l.entries.Oldest(); entriesPair != nil; entriesPair = entriesPair.Next() {
106+
entries := entriesPair.Value
100107
for entryPair := entries.Oldest(); entryPair != nil; entryPair = entryPair.Next() {
101-
entry := entryPair.Value
108+
allOperations = append(allOperations, entryPair.Value)
109+
}
110+
}
102111

103-
query, err := d.prepareStatement(d.schemaName, entry)
104-
if err != nil {
105-
return 0, fmt.Errorf("failed to prepare statement: %w", err)
106-
}
112+
slices.SortFunc(allOperations, func(a, b *Operation) int {
113+
return cmp.Compare(a.ordinal, b.ordinal)
114+
})
107115

108-
if l.tracer.Enabled() {
109-
l.logger.Debug("adding query from operation to transaction", zap.Stringer("op", entry), zap.String("query", query))
110-
}
116+
var rowCount int
117+
for _, entry := range allOperations {
118+
query, err := d.prepareStatement(d.schemaName, entry)
119+
if err != nil {
120+
return 0, fmt.Errorf("failed to prepare statement: %w", err)
121+
}
111122

112-
if _, err := tx.ExecContext(ctx, query); err != nil {
113-
return 0, fmt.Errorf("executing flush query %q: %w", query, err)
114-
}
123+
if l.tracer.Enabled() {
124+
l.logger.Debug("adding query from operation to transaction", zap.Stringer("op", entry), zap.String("query", query), zap.Uint64("ordinal", entry.ordinal))
125+
}
126+
127+
if _, err := tx.ExecContext(ctx, query); err != nil {
128+
return 0, fmt.Errorf("executing flush query %q: %w", query, err)
115129
}
116-
rowCount += entries.Len()
130+
rowCount++
117131
}
118132

119133
if err := d.pruneReversibleSegment(tx, ctx, d.schemaName, lastFinalBlock); err != nil {

db_changes/db/operations.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,48 +29,53 @@ type Operation struct {
2929
opType OperationType
3030
primaryKey map[string]string
3131
data map[string]string
32+
ordinal uint64
3233
reversibleBlockNum *uint64 // nil if that block is known to be irreversible
3334
}
3435

3536
func (o *Operation) String() string {
3637
return fmt.Sprintf("%s/%s (%s)", o.table.identifier, createRowUniqueID(o.primaryKey), strings.ToLower(string(o.opType)))
3738
}
3839

39-
func (l *Loader) newInsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) *Operation {
40+
func (l *Loader) newInsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) *Operation {
4041
return &Operation{
4142
table: table,
4243
opType: OperationTypeInsert,
4344
primaryKey: primaryKey,
4445
data: data,
46+
ordinal: ordinal,
4547
reversibleBlockNum: reversibleBlockNum,
4648
}
4749
}
4850

49-
func (l *Loader) newUpsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) *Operation {
51+
func (l *Loader) newUpsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) *Operation {
5052
return &Operation{
5153
table: table,
5254
opType: OperationTypeUpsert,
5355
primaryKey: primaryKey,
5456
data: data,
57+
ordinal: ordinal,
5558
reversibleBlockNum: reversibleBlockNum,
5659
}
5760
}
5861

59-
func (l *Loader) newUpdateOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) *Operation {
62+
func (l *Loader) newUpdateOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) *Operation {
6063
return &Operation{
6164
table: table,
6265
opType: OperationTypeUpdate,
6366
primaryKey: primaryKey,
6467
data: data,
68+
ordinal: ordinal,
6569
reversibleBlockNum: reversibleBlockNum,
6670
}
6771
}
6872

69-
func (l *Loader) newDeleteOperation(table *TableInfo, primaryKey map[string]string, reversibleBlockNum *uint64) *Operation {
73+
func (l *Loader) newDeleteOperation(table *TableInfo, primaryKey map[string]string, ordinal uint64, reversibleBlockNum *uint64) *Operation {
7074
return &Operation{
7175
table: table,
7276
opType: OperationTypeDelete,
7377
primaryKey: primaryKey,
78+
ordinal: ordinal,
7479
reversibleBlockNum: reversibleBlockNum,
7580
}
7681
}
@@ -86,6 +91,20 @@ func (o *Operation) mergeData(newData map[string]string) error {
8691
return nil
8792
}
8893

94+
// mergeOperation merges another operation into this one, keeping the lowest ordinal
95+
func (o *Operation) mergeOperation(otherOrdinal uint64, otherData map[string]string) error {
96+
if o.opType == OperationTypeDelete {
97+
return fmt.Errorf("unable to merge operation for a delete operation")
98+
}
99+
100+
// Keep the lowest ordinal
101+
if otherOrdinal < o.ordinal {
102+
o.ordinal = otherOrdinal
103+
}
104+
105+
return o.mergeData(otherData)
106+
}
107+
89108
var integerRegex = regexp.MustCompile(`^\d+$`)
90109
var dateRegex = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`)
91110
var reflectTypeTime = reflect.TypeOf(time.Time{})

db_changes/db/ops.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
// Insert a row in the DB, it is assumed the table exists, you can do a
1313
// check before with HasTable()
14-
func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error {
14+
func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
1515
uniqueID := createRowUniqueID(primaryKey)
1616

1717
if l.tracer.Enabled() {
@@ -59,7 +59,7 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map
5959
}
6060
}
6161

62-
entry.Set(uniqueID, l.newInsertOperation(table, primaryKey, data, reversibleBlockNum))
62+
entry.Set(uniqueID, l.newInsertOperation(table, primaryKey, data, ordinal, reversibleBlockNum))
6363
l.entriesCount++
6464
return nil
6565
}
@@ -101,7 +101,7 @@ func (l *Loader) GetPrimaryKey(tableName string, pk string) (map[string]string,
101101

102102
// Upsert a row in the DB, it is assumed the table exists, you can do a
103103
// check before with HasTable().
104-
func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error {
104+
func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
105105
if l.dialect.OnlyInserts() {
106106
return fmt.Errorf("update operation is not supported by the current database")
107107
}
@@ -147,7 +147,7 @@ func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map
147147
l.logger.Debug("primary key entry already exist for table, merging columns together", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
148148
}
149149

150-
op.mergeData(data)
150+
op.mergeOperation(ordinal, data)
151151
entry.Set(uniqueID, op)
152152
return nil
153153
} else {
@@ -165,13 +165,13 @@ func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map
165165
}
166166
}
167167

168-
entry.Set(uniqueID, l.newUpsertOperation(table, primaryKey, data, reversibleBlockNum))
168+
entry.Set(uniqueID, l.newUpsertOperation(table, primaryKey, data, ordinal, reversibleBlockNum))
169169
return nil
170170
}
171171

172172
// Update a row in the DB, it is assumed the table exists, you can do a
173173
// check before with HasTable()
174-
func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error {
174+
func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
175175
if l.dialect.OnlyInserts() {
176176
return fmt.Errorf("update operation is not supported by the current database")
177177
}
@@ -216,7 +216,7 @@ func (l *Loader) Update(tableName string, primaryKey map[string]string, data map
216216
l.logger.Debug("primary key entry already exist for table, merging fields together", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
217217
}
218218

219-
op.mergeData(data)
219+
op.mergeOperation(ordinal, data)
220220
entry.Set(uniqueID, op)
221221
return nil
222222
} else {
@@ -227,13 +227,13 @@ func (l *Loader) Update(tableName string, primaryKey map[string]string, data map
227227
l.logger.Debug("primary key entry never existed for table, adding update operation", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
228228
}
229229

230-
entry.Set(uniqueID, l.newUpdateOperation(table, primaryKey, data, reversibleBlockNum))
230+
entry.Set(uniqueID, l.newUpdateOperation(table, primaryKey, data, ordinal, reversibleBlockNum))
231231
return nil
232232
}
233233

234234
// Delete a row in the DB, it is assumed the table exists, you can do a
235235
// check before with HasTable()
236-
func (l *Loader) Delete(tableName string, primaryKey map[string]string, reversibleBlockNum *uint64) error {
236+
func (l *Loader) Delete(tableName string, primaryKey map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
237237
if l.dialect.OnlyInserts() {
238238
return fmt.Errorf("delete operation is not supported by the current database")
239239
}
@@ -274,6 +274,6 @@ func (l *Loader) Delete(tableName string, primaryKey map[string]string, reversib
274274
l.logger.Debug("adding deleting operation", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
275275
}
276276

277-
entry.Set(uniqueID, l.newDeleteOperation(table, primaryKey, reversibleBlockNum))
277+
entry.Set(uniqueID, l.newDeleteOperation(table, primaryKey, ordinal, reversibleBlockNum))
278278
return nil
279279
}

db_changes/sinker/setup.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ const (
1818
supportedDeployableService = "type.googleapis.com/sf.substreams.sink.sql.service.v1.Service"
1919
)
2020

21-
var supportedDeployableUnits = []string{deprecated_supportedDeployableService, supportedDeployableService}
22-
2321
// SinkerSetupOptions contains configuration for the setup operation
2422
type SinkerSetupOptions struct {
2523
CursorTableName string

db_changes/sinker/sinker.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ func (s *SQLSinker) flushWithRetry(ctx context.Context, moduleHash string, curso
120120
var lastErr error
121121
for attempt := 0; attempt <= retries; attempt++ {
122122
if attempt > 0 {
123+
// Do not retry if flush delay is 0, useful in tests
124+
if s.flushRetryDelay == 0 {
125+
return 0, lastErr
126+
}
127+
123128
delay := time.Duration(attempt) * s.flushRetryDelay
124129
s.logger.Warn("retrying flush after error",
125130
zap.Int("attempt", attempt),
@@ -224,7 +229,7 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
224229
}
225230

226231
func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges, blockNum, finalBlockNum uint64) error {
227-
for _, change := range dbChanges.TableChanges {
232+
for ordinal, change := range dbChanges.TableChanges {
228233
if !s.loader.HasTable(change.Table) {
229234
return fmt.Errorf(
230235
"your Substreams sent us a change for a table named %s we don't know about on %s (available tables: %s)",
@@ -260,22 +265,22 @@ func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges,
260265

261266
switch change.Operation {
262267
case pbdatabase.TableChange_OPERATION_CREATE:
263-
err := s.loader.Insert(change.Table, primaryKeys, changes, reversibleBlockNum)
268+
err := s.loader.Insert(change.Table, primaryKeys, changes, uint64(ordinal), reversibleBlockNum)
264269
if err != nil {
265270
return fmt.Errorf("database insert: %w", err)
266271
}
267272
case pbdatabase.TableChange_OPERATION_UPSERT:
268-
err := s.loader.Upsert(change.Table, primaryKeys, changes, reversibleBlockNum)
273+
err := s.loader.Upsert(change.Table, primaryKeys, changes, uint64(ordinal), reversibleBlockNum)
269274
if err != nil {
270275
return fmt.Errorf("database upsert: %w", err)
271276
}
272277
case pbdatabase.TableChange_OPERATION_UPDATE:
273-
err := s.loader.Update(change.Table, primaryKeys, changes, reversibleBlockNum)
278+
err := s.loader.Update(change.Table, primaryKeys, changes, uint64(ordinal), reversibleBlockNum)
274279
if err != nil {
275280
return fmt.Errorf("database update: %w", err)
276281
}
277282
case pbdatabase.TableChange_OPERATION_DELETE:
278-
err := s.loader.Delete(change.Table, primaryKeys, reversibleBlockNum)
283+
err := s.loader.Delete(change.Table, primaryKeys, uint64(ordinal), reversibleBlockNum)
279284
if err != nil {
280285
return fmt.Errorf("database delete: %w", err)
281286
}

tests/integration/db_changes_postgres_test.go

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,59 @@ func TestSinker_Integration_ParentChildOrdering(t *testing.T) {
497497
)
498498
}
499499

500+
func TestSinker_Integration_ComplexDependentTableOrdering(t *testing.T) {
501+
dbConnectionString, postgresContainer := setupDbChangesPostgresContainer(t)
502+
503+
runSinkerTest(
504+
t,
505+
dbConnectionString,
506+
postgresContainer,
507+
rawSQLInput(`
508+
CREATE TABLE IF NOT EXISTS %[1]s.departments (
509+
id TEXT PRIMARY KEY
510+
);
511+
CREATE TABLE IF NOT EXISTS %[1]s.employees (
512+
id TEXT PRIMARY KEY,
513+
department_id TEXT NOT NULL,
514+
CONSTRAINT fk_department
515+
FOREIGN KEY(department_id)
516+
REFERENCES %[1]s.departments(id)
517+
);
518+
519+
-- Pre-existing data, like if the sinker had stopped at that point
520+
INSERT INTO %[1]s.departments (id) VALUES ('dept1');
521+
`, testSchema),
522+
streamMock(
523+
dbChangesBlockData(t, "10a", finalBlock("10a"),
524+
insertRowSinglePK("employees", "emp1", "department_id", "dept1"),
525+
insertRowSinglePK("departments", "dept2"),
526+
insertRowSinglePK("employees", "emp2", "department_id", "dept2"),
527+
),
528+
),
529+
func(t *testing.T, dbx *sqlx.DB) {
530+
type DepartmentRow struct {
531+
ID string `db:"id"`
532+
}
533+
534+
type EmployeeRow struct {
535+
ID string `db:"id"`
536+
DepartmentID string `db:"department_id"`
537+
}
538+
539+
require.Equal(t, []*DepartmentRow{
540+
{ID: "dept1"},
541+
{ID: "dept2"},
542+
}, readDbChangesRows[DepartmentRow](t, dbx, "departments"))
543+
544+
require.Equal(t, []*EmployeeRow{
545+
{ID: "emp1", DepartmentID: "dept1"},
546+
{ID: "emp2", DepartmentID: "dept2"},
547+
}, readDbChangesRows[EmployeeRow](t, dbx, "employees"))
548+
},
549+
"Block #10 (10a) - LIB #10 (10a)",
550+
)
551+
}
552+
500553
func TestSinker_Integration_UndoBufferWorks(t *testing.T) {
501554
testTables := db2.TestSinglePrimaryKeyTables(testSchema)
502555
dbConnectionString, postgresContainer := setupDbChangesPostgresContainer(t)
@@ -624,8 +677,8 @@ func runCustomizedSinkerTest(
624677
LiveBlockFlushInterval: 1,
625678
OnModuleHashMismatch: setupOptions.OnModuleHashMismatch,
626679
HandleReorgs: true,
627-
FlushRetryCount: 3,
628-
FlushRetryDelay: 1 * time.Second,
680+
FlushRetryCount: 0,
681+
FlushRetryDelay: 0,
629682
}
630683

631684
dbSinker, err := sinker.SinkerFactory(baseSink, options)(ctx, dbDSN, logger, tracer)
@@ -705,6 +758,17 @@ func insertRowCompositePK(table string, pk map[string]string, fieldsAndValues ..
705758
}
706759
}
707760

761+
func updateRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbdatabase.TableChange {
762+
return &pbdatabase.TableChange{
763+
Table: table,
764+
PrimaryKey: &pbdatabase.TableChange_Pk{
765+
Pk: pk,
766+
},
767+
Operation: pbdatabase.TableChange_OPERATION_UPDATE,
768+
Fields: getFields(fieldsAndValues...),
769+
}
770+
}
771+
708772
func upsertRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbdatabase.TableChange {
709773
return &pbdatabase.TableChange{
710774
Table: table,

0 commit comments

Comments
 (0)