Skip to content

Commit 5a7505d

Browse files
authored
feat(elt-common): Configure iceberg warehouse based on pipeline directory name (#376)
ref #321 Instead of requiring the `warehouse` value to be configured for pyiceberg, set it based on the pipeline directory name. This is as per the most recent couple of comments in the issue. I suspect `is_ingest_job` may not be the best way to handle what it does, but probably best to revisit when we add the `transform` functionality and have to actually start thinking about non-ingest jobs.
1 parent 8e98c1e commit 5a7505d

9 files changed

Lines changed: 92 additions & 28 deletions

File tree

elt-common/src/elt_common/iceberg/catalog.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,43 @@
44
a ``connect_catalog()`` helper that returns a connected pyiceberg ``Catalog``.
55
"""
66

7+
import logging
8+
79
from pyiceberg.catalog import Catalog, load_catalog
810
from pyiceberg.typedef import Identifier
911
from pyiceberg.utils.config import Config as IcebergCatalogConfig
1012

13+
LOGGER = logging.getLogger(__name__)
14+
15+
16+
def connect_catalog(warehouse_name: str) -> Catalog:
17+
"""Connect to the 'default' Iceberg catalog.
18+
19+
Loads configuration as per `pyiceberg`_, except the value of 'warehouse' which is set directly.
20+
21+
:param warehouse_name: the name of the warehouse to connect to
22+
:return: a catalog that can be used for reading and writing
1123
12-
def connect_catalog() -> Catalog:
24+
.. _pyiceberg: https://py.iceberg.apache.org/configuration/
25+
"""
1326
"""The default load_catalog only allows environment variables set before the first import or pyiceberg.catalog"""
1427
config = IcebergCatalogConfig()
1528
name = config.get_default_catalog_name()
16-
return load_catalog(name, **config.get_catalog_config(name)) # type: ignore
29+
conf = config.get_catalog_config(name)
30+
31+
if conf is None:
32+
raise RuntimeError(f"Couldn't load iceberg configuration for for catalog '{name}'")
33+
34+
if "warehouse" in conf and warehouse_name != conf["warehouse"]:
35+
msg = (
36+
"elt configures the destination warehouse based on the pipeline directory. "
37+
"Preconfigured value '%s' is being replaced by '%s'"
38+
)
39+
LOGGER.warning(msg, conf["warehouse"], warehouse_name)
40+
41+
conf["warehouse"] = warehouse_name
42+
43+
return load_catalog(name, **conf)
1744

1845

1946
def table_identifier(namespace: str, table_name: str) -> Identifier:

elt-common/src/elt_common/pipeline.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ def __init__(self, root: Path) -> None:
1515
if not ingest_dir.is_dir():
1616
raise ValueError(f"Invalid project. Ingest directory '{ingest_dir}' does not exist.")
1717

18-
self._root = root
18+
resolved = root.resolve()
19+
self._root = resolved
20+
self._warehouse = resolved.parts[-1]
1921
self._ingest_dir = ingest_dir
2022
self._name = root.name
2123
self._ingest_jobs = []
@@ -31,44 +33,46 @@ def ingest_dir(self) -> Path:
3133
@property
3234
def ingest_jobs(self) -> list[ELTJobManifest]:
3335
if not self._ingest_jobs:
34-
self._ingest_jobs = _discover_jobs(self._ingest_dir)
36+
self._ingest_jobs = _discover_jobs(self._warehouse, self._ingest_dir)
3537

3638
return self._ingest_jobs
3739

3840

39-
def _discover_jobs(ingest_dir: Path):
40-
"""Find all subdirectories under *root/ingest* and create manifests describing them.
41+
def _discover_jobs(warehouse_name: str, ingest_dir: Path):
42+
"""Find all subdirectories under the warehouse 'ingest' directory and create manifests describing them.
4143
4244
The following directory structure is assumed:
4345
44-
root/
46+
<warehouse_name>/
4547
|-- ingest/
4648
| |-- domain_A/
4749
| | |-- source_A/
4850
| | |-- source_B/
4951
| |-- domain_B/
5052
| |-- source_A/
51-
|-- transform/ # Root of dbt project
5253
5354
Each subdirectory under ingest is considered a domain and each subdirectory
5455
underneath a domain is a data source from that domain.
5556
56-
:param ingest_dir: Root directory to search recursively.
57+
:param warehouse_name: The top level directory name, which is stored in the manifest.
58+
:param ingest_dir: Root ingest directory to search for jobs.
5759
:returns: List of parsed manifests.
5860
"""
5961

6062
return [
61-
_create_ingest_manifest(job_dir)
63+
_create_ingest_manifest(warehouse_name, job_dir)
6264
for domain_dir in ingest_dir.iterdir()
6365
if domain_dir.is_dir()
6466
for job_dir in domain_dir.iterdir()
6567
if job_dir.is_dir()
6668
]
6769

6870

69-
def _create_ingest_manifest(job_dir: Path) -> ELTJobManifest:
71+
def _create_ingest_manifest(warehouse_name: str, job_dir: Path) -> ELTJobManifest:
7072
return ELTJobManifest(
73+
warehouse_name=warehouse_name,
7174
name=job_dir.name,
7275
domain=job_dir.parent.name,
7376
ingest_job_dir=job_dir.resolve(),
77+
is_ingest_job=True,
7478
)

elt-common/src/elt_common/runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def run_job(job: ELTJobManifest) -> None:
4141
def run_ingest(job: ELTJobManifest) -> dict[str, int]:
4242
"""Import the extract function, call it, and write results to Iceberg."""
4343

44-
iceberg_io = IcebergIO(connect_catalog())
44+
iceberg_io = IcebergIO(connect_catalog(job.destination_warehouse))
4545

4646
# Get object that will do the extraction.
4747
# Environment variables for the object's configuration must have been set

elt-common/src/elt_common/typing.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,20 @@ def write_table(
4141
class ELTJobManifest:
4242
"""Parsed representation of an ELT job"""
4343

44+
warehouse_name: str
4445
name: str
4546
domain: str
47+
is_ingest_job: bool
4648
ingest_job_dir: Path
4749

4850
@property
4951
def full_name(self) -> str:
5052
return f"{self.domain}.{self.name}"
5153

54+
@property
55+
def destination_warehouse(self):
56+
return f"{self.warehouse_name}_landing" if self.is_ingest_job else self.warehouse_name
57+
5258
@property
5359
def destination_namespace(self) -> str:
5460
"""The destination namespace for this job: ``{domain}_{name}``."""

elt-common/tests/unit_tests/iceberg/test_catalog.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
"""Tests for elt_common.iceberg.catalog"""
22

3+
from unittest.mock import MagicMock
4+
35
import pytest
46
from pytest_mock import MockerFixture
5-
from unittest.mock import MagicMock
67

78
from elt_common.iceberg.catalog import (
89
connect_catalog,
@@ -26,14 +27,20 @@ def mock_load_catalog(mocker: MockerFixture):
2627
return mocker.patch("elt_common.iceberg.catalog.load_catalog")
2728

2829

30+
def test_no_config_found_raises_error(mock_config):
31+
mock_config.get_catalog_config.return_value = None
32+
with pytest.raises(RuntimeError):
33+
connect_catalog("test_warehouse")
34+
35+
2936
def test_connect_catalog_loads_default_catalog(mock_config, mock_load_catalog):
3037
# Execute
31-
connect_catalog()
38+
connect_catalog("test_warehouse")
3239

3340
# Assert
3441
mock_config.get_default_catalog_name.assert_called_once()
3542
mock_config.get_catalog_config.assert_called_once_with("default")
36-
mock_load_catalog.assert_called_once_with("default", warehouse="/tmp/warehouse")
43+
mock_load_catalog.assert_called_once_with("default", warehouse="test_warehouse")
3744

3845

3946
def test_connect_catalog_forwards_all_options_from_pyiceberg_catalog_config(
@@ -44,13 +51,17 @@ def test_connect_catalog_forwards_all_options_from_pyiceberg_catalog_config(
4451
"uri": "http://localhost:8181",
4552
"auth": "oauth2",
4653
}
54+
# 'warehouse' is overwritten by the provided value
55+
expected_config = {k: v for k, v in catalog_config.items()}
56+
expected_config["warehouse"] = "test_warehouse"
57+
4758
mock_config.get_catalog_config.return_value = catalog_config
4859

4960
# Execute
50-
connect_catalog()
61+
connect_catalog("test_warehouse")
5162

5263
# Assert
53-
mock_load_catalog.assert_called_once_with("default", **catalog_config)
64+
mock_load_catalog.assert_called_once_with("default", **expected_config)
5465

5566

5667
def test_table_id_returns_tuple_identifier():

elt-common/tests/unit_tests/test_extract.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ def test_deserialize_watermark_good_values(serialized, expected):
4343
def make_error_manifest(filename):
4444
this_dir = Path(__file__).parent
4545
return ELTJobManifest(
46+
warehouse_name="warehouse",
4647
name=filename,
4748
domain="whatever",
49+
is_ingest_job=True,
4850
ingest_job_dir=this_dir / "create_extract_obj_fakes" / "errors",
4951
)
5052

@@ -69,8 +71,10 @@ def test_create_extract_obj_errors(filename, expected_error, expected_error_mess
6971
def make_manifest(filename):
7072
this_dir = Path(__file__).parent
7173
return ELTJobManifest(
74+
warehouse_name="test_warehouse",
7275
name=filename,
7376
domain="whatever",
77+
is_ingest_job=True,
7478
ingest_job_dir=this_dir / "create_extract_obj_fakes",
7579
)
7680

elt-common/tests/unit_tests/test_pipeline.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,32 @@
1010
)
1111

1212

13-
def test_namespace_property_combines_domain_and_name():
14-
"""Tests for IngestJobManifest.destination_namespace"""
13+
def test_properties():
1514
manifest = ELTJobManifest(
15+
warehouse_name="test_warehouse",
1616
name="source_a",
1717
domain="facility_ops",
18+
is_ingest_job=True,
1819
ingest_job_dir=Path("/some/path"),
1920
)
20-
assert manifest.destination_namespace == "facility_ops_source_a"
21-
21+
assert manifest.destination_namespace == "facility_ops_source_a", (
22+
"Destination namespace should be 'domain'_'name'"
23+
)
24+
assert manifest.full_name == "facility_ops.source_a", "Name should be 'domain'.'name'"
25+
assert manifest.destination_warehouse == "test_warehouse_landing", (
26+
"Ingest job destination should have _landing appended"
27+
)
2228

23-
def test_full_name_property_combines_domain_and_name():
24-
"""Tests for IngestJobManifest.full_name"""
25-
manifest = ELTJobManifest(
29+
non_ingest_manifest = ELTJobManifest(
30+
warehouse_name="test_warehouse",
2631
name="source_a",
2732
domain="facility_ops",
33+
is_ingest_job=False,
2834
ingest_job_dir=Path("/some/path"),
2935
)
30-
assert manifest.full_name == "facility_ops.source_a"
36+
assert non_ingest_manifest.destination_warehouse == "test_warehouse", (
37+
"Non-ingest manifest shouldn't change warehouse name"
38+
)
3139

3240

3341
def test_init_stores_root_and_derives_name(tmp_path: Path):

elt-common/tests/unit_tests/test_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ def elt_job(request) -> ELTJobManifest:
3838
job_name = request.param
3939

4040
return ELTJobManifest(
41+
warehouse_name="test_warehouse",
4142
name=job_name,
4243
domain=TEST_DOMAIN,
44+
is_ingest_job=True,
4345
ingest_job_dir=this_dir / "runner_extractor_fakes",
4446
)
4547

elt-pipelines/README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ elt-pipelines/
4646
| | | | |-- <job name>.py
4747
```
4848

49-
- Each 'target warehouse' is the name of an Iceberg warehouse. The data ingested by the pipelines inside that directory end up in that warehouse.
50-
- The directory structure from `ingest` down is what is required for `elt-common` to be able to run 'ingest' pipelines.
49+
- This directory structure is required for using `elt-common`
50+
- Each 'target warehouse' is the name of an Iceberg warehouse. The data ingested by the pipelines inside that directory
51+
end up in that warehouse.
5152
- Data from ingest pipelines is considered 'raw' data, and is loaded into a warehouse suffixed with `_landing`.
52-
- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the raw data into it's final state in the target warehouse.
53+
- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the
54+
raw data into its final state in the target warehouse.

0 commit comments

Comments
 (0)