Commit 6d8af3c

feat(ingest/snowflake): snowflake enhancements to support tasks,snowpipe and stages (#16888)
1 parent c138053 commit 6d8af3c

18 files changed

Lines changed: 8123 additions & 1 deletion

metadata-ingestion/docs/sources/snowflake/snowflake_post.md

Lines changed: 37 additions & 0 deletions
@@ -163,6 +163,43 @@ semantic_view_pattern:
 - Requires `REFERENCES` or `SELECT` privileges on semantic views (they are treated as views in Snowflake's permission model)
 - The semantic view definition (SQL DDL) is extracted when available through the `GET_DDL` function
 
+#### Stages, Tasks, and Pipes
+
+DataHub supports ingestion of Snowflake Stages, Tasks, and Snowpipe objects. All three features are disabled by default and can be enabled independently.
+
+##### Stages (`include_stages: true`)
+
+Stages are ingested as containers nested under their parent schema. Internal stages additionally emit a placeholder dataset representing the staged data, which is used for pipe lineage resolution. External stages (S3, GCS, Azure) resolve their URLs to the corresponding cloud platform dataset URN.
+
+```yaml
+include_stages: true
+stage_pattern:
+  allow:
+    - "MY_DB.MY_SCHEMA.*"
+```
+
+##### Tasks (`include_tasks: true`)
+
+Tasks are ingested as DataJob entities grouped under a per-schema DataFlow. Predecessor dependencies between tasks are captured as `inputDatajobs` on the DataJobInputOutput aspect, preserving the DAG structure.
+
+```yaml
+include_tasks: true
+task_pattern:
+  allow:
+    - "MY_DB.MY_SCHEMA.*"
+```
+
+##### Pipes (`include_pipes: true`)
+
+Snowpipe objects are ingested as DataJob entities with lineage derived from parsing the `COPY INTO` statement. The pipe's source stage resolves to an upstream dataset (internal placeholder or external cloud URN) and the target table resolves to a downstream dataset. Enabling pipes automatically scans stages for lineage resolution, even if `include_stages` is false.
+
+```yaml
+include_pipes: true
+pipe_pattern:
+  allow:
+    - "MY_DB.MY_SCHEMA.*"
+```
+
 ### Limitations
 
 Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform. Refer to capability notes for unsupported or conditional features.
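
The predecessor wiring described for Tasks in the docs change can be illustrated with a small sketch. This is not the connector's code: the row shape (`fqn`, plus `predecessors` as a JSON array string), the flow id `<db>.<schema>.tasks`, and the `PROD` environment are assumptions made for illustration. Only the idea that each task records its upstream tasks as `inputDatajobs`, grouped under a per-schema DataFlow, comes from the documentation text.

```python
import json
from typing import Dict, List

PLATFORM = "snowflake"
ENV = "PROD"  # assumption: environment name chosen for the example


def flow_urn(database: str, schema: str) -> str:
    # Hypothetical flow id: one DataFlow per schema, named "<db>.<schema>.tasks".
    return f"urn:li:dataFlow:({PLATFORM},{database}.{schema}.tasks,{ENV})"


def job_urn(fqn: str) -> str:
    # Expects a fully qualified task name in database.schema.task form.
    database, schema, task = fqn.split(".")
    return f"urn:li:dataJob:({flow_urn(database, schema)},{task})"


def input_datajobs(rows: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Map each task's DataJob URN to the URNs of its predecessor tasks."""
    edges: Dict[str, List[str]] = {}
    for row in rows:
        # Assumed row shape: predecessors is a JSON array of fully qualified names.
        predecessors = json.loads(row.get("predecessors") or "[]")
        edges[job_urn(row["fqn"])] = [job_urn(p) for p in predecessors]
    return edges


# Made-up rows: CHILD_TASK runs after ROOT_TASK.
rows = [
    {"fqn": "MY_DB.MY_SCHEMA.ROOT_TASK", "predecessors": "[]"},
    {"fqn": "MY_DB.MY_SCHEMA.CHILD_TASK", "predecessors": '["MY_DB.MY_SCHEMA.ROOT_TASK"]'},
]
edges = input_datajobs(rows)
```

In this sketch a root task maps to an empty list and every other task lists its direct parents, which is enough to reconstruct the DAG from the per-task edges.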

metadata-ingestion/docs/sources/snowflake/snowflake_pre.md

Lines changed: 11 additions & 0 deletions
@@ -54,6 +54,14 @@ grant imported privileges on database snowflake to role datahub_role;
 // Optional - required if extracting Streamlit Apps
 grant usage on all streamlits in database "<your-database>" to role datahub_role;
 grant usage on future streamlits in database "<your-database>" to role datahub_role;
+
+// Optional - required if extracting Stages, Tasks, or Pipes
+grant usage on all stages in database "<your-database>" to role datahub_role;
+grant usage on future stages in database "<your-database>" to role datahub_role;
+grant monitor on all tasks in database "<your-database>" to role datahub_role;
+grant monitor on future tasks in database "<your-database>" to role datahub_role;
+grant monitor on all pipes in database "<your-database>" to role datahub_role;
+grant monitor on future pipes in database "<your-database>" to role datahub_role;
 ```
 
 The details of each granted privilege can be viewed in the [Snowflake docs](https://docs.snowflake.com/en/user-guide/security-access-control-privileges.html). A summary of each privilege and why it is required for this connector:
@@ -71,6 +79,9 @@ grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
 
 - `select` on `streams` is required for stream definitions to be available. This does not allow selecting the data (not required) unless the underlying dataset has select access as well.
 - `usage` on `streamlit` is required to show streamlits in a database. See the schema-level `usage` example above.
+- `usage` on `stages` is required to list stages via `SHOW STAGES`. Only needed if `include_stages: true` or `include_pipes: true`.
+- `monitor` on `tasks` is required to list tasks via `SHOW TASKS`. Only needed if `include_tasks: true`.
+- `monitor` on `pipes` is required to list pipes via `SHOW PIPES`. Only needed if `include_pipes: true`.
 
 This represents the bare minimum privileges required to extract databases, schemas, views, and tables from Snowflake.
 
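
The mapping from feature flags to optional grants in this file can be sketched as a small helper. `optional_grants` is a hypothetical function written for this illustration, not part of DataHub. It only restates the rules listed in the privilege summary: stage `usage` is needed when either `include_stages` or `include_pipes` is set, and `monitor` on tasks or pipes is needed only for the matching flag.

```python
from typing import List


def optional_grants(
    database: str,
    role: str = "datahub_role",
    *,
    include_stages: bool = False,
    include_tasks: bool = False,
    include_pipes: bool = False,
) -> List[str]:
    """Return the optional grant statements implied by the enabled flags."""
    grants: List[str] = []

    def add(privilege: str, object_type: str) -> None:
        # Each privilege is granted on existing ("all") and "future" objects.
        for scope in ("all", "future"):
            grants.append(
                f'grant {privilege} on {scope} {object_type} '
                f'in database "{database}" to role {role};'
            )

    # Pipes need stage access too, because pipe lineage resolves through stages.
    if include_stages or include_pipes:
        add("usage", "stages")
    if include_tasks:
        add("monitor", "tasks")
    if include_pipes:
        add("monitor", "pipes")
    return grants


# Example: a recipe that enables only pipes still needs the stage grants.
pipe_only = optional_grants("ANALYTICS", include_pipes=True)
```

With only `include_pipes` enabled, the helper returns four statements: two for stages and two for pipes.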

metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py

Lines changed: 8 additions & 0 deletions
@@ -40,6 +40,7 @@ class DatasetSubTypes(StrEnum):
     GOOGLE_SHEETS = "Google Sheets"
     GOOGLE_SHEETS_NAMED_RANGE = "Google Sheets Named Range"
     SEMANTIC_MODEL = "Semantic Model"
+    SNOWFLAKE_STAGE_DATA = "Snowflake Stage Data"
 
     # TODO: Create separate entity...
     NOTEBOOK = "Notebook"
@@ -74,6 +75,7 @@ class DatasetContainerSubTypes(StrEnum):
     FABRIC_LAKEHOUSE = "Fabric Lakehouse"
     FABRIC_WAREHOUSE = "Fabric Warehouse"
     FABRIC_SCHEMA = "Fabric Schema"
+    SNOWFLAKE_STAGE = "Snowflake Stage"
     # Pinecone
     PINECONE_INDEX = "Pinecone Index"
     PINECONE_NAMESPACE = "Pinecone Namespace"
@@ -102,6 +104,8 @@ class FlowContainerSubTypes(StrEnum):
     MSSQL_JOB = "Job"
     PROCEDURE_CONTAINER = "Procedures Container"
     ADF_DATA_FACTORY = "Data Factory"
+    SNOWFLAKE_TASK_GROUP = "Snowflake Task Group"
+    SNOWFLAKE_PIPE_GROUP = "Snowflake Pipe Group"
 
 
 class JobContainerSubTypes(StrEnum):
@@ -220,6 +224,10 @@ class DataJobSubTypes(StrEnum):
     FABRIC_DATA_LAKE_ANALYTICS = "Data Lake Analytics"
     FABRIC_AZURE_ML_EXECUTE_PIPELINE = "Azure ML Execute Pipeline"
 
+    # Snowflake
+    SNOWFLAKE_TASK = "Snowflake Task"
+    SNOWFLAKE_PIPE = "Snowflake Pipe"
+
 
 def create_source_capability_modifier_enum():
     all_values: Dict[str, Any] = {}

metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py

Lines changed: 3 additions & 0 deletions
@@ -60,6 +60,9 @@ class SnowflakeObjectDomain(StrEnum):
     PROCEDURE = "procedure"
     DYNAMIC_TABLE = "dynamic table"
     STREAMLIT = "streamlit"
+    STAGE = "stage"
+    TASK = "task"
+    PIPE = "pipe"
 
 
 GENERIC_PERMISSION_ERROR_KEY = "permission-error"

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py

Lines changed: 33 additions & 0 deletions
@@ -189,6 +189,24 @@ class SnowflakeFilterConfig(SQLFilterConfig):
         " use the regex 'Analytics.public.sales.*'",
     )
 
+    stage_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="Regex patterns for stages to filter in ingestion. "
+        "Specify regex to match the entire stage name in database.schema.stage format.",
+    )
+
+    task_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="Regex patterns for tasks to filter in ingestion. "
+        "Specify regex to match the entire task name in database.schema.task format.",
+    )
+
+    pipe_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="Regex patterns for pipes to filter in ingestion. "
+        "Specify regex to match the entire pipe name in database.schema.pipe format.",
+    )
+
     match_fully_qualified_names: bool = Field(
         default=False,
         description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
@@ -438,6 +456,21 @@ class SnowflakeV2Config(
         description="Configuration for semantic views ingestion.",
     )
 
+    include_stages: bool = Field(
+        default=False,
+        description="If enabled, Snowflake Stages will be ingested as containers with associated metadata.",
+    )
+
+    include_tasks: bool = Field(
+        default=False,
+        description="If enabled, Snowflake Tasks will be ingested as DataJobs with DAG dependencies and SQL lineage.",
+    )
+
+    include_pipes: bool = Field(
+        default=False,
+        description="If enabled, Snowflake Snowpipe objects will be ingested as DataJobs with COPY INTO lineage.",
+    )
+
     structured_property_pattern: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
         description=(
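
Because `stage_pattern`, `task_pattern`, and `pipe_pattern` are regexes applied to names in `database.schema.object` form, it helps to see how a pattern such as `MY_DB.MY_SCHEMA.*` behaves. The sketch below is a simplified stand-in, not DataHub's `AllowDenyPattern`: `is_allowed` is a hypothetical helper, and prefix-anchored, case-insensitive matching via `re.match` is an assumption of this example.

```python
import re
from typing import Iterable


def is_allowed(
    name: str,
    allow: Iterable[str] = (".*",),
    deny: Iterable[str] = (),
) -> bool:
    """Return True if `name` matches some allow regex and no deny regex."""
    # Deny rules win over allow rules in this sketch.
    if any(re.match(pattern, name, re.IGNORECASE) for pattern in deny):
        return False
    return any(re.match(pattern, name, re.IGNORECASE) for pattern in allow)


loose = [r"MY_DB.MY_SCHEMA.*"]        # unescaped dots match any character
strict = [r"MY_DB\.MY_SCHEMA\..*"]    # escaped dots match literal separators

in_schema = is_allowed("MY_DB.MY_SCHEMA.RAW_STAGE", allow=loose)
other_db = is_allowed("OTHER_DB.MY_SCHEMA.RAW_STAGE", allow=loose)
sibling_loose = is_allowed("MY_DB.MY_SCHEMA_ARCHIVE.OLD_STAGE", allow=loose)
sibling_strict = is_allowed("MY_DB.MY_SCHEMA_ARCHIVE.OLD_STAGE", allow=strict)
denied = is_allowed("MY_DB.MY_SCHEMA.TMP_STAGE", allow=loose, deny=[r".*\.TMP_.*"])
```

Under these assumptions the unescaped pattern also admits objects in a sibling schema such as `MY_SCHEMA_ARCHIVE`, since `.*` consumes the rest of the name. Escaping the dots restricts the match to the intended schema.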
