Skip to content

Commit 75a7896

Browse files
committed
feat: enhance Streamlit UI with platform readiness checks and quick Airflow triggers; improve error handling in data quality checks; update documentation for new features
1 parent 5c5968d commit 75a7896

10 files changed

Lines changed: 446 additions & 19 deletions

File tree

README.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
- **Natural Language Pipelines** — Describe what you need in plain text; the AI agent generates and executes a validated YAML pipeline
1818
- **11 Composable Services** — Extract (CSV, SQL, API, Excel), Transform (clean, filter, join, quality checks, outlier detection, LLM), Load (CSV, Excel, JSON, Parquet)
1919
- **High-Performance Data Transfer** — Apache Arrow IPC binary format between all services (zero-copy, no CSV/JSON parsing overhead)
20-
- **Visual Pipeline Builder** — Streamlit UI with YAML editor, real-time execution monitor, dataset explorer (browse outputs, preview, download), and service catalog
20+
- **Visual Pipeline Builder** — Streamlit UI with YAML editor, platform readiness checks, one-click Airflow triggers, real-time execution monitor, dataset explorer (browse outputs, preview, download), and service catalog
2121
- **Airflow Orchestration** — Production-ready DAGs with file-based XCom for large datasets
2222
- **Full Observability** — Prometheus metrics + Grafana dashboards + structured JSON logging + correlation ID tracing
2323
- **Extensible** — Add a new service in minutes using the included scaffold template and step-by-step guide
@@ -64,7 +64,16 @@ Trigger one of the pre-built DAGs from the Airflow UI:
6464

6565
Or paste a YAML from [`examples/pipelines/`](examples/pipelines/) into the Streamlit YAML Editor.
6666

67-
After execution, switch to the **Datasets** tab to browse output files, preview data, and download results.
67+
After execution, switch to the **Datasets** tab to browse output files, preview data, download results, and compare the latest run against the previous successful run.
68+
69+
### New in Streamlit UX
70+
71+
- **Platform Readiness** panel in Execution tab: live checks for Airflow, Streamlit, Prometheus, Grafana, including Airflow scheduler heartbeat status
72+
- **Quick Airflow Triggers** in Execution tab: trigger `hr_analytics_pipeline`, `ecommerce_pipeline`, or `weather_api_pipeline` without leaving Streamlit
73+
- **Execution insights**: successful steps, processed data volume, slowest step, and orchestration overhead (%)
74+
- **Run diagnostics** in Datasets tab: per-run active processing vs queue/orchestration gap timeline
75+
- **Run Comparison** in Datasets tab: current run vs previous successful run deltas for duration, final rows, and removed outliers
76+
- **Business KPI snapshot** from latest output file (domain-aware: HR, e-commerce, weather, or generic completeness)
6877

6978
---
7079

