# example_end_to_end.py
from datetime import datetime
from airflow.providers.amazon.aws.operators.emr import (
EmrServerlessCreateApplicationOperator,
EmrServerlessDeleteApplicationOperator,
EmrServerlessStartJobOperator,
)
from airflow import DAG
from airflow.models import Variable
# Deployment-specific settings, read from Airflow Variables.
#
# These are Jinja template strings rather than ``Variable.get(...)`` calls:
# a top-level ``Variable.get`` runs a metadata-database lookup every time the
# scheduler parses this file, and a missing Variable would break the DAG
# import. With templates the lookup is deferred to task run time.
#
# NOTE(review): this relies on ``execution_role_arn`` and
# ``configuration_overrides`` being templated fields of
# ``EmrServerlessStartJobOperator`` -- confirm against the installed Amazon
# provider version.
#
# Create the Variables ``emr_serverless_job_role`` and
# ``emr_serverless_log_bucket`` in your Airflow environment, or replace the
# strings below with literal values.
JOB_ROLE_ARN = "{{ var.value.emr_serverless_job_role }}"
S3_LOGS_BUCKET = "{{ var.value.emr_serverless_log_bucket }}"

# Passed as ``configuration_overrides`` to each job below; points the S3
# monitoring configuration's log URI at the ``logs/`` prefix of the bucket.
DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://{S3_LOGS_BUCKET}/logs/"},
    },
}
# End-to-end example: create an EMR Serverless Spark application, run two
# jobs against it, then delete the application.
with DAG(
    dag_id="example_e2e_emrserverless",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # no schedule: the DAG only runs when triggered
    catchup=False,
    tags=["example"],
) as dag:
    # Step 1: create the Spark application on the emr-6.6.0 release.
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        release_label="emr-6.6.0",
        job_type="SPARK",
        config={"name": "sample-job"},
    )

    # The create task's output is handed to every downstream task as the
    # application id.
    application_id = create_app.output

    # Step 2a: run the bundled Spark "pi" example with no arguments.
    job1 = EmrServerlessStartJobOperator(
        task_id="start_job_1",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
            },
        },
    )

    # Step 2b: same script, this time with one entry-point argument.
    job2 = EmrServerlessStartJobOperator(
        task_id="start_job_2",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
                "entryPointArguments": ["1000"],
            },
        },
    )

    # Step 3: delete the application. "all_done" makes this task run once the
    # upstream tasks have finished regardless of their outcome, so the
    # application is cleaned up even when a job fails.
    # NOTE(review): as the only leaf task, a successful delete may let the DAG
    # run be reported as successful even if a job failed -- confirm this is
    # acceptable for your use.
    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    # Fan out from the create task to both jobs, then fan back in to cleanup.
    create_app >> [job1, job2]
    [job1, job2] >> delete_app