Fixes #27150: Bulk-fetch TaskInstances per DAG to eliminate N+1 in `yield_pipeline_status` (#27152)
Conversation
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Once it has been labeled as Let us know if you need any help!
Pull request overview
This PR optimizes the Airflow DB-backed ingestion path by eliminating an N+1 query pattern when collecting task instances for recent DAG runs, switching from per-DagRun task-instance queries to a single bulk query per DAG.
Changes:
- Updated `AirflowSource.get_task_instances` to accept multiple `run_ids` and return task instances grouped by `run_id`.
- Refactored `yield_pipeline_status` to bulk-fetch task instances once per DAG and reuse grouped results per DagRun.
- Added a unit test intended to validate the bulk query + grouping behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py` | Implements the bulk `run_id IN (...)` query and per-run grouping; updates `yield_pipeline_status` to use the grouped results. |
| `ingestion/tests/unit/topology/pipeline/test_airflow.py` | Adds a unit test for the new bulk task-instance retrieval behavior. |
Force-pushed: 011b336 → e1025f2
Force-pushed: e1025f2 → 1023bb6
@RajdeepKushwaha5 did you run this against an Airflow instance and make sure that all DAGs are listed properly without any regression?
🟡 Playwright Results: all passed (29 flaky)

✅ 3658 passed · ❌ 0 failed · 🟡 29 flaky · ⏭️ 89 skipped
🟡 29 flaky test(s) (passed on retry)

How to debug locally:

    # Download playwright-test-results-<shard> artifact and unzip
    npx playwright show-trace path/to/trace.zip  # view trace
```python
result: Dict[str, List[OMTaskInstance]] = defaultdict(list)

# Short-circuit: avoid building and executing a query with an empty
# IN (...) list - unnecessary DB round-trip and rejected by some SQL
# dialects. Caller (yield_pipeline_status) already guards this, but
# defend at the boundary as well.
if not run_ids:
    return result
```
`get_task_instances` is annotated to return `Dict[str, List[OMTaskInstance]]`, but it actually returns a `defaultdict(list)`. Returning a `defaultdict` can introduce subtle side effects for callers (e.g., `result[missing_key]` will create keys instead of raising), and it's inconsistent with the declared return type. Consider using `DefaultDict[...]` internally and converting to a plain `dict` on return (or change the return annotation if you intend to expose `defaultdict`).
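A minimal sketch of the suggested cleanup (names are illustrative, not the PR's actual code): accumulate into a `defaultdict` internally, then return a plain `dict` so a lookup on a missing run raises `KeyError` instead of silently inserting a key:

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple


def group_task_ids(rows: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Group (run_id, task_id) pairs by run_id, returning a plain dict."""
    grouped: DefaultDict[str, List[str]] = defaultdict(list)
    for run_id, task_id in rows:
        grouped[run_id].append(task_id)
    # dict() strips the default factory, matching the declared return type
    return dict(grouped)
```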
Hi @IceS2, the shard-3 failure looks like Playwright flakiness unrelated to this PR's changes.

I had already done that, let's see if it passes. Will check if it is a known issue otherwise.
Code Review: ✅ Approved (2 resolved / 2 findings)

Bulk-fetch implementation optimizes TaskInstance retrieval by querying per DAG to eliminate N+1 performance overhead. Test suite issues regarding read-only property assignment and incorrect mock return types have been addressed.

✅ 2 resolved
- ✅ Bug: Test assigns to read-only property
Gitar
ping @IceS2 |



Describe your changes:
Fixes #27150
`AirflowSource.yield_pipeline_status` was calling `get_task_instances` once per `DagRun` inside a loop: one separate `SELECT ... FROM task_instance WHERE dag_id = ? AND run_id = ?` per run. With the default `numberOfStatus=10` and N DAGs this produces `N × 10` extra DB round-trips on every ingestion run.

Root cause: `get_task_instances` accepted a single `run_id` string, so the only way to retrieve task instances for multiple runs was a loop.

Fix:
- `get_task_instances` signature: `run_id: str` returning `List[OMTaskInstance]` becomes `run_ids: List[str]` returning `Dict[str, List[OMTaskInstance]]`
- Filter: `TaskInstance.run_id == run_id` becomes `TaskInstance.run_id.in_(run_ids)`
- Results are grouped into a `defaultdict(list)` keyed by `run_id`
- `yield_pipeline_status` no longer calls `get_task_instances` inside `for dag_run in dag_run_list`; it collects the `run_ids`, calls `get_task_instances` once, and looks up each run with `tasks_by_run_id.get(run_id, [])`

Task-instance query count per DAG drops from `numberOfStatus` to 1.

Error handling preserved: `result` (a `defaultdict(list)`) is initialised before the `try` block, so on a DB exception the method returns an empty dict, the same safe fallback behaviour as before: all runs get empty task lists.

How I tested:
- Syntax check via `ast.parse`
- `run_ids` is derived from the same `dag_run_list` used in the iteration, so no `run_id` can be present in the loop but absent from the bulk query
- The `run_ids` guard (`if run_ids else {}`) avoids issuing `IN ()`, which is illegal in some SQL dialects

Type of change:
Checklist:
Fixes #NNNNN: Bulk-fetch TaskInstances per DAG to eliminate N+1 in yield_pipeline_status
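The refactor described above can be sketched as follows; the shapes are hypothetical simplifications (`yield_pipeline_status_sketch` and the injected bulk-fetch callable are illustrative, not the PR's literal code):

```python
from typing import Any, Callable, Dict, Iterator, List, Tuple

# Before: one query per DagRun (the N+1 pattern)
#
#   for dag_run in dag_run_list:
#       task_instances = get_task_instances(dag_id, dag_run.run_id)
#
# After: one bulk query per DAG, then dict lookups per run.


def yield_pipeline_status_sketch(
    dag_id: str,
    dag_run_list: List[Any],
    get_task_instances: Callable[[str, List[str]], Dict[str, list]],
) -> Iterator[Tuple[Any, list]]:
    run_ids = [dag_run.run_id for dag_run in dag_run_list]
    # Guard avoids issuing IN () when there are no runs at all
    tasks_by_run_id: Dict[str, list] = (
        get_task_instances(dag_id, run_ids) if run_ids else {}
    )
    for dag_run in dag_run_list:
        # Runs with no task instances fall back to an empty list
        task_instances = tasks_by_run_id.get(dag_run.run_id, [])
        yield dag_run, task_instances
```

The bulk-fetch callable is invoked exactly once per DAG regardless of how many runs are iterated, which is the whole point of the change.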