Skip to content
This repository was archived by the owner on Sep 16, 2025. It is now read-only.

Commit 9516cde

Browse files
navadotanyshak
andauthored
Support more sources and targets (#14)
* added target options to write to Snowflake target * addes support for ingestion job to write to Snowflake target #10 * added support for merge/insert to load to other targets #11 * override get strategi to make config options insensitive * improved error message for unsupported types #7 * added type dict options support * added add_missing_columns option to snowflake transformation options * added target_type to the models according to the new syntax * changed column_transformations data type * made default Datalake for target_type * corrected typo in options, made made target_connection required * v0.2.2 --------- Co-authored-by: Tanya Shemet <tanyshak@gmail.com>
1 parent ce69d21 commit 9516cde

14 files changed

Lines changed: 163 additions & 81 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version = "0.2.1"
1+
version = "0.2.2"

dbt/adapters/upsolver/impl.py

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
from dbt.adapters.upsolver.options.copy_options import Copy_options
88
from dbt.adapters.upsolver.options.connection_options import Connection_options
99
from dbt.adapters.upsolver.options.transformation_options import Transformation_options
10-
from dbt.adapters.upsolver.options.table_options import Table_options
11-
from dbt.adapters.upsolver.options.materialized_view_options import Materialized_view_options
10+
from dbt.adapters.upsolver.options.target_options import Target_options
1211
import agate
1312
import datetime
1413
import re
@@ -50,7 +49,7 @@ def get_connection_from_sql(self, sql):
5049
.translate(str.maketrans({'\"':'', '\'':''}))
5150
return connection_identifier
5251
except Exception:
53-
raise dbt.exceptions.ParsingError(f"Error while parsing connection name from sql:\n{sql}")
52+
raise dbt.exceptions.ParsingError(f"Error while parsing connection name from sql: {sql}")
5453

5554
@available
5655
def get_columns_names_with_types(self, list_dict):
@@ -73,6 +72,31 @@ def separate_options(self, config_options, source):
7372
source_options = self.enrich_options(config_options, source, 'source_options')
7473
return job_options, source_options
7574

75+
def render_option_from_dict(self, option_value):
76+
res = []
77+
try:
78+
for key, value in option_value.items():
79+
item = [f'{key}=']
80+
if isinstance(value, list):
81+
item.append('(')
82+
item.append(' ,'.join(value))
83+
item.append(')')
84+
else:
85+
item.append(value)
86+
res.append(''.join(item))
87+
return f"({' ,'.join(res)})"
88+
except Exception:
89+
raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}")
90+
91+
def render_option_from_list(self, option_value):
92+
try:
93+
if not isinstance(option_value, str):
94+
return tuple(i for i in option_value)
95+
else:
96+
return f"('{option_value}')"
97+
except Exception:
98+
raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}")
99+
76100
@available
77101
def enrich_options(self, config_options, source, options_type):
78102
options = self.get_options(source, options_type)
@@ -81,10 +105,9 @@ def enrich_options(self, config_options, source, options_type):
81105
find_value = options.get(option.lower(), None)
82106
if find_value:
83107
if options[option.lower()]['type'] == 'list':
84-
if not isinstance(value, str):
85-
value = tuple(i for i in value)
86-
else:
87-
value = f"('{value}')"
108+
value = self.render_option_from_list(value)
109+
elif options[option.lower()]['type'] == 'dict':
110+
value = self.render_option_from_dict(value)
88111
enriched_options[option] = find_value
89112
enriched_options[option]['value'] = value
90113
else:
@@ -96,18 +119,35 @@ def filter_options(self, options, parametr):
96119
editable = {key:val for key, val in options.items() if val[parametr] == True}
97120
return editable
98121

99-
def get_options(self, source, options_type):
100-
if options_type == 'connection_options':
101-
options = Connection_options[source.lower()]
102-
elif options_type == 'transformation_options':
103-
options = Transformation_options[source.lower()]
104-
elif options_type == 'table_options':
105-
options = Table_options
106-
elif options_type == 'materialized_view_options':
107-
options = Materialized_view_options
122+
@available
123+
def get(self, config, key, default=None):
124+
config = {k.lower(): v for k, v in config.items()}
125+
value = config.get(key, default)
126+
return value
127+
128+
@available
129+
def require(self, config, key):
130+
config = {k.lower(): v for k, v in config.items()}
131+
value = config.get(key, None)
132+
if value:
133+
return value
108134
else:
109-
options = Copy_options[source.lower()][options_type]
110-
return options
135+
raise dbt.exceptions.ParsingError(f"Required option is missing: {key}")
136+
137+
138+
def get_options(self, source, options_type):
139+
try:
140+
if options_type == 'connection_options':
141+
options = Connection_options[source.lower()]
142+
elif options_type == 'transformation_options':
143+
options = Transformation_options[source.lower()]
144+
elif options_type == 'target_options':
145+
options = Target_options[source.lower()]
146+
else:
147+
options = Copy_options[source.lower()][options_type]
148+
return options
149+
except Exception:
150+
raise dbt.exceptions.ParsingError(f"Undefined option value: {source}")
111151

