Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion pkg/bigquery/materialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,16 @@ func buildCreateReplaceQuery(asset *pipeline.Asset, query string) (string, error
clusterByClause = "CLUSTER BY " + strings.Join(mat.ClusterBy, ", ")
}

return fmt.Sprintf("CREATE OR REPLACE TABLE %s %s %s AS\n%s", asset.Name, partitionClause, clusterByClause, query), nil
optionsClause := ""
if asset.BigQuery.RequirePartitionFilter {
optionsClause = "OPTIONS (require_partition_filter = TRUE)"
}

if optionsClause == "" {
return fmt.Sprintf("CREATE OR REPLACE TABLE %s %s %s AS\n%s", asset.Name, partitionClause, clusterByClause, query), nil
}

return fmt.Sprintf("CREATE OR REPLACE TABLE %s %s %s %s AS\n%s", asset.Name, partitionClause, clusterByClause, optionsClause, query), nil
}

func buildTimeIntervalQuery(asset *pipeline.Asset, query string) (string, error) {
Expand Down Expand Up @@ -216,5 +225,11 @@ func BuildDDLQuery(asset *pipeline.Asset, query string) (string, error) {
q += "\nCLUSTER BY " + strings.Join(asset.Materialization.ClusterBy, ", ")
}

// When enabled, propagate the require_partition_filter option to the
// DDL statement so BigQuery enforces filters on partitioned tables.
if asset.BigQuery.RequirePartitionFilter {
q += "\nOPTIONS (\n require_partition_filter = TRUE\n)"
}

return q, nil
}
30 changes: 30 additions & 0 deletions pkg/bigquery/materialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ func TestMaterializer_Render(t *testing.T) {
query: "SELECT 1",
want: "CREATE OR REPLACE TABLE my.asset PARTITION BY dt CLUSTER BY event_type, event_name AS\nSELECT 1",
},
{
name: "materialize to a table with require partition filter",
task: &pipeline.Asset{
Name: "my.asset",
Materialization: pipeline.Materialization{
Type: pipeline.MaterializationTypeTable,
Strategy: pipeline.MaterializationStrategyCreateReplace,
PartitionBy: "dt",
},
BigQuery: pipeline.BigQueryConfig{RequirePartitionFilter: true},
},
query: "SELECT 1",
want: "CREATE OR REPLACE TABLE my.asset PARTITION BY dt OPTIONS (require_partition_filter = TRUE) AS\nSELECT 1",
},
{
name: "materialize to a table with append",
task: &pipeline.Asset{
Expand Down Expand Up @@ -470,6 +484,22 @@ func TestBuildDDLQuery(t *testing.T) {
},
want: "CREATE TABLE IF NOT EXISTS my_table_with_multiple_pks (\n id INT64,\n category STRING,\n name STRING OPTIONS(description=\"The name of the person\"),\n PRIMARY KEY (id, category) NOT ENFORCED\n)",
},
{
name: "table with partition filter option",
asset: &pipeline.Asset{
Name: "my_partitioned_table_opts",
Columns: []pipeline.Column{
{Name: "id", Type: "INT64"},
{Name: "created_at", Type: "DATE"},
},
Materialization: pipeline.Materialization{
Type: pipeline.MaterializationTypeTable,
PartitionBy: "created_at",
},
BigQuery: pipeline.BigQueryConfig{RequirePartitionFilter: true},
},
want: "CREATE TABLE IF NOT EXISTS my_partitioned_table_opts (\n id INT64,\n created_at DATE\n)\nPARTITION BY created_at\nOPTIONS (\n require_partition_filter = TRUE\n)",
},
}