@@ -215,7 +224,7 @@ Results including PNG charts and an interactive Plotly report are saved to `benc
215224
### Testing
216225

217226
```bash
218-
make test # Run all 208 tests (unit + integration)
227+
make test # Run all tests (unit + integration)
219228
make test-coverage # With coverage report
220229
make lint # Ruff linter
221230
```

airflow/dags/xcom_file_utils.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@
1818
SHARED_DATA_ROOT = "/app/data"
1919

2020

21+
def _shared_dir_mode() -> int:
    """Return the permission mode to apply to shared dataset directories.

    The mode is read from the ``ETL_SHARED_DIR_MODE`` environment variable and
    parsed as an octal string; ``"775"``, ``"0775"`` and ``"0o775"`` are all
    accepted.

    Returns:
        The parsed mode, or ``0o775`` when the variable is unset, is not valid
        octal, or falls outside the permission-bit range ``0``-``0o7777``.
    """
    default_mode = 0o775
    raw_mode = os.getenv("ETL_SHARED_DIR_MODE", "775").strip()
    # Accept the Python-style octal prefix in either case ("0o775" / "0O775").
    if raw_mode.lower().startswith("0o"):
        raw_mode = raw_mode[2:]
    try:
        mode = int(raw_mode, 8)
    except ValueError:
        return default_mode
    # int() happily parses "-775" or "77777"; neither is a meaningful chmod
    # mode, so treat anything outside the permission bits as misconfiguration.
    if not 0 <= mode <= 0o7777:
        return default_mode
    return mode
29+
30+
2131
def save_ipc_to_shared(ipc_data: bytes, dataset_name: str, step_name: str) -> str:
2232
"""
2333
Save Arrow IPC data to the shared volume.
@@ -32,10 +42,11 @@ def save_ipc_to_shared(ipc_data: bytes, dataset_name: str, step_name: str) -> st
3242
"""
3343
xcom_dir = os.path.join(SHARED_DATA_ROOT, dataset_name, "xcom")
3444
os.makedirs(xcom_dir, exist_ok=True)
35-
# Ensure the dataset dir and xcom dir are writable by all containers
45+
# Keep write access configurable; default to least-privilege group writable.
46+
mode = _shared_dir_mode()
3647
try:
37-
os.chmod(os.path.join(SHARED_DATA_ROOT, dataset_name), 0o777)
38-
os.chmod(xcom_dir, 0o777)
48+
os.chmod(os.path.join(SHARED_DATA_ROOT, dataset_name), mode)
49+
os.chmod(xcom_dir, mode)
3950
except OSError:
4051
pass
4152

benchmark/monolithic_pipeline.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,13 @@ def run_monolithic_pipeline(
5555

5656
# ── Step 2: Data Quality Check ──
5757
t0 = time.time()
58-
assert len(df) >= 10, f"Dataset has only {len(df)} rows (min: 10)"
58+
if len(df) < 10:
59+
raise ValueError(f"Dataset has only {len(df)} rows (min: 10)")
5960
total_cells = df.shape[0] * df.shape[1]
6061
total_nulls = df.isnull().sum().sum()
6162
null_ratio = total_nulls / total_cells if total_cells > 0 else 0
62-
assert null_ratio <= null_threshold, f"Null ratio {null_ratio:.3f} exceeds threshold {null_threshold}"
63+
if null_ratio > null_threshold:
64+
raise ValueError(f"Null ratio {null_ratio:.3f} exceeds threshold {null_threshold}")
6365
results["quality_check"] = {
6466
"duration_sec": time.time() - t0,
6567
"null_ratio": float(null_ratio),

docs/access-credentials.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ cp .env.example .env
6161
| `LLM_PROVIDER` | `openai` | AI provider: `openai` or `local` |
6262
| `OPENAI_API_KEY` | *(empty)* | OpenAI API key — required if `LLM_PROVIDER=openai` |
6363
| `OPENAI_MODEL` | `gpt-4o-mini` | OpenAI model to use |
64+
| `AIRFLOW_BASE_URL` | `http://localhost:8080` | Streamlit quick-trigger target Airflow URL (optional override) |
65+
| `AIRFLOW_USERNAME` | `admin` | Streamlit quick-trigger Airflow username (optional override) |
66+
| `AIRFLOW_PASSWORD` | `admin` | Streamlit quick-trigger Airflow password (optional override) |
67+
68+
> The Streamlit UI shows a warning when Airflow still uses default credentials.
6469
6570
---
6671

docs/architecture.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,21 @@ Every service writes a JSON file to `/app/data/<dataset_name>/metadata/` after p
518518
These files are low-latency (written locally on shared volume) and allow
519519
reconstructing what happened at each step without depending on external systems.
520520

521+
### Streamlit observability surface (operator UX)
522+
523+
The Streamlit UI now exposes runtime diagnostics directly in the **Execution** and
524+
**Datasets** tabs to reduce context switching between Airflow, Grafana, and logs.
525+
526+
- **Platform Readiness**: probes Airflow (`/health`), Streamlit, Prometheus, and Grafana
527+
- **Airflow scheduler heartbeat status**: explicit healthy/not-ready signal from Airflow health payload
528+
- **Execution insights**: successful steps, total processed KB, slowest step, orchestration overhead (%)
529+
- **Run timeline diagnostics**: active processing vs queue/orchestration gap per step
530+
- **Run comparison**: latest run vs previous successful run deltas for duration, final rows, and outliers removed
531+
532+
This UX layer does not replace Prometheus/Grafana but gives immediate operator feedback
533+
for common questions like "is the platform ready?", "where is time spent?", and
534+
"did this run improve or regress vs the previous one?".
535+
521536
### Prometheus scraping
522537

