Skip to content

Commit d1fde4d

Browse files
committed
risingwave: remove ON CONFLICT from static DDL and generator output
- Update risingwaveStaticSql to avoid ON CONFLICT clauses, aligning with expected seed behavior and tests. - Adjust from-proto-generate-csv getStaticSQL for risingwave accordingly. - Update inserter comments to reflect plain INSERT usage. - Update RisingWave dialect tests to assert absence of ON CONFLICT.
1 parent 15db376 commit d1fde4d

5 files changed

Lines changed: 10 additions & 15 deletions

File tree

cmd/substreams-sink-sql/from_proto_generate_csv.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -524,17 +524,17 @@ CREATE TABLE IF NOT EXISTS "%s"._blocks_ (
524524
);`, schemaName, schemaName, schemaName, schemaName)
525525

526526
case "risingwave":
527-
// Matches db_proto/sql/risingwave/dialect.go risingwaveStaticSql
527+
// Matches db_proto/sql/risingwave/dialect.go risingwaveStaticSql (no ON CONFLICT clauses)
528528
return fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS "%s";
529529
530530
CREATE TABLE IF NOT EXISTS "%s"._sink_info_ (
531531
schema_hash VARCHAR PRIMARY KEY
532-
) ON CONFLICT DO NOTHING;
532+
);
533533
534534
CREATE TABLE IF NOT EXISTS "%s"._cursor_ (
535535
name VARCHAR PRIMARY KEY,
536536
cursor VARCHAR
537-
) ON CONFLICT OVERWRITE;
537+
);
538538
539539
CREATE TABLE IF NOT EXISTS "%s"._blocks_ (
540540
number INTEGER PRIMARY KEY,

db_proto/sql/risingwave/accumulator_inserter.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ func (i *AccumulatorInserter) init(database *Database) error {
4646
query: fmt.Sprintf("INSERT INTO %s (number, hash, timestamp) VALUES ", tableName(database.schema.Name, "_blocks_")),
4747
}
4848

49-
// RisingWave doesn't support PostgreSQL's ON CONFLICT syntax
50-
// We'll use a simple INSERT for the cursor, relying on the table to handle conflicts
51-
// The _cursor_ table should be created with appropriate ON CONFLICT behavior
52-
cursorQuery := fmt.Sprintf("INSERT INTO %s (name, cursor) VALUES ($1, $2)", tableName(database.schema.Name, "_cursor_"))
49+
// RisingWave: use a simple INSERT for the cursor; no ON CONFLICT clause in DDL
50+
cursorQuery := fmt.Sprintf("INSERT INTO %s (name, cursor) VALUES ($1, $2)", tableName(database.schema.Name, "_cursor_"))
5351
cs, err := database.db.Prepare(cursorQuery)
5452
if err != nil {
5553
return fmt.Errorf("preparing statement %q: %w", cursorQuery, err)

db_proto/sql/risingwave/dialect.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ const risingwaveStaticSql = `
1818
1919
CREATE TABLE IF NOT EXISTS "%s"._sink_info_ (
2020
schema_hash VARCHAR PRIMARY KEY
21-
) ON CONFLICT DO NOTHING;
21+
);
2222
2323
CREATE TABLE IF NOT EXISTS "%s"._cursor_ (
2424
name VARCHAR PRIMARY KEY,
2525
cursor VARCHAR
26-
) ON CONFLICT OVERWRITE;
26+
);
2727
2828
CREATE TABLE IF NOT EXISTS "%s"._blocks_ (
2929
number INTEGER PRIMARY KEY,

db_proto/sql/risingwave/dialect_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,9 @@ func TestDialectRisingwave_CreateTableStaticSql(t *testing.T) {
8585
assert.Contains(t, sql, "_cursor_")
8686
assert.Contains(t, sql, "name varchar primary key")
8787
assert.Contains(t, sql, "cursor varchar")
88-
assert.Contains(t, sql, "on conflict overwrite")
8988

90-
// Check conflict policies per table
91-
// _sink_info_: DO NOTHING
92-
assert.Contains(t, sql, "_sink_info_")
93-
assert.Contains(t, sql, "on conflict do nothing")
89+
// No ON CONFLICT clauses expected in static DDL
90+
assert.NotContains(t, sql, "on conflict")
9491

9592
// Check _blocks_ table
9693
assert.Contains(t, sql, "_blocks_")

db_proto/sql/risingwave/row_inserter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (i *RowInserter) init(database *Database) error {
5555
}
5656
insertStatements["_blocks_"] = bs
5757

58-
// Use plain INSERT; table-level ON CONFLICT policy (OVERWRITE) is defined in CREATE TABLE
58+
// Use plain INSERT; RisingWave static DDL does not use ON CONFLICT clauses.
5959
insertQueries["_cursor_"] = fmt.Sprintf("INSERT INTO %s (name, cursor) VALUES ($1, $2)", tableName(database.schema.Name, "_cursor_"))
6060
cs, err := database.db.Prepare(insertQueries["_cursor_"])
6161
if err != nil {

0 commit comments

Comments
 (0)