Skip to content

Commit 5034ae2

Browse files
committed
risingwave: align docs and implementation with latest risingwave behaviour for ON CONFLICT
1 parent 9e8b770 commit 5034ae2

7 files changed

Lines changed: 30 additions & 29 deletions

File tree

cmd/substreams-sink-sql/from_proto_generate_csv.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,10 @@ func exportSQLSchema(dialect sql.Dialect, sqlSchema *schema.Schema, useConstrain
391391
b.WriteString(seed)
392392
b.WriteString("\n")
393393
case "risingwave":
394-
// RisingWave: emulate DO NOTHING semantics with WHERE NOT EXISTS
394+
// RisingWave: rely on table-level ON CONFLICT policy (DO NOTHING) defined in CREATE TABLE
395395
seed := fmt.Sprintf(
396-
"INSERT INTO \"%s\".\"_sink_info_\" (schema_hash) SELECT '%s' WHERE NOT EXISTS (SELECT 1 FROM \"%s\".\"_sink_info_\" WHERE schema_hash = '%s');",
397-
sqlSchema.Name, dialect.SchemaHash(), sqlSchema.Name, dialect.SchemaHash(),
396+
"INSERT INTO \"%s\".\"_sink_info_\" (schema_hash) VALUES ('%s');",
397+
sqlSchema.Name, dialect.SchemaHash(),
398398
)
399399
b.WriteString("-- Seed\n")
400400
b.WriteString(seed)
@@ -529,7 +529,7 @@ CREATE TABLE IF NOT EXISTS "%s"._blocks_ (
529529
530530
CREATE TABLE IF NOT EXISTS "%s"._sink_info_ (
531531
schema_hash VARCHAR PRIMARY KEY
532-
) ON CONFLICT OVERWRITE;
532+
) ON CONFLICT DO NOTHING;
533533
534534
CREATE TABLE IF NOT EXISTS "%s"._cursor_ (
535535
name VARCHAR PRIMARY KEY,

cmd/substreams-sink-sql/from_proto_generate_csv_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,13 @@ func TestSchemaSeed_RisingWave(t *testing.T) {
185185
err = exportSQLSchema(dialect, testSchema, true, "risingwave", f.Name())
186186
require.NoError(t, err)
187187

188-
// Read and assert WHERE NOT EXISTS pattern
188+
// Read and assert RisingWave plain INSERT (table-level policy is in DDL)
189189
data, err := os.ReadFile(f.Name())
190190
require.NoError(t, err)
191191
sqlText := string(data)
192192

193-
assert.Contains(t, sqlText, "INSERT INTO \"test_schema\".\"_sink_info_\" (schema_hash) SELECT ")
194-
assert.Contains(t, sqlText, "WHERE NOT EXISTS (")
193+
assert.Contains(t, sqlText, "INSERT INTO \"test_schema\".\"_sink_info_\" (schema_hash) VALUES (")
194+
assert.NotContains(t, sqlText, "ON CONFLICT")
195195
}
196196

197197
// TestCSVColumnOrder ensures CSV columns are in the exact order from-proto expects

db_proto/sql/risingwave/database.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,11 +279,12 @@ func (d *Database) FetchSinkInfo(schemaName string) (*sql.SinkInfo, error) {
279279
}
280280

281281
func (d *Database) StoreSinkInfo(schemaName string, schemaHash string) error {
282-
_, err := d.execSql(fmt.Sprintf("INSERT INTO %s._sink_info_ (schema_hash) VALUES ($1)", schemaName), schemaHash)
283-
if err != nil {
284-
return fmt.Errorf("storing schema hash: %w", err)
285-
}
286-
return nil
282+
// Use plain INSERT; table-level ON CONFLICT DO NOTHING is defined in CREATE TABLE
283+
_, err := d.execSql(fmt.Sprintf("INSERT INTO %s._sink_info_ (schema_hash) VALUES ($1)", schemaName), schemaHash)
284+
if err != nil {
285+
return fmt.Errorf("storing schema hash: %w", err)
286+
}
287+
return nil
287288
}
288289

289290
func (d *Database) UpdateSinkInfoHash(schemaName string, newHash string) error {

db_proto/sql/risingwave/dialect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const risingwaveStaticSql = `
1818
1919
CREATE TABLE IF NOT EXISTS "%s"._sink_info_ (
2020
schema_hash VARCHAR PRIMARY KEY
21-
) ON CONFLICT OVERWRITE;
21+
) ON CONFLICT DO NOTHING;
2222
2323
CREATE TABLE IF NOT EXISTS "%s"._cursor_ (
2424
name VARCHAR PRIMARY KEY,

db_proto/sql/risingwave/dialect_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ func TestDialectRisingwave_CreateTableStaticSql(t *testing.T) {
8585
assert.Contains(t, sql, "_cursor_")
8686
assert.Contains(t, sql, "name varchar primary key")
8787
assert.Contains(t, sql, "cursor varchar")
88-
assert.Contains(t, sql, "on conflict overwrite", "RisingWave should use ON CONFLICT OVERWRITE")
88+
assert.Contains(t, sql, "on conflict overwrite")
8989

90-
// Check that _sink_info_ also uses ON CONFLICT OVERWRITE semantics
91-
// Count occurrences to ensure both _sink_info_ and _cursor_ include it
92-
occurrences := strings.Count(sql, "on conflict overwrite")
93-
assert.GreaterOrEqual(t, occurrences, 2, "_sink_info_ and _cursor_ should both include ON CONFLICT OVERWRITE")
90+
// Check conflict policies per table
91+
// _sink_info_: DO NOTHING
92+
assert.Contains(t, sql, "_sink_info_")
93+
assert.Contains(t, sql, "on conflict do nothing")
9494

9595
// Check _blocks_ table
9696
assert.Contains(t, sql, "_blocks_")

db_proto/sql/risingwave/row_inserter.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,8 @@ func (i *RowInserter) init(database *Database) error {
5555
}
5656
insertStatements["_blocks_"] = bs
5757

58-
// RisingWave doesn't support PostgreSQL's ON CONFLICT syntax in INSERT statements
59-
// The _cursor_ table is created with ON CONFLICT OVERWRITE to handle this automatically
60-
insertQueries["_cursor_"] = fmt.Sprintf("INSERT INTO %s (name, cursor) VALUES ($1, $2)", tableName(database.schema.Name, "_cursor_"))
58+
// Use plain INSERT; table-level ON CONFLICT policy (OVERWRITE) is defined in CREATE TABLE
59+
insertQueries["_cursor_"] = fmt.Sprintf("INSERT INTO %s (name, cursor) VALUES ($1, $2)", tableName(database.schema.Name, "_cursor_"))
6160
cs, err := database.db.Prepare(insertQueries["_cursor_"])
6261
if err != nil {
6362
return fmt.Errorf("preparing statement %q: %w", insertQueries["_cursor_"], err)

docs/FROM_PROTO_GENERATE_CSV_README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,14 @@ Notes:
7474
ON CONFLICT (schema_hash) DO NOTHING;
7575
```
7676
- RisingWave:
77-
- Tables are created with `ON CONFLICT OVERWRITE` at CREATE TABLE for `_sink_info_` and `_cursor_`.
78-
- The seed uses a DO-NOTHING equivalent pattern:
77+
- Conflict policy is defined at CREATE TABLE (table-level), not per INSERT.
78+
- We set: `_sink_info_` → `ON CONFLICT DO NOTHING`; `_cursor_` → `ON CONFLICT OVERWRITE`.
79+
- The seed uses a plain INSERT and relies on the table-level policy:
7980
```sql
8081
INSERT INTO "<schema>"."_sink_info_" (schema_hash)
81-
SELECT '<hash>'
82-
WHERE NOT EXISTS (
83-
SELECT 1 FROM "<schema>"."_sink_info_" WHERE schema_hash = '<hash>'
84-
);
82+
VALUES ('<hash>');
8583
```
86-
This lets `from-proto` start without additional migrations.
84+
This lets `from-proto` start without additional migrations and keeps the seed idempotent.
8785

8886
## End‑to‑End Workflow (PostgreSQL)
8987

@@ -130,7 +128,10 @@ substreams-sink-sql from-proto "$PSQL_DSN" "$MANIFEST" "$MODULE" \
130128
## Dialect Notes
131129

132130
- PostgreSQL: `_sink_info_`, `_cursor_`, `_blocks_` tables are created in `schema.sql`. Cursor injection uses `COPY ... WITH (HEADER)`.
133-
- RisingWave: `_sink_info_` and `_cursor_` are created with `ON CONFLICT OVERWRITE` to make subsequent inserts idempotent. The seed uses a `WHERE NOT EXISTS` form.
131+
- RisingWave: Conflict policy is defined on CREATE TABLE (not per INSERT). We set:
132+
- `_sink_info_`: `ON CONFLICT DO NOTHING` (idempotent seed)
133+
- `_cursor_`: `ON CONFLICT OVERWRITE` (upsert semantics for the single cursor row)
134+
The seed is a plain `INSERT`, relying on table-level policies.
134135
- ClickHouse: CSVs include dialect-specific version/deleted fields. Schema.sql includes database and `_blocks_` table; cursor/sink info are not table-based — follow FROM_PROTO.md for CH specifics.
135136

136137
## Operational Tips

0 commit comments

Comments
 (0)