Commit da6cf38

feat(elt-pipelines): Add initial project with example pipeline (#368)
ref #321

Creates an `elt-pipelines` project with a `statusdisplay` pipeline, which uses `elt-common` to ingest data from the [ISIS cycles endpoint](https://status.isis.stfc.ac.uk/api/cycles). The pipeline can be run using the instructions from the README.

- `elt-common` is currently included as a dependency in `elt-pipelines` using a relative path pointing at the package in the parent folder. This makes it easy to work on both locally, but means anything wanting to run pipelines needs both packages in its working directory; don't think it's a big deal, but maybe not ideal? Are we aiming to publish `elt-common` to PyPI so we can use it as a 'normal' dependency?
- The new pipeline is ingesting into an `elt_cycles` table for testing purposes. Once we want to migrate to this pipeline in production, it should probably start ingesting into `cycles` instead, but because it's using a different schema from the current DLT pipeline we'll need to replace the table entirely, so there will be a bit of extra work needed at the time.
1 parent 506e9fd commit da6cf38

6 files changed

Lines changed: 1749 additions & 2 deletions

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
@@ -12,10 +12,10 @@ repos:
     hooks:
       # Run the linter.
       - id: ruff-check
-        files: (elt-common|warehouses)
+        files: (elt-common|elt-pipelines|warehouses)
       # Run the formatter.
       - id: ruff-format
-        files: (elt-common|warehouses)
+        files: (elt-common|elt-pipelines|warehouses)
   - repo: https://github.com/shellcheck-py/shellcheck-py
     rev: v0.11.0.1
     hooks:
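
pre-commit treats `files` as a Python regular expression and matches it against each path with `re.search`, so the widened pattern now also selects files under `elt-pipelines/`. A minimal sketch of that matching follows; the helper name `hook_applies` and the example paths are hypothetical and exist only for illustration.

```python
import re

# Pattern copied from the updated hook configuration.
FILES_PATTERN = re.compile(r"(elt-common|elt-pipelines|warehouses)")


def hook_applies(path: str) -> bool:
    """Return True if the pattern occurs anywhere in the path (re.search semantics)."""
    return FILES_PATTERN.search(path) is not None


# Hypothetical repository paths, chosen only to illustrate the matching.
examples = [
    "elt-pipelines/pyproject.toml",
    "elt-common/src/module.py",
    "docs/index.md",
]
for path in examples:
    print(path, hook_applies(path))
```

Because the pattern is unanchored, it would also match a path that merely contains one of the names, such as a hypothetical `docs/elt-pipelines-notes.md`. Prefixing the pattern with `^` is one way to restrict it to top-level directories if that ever matters.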

elt-pipelines/.gitignore

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+# ignore basic python artifacts
+.env
+**/__pycache__/
+**/*.py[cod]
+**/*$py.class
+**/build/
+**/*.egg-info/

elt-pipelines/README.md

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+# elt-pipelines
+
+Pipelines for ingesting data from various sources into Iceberg catalogs using elt-common.
+
+**Under construction** - this project is being developed as a replacement for the existing DLT pipelines.
+[See here](https://github.com/ISISNeutronMuon/analytics-data-platform/issues/321) for details.
+
+## Development setup
+
+Development requires the following tools:
+
+- [uv](https://docs.astral.sh/uv/): Used to manage both Python installations and dependencies
+
+### Setting up a Python virtual environment
+
+Once `uv` is installed, create an environment, activate it, and install dependencies with:
+
+```bash
+> uv venv
+> source .venv/bin/activate
+> uv sync
+```
+
+Pipelines can declare optional dependencies in `pyproject.toml` - for example, `statusdisplay` uses `requests` for
+fetching data. To install any additional dependencies for that specific pipeline, use:
+
+```bash
+> uv sync --extra statusdisplay
+```
+
+## Running a pipeline
+
+Pipelines are run using the `elt` CLI tool. As an example, with the package directory as the current working directory,
+`elt run facility_ops statusdisplay` will run the statusdisplay pipeline. See `elt -h` for full usage.
+
+## Directory structure
+
+The project uses the following directory structure:
+
+```txt
+elt-pipelines/
+|-- <target warehouse>/
+|   |-- ingest/
+|   |   |-- <domain>/
+|   |   |   |-- <job name>/
+|   |   |   |   |-- <job name>.py
+```
+
+- Each 'target warehouse' is the name of an Iceberg warehouse. The data ingested by the pipelines inside that directory ends up in that warehouse.
+- The directory structure from `ingest` down is what is required for `elt-common` to be able to run 'ingest' pipelines.
+- Data from ingest pipelines is considered 'raw' data, and is loaded into a warehouse suffixed with `_landing`.
+- Under construction: Each warehouse will also have a `transform` subdirectory containing pipelines for converting the raw data into its final state in the target warehouse.
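
The layout described in the README can be expressed as a small path builder. This is only a sketch of the convention: `job_module_path` is a hypothetical helper, `example_domain` is a placeholder because the commit does not show the real domain name, and how `elt-common` actually locates job modules is not shown here.

```python
from pathlib import PurePosixPath


def job_module_path(warehouse: str, domain: str, job: str) -> PurePosixPath:
    """Build <warehouse>/ingest/<domain>/<job>/<job>.py under elt-pipelines/."""
    return PurePosixPath("elt-pipelines") / warehouse / "ingest" / domain / job / f"{job}.py"


# "facility_ops" and "statusdisplay" come from the README's example command;
# "example_domain" is a placeholder, not the real directory name.
print(job_module_path("facility_ops", "example_domain", "statusdisplay"))
```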
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+from datetime import datetime
+import io
+import json
+import pyarrow.json
+import requests
+
+from elt_common.extract import BaseExtract, ResourceProperties, ResourceWriteProperties
+
+CYCLES_URL = "https://status.isis.stfc.ac.uk/api/cycles"
+
+
+class Extract(BaseExtract):
+    def extract_resource_properties(self):
+        yield (
+            "elt_cycles",
+            ResourceProperties(
+                extractor=extract_cycles,
+                write_properties=ResourceWriteProperties(write_mode="replace"),
+                watermark_column=None,
+            ),
+        )
+
+
+def extract_cycles(_):
+    data = clean(fetch())
+    newline_delimited = "\n".join(json.dumps(row) for row in data)
+
+    with io.BytesIO(newline_delimited.encode()) as f:
+        yield pyarrow.json.read_json(f)
+
+
+def fetch():
+    try:
+        response = requests.get(CYCLES_URL, timeout=20)
+    except requests.Timeout as ex:
+        raise RuntimeError("Timed out when fetching cycles") from ex
+
+    if not response.ok:
+        raise RuntimeError(f"Failed to fetch cycles - {response.reason}")
+
+    return response.json()
+
+
+def reformat(date_string):
+    """Convert a date from ISO format into one that pyarrow will convert into a timestamp"""
+    return datetime.fromisoformat(date_string).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def clean(data):
+    for cycle in data:
+        for phase in cycle["phases"]:
+            if "start" in phase:
+                phase["start"] = reformat(phase["start"])
+            if "end" in phase:
+                phase["end"] = reformat(phase["end"])
+
+    return data
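
The cleaning and serialisation steps in this pipeline can be tried without network access or `pyarrow`. The sketch below reuses the `reformat` logic from the diff and builds the same newline-delimited JSON payload that the pipeline hands to `pyarrow.json.read_json`. The sample record is made up: only the `phases`, `start` and `end` keys come from the source, while the `name` field, the values, and the `to_ndjson` helper are assumptions for illustration.

```python
import json
from datetime import datetime


def reformat(date_string: str) -> str:
    """Same conversion as in the diff: ISO 8601 in, 'YYYY-MM-DD HH:MM:SS' out."""
    return datetime.fromisoformat(date_string).strftime("%Y-%m-%d %H:%M:%S")


def to_ndjson(rows: list[dict]) -> bytes:
    """Serialise rows as newline-delimited JSON (hypothetical helper)."""
    return "\n".join(json.dumps(row) for row in rows).encode()


# Made-up sample; the real API response may contain other fields.
sample = [
    {
        "name": "example-cycle",
        "phases": [{"start": "2024-01-15T08:30:00", "end": "2024-02-20T17:00:00+01:00"}],
    },
]

# Rewrite the phase timestamps in place, as clean() does in the pipeline.
for cycle in sample:
    for phase in cycle["phases"]:
        for key in ("start", "end"):
            if key in phase:
                phase[key] = reformat(phase[key])

payload = to_ndjson(sample)
print(payload.decode())
```

Note that the `strftime` format has no offset field, so a UTC offset such as `+01:00` is dropped rather than converted. If the endpoint ever returns offsets other than UTC, normalising with `astimezone` before formatting may be worth considering.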

elt-pipelines/pyproject.toml

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+[project]
+name = "elt-pipelines"
+version = "0.1.0"
+description = "Pipelines for ingesting data into Iceberg catalogs"
+readme = "README.md"
+requires-python = ">=3.13"
+dependencies = [
+    "elt-common",
+    "pydantic-settings>=2.14.2",
+]
+
+[project.optional-dependencies]
+statusdisplay = [
+    "pyarrow>=24.0.0",
+    "requests>=2.34.2",
+]
+
+[tool.uv.sources]
+elt-common = { path = "../elt-common", editable = true }
+
+[dependency-groups]
+dev = [
+    "prek>=0.4.5",
+]
