Skip to content

Commit 5b6a4fe

Browse files
Merge pull request #24 from Snowflake-Labs/dcm-projects-for-tasks
DCM Part 4: DCM Projects for Tasks companion
2 parents 6dd31c9 + 78087d7 commit 5b6a4fe

12 files changed

Lines changed: 878 additions & 0 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
out/
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# DCM project manifest: deployment targets plus the templating values that the
# definition files reference as {{name}} placeholders.
manifest_version: 2

type: DCM_PROJECT

#---------------------------------------------
# Targets. Each target names the templating configuration it uses via
# `templating_config`.

default_target: DCM_DEV

targets:
  DCM_DEV:
    account_identifier: MYORG-MY_DEV_ACCOUNT # <-- Replace with your account identifier
    project_name: DCM_DEMO.PROJECTS.DCM_PROJECT_DEV
    project_owner: DCM_DEVELOPER
    templating_config: DEV

  DCM_PROD:
    account_identifier: MYORG-MY_PROD_ACCOUNT # <-- Replace with your account identifier
    project_name: DCM_DEMO.PROJECTS.DCM_PROJECT_PROD
    project_owner: DCM_PROD_DEPLOYER
    templating_config: PROD

#---------------------------------------------
# Templating values. PROD redefines wh_size and runtime_multiplier; DEV does
# not, so it presumably falls back to `defaults` (confirm against DCM docs).

templating:
  defaults:
    wh_size: "X-SMALL"
    runtime_multiplier: 5

  configurations:
    DEV:
      env_suffix: "_DEV"
      user: "INSERT_YOUR_USER" # <-- Replace with your Snowflake username
      project_owner_role: "DCM_DEVELOPER"
      notification_recipient: "INSERT_YOUR_EMAIL" # <-- Replace with your verified email

    PROD:
      env_suffix: ""
      wh_size: "SMALL"
      project_owner_role: "DCM_PROD_DEPLOYER"
      user: "GITHUB_ACTIONS_SERVICE_USER"
      notification_recipient: "prod_alerts@example.com"
      runtime_multiplier: 10
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- Serverless alert that monitors the task graph and emails on failures.
2+
--
3+
-- Defined with DCM so the alert is managed alongside the rest of the project.
4+
-- The alert is created in a suspended state — resume it in the post-deploy
5+
-- script with `ALTER ALERT ... RESUME`.
6+
-- Recipient email comes from the {{notification_recipient}} manifest value,
7+
-- the same one the finalizer uses, so you only configure it once.
8+
9+
-- Alert evaluated every 60 minutes.
--   Condition: at least one errored row in this database's TASK_HISTORY between
--              the later of (7 days ago, last successful alert evaluation) and
--              this evaluation's scheduled time.
--   Action:    email the distinct SCHEMA.NAME list of the failed tasks.
DEFINE ALERT DCM_DEMO_4{{env_suffix}}.PIPELINE.FAILED_TASK_ALERT
    SCHEDULE = '60 MINUTE'
    IF (EXISTS (
        SELECT
            NAME,
            SCHEMA_NAME
        FROM TABLE(
            DCM_DEMO_4{{env_suffix}}.INFORMATION_SCHEMA.TASK_HISTORY(
                SCHEDULED_TIME_RANGE_START => GREATEST(
                    TIMEADD('DAY', -7, CURRENT_TIMESTAMP),
                    SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
                ),
                SCHEDULED_TIME_RANGE_END => SNOWFLAKE.ALERT.SCHEDULED_TIME(),
                ERROR_ONLY => TRUE
            )
        )
    ))
    THEN
        BEGIN
            -- Reuse the rows produced by the condition query rather than
            -- querying TASK_HISTORY a second time.
            LET failed_tasks STRING := (
                SELECT LISTAGG(DISTINCT SCHEMA_NAME || '.' || NAME, ', ')
                FROM TABLE(RESULT_SCAN(SNOWFLAKE.ALERT.GET_CONDITION_QUERY_UUID()))
            );

            CALL SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
                SNOWFLAKE.NOTIFICATION.TEXT_HTML(
                    'Failed tasks detected: <b>' || :failed_tasks || '</b>'
                ),
                SNOWFLAKE.NOTIFICATION.EMAIL_INTEGRATION_CONFIG(
                    'dcm_demo_email_notifications',
                    'DCM Pipeline — Failed Task Alert',
                    ARRAY_CONSTRUCT('{{notification_recipient}}'),
                    NULL,
                    NULL
                )
            );
        END;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*=============================================================================
