Commit cf71425

docs: update copilot-instructions to reflect current codebase
- Add e-commerce and weather API use cases to project identity
- Add data/demo/, docs/, examples/, templates/ to project structure tree
- Add ecommerce_pipeline and weather_api_pipeline DAGs
- Update pipeline_compiler description (parallel execution, dispatch registry)
- Fix pipeline_agent description (validate_pipeline() is module-level, not __new__)
- Add path_utils.ensure_dataset_dirs() to shared utilities docs
- Update test section: 17 files, 208 tests, per-file counts, new test files
- Add services_config fixture to key fixtures list
- Add template/docs references to new-service checklist
- Fix duplicate numbering in lessons learned (two items #15 -> 15,16,17,18)
- Update centralized utilities lesson to remove stale line-count claim
- Add README.md to project structure
1 parent cdfda2e commit cf71425

1 file changed

Lines changed: 57 additions & 13 deletions

.github/copilot-instructions.md

@@ -8,7 +8,7 @@
88

99
**What this is:** An AI-assisted, modular ETL (Extract, Transform, Load) platform where each data operation is an independent Flask microservice. Pipelines are orchestrated via Apache Airflow DAGs or via the AI agent (natural language → YAML → execution).
1010

11-
**Primary use case:** HR / People Analytics — the platform ships with a production-ready pipeline for the IBM HR Attrition dataset (extract → quality check → drop columns → outlier detection → clean NaN → load).
11+
**Primary use case:** HR / People Analytics and E-commerce — the platform ships with production-ready pipelines for the IBM HR Attrition dataset and e-commerce order analytics, plus a weather API demo. Bundled demo datasets in `data/demo/` allow out-of-the-box testing.
1212

1313
**Core differentiator:** Composable, dynamically-assembled pipelines where an AI agent can translate natural language into validated YAML pipeline definitions and execute them. Each microservice is independently deployable, observable, and scalable.
1414

@@ -96,14 +96,15 @@ etl_microservices/
9696
├── docker-compose.yml # Full stack: 11 services + postgres + airflow + prometheus + grafana + streamlit
9797
├── Makefile # Common commands: up, down, build, test, lint, benchmark
9898
├── pyproject.toml # Project metadata, pytest/ruff/coverage config
99+
├── README.md # Project overview and quickstart
99100
├── .env # Environment variables (not committed)
100101
├── .env.example # Template for .env
101102
102103
├── services/
103104
│ ├── common/ # Shared utilities (imported by all services)
104105
│ │ ├── arrow_utils.py # ipc_to_table(), table_to_ipc()
105106
│ │ ├── json_utils.py # NpEncoder (handles numpy types in JSON)
106-
│ │ ├── path_utils.py # sanitize_dataset_name(), resolve_input_path() — security
107+
│ │ ├── path_utils.py # sanitize_dataset_name(), resolve_input_path(), ensure_dataset_dirs() — security & paths
107108
│ │ ├── logging_config.py # JSONFormatter, CorrelationAdapter, configure_service_logging()
108109
│ │ ├── health.py # create_health_response() — enriched health checks
109110
│ │ └── service_utils.py # Counters, /health+/metrics, X-Params parsing, metadata, correlation ID
@@ -124,6 +125,8 @@ etl_microservices/
124125
│ ├── Dockerfile # Airflow webserver + scheduler, db migrate on startup
125126
│ └── dags/
126127
│ ├── hr_analytics_pipeline.py # HR use case (6-step pipeline)
128+
│ ├── ecommerce_pipeline.py # E-commerce order analytics pipeline
129+
│ ├── weather_api_pipeline.py # Weather API extraction demo
127130
│ ├── parametrized_preparator_v4_quality.py # Generic quality pipeline
128131
│ ├── parametrized_preparator_v4_ia.py # Pipeline with LLM text completion
129132
│ ├── parametrized_preparator_v4_quality_join.py # Pipeline with dataset join
@@ -133,7 +136,7 @@ etl_microservices/
133136
│ ├── __init__.py
134137
│ ├── llm_provider.py # Abstract LLMProvider + OpenAIProvider + LocalProvider
135138
│ ├── pipeline_agent.py # NL → YAML pipeline generation + validation
136-
│ └── pipeline_compiler.py # Pipeline execution via Preparator SDK
139+
│ └── pipeline_compiler.py # Parallel pipeline execution via Preparator SDK (dispatch registry + topological layering)
137140
138141
├── streamlit_app/
139142
│ ├── app.py # Streamlit UI: chat, YAML editor, execution monitor, service catalog, data preview/download, health dashboard
@@ -150,9 +153,32 @@ etl_microservices/
150153
│ ├── run_benchmark.py # Runner with charts (matplotlib + plotly)
151154
│ └── __init__.py
152155
156+
├── data/
157+
│ └── demo/ # Bundled demo datasets for out-of-the-box testing
158+
│ ├── hr_sample.csv # IBM HR Attrition sample (501 rows)
159+
│ └── ecommerce_orders.csv # E-commerce orders sample (501 rows)
160+
161+
├── docs/
162+
│ └── extending.md # Developer guide: add new services & create pipelines
163+
164+
├── examples/
165+
│ └── pipelines/ # Ready-to-use YAML pipeline definitions
166+
│ ├── hr_analytics.yaml
167+
│ ├── ecommerce_analytics.yaml
168+
│ ├── weather_data.yaml
169+
│ └── README.md
170+
171+
├── templates/
172+
│ └── new_service/ # Scaffolding template with placeholders for new services
173+
│ ├── Dockerfile
174+
│ ├── requirements.txt
175+
│ ├── run.py
176+
│ ├── README.md # Step-by-step checklist
177+
│ └── app/
178+
153179
├── tests/
154-
│ ├── conftest.py # Shared fixtures (sample Arrow tables, IPC data)
155-
│ ├── unit/ # 12 test files for business logic functions
180+
│ ├── conftest.py # Shared fixtures (sample Arrow tables, IPC data, services_config)
181+
│ ├── unit/ # 17 test files (208 tests total with integration)
156182
│ └── integration/ # 2 test files (Flask test client + Preparator SDK)
157183
158184
├── .github/
@@ -265,6 +291,13 @@ from common.service_utils import (
265291
parse_x_params, # Parse X-Params header (ValueError on bad JSON)
266292
save_metadata, # Write metadata JSON file
267293
)
294+
295+
# Path security & dataset directory management
296+
from common.path_utils import (
297+
sanitize_dataset_name, # Validate/sanitize dataset name
298+
resolve_input_path, # Resolve and validate input file paths
299+
ensure_dataset_dirs, # Create dataset + metadata directories
300+
)
268301
```
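
To make the three path helpers imported above more concrete, here is a minimal sketch of what they might do. Only the function names and their one-line descriptions come from the diff. The signatures, the allowed-character rule, and the `metadata` subdirectory name are assumptions for illustration and may not match the real `common/path_utils.py`.

```python
import re
from pathlib import Path

# Assumed rule: dataset names may contain only letters, digits, "_" and "-".
_SAFE_NAME = re.compile(r"[A-Za-z0-9_-]+")


def sanitize_dataset_name(name: str) -> str:
    """Reject names that could be used to escape the data root."""
    if not _SAFE_NAME.fullmatch(name):
        raise ValueError(f"invalid dataset name: {name!r}")
    return name


def resolve_input_path(data_root: Path, relative: str) -> Path:
    """Resolve a user-supplied path and verify it stays inside data_root."""
    root = data_root.resolve()
    candidate = (root / relative).resolve()
    if not candidate.is_relative_to(root):  # requires Python 3.9+
        raise ValueError(f"path escapes data root: {relative!r}")
    return candidate


def ensure_dataset_dirs(data_root: Path, dataset_name: str) -> tuple[Path, Path]:
    """Create the dataset directory and a metadata directory beneath it."""
    dataset_dir = data_root / sanitize_dataset_name(dataset_name)
    metadata_dir = dataset_dir / "metadata"  # hypothetical layout
    metadata_dir.mkdir(parents=True, exist_ok=True)  # idempotent
    return dataset_dir, metadata_dir
```

Validating the name before touching the filesystem keeps traversal checks in one place, and `exist_ok=True` makes the directory helper safe to call on every request.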
269302

270303
### Flask App Factory Pattern (`__init__.py`)
@@ -283,8 +316,8 @@ Every service's `create_app()` now includes:
283316
| Module | Purpose |
284317
|---|---|
285318
| `ai_agent/llm_provider.py` | Abstract `LLMProvider` + `OpenAIProvider` (GPT-4o-mini default) + `LocalProvider` (calls text-completion-llm-service) |
286-
| `ai_agent/pipeline_agent.py` | `PipelineAgent`: builds system prompt from `service_registry.json`, calls LLM to generate YAML, validates structure + services + params + dependencies. Can be instantiated without LLM (via `__new__()`) for validation-only use (e.g., Streamlit UI). |
287-
| `ai_agent/pipeline_compiler.py` | `PipelineCompiler`: executes validated pipeline definition step-by-step via Preparator SDK, returns `PipelineResult` with per-step metrics + `correlation_id`. Supports `join_datasets` (2 `depends_on` entries as input datasets). Exposes `last_step_outputs` dict for UI data preview. |
319+
| `ai_agent/pipeline_agent.py` | `PipelineAgent`: builds system prompt from `service_registry.json`, calls LLM to generate YAML, validates structure + services + params + dependencies. Standalone `validate_pipeline()` module-level function enables validation-only use without instantiating the agent (e.g., Streamlit UI). |
320+
| `ai_agent/pipeline_compiler.py` | `PipelineCompiler`: executes validated pipeline definitions via Preparator SDK with **parallel execution** of independent steps (topological layering via Kahn’s algorithm + `ThreadPoolExecutor`). Uses a **dispatch registry** (`_build_dispatch_registry()`) for extensibility—add new services via `register_service()` without if/elif chains. Returns `PipelineResult` with per-step metrics + `correlation_id`. Supports `join_datasets` (2 `depends_on` entries). Exposes `last_step_outputs` dict for UI data preview. |
288321
| `schemas/service_registry.json` | Complete metadata for all 11 services: name, type, description, endpoint, input/output formats, params with types/required/defaults/enums |
289322
| `schemas/pipeline_schema.json` | JSON Schema for pipeline definitions |
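
The parallel execution described for `PipelineCompiler` (topological layering via Kahn's algorithm plus `ThreadPoolExecutor`) can be sketched as follows. This is an illustration of the technique, not the project's code: `topological_layers`, `run_pipeline`, the `run_step` callback, and the dict-based pipeline shape are hypothetical names chosen for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def topological_layers(steps: dict[str, list[str]]) -> list[list[str]]:
    """Group steps into layers with Kahn's algorithm.

    `steps` maps each step name to the names it depends on. Steps in the
    same layer have no dependencies on each other.
    """
    dependents: dict[str, list[str]] = {name: [] for name in steps}
    for name, deps in steps.items():
        for dep in deps:
            if dep not in steps:
                raise ValueError(f"unknown dependency {dep!r} in step {name!r}")
            dependents[dep].append(name)
    indegree = {name: len(deps) for name, deps in steps.items()}

    layers: list[list[str]] = []
    ready = sorted(name for name, degree in indegree.items() if degree == 0)
    while ready:
        layers.append(ready)
        next_ready = []
        for name in ready:
            for child in dependents[name]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Any step never reaching indegree 0 is part of a cycle.
    if sum(len(layer) for layer in layers) != len(steps):
        raise ValueError("cycle detected in pipeline dependencies")
    return layers


def run_pipeline(steps, run_step, max_workers=4):
    """Run each layer's steps concurrently; layers run one after another."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for layer in topological_layers(steps):
            # pool.map preserves input order and re-raises step exceptions.
            for name, output in zip(layer, pool.map(run_step, layer)):
                results[name] = output
    return results


pipeline = {
    "extract_a": [],
    "extract_b": [],
    "join": ["extract_a", "extract_b"],
    "load": ["join"],
}
print(topological_layers(pipeline))
# [['extract_a', 'extract_b'], ['join'], ['load']]
```

In this example the two extract steps share a layer and can run at the same time, while the join waits for both, which mirrors the `join_datasets` case with two `depends_on` entries.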
290323

@@ -356,6 +389,8 @@ Files stored at `/app/data/<dataset_name>/xcom/<step>_<timestamp>_<uuid>.arrow`.
356389
| DAG ID | Description |
357390
|---|---|
358391
| `hr_analytics_pipeline` | 6-step HR analytics (extract → quality → drop cols → outliers → clean → load) |
392+
| `ecommerce_pipeline` | E-commerce order analytics (extract → quality → clean → outliers → load) |
393+
| `weather_api_pipeline` | Weather API extraction demo (extract API → quality → load) |
359394
| `parametrized_preparator_v4_quality` | Generic quality pipeline (extract → quality → outliers → clean → load) |
360395
| `parametrized_preparator_v4_ia` | Pipeline with LLM text completion step |
361396
| `parametrized_preparator_v4_quality_join` | Pipeline with two-dataset join |
@@ -417,18 +452,24 @@ Single bridge network `etl-network`. Services reference each other by container
417452

418453
```
419454
tests/
420-
├── conftest.py # Shared fixtures: sample_arrow_table, sample_ipc_data, etc.
455+
├── conftest.py # Shared fixtures: sample_arrow_table, sample_ipc_data, services_config, etc.
421456
├── unit/ # Test pure business logic (no HTTP)
422457
│ ├── test_arrow_utils.py
423458
│ ├── test_clean.py # 14 tests (drop + 7 fill strategies + columns filter + validation)
424459
│ ├── test_columns.py
425460
│ ├── test_dq.py # 20 tests (min_rows, null_ratio, duplicates, types, unique, range, completeness)
461+
│ ├── test_extract_api.py # 7 tests (URL validation, auth, public/private API handling)
462+
│ ├── test_extract_csv.py # 6 tests (CSV loading, path resolution, error handling)
463+
│ ├── test_extract_excel.py # 6 tests (Excel loading, path resolution, error handling)
464+
│ ├── test_extract_sql.py # 18 tests (SQL validation, injection prevention, query execution)
426465
│ ├── test_health.py # 10 tests (health response structure, env vars, volume checks)
427466
│ ├── test_join.py
428-
│ ├── test_load.py # 8 tests (csv, json, xlsx, parquet, roundtrip, unsupported, edge cases)
467+
│ ├── test_load.py # 11 tests (csv, json, xlsx, parquet, roundtrip, xls normalization, edge cases)
429468
│ ├── test_logging_config.py # 14 tests (JSON formatter, CorrelationAdapter, configure, no-dup handlers)
430469
│ ├── test_outliers.py
431470
│ ├── test_path_utils.py
471+
│ ├── test_pipeline_agent.py # 16 tests (YAML generation, validation, schema compliance)
472+
│ ├── test_pipeline_compiler.py # 21 tests (dispatch, topological layers, parallel execution, error handling)
432473
│ └── test_service_utils.py # 11 tests (counters, correlation ID, parse_x_params, save_metadata)
433474
└── integration/ # Test Flask endpoints + Preparator SDK
434475
├── test_service_endpoints.py
@@ -442,6 +483,7 @@ tests/
442483
- `sample_arrow_table_with_outliers` — 100 rows with 2 injected outliers
443484
- `large_arrow_table` — 100k rows for performance testing
444485
- `sample_ipc_data` / `sample_ipc_data_with_nulls` — serialized Arrow IPC bytes
486+
- `services_config` — test service URL configuration fixture
445487
446488
### Important: sys.path Constraint
447489
@@ -475,7 +517,9 @@ make lint # ruff linter
475517

476518
## 10. Adding a New Microservice (Step-by-Step)
477519

478-
1. **Create directory:** `services/<name>/` with `Dockerfile`, `requirements.txt`, `run.py`, `app/__init__.py`, `app/routes.py`, `app/<logic>.py`
520+
A ready-to-copy scaffolding template is available in `templates/new_service/` with placeholder syntax. A comprehensive developer guide is in `docs/extending.md`.
521+
522+
1. **Create directory:** Copy `templates/new_service/` to `services/<name>/` and replace placeholders (`{{SERVICE_NAME}}`, `{{SERVICE_PORT}}`, etc.).
479523
2. **Logic module:** Pure function — takes `pyarrow.Table` + params, returns `pyarrow.Table`. No Flask imports.
480524
3. **Routes:** Follow the endpoint pattern (section 5). Include REQUEST/SUCCESS/ERROR Prometheus counters. Include `/health` endpoint.
481525
4. **Dockerfile:** Copy `common/`, install deps, set `PYTHONPATH=/app/services`, use gunicorn CMD with HEALTHCHECK.
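
Step 1 above (copy the template, then replace placeholders) could be automated with a small script such as the sketch below. The `scaffold_service` helper is hypothetical and not part of the repository, and the service name and port in the usage note are example values. Only the `{{SERVICE_NAME}}` and `{{SERVICE_PORT}}` placeholder syntax and the directory names come from the diff.

```python
import shutil
from pathlib import Path


def scaffold_service(template_dir: Path, target_dir: Path, values: dict[str, str]) -> None:
    """Copy a template tree and substitute {{KEY}} placeholders in text files.

    shutil.copytree raises FileExistsError if target_dir already exists,
    so an existing service is never overwritten.
    """
    shutil.copytree(template_dir, target_dir)
    for path in target_dir.rglob("*"):
        if not path.is_file():
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            continue  # leave binary files untouched
        for key, value in values.items():
            text = text.replace("{{" + key + "}}", value)
        path.write_text(text, encoding="utf-8")
```

A call might look like `scaffold_service(Path("templates/new_service"), Path("services/my_service"), {"SERVICE_NAME": "my_service", "SERVICE_PORT": "5012"})`, after which any placeholder not listed in `values` still needs to be replaced by hand.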
@@ -563,11 +607,11 @@ These are hard-won insights from building and debugging the platform. They shoul
563607

564608
### Observability Lessons
565609

566-
15. **Centralized utilities eliminate boilerplate.** The `common/service_utils.py` module extracted ~800 lines of duplicated code (Prometheus counters, /health, /metrics, X-Params parsing, metadata writing) from 11 services into shared functions. This reduced each `routes.py` by ~60–80 lines and ensures consistent behavior across all services.
610+
16. **Centralized utilities eliminate boilerplate.** The `common/service_utils.py` module extracted duplicated code (Prometheus counters, /health, /metrics, X-Params parsing, metadata writing) from 11 services into shared functions. This ensures consistent behavior across all services.
567611

568-
16. **Correlation ID must be generated at the edge.** The Preparator SDK generates a UUID `correlation_id` on construction and includes it in every HTTP request (`X-Correlation-ID` header). Services read or generate it via `get_correlation_id()`. This enables end-to-end tracing of a pipeline request across all service logs.
612+
17. **Correlation ID must be generated at the edge.** The Preparator SDK generates a UUID `correlation_id` on construction and includes it in every HTTP request (`X-Correlation-ID` header). Services read or generate it via `get_correlation_id()`. This enables end-to-end tracing of a pipeline request across all service logs.
569613

570-
17. **Structured logging must be opt-in at startup.** Using `configure_service_logging()` in `create_app()` (not at module level) prevents duplicate handlers when Flask reloads. The `JSONFormatter` outputs single-line JSON with timestamp, level, service, message, correlation_id, and dataset_name.
614+
18. **Structured logging must be opt-in at startup.** Using `configure_service_logging()` in `create_app()` (not at module level) prevents duplicate handlers when Flask reloads. The `JSONFormatter` outputs single-line JSON with timestamp, level, service, message, correlation_id, and dataset_name.
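
The last two lessons (read or generate a correlation ID, and configure JSON logging once at startup) can be sketched with the standard library alone. The names `get_or_create_correlation_id`, `JsonLineFormatter`, and `configure_logging_once` are hypothetical stand-ins for the project's `get_correlation_id()`, `JSONFormatter`, and `configure_service_logging()`, whose real implementations are not shown in this diff.

```python
import json
import logging
import uuid
from typing import Mapping


def get_or_create_correlation_id(headers: Mapping[str, str]) -> str:
    """Reuse the caller's X-Correlation-ID, or generate a new UUID.

    Note: a plain dict lookup is case-sensitive; a framework's header
    object (for example Flask's request.headers) is not.
    """
    return headers.get("X-Correlation-ID") or str(uuid.uuid4())


class JsonLineFormatter(logging.Formatter):
    """Format each record as one line of JSON."""

    def __init__(self, service: str) -> None:
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "service": self.service,
            "message": record.getMessage(),
            # Populated via logger.info(..., extra={...}); None when absent.
            "correlation_id": getattr(record, "correlation_id", None),
            "dataset_name": getattr(record, "dataset_name", None),
        }
        return json.dumps(payload)


def configure_logging_once(service: str) -> logging.Logger:
    """Attach the JSON handler only if it is not already attached."""
    logger = logging.getLogger(service)
    if not any(getattr(h, "_json_handler", False) for h in logger.handlers):
        handler = logging.StreamHandler()
        handler.setFormatter(JsonLineFormatter(service))
        handler._json_handler = True  # marker used by the check above
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate output via the root logger
    return logger
```

Calling `configure_logging_once()` from inside an app factory, and then logging with `logger.info("step done", extra={"correlation_id": cid})`, keeps a single handler per service even if the factory runs again on reload.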
571615

572616
---
573617
