-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathaggregation.py
More file actions
112 lines (93 loc) · 3.56 KB
/
aggregation.py
File metadata and controls
112 lines (93 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
"""Aggregation API endpoints for population analytics."""
import logging
from typing import Annotated
from odoo.api import Environment
from odoo.addons.fastapi.dependencies import odoo_env
from odoo.addons.spp_api_v2.middleware.auth import get_authenticated_client
from fastapi import APIRouter, Depends, HTTPException, Query, status
from ..schemas.aggregation import (
AggregationResponse,
ComputeAggregationRequest,
DimensionsListResponse,
)
from ..services.aggregation_api_service import AggregationApiService
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)

# Router collecting the aggregation endpoints: every route registered on it
# is served under the "/aggregation" prefix and tagged "Aggregation".
aggregation_router = APIRouter(prefix="/aggregation", tags=["Aggregation"])
@aggregation_router.post(
    "/compute",
    response_model=AggregationResponse,
    summary="Compute population aggregation",
    description="Compute population counts and statistics with optional demographic breakdowns.",
)
async def compute_aggregation(
    request: ComputeAggregationRequest,
    env: Annotated[Environment, Depends(odoo_env)],
    api_client: Annotated[dict, Depends(get_authenticated_client)],
):
    """Run an aggregation over the requested scope and return the service result.

    The caller must hold the ``aggregation:read`` scope. The request's
    ``scope`` is dumped to a plain dict and passed, together with the
    requested ``statistics`` and ``group_by`` values, to
    ``AggregationApiService.compute_aggregation``.

    Args:
        request: Parsed request body; its ``scope``, ``statistics`` and
            ``group_by`` attributes are forwarded to the service.
        env: Odoo environment injected through the ``odoo_env`` dependency.
        api_client: Authenticated client injected through
            ``get_authenticated_client``.
            NOTE(review): annotated as ``dict`` but used via
            ``has_scope(...)``, which a plain dict does not provide --
            presumably a client object; confirm and tighten the annotation.

    Returns:
        The value returned by the service, unchanged. The route declares
        ``AggregationResponse`` as its ``response_model``.

    Raises:
        HTTPException: 403 when the client lacks ``aggregation:read``;
            400 when a ``ValueError`` is raised while computing (its message
            becomes the response detail); 500 for any other exception
            raised while computing.
    """
    can_read = api_client.has_scope("aggregation", "read")
    if not can_read:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Client does not have aggregation:read scope",
        )

    try:
        aggregator = AggregationApiService(env)
        scope_payload = request.scope.model_dump()
        payload = aggregator.compute_aggregation(
            scope_dict=scope_payload,
            statistics=request.statistics,
            group_by=request.group_by,
        )
    except ValueError as exc:
        # Treated as a client error: the message is logged and echoed back.
        reason = str(exc)
        _logger.warning("Invalid aggregation request: %s", reason)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=reason,
        ) from exc
    except Exception as exc:
        # Anything else is logged with its traceback; the client only sees a
        # generic message.
        _logger.exception("Aggregation computation failed: %s", str(exc))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to compute aggregation",
        ) from exc
    else:
        return payload
@aggregation_router.get(
    "/dimensions",
    response_model=DimensionsListResponse,
    summary="List available dimensions",
    description="Returns active demographic dimensions available for group_by.",
)
async def list_dimensions(
    env: Annotated[Environment, Depends(odoo_env)],
    api_client: Annotated[dict, Depends(get_authenticated_client)],
    applies_to: Annotated[
        str | None,
        Query(description="Filter: 'individuals', 'groups', or None for all"),
    ] = None,
):
    """Return the dimensions reported by the aggregation service.

    The caller must hold the ``aggregation:read`` scope. The optional
    ``applies_to`` query parameter is passed straight through to
    ``AggregationApiService.list_dimensions``.

    Args:
        env: Odoo environment injected through the ``odoo_env`` dependency.
        api_client: Authenticated client injected through
            ``get_authenticated_client``.
            NOTE(review): annotated as ``dict`` but used via
            ``has_scope(...)``, which a plain dict does not provide --
            presumably a client object; confirm and tighten the annotation.
        applies_to: Optional filter forwarded to the service; defaults to
            ``None``.

    Returns:
        A dict with a single ``"dimensions"`` key holding the service
        result. The route declares ``DimensionsListResponse`` as its
        ``response_model``.

    Raises:
        HTTPException: 403 when the client lacks ``aggregation:read``;
            500 when any exception is raised while listing dimensions.
    """
    can_read = api_client.has_scope("aggregation", "read")
    if not can_read:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Client does not have aggregation:read scope",
        )

    try:
        available = AggregationApiService(env).list_dimensions(applies_to=applies_to)
    except Exception as exc:
        # Logged with its traceback; the client only sees a generic message.
        _logger.exception("Failed to list dimensions: %s", str(exc))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to list dimensions",
        ) from exc
    else:
        return {"dimensions": available}