2+
expectations.sql — DMF attachments to the landing table
3+
4+
Uses the native DCM `ATTACH DATA METRIC FUNCTION` statement so the entire
5+
quality gate — DMF definitions and their column attachments — is managed
6+
by DCM Plan & Deploy. Adding or removing an attachment here changes the
7+
CHECK_DATA_QUALITY task's behavior on the next deploy, with no manual
8+
ALTER TABLE statements needed.
9+
10+
  Note: attached DMFs only run once the landing table has a
  DATA_METRIC_SCHEDULE; tables.sql sets it to 'TRIGGER_ON_CHANGES'.
12+
=============================================================================*/
13+
14+
--- System DMFs attached to landing-table columns
15+
-- Each attachment pairs a system DMF with one landing-table column and a named
-- expectation that the metric value equals 0.

-- ROW_ID must not contain duplicates.
ATTACH DATA METRIC FUNCTION SNOWFLAKE.CORE.DUPLICATE_COUNT
    TO TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.RAW_WEATHER_DATA ON (ROW_ID)
    EXPECTATION NO_DUPLICATE_ROW_IDS (value = 0);

-- DS must not contain NULLs.
ATTACH DATA METRIC FUNCTION SNOWFLAKE.CORE.NULL_COUNT
    TO TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.RAW_WEATHER_DATA ON (DS)
    EXPECTATION NO_NULL_DATES (value = 0);

-- ZIPCODE must not contain NULLs.
ATTACH DATA METRIC FUNCTION SNOWFLAKE.CORE.NULL_COUNT
    TO TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.RAW_WEATHER_DATA ON (ZIPCODE)
    EXPECTATION NO_NULL_ZIPCODES (value = 0);
29+
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*=============================================================================
2+
functions.sql — SQL helper functions used by the task graph
3+
4+
These demonstrate DEFINE FUNCTION for:
5+
- SQL helpers (runtime randomization, task-history summary)
6+
- A UDTF that lists currently-assigned DMFs on a table
7+
8+
Note: INFORMATION_SCHEMA table-functions must be fully qualified with the
9+
database name so the DCM planner can resolve them statically.
10+
=============================================================================*/
11+
12+
----------------------------------------------------------------------
13+
-- 1. Runtime randomizer — used by every demo task to simulate load
14+
----------------------------------------------------------------------
15+
DEFINE FUNCTION DCM_DEMO_4{{env_suffix}}.PIPELINE.RUNTIME_WITH_OUTLIERS(REGULAR_RUNTIME NUMBER(6,0))
RETURNS NUMBER(6,0)
LANGUAGE SQL
COMMENT = 'Input and output in milliseconds; 1/10 runs are 2x as long (outliers)'
AS
$$
    -- Base runtime is doubled when a 1..10 draw lands on 10; an independent
    -- -10..+10 percent jitter of REGULAR_RUNTIME is then added in either case.
    SELECT CAST(
        REGULAR_RUNTIME * IFF(UNIFORM(1, 10, RANDOM()) = 10, 2, 1)
            + UNIFORM(-10, 10, RANDOM()) / 100 * REGULAR_RUNTIME
        AS NUMBER(6,0))
$$;
27+
28+
----------------------------------------------------------------------
29+
-- 2. Summarize a task graph run as a JSON array of task rows
30+
-- Called by the finalizer to build the email body.
31+
----------------------------------------------------------------------
32+
DEFINE FUNCTION DCM_DEMO_4{{env_suffix}}.PIPELINE.GET_TASK_GRAPH_RUN_SUMMARY(
    MY_ROOT_TASK_ID STRING, MY_START_TIME TIMESTAMP_LTZ)