for _, tt := range tests {
Expand Down
13 changes: 13 additions & 0 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,18 @@ func (s AthenaConfig) MarshalJSON() ([]byte, error) {
return json.Marshal(s)
}

type BigQueryConfig struct {
RequirePartitionFilter bool `json:"require_partition_filter"`
}

func (b BigQueryConfig) MarshalJSON() ([]byte, error) {
if !b.RequirePartitionFilter {
return []byte("null"), nil
}

return json.Marshal(b)
}

type Asset struct { //nolint:recvcheck
ID string `json:"id" yaml:"-" mapstructure:"-"`
URI string `json:"uri" yaml:"uri,omitempty" mapstructure:"uri"`
Expand All @@ -649,6 +661,7 @@ type Asset struct { //nolint:recvcheck
Metadata EmptyStringMap `json:"metadata" yaml:"metadata,omitempty" mapstructure:"metadata"`
Snowflake SnowflakeConfig `json:"snowflake" yaml:"snowflake,omitempty" mapstructure:"snowflake"`
Athena AthenaConfig `json:"athena" yaml:"athena,omitempty" mapstructure:"athena"`
BigQuery BigQueryConfig `json:"bigquery" yaml:"bigquery,omitempty" mapstructure:"bigquery"`
IntervalModifiers IntervalModifiers `json:"interval_modifiers" yaml:"interval_modifiers,omitempty" mapstructure:"interval_modifiers"`

upstream []*Asset
Expand Down
12 changes: 8 additions & 4 deletions pkg/pipeline/testdata/pipeline/first-pipeline_unix.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
"custom_checks": [],
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "c69409a1840ddb3639a4acbaaec46c238c63b6431cc74ee5254b6dcef7b88c4b",
Expand Down Expand Up @@ -93,7 +94,8 @@
"custom_checks": [],
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9",
Expand Down Expand Up @@ -167,7 +169,8 @@
"custom_checks": [],
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "5812ba61bb0f08ce192bf074c9de21c19355e08cd52e75d008bbff59e5729e5b",
Expand Down Expand Up @@ -240,7 +243,8 @@
"custom_checks": [],
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
}
],
"notifications": {
Expand Down
12 changes: 8 additions & 4 deletions pkg/pipeline/testdata/pipeline/first-pipeline_windows.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
"owner": "",
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "c69409a1840ddb3639a4acbaaec46c238c63b6431cc74ee5254b6dcef7b88c4b",
Expand Down Expand Up @@ -96,7 +97,8 @@
"owner": "",
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9",
Expand Down Expand Up @@ -170,7 +172,8 @@
"owner": "",
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "5812ba61bb0f08ce192bf074c9de21c19355e08cd52e75d008bbff59e5729e5b",
Expand Down Expand Up @@ -243,7 +246,8 @@
"owner": "",
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
}
],
"notifications": {
Expand Down
9 changes: 6 additions & 3 deletions pkg/pipeline/testdata/pipeline/second-pipeline_unix.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
"custom_checks": [],
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "a01e7580b118b5fbbdc1f7c8de6b8c377c684727e4e8ad574e9153a3dbd46dd1",
Expand Down Expand Up @@ -87,7 +88,8 @@
"custom_checks": [],
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
},
{
"id": "21f2fa1b09d584a6b4fe30cd82b4540b769fd777da7c547353386e2930291ef9",
Expand Down Expand Up @@ -235,7 +237,8 @@
],
"metadata": {},
"snowflake": null,
"athena": null
"athena": null,
"bigquery": null
}
],
"notifications": {
Expand Down
3 changes: 3 additions & 0 deletions pkg/pipeline/testdata/pipeline/second-pipeline_windows.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"connection": "",
"secrets": [],
"athena": null,
"bigquery": null,
"upstreams": [],
"materialization": null,
"interval_modifiers": null,
Expand Down Expand Up @@ -82,6 +83,7 @@
"secrets": [],
"upstreams": [],
"athena": null,
"bigquery": null,
"materialization": null,
"interval_modifiers": null,
"columns": [],
Expand Down Expand Up @@ -130,6 +132,7 @@
}
],
"athena": null,
"bigquery": null,
"upstreams": [
{
"type": "asset",
Expand Down
6 changes: 6 additions & 0 deletions pkg/pipeline/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ type athena struct {
QueryResultsPath string `yaml:"query_results_path"`
}

type bigquery struct {
RequirePartitionFilter bool `yaml:"require_partition_filter"`
}

type taskDefinition struct {
Name string `yaml:"name"`
URI string `yaml:"uri"`
Expand All @@ -319,6 +323,7 @@ type taskDefinition struct {
Tags []string `yaml:"tags"`
Snowflake snowflake `yaml:"snowflake"`
Athena athena `yaml:"athena"`
BigQuery bigquery `yaml:"bigquery"`
IntervalModifiers IntervalModifiers `yaml:"interval_modifiers"`
}

Expand Down Expand Up @@ -486,6 +491,7 @@ func ConvertYamlToTask(content []byte) (*Asset, error) {
CustomChecks: make([]CustomCheck, len(definition.CustomChecks)),
Snowflake: SnowflakeConfig{Warehouse: definition.Snowflake.Warehouse},
Athena: AthenaConfig{Location: definition.Athena.QueryResultsPath},
BigQuery: BigQueryConfig{RequirePartitionFilter: definition.BigQuery.RequirePartitionFilter},
IntervalModifiers: definition.IntervalModifiers,
}

Expand Down
Loading