|
67 | 67 | DEFAULT_SCHEMA_ID = 0 |
68 | 68 |
|
69 | 69 | SUPPORTED_TABLE_FORMAT_VERSION = 2 |
| 70 | +ONE_MINUTE_MS = 60_000 |
70 | 71 |
|
71 | 72 |
|
72 | 73 | def cleanup_snapshot_id(data: dict[str, Any]) -> dict[str, Any]: |
@@ -125,6 +126,29 @@ def construct_refs(table_metadata: TableMetadata) -> TableMetadata: |
125 | 126 | return table_metadata |
126 | 127 |
|
127 | 128 |
|
| 129 | +def check_snapshot_timestamps(table_metadata: TableMetadata) -> TableMetadata: |
| 130 | + """Validate snapshot and snapshot-log timestamps with small clock skew tolerance.""" |
| 131 | + last_snapshot_log_entry: SnapshotLogEntry | None = None |
| 132 | + for snapshot_log_entry in table_metadata.snapshot_log: |
| 133 | + if ( |
| 134 | + last_snapshot_log_entry is not None |
| 135 | + and snapshot_log_entry.timestamp_ms - last_snapshot_log_entry.timestamp_ms < -ONE_MINUTE_MS |
| 136 | + ): |
| 137 | + raise ValidationError("[BUG] Expected sorted snapshot log entries.") |
| 138 | + last_snapshot_log_entry = snapshot_log_entry |
| 139 | + |
| 140 | + if ( |
| 141 | + last_snapshot_log_entry is not None |
| 142 | + and table_metadata.last_updated_ms - last_snapshot_log_entry.timestamp_ms < -ONE_MINUTE_MS |
| 143 | + ): |
| 144 | + raise ValidationError( |
| 145 | + f"Invalid update timestamp {table_metadata.last_updated_ms}: " |
| 146 | + f"before last snapshot log entry at {last_snapshot_log_entry.timestamp_ms}" |
| 147 | + ) |
| 148 | + |
| 149 | + return table_metadata |
| 150 | + |
| 151 | + |
128 | 152 | class TableMetadataCommonFields(IcebergBaseModel): |
129 | 153 | """Metadata for an Iceberg table as specified in the Apache Iceberg spec. |
130 | 154 |
|
@@ -378,6 +402,10 @@ def cleanup_snapshot_id(cls, data: dict[str, Any]) -> dict[str, Any]: |
378 | 402 | def construct_refs(self) -> TableMetadataV1: |
379 | 403 | return construct_refs(self) |
380 | 404 |
|
| 405 | + @model_validator(mode="after") |
| 406 | + def check_snapshot_timestamps(self) -> TableMetadata: |
| 407 | + return check_snapshot_timestamps(self) |
| 408 | + |
381 | 409 | @model_validator(mode="before") |
382 | 410 | def set_v2_compatible_defaults(cls, data: dict[str, Any]) -> dict[str, Any]: |
383 | 411 | """Set default values to be compatible with the format v2. |
@@ -519,6 +547,10 @@ def check_sort_orders(self) -> TableMetadata: |
519 | 547 | def construct_refs(self) -> TableMetadata: |
520 | 548 | return construct_refs(self) |
521 | 549 |
|
| 550 | + @model_validator(mode="after") |
| 551 | + def check_snapshot_timestamps(self) -> TableMetadata: |
| 552 | + return check_snapshot_timestamps(self) |
| 553 | + |
522 | 554 | format_version: Literal[2] = Field(alias="format-version", default=2) |
523 | 555 | """An integer version number for the format. Implementations must throw |
524 | 556 | an exception if a table’s version is higher than the supported version.""" |
@@ -563,6 +595,10 @@ def check_sort_orders(self) -> TableMetadata: |
563 | 595 | def construct_refs(self) -> TableMetadata: |
564 | 596 | return construct_refs(self) |
565 | 597 |
|
| 598 | + @model_validator(mode="after") |
| 599 | + def check_snapshot_timestamps(self) -> TableMetadata: |
| 600 | + return check_snapshot_timestamps(self) |
| 601 | + |
566 | 602 | format_version: Literal[3] = Field(alias="format-version", default=3) |
567 | 603 | """An integer version number for the format. Implementations must throw |
568 | 604 | an exception if a table’s version is higher than the supported version.""" |
|
0 commit comments