RETURNS STRING
LANGUAGE SQL
AS
$$
    -- Returns a JSON array (cast to STRING) with one object per TASK_HISTORY row
    -- of root task MY_ROOT_TASK_ID scheduled between MY_START_TIME and now.
    -- Object keys: TASK_NAME, RUN_STATUS, RETURN_VALUE, STARTED, DURATION,
    -- ERROR_MESSAGE. OBJECT_CONSTRUCT omits any key whose value is NULL.
    (SELECT ARRAY_AGG(OBJECT_CONSTRUCT(
                'TASK_NAME',     NAME,
                'RUN_STATUS',    RUN_STATUS,
                'RETURN_VALUE',  RETURN_VALUE,
                'STARTED',       STARTED,
                'DURATION',      DURATION,
                'ERROR_MESSAGE', ERROR_MESSAGE
            ))
            -- Element order of ARRAY_AGG is only defined by WITHIN GROUP; an
            -- ORDER BY on the subquery does not carry into the array.
            WITHIN GROUP (ORDER BY SCHEDULED_TIME) AS GRAPH_RUN_SUMMARY
    FROM (
        SELECT
            NAME,
            SCHEDULED_TIME,
            -- Inner aliases deliberately differ from the source column names so
            -- later expressions in this SELECT cannot pick up the alias instead.
            CASE STATE
                WHEN 'SUCCEEDED' THEN '🟢 SUCCEEDED'
                WHEN 'FAILED'    THEN '🔴 FAILED'
                WHEN 'SKIPPED'   THEN '🔵 SKIPPED'
                WHEN 'CANCELLED' THEN '🔘 CANCELLED'
                -- Pass any other state (e.g. a still-running task) through
                -- unchanged so RUN_STATUS is never dropped from the object.
                ELSE STATE
            END AS RUN_STATUS,
            RETURN_VALUE,
            TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS STARTED,
            CONCAT(TIMESTAMPDIFF('seconds', QUERY_START_TIME, COMPLETED_TIME), ' s') AS DURATION,
            ERROR_MESSAGE
        FROM TABLE(DCM_DEMO_4{{env_suffix}}.INFORMATION_SCHEMA.TASK_HISTORY(
            ROOT_TASK_ID => MY_ROOT_TASK_ID::STRING,
            SCHEDULED_TIME_RANGE_START => MY_START_TIME,
            SCHEDULED_TIME_RANGE_END => CURRENT_TIMESTAMP()))
    ))::STRING
$$;
64+
65+
----------------------------------------------------------------------
66+
-- 3. UDTF returning all DMFs currently assigned to a given table.
67+
-- Used by the CHECK_DATA_QUALITY task to iterate through checks.
68+
----------------------------------------------------------------------
69+
DEFINE FUNCTION DCM_DEMO_4{{env_suffix}}.PIPELINE.GET_ACTIVE_QUALITY_CHECKS("TABLE_NAME" VARCHAR)
RETURNS TABLE(DMF VARCHAR, COL VARCHAR)
LANGUAGE SQL
AS
$$
    -- One output row per (DMF, referenced argument) for DMF references on
    -- TABLE_NAME whose SCHEDULE_STATUS is 'STARTED'.
    --   DMF: database.schema.name of the metric function
    --   COL: the `name` field of each flattened REF_ARGUMENTS element
    SELECT
        dmf_refs.METRIC_DATABASE_NAME
            || '.' || dmf_refs.METRIC_SCHEMA_NAME
            || '.' || dmf_refs.METRIC_NAME     AS DMF,
        ref_arg.VALUE:name::STRING             AS COL
    FROM
        TABLE(
            DCM_DEMO_4{{env_suffix}}.INFORMATION_SCHEMA.DATA_METRIC_FUNCTION_REFERENCES(
                REF_ENTITY_NAME   => TABLE_NAME,
                REF_ENTITY_DOMAIN => 'table'
            )
        ) AS dmf_refs,
        LATERAL FLATTEN(input => PARSE_JSON(dmf_refs.REF_ARGUMENTS)) AS ref_arg
    WHERE dmf_refs.SCHEDULE_STATUS = 'STARTED'
