Skip to content

Commit dd4ae31

Browse files
committed
Merge branch 'develop' into chris/from-proto-export
2 parents 7183bdb + 30744a6 commit dd4ae31

34 files changed

Lines changed: 1628 additions & 1190 deletions

.github/workflows/build.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
name: Build and Test
2+
3+
on:
4+
push:
5+
branches: [ "develop", "main" ]
6+
pull_request:
7+
branches: [ "develop", "main" ]
8+
9+
jobs:
10+
build:
11+
runs-on: ubuntu-latest
12+
13+
steps:
14+
- uses: actions/checkout@v4
15+
16+
- name: Set up Go
17+
uses: actions/setup-go@v5
18+
with:
19+
go-version-file: 'go.mod'
20+
21+
- name: Cache Go modules
22+
uses: actions/cache@v4
23+
with:
24+
path: |
25+
~/.cache/go-build
26+
~/go/pkg/mod
27+
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
28+
restore-keys: |
29+
${{ runner.os }}-go-
30+
31+
- name: Download dependencies
32+
run: go mod download
33+
34+
- name: Verify dependencies
35+
run: go mod verify
36+
37+
- name: Build
38+
run: go build -v ./...
39+
40+
- name: Run tests
41+
run: go test -v ./...

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ build/
1010
/substreams-sink-sql
1111
/cursor.txt
1212
/replay.log
13-
/spl_all_schema_hash.txt
13+
/*_schema_hash.txt

CHANGELOG.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,32 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
## Unreleased
8+
## v4.6.6
9+
10+
### `substreams-sink-sql run`
11+
12+
* Fixed a potential ordering issue if some tables are seen before others but rows are still dependent of each other.
13+
14+
## v4.6.5
15+
16+
### `substreams-sink-sql run`
17+
18+
* Fixed delete operation on composite keys.
19+
20+
* Read tables information only for given schema to avoid walking non-related tables.
921

1022
### Clickhouse & `substreams-sink-sql from-proto`
1123

24+
* Improved signal handling by properly logging which signal was received.
25+
1226
* If a `clickhouse_table_options.partition_fields` already contains some partition for `_block_timestamp_`, the default `(toYYYYMM(_block_timestamp_))` will not be added anymore.
1327

1428
* The default partition on `_block_timestamp_` is now `(toYYYYMM(_block_timestamp_))` instead of just `_block_timestamp_`.
1529

30+
### Both implementations
31+
32+
* Automatically skip block flush if the average flush duration is above the time between blocks, preventing drift in these situations
33+
1634
## v4.6.4
1735

1836
### Clickhouse & `substreams-sink-sql from-proto`

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ The Substreams:SQL sink helps you quickly and easily sync Substreams modules to
5757
**ClickHouse**
5858

5959
```bash
60-
export DSN="clickhouse://default:@localhost:9000/default"
60+
export DSN="clickhouse://default:default@localhost:9000/default"
6161
substreams-sink-sql setup $DSN docs/tutorial/substreams.yaml
6262
```
6363

cmd/substreams-sink-sql/common_flags.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,13 @@ package main
22

33
import (
44
"context"
5-
"fmt"
6-
"strings"
75
"time"
86

9-
"github.com/golang/protobuf/proto"
107
"github.com/spf13/pflag"
118
"github.com/streamingfast/bstream"
129
"github.com/streamingfast/cli"
1310
"github.com/streamingfast/shutter"
1411
sink "github.com/streamingfast/substreams-sink"
15-
pbsql "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/services/v1"
1612
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
1713
"go.uber.org/zap"
1814
)
@@ -23,36 +19,6 @@ var (
2319

2420
var supportedOutputTypes = "sf.substreams.sink.database.v1.DatabaseChanges,sf.substreams.database.v1.DatabaseChanges"
2521

26-
var (
27-
supportedDeployableUnits []string
28-
deprecated_supportedDeployableService = "sf.substreams.sink.sql.v1.Service"
29-
supportedDeployableService = "sf.substreams.sink.sql.service.v1.Service"
30-
)
31-
32-
func init() {
33-
supportedDeployableUnits = []string{
34-
deprecated_supportedDeployableService,
35-
}
36-
}
37-
38-
func extractSinkService(pkg *pbsubstreams.Package) (*pbsql.Service, error) {
39-
if pkg.SinkConfig == nil {
40-
return nil, fmt.Errorf("no sink config found in spkg")
41-
}
42-
43-
switch pkg.SinkConfig.TypeUrl {
44-
case deprecated_supportedDeployableService, supportedDeployableService:
45-
service := &pbsql.Service{}
46-
47-
if err := proto.Unmarshal(pkg.SinkConfig.Value, service); err != nil {
48-
return nil, fmt.Errorf("failed to proto unmarshal: %w", err)
49-
}
50-
return service, nil
51-
}
52-
53-
return nil, fmt.Errorf("invalid config type %q, supported configs are %q", pkg.SinkConfig.TypeUrl, strings.Join(supportedDeployableUnits, ", "))
54-
}
55-
5622
// AddCommonSinkerFlags adds the flags common to all command that needs to create a sinker,
5723
// namely the `run` and `generate-csv` commands.
5824
func AddCommonSinkerFlags(flags *pflag.FlagSet) {

cmd/substreams-sink-sql/from_proto.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"github.com/jhump/protoreflect/desc"
77
"github.com/spf13/cobra"
88
"github.com/spf13/pflag"
9+
"github.com/streamingfast/cli"
910
. "github.com/streamingfast/cli"
1011
"github.com/streamingfast/cli/sflags"
1112
sink "github.com/streamingfast/substreams-sink"
13+
sinksql "github.com/streamingfast/substreams-sink-sql"
1214
"github.com/streamingfast/substreams-sink-sql/db_changes/db"
1315
"github.com/streamingfast/substreams-sink-sql/db_proto"
1416
"github.com/streamingfast/substreams-sink-sql/db_proto/proto"
@@ -49,7 +51,7 @@ var fromProtoCmd = Command(fromProtoE,
4951
//todo: handle network
5052

5153
func fromProtoE(cmd *cobra.Command, args []string) error {
52-
//app := NewApplication(cmd.Context())
54+
app := cli.NewApplication(cmd.Context())
5355

5456
dsnString := args[0]
5557
manifestPath := args[1]
@@ -60,18 +62,11 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
6062
}
6163

6264
useConstraints := !sflags.MustGetBool(cmd, "no-constraints")
63-
//useTransactions := !sflags.MustGetBool(cmd, "no-transactions")
6465
blockBatchSize := sflags.MustGetInt(cmd, "block-batch-size")
6566

6667
useTransactions := true
6768
parallel := false
6869

69-
//parallel := sflags.MustGetBool(cmd, "parallel")
70-
//if parallel {
71-
// useConstraints = false
72-
// useTransactions = false
73-
//}
74-
7570
endpoint := sflags.MustGetString(cmd, "substreams-endpoint")
7671
if endpoint == "" {
7772
network := sflags.MustGetString(cmd, "network")
@@ -121,7 +116,7 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
121116
return fmt.Errorf("could not find output type for module %s", outputModuleName)
122117
}
123118

124-
service, err := extractSinkService(spkg)
119+
service, err := sinksql.ExtractSinkService(spkg)
125120
if err != nil {
126121
service = &pbsql.Service{}
127122
}
@@ -199,11 +194,11 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
199194
return fmt.Errorf("creating sinker: %w", err)
200195
}
201196

202-
err = sinker.Run(cmd.Context())
203-
if err != nil {
204-
return fmt.Errorf("running sinker: %w", err)
197+
app.SuperviseAndStartUsing(sinker, sinker.Run)
198+
199+
if err := app.WaitForTermination(zlog, 0, 0); err != nil {
200+
cli.Quit("application terminated with error: %s", err)
205201
}
206202

207-
sinker.LogStats()
208203
return nil
209204
}

cmd/substreams-sink-sql/from_proto_apply_constraints.go

Lines changed: 0 additions & 111 deletions
This file was deleted.

cmd/substreams-sink-sql/run.go

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"errors"
54
"fmt"
65
"time"
76

@@ -10,7 +9,6 @@ import (
109
. "github.com/streamingfast/cli"
1110
"github.com/streamingfast/cli/sflags"
1211
sink "github.com/streamingfast/substreams-sink"
13-
db "github.com/streamingfast/substreams-sink-sql/db_changes/db"
1412
sinker2 "github.com/streamingfast/substreams-sink-sql/db_changes/sinker"
1513
"github.com/streamingfast/substreams/manifest"
1614
)
@@ -100,42 +98,24 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
10098
flushRetryCount := sflags.MustGetInt(cmd, "flush-retry-count")
10199
flushRetryDelay := sflags.MustGetDuration(cmd, "flush-retry-delay")
102100

103-
dsn, err := db.ParseDSN(dsnString)
104-
if err != nil {
105-
return fmt.Errorf("parsing dsn: %w", err)
106-
}
107-
108101
cursorTableName := sflags.MustGetString(cmd, "cursors-table")
109102
historyTableName := sflags.MustGetString(cmd, "history-table")
110-
111103
handleReorgs := sflags.MustGetInt(cmd, "undo-buffer-size") != 0
112-
dbLoader, err := db.NewLoader(
113-
dsn,
114-
cursorTableName,
115-
historyTableName,
116-
sflags.MustGetString(cmd, "clickhouse-cluster"),
117-
batchBlockFlushInterval, batchRowFlushInterval, liveBlockFlushInterval,
118-
sflags.MustGetString(cmd, onModuleHashMistmatchFlag),
119-
&handleReorgs,
120-
zlog, tracer,
121-
)
122-
123-
if err != nil {
124-
return fmt.Errorf("creating loader: %w", err)
125-
}
126-
127-
if err := dbLoader.LoadTables(dsn.Schema(), cursorTableName, historyTableName); err != nil {
128-
var e *db.SystemTableError
129-
if errors.As(err, &e) {
130-
fmt.Printf("Error validating the system table: %s\n", e)
131-
fmt.Println("Did you run setup ?")
132-
return e
133-
}
134-
135-
return fmt.Errorf("load psql table: %w", err)
136-
}
137104

138-
postgresSinker, err := sinker2.New(sink, dbLoader, zlog, tracer, flushRetryCount, flushRetryDelay)
105+
sinkerFactory := sinker2.SinkerFactory(sink, sinker2.SinkerFactoryOptions{
106+
CursorTableName: cursorTableName,
107+
HistoryTableName: historyTableName,
108+
ClickhouseCluster: sflags.MustGetString(cmd, "clickhouse-cluster"),
109+
BatchBlockFlushInterval: batchBlockFlushInterval,
110+
BatchRowFlushInterval: batchRowFlushInterval,
111+
LiveBlockFlushInterval: liveBlockFlushInterval,
112+
OnModuleHashMismatch: sflags.MustGetString(cmd, onModuleHashMistmatchFlag),
113+
HandleReorgs: handleReorgs,
114+
FlushRetryCount: flushRetryCount,
115+
FlushRetryDelay: flushRetryDelay,
116+
})
117+
118+
postgresSinker, err := sinkerFactory(app.Context(), dsnString, zlog, tracer)
139119
if err != nil {
140120
return fmt.Errorf("unable to setup postgres sinker: %w", err)
141121
}

0 commit comments

Comments
 (0)