523538
```

docs/demo-guide.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ http://localhost:8501
5454

5555
You'll see four tabs: **Pipeline Editor**, **Execution**, **Datasets**, **Services**.
5656

57+
The **Execution** tab now includes:
58+
- **Platform Readiness** checks (Airflow, Streamlit, Prometheus, Grafana)
59+
- **Quick Airflow Triggers** (trigger core DAGs directly from Streamlit)
60+
- **Execution Insights** (slowest step, processed data, orchestration overhead)
61+
5762
### 2. Chat Tab — describe the pipeline in natural language
5863

5964
In the chat panel, type something like:
@@ -74,6 +79,8 @@ The panel shows:
7479
Click **Execute**. You'll see steps complete sequentially (or in parallel if independent).
7580
When done, a table preview and saved file path in `/app/data/` appear.
7681

82+
You can also use **Quick Airflow Triggers** in the same tab to launch DAGs directly via the Airflow API.
83+
7784
---
7885

7986
## Scenario B — YAML Editor (pre-built pipeline)
@@ -233,9 +240,14 @@ to the shared volume at `/app/data/<dataset_name>/`.
233240
Open http://localhost:8501 → **Datasets** tab to:
234241

235242
- **Output Files** — preview CSV, Parquet, JSON, or Excel files and download them directly
236-
- **Pipeline Runs** — view run history grouped by `correlation_id`, with per-step duration, row counts, and service details
243+
- **Pipeline Runs** — view run history grouped by `correlation_id`, with:
244+
- run timeline (active processing vs queue/orchestration gap)
245+
- compact run comparison (current vs previous successful run)
246+
- per-step duration, row counts, and service details
237247
- **Raw Metadata** — inspect the JSON metadata files written by each service
238248

249+
The dataset overview also includes a **Business KPI snapshot** from the latest output (for example: revenue/AOV for e-commerce, attrition rate for HR).
250+
239251
Select any dataset from the sidebar dropdown to explore its contents.
240252

241253
---

services/common/path_utils.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,16 @@
66
_DATASET_NAME_PATTERN = re.compile(r"^[A-Za-z0-9._-]{1,128}$")
77

88

9+
def _shared_dir_mode() -> int:
    """Return the permission mode to apply to shared dataset directories.

    The mode is read from the ``ETL_SHARED_DIR_MODE`` environment variable and
    parsed as an octal string; ``"775"``, ``"0775"`` and ``"0o775"`` are all
    accepted.

    Returns:
        The parsed mode, or ``0o775`` when the variable is unset, is not valid
        octal, or falls outside the permission-bit range ``0``-``0o7777``.
    """
    default_mode = 0o775
    raw_mode = os.getenv("ETL_SHARED_DIR_MODE", "775").strip()
    # Accept the Python-style octal prefix in either case ("0o775" / "0O775").
    if raw_mode.lower().startswith("0o"):
        raw_mode = raw_mode[2:]
    try:
        mode = int(raw_mode, 8)
    except ValueError:
        return default_mode
    # int() happily parses "-775" or "77777"; neither is a meaningful chmod
    # mode, so treat anything outside the permission bits as misconfiguration.
    if not 0 <= mode <= 0o7777:
        return default_mode
    return mode
17+
18+
919
def sanitize_dataset_name(dataset_name):
1020
if not isinstance(dataset_name, str):
1121
raise ValueError("Parameter 'dataset_name' must be a string")
@@ -35,11 +45,11 @@ def ensure_dataset_dirs(dataset_name):
3545
dataset_folder = resolved_dataset_folder
3646
metadata_dir = os.path.join(dataset_folder, "metadata")
3747
os.makedirs(metadata_dir, exist_ok=True)
38-
# Ensure directories are world-writable so all containers (services run as
39-
# root, Airflow runs as uid 50000) can read/write to the same shared volume.
48+
# Keep write access configurable; default to least-privilege group writable.
49+
mode = _shared_dir_mode()
4050
try:
41-
os.chmod(dataset_folder, 0o777)
42-
os.chmod(metadata_dir, 0o777)
51+
os.chmod(dataset_folder, mode)
52+
os.chmod(metadata_dir, mode)
4353
except OSError:
4454
pass # best-effort; may fail on read-only mounts
4555
return dataset_folder, metadata_dir

services/extract-excel-service/app/extract.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pyarrow as pa
55
from common.path_utils import resolve_input_path
66

7+
78
def process_excel(file_path):
89
"""
910
Load Excel into DataFrame and return Arrow Table.

0 commit comments

Comments
 (0)