$$;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*=============================================================================
2+
infrastructure.sql — Database, schema, warehouse, roles, grants
3+
4+
Only DCM Projects can create these as "managed entities" with full
5+
lifecycle (rename / drop on removal). Templating values come from
6+
manifest.yml so DEV and PROD deploy with different sizes and owners.
7+
=============================================================================*/
8+
9+
-- Project warehouse; the name suffix and size come from templating values.
DEFINE WAREHOUSE DCM_DEMO_4_WH{{env_suffix}}
WITH
    WAREHOUSE_SIZE = '{{wh_size}}'
    AUTO_SUSPEND   = 60
    COMMENT        = 'For Quickstart Demo of DCM Projects with Tasks';
14+
15+
-- Demo database; all other objects in this project live under it.
DEFINE DATABASE DCM_DEMO_4{{env_suffix}}
COMMENT = 'Quickstart Demo for DCM Projects with Tasks and task graphs';
17+
18+
-- Single schema holding the project's tables, functions, procedures and alert.
DEFINE SCHEMA DCM_DEMO_4{{env_suffix}}.PIPELINE
COMMENT = 'Task graph, helper functions, procedures, and DMFs';
20+
21+
-- Database role inside the demo database, granted to the project owner role.
DEFINE DATABASE ROLE DCM_DEMO_4{{env_suffix}}.ADMIN{{env_suffix}};
GRANT DATABASE ROLE DCM_DEMO_4{{env_suffix}}.ADMIN{{env_suffix}}
    TO ROLE {{project_owner_role}};

-- Read role: usage on the database, schema and warehouse, plus SELECT on all
-- tables and views in the database.
DEFINE ROLE DCM_DEMO_4{{env_suffix}}_READ;
GRANT USAGE ON DATABASE DCM_DEMO_4{{env_suffix}}
    TO ROLE DCM_DEMO_4{{env_suffix}}_READ;
GRANT USAGE ON SCHEMA DCM_DEMO_4{{env_suffix}}.PIPELINE
    TO ROLE DCM_DEMO_4{{env_suffix}}_READ;
GRANT USAGE ON WAREHOUSE DCM_DEMO_4_WH{{env_suffix}}
    TO ROLE DCM_DEMO_4{{env_suffix}}_READ;
GRANT SELECT ON ALL TABLES IN DATABASE DCM_DEMO_4{{env_suffix}}
    TO ROLE DCM_DEMO_4{{env_suffix}}_READ;
GRANT SELECT ON ALL VIEWS IN DATABASE DCM_DEMO_4{{env_suffix}}
    TO ROLE DCM_DEMO_4{{env_suffix}}_READ;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*=============================================================================
2+
procedures.sql — SQL stored procedures used by demo tasks
3+
4+
Uses the new DCM `DEFINE PROCEDURE` statement (early-access) so procedure
5+
lifecycle is fully managed by DCM — no separate CREATE OR ALTER.
6+
=============================================================================*/
7+
8+
-- Always-succeeds procedure
9+
-- Demo procedure whose only statement is a SYSTEM$WAIT(3) call.
DEFINE PROCEDURE DCM_DEMO_4{{env_suffix}}.PIPELINE.DEMO_PROCEDURE_1()
RETURNS VARCHAR(16777216)
LANGUAGE SQL
EXECUTE AS OWNER
AS
$$
    SELECT SYSTEM$WAIT(3);
$$;
17+
18+
-- Fails ~50% of the time by selecting from a missing table
19+
-- Demo procedure that draws 1 or 2 at random. On 2 it queries OLD_TABLE, which
-- is expected not to exist, so the call errors; otherwise it waits via
-- SYSTEM$WAIT(2) and finishes.
DEFINE PROCEDURE DCM_DEMO_4{{env_suffix}}.PIPELINE.DEMO_PROCEDURE_2()
RETURNS VARCHAR(16777216)
LANGUAGE SQL
EXECUTE AS OWNER
AS
$$
    DECLARE
        COIN_FLIP NUMBER(2,0);
    BEGIN
        COIN_FLIP := (SELECT UNIFORM(1, 2, RANDOM()));

        IF (COIN_FLIP = 2) THEN
            -- intentional failure
            SELECT COUNT(*) FROM OLD_TABLE;
        END IF;

        SELECT SYSTEM$WAIT(2);
    END
