Skip to content

Commit 508da4c

Browse files
committed
chore(scheduler): remove dataset debug logging
Drop [DEBUG DATASETS] instrumentation from SchedulerJobRunner and DagModel dataset-readiness loop; inline timetable dataset_condition where it is only used once.
1 parent: d11bc74 · commit: 508da4c

2 files changed

Lines changed: 1 addition & 64 deletions

File tree

airflow/jobs/scheduler_job_runner.py

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,13 +1326,6 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio
13261326
non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags)
13271327
self._create_dag_runs(non_dataset_dags, session)
13281328
if dataset_triggered_dags:
1329-
self.log.info(
1330-
"[DEBUG DATASETS] Dataset-triggered DAGs ready: %s",
1331-
{
1332-
dag_id: (str(first), str(last))
1333-
for dag_id, (first, last) in dataset_triggered_dag_info.items()
1334-
},
1335-
)
13361329
self._create_dag_runs_dataset_triggered(
13371330
dataset_triggered_dags, dataset_triggered_dag_info, session
13381331
)
@@ -1487,26 +1480,6 @@ def _create_dag_runs_dataset_triggered(
14871480
.where(*dataset_event_filters)
14881481
).all()
14891482

1490-
ddrq_records = session.scalars(
1491-
select(DatasetDagRunQueue).where(
1492-
DatasetDagRunQueue.target_dag_id == dag.dag_id
1493-
)
1494-
).all()
1495-
ddrq_uris = {r.dataset.uri for r in ddrq_records}
1496-
consumed_uris = {e.dataset.uri for e in dataset_events}
1497-
missing_uris = ddrq_uris - consumed_uris
1498-
if missing_uris:
1499-
self.log.warning(
1500-
"[DEBUG DATASETS] DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching "
1501-
"DatasetEvent in range (prev_exec=%s, exec_date=%s]. "
1502-
"Consumed URIs: %s. Possible stale DDRQ records.",
1503-
dag.dag_id,
1504-
sorted(missing_uris),
1505-
previous_dag_run.execution_date if previous_dag_run else None,
1506-
exec_date,
1507-
sorted(consumed_uris),
1508-
)
1509-
15101483
data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events)
15111484
run_id = dag.timetable.generate_run_id(
15121485
run_type=DagRunType.DATASET_TRIGGERED,
@@ -1529,32 +1502,6 @@ def _create_dag_runs_dataset_triggered(
15291502
)
15301503
Stats.incr("dataset.triggered_dagruns")
15311504
dag_run.consumed_dataset_events.extend(dataset_events)
1532-
self.log.info(
1533-
"[DEBUG DATASETS] Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, "
1534-
"prev_exec=%s, data_interval=(%s, %s), "
1535-
"events_consumed=%d, event_uris=%s",
1536-
dag.dag_id,
1537-
exec_date,
1538-
previous_dag_run.execution_date if previous_dag_run else None,
1539-
data_interval.start,
1540-
data_interval.end,
1541-
len(dataset_events),
1542-
sorted({e.dataset.uri for e in dataset_events}),
1543-
)
1544-
if dataset_events:
1545-
event_timestamps = [e.timestamp for e in dataset_events]
1546-
self.log.debug(
1547-
"[DEBUG DATASETS] Consumed event details: dag_id=%s, "
1548-
"event_ts_range=(%s, %s), "
1549-
"events=[%s]",
1550-
dag.dag_id,
1551-
min(event_timestamps),
1552-
max(event_timestamps),
1553-
", ".join(
1554-
f"{e.dataset.uri}|ts={e.timestamp}|src={e.source_dag_id}/{e.source_run_id}"
1555-
for e in dataset_events
1556-
),
1557-
)
15581505
session.execute(
15591506
delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id)
15601507
)

airflow/models/dag.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4109,20 +4109,10 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
41094109
for ser_dag in ser_dags:
41104110
dag_id = ser_dag.dag_id
41114111
statuses = dag_statuses[dag_id]
4112-
dataset_condition = ser_dag.dag.timetable.dataset_condition
41134112

4114-
if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
4113+
if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
41154114
del by_dag[dag_id]
41164115
del dag_statuses[dag_id]
4117-
else:
4118-
log.info(
4119-
"[DEBUG DATASETS] Dataset condition satisfied: dag_id=%s, condition=%s, "
4120-
"ddrq_uris=%s, ddrq_count=%d",
4121-
dag_id,
4122-
dataset_condition,
4123-
sorted(statuses.keys()),
4124-
len(statuses),
4125-
)
41264116
del dag_statuses
41274117
dataset_triggered_dag_info = {}
41284118
for dag_id, records in by_dag.items():

0 commit comments

Comments (0)