Skip to content

Commit 89c1dcb

Browse files
committed
post rfc100 cleanup
1 parent 1bb170f commit 89c1dcb

29 files changed

Lines changed: 167 additions & 2352 deletions

dags/import_base.py

Lines changed: 50 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@
3333
*DAG ID*: {{ dag.dag_id }}
3434
*Execution Time*: {{ execution_date }}
3535
"""
36-
import_sql_failure_slack_msg = """
37-
:red_circle: Import SQL Failed. Please check the notification file in the Airflow logs.
36+
import_ch_failure_slack_msg = """
37+
:red_circle: ClickHouse Import Failed. Please check the notification file in the Airflow logs.
3838
*DAG ID*: {{ dag.dag_id }}
3939
*Execution Time*: {{ execution_date }}
40-
*Log Url*: {{ import_sql_log_url }}
40+
*Log Url*: {{ import_ch_log_url }}
4141
"""
42-
import_sql_success_slack_msg = """
43-
:large_green_circle: Import SQL Success!
42+
import_ch_success_slack_msg = """
43+
:large_green_circle: ClickHouse Import Success!
4444
*DAG ID*: {{ dag.dag_id }}
4545
*Execution Time*: {{ execution_date }}
46-
*Log Url*: {{ import_sql_log_url }}
46+
*Log Url*: {{ import_ch_log_url }}
4747
"""
4848
dag_failure_slack_webhook_notification = send_slack_webhook_notification(
4949
slack_webhook_conn_id="slack_default", text=fail_slack_msg
@@ -86,10 +86,13 @@ class ImporterConfig:
8686
schedule_interval: Optional[str] = None
8787

8888

89-
def _script(scripts_dir: str, script_name: str, *args: object) -> str:
89+
def _script(scripts_dir: str, script_name: str, *args: object, source_automation_env: bool = False) -> str:
9090
parts = [f"{scripts_dir}/{script_name}"]
9191
parts.extend(str(arg) for arg in args)
92-
return " ".join(parts)
92+
cmd = " ".join(parts)
93+
if source_automation_env:
94+
return f"source {scripts_dir}/automation-environment.sh && {cmd}"
95+
return cmd
9396

9497

9598
def build_import_dag(config: ImporterConfig) -> DAG:
@@ -124,33 +127,33 @@ def build_import_dag(config: ImporterConfig) -> DAG:
124127
@task
125128
def get_data_repos(repos: list[str]) -> str:
126129
return " ".join(repos)
127-
128-
# run this task even if import_sql failed
130+
131+
# run this task even if import_direct_to_clickhouse failed
129132
@task(trigger_rule=TriggerRule.ALL_DONE)
130133
def send_update_notification(notification_filepath: str, ssh_conn_id: str) -> None:
131134
"""
132-
Sends a Slack message to the #airflow-logs channel with a link to the import_sql logs URL.
135+
Sends a Slack message to the #airflow-logs channel with a link to the import_direct_to_clickhouse logs URL.
133136
This tells the curators whether there were any studies that suceeded or failed to import during a given run.
134137
To avoid confusion -- we run this task towards the end of the DAG
135138
(eg. after the transfer_deployment step) because we don't want to
136139
send a success message before the entire import run completes.
137140
"""
138141

139-
# Get the log URL for the import_sql task
142+
# Get the log URL for the import_direct_to_clickhouse task
140143
context = get_current_context()
141144
dag_run = context.get("dag_run")
142-
import_sql_ti = None
145+
import_ch_ti = None
143146
if dag_run is not None:
144-
import_sql_ti = dag_run.get_task_instance("import_sql", map_index=0)
145-
import_sql_log_url = import_sql_ti.log_url if import_sql_ti is not None else ""
146-
if not import_sql_log_url:
147-
logger.warning("Could not determine import_sql log url; skipping Slack notification.")
147+
import_ch_ti = dag_run.get_task_instance("import_direct_to_clickhouse", map_index=0)
148+
import_ch_log_url = import_ch_ti.log_url if import_ch_ti is not None else ""
149+
if not import_ch_log_url:
150+
logger.warning("Could not determine import_direct_to_clickhouse log url; skipping Slack notification.")
148151
raise AirflowSkipException()
149152

150-
import_sql_failed = (
151-
import_sql_ti is not None and import_sql_ti.state == State.FAILED
153+
import_ch_failed = (
154+
import_ch_ti is not None and import_ch_ti.state == State.FAILED
152155
)
153-
if not import_sql_failed:
156+
if not import_ch_failed:
154157
# Read the notification file from the remote node to check if any studies failed
155158
try:
156159
ssh_hook = SSHHook(ssh_conn_id=ssh_conn_id)
@@ -160,21 +163,21 @@ def send_update_notification(notification_filepath: str, ssh_conn_id: str) -> No
160163
)
161164
if exit_status != 0:
162165
logger.warning("Notification file not found at %s; treating as failure", notification_filepath)
163-
import_sql_failed = True
166+
import_ch_failed = True
164167
else:
165168
notification_content = notif_contents.decode("utf-8")
166169
ERROR_STRING = "The following studies had errors during import"
167-
import_sql_failed = (ERROR_STRING in notification_content)
170+
import_ch_failed = (ERROR_STRING in notification_content)
168171
except Exception as exc:
169172
logger.warning("Could not read notification file from remote node; skipping Slack notification")
170173
logger.warning("Stack trace:")
171174
logger.warning(exc)
172175
raise AirflowSkipException() from exc
173176

174177
# Build the msg and send to Slack
175-
msg_template = import_sql_failure_slack_msg if import_sql_failed else import_sql_success_slack_msg
178+
msg_template = import_ch_failure_slack_msg if import_ch_failed else import_ch_success_slack_msg
176179
rendered_message = Template(msg_template).render(
177-
import_sql_log_url=import_sql_log_url,
180+
import_ch_log_url=import_ch_log_url,
178181
**context,
179182
)
180183
SlackWebhookHook(slack_webhook_conn_id="slack_default").send(text=rendered_message)
@@ -189,27 +192,35 @@ def send_update_notification(notification_filepath: str, ssh_conn_id: str) -> No
189192
db_properties_filepath,
190193
color_swap_config_filepath,
191194
),
192-
"scale_up_rds_node": _script(
195+
"clone_database": _script(
193196
scripts_dir,
194-
"scale-rds.sh",
195-
"up",
197+
"airflow-clone-db.sh",
196198
importer,
197-
color_swap_config_filepath,
199+
scripts_dir,
200+
db_properties_filepath,
198201
),
199-
"clone_database": _script(
202+
"create_derived_tables": _script(
200203
scripts_dir,
201-
"airflow-clone-db.sh",
204+
"airflow-create-derived-tables.sh",
202205
importer,
203206
scripts_dir,
204207
db_properties_filepath,
205208
),
209+
"set_import_complete": _script(
210+
scripts_dir,
211+
"set_update_process_state.sh",
212+
db_properties_filepath,
213+
"complete",
214+
source_automation_env=True,
215+
),
206216
"fetch_data": _script(
207217
scripts_dir,
208218
"data_source_repo_clone_manager.sh",
209219
data_source_properties_filepath,
210220
"pull",
211221
importer,
212222
data_repos,
223+
source_automation_env=True,
213224
),
214225
"setup_import": _script(
215226
scripts_dir,
@@ -218,58 +229,35 @@ def send_update_notification(notification_filepath: str, ssh_conn_id: str) -> No
218229
scripts_dir,
219230
db_properties_filepath,
220231
),
221-
"import_sql": _script(
232+
# reuse the old import-sql script for now
233+
"import_direct_to_clickhouse": _script(
222234
scripts_dir,
223235
"airflow-import-sql.sh",
224236
importer,
225237
scripts_dir,
226238
db_properties_filepath,
227239
notification_filepath,
228240
),
229-
"import_clickhouse": _script(
230-
scripts_dir,
231-
"airflow-import-clickhouse.sh",
232-
importer,
233-
scripts_dir,
234-
db_properties_filepath,
235-
),
236-
"scale_down_rds_node": _script(
237-
scripts_dir,
238-
"scale-rds.sh",
239-
"down",
240-
importer,
241-
color_swap_config_filepath,
242-
# Normally, we would verify that we are in a "scaled up" state before trying to scale down.
243-
# However, if the DAG run failed before "scale_up_rds_node" completed successfully,
244-
# we may still be in a "scaled down" state when we run the scale down task
245-
# (which runs regardless of upstream failures).
246-
# In those cases -- skip verifying that we're in a scaled down state
247-
"{{ '' if (dag_run.get_task_instance('scale_up_rds_node', map_index=ti.map_index) and dag_run.get_task_instance('scale_up_rds_node', map_index=ti.map_index).state == 'success') else '--skip-pre-validation' }}",
248-
),
249241
"transfer_deployment": _script(
250242
scripts_dir,
251243
"airflow-transfer-deployment.sh",
252244
scripts_dir,
253245
db_properties_filepath,
254246
color_swap_config_filepath,
255247
),
256-
"clear_persistence_caches": _script(
257-
scripts_dir,
258-
"airflow-clear-persistence-caches.sh",
259-
importer,
260-
scripts_dir,
261-
),
262248
"set_import_running": _script(
263249
scripts_dir,
264250
"set_update_process_state.sh",
265251
db_properties_filepath,
266252
"running",
253+
source_automation_env=True,
267254
),
268255
"set_import_abandoned": _script(
269256
scripts_dir,
270257
"set_update_process_state.sh",
271258
db_properties_filepath,
272259
"abandoned",
260+
source_automation_env=True,
273261
),
274262
"cleanup_data": _script(
275263
scripts_dir,
@@ -278,6 +266,7 @@ def send_update_notification(notification_filepath: str, ssh_conn_id: str) -> No
278266
"cleanup",
279267
importer,
280268
data_repos,
269+
source_automation_env=True,
281270
),
282271
}
283272

@@ -312,9 +301,11 @@ def _build_task(name: str) -> object:
312301

313302
return SSHOperator.partial(**params).expand(ssh_conn_id=list(ssh_targets))
314303

315-
tasks: dict[str, object] = {"data_repos": data_repos}
304+
tasks: dict[str, object] = {}
316305
for name in config.task_names:
317-
if name == "send_update_notification":
306+
if name == "data_repos":
307+
tasks[name] = data_repos
308+
elif name == "send_update_notification":
318309
tasks[name] = send_update_notification(
319310
notification_filepath=notification_filepath,
320311
ssh_conn_id=config.target_nodes[0],

0 commit comments

Comments
 (0)