Skip to content

Commit 84e95be

Browse files
committed
risingwave: make DDL column order consistent with from-proto inserts
- In CREATE TABLE, emit system columns (block_number, block_timestamp) before PK - Keeps inline PRIMARY KEY for explicit PK; composite PK unchanged - Adds test to assert block_number appears before id in emitted SQL
1 parent 8595e2a commit 84e95be

2 files changed

Lines changed: 29 additions & 21 deletions

File tree

cmd/substreams-sink-sql/from_proto_generate_csv_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,14 @@ func TestFromProtoGenerateCSVCompatibility_RisingWave(t *testing.T) {
147147
assert.NotContains(t, fullSQL, "_block_number_")
148148
assert.NotContains(t, fullSQL, "_block_timestamp_")
149149

150+
// Ensure ordering: system columns appear before id column
151+
// (block_number before id)
152+
idxBN := strings.Index(fullSQL, "block_number")
153+
idxID := strings.Index(fullSQL, "id BIGINT PRIMARY KEY")
154+
if idxBN >= 0 && idxID >= 0 {
155+
assert.Less(t, idxBN, idxID, "block_number should appear before id in column list")
156+
}
157+
150158
// User columns with correct types (id unquoted, others quoted by dialect)
151159
assert.Contains(t, fullSQL, "id BIGINT PRIMARY KEY")
152160
assert.Contains(t, fullSQL, `"name" CHARACTER VARYING`)

db_proto/sql/risingwave/dialect.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,27 +71,27 @@ func (d *DialectRisingwave) init() error {
7171
}
7272

7373
func (d *DialectRisingwave) createTable(table *schema.Table) error {
74-
var sb strings.Builder
75-
addedColumns := make(map[string]struct{})
76-
77-
tableName := d.FullTableName(table)
78-
79-
sb.WriteString(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (", tableName))
80-
81-
// Add primary key if it exists
82-
var primaryKeyFieldName string
83-
if table.PrimaryKey != nil {
84-
pk := table.PrimaryKey
85-
primaryKeyFieldName = pk.Name
86-
sb.WriteString(fmt.Sprintf("%s %s PRIMARY KEY,", pk.Name, MapFieldType(pk.FieldDescriptor)))
87-
addedColumns[pk.Name] = struct{}{}
88-
}
89-
90-
// Always add block metadata columns
91-
sb.WriteString(" block_number INTEGER,")
92-
sb.WriteString(" block_timestamp TIMESTAMP WITH TIME ZONE,")
93-
addedColumns["block_number"] = struct{}{}
94-
addedColumns["block_timestamp"] = struct{}{}
74+
var sb strings.Builder
75+
addedColumns := make(map[string]struct{})
76+
77+
tableName := d.FullTableName(table)
78+
79+
sb.WriteString(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (", tableName))
80+
81+
// Always add block metadata columns first (align column order across dialects)
82+
sb.WriteString(" block_number INTEGER,")
83+
sb.WriteString(" block_timestamp TIMESTAMP WITH TIME ZONE,")
84+
addedColumns["block_number"] = struct{}{}
85+
addedColumns["block_timestamp"] = struct{}{}
86+
87+
// Add primary key column next if it exists (inline PRIMARY KEY)
88+
var primaryKeyFieldName string
89+
if table.PrimaryKey != nil {
90+
pk := table.PrimaryKey
91+
primaryKeyFieldName = pk.Name
92+
sb.WriteString(fmt.Sprintf("%s %s PRIMARY KEY,", pk.Name, MapFieldType(pk.FieldDescriptor)))
93+
addedColumns[pk.Name] = struct{}{}
94+
}
9595

9696
// Add parent key for child tables
9797
var parentKeyColumns []string

0 commit comments

Comments
 (0)