Skip to content

Commit 95fde1e

Browse files
authored
Fix: add pseudo columns for ingest time partitioned tables (#1033)
* fix: add pseudo columns for ingest time partitioned tables * feat: explicit pseudocolumn flag in method interface * style: run make style * chore: make pseudocolumn inclusion opt-in
1 parent 2c9a5ed commit 95fde1e

4 files changed

Lines changed: 21 additions & 5 deletions

File tree

sqlmesh/core/engine_adapter/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,9 @@ def drop_view(self, view_name: TableName, ignore_if_not_exists: bool = True) ->
462462
exp.Drop(this=exp.to_table(view_name), exists=ignore_if_not_exists, kind="VIEW")
463463
)
464464

465-
def columns(self, table_name: TableName) -> t.Dict[str, exp.DataType]:
465+
def columns(
466+
self, table_name: TableName, include_pseudo_columns: bool = False
467+
) -> t.Dict[str, exp.DataType]:
466468
"""Fetches column names and types for the target table."""
467469
self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
468470
describe_output = self.cursor.fetchall()

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ class BasePostgresEngineAdapter(EngineAdapter):
2121
COLUMNS_TABLE = "information_schema.columns"
2222
SUPPORTS_MATERIALIZED_VIEWS = True
2323

24-
def columns(self, table_name: TableName) -> t.Dict[str, exp.DataType]:
24+
def columns(
25+
self, table_name: TableName, include_pseudo_columns: bool = False
26+
) -> t.Dict[str, exp.DataType]:
2527
"""Fetches column names and types for the target table."""
2628
table = exp.to_table(table_name)
2729
sql = (

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,22 @@ def create_schema(self, schema_name: str, ignore_if_exists: bool = True) -> None
9999
return
100100
raise e
101101

102-
def columns(self, table_name: TableName) -> t.Dict[str, exp.DataType]:
102+
def columns(
103+
self, table_name: TableName, include_pseudo_columns: bool = False
104+
) -> t.Dict[str, exp.DataType]:
103105
"""Fetches column names and types for the target table."""
106+
from google.cloud.bigquery import TimePartitioningType
107+
104108
table = self._get_table(table_name)
105-
return {
109+
columns = {
106110
field.name: exp.DataType.build(field.field_type, dialect=self.dialect)
107111
for field in table.schema
108112
}
113+
if include_pseudo_columns and table.time_partitioning and not table.time_partitioning.field:
114+
columns["_PARTITIONTIME"] = exp.DataType.build("TIMESTAMP")
115+
if table.time_partitioning.type_ == TimePartitioningType.DAY:
116+
columns["_PARTITIONDATE"] = exp.DataType.build("DATE")
117+
return columns
109118

110119
def fetchone(
111120
self,

sqlmesh/core/schema_loader.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ def create_schema_file(
5454
"columns": {c: t.sql(dialect=dialect) for c, t in columns.items()},
5555
}
5656
for table, columns in sorted(
57-
pool.map(lambda table: (table, adapter.columns(table)), external_tables)
57+
pool.map(
58+
lambda table: (table, adapter.columns(table, include_pseudo_columns=True)),
59+
external_tables,
60+
)
5861
)
5962
]
6063

0 commit comments

Comments
 (0)