Skip to content

Commit 50ffd80

Browse files
committed
feat(risingwave): fully align timestamp handling between from-proto and from-proto-generate-csv
1 parent d1fde4d commit 50ffd80

9 files changed

Lines changed: 668 additions & 550 deletions

File tree

cmd/substreams-sink-sql/from_proto_generate_csv.go

Lines changed: 446 additions & 439 deletions
Large diffs are not rendered by default.

cmd/substreams-sink-sql/from_proto_generate_csv_comprehensive_test.go

Lines changed: 77 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
package main
2+
23
// Note: file renamed to align with from-proto-generate-csv command
34

45
import (
@@ -11,20 +12,21 @@ import (
1112
"github.com/golang/protobuf/protoc-gen-go/descriptor"
1213
"github.com/jhump/protoreflect/desc"
1314
"github.com/jhump/protoreflect/dynamic"
14-
"github.com/stretchr/testify/assert"
15-
"github.com/stretchr/testify/require"
1615
"github.com/streamingfast/substreams-sink-sql/db_proto/sql"
1716
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/click_house"
1817
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/postgres"
1918
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/risingwave"
2019
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
20+
"github.com/streamingfast/substreams-sink-sql/internal/timefmt"
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
2123
"go.uber.org/zap"
2224
)
2325

2426
// TestParentChildRelationships validates parent-child table relationships
2527
func TestParentChildRelationships(t *testing.T) {
2628
logger := zap.NewNop()
27-
29+
2830
// Create parent and child message descriptors
2931
parentMsgProto := &descriptor.DescriptorProto{
3032
Name: proto.String("Orders"),
@@ -125,7 +127,7 @@ func TestParentChildRelationships(t *testing.T) {
125127

126128
// Test 1: Child table CSV should include parent column
127129
columns := gen.getColumnsForTable(orderItemsTable)
128-
130+
129131
// Should have: _block_number_, _block_timestamp_, item_id, order_id, product_name
130132
expectedColumns := []string{
131133
sql.DialectFieldBlockNumber,
@@ -134,7 +136,7 @@ func TestParentChildRelationships(t *testing.T) {
134136
"order_id", // Parent reference
135137
"product_name", // Regular column
136138
}
137-
139+
138140
assert.Equal(t, expectedColumns, columns, "Child table should include parent reference column")
139141

140142
// Test 2: Parent table primary key extraction
@@ -151,7 +153,7 @@ func TestParentChildRelationships(t *testing.T) {
151153

152154
// Test walking with parent context
153155
blockTime := time.Now()
154-
156+
155157
// Walk parent message
156158
parentRows, err := gen.walkMessageAndCollectRows(dm, 100, blockTime, nil)
157159
require.NoError(t, err)
@@ -243,7 +245,7 @@ func TestBinaryDataFormattingAllDialects(t *testing.T) {
243245
// TestPrimaryKeyValueExtraction validates that primary key value is extracted correctly
244246
func TestPrimaryKeyValueExtraction(t *testing.T) {
245247
// This tests the fix for the parent ID extraction bug
246-
248+
247249
// Create a message with fields
248250
msgProto := &descriptor.DescriptorProto{
249251
Name: proto.String("TestMessage"),
@@ -321,11 +323,11 @@ func TestPrimaryKeyValueExtraction(t *testing.T) {
321323
// The primary key value should be at primaryKeyOffset position
322324
// With just block_number and block_timestamp, primaryKeyOffset = 2
323325
// So fieldValues[2] should be "PK-VALUE-123"
324-
326+
325327
blockTime := time.Now()
326328
_, err = gen.walkMessageAndCollectRows(dm, 100, blockTime, nil)
327329
require.NoError(t, err)
328-
330+
329331
// Verify the primary key was extracted correctly
330332
// Note: Since we're using useProtoOptions: false, it will create default TableInfo
331333
t.Log("Primary key extraction index validated")
@@ -355,7 +357,7 @@ func TestCSVColumnOrdering(t *testing.T) {
355357
{Name: "name", FieldDescriptor: createSimpleFieldDescriptor("name", descriptor.FieldDescriptorProto_TYPE_STRING)},
356358
},
357359
PrimaryKey: &schema.PrimaryKey{
358-
Name: "id",
360+
Name: "id",
359361
FieldDescriptor: createSimpleFieldDescriptor("id", descriptor.FieldDescriptorProto_TYPE_STRING),
360362
},
361363
},
@@ -375,7 +377,7 @@ func TestCSVColumnOrdering(t *testing.T) {
375377
{Name: "data", FieldDescriptor: createSimpleFieldDescriptor("data", descriptor.FieldDescriptorProto_TYPE_STRING)},
376378
},
377379
PrimaryKey: &schema.PrimaryKey{
378-
Name: "child_id",
380+
Name: "child_id",
379381
FieldDescriptor: createSimpleFieldDescriptor("child_id", descriptor.FieldDescriptorProto_TYPE_STRING),
380382
},
381383
ChildOf: &schema.ChildOf{
@@ -399,7 +401,7 @@ func TestCSVColumnOrdering(t *testing.T) {
399401
t.Run(tc.name, func(t *testing.T) {
400402
// Clear registry for each test
401403
testSchema.TableRegistry = make(map[string]*schema.Table)
402-
404+
403405
// Add parent table if this is a child table
404406
if tc.table.ChildOf != nil {
405407
parentFd := createSimpleFieldDescriptor("parent_id", descriptor.FieldDescriptorProto_TYPE_STRING)
@@ -415,9 +417,9 @@ func TestCSVColumnOrdering(t *testing.T) {
415417
}
416418
testSchema.TableRegistry[tc.table.ChildOf.ParentTable] = parentTable
417419
}
418-
420+
419421
testSchema.TableRegistry[tc.table.Name] = tc.table
420-
422+
421423
dialect, err := postgres.NewDialectPostgres(testSchema, zap.NewNop())
422424
require.NoError(t, err)
423425

@@ -471,7 +473,7 @@ func TestRepeatedFieldsError(t *testing.T) {
471473
Name: "test",
472474
TableRegistry: make(map[string]*schema.Table),
473475
}
474-
476+
475477
dialect, err := postgres.NewDialectPostgres(testSchema, zap.NewNop())
476478
require.NoError(t, err)
477479

@@ -524,9 +526,9 @@ func TestCSVRowFormatting(t *testing.T) {
524526
row := map[string]interface{}{
525527
sql.DialectFieldBlockNumber: uint64(12345),
526528
sql.DialectFieldBlockTimestamp: blockTime,
527-
"text_field": "Hello, \"World\"",
528-
"int_field": int64(42),
529-
"bool_field": true,
529+
"text_field": "Hello, \"World\"",
530+
"int_field": int64(42),
531+
"bool_field": true,
530532
}
531533

532534
csvData := gen.formatRowForCSV(row, testTable)
@@ -537,6 +539,53 @@ func TestCSVRowFormatting(t *testing.T) {
537539
assert.Equal(t, expected, csvString, "CSV row should be properly formatted")
538540
}
539541

542+
func TestCSVRowFormattingRisingWaveTimestamp(t *testing.T) {
543+
testSchema := &schema.Schema{
544+
Name: "test",
545+
TableRegistry: make(map[string]*schema.Table),
546+
}
547+
548+
textFd := createSimpleFieldDescriptor("text_field", descriptor.FieldDescriptorProto_TYPE_STRING)
549+
intFd := createSimpleFieldDescriptor("int_field", descriptor.FieldDescriptorProto_TYPE_INT64)
550+
boolFd := createSimpleFieldDescriptor("bool_field", descriptor.FieldDescriptorProto_TYPE_BOOL)
551+
552+
testTable := &schema.Table{
553+
Name: "test_table",
554+
Columns: []*schema.Column{
555+
{Name: "text_field", FieldDescriptor: textFd},
556+
{Name: "int_field", FieldDescriptor: intFd},
557+
{Name: "bool_field", FieldDescriptor: boolFd},
558+
},
559+
}
560+
561+
testSchema.TableRegistry["test_table"] = testTable
562+
563+
dialect, err := risingwave.NewDialectRisingwave(testSchema.Name, testSchema.TableRegistry, zap.NewNop())
564+
require.NoError(t, err)
565+
566+
gen := &protoAwareCSVGenerator{
567+
schema: testSchema,
568+
dialect: dialect,
569+
logger: zap.NewNop(),
570+
}
571+
572+
blockTime := time.Date(2024, time.January, 15, 10, 30, 0, 987000000, time.FixedZone("UTC-5", -5*3600))
573+
row := map[string]interface{}{
574+
sql.DialectFieldBlockNumber: uint64(12345),
575+
sql.DialectFieldBlockTimestamp: blockTime,
576+
"text_field": "Hello, \"World\"",
577+
"int_field": int64(42),
578+
"bool_field": true,
579+
}
580+
581+
csvData := gen.formatRowForCSV(row, testTable)
582+
csvString := string(csvData)
583+
584+
expectedTimestamp := timefmt.FormatRisingWave(blockTime)
585+
expected := fmt.Sprintf(`12345,%s,"Hello, ""World""",42,true`, expectedTimestamp) + "\n"
586+
assert.Equal(t, expected, csvString, "RisingWave CSV should use canonical timestamp layout")
587+
}
588+
540589
// TestNullHandling validates NULL value handling in CSV
541590
func TestNullHandling(t *testing.T) {
542591
gen := &protoAwareCSVGenerator{
@@ -565,11 +614,11 @@ func TestCompleteIntegration(t *testing.T) {
565614
Name: proto.String("DatabaseChanges"),
566615
Field: []*descriptor.FieldDescriptorProto{
567616
{
568-
Name: proto.String("orders"),
569-
Number: proto.Int32(1),
570-
Type: descriptor.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
617+
Name: proto.String("orders"),
618+
Number: proto.Int32(1),
619+
Type: descriptor.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
571620
TypeName: proto.String(".Orders"),
572-
Label: descriptor.FieldDescriptorProto_LABEL_REPEATED.Enum(),
621+
Label: descriptor.FieldDescriptorProto_LABEL_REPEATED.Enum(),
573622
},
574623
},
575624
}
@@ -583,11 +632,11 @@ func TestCompleteIntegration(t *testing.T) {
583632
Type: descriptor.FieldDescriptorProto_TYPE_STRING.Enum(),
584633
},
585634
{
586-
Name: proto.String("items"),
587-
Number: proto.Int32(2),
588-
Type: descriptor.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
635+
Name: proto.String("items"),
636+
Number: proto.Int32(2),
637+
Type: descriptor.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
589638
TypeName: proto.String(".OrderItems"),
590-
Label: descriptor.FieldDescriptorProto_LABEL_REPEATED.Enum(),
639+
Label: descriptor.FieldDescriptorProto_LABEL_REPEATED.Enum(),
591640
},
592641
},
593642
}
@@ -699,8 +748,8 @@ func createSimpleFieldDescriptor(name string, fieldType descriptor.FieldDescript
699748
Field: []*descriptor.FieldDescriptorProto{fdp},
700749
}
701750

702-
testFDSeq++
703-
fileName := fmt.Sprintf("test_%s_%d.proto", name, testFDSeq)
751+
testFDSeq++
752+
fileName := fmt.Sprintf("test_%s_%d.proto", name, testFDSeq)
704753
fdProto := &descriptor.FileDescriptorProto{
705754
Name: &fileName,
706755
MessageType: []*descriptor.DescriptorProto{mdp},

db_changes/db/dialect_risingwave.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/streamingfast/cli"
1616
sink "github.com/streamingfast/substreams-sink"
17+
"github.com/streamingfast/substreams-sink-sql/internal/timefmt"
1718
"go.uber.org/zap"
1819
"golang.org/x/exp/maps"
1920
)
@@ -237,6 +238,12 @@ func (d RisingwaveDialect) GetAllCursorsQuery(table string) string {
237238
}
238239

239240
func (d RisingwaveDialect) ParseDatetimeNormalization(value string) string {
241+
if parsed, err := time.Parse(time.RFC3339Nano, value); err == nil {
242+
return escapeStringValue(timefmt.FormatRisingWave(parsed))
243+
}
244+
if parsed, err := time.Parse(time.RFC3339, value); err == nil {
245+
return escapeStringValue(timefmt.FormatRisingWave(parsed))
246+
}
240247
return escapeStringValue(value)
241248
}
242249

@@ -501,7 +508,7 @@ func (d *RisingwaveDialect) normalizeValueType(value string, valueType reflect.T
501508
return "", fmt.Errorf("could not convert %s to int: %w", value, err)
502509
}
503510

504-
return escapeStringValue(time.Unix(int64(i), 0).Format(time.RFC3339)), nil
511+
return escapeStringValue(timefmt.FormatRisingWave(time.Unix(int64(i), 0))), nil
505512
}
506513

507514
// It's a plain string, parse by dialect it and pass it to the databaseName

db_proto/sql/postgres/row_inserter.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,16 @@ func (i *RowInserter) insert(table string, values []any, database *Database) err
138138
values[i] = strconv.FormatUint(v, 10)
139139
case []uint8:
140140
values[i] = base64.StdEncoding.EncodeToString(v)
141+
case time.Time:
142+
values[i] = v.UTC()
143+
case *time.Time:
144+
if v == nil {
145+
values[i] = nil
146+
continue
147+
}
148+
values[i] = v.UTC()
141149
case *timestamppb.Timestamp:
142-
values[i] = "'" + v.AsTime().Format(time.RFC3339) + "'"
150+
values[i] = v.AsTime().UTC()
143151
case []interface{}:
144152
// Handle arrays by converting to PostgreSQL array format
145153
var elements []string

db_proto/sql/risingwave/row_inserter.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ func (i *RowInserter) init(database *Database) error {
5555
}
5656
insertStatements["_blocks_"] = bs
5757

58-
// Use plain INSERT; RisingWave static DDL does not use ON CONFLICT clauses.
59-
insertQueries["_cursor_"] = fmt.Sprintf("INSERT INTO %s (name, cursor) VALUES ($1, $2)", tableName(database.schema.Name, "_cursor_"))
58+
// Use plain INSERT; RisingWave static DDL does not use ON CONFLICT clauses.
59+
insertQueries["_cursor_"] = fmt.Sprintf("INSERT INTO %s (name, cursor) VALUES ($1, $2)", tableName(database.schema.Name, "_cursor_"))
6060
cs, err := database.db.Prepare(insertQueries["_cursor_"])
6161
if err != nil {
6262
return fmt.Errorf("preparing statement %q: %w", insertQueries["_cursor_"], err)
@@ -142,8 +142,16 @@ func (i *RowInserter) insert(table string, values []any, database *Database) err
142142
values[i] = strconv.FormatUint(v, 10)
143143
case []uint8:
144144
values[i] = base64.StdEncoding.EncodeToString(v)
145+
case time.Time:
146+
values[i] = v.UTC()
147+
case *time.Time:
148+
if v == nil {
149+
values[i] = nil
150+
continue
151+
}
152+
values[i] = v.UTC()
145153
case *timestamppb.Timestamp:
146-
values[i] = "'" + v.AsTime().Format(time.RFC3339) + "'"
154+
values[i] = v.AsTime().UTC()
147155
case []interface{}:
148156
// Convert to PostgreSQL/RisingWave array literal: {elem1,elem2,...}
149157
var elements []string

0 commit comments

Comments
 (0)