Skip to content

Commit 27176eb

Browse files
committed
feat: enhance Streamlit UI with Dataset Explorer and update pipeline descriptions
1 parent e635730 commit 27176eb

8 files changed

Lines changed: 292 additions & 12 deletions

File tree

.github/copilot-instructions.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ etl_microservices/
141141
│ └── pipeline_compiler.py # Parallel pipeline execution via Preparator SDK (dispatch registry + topological layering)
142142
143143
├── streamlit_app/
144-
│ ├── app.py # Streamlit UI: chat, YAML editor, execution monitor, service catalog, data preview/download, health dashboard
144+
│ ├── app.py # Streamlit UI: pipeline editor, YAML validation, execution monitor, dataset explorer (browse/preview/download outputs), service catalog
145145
│ ├── Dockerfile
146146
│ └── requirements.txt
147147
@@ -646,7 +646,7 @@ These are hard-won insights from building and debugging the platform. They shoul
646646
| **Orchestration** | Apache Airflow | 2.10.4 | PostgreSQL 16 backend, DAG-based |
647647
| **AI (cloud)** | OpenAI API | GPT-4o-mini default | Pipeline generation |
648648
| **AI (local)** | HuggingFace Transformers | Llama 3.2 1B Instruct | Text completion service |
649-
| **UI** | Streamlit | 1.30+ | Chat + pipeline builder + data preview/download + health dashboard |
649+
| **UI** | Streamlit | 1.30+ | Pipeline editor + execution monitor + dataset explorer + service catalog |
650650
| **Containers** | Docker Compose | v2 | Single bridge network |
651651
| **Monitoring** | Prometheus + Grafana | latest | Per-service metrics |
652652
| **Testing** | pytest | 7.x+ | Unit + integration |

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
- **Natural Language Pipelines** — Describe what you need in plain text; the AI agent generates and executes a validated YAML pipeline
1818
- **11 Composable Services** — Extract (CSV, SQL, API, Excel), Transform (clean, filter, join, quality checks, outlier detection, LLM), Load (CSV, Excel, JSON, Parquet)
1919
- **High-Performance Data Transfer** — Apache Arrow IPC binary format between all services (zero-copy, no CSV/JSON parsing overhead)
20-
- **Visual Pipeline Builder** — Streamlit UI with chat panel, YAML editor, real-time execution monitor, and data preview/download
20+
- **Visual Pipeline Builder** — Streamlit UI with YAML editor, real-time execution monitor, dataset explorer (browse outputs, preview, download), and service catalog
2121
- **Airflow Orchestration** — Production-ready DAGs with file-based XCom for large datasets
2222
- **Full Observability** — Prometheus metrics + Grafana dashboards + structured JSON logging + correlation ID tracing
2323
- **Extensible** — Add a new service in minutes using the included scaffold template and step-by-step guide
@@ -46,7 +46,7 @@ The Airflow admin user (`admin`/`admin`) is created automatically on first boot.
4646

4747
| Interface | URL | Credentials |
4848
|---|---|---|
49-
| **Streamlit** (AI Pipeline Builder) | http://localhost:8501 | — |
49+
| **Streamlit** (Pipeline Builder + Dataset Explorer) | http://localhost:8501 | — |
5050
| **Airflow** | http://localhost:8080 | admin / admin |
5151
| **Grafana** (pre-provisioned dashboard) | http://localhost:3000 | admin / *GF_SECURITY_ADMIN_PASSWORD from .env* |
5252
| **Prometheus** | http://localhost:9090 | — |
@@ -64,6 +64,8 @@ Trigger one of the pre-built DAGs from the Airflow UI:
6464

6565
Or paste a YAML from [`examples/pipelines/`](examples/pipelines/) into the Streamlit YAML Editor.
6666

67+
After execution, switch to the **Datasets** tab to browse output files, preview data, and download results.
68+
6769
---
6870

6971
## How It Works
@@ -243,7 +245,7 @@ Full walkthrough: [docs/extending.md](docs/extending.md)
243245
<summary>Click to expand</summary>
244246