112152
def list_relations_without_caching(
113153
self,

dbt/adapters/upsolver/options/materialized_view_options.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

dbt/adapters/upsolver/options/table_options.py

Lines changed: 0 additions & 13 deletions
This file was deleted.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
Target_options = {
2+
"datalake": {
3+
"globally_unique_keys": {"type": "boolean", "editable": False, "optional": True},
4+
"storage_connection": {"type": "identifier", "editable": False, "optional": True},
5+
"storage_location": {"type": "text", "editable": False, "optional": True},
6+
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
7+
"compression": {"type": "value", "editable": True, "optional": True},
8+
"compaction_processes": {"type": "integer", "editable": True, "optional": True},
9+
"disable_compaction": {"type": "boolean", "editable": True, "optional": True},
10+
"retention_date_partition": {"type": "text", "editable": True, "optional": True},
11+
"table_data_retention": {"type": "text", "editable": True, "optional": True},
12+
"column_data_retention": {"type": "text", "editable": True, "optional": True},
13+
"comment": {"type": "text", "editable": True, "optional": True}
14+
},
15+
"materialized_view": {
16+
"storage_connection": {"type": "identifier", "editable": False, "optional": True},
17+
"storage_location": {"type": "text", "editable": False, "optional": True},
18+
"max_time_travel_duration": {"type": "integer", "editable": True, "optional": True},
19+
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
20+
},
21+
"snowflake": {
22+
"column_transformations": {"type": "dict", "editable": False, "optional": True},
23+
"deduplicate_with": {"type": "dict", "editable": False, "optional": True},
24+
"exclude_columns": {"type": "list", "editable": False, "optional": True},
25+
"create_table_if_missing": {"type": "boolean", "editable": False, "optional": True},
26+
"write_interval": {"type": "integer", "editable": False, "optional": True},
27+
}
28+
}

dbt/adapters/upsolver/options/transformation_options.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"output_offset": {"type": "integer", "editable": False, "optional": True},
1414
"location": {"type": "text", "editable": False, "optional": True}
1515
},
16-
'elasticsearch': {
16+
"elasticsearch": {
1717
"run_interval": {"type": "ineger", "editable": False, "optional": True},
1818
"start_from": {"type": "value", "editable": False, "optional": True},
1919
"end_at": {"type": "value", "editable": True, "optional": True},
@@ -25,7 +25,8 @@
2525
"index_partition_size": {"type": "value", "editable": True, "optional": True},
2626
"comment": {"type": "text", "editable": True, "optional": True}
2727
},
28-
'snowflake': {
28+
"snowflake": {
29+
"add_missing_columns": {"type": "boolean", "editable": False, "optional": True},
2930
"run_interval": {"type": "ineger", "editable": False, "optional": True},
3031
"start_from": {"type": "value", "editable": False, "optional": True},
3132
"end_at": {"type": "value", "editable": True, "optional": True},
@@ -35,7 +36,7 @@
3536
"run_parallelism": {"type": "integer", "editable": True, "optional": True},
3637
"comment": {"type": "text", "editable": True, "optional": True}
3738
},
38-
'upsolver_data_lake': {
39+
"datalake": {
3940
"add_missing_columns": {"type": "boolean", "editable": False, "optional": True},
4041
"run_interval": {"type": "ineger", "editable": False, "optional": True},
4142
"start_from": {"type": "value", "editable": False, "optional": True},
@@ -46,15 +47,15 @@
4647
"run_parallelism": {"type": "integer", "editable": True, "optional": True},
4748
"comment": {"type": "text", "editable": True, "optional": True}
4849
},
49-
'redshift': {
50+
"redshift": {
5051
"run_interval": {"type": "ineger", "editable": False, "optional": True},
5152
"start_from": {"type": "value", "editable": False, "optional": True},
5253
"end_at": {"type": "value", "editable": True, "optional": True},
5354
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
5455
"allow_cartesian_products": {"type": "boolean", "editable": False, "optional": True},
5556
"aggregation_parallelism": {"type": "integer", "editable": True, "optional": True},
5657
"run_parallelism": {"type": "integer", "editable": True, "optional": True},
57-
"skip_faild_files": {"type": "boolean", "editable": False, "optional": True},
58+
"skip_failed_files": {"type": "boolean", "editable": False, "optional": True},
5859
"fail_on_write_error": {"type": "boolean", "editable": False, "optional": True},
5960
"comment": {"type": "text", "editable": True, "optional": True}
6061
}

dbt/include/upsolver/macros/materializations/connection.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{% materialization connection, adapter='upsolver' %}
22
{%- set identifier = model['alias'] -%}
3-
4-
{% set connection_type = config.require('connection_type') %}
5-
{% set connection_options = config.require('connection_options') %}
3+
{%- set model_config = model['config'] -%}
4+
{% set connection_type = adapter.require(model_config, 'connection_type') %}
5+
{% set connection_options = adapter.require(model_config, 'connection_options') %}
66
{% set enriched_options = adapter.enrich_options(connection_options, connection_type, 'connection_options') %}
77
{% set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') %}
88

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
1-
{% macro get_create_copy_job_sql(job_identifier, sql, table, sync, options, source) -%}
1+
{% macro get_create_copy_job_sql(job_identifier, sql, into_relation, sync, options, source, target_type) -%}
22

33
{% set connection_identifier = adapter.get_connection_from_sql(sql) %}
44
{% set job_options, source_options = adapter.separate_options(options, source) %}
5+
{%- if target_type != 'datalake' -%}
6+
{% set target_options = adapter.enrich_options(options, target_type, 'target_options') %}
7+
{% set target_type = target_type %}
8+
{%- else -%}
9+
{% set target_options = {} %}
10+
{% set target_type = '' %}
11+
{%- endif -%}
512

613
CREATE
714
{% if sync %}
815
SYNC
916
{% endif %}
1017
JOB {{job_identifier}}
1118
{{ render_options(job_options, 'create') }}
19+
{{ render_options(target_options, 'create') }}
1220
AS COPY FROM {{source}} {{connection_identifier}}
1321
{{ render_options(source_options, 'create') }}
14-
INTO {{table}}
22+
INTO {{target_type}} {{into_relation}}
1523

1624
{%- endmacro %}

dbt/include/upsolver/macros/materializations/incremental/create_insert_job.sql

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1-
{% macro get_create_insert_job_sql(job_identifier, table, sync, options, map_columns_by_name) -%}
1+
{% macro get_create_insert_job_sql(job_identifier, into_relation, sync, options, map_columns_by_name, target_type) -%}
22

3-
{% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %}
3+
{% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %}
4+
{%- if target_type == 'datalake' -%}
5+
{% set target_type = '' %}
6+
{%- endif -%}
47

58
CREATE
69
{% if sync %}
710
SYNC
811
{% endif %}
912
JOB {{job_identifier}}
1013
{{ render_options(enriched_options, 'create') }}
11-
AS INSERT INTO {{table}}
14+
AS INSERT INTO {{target_type}} {{into_relation}}
1215
{% if map_columns_by_name %}
1316
MAP_COLUMNS_BY_NAME
1417
{% endif %}

dbt/include/upsolver/macros/materializations/incremental/create_merge_job.sql

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1-
{% macro get_create_merge_job_sql(job_identifier, table, sync, options, primary_key, delete_condition) -%}
1+
{% macro get_create_merge_job_sql(job_identifier, into_relation, sync, options, primary_key, delete_condition, target_type) -%}
22

3-
{% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %}
3+
{% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %}
4+
5+
{%- if target_type == 'datalake' -%}
6+
{% set target_type = '' %}
7+
{%- endif -%}
48

59
CREATE
610
{% if sync %}
711
SYNC
812
{% endif %}
913
JOB {{ job_identifier }}
1014
{{ render_options(enriched_options, 'create') }}
11-
AS MERGE INTO {{ table }} AS target
15+
AS MERGE INTO {{ target_type }} {{ into_relation }} AS target
1216
USING (
1317
{{ sql }}
1418
)

0 commit comments

Comments
 (0)