[codex] Add DuckDB query backend #3
Conversation
Pull request overview
Adds a DuckDB-backed artifact and query layer to make pipeline outputs directly queryable, with additional ingestion/quality diagnostics and expanded test coverage.
Changes:
- Materialize `output/eval_cards.duckdb` during pipeline runs and record table/join-quality metadata in `manifest.json`.
- Introduce `shared.duckdb_backend.DuckDBBackend` with ingestion helpers and SQL-backed query methods (stats, sources/models/benchmarks, join integrity, identifier/orphan checks).
- Add a FastAPI service exposing ingestion + read/query endpoints, plus new tests for malformed instance ID repair and metric-scoped instance joins.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `scripts/pipeline.py` | Creates and records the DuckDB artifact during pipeline execution; skips `.duckdb` during the output contract scan. |
| `shared/duckdb_backend.py` | New DuckDB schema + ingestion/query implementation, including join-integrity/quality diagnostics. |
| `api/main.py` | New FastAPI app exposing ingest and query endpoints over the DuckDB artifact. |
| `shared/config.py` | Adds environment-driven settings for the DuckDB path and ingestion defaults. |
| `shared/constants.py` | Adds shared enum constants. |
| `tests/test_pipeline_integration.py` | Integration assertions for the emitted DuckDB artifact + canonical identity and metric-scoped joins. |
| `tests/test_duckdb_backend.py` | Unit tests for instance `evaluation_id` repair and the dashboard query surface. |
| `tests/conftest.py` | Adds fixture instance samples and enables instance loading during dry-run for integration tests. |
| `pyproject.toml` | Adds Python package metadata and dependencies for DuckDB/FastAPI tooling. |
| `.github/workflows/sync.yml` | Updates the CI install step to install project dependencies and run pytest. |
| `README.md` | Documents DuckDB artifact and query API usage/configuration. |
| `.gitignore` | Ignores pytest cache, egg-info, and local DuckDB files. |
| `api/__init__.py`, `shared/__init__.py` | Package initializers. |
```python
backend = DuckDBBackend(settings.duckdb_path)

_BASE_DIR = Path(settings.ingestion_base_dir).resolve()
```
A DuckDBBackend instance (and its DuckDB connection) is created at import time. This causes side effects during module import (directory creation / DB open) and makes it hard to control lifecycle/cleanup. Prefer initializing the backend in a FastAPI lifespan handler and closing it on shutdown (or constructing connections per request).
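A minimal sketch of the lifespan approach. `DuckDBBackend` is stubbed here so the example is self-contained; the real class lives in `shared/duckdb_backend.py`, and its `close()` method is an assumption:

```python
from contextlib import asynccontextmanager

class DuckDBBackend:
    """Stand-in for shared.duckdb_backend.DuckDBBackend; close() is assumed."""
    def __init__(self, db_path):
        self.db_path = db_path   # the real backend opens its DuckDB connection here
        self.closed = False
    def close(self):
        self.closed = True       # the real backend closes the connection here

@asynccontextmanager
async def lifespan(app):
    # Open the backend at startup rather than at module import time...
    app.state.backend = DuckDBBackend("output/eval_cards.duckdb")
    try:
        yield
    finally:
        # ...and guarantee cleanup at shutdown, even if startup's caller errors.
        app.state.backend.close()

# With FastAPI this is wired up as: app = FastAPI(lifespan=lifespan)
```

Importing `api.main` then has no side effects, and tests can drive the lifespan explicitly.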
```python
@app.post("/ingest/aggregate")
async def ingest_aggregate(jsonl_path: str | None = None) -> dict:
    """Ingest aggregate JSONL records (eval.schema.json shape) into DuckDB."""
    raw = jsonl_path or settings.default_aggregate_jsonl_path
    path = _resolve_safe_path(raw)
    if not path.exists():
        raise HTTPException(status_code=404, detail=f"Aggregate file not found: {path}")

    try:
        stats = backend.ingest_aggregate_jsonl(path)
    except Exception as exc:  # pragma: no cover
        raise HTTPException(status_code=400, detail=f"Ingestion failed: {exc}") from exc

    return {"ok": True, "path": str(path), **stats}


@app.post("/ingest/instance")
async def ingest_instance(jsonl_path: str | None = None) -> dict:
    """Ingest instance-level JSONL records (instance_level_eval.schema.json shape) into DuckDB."""
    raw = jsonl_path or settings.default_instance_jsonl_path
    path = _resolve_safe_path(raw)
    if not path.exists():
        raise HTTPException(status_code=404, detail=f"Instance file not found: {path}")

    try:
        stats = backend.ingest_instance_jsonl(path)
    except Exception as exc:  # pragma: no cover
        raise HTTPException(status_code=400, detail=f"Ingestion failed: {exc}") from exc

    return {"ok": True, "path": str(path), **stats}
```
All endpoints are declared async def but call synchronous DuckDB operations directly. This will block the event loop under load, and a single shared DuckDB connection is not safe to use concurrently across requests/threads. Consider making these endpoints regular def (so FastAPI runs them in a threadpool), or explicitly offloading DB work to a threadpool + guarding the connection with a lock / creating a per-request connection.
```python
def ingest_instance_jsonl(self, jsonl_path: str | Path) -> dict[str, Any]:
    path = Path(jsonl_path)
    if not path.exists():
        raise FileNotFoundError(path)

    rows = 0
    validated_rows = 0
    repaired_rows = 0
    filled_rows = 0
    unvalidated_rows = 0
    expected_evaluation_id, file_link_lookup_status = (
        self._expected_evaluation_id_for_instance_path(path)
    )
    with path.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
```
ingest_instance_jsonl performs many DELETE/INSERT statements without wrapping them in an explicit transaction. If ingestion fails mid-file you'll end up with partial writes, and performance will be significantly worse due to per-statement commits. Wrap the loop in begin()/commit() (with rollback on error) similar to the aggregate/pipeline ingest paths.
```python
delete_evaluation_ids = {evaluation_id}
if original_evaluation_id and original_evaluation_id != evaluation_id:
    delete_evaluation_ids.add(original_evaluation_id)
for delete_evaluation_id in delete_evaluation_ids:
    self.conn.execute(
        """
        DELETE FROM instance_evaluations
        WHERE evaluation_id = ?
          AND sample_id = ?
          AND result_join_id = ?
          AND name_join_id = ?
        """,
        [delete_evaluation_id, sample_id, result_join_id, name_join_id],
    )
```
The per-row dedupe strategy (DELETE then INSERT) inside ingest_instance_jsonl is expensive for large JSONL files. Since instance_evaluations has a primary key, consider using INSERT OR REPLACE/ON CONFLICT semantics (and/or staging rows + bulk insert) to achieve idempotency without an extra delete per row.
```toml
[project]
name = "eval-cards-backend-pipeline"
version = "1.0.0"
description = "Pipeline and query backend for Eval Cards artifacts"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "duckdb>=1.1.0",
    "fastapi>=0.109.0",
    "huggingface_hub>=0.23.0",
    "pydantic-settings>=2.1.0",
    "uvicorn[standard]>=0.27.0",
]
```
pyproject.toml is missing a [build-system] section. With no setup.py in the repo, pip install -e ".[dev]" (used in CI) is likely to fail because pip will fall back to legacy setuptools behavior that expects setup.py. Add an explicit [build-system] stanza (e.g., setuptools + wheel with the setuptools build backend) so editable/PEP517 installs work reliably.
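A stanza along these lines (version pins are illustrative) makes PEP 517 editable installs work:

```toml
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
```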