Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server_api/workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .router import router

__all__ = ["router"]
27 changes: 27 additions & 0 deletions server_api/workflow/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import annotations

from datetime import datetime
from typing import Dict, Optional

from pydantic import BaseModel, Field


class WorkflowEvent(BaseModel):
event_type: str = Field(alias="type")
created_at: datetime
action: Optional[str] = None
from_stage: Optional[str] = None
to_stage: Optional[str] = None


class WorkflowMetricsResponse(BaseModel):
workflow_id: str
total_events: int
event_counts_by_type: Dict[str, int]
approvals_count: int
rejections_count: int
approvals_rate: float
rejections_rate: float
stage_transition_counts: Dict[str, int]
first_event_at: Optional[datetime]
last_event_at: Optional[datetime]
15 changes: 15 additions & 0 deletions server_api/workflow/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from fastapi import APIRouter

from .models import WorkflowMetricsResponse
from .service import compute_metrics
from .store import get_events

router = APIRouter(prefix="/api/workflows", tags=["workflow"])


@router.get("/{workflow_id}/metrics", response_model=WorkflowMetricsResponse)
async def get_workflow_metrics(workflow_id: str):
events = get_events(workflow_id)
return compute_metrics(workflow_id=workflow_id, events=events)
58 changes: 58 additions & 0 deletions server_api/workflow/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

from collections import Counter
from typing import Iterable, Sequence

from .models import WorkflowEvent, WorkflowMetricsResponse

_APPROVAL_TYPES = {"approval", "approved", "approval_granted"}
_REJECTION_TYPES = {"rejection", "rejected", "approval_rejected"}
_APPROVAL_ACTIONS = {"approve", "approved"}
_REJECTION_ACTIONS = {"reject", "rejected"}


def _is_approval(event: WorkflowEvent) -> bool:
if event.event_type.lower() in _APPROVAL_TYPES:
return True
if event.action and event.action.lower() in _APPROVAL_ACTIONS:
return True
return False


def _is_rejection(event: WorkflowEvent) -> bool:
if event.event_type.lower() in _REJECTION_TYPES:
return True
if event.action and event.action.lower() in _REJECTION_ACTIONS:
return True
return False


def compute_metrics(workflow_id: str, events: Sequence[WorkflowEvent]) -> WorkflowMetricsResponse:
type_counts = Counter(event.event_type for event in events)

approvals_count = sum(1 for event in events if _is_approval(event))
rejections_count = sum(1 for event in events if _is_rejection(event))
review_total = approvals_count + rejections_count

transition_counts = Counter(
f"{event.from_stage}->{event.to_stage}"
for event in events
if event.from_stage and event.to_stage
)

timestamps = [event.created_at for event in events]
first_event_at = min(timestamps) if timestamps else None
last_event_at = max(timestamps) if timestamps else None

return WorkflowMetricsResponse(
workflow_id=str(workflow_id),
total_events=len(events),
event_counts_by_type=dict(type_counts),
approvals_count=approvals_count,
rejections_count=rejections_count,
approvals_rate=(approvals_count / review_total) if review_total else 0.0,
rejections_rate=(rejections_count / review_total) if review_total else 0.0,
stage_transition_counts=dict(transition_counts),
first_event_at=first_event_at,
last_event_at=last_event_at,
)
20 changes: 20 additions & 0 deletions server_api/workflow/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from __future__ import annotations

from collections import defaultdict
from typing import DefaultDict, List

from .models import WorkflowEvent

_workflow_events: DefaultDict[str, List[WorkflowEvent]] = defaultdict(list)


def get_events(workflow_id: str) -> list[WorkflowEvent]:
return list(_workflow_events.get(str(workflow_id), []))


def set_events(workflow_id: str, events: list[WorkflowEvent]) -> None:
_workflow_events[str(workflow_id)] = list(events)


def clear_events() -> None:
_workflow_events.clear()
86 changes: 86 additions & 0 deletions tests/test_workflow_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from fastapi import FastAPI
from fastapi.testclient import TestClient

from server_api.workflow.router import router as workflow_router
from server_api.workflow.models import WorkflowEvent
from server_api.workflow.store import clear_events, set_events


def _build_test_client() -> TestClient:
app = FastAPI()
app.include_router(workflow_router)
return TestClient(app)


def teardown_function(_):
clear_events()


def test_workflow_metrics_returns_safe_defaults_for_empty_workflow():
client = _build_test_client()

response = client.get('/api/workflows/empty-workflow/metrics')

assert response.status_code == 200
payload = response.json()

assert payload == {
'workflow_id': 'empty-workflow',
'total_events': 0,
'event_counts_by_type': {},
'approvals_count': 0,
'rejections_count': 0,
'approvals_rate': 0.0,
'rejections_rate': 0.0,
'stage_transition_counts': {},
'first_event_at': None,
'last_event_at': None,
}


def test_workflow_metrics_align_with_seeded_events():
client = _build_test_client()
workflow_id = 'wf-123'
set_events(
workflow_id,
[
WorkflowEvent(type='workflow_started', created_at='2026-01-01T10:00:00Z'),
WorkflowEvent(
type='stage_transition',
from_stage='draft',
to_stage='review',
created_at='2026-01-01T10:10:00Z',
),
WorkflowEvent(type='agent_action', action='approved', created_at='2026-01-01T10:20:00Z'),
WorkflowEvent(type='agent_action', action='rejected', created_at='2026-01-01T10:25:00Z'),
WorkflowEvent(
type='stage_transition',
from_stage='review',
to_stage='done',
created_at='2026-01-01T10:30:00Z',
),
],
)

response = client.get(f'/api/workflows/{workflow_id}/metrics')

assert response.status_code == 200
payload = response.json()

assert payload['workflow_id'] == workflow_id
assert payload['total_events'] == 5
assert payload['event_counts_by_type'] == {
'workflow_started': 1,
'stage_transition': 2,
'agent_action': 2,
}
assert payload['approvals_count'] == 1
assert payload['rejections_count'] == 1
assert payload['approvals_rate'] == 0.5
assert payload['rejections_rate'] == 0.5
assert payload['stage_transition_counts'] == {
'draft->review': 1,
'review->done': 1,
}
assert payload['first_event_at'] == '2026-01-01T10:00:00Z'
assert payload['last_event_at'] == '2026-01-01T10:30:00Z'
Loading