$$;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*=============================================================================
2+
tables.sql — Source, landing, target, and quarantine tables
3+
4+
The task graph demonstrates a realistic ELT pattern:
5+
source ─► landing (quality gate) ─► target
6+
└───► quarantine (if DMFs fail)
7+
plus a generic TASK_DEMO_TABLE used by the stream-conditional task.
8+
=============================================================================*/
9+
10+
--- weather landing table where new batches are loaded and checked
11+
-- Landing table. Change tracking is on and the data metric schedule is
-- TRIGGER_ON_CHANGES.
DEFINE TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.RAW_WEATHER_DATA (
    ROW_ID          NUMBER,
    INSERTED        TIMESTAMP_NTZ,
    DS              DATE,
    ZIPCODE         VARCHAR,
    MIN_TEMP_IN_F   NUMBER,
    AVG_TEMP_IN_F   NUMBER,
    MAX_TEMP_IN_F   NUMBER
)
CHANGE_TRACKING      = TRUE
DATA_METRIC_SCHEDULE = 'TRIGGER_ON_CHANGES'
COMMENT              = 'Landing table where DMF quality gates run';
23+
24+
--- clean rows that passed quality checks
25+
-- Target table; same columns as the landing table except ROW_ID.
DEFINE TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.CLEAN_WEATHER_DATA (
    INSERTED        TIMESTAMP_NTZ,
    DS              DATE,
    ZIPCODE         VARCHAR,
    MIN_TEMP_IN_F   NUMBER,
    AVG_TEMP_IN_F   NUMBER,
    MAX_TEMP_IN_F   NUMBER
)
COMMENT = 'Target table for data that passed quality gates';
34+
35+
--- rows that failed quality checks, isolated for later review
36+
-- Quarantine table; same column layout as CLEAN_WEATHER_DATA.
DEFINE TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.QUARANTINED_WEATHER_DATA (
    INSERTED        TIMESTAMP_NTZ,
    DS              DATE,
    ZIPCODE         VARCHAR,
    MIN_TEMP_IN_F   NUMBER,
    AVG_TEMP_IN_F   NUMBER,
    MAX_TEMP_IN_F   NUMBER
)
COMMENT = 'Quarantine table for rows that failed quality gates';
45+
46+
--- source table that the pipeline reads rows from
47+
-- Source table. ROW_ID is an ordered auto-increment starting at 1; change
-- tracking is on.
DEFINE TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.WEATHER_DATA_SOURCE (
    ROW_ID          NUMBER AUTOINCREMENT START 1 INCREMENT 1 ORDER,
    DS              DATE,
    ZIPCODE         VARCHAR,
    MIN_TEMP_IN_F   NUMBER,
    AVG_TEMP_IN_F   NUMBER,
    MAX_TEMP_IN_F   NUMBER
)
CHANGE_TRACKING = TRUE
COMMENT         = 'Source rows that LOAD_RAW_DATA pulls into the landing table';
57+
58+
--- generic demo table used by the stream-conditional task DEMO_TASK_8
59+
-- Generic demo table. ID is an ordered auto-increment starting at 1; change
-- tracking is on.
DEFINE TABLE DCM_DEMO_4{{env_suffix}}.PIPELINE.TASK_DEMO_TABLE (
    TIME_STAMP  TIMESTAMP_NTZ(9),
    ID          NUMBER(38,0) AUTOINCREMENT START 1 INCREMENT 1 ORDER,
    MESSAGE     VARCHAR,
    COMMENT     VARCHAR
)
CHANGE_TRACKING = TRUE
COMMENT         = 'Generic table used by the stream-conditional task';

0 commit comments

Comments
 (0)