-
Notifications
You must be signed in to change notification settings - Fork 285
Expand file tree
/
Copy pathrate_limits.py
More file actions
117 lines (90 loc) · 4.66 KB
/
rate_limits.py
File metadata and controls
117 lines (90 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from typing import Annotated, Any
from fastapi import APIRouter, Depends, Request
from fastcrud import PaginatedListResponse, compute_offset, paginated_response
from sqlalchemy.ext.asyncio import AsyncSession
from ...api.dependencies import get_current_superuser
from ...core.db.database import async_get_db
from ...core.exceptions.http_exceptions import DuplicateValueException, NotFoundException
from ...crud.crud_rate_limit import crud_rate_limits
from ...crud.crud_tier import crud_tiers
from ...schemas.rate_limit import RateLimitCreate, RateLimitCreateInternal, RateLimitRead, RateLimitUpdate
from ...schemas.tier import TierRead
router = APIRouter(tags=["rate_limits"])
@router.post("/tier/{tier_name}/rate_limit", dependencies=[Depends(get_current_superuser)], status_code=201)
async def write_rate_limit(
request: Request, tier_name: str, rate_limit: RateLimitCreate, db: Annotated[AsyncSession, Depends(async_get_db)]
) -> dict[str, Any]:
db_tier = await crud_tiers.get(db=db, name=tier_name, schema_to_select=TierRead)
if not db_tier:
raise NotFoundException("Tier not found")
rate_limit_internal_dict = rate_limit.model_dump()
rate_limit_internal_dict["tier_id"] = db_tier["id"]
db_rate_limit = await crud_rate_limits.exists(db=db, name=rate_limit_internal_dict["name"])
if db_rate_limit:
raise DuplicateValueException("Rate Limit Name not available")
rate_limit_internal = RateLimitCreateInternal(**rate_limit_internal_dict)
created_rate_limit = await crud_rate_limits.create(db=db, object=rate_limit_internal)
if created_rate_limit is None:
raise NotFoundException("Failed to create rate limit")
rate_limit_read = await crud_rate_limits.get(db=db, id=created_rate_limit["id"], schema_to_select=RateLimitRead)
if rate_limit_read is None:
raise NotFoundException("Created rate limit not found")
return rate_limit_read
@router.get("/tier/{tier_name}/rate_limits", response_model=PaginatedListResponse[RateLimitRead])
async def read_rate_limits(
request: Request,
tier_name: str,
db: Annotated[AsyncSession, Depends(async_get_db)],
page: int = 1,
items_per_page: int = 10,
) -> dict:
db_tier = await crud_tiers.get(db=db, name=tier_name, schema_to_select=TierRead)
if not db_tier:
raise NotFoundException("Tier not found")
rate_limits_data = await crud_rate_limits.get_multi(
db=db,
offset=compute_offset(page, items_per_page),
limit=items_per_page,
tier_id=db_tier["id"],
)
response: dict[str, Any] = paginated_response(crud_data=rate_limits_data, page=page, items_per_page=items_per_page)
return response
@router.get("/tier/{tier_name}/rate_limit/{id}", response_model=RateLimitRead)
async def read_rate_limit(
request: Request, tier_name: str, id: int, db: Annotated[AsyncSession, Depends(async_get_db)]
) -> dict[str, Any]:
db_tier = await crud_tiers.get(db=db, name=tier_name, schema_to_select=TierRead)
if not db_tier:
raise NotFoundException("Tier not found")
db_rate_limit = await crud_rate_limits.get(db=db, tier_id=db_tier["id"], id=id, schema_to_select=RateLimitRead)
if db_rate_limit is None:
raise NotFoundException("Rate Limit not found")
return db_rate_limit
@router.patch("/tier/{tier_name}/rate_limit/{id}", dependencies=[Depends(get_current_superuser)])
async def patch_rate_limit(
request: Request,
tier_name: str,
id: int,
values: RateLimitUpdate,
db: Annotated[AsyncSession, Depends(async_get_db)],
) -> dict[str, str]:
db_tier = await crud_tiers.get(db=db, name=tier_name, schema_to_select=TierRead)
if not db_tier:
raise NotFoundException("Tier not found")
db_rate_limit = await crud_rate_limits.get(db=db, tier_id=db_tier["id"], id=id, schema_to_select=RateLimitRead)
if db_rate_limit is None:
raise NotFoundException("Rate Limit not found")
await crud_rate_limits.update(db=db, object=values, id=id)
return {"message": "Rate Limit updated"}
@router.delete("/tier/{tier_name}/rate_limit/{id}", dependencies=[Depends(get_current_superuser)])
async def erase_rate_limit(
request: Request, tier_name: str, id: int, db: Annotated[AsyncSession, Depends(async_get_db)]
) -> dict[str, str]:
db_tier = await crud_tiers.get(db=db, name=tier_name, schema_to_select=TierRead)
if not db_tier:
raise NotFoundException("Tier not found")
db_rate_limit = await crud_rate_limits.get(db=db, tier_id=db_tier["id"], id=id, schema_to_select=RateLimitRead)
if db_rate_limit is None:
raise NotFoundException("Rate Limit not found")
await crud_rate_limits.delete(db=db, id=id)
return {"message": "Rate Limit deleted"}