
feat(elt-common): Configure iceberg warehouse based on pipeline directory name #376

Merged
WHTaylor merged 3 commits into main from 321-warehouse-configuration
Jun 30, 2026


@WHTaylor WHTaylor commented Jun 29, 2026

Contributor

ref #321

Instead of requiring the warehouse value to be configured for pyiceberg, set it based on the pipeline directory name. This is as per the most recent couple of comments in the issue.

I suspect is_ingest_job may not be the best way to handle what it does, but probably best to revisit when we add the transform functionality and have to actually start thinking about non-ingest jobs.

Summary by CodeRabbit

  • New Features
    • Ingest jobs now use the project’s warehouse name to choose the correct storage location.
    • Warehouse selection is now explicit, with landing-area handling for ingest jobs.
  • Bug Fixes
    • Improved catalog setup to override mismatched warehouse settings and fail clearly if configuration is missing.
    • Updated job discovery and execution so manifests carry the right warehouse details.
  • Documentation
    • Clarified the required directory structure guidance for using the shared pipeline tools.

@WHTaylor WHTaylor requested review from a team as code owners June 29, 2026 16:14

coderabbitai Bot commented Jun 29, 2026

Contributor
Walkthrough

ELTJobManifest gains warehouse_name and is_ingest_job fields with a computed destination_warehouse property. Pipeline discovery extracts the warehouse name from the project root and stamps manifests with it. connect_catalog now requires an explicit warehouse_name argument, validates config existence, and overrides the configured warehouse when they differ. The runner passes job.destination_warehouse into connect_catalog.
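
As a rough illustration of the routing described in the walkthrough, the sketch below models only the two new fields and the computed property. `ManifestSketch` and the `facility_ops` warehouse name are hypothetical stand-ins; the real `ELTJobManifest` in `elt_common/typing.py` carries other fields and may be defined differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ManifestSketch:
    """Hypothetical stand-in for ELTJobManifest; only the new fields are modelled."""

    warehouse_name: str
    is_ingest_job: bool

    @property
    def destination_warehouse(self) -> str:
        # Ingest jobs write raw data to the "<warehouse>_landing" warehouse;
        # any other job targets the warehouse itself.
        if self.is_ingest_job:
            return f"{self.warehouse_name}_landing"
        return self.warehouse_name


ingest = ManifestSketch(warehouse_name="facility_ops", is_ingest_job=True)
other = ManifestSketch(warehouse_name="facility_ops", is_ingest_job=False)
print(ingest.destination_warehouse)
print(other.destination_warehouse)
```

Keeping the suffix logic in one property means the runner only ever has to pass `job.destination_warehouse` on, without knowing about the landing convention.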

Changes

Warehouse-aware catalog connection and manifest routing

| Layer | File(s) | Summary |
| --- | --- | --- |
| ELTJobManifest contract: warehouse fields and destination_warehouse property | elt-common/src/elt_common/typing.py, elt-common/tests/unit_tests/test_pipeline.py | Adds warehouse_name and is_ingest_job fields to ELTJobManifest and implements destination_warehouse to return {warehouse_name}_landing for ingest jobs or warehouse_name otherwise. Tests replace two separate property tests with a single test_properties() covering all three properties. |
| Pipeline discovery: warehouse extraction and manifest stamping | elt-common/src/elt_common/pipeline.py | PipelinesProject.__init__ captures the warehouse name from root.parts[-1]; _discover_jobs and _create_ingest_manifest now accept warehouse_name and stamp warehouse_name plus is_ingest_job=True onto each created manifest. |
| connect_catalog: warehouse_name parameter, validation, and override | elt-common/src/elt_common/iceberg/catalog.py, elt-common/tests/unit_tests/iceberg/test_catalog.py | Adds a module-level LOGGER; rewrites connect_catalog to require warehouse_name, raise RuntimeError when the catalog config is absent, log a warning when the preconfigured warehouse differs, and override the warehouse value before calling load_catalog. Tests add a RuntimeError case and update existing assertions for the warehouse override. |
| Runner wiring and test fixture updates | elt-common/src/elt_common/runner.py, elt-common/tests/unit_tests/test_runner.py, elt-common/tests/unit_tests/test_extract.py, elt-pipelines/README.md | IcebergIO is initialised with connect_catalog(job.destination_warehouse) rather than connect_catalog(). Test fixtures for runner and extract are updated with warehouse_name and is_ingest_job=True. README directory-structure bullet is reworded to state the full layout is required for elt-common. |

Possibly related PRs

  • ISISNeutronMuon/analytics-data-platform#352: Implements run_job/run_ingest in runner.py with Iceberg wiring; this PR adjusts the same runner to pass job.destination_warehouse into the updated connect_catalog signature.

Suggested reviewers

  • martyngigg

Poem

🐇 A warehouse name, once lost in thin air,
Now stamped on each manifest with care.
The catalog checks, warns if it strays,
And lands the data in the right place always.
Hop hop, the pipeline runs without a snare! 🏭

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 35.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Title check | ✅ Passed | The title clearly matches the main change: deriving the Iceberg warehouse from the pipeline directory name. |


@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 2

🧹 Nitpick comments (1)
elt-common/src/elt_common/iceberg/catalog.py (1)

27-41: 🗄️ Data Integrity & Integration | 🔵 Trivial | ⚡ Quick win

Copy the catalog config before overriding warehouse. get_catalog_config() returns a live mapping, so mutating conf here can leak the warehouse into later calls and silently retarget writes.
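
A minimal sketch of the copy-then-override approach, assuming the config getter hands back the same mapping object on every call. `resolve_catalog_config`, the `get_config` callable, and the example URI are hypothetical and only stand in for the real `connect_catalog` flow; the call to pyiceberg's `load_catalog` is left out so the example runs on its own.

```python
import logging
from typing import Callable, Mapping, Optional

LOGGER = logging.getLogger(__name__)


def resolve_catalog_config(
    get_config: Callable[[], Optional[Mapping[str, str]]],
    warehouse_name: str,
) -> dict:
    """Hypothetical helper: return catalog properties with the warehouse overridden."""
    live = get_config()
    if live is None:
        raise RuntimeError("No catalog configuration found.")

    conf = dict(live)  # shallow copy so the shared mapping is never mutated
    configured = conf.get("warehouse")
    if configured is not None and configured != warehouse_name:
        LOGGER.warning(
            "Configured warehouse %r overridden with %r", configured, warehouse_name
        )
    conf["warehouse"] = warehouse_name
    return conf


# Placeholder values: one shared mapping, reused across two calls.
shared = {"uri": "http://localhost:8181", "warehouse": "from_yaml"}
first = resolve_catalog_config(lambda: shared, "facility_ops_landing")
second = resolve_catalog_config(lambda: shared, "facility_ops")
```

Because each call works on its own copy, `shared` still holds the value from the original configuration afterwards, so a later call cannot inherit an earlier override.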

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@elt-common/src/elt_common/iceberg/catalog.py` around lines 27 - 41, The
catalog configuration returned by IcebergCatalogConfig.get_catalog_config() is
being mutated in place when setting "warehouse", which can leak the override
into later calls. In the catalog setup flow that loads config and assigns
conf["warehouse"], first make a copy of the mapping before modifying it, then
apply the warehouse override and continue using the copied config so the
original catalog state remains unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 1b4f41a2-6011-45ca-ab73-7b769781d060

📥 Commits

Reviewing files that changed from the base of the PR and between da6cf38 and b63abc5.

📒 Files selected for processing (9)
  • elt-common/src/elt_common/iceberg/catalog.py
  • elt-common/src/elt_common/pipeline.py
  • elt-common/src/elt_common/runner.py
  • elt-common/src/elt_common/typing.py
  • elt-common/tests/unit_tests/iceberg/test_catalog.py
  • elt-common/tests/unit_tests/test_extract.py
  • elt-common/tests/unit_tests/test_pipeline.py
  • elt-common/tests/unit_tests/test_runner.py
  • elt-pipelines/README.md

Comment thread elt-common/src/elt_common/pipeline.py Outdated
raise ValueError(f"Invalid project. Ingest directory '{ingest_dir}' does not exist.")

self._root = root
self._warehouse = root.parts[-1]

Contributor


🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
python - <<'PY'
from pathlib import Path

for raw in [".", "", "./", "warehouse", "/"]:
    p = Path(raw)
    try:
        last = p.parts[-1]
    except Exception as exc:
        last = f"{type(exc).__name__}: {exc}"
    print(
        f"{raw!r}: parts={p.parts!r}, name={p.name!r}, "
        f"resolved_name={p.resolve().name!r}, parts[-1]={last!r}"
    )
PY

Repository: ISISNeutronMuon/analytics-data-platform

Length of output: 621


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the relevant file with line numbers and nearby context.
sed -n '1,220p' elt-common/src/elt_common/pipeline.py | cat -n

printf '\n---\n'

# Find where PipelinesProject is constructed and how root is passed.
rg -n "PipelinesProject\(" -S .

printf '\n---\n'

# Find any references to the warehouse derivation / _warehouse usage.
rg -n "_warehouse|destination_warehouse|warehouse_name" elt-common/src -S

Repository: ISISNeutronMuon/analytics-data-platform

Length of output: 7101


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the call sites to see what kind of Path is passed as `root`.
sed -n '1,140p' elt-common/src/elt_common/cli.py | cat -n
printf '\n---\n'
sed -n '1,140p' elt-common/tests/unit_tests/test_pipeline.py | cat -n

Repository: ISISNeutronMuon/analytics-data-platform

Length of output: 7138


Use root.name with a resolved fallback here. Path(".") is accepted by the CLI, and root.parts[-1] raises IndexError for that input, so project initialisation breaks when ROOT is the current directory.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@elt-common/src/elt_common/pipeline.py` at line 19, The pipeline
initialization in the class using `_warehouse` should not rely on
`root.parts[-1]`, because `Path(".")` can produce an empty parts tuple and break
setup. Update the assignment to use `root.name` with a safe resolved fallback
for cases like the current directory, so the logic in the pipeline/root handling
remains robust regardless of how ROOT is passed in.

Comment thread elt-pipelines/README.md Outdated
Comment on lines +52 to +54
- Data from ingest pipelines is considered 'raw' data, and is loaded into a warehouse suffixed with `_landing`.
- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the raw data into it's final state in the target warehouse.
- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the
raw data into it's final state in the target warehouse.

Contributor


📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win

Fix the possessive in the README.

Line 54 should use its final state, not it's final state.

Proposed fix
-- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the
-  raw data into it's final state in the target warehouse.
+- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the
+  raw data into its final state in the target warehouse.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- Data from ingest pipelines is considered 'raw' data, and is loaded into a warehouse suffixed with `_landing`.
- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the raw data into it's final state in the target warehouse.
- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the
raw data into it's final state in the target warehouse.
- Data from ingest pipelines is considered 'raw' data, and is loaded into a warehouse suffixed with `_landing`.
- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the
raw data into its final state in the target warehouse.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@elt-pipelines/README.md` around lines 52 - 54, The README text has a
possessive typo in the description of the transform pipelines. Update the
wording in the relevant documentation sentence to use “its final state” instead
of “it’s final state” so the `transform` subdirectory description reads
correctly.

WHTaylor added 2 commits June 29, 2026 17:26
Don't think this is likely to be a thing we want to do, but it's probably a little less fragile
@martyngigg martyngigg self-assigned this Jun 30, 2026

@martyngigg martyngigg left a comment

Member


Looks good to me. I tried with and without a warehouse name in my local yaml config and the log message works as expected.

This is a minor thing for now but might get more irritating, I get a lot of

2026-06-30 11:35:39 INFO     pyiceberg.io — Loaded FileIO: pyiceberg.io.fsspec.FsspecFileIO
2026-06-30 11:35:39 INFO     pyiceberg.io — Loaded FileIO: pyiceberg.io.fsspec.FsspecFileIO
2026-06-30 11:35:40 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
2026-06-30 11:35:41 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
2026-06-30 11:35:41 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner

in the logs that feel more low level than INFO messages. Maybe we can look at lowering their priority by default at some point..
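
Until anything changes upstream, a local workaround could be to raise the level on the `pyiceberg` logger hierarchy using the standard `logging` module. This is only a sketch: `quieten_pyiceberg` is a hypothetical helper, and where elt-common would call it is an open question.

```python
import logging


def quieten_pyiceberg(level: int = logging.WARNING) -> None:
    """Raise the threshold of the 'pyiceberg' logger hierarchy."""
    # Child loggers such as 'pyiceberg.io' and 'pyiceberg.io.fsspec' inherit
    # this level as long as they have no explicit level of their own.
    logging.getLogger("pyiceberg").setLevel(level)


quieten_pyiceberg()
```

Setting the level on the parent logger keeps warnings and errors from pyiceberg visible while dropping the INFO lines shown above.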

@WHTaylor WHTaylor changed the title Configure iceberg warehouse based on pipeline directory name feat(elt-common): Configure iceberg warehouse based on pipeline directory name Jun 30, 2026
@WHTaylor WHTaylor merged commit 5a7505d into main Jun 30, 2026
4 checks passed
@WHTaylor WHTaylor deleted the 321-warehouse-configuration branch June 30, 2026 10:44
@WHTaylor

Contributor Author

> This is a minor thing for now but might get more irritating, I get a lot of
>
> 2026-06-30 11:35:39 INFO     pyiceberg.io — Loaded FileIO: pyiceberg.io.fsspec.FsspecFileIO
> 2026-06-30 11:35:39 INFO     pyiceberg.io — Loaded FileIO: pyiceberg.io.fsspec.FsspecFileIO
> 2026-06-30 11:35:40 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
> 2026-06-30 11:35:41 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
> 2026-06-30 11:35:41 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
>
> in the logs that feel more low level than INFO messages. Maybe we can look at lowering their priority by default at some point..

Might even be worth seeing if we can get this changed upstream, apache/iceberg-python@ceeb084 added a few logs that feel like they should be debug level. It looks like those actually make up the majority of logging calls from pyiceberg.

@martyngigg

Member

> > This is a minor thing for now but might get more irritating, I get a lot of
> >
> > 2026-06-30 11:35:39 INFO     pyiceberg.io — Loaded FileIO: pyiceberg.io.fsspec.FsspecFileIO
> > 2026-06-30 11:35:39 INFO     pyiceberg.io — Loaded FileIO: pyiceberg.io.fsspec.FsspecFileIO
> > 2026-06-30 11:35:40 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
> > 2026-06-30 11:35:41 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
> > 2026-06-30 11:35:41 INFO     pyiceberg.io.fsspec — Loading signer S3V4RestSigner
> >
> > in the logs that feel more low level than INFO messages. Maybe we can look at lowering their priority by default at some point..
>
> Might even be worth seeing if we can get this changed upstream, apache/iceberg-python@ceeb084 added a few logs that feel like they should be debug level. It looks like those actually make up the majority of logging calls from pyiceberg.

Sounds good to me. I've had to put the odd PR here and there into Lakekeeper and Superset. The maintainers of pyiceberg seem welcoming of contributions.