245247
```
246-
├── docker-compose.yml # Full stack (17 containers)
248+
├── docker-compose.yml # Full stack (18 containers)
247249
├── Makefile # Common commands
248250
├── data/demo/ # Bundled demo datasets
249251
│ ├── hr_sample.csv

airflow/dags/hr_analytics_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@
4343
description="HR People Analytics ETL pipeline (IBM HR Attrition dataset)",
4444
tags=["hr", "analytics", "etl", "v4"],
4545
params={
46-
"dataset_name": Param("hr_attrition", type="string", description="Dataset identifier"),
46+
"dataset_name": Param("hr_demo", type="string", description="Dataset identifier"),
4747
"file_path": Param(
48-
"/app/data/hr_attrition/WA_Fn-UseC_-HR-Employee-Attrition.csv",
48+
"/app/data/hr_demo/data.csv",
4949
type="string",
5050
description="Path to HR CSV file on shared volume",
5151
),

airflow/dags/xcom_file_utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ def save_ipc_to_shared(ipc_data: bytes, dataset_name: str, step_name: str) -> st
3232
"""
3333
xcom_dir = os.path.join(SHARED_DATA_ROOT, dataset_name, "xcom")
3434
os.makedirs(xcom_dir, exist_ok=True)
35+
# Ensure the dataset dir and xcom dir are writable by all containers
36+
try:
37+
os.chmod(os.path.join(SHARED_DATA_ROOT, dataset_name), 0o777)
38+
os.chmod(xcom_dir, 0o777)
39+
except OSError:
40+
pass
3541

3642
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
3743
unique_id = uuid.uuid4().hex[:8]

docs/demo-guide.md

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ curl http://localhost:5002/health # clean-nan-service
5252

5353
http://localhost:8501
5454

55-
You'll see four tabs: **Chat**, **YAML Editor**, **Service Catalog**, **Health Dashboard**.
55+
You'll see four tabs: **Pipeline Editor**, **Execution**, **Datasets**, **Services**.
5656

5757
### 2. Chat Tab — describe the pipeline in natural language
5858

59-
In the text box, type something like:
59+
In the chat panel, type something like:
6060

6161
> *"Load the HR dataset, check data quality, remove outliers on monthly salary and save as CSV"*
6262
@@ -88,7 +88,7 @@ cat examples/pipelines/hr_analytics.yaml
8888

8989
### 2. Paste it in the editor
9090

91-
Open http://localhost:8501 → tab **YAML Editor** → paste the content.
91+
Open http://localhost:8501 → tab **Pipeline Editor** → paste the content in the YAML editor.
9292

9393
The validator shows any errors (nonexistent service, missing parameter, cycle in graph)
9494
before execution even starts.
@@ -225,6 +225,21 @@ Click **Trigger DAG w/ config** and pass JSON:
225225

226226
---
227227

228+
## Browsing Results — Dataset Explorer
229+
230+
After any pipeline completes (UI, Airflow, or SDK), output files and metadata are written
231+
to the shared volume at `/app/data/<dataset_name>/`.
232+
233+
Open http://localhost:8501**Datasets** tab to:
234+
235+
- **Output Files** — preview CSV, Parquet, JSON, or Excel files and download them directly
236+
- **Pipeline Runs** — view run history grouped by `correlation_id`, with per-step duration, row counts, and service details
237+
- **Raw Metadata** — inspect the JSON metadata files written by each service
238+
239+
Select any dataset from the sidebar dropdown to explore its contents.
240+
241+
---
242+
228243
## What to observe during execution
229244

230245
### Metrics in Grafana
@@ -270,5 +285,6 @@ docker exec extract-csv-service ls /app/data/hr_demo/metadata/
270285
| `curl /health` → connection refused | service not started | `docker compose ps` + `docker compose up -d <service>` |
271286
| Streamlit shows "AI agent not available" | `OPENAI_API_KEY` missing | add key to `.env` + `docker compose restart streamlit-app` |
272287
| Pipeline fails with "file not found" | demo data not loaded | `make demo-data` |
288+
| Datasets tab shows no datasets | no pipelines have run yet | run a pipeline first, then refresh |
273289
| Airflow task fail "No module named..." | Airflow container not updated | `docker compose up -d --build airflow` |
274290
| Grafana "No data" in panels | services not scraped yet | wait 15s or run a pipeline to generate traffic |

preparator/preparator_v4.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,29 @@ def run_service_ipc_in_ipc_out_with_header(self, service_key, ipc_data, header_d
122122
self._handle_error_response(resp, service_key)
123123
return resp.content # Arrow IPC bytes in case of success
124124

125+
def run_service_ipc_in_json_out_with_header(self, service_key, ipc_data, header_dict):
126+
"""
127+
Executes a POST request to the microservice identified by `service_key`,
128+
sending `ipc_data` in the body and parameters (header_dict) in a header called 'X-Params' (as JSON).
129+
Returns the parsed JSON response body as a dict (used by services that respond with JSON, e.g. load-data).
130+
"""
131+
header_json = json.dumps(header_dict)
132+
self.logger.info(f"Calling {service_key} with IPC data (size={len(ipc_data)}). Header: {header_json}")
133+
url = self.services[service_key]
134+
135+
resp = self.session.post(
136+
url,
137+
data=ipc_data, # Arrow IPC in body
138+
headers={
139+
"Content-Type": "application/vnd.apache.arrow.stream",
140+
"X-Params": header_json,
141+
"X-Correlation-ID": self.correlation_id,
142+
},
143+
timeout=self.timeout
144+
)
145+
self._handle_error_response(resp, service_key)
146+
return resp.json() # JSON dict in case of success
147+
125148
# ================================================================
126149
# EXTRACTION
127150
# ================================================================
@@ -297,9 +320,11 @@ def text_completion_llm(
297320
def load_data(self, ipc_data, format='csv', dataset_name="default_dataset"):
298321
"""
299322
'load_data' microservice
323+
The load-data-service returns a JSON status response (not Arrow IPC),
324+
so we use run_service_ipc_in_json_out_with_header.
300325
"""
301326
header_dict = {
302327
"dataset_name": dataset_name,
303328
"format": format
304329
}
305-
return self.run_service_ipc_in_ipc_out_with_header("load_data", ipc_data, header_dict)
330+
return self.run_service_ipc_in_json_out_with_header("load_data", ipc_data, header_dict)

services/common/path_utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ def ensure_dataset_dirs(dataset_name):
3535
dataset_folder = resolved_dataset_folder
3636
metadata_dir = os.path.join(dataset_folder, "metadata")
3737
os.makedirs(metadata_dir, exist_ok=True)
38+
# Ensure directories are world-writable so all containers (services run as
39+
# root, Airflow runs as uid 50000) can read/write to the same shared volume.
40+
try:
41+
os.chmod(dataset_folder, 0o777)
42+
os.chmod(metadata_dir, 0o777)
43+
except OSError:
44+
pass # best-effort; may fail on read-only mounts
3845
return dataset_folder, metadata_dir
3946

4047

0 commit comments

Comments
 (0)