diff --git a/README.md b/README.md index e009ef9..4793027 100644 --- a/README.md +++ b/README.md @@ -768,6 +768,8 @@ make api | `POST` | `/query/explain` | Explain a time window | | `POST` | `/query/ask` | Answer a natural language question | | `POST` | `/query/clusters` | List top clusters | +| `POST` | `/query/timeline` | Reconstruct incident timeline for a window | +| `POST` | `/query/compare` | Diff two time windows (same semantics as `raglogs compare`) | | `GET` | `/config` | Read effective configuration | **Example** @@ -796,6 +798,22 @@ curl -X POST http://localhost:8000/query/explain \ } ``` +**Timeline** — `POST /query/timeline` accepts the same window filters as the CLI (`since` or `from_time`/`to_time`, optional `service`, `env`, `all_ingestions`, `ingestion_job_id`). Set `"format": "text"` to include a plain-text `text` field alongside `events`. + +```bash +curl -X POST http://localhost:8000/query/timeline \ + -H "Content-Type: application/json" \ + -d '{"since": "2h", "format": "json"}' +``` + +**Compare** — `POST /query/compare` matches `raglogs compare`: either `"since"` + `"baseline"` (durations, window A ends at request time) or explicit `window_a_from` / `window_a_to` / `window_b_from` / `window_b_to`. Optional `"format": "text"` adds a rendered `text` field. + +```bash +curl -X POST http://localhost:8000/query/compare \ + -H "Content-Type: application/json" \ + -d '{"since": "30m", "baseline": "24h"}' +``` + --- ## Development @@ -862,7 +880,6 @@ New source adapters go in `raglogs/adapters/`. Each adapter yields `ParsedLogLin - Kubernetes log export ingestion - Semantic cluster merging via pgvector - Markdown incident report export (`raglogs explain --format markdown > postmortem.md`) -- `POST /query/timeline` and `POST /query/compare` API endpoints - Web UI --- diff --git a/src/api/app.py b/src/api/app.py index 6d3eeb1..ca63b3f 100644 --- a/src/api/app.py +++ b/src/api/app.py @@ -3,7 +3,7 @@ from fastapi import FastAPI from fastapi.responses import ORJSONResponse -from src.api.routes import ask, clusters, config, explain, health, ingestions +from src.api.routes import ask, clusters, compare_windows, config, explain, health, ingestions, timeline @asynccontextmanager @@ -30,4 +30,6 @@ async def lifespan(app: FastAPI): app.include_router(explain.router, prefix="/query", tags=["query"]) app.include_router(ask.router, prefix="/query", tags=["query"]) app.include_router(clusters.router, prefix="/query", tags=["query"]) +app.include_router(timeline.router, prefix="/query", tags=["query"]) +app.include_router(compare_windows.router, prefix="/query", tags=["query"]) app.include_router(config.router, prefix="/config", tags=["config"]) diff --git a/src/api/routes/compare_windows.py b/src/api/routes/compare_windows.py new file mode 100644 index 0000000..671b5e9 --- /dev/null +++ b/src/api/routes/compare_windows.py @@ -0,0 +1,269 @@ +"""POST /query/compare — same pipeline as `raglogs compare`.""" +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import Literal, Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from src.core.compare.differ import CompareResult + +router = APIRouter() + + +class CompareRequest(BaseModel): + since: Optional[str] = None + baseline: Optional[str] = None + window_a_from: Optional[datetime] = None + window_a_to: Optional[datetime] = None + window_b_from: Optional[datetime] = None + window_b_to: Optional[datetime] = None + service: Optional[str] = None + env: Optional[str] = None + all_ingestions: bool = False + ingestion_job_id: Optional[str] = None + format: Literal["json", "text"] = "json" + + +def _ensure_utc(dt: datetime) -> datetime: + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt + + +def _resolve_compare_windows(request: CompareRequest) -> tuple[datetime, datetime, datetime, datetime]: + from src.utils.time import parse_duration + + now = datetime.now(tz=timezone.utc) + + if request.since and request.baseline: + incident_delta = parse_duration(request.since) + baseline_delta = parse_duration(request.baseline) + a_end = now + a_start = now - incident_delta + b_end = now - baseline_delta + b_start = b_end - incident_delta + return a_start, a_end, b_start, b_end + + if ( + request.window_a_from + and request.window_a_to + and request.window_b_from + and request.window_b_to + ): + return ( + _ensure_utc(request.window_a_from), + _ensure_utc(request.window_a_to), + _ensure_utc(request.window_b_from), + _ensure_utc(request.window_b_to), + ) + + raise ValueError( + "Provide either since+baseline, or window_a_from/to and window_b_from/to" + ) + + +def _diff_to_dict(d) -> dict: + return { + "fingerprint": d.fingerprint, + "message": d.message, + "services": d.services, + "count_a": d.count_a, + "count_b": d.count_b, + } + + +def _compare_result_to_payload(result: CompareResult) -> dict: + return { + "window_a": { + "start": result.window_a_start.isoformat(), + "end": result.window_a_end.isoformat(), + }, + "window_b": { + "start": result.window_b_start.isoformat(), + "end": result.window_b_end.isoformat(), + }, + "has_changes": result.has_changes, + "new_clusters": [_diff_to_dict(d) for d in result.new_clusters], + "disappeared_clusters": [_diff_to_dict(d) for d in result.disappeared_clusters], + "increased_clusters": [_diff_to_dict(d) for d in result.increased_clusters], + "decreased_clusters": [_diff_to_dict(d) for d in result.decreased_clusters], + "new_triggers": [ + {"message": t.message, "service": t.service, "only_in": t.only_in} + for t in result.new_triggers + ], + "dropped_triggers": [ + {"message": t.message, "service": t.service, "only_in": t.only_in} + for t in result.dropped_triggers + ], + } + + +def _trunc(text: str, n: int = 72) -> str: + if len(text) <= n: + return text + cut = text[:n] + sp = cut.rfind(" ") + return (cut[:sp] if sp > n // 2 else cut) + "…" + + +def _format_counts(count_a: Optional[int], count_b: Optional[int]) -> str: + if count_a is not None and count_b is None: + return f"{count_a} events" + if count_b is not None and count_a is None: + return f"{count_b} events" + if count_a is not None and count_b is not None: + return f"{count_b} → {count_a}" + return "" + + +def format_compare_plain(result: CompareResult) -> str: + """Plain-text layout matching CLI `compare` (no Rich).""" + from src.utils.time import format_window + + lines: list[str] = [] + lines.append("") + lines.append("Incident comparison") + lines.append("") + lines.append(f" Window A (now): {format_window(result.window_a_start, result.window_a_end)}") + lines.append(f" Window B (baseline): {format_window(result.window_b_start, result.window_b_end)}") + lines.append("") + + if not result.has_changes: + lines.append(" No significant changes between windows.") + lines.append("") + return "\n".join(lines) + + def row(sigil: str, msg: str, count_a: Optional[int], count_b: Optional[int]) -> None: + count_str = _format_counts(count_a, count_b) + lines.append(f" {sigil} {_trunc(msg):<74} {count_str}".rstrip()) + + if result.new_clusters: + lines.append("New error clusters") + for d in result.new_clusters: + row("+", d.message, d.count_a, d.count_b) + lines.append("") + + if result.disappeared_clusters: + lines.append("Errors that disappeared") + for d in result.disappeared_clusters: + row("-", d.message, d.count_a, d.count_b) + lines.append("") + + if result.increased_clusters: + lines.append("Errors that increased") + for d in result.increased_clusters: + row("↑", d.message, d.count_a, d.count_b) + lines.append("") + + if result.decreased_clusters: + lines.append("Errors that decreased") + for d in result.decreased_clusters: + row("↓", d.message, d.count_a, d.count_b) + lines.append("") + + if result.new_triggers: + lines.append("Triggers in A not seen in B") + for t in result.new_triggers: + svc = f" · {t.service}" if t.service else "" + lines.append(f" +⚡ {_trunc(t.message)}{svc}") + lines.append("") + + if result.dropped_triggers: + lines.append("Triggers in B not seen in A") + for t in result.dropped_triggers: + svc = f" · {t.service}" if t.service else "" + lines.append(f" -⚡ {_trunc(t.message)}{svc}") + lines.append("") + + return "\n".join(lines) + + +@router.post("/compare") +def compare_endpoint(request: CompareRequest): + from src.core.clustering.clusterer import run_clustering + from src.core.compare.differ import compare_windows as run_compare_windows + from src.core.explain.evidence import assemble_evidence + from src.core.explain.summarizer import get_latest_ingestion_job_id + from src.db.session import get_db + + try: + a_start, a_end, b_start, b_end = _resolve_compare_windows(request) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + ingestion_job_id: Optional[uuid.UUID] = None + if request.ingestion_job_id: + try: + ingestion_job_id = uuid.UUID(request.ingestion_job_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid ingestion_job_id") + + try: + with get_db() as db: + job_id = None + if ingestion_job_id is not None: + job_id = ingestion_job_id + elif not request.all_ingestions: + job_id = get_latest_ingestion_job_id(db) + + _, clusters_a = run_clustering( + db=db, + window_start=a_start, + window_end=a_end, + service=request.service, + environment=request.env, + save_to_db=False, + ingestion_job_id=job_id, + max_clusters=50, + ) + _, clusters_b = run_clustering( + db=db, + window_start=b_start, + window_end=b_end, + service=request.service, + environment=request.env, + save_to_db=False, + ingestion_job_id=job_id, + max_clusters=50, + ) + + packet_a = assemble_evidence( + db=db, + window_start=a_start, + window_end=a_end, + clusters=clusters_a, + service_filter=request.service, + environment_filter=request.env, + ingestion_job_id=job_id, + ) + packet_b = assemble_evidence( + db=db, + window_start=b_start, + window_end=b_end, + clusters=clusters_b, + service_filter=request.service, + environment_filter=request.env, + ingestion_job_id=job_id, + ) + + result = run_compare_windows( + clusters_a=clusters_a, + clusters_b=clusters_b, + triggers_a=packet_a.trigger_candidates, + triggers_b=packet_b.trigger_candidates, + window_a_start=a_start, + window_a_end=a_end, + window_b_start=b_start, + window_b_end=b_end, + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + payload = _compare_result_to_payload(result) + if request.format == "text": + payload["text"] = format_compare_plain(result) + return payload diff --git a/src/api/routes/timeline.py b/src/api/routes/timeline.py new file mode 100644 index 0000000..5dd0a5c --- /dev/null +++ b/src/api/routes/timeline.py @@ -0,0 +1,113 @@ +"""POST /query/timeline — same pipeline as `raglogs timeline`.""" +from __future__ import annotations + +import uuid +from datetime import datetime +from typing import Literal, Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from src.core.timeline.plain_text import format_timeline_plain + +router = APIRouter() + + +class TimelineRequest(BaseModel): + since: Optional[str] = None + from_time: Optional[datetime] = None + to_time: Optional[datetime] = None + service: Optional[str] = None + env: Optional[str] = None + all_ingestions: bool = False + ingestion_job_id: Optional[str] = None + format: Literal["json", "text"] = "json" + + +def _events_to_json(events) -> list[dict]: + return [ + { + "timestamp": e.timestamp.isoformat(), + "category": e.category, + "label": e.label, + "description": e.description, + "count": e.count, + "services": e.services, + "duration_minutes": e.duration_minutes, + } + for e in events + ] + + +@router.post("/timeline") +def timeline_endpoint(request: TimelineRequest): + from src.core.clustering.clusterer import run_clustering + from src.core.explain.evidence import assemble_evidence + from src.core.explain.summarizer import get_latest_ingestion_job_id + from src.core.timeline.builder import build_timeline + from src.db.session import get_db + from src.utils.time import resolve_window + + try: + window_start, window_end = resolve_window( + since=request.since, + from_time=request.from_time, + to_time=request.to_time, + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + ingestion_job_id: Optional[uuid.UUID] = None + if request.ingestion_job_id: + try: + ingestion_job_id = uuid.UUID(request.ingestion_job_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid ingestion_job_id") + + try: + with get_db() as db: + job_id = None + if ingestion_job_id is not None: + job_id = ingestion_job_id + elif not request.all_ingestions: + job_id = get_latest_ingestion_job_id(db) + + _, clusters = run_clustering( + db=db, + window_start=window_start, + window_end=window_end, + service=request.service, + environment=request.env, + save_to_db=False, + ingestion_job_id=job_id, + ) + + packet = assemble_evidence( + db=db, + window_start=window_start, + window_end=window_end, + clusters=clusters, + service_filter=request.service, + environment_filter=request.env, + ingestion_job_id=job_id, + ) + + events = build_timeline(packet) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + payload: dict = { + "window": { + "start": window_start.isoformat(), + "end": window_end.isoformat(), + }, + "ingestion_job_id": request.ingestion_job_id, + "all_ingestions": request.all_ingestions, + "events": _events_to_json(events), + } + + if request.format == "text": + payload["text"] = format_timeline_plain(events, window_start, window_end) + + return payload diff --git a/src/core/timeline/plain_text.py b/src/core/timeline/plain_text.py new file mode 100644 index 0000000..12fcd4d --- /dev/null +++ b/src/core/timeline/plain_text.py @@ -0,0 +1,51 @@ +"""Plain-text timeline rendering (no Rich) for API and tooling.""" +from __future__ import annotations + +from src.core.timeline.builder import TimelineEvent +from src.utils.time import format_window + + +def format_timeline_plain( + events: list[TimelineEvent], + window_start, + window_end, +) -> str: + """Mirror CLI timeline layout without ANSI/markup.""" + lines: list[str] = [] + lines.append("") + lines.append(f"Incident timeline {format_window(window_start, window_end)}") + lines.append("") + + if not events: + lines.append(" No significant events found in this window.") + lines.append("") + return "\n".join(lines) + + prev_ts = None + for event in events: + if prev_ts is not None: + gap = (event.timestamp - prev_ts).total_seconds() + if gap > 60: + lines.append("") + + ts_str = event.timestamp.strftime("%H:%M:%S") + label = event.label + + if event.count is None: + svc = " · ".join(event.services) + suffix = f" · {svc}" if svc else "" + lines.append(f" {ts_str} {label:<10} {event.description}{suffix}") + else: + lines.append(f" {ts_str} {label:<10} {event.description}") + plural = "s" if event.count != 1 else "" + parts = [f"{event.count} event{plural}"] + if event.services: + parts.append(", ".join(event.services)) + if event.duration_minutes: + parts.append(f"{event.duration_minutes} min span") + lines.append(f" {' · '.join(parts)}") + + prev_ts = event.timestamp + + lines.append("") + return "\n".join(lines) diff --git a/tests/unit/test_api.py b/tests/unit/test_api.py index b90a3a8..001f037 100644 --- a/tests/unit/test_api.py +++ b/tests/unit/test_api.py @@ -6,7 +6,7 @@ trying to patch the route module's namespace. Tests: async ingestion flow, explain cache hit/miss, ingestion_job_id -validation, health queue depth, and 4xx error cases. +validation, timeline/compare query routes, health queue depth, and 4xx cases. """ import uuid import pytest @@ -285,6 +285,142 @@ def test_invalid_ingestion_job_id_returns_400(self): assert resp.status_code == 400 +# ── POST /query/timeline ───────────────────────────────────────────────────── + +class TestTimelineEndpoint: + def test_returns_events_list(self): + from src.core.timeline.builder import TimelineEvent + + events = [ + TimelineEvent( + timestamp=datetime(2026, 3, 12, 22, 0, 0, tzinfo=timezone.utc), + category="deploy", + label="deploy", + description="Deploy completed", + count=None, + services=["deployment-controller"], + duration_minutes=None, + ), + ] + mock_db = _ctx_db() + with patch("src.db.session.get_db", side_effect=lambda: mock_db), \ + patch("src.core.clustering.clusterer.run_clustering", return_value=(None, [])), \ + patch("src.core.explain.evidence.assemble_evidence", return_value=MagicMock()), \ + patch("src.core.timeline.builder.build_timeline", return_value=events): + resp = client.post("/query/timeline", json={"since": "1h"}) + + assert resp.status_code == 200 + data = resp.json() + assert "events" in data + assert len(data["events"]) == 1 + assert data["events"][0]["category"] == "deploy" + assert data["events"][0]["label"] == "deploy" + + def test_format_text_includes_rendered_field(self): + from src.core.timeline.builder import TimelineEvent + + events = [ + TimelineEvent( + timestamp=datetime(2026, 3, 12, 22, 0, 0, tzinfo=timezone.utc), + category="error", + label="error ↑", + description="Something failed", + count=5, + services=["api"], + duration_minutes=3, + ), + ] + mock_db = _ctx_db() + with patch("src.db.session.get_db", side_effect=lambda: mock_db), \ + patch("src.core.clustering.clusterer.run_clustering", return_value=(None, [])), \ + patch("src.core.explain.evidence.assemble_evidence", return_value=MagicMock()), \ + patch("src.core.timeline.builder.build_timeline", return_value=events): + resp = client.post("/query/timeline", json={"since": "1h", "format": "text"}) + + assert resp.status_code == 200 + assert "text" in resp.json() + assert "Incident timeline" in resp.json()["text"] + + def test_invalid_ingestion_job_id_returns_400(self): + resp = client.post("/query/timeline", json={ + "since": "1h", + "ingestion_job_id": "not-a-uuid", + }) + assert resp.status_code == 400 + + +# ── POST /query/compare ─────────────────────────────────────────────────────── + +class TestCompareEndpoint: + def _mock_compare_result(self): + from src.core.compare.differ import ClusterDiff, CompareResult, TriggerDiff + + return CompareResult( + window_a_start=datetime(2026, 3, 16, 15, 17, 42, tzinfo=timezone.utc), + window_a_end=datetime(2026, 3, 16, 15, 47, 42, tzinfo=timezone.utc), + window_b_start=datetime(2026, 3, 15, 15, 17, 42, tzinfo=timezone.utc), + window_b_end=datetime(2026, 3, 15, 15, 47, 42, tzinfo=timezone.utc), + new_clusters=[ + ClusterDiff( + fingerprint="fp1", + message="New error", + services=["api"], + count_a=10, + count_b=None, + ), + ], + new_triggers=[ + TriggerDiff(message="Deploy done", service="deploy", only_in="a"), + ], + ) + + def test_since_baseline_returns_json(self): + mock_db = _ctx_db() + with patch("src.db.session.get_db", side_effect=lambda: mock_db), \ + patch("src.core.clustering.clusterer.run_clustering", return_value=(None, [])), \ + patch("src.core.explain.evidence.assemble_evidence", return_value=MagicMock()), \ + patch("src.core.compare.differ.compare_windows", return_value=self._mock_compare_result()): + resp = client.post( + "/query/compare", + json={"since": "30m", "baseline": "24h"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["has_changes"] is True + assert len(data["new_clusters"]) == 1 + assert data["new_clusters"][0]["message"] == "New error" + assert data["new_triggers"][0]["service"] == "deploy" + assert "only_in" in data["new_triggers"][0] + + def test_missing_window_params_returns_400(self): + resp = client.post("/query/compare", json={}) + assert resp.status_code == 400 + + def test_format_text_includes_rendered_field(self): + mock_db = _ctx_db() + with patch("src.db.session.get_db", side_effect=lambda: mock_db), \ + patch("src.core.clustering.clusterer.run_clustering", return_value=(None, [])), \ + patch("src.core.explain.evidence.assemble_evidence", return_value=MagicMock()), \ + patch("src.core.compare.differ.compare_windows", return_value=self._mock_compare_result()): + resp = client.post( + "/query/compare", + json={"since": "30m", "baseline": "24h", "format": "text"}, + ) + + assert resp.status_code == 200 + assert "text" in resp.json() + assert "Incident comparison" in resp.json()["text"] + + def test_invalid_ingestion_job_id_returns_400(self): + resp = client.post("/query/compare", json={ + "since": "30m", + "baseline": "24h", + "ingestion_job_id": "bad", + }) + assert resp.status_code == 400 + + # ── Cache key consistency ───────────────────────────────────────────────────── class TestCacheKey: