From 96778d7e5fcb7ec83d1159c1bc498ad2f25f4888 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sabri=20Karag=C3=B6nen?= Date: Wed, 11 Jun 2025 02:49:16 +0200 Subject: [PATCH] Add partition filter option to DDL --- pkg/bigquery/materialization.go | 17 ++++++++++- pkg/bigquery/materialization_test.go | 30 +++++++++++++++++++ pkg/pipeline/pipeline.go | 13 ++++++++ .../pipeline/first-pipeline_unix.json | 12 +++++--- .../pipeline/first-pipeline_windows.json | 12 +++++--- .../pipeline/second-pipeline_unix.json | 9 ++++-- .../pipeline/second-pipeline_windows.json | 3 ++ pkg/pipeline/yaml.go | 6 ++++ 8 files changed, 90 insertions(+), 12 deletions(-) diff --git a/pkg/bigquery/materialization.go b/pkg/bigquery/materialization.go index b13324be6..0751efb7a 100644 --- a/pkg/bigquery/materialization.go +++ b/pkg/bigquery/materialization.go @@ -145,7 +145,16 @@ func buildCreateReplaceQuery(asset *pipeline.Asset, query string) (string, error clusterByClause = "CLUSTER BY " + strings.Join(mat.ClusterBy, ", ") } - return fmt.Sprintf("CREATE OR REPLACE TABLE %s %s %s AS\n%s", asset.Name, partitionClause, clusterByClause, query), nil + optionsClause := "" + if asset.BigQuery.RequirePartitionFilter { + optionsClause = "OPTIONS (require_partition_filter = TRUE)" + } + + if optionsClause == "" { + return fmt.Sprintf("CREATE OR REPLACE TABLE %s %s %s AS\n%s", asset.Name, partitionClause, clusterByClause, query), nil + } + + return fmt.Sprintf("CREATE OR REPLACE TABLE %s %s %s %s AS\n%s", asset.Name, partitionClause, clusterByClause, optionsClause, query), nil } func buildTimeIntervalQuery(asset *pipeline.Asset, query string) (string, error) { @@ -216,5 +225,11 @@ func BuildDDLQuery(asset *pipeline.Asset, query string) (string, error) { q += "\nCLUSTER BY " + strings.Join(asset.Materialization.ClusterBy, ", ") } + // When enabled, propagate the require_partition_filter option to the + // DDL statement so BigQuery enforces filters on partitioned tables. 
+ if asset.BigQuery.RequirePartitionFilter { + q += "\nOPTIONS (\n require_partition_filter = TRUE\n)" + } + return q, nil } diff --git a/pkg/bigquery/materialization_test.go b/pkg/bigquery/materialization_test.go index a162417a9..30a622402 100644 --- a/pkg/bigquery/materialization_test.go +++ b/pkg/bigquery/materialization_test.go @@ -100,6 +100,20 @@ func TestMaterializer_Render(t *testing.T) { query: "SELECT 1", want: "CREATE OR REPLACE TABLE my.asset PARTITION BY dt CLUSTER BY event_type, event_name AS\nSELECT 1", }, + { + name: "materialize to a table with require partition filter", + task: &pipeline.Asset{ + Name: "my.asset", + Materialization: pipeline.Materialization{ + Type: pipeline.MaterializationTypeTable, + Strategy: pipeline.MaterializationStrategyCreateReplace, + PartitionBy: "dt", + }, + BigQuery: pipeline.BigQueryConfig{RequirePartitionFilter: true}, + }, + query: "SELECT 1", + want: "CREATE OR REPLACE TABLE my.asset PARTITION BY dt OPTIONS (require_partition_filter = TRUE) AS\nSELECT 1", + }, { name: "materialize to a table with append", task: &pipeline.Asset{ @@ -470,6 +484,22 @@ func TestBuildDDLQuery(t *testing.T) { }, want: "CREATE TABLE IF NOT EXISTS my_table_with_multiple_pks (\n id INT64,\n category STRING,\n name STRING OPTIONS(description=\"The name of the person\"),\n PRIMARY KEY (id, category) NOT ENFORCED\n)", }, + { + name: "table with partition filter option", + asset: &pipeline.Asset{ + Name: "my_partitioned_table_opts", + Columns: []pipeline.Column{ + {Name: "id", Type: "INT64"}, + {Name: "created_at", Type: "DATE"}, + }, + Materialization: pipeline.Materialization{ + Type: pipeline.MaterializationTypeTable, + PartitionBy: "created_at", + }, + BigQuery: pipeline.BigQueryConfig{RequirePartitionFilter: true}, + }, + want: "CREATE TABLE IF NOT EXISTS my_partitioned_table_opts (\n id INT64,\n created_at DATE\n)\nPARTITION BY created_at\nOPTIONS (\n require_partition_filter = TRUE\n)", + }, } for _, tt := range tests { diff --git 
a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 7d4781eed..51b3cfe9f 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -626,6 +626,18 @@ func (s AthenaConfig) MarshalJSON() ([]byte, error) { return json.Marshal(s) } +type BigQueryConfig struct { + RequirePartitionFilter bool `json:"require_partition_filter"` +} + +func (b BigQueryConfig) MarshalJSON() ([]byte, error) { + if !b.RequirePartitionFilter { + return []byte("null"), nil + } + type plain BigQueryConfig // method-less copy, so json.Marshal cannot re-enter MarshalJSON + return json.Marshal(plain(b)) +} + type Asset struct { //nolint:recvcheck ID string `json:"id" yaml:"-" mapstructure:"-"` URI string `json:"uri" yaml:"uri,omitempty" mapstructure:"uri"` @@ -649,6 +661,7 @@ type Asset struct { //nolint:recvcheck Metadata EmptyStringMap `json:"metadata" yaml:"metadata,omitempty" mapstructure:"metadata"` Snowflake SnowflakeConfig `json:"snowflake" yaml:"snowflake,omitempty" mapstructure:"snowflake"` Athena AthenaConfig `json:"athena" yaml:"athena,omitempty" mapstructure:"athena"` + BigQuery BigQueryConfig `json:"bigquery" yaml:"bigquery,omitempty" mapstructure:"bigquery"` IntervalModifiers IntervalModifiers `json:"interval_modifiers" yaml:"interval_modifiers,omitempty" mapstructure:"interval_modifiers"` upstream []*Asset diff --git a/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json b/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json index 36a34dc35..04dcab60f 100644 --- a/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json +++ b/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json @@ -56,7 +56,8 @@ "custom_checks": [], "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null }, { "id": "c69409a1840ddb3639a4acbaaec46c238c63b6431cc74ee5254b6dcef7b88c4b", @@ -93,7 +94,8 @@ "custom_checks": [], "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null }, { "id": "21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9", @@ -167,7 +169,8 @@ "custom_checks": [], "metadata": {}, "snowflake": 
null, - "athena": null + "athena": null, + "bigquery": null }, { "id": "5812ba61bb0f08ce192bf074c9de21c19355e08cd52e75d008bbff59e5729e5b", @@ -240,7 +243,8 @@ "custom_checks": [], "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null } ], "notifications": { diff --git a/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json b/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json index df252dd45..1839d40cd 100644 --- a/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json +++ b/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json @@ -59,7 +59,8 @@ "owner": "", "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null }, { "id": "c69409a1840ddb3639a4acbaaec46c238c63b6431cc74ee5254b6dcef7b88c4b", @@ -96,7 +97,8 @@ "owner": "", "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null }, { "id": "21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9", @@ -170,7 +172,8 @@ "owner": "", "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null }, { "id": "5812ba61bb0f08ce192bf074c9de21c19355e08cd52e75d008bbff59e5729e5b", @@ -243,7 +246,8 @@ "owner": "", "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null } ], "notifications": { diff --git a/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json b/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json index 5fc0d502a..7087bd62a 100644 --- a/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json +++ b/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json @@ -50,7 +50,8 @@ "custom_checks": [], "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null }, { "id": "a01e7580b118b5fbbdc1f7c8de6b8c377c684727e4e8ad574e9153a3dbd46dd1", @@ -87,7 +88,8 @@ "custom_checks": [], "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null }, { "id": 
"21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9", @@ -235,7 +237,8 @@ ], "metadata": {}, "snowflake": null, - "athena": null + "athena": null, + "bigquery": null } ], "notifications": { diff --git a/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json b/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json index 5368926a5..e9aff0540 100644 --- a/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json +++ b/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json @@ -44,6 +44,7 @@ "connection": "", "secrets": [], "athena": null, + "bigquery": null, "upstreams": [], "materialization": null, "interval_modifiers": null, @@ -82,6 +83,7 @@ "secrets": [], "upstreams": [], "athena": null, + "bigquery": null, "materialization": null, "interval_modifiers": null, "columns": [], @@ -130,6 +132,7 @@ } ], "athena": null, + "bigquery": null, "upstreams": [ { "type": "asset", diff --git a/pkg/pipeline/yaml.go b/pkg/pipeline/yaml.go index 08ded2f3b..cc0538fb4 100644 --- a/pkg/pipeline/yaml.go +++ b/pkg/pipeline/yaml.go @@ -298,6 +298,10 @@ type athena struct { QueryResultsPath string `yaml:"query_results_path"` } +type bigquery struct { + RequirePartitionFilter bool `yaml:"require_partition_filter"` +} + type taskDefinition struct { Name string `yaml:"name"` URI string `yaml:"uri"` @@ -319,6 +323,7 @@ type taskDefinition struct { Tags []string `yaml:"tags"` Snowflake snowflake `yaml:"snowflake"` Athena athena `yaml:"athena"` + BigQuery bigquery `yaml:"bigquery"` IntervalModifiers IntervalModifiers `yaml:"interval_modifiers"` } @@ -486,6 +491,7 @@ func ConvertYamlToTask(content []byte) (*Asset, error) { CustomChecks: make([]CustomCheck, len(definition.CustomChecks)), Snowflake: SnowflakeConfig{Warehouse: definition.Snowflake.Warehouse}, Athena: AthenaConfig{Location: definition.Athena.QueryResultsPath}, + BigQuery: BigQueryConfig{RequirePartitionFilter: definition.BigQuery.RequirePartitionFilter}, IntervalModifiers: definition.IntervalModifiers, 
}