feat: add --threads option to parallelize report data fetching #2197
dtaniwaki wants to merge 4 commits into elementary-data:master from
Conversation
Add --threads CLI option (default 1) to `edr report` and `edr send-report`. When set to >1, independent dbt run-operations are executed concurrently using ThreadPoolExecutor with SubprocessDbtRunner. dbt's Python API (APIDbtRunner) is not thread-safe due to global mutable state (GLOBAL_FLAGS, the adapter FACTORY, etc.), so parallel execution uses SubprocessDbtRunner, which spawns independent dbt processes per call. The fetching is split into phases:

- Phase 1: 14 independent operations run in parallel
- Phase 2: exposures + test_results (depend on Phase 1)
- Phase 3: lineage (depends on Phase 2)
- Phase 4: pure computation (no dbt calls)

With --threads=14, edr report time is expected to drop from ~3m40s to ~30-40s on adapters with high query latency (e.g. Athena).
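The phased execution described above can be sketched roughly as follows. This is a generic illustration only: `fetch` and `get_report_data` here are stand-ins, not the actual elementary API calls.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(name: str) -> str:
    # Stand-in for a dbt run-operation executed via SubprocessDbtRunner.
    return f"{name}-data"

def get_report_data(threads: int = 1) -> dict:
    phase1_ops = ["models", "tests", "source_freshnesses", "invocations"]
    results = {}
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Phase 1: independent operations run concurrently.
        futures = {op: pool.submit(fetch, op) for op in phase1_ops}
        results.update({op: f.result() for op, f in futures.items()})
        # Phase 2: operations that depend on Phase 1 results.
        results["test_results"] = pool.submit(fetch, "test_results").result()
    # Phase 3: lineage depends on node IDs gathered in earlier phases.
    results["lineage"] = fetch("lineage")
    return results

print(get_report_data(threads=4)["lineage"])
```

Because each phase only starts after the futures it depends on have resolved, the dependency order is preserved even though work within a phase is parallel.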
📝 Walkthrough
Adds an optional `--threads` option to `edr report` and `edr send-report`.
Changes
Sequence Diagram(s)sequenceDiagram
participant CLI as CLI
participant DMR as DataMonitoringReport
participant ReportAPI as ReportAPI
participant Executor as ThreadPoolExecutor
participant DBT as SubprocessDbtRunner
participant APIs as External_APIs
CLI->>DMR: generate_report(threads=N)
DMR->>ReportAPI: get_report_data(threads=N)
alt threads > 1
ReportAPI->>Executor: start phased parallel fetches
Executor->>DBT: create runner and run dbt tasks
Executor->>APIs: fetch models/tests/source_freshnesses/invocations (phase 1, parallel)
APIs-->>Executor: results
Executor->>APIs: fetch dependent exposures/tests (phase 2)
APIs-->>Executor: results
Executor->>APIs: fetch lineage (phase 3)
APIs-->>Executor: lineage
else
ReportAPI->>DBT: sequential dbt/API calls
DBT-->>ReportAPI: results
APIs-->>ReportAPI: results
end
ReportAPI->>ReportAPI: _assemble_report_data(...)
ReportAPI-->>DMR: ReportDataSchema, err?
DMR-->>CLI: output/send report
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 2
🧹 Nitpick comments (1)
elementary/monitor/api/report/report.py (1)
451-493: Reuse `_build_report_data` in the parallel path. The parallel path still duplicates most of the new builder logic, so the two execution modes can drift in serialized shape or invocation handling. Consider letting `_build_report_data` accept optional pre-fetched invocation data, then call it from both paths.
♻️ Directional refactor
  def _build_report_data(
      self,
      ...
      filters,
      invocations_api,
+     models_latest_invocation=None,
+     invocations=None,
  ) -> Tuple[ReportDataSchema, Optional[Exception]]:
@@
-     models_latest_invocation = invocations_api.get_models_latest_invocation()
-     invocations = invocations_api.get_models_latest_invocations_data()
+     if models_latest_invocation is None:
+         models_latest_invocation = invocations_api.get_models_latest_invocation()
+     if invocations is None:
+         invocations = invocations_api.get_models_latest_invocations_data()
Then `_get_report_data_parallel` can pass the already-fetched `models_latest_invocation` and `invocations_data` into the same builder.
Also applies to: 498-593
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 451 - 493, The parallel path duplicates the ReportDataSchema-building logic; change _build_report_data to accept optional parameters for pre-fetched invocation data (e.g., invocations_data and models_latest_invocation) and use those instead of redoing serialization when provided, keeping existing behavior by making them optional with defaults; then update _get_report_data_parallel to call _build_report_data passing the already-fetched models_latest_invocation and invocations_data and remove the duplicated serialization/ReportDataSchema construction (the serializable_* variables and ReportDataSchema instantiation should be produced only by _build_report_data), ensuring all callers that don't pass the new args continue to work.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@elementary/monitor/api/report/report.py`:
- Around line 76-95: Multiple concurrent dbt subprocesses created by
_create_subprocess_runner/SubprocessDbtRunner can write to the same artifact/log
directories (DBT_LOG_PATH and dbt_project.yml's target-path), causing race
conditions; update _create_subprocess_runner to inject per-subprocess isolation
by cloning the runner.env_vars and adding a unique artifact/log path (e.g., set
DBT_LOG_PATH to a worker-specific temp dir) and also ensure a per-subprocess
target path is provided (either via an environment variable or by passing a
config override) so that run_operation-created subprocesses do not share the
same project_dir artifacts.
In `@elementary/monitor/cli.py`:
- Around line 452-458: Replace the loose integer type on the "--threads" Click
option with a ranged validator so zero and negative values are rejected at the
CLI: change the option declaration for the "--threads" argument used by the
monitor-report and monitor-send-report commands from type=int to
type=click.IntRange(min=1) (or type=click.IntRange(1, None)) so Click will
enforce threads >= 1 and surface an immediate error to the user.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 16bd31bf-041a-4378-9f87-ea58c7ebec88
📒 Files selected for processing (3)
- elementary/monitor/api/report/report.py
- elementary/monitor/cli.py
- elementary/monitor/data_monitoring/report/data_monitoring_report.py
- Extract shared serialization logic into _assemble_report_data, used by both sequential and parallel paths
- Reduce API object construction duplication in parallel path with factory helpers
- Add unit tests for subprocess runner creation, routing logic, thread pool usage, and error propagation
Actionable comments posted: 1
🧹 Nitpick comments (5)
elementary/monitor/api/report/report.py (5)
239-239: Inconsistent internal default for `threads`. `_get_report_data_parallel` defaults `threads=4` while the public `get_report_data` defaults to `threads=1`. Since `_get_report_data_parallel` is only reached when `threads > 1` via the router, the internal default is effectively dead code and can mislead future callers. Consider removing the default or aligning it with the router semantics (`threads >= 2`).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` at line 239, The internal default for threads on _get_report_data_parallel (threads: int = 4) conflicts with the public get_report_data default (threads=1) and is effectively dead; update _get_report_data_parallel to remove the misleading default (make threads a required parameter) or set it to align with router semantics (e.g., threads: int = 2 or threads: int >= 2) so callers and future maintainers see consistent behavior—modify the signature of _get_report_data_parallel and any internal calls (and the router logic that branches to _get_report_data_parallel) to pass the explicit threads value from get_report_data or the router.
340-368: Phase 3/4 unnecessarily use `SubprocessDbtRunner` for sequential calls. Both `LineageAPI` (line 340) and `FiltersAPI` (line 365) are instantiated with `parallel_runner` but invoked sequentially — a single call each. Using `SubprocessDbtRunner` incurs process-spawn overhead without any parallelism benefit, and it diverges from `_get_report_data_sequential`, which uses `self.dbt_runner` (the in-process runner) for these same APIs. Suggest using `self.dbt_runner` here for parity and performance.
♻️ Suggested fix
  # Phase 3: lineage depends on all node IDs
- lineage = LineageAPI(dbt_runner=parallel_runner).get_lineage(
+ lineage = LineageAPI(dbt_runner=self.dbt_runner).get_lineage(
      lineage_node_ids, exclude_elementary_models
  )
@@
- filters_api=FiltersAPI(dbt_runner=parallel_runner),
+ filters_api=FiltersAPI(dbt_runner=self.dbt_runner),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 340 - 368, LineageAPI and FiltersAPI are being constructed with parallel_runner (SubprocessDbtRunner) even though they are used sequentially; change their instantiation to use the in-process runner self.dbt_runner instead of parallel_runner to avoid unnecessary subprocess overhead — update the LineageAPI(dbt_runner=parallel_runner) call and the FiltersAPI(dbt_runner=parallel_runner) call to use dbt_runner=self.dbt_runner (or equivalent parameter name) before passing the resulting lineage and filters_api into _assemble_report_data.
369-370: Parallel path swallows exceptions without logging. A module-level `logger` was just added (line 46), but the `except Exception as error` block here returns the error without logging it. In the parallel path, exceptions can originate from worker threads/subprocesses and are non-trivial to attribute without a log line. Suggest `logger.exception(...)` to preserve the traceback for diagnosis (same applies to lines 225-226 in the sequential path, but the new `logger` availability makes this a good moment to address both).
♻️ Suggested fix
  except Exception as error:
+     logger.exception("Parallel report data fetch failed")
      return ReportDataSchema(), error
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 369 - 370, The parallel-path except block currently returns ReportDataSchema(), error without logging; update the handler to call logger.exception("Failed to build report (parallel path): %s", error) (or logger.exception with a clear message) before returning to preserve traceback; do the same change in the sequential-path except that returns ReportDataSchema(), error (use a distinct message like "Failed to build report (sequential path)"); reference the existing logger object and the ReportDataSchema/ error variables so you only add logger.exception(...) prior to the return.
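The exception-logging pattern suggested in this comment can be sketched generically. All names below are illustrative stand-ins, not the elementary codebase:

```python
import logging

logger = logging.getLogger(__name__)

def fetch_report_data():
    """Return (data, error); on failure the traceback is logged, not lost."""
    try:
        raise RuntimeError("worker thread failed")  # simulate a phase failure
    except Exception as error:
        # logger.exception logs at ERROR level and appends the traceback,
        # so failures from worker threads stay attributable in the logs.
        logger.exception("Parallel report data fetch failed")
        return {}, error

data, err = fetch_report_data()
print(type(err).__name__)
```

The key point is that `logger.exception` must be called inside the `except` block, where the active exception's traceback is still available.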
372-396: Missing type annotations on `_assemble_report_data`. All 22 parameters lack type hints, which is inconsistent with the rest of the class (e.g., `_get_groups`, `_serialize_*`). Given the complexity of the argument list (mixed `Dict`, `List`, `Schema` types), annotations would meaningfully improve readability and catch mismatches between sequential/parallel callers at type-check time.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 372 - 396, The _assemble_report_data function is missing type annotations for all parameters; add explicit, precise type hints for each argument (e.g., days_back: int, project_name: str, env: str, warehouse_type: str, seeds: List[SeedSchema] or Dict[str, SeedSchema] as appropriate, snapshots: List[SnapshotSchema], models: List[ModelSchema], sources: List[SourceSchema], exposures: List[ExposureSchema], singular_tests: List[TestSchema], models_runs: Dict[str, InvocationSchema] or List[InvocationSchema], coverages: CoverageSchema or Dict[str, CoverageSchema], tests: List[TestSchema], test_invocation: Optional[InvocationSchema], test_results: List[TestResultSchema], source_freshness_results: List[FreshnessResultSchema], test_runs: List[RunSchema], source_freshness_runs: List[RunSchema], lineage: LineageSchema, filters_api: FiltersApiSchema, models_latest_invocation: Optional[InvocationSchema], invocations_data: InvocationsDataSchema) matching existing project schemas/types and keep the return type Tuple[ReportDataSchema, Optional[Exception]]; update any imports and/or type aliases used by _assemble_report_data to ensure type-checking passes and callers align with these annotations.
277-285: Use keyword arguments for thread pool submissions to prevent silent breakage if method signatures drift. The sequential path invokes `get_models_runs(days_back=..., exclude_elementary_models=...)` (lines 179–181) and `get_test_results(invocation_id=..., disable_samples=...)` (lines 187–190) with keyword arguments. The parallel path at lines 278–281 and 329–333 relies on positional order. While the current signatures match, positional arguments create a silent failure risk: if either method's signature changes or parameters are reordered, the calls will invoke incorrectly without raising a `TypeError`. Use `functools.partial` or an inline `lambda` with explicit keyword arguments for safety.
♻️ Suggested fix
  f_models_runs = pool.submit(
-     _new_models_api().get_models_runs,
-     days_back,
-     exclude_elementary_models,
+     lambda: _new_models_api().get_models_runs(
+         days_back=days_back,
+         exclude_elementary_models=exclude_elementary_models,
+     )
  )
And similarly for `f_test_results` (lines 329–333):
  f_test_results = pool.submit(
-     _new_tests_api().get_test_results,
-     test_invocation.invocation_id,
-     disable_samples,
+     lambda: _new_tests_api().get_test_results(
+         invocation_id=test_invocation.invocation_id,
+         disable_samples=disable_samples,
+     )
  )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 277 - 285, The parallel pool.submit calls (e.g., f_models_runs, f_coverages, f_tests, f_test_invocation and the later f_test_results) pass positional args to _new_models_api().get_models_runs, _new_tests_api().get_tests and _new_invocations_api().get_test_invocation_from_filter which risks silent breakage if signatures change; update each pool.submit to pass keyword arguments (use functools.partial(fn, days_back=..., exclude_elementary_models=...) or a lambda that calls fn(days_back=..., exclude_elementary_models=...)) so the submitted callable binds parameters by name (apply the same change to the get_test_results/get_test_invocation calls referenced by f_test_results).
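The keyword-binding pattern recommended here can be shown with a minimal, generic sketch (the function is a stand-in, not the real elementary API):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def get_models_runs(days_back: int, exclude_elementary_models: bool) -> str:
    # Stand-in for the real API call.
    return f"runs(days_back={days_back}, exclude={exclude_elementary_models})"

with ThreadPoolExecutor(max_workers=2) as pool:
    # partial binds arguments by name: if the target's parameters are later
    # renamed or reordered, the call raises TypeError instead of mis-binding.
    future = pool.submit(
        partial(get_models_runs, days_back=7, exclude_elementary_models=True)
    )
print(future.result())
```

`functools.partial` has the minor advantage over a `lambda` of evaluating its arguments at construction time, so the submitted callable does not capture mutable loop variables by reference.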
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/monitor/api/report/test_report_parallel.py`:
- Around line 76-121: The test allows hidden exceptions because
mock_pool.submit.return_value.result.return_value = {} makes Phase-2 code
operate on a dict (missing .invocation_id) which raises and is swallowed; fix
the test for test_uses_thread_pool_executor by returning a mock object with an
invocation_id for the test-invocation future (so Phase 2 sees .invocation_id),
assert that _get_report_data_parallel returned err is None or that
api._assemble_report_data was called to ensure the full parallel flow ran, and
rename the unused variable result in test_error_propagation to _ to satisfy
RUF059; update mocks created in the test to return appropriate per-future
results instead of a plain dict.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 76e46349-7bcd-4e50-bda4-be6bf55b392a
📒 Files selected for processing (3)
- elementary/monitor/api/report/report.py
- tests/unit/monitor/api/report/__init__.py
- tests/unit/monitor/api/report/test_report_parallel.py
🧹 Nitpick comments (1)
elementary/monitor/cli.py (1)
672-678: LGTM — consistent with the `report` command. Same validated option duplicated on `send-report`. If more commands adopt `--threads` later, consider lifting this into `common_options(...)` gated on `cmd in (Command.REPORT, Command.SEND_REPORT)` to avoid drift between the two declarations, but this is optional.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/cli.py` around lines 672 - 678, Duplicate click option definition for "--threads" exists in the report and send-report command declarations; refactor by moving the shared click.option into the common_options(...) helper and conditionally apply it only when cmd is in (Command.REPORT, Command.SEND_REPORT) so both command functions (the ones registering "--threads") use the single source of truth; update any command decorators to remove the local "--threads" declaration and ensure common_options(...) documents the same IntRange(min=1) default and help text.
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/unit/monitor/api/report/test_report_parallel.py (1)
14-14: Make this test prove subprocess runs always raise on failure. Because the fixture already has `raise_on_failure=True`, the assertion would still pass if `_create_subprocess_runner()` simply forwards the parent runner's value. Set the fixture value to `False` in this test and keep expecting `True` so error propagation is actually covered.
🧪 Proposed strengthening
  def test_creates_runner_with_correct_config(self, mock_dbt_runner):
+     mock_dbt_runner.raise_on_failure = False
      api = ReportAPI(mock_dbt_runner)
      with patch(
          "elementary.monitor.api.report.report.SubprocessDbtRunner"
      ) as mock_cls:
          api._create_subprocess_runner()
@@
-         raise_on_failure=True,
+         raise_on_failure=True,
Also applies to: 33-33
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/monitor/api/report/test_report_parallel.py` at line 14, The test currently sets runner.raise_on_failure = True which doesn't prove subprocesses are forced to raise; change the test to set runner.raise_on_failure = False but keep the assertion that the subprocess runner has raise_on_failure == True so the test verifies _create_subprocess_runner() forces error propagation; update both occurrences of runner.raise_on_failure in this test file and ensure the assertion still checks the subprocess runner produced by _create_subprocess_runner().
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 06033225-6488-4eee-8642-bcbf8d5c6227
📒 Files selected for processing (1)
tests/unit/monitor/api/report/test_report_parallel.py
runner.project_dir = "/tmp/project"
runner.profiles_dir = "/tmp/profiles"
Use non-/tmp fake paths to avoid Ruff S108 failures.
These are only test literals, but Ruff flags them as probable insecure temp paths. Use neutral fake paths instead.
🧹 Proposed fix
- runner.project_dir = "/tmp/project"
- runner.profiles_dir = "/tmp/profiles"
+ runner.project_dir = "/mock/project"
+ runner.profiles_dir = "/mock/profiles"
@@
- project_dir="/tmp/project",
- profiles_dir="/tmp/profiles",
+ project_dir="/mock/project",
+ profiles_dir="/mock/profiles",
Also applies to: 30-31
🧰 Tools
🪛 Ruff (0.15.10)
[error] 11-11: Probable insecure usage of temporary file or directory: "/tmp/project"
(S108)
[error] 12-12: Probable insecure usage of temporary file or directory: "/tmp/profiles"
(S108)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/monitor/api/report/test_report_parallel.py` around lines 11 - 12,
Replace insecure /tmp literals used in the test by assigning neutral fake paths
to the runner attributes: change runner.project_dir and runner.profiles_dir (and
the other occurrences of those same assignments later in the file) from
"/tmp/project" and "/tmp/profiles" to non-/tmp placeholders (e.g.,
"/fake/project" and "/fake/profiles" or "fake/project" and "fake/profiles") so
Ruff S108 is not triggered while preserving test semantics.
Background
`edr report` executes ~14 independent `dbt run-operation` calls sequentially. On adapters with high query latency (e.g. Athena), each call takes ~12s regardless of query complexity, resulting in ~3m40s total for report data fetching alone.
Changes
Add a `--threads` CLI option (default `1`) to `edr report` and `edr send-report`. When set to `>1`, independent dbt operations run concurrently.
Thread safety
dbt's Python API (`APIDbtRunner`) is not thread-safe due to global mutable state (`GLOBAL_FLAGS`, the adapter `FACTORY`, etc.). To ensure correctness, parallel execution uses `SubprocessDbtRunner`, which spawns an independent `dbt` process per call and is safe to use from multiple threads.
Execution phases
The fetching is split into dependency-aware phases:
- Phase 1: `get_seeds`, `get_models`, `get_test_results`, etc. — independent operations run in parallel
- Phase 2: `get_exposures` + `get_test_results` — depend on Phase 1 results
- Phase 3: `get_lineage` — depends on all node IDs
With `--threads=1` (default), the existing sequential code path is used — no behavioral change.
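The subprocess-per-call isolation described under "Thread safety" can be illustrated with a generic sketch. The spawned command below is a placeholder for a real `dbt run-operation` invocation:

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

def run_operation(name: str) -> str:
    # Each call spawns a fresh interpreter, so no in-process global state
    # (flags, adapter factories) is shared between concurrent workers.
    proc = subprocess.run(
        [sys.executable, "-c", f"print('ran {name}')"],
        capture_output=True,
        text=True,
        check=True,
    )
    return proc.stdout.strip()

with ThreadPoolExecutor(max_workers=4) as pool:
    outputs = list(pool.map(run_operation, ["get_models", "get_tests"]))
print(outputs)
```

The trade-off is per-call process startup cost, which is why this only pays off when the operations themselves are slow (e.g. high warehouse query latency).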