Skip to content

Commit d1e06ca

Browse files
twmbclaude
andcommitted
iceberg: add require_schema_metadata strict-temporal-mode option
Addresses review comment #9 on PR #4402: a downstream mapping that drops schema_metadata between the schema-registry decoder and the iceberg sink would silently reintroduce the year-50000 corruption that the rest of the PR closes — the type-resolver picks TIMESTAMPTZ for the column based on metadata seen at table-creation time, but per-message metadata is what the shredder needs to interpret each numeric value's unit. When schema_evolution.require_schema_metadata is true (default false), the shredder rejects numeric inputs into time-typed columns when no schema.Common has been registered for that field. time.Time / time.Duration native inputs are unaffected — they carry their own unit unambiguously. Non-time columns are unaffected. The flag is gated to require schema_metadata also be set; setting strict mode without configuring metadata at all is a config error caught at startup. Plumbing: - config.go: new field with operator-facing description. - output_iceberg.go: parse and validate the require/has-metadata pair. - router.go: add to SchemaEvolutionConfig and pass to writer. - writer.go: extend NewWriter signature; flip shredder strict mode when the flag is set. - shredder/shredder.go: new SetStrictTemporalMode(bool) and a strictTemporal field; thread through convertLeafValue. - shredder/temporal.go: convertDate / convertTime / convertTimestamp return field-naming errors instead of falling through when strict mode is on and metadata is absent. Tests cover (a) numeric without metadata under strict mode is rejected with a require_schema_metadata=true error message, (b) native time.Time and time.Duration inputs are unaffected by strict mode, (c) numeric with metadata under strict mode succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 27486ba commit d1e06ca

10 files changed

Lines changed: 403 additions & 68 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ All notable changes to this project will be documented in this file.
1616
- **Pipelines whose own code reads the `schema_metadata` bytes via `meta()`** and pattern-matches the historical INT64 shape: schemas now contain `TIMESTAMP` / `DATE` / `TIME_OF_DAY` / `UUID` along with new `unit` and `adjust_to_utc` fields. Update the pattern.
1717
- **iceberg shredder** is now schema-aware for numeric inputs: a numeric millisecond value declared by the schema as `timestamp-millis` is correctly interpreted as milliseconds rather than as Unix seconds. This closes a previously-silent corruption case where an int64 millis input into a TIMESTAMPTZ column would land ~50,000 years in the future.
1818

19+
### Added
20+
21+
- iceberg: `schema_evolution.require_schema_metadata` (default `false`). When enabled along with `schema_evolution.schema_metadata`, numeric values shredded into a `timestamp`, `timestamptz`, `date`, or `time` column without registered schema metadata are rejected loudly instead of falling through to the bloblang Unix-seconds default. Use this when you cannot guarantee the upstream attaches schema metadata and prefer a hard error to silently corrupting dates by ~50,000 years. No effect on time-typed columns receiving native `time.Time` / `time.Duration` Go values.
22+
1923
### Changed
2024

2125
- iceberg: `NewWriter` now takes a `*typeResolver` argument so the writer can use schema metadata to interpret numeric inputs into time-typed columns at shredding time. Internal API change only.

internal/impl/iceberg/config.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,13 @@ const (
6969
ioFieldAzureAccessKey = "storage_access_key"
7070

7171
// Schema evolution fields
72-
ioFieldSchemaEvolution = "schema_evolution"
73-
ioFieldSchemaEvolutionEnabled = "enabled"
74-
ioFieldSchemaEvolutionPartitionSpec = "partition_spec"
75-
ioFieldSchemaEvolutionTableLoc = "table_location"
76-
ioFieldSchemaEvolutionSchemaMetadata = "schema_metadata"
77-
ioFieldSchemaEvolutionNewColumnTypeMapping = "new_column_type_mapping"
72+
ioFieldSchemaEvolution = "schema_evolution"
73+
ioFieldSchemaEvolutionEnabled = "enabled"
74+
ioFieldSchemaEvolutionPartitionSpec = "partition_spec"
75+
ioFieldSchemaEvolutionTableLoc = "table_location"
76+
ioFieldSchemaEvolutionSchemaMetadata = "schema_metadata"
77+
ioFieldSchemaEvolutionNewColumnTypeMapping = "new_column_type_mapping"
78+
ioFieldSchemaEvolutionRequireSchemaMetadata = "require_schema_metadata"
7879

7980
// Commit fields
8081
ioFieldCommit = "commit"
@@ -332,6 +333,10 @@ array:list
332333
Description("An optional Bloblang mapping to customize column types during schema evolution. This mapping is executed for each new column and can override the inferred or schema-metadata-derived type. The mapping receives an object with fields `name` (column name), `path` (dot-separated path), `value` (sample value), `inferred_type` (the type that would be used without this mapping), `message` (the full message body), `namespace`, and `table`. It must return a string with a valid Iceberg type name: `boolean`, `int`, `long`, `float`, `double`, `string`, `binary`, `date`, `time`, `timestamp`, `timestamptz`, `uuid`, `decimal(p,s)`, or `fixed[n]`.").
333334
Optional().
334335
Advanced(),
336+
service.NewBoolField(ioFieldSchemaEvolutionRequireSchemaMetadata).
337+
Description("When `true`, writing a numeric value into a `timestamp`, `timestamptz`, `date`, or `time` column without `schema_metadata` registered for that column is a hard error. The default `false` permits a fallback path that interprets bare numeric timestamps as Unix seconds and bare numeric times as already-microseconds — convenient, but silently wrong if upstream produced milliseconds. Enable this when you cannot guarantee the upstream attaches schema metadata and want to fail loudly rather than corrupt dates by ~50,000 years. No effect on time-typed columns receiving `time.Time`/`time.Duration` Go values, which carry their own unit unambiguously, and no effect on non-time columns. Requires `schema_metadata` to be set.").
338+
Default(false).
339+
Advanced(),
335340
).Description("Schema evolution configuration.").
336341
Optional().
337342
Advanced(),

internal/impl/iceberg/output_iceberg.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,17 @@ func parseSchemaEvolutionConfig(conf *service.ParsedConfig) (SchemaEvolutionConf
484484
}
485485
}
486486

487+
// Parse require_schema_metadata
488+
if conf.Contains(ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata) {
489+
cfg.RequireSchemaMetadata, err = conf.FieldBool(ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata)
490+
if err != nil {
491+
return cfg, err
492+
}
493+
if cfg.RequireSchemaMetadata && cfg.SchemaMetadata == "" {
494+
return cfg, fmt.Errorf("%s.%s requires %s.%s to be set", ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata, ioFieldSchemaEvolution, ioFieldSchemaEvolutionSchemaMetadata)
495+
}
496+
}
497+
487498
return cfg, nil
488499
}
489500

internal/impl/iceberg/router.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ type SchemaEvolutionConfig struct {
5252
// NewColumnTypeMapping is an optional Bloblang mapping that can override inferred
5353
// or schema-metadata-derived column types during schema evolution.
5454
NewColumnTypeMapping *bloblang.Executor
55+
// RequireSchemaMetadata enables strict mode: when true, writing a numeric
56+
// value into a time-typed column without registered schema metadata is a
57+
// hard error rather than a silent fallback to bloblang's seconds-default.
58+
// Only meaningful when SchemaMetadata is also set.
59+
RequireSchemaMetadata bool
5560
}
5661

5762
const maxSchemaEvolutionRetries = 10
@@ -666,7 +671,7 @@ func (r *Router) createWriter(ctx context.Context, key tableKey) (*writer, error
666671
// Create writer with its own table reference and the committer.
667672
// The resolver is passed so the writer can use schema metadata to
668673
// interpret numeric inputs into time-typed columns at shredding time.
669-
w := NewWriter(writerTbl, comm, r.caseSensitive, r.writerOpts, r.resolver, r.logger)
674+
w := NewWriter(writerTbl, comm, r.caseSensitive, r.writerOpts, r.resolver, r.schemaEvoCfg.RequireSchemaMetadata, r.logger)
670675
r.logger.Debugf("Created writer for table %s.%s", key.namespace, key.table)
671676

672677
return w, nil

internal/impl/iceberg/shredder/shredder.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ type RecordShredder struct {
8787
// typed columns instead of guessing. nil entries fall back to the
8888
// pre-schema-metadata behavior — see [convertLeafValue].
8989
fieldCommons map[int]*schema.Common
90+
// strictTemporal causes [convertLeafValue] to refuse numeric inputs
91+
// into time-typed columns when no schema metadata has been
92+
// registered for that column. When false (the default), the value
93+
// converter falls back to [bloblang.ValueAsTimestamp]'s seconds
94+
// default — convenient but silently wrong if the upstream produced
95+
// a different unit. When true, the writer fails the batch loudly.
96+
strictTemporal bool
9097
}
9198

9299
// NewRecordShredder creates a new shredder for the given schema.
@@ -100,6 +107,22 @@ func NewRecordShredder(schema *iceberg.Schema, caseSensitive bool) *RecordShredd
100107
}
101108
}
102109

110+
// SetStrictTemporalMode toggles whether numeric inputs into time-typed
111+
// columns require registered schema metadata. With strict mode on, a bare
112+
// int64 / float64 value reaching a TIMESTAMP / TIMESTAMPTZ / DATE / TIME
113+
// column with no [schema.Common] in the field map is rejected with a
114+
// per-field error rather than guessed-as-Unix-seconds.
115+
//
116+
// Strict mode has no effect on time.Time / time.Duration values, which
117+
// carry their own unit unambiguously, and no effect on non-time columns.
118+
//
119+
// Defaults to off (back-compat). Operators that cannot guarantee schema
120+
// metadata flows end-to-end can flip this on to fail loudly instead of
121+
// silently corrupting dates by ~50,000 years.
122+
func (rs *RecordShredder) SetStrictTemporalMode(on bool) {
123+
rs.strictTemporal = on
124+
}
125+
103126
// SetFieldSchemaMetadata supplies a field-ID → schema.Common map that the
104127
// leaf value converter consults when it sees a numeric input destined for a
105128
// time-typed Iceberg column (TIMESTAMP, TIMESTAMPTZ, TIMESTAMP_NS,
@@ -241,7 +264,7 @@ func (rs *RecordShredder) shredValue(
241264

242265
default:
243266
// Leaf/primitive type.
244-
pqVal, err := convertLeafValue(value, typ, rs.commonForField(fieldID))
267+
pqVal, err := convertLeafValue(value, typ, rs.commonForField(fieldID), rs.strictTemporal)
245268
if err != nil {
246269
return err
247270
}
@@ -355,7 +378,7 @@ func (rs *RecordShredder) shredMap(
355378

356379
// Shred the key. Map keys are always leaf primitives, never time
357380
// types, so the schema-metadata lookup never fires for them.
358-
keyVal, err := convertLeafValue(k, mapType.KeyType, nil)
381+
keyVal, err := convertLeafValue(k, mapType.KeyType, nil, false)
359382
if err != nil {
360383
return fmt.Errorf("map key: %w", err)
361384
}
@@ -430,7 +453,7 @@ func (rs *RecordShredder) shredNull(
430453
// time.Duration directly and treats bare numerics as already-in-the-target
431454
// unit (microseconds for time, seconds for timestamp via bloblang
432455
// ValueAsTimestamp's default — preserves the pre-PR behavior).
433-
func convertLeafValue(value any, typ iceberg.Type, common *schema.Common) (parquet.Value, error) {
456+
func convertLeafValue(value any, typ iceberg.Type, common *schema.Common, strictTemporal bool) (parquet.Value, error) {
434457
if value == nil {
435458
return parquet.NullValue(), nil
436459
}
@@ -472,25 +495,25 @@ func convertLeafValue(value any, typ iceberg.Type, common *schema.Common) (parqu
472495
return parquet.ByteArrayValue(v), err
473496

474497
case iceberg.DateType:
475-
return convertDate(value)
498+
return convertDate(value, common, strictTemporal)
476499

477500
case iceberg.TimeType:
478501
// Iceberg TIME is microseconds since midnight. Accept time.Duration
479502
// directly (the twmb/avro decode of time-millis/time-micros), and
480503
// fall back to numeric input interpreted via the schema-declared
481504
// unit when available.
482-
return convertTime(value, common)
505+
return convertTime(value, common, strictTemporal)
483506

484507
case iceberg.TimestampType, iceberg.TimestampTzType:
485508
// Iceberg TIMESTAMP / TIMESTAMPTZ are microseconds since epoch.
486509
// time.Time inputs are unambiguous and used directly. For numeric
487510
// inputs, prefer the schema's declared unit so that millis stays
488511
// millis (instead of being interpreted as seconds and landing in
489512
// year 56755).
490-
return convertTimestamp(value, common, false)
513+
return convertTimestamp(value, common, false, strictTemporal)
491514

492515
case iceberg.TimestampNsType, iceberg.TimestampTzNsType:
493-
return convertTimestamp(value, common, true)
516+
return convertTimestamp(value, common, true, strictTemporal)
494517

495518
case iceberg.UUIDType:
496519
switch v := value.(type) {

internal/impl/iceberg/shredder/shredder_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ func TestConvertLeafValueDecimal(t *testing.T) {
10551055

10561056
for _, tt := range tests {
10571057
t.Run(tt.name, func(t *testing.T) {
1058-
result, err := convertLeafValue(tt.value, dt, nil)
1058+
result, err := convertLeafValue(tt.value, dt, nil, false)
10591059
if tt.wantErr {
10601060
require.Error(t, err)
10611061
return
@@ -1069,7 +1069,7 @@ func TestConvertLeafValueDecimal(t *testing.T) {
10691069
func TestConvertLeafValueDecimalPrecision(t *testing.T) {
10701070
dt := iceberg.DecimalTypeOf(10, 2)
10711071

1072-
result, err := convertLeafValue(float64(123.45), dt, nil)
1072+
result, err := convertLeafValue(float64(123.45), dt, nil, false)
10731073
require.NoError(t, err)
10741074

10751075
b := result.ByteArray()
@@ -1085,7 +1085,7 @@ func TestConvertLeafValueDecimalPrecision(t *testing.T) {
10851085
func TestConvertLeafValueDecimalNegative(t *testing.T) {
10861086
dt := iceberg.DecimalTypeOf(10, 2)
10871087

1088-
result, err := convertLeafValue(float64(-123.45), dt, nil)
1088+
result, err := convertLeafValue(float64(-123.45), dt, nil, false)
10891089
require.NoError(t, err)
10901090

10911091
b := result.ByteArray()
@@ -1131,7 +1131,7 @@ func TestConvertLeafValueDecimalExactValues(t *testing.T) {
11311131

11321132
for _, tt := range tests {
11331133
t.Run(tt.name, func(t *testing.T) {
1134-
result, err := convertLeafValue(tt.value, dt, nil)
1134+
result, err := convertLeafValue(tt.value, dt, nil, false)
11351135
require.NoError(t, err)
11361136
assert.Equal(t, tt.wantUnscaled, decodeUnscaled(t, result))
11371137
})
@@ -1143,57 +1143,57 @@ func TestConvertLeafValueDecimalOverflow(t *testing.T) {
11431143
dt := iceberg.DecimalTypeOf(5, 2)
11441144

11451145
// 999.99 should succeed
1146-
_, err := convertLeafValue(float64(999.99), dt, nil)
1146+
_, err := convertLeafValue(float64(999.99), dt, nil, false)
11471147
require.NoError(t, err)
11481148

11491149
// 1000.00 exceeds precision — unscaled 100000 >= 10^5
1150-
_, err = convertLeafValue(float64(1000.00), dt, nil)
1150+
_, err = convertLeafValue(float64(1000.00), dt, nil, false)
11511151
require.Error(t, err)
11521152
assert.Contains(t, err.Error(), "exceeds decimal(5, 2) precision")
11531153

11541154
// Large negative should also fail
1155-
_, err = convertLeafValue(float64(-1000.00), dt, nil)
1155+
_, err = convertLeafValue(float64(-1000.00), dt, nil, false)
11561156
require.Error(t, err)
11571157
assert.Contains(t, err.Error(), "exceeds decimal(5, 2) precision")
11581158
}
11591159

11601160
func TestConvertLeafValueDecimalStringError(t *testing.T) {
11611161
dt := iceberg.DecimalTypeOf(10, 2)
11621162

1163-
_, err := convertLeafValue("not_a_number", dt, nil)
1163+
_, err := convertLeafValue("not_a_number", dt, nil, false)
11641164
require.Error(t, err)
11651165
assert.Contains(t, err.Error(), "cannot parse")
11661166
}
11671167

11681168
func TestConvertLeafValueDecimalNaNInf(t *testing.T) {
11691169
dt := iceberg.DecimalTypeOf(10, 2)
11701170

1171-
_, err := convertLeafValue(math.NaN(), dt, nil)
1171+
_, err := convertLeafValue(math.NaN(), dt, nil, false)
11721172
require.Error(t, err)
11731173
assert.Contains(t, err.Error(), "cannot convert")
11741174

1175-
_, err = convertLeafValue(math.Inf(1), dt, nil)
1175+
_, err = convertLeafValue(math.Inf(1), dt, nil, false)
11761176
require.Error(t, err)
11771177
assert.Contains(t, err.Error(), "cannot convert")
11781178

1179-
_, err = convertLeafValue(math.Inf(-1), dt, nil)
1179+
_, err = convertLeafValue(math.Inf(-1), dt, nil, false)
11801180
require.Error(t, err)
11811181
assert.Contains(t, err.Error(), "cannot convert")
11821182

1183-
_, err = convertLeafValue(float32(math.NaN()), dt, nil)
1183+
_, err = convertLeafValue(float32(math.NaN()), dt, nil, false)
11841184
require.Error(t, err)
11851185

1186-
_, err = convertLeafValue(float32(math.Inf(1)), dt, nil)
1186+
_, err = convertLeafValue(float32(math.Inf(1)), dt, nil, false)
11871187
require.Error(t, err)
11881188
}
11891189

11901190
func TestConvertLeafValueUint64Overflow(t *testing.T) {
1191-
_, err := convertLeafValue(uint64(math.MaxInt64+1), iceberg.Int64Type{}, nil)
1191+
_, err := convertLeafValue(uint64(math.MaxInt64+1), iceberg.Int64Type{}, nil, false)
11921192
require.Error(t, err)
11931193
assert.Contains(t, err.Error(), "exceeds int64 range")
11941194

11951195
// Value within range should succeed
1196-
_, err = convertLeafValue(uint64(math.MaxInt64), iceberg.Int64Type{}, nil)
1196+
_, err = convertLeafValue(uint64(math.MaxInt64), iceberg.Int64Type{}, nil, false)
11971197
require.NoError(t, err)
11981198
}
11991199

0 commit comments

Comments
 (0)