|
34 | 34 | remove_cache_key, |
35 | 35 | rev_cache, |
36 | 36 | ) |
37 | | -from app.utils.config import DOMAIN, MAX_RECENT_URLS, CACHE_PURGE_TOKEN, QR_DIR |
38 | | -from app.utils.helper import generate_code, is_valid_url, sanitize_url, format_date |
| 37 | +from app.utils.config import ( |
| 38 | + DOMAIN, |
| 39 | + MAX_RECENT_URLS, |
| 40 | + CACHE_PURGE_TOKEN, |
| 41 | + QR_DIR, |
| 42 | +) |
| 43 | +from app.utils.helper import ( |
| 44 | + generate_code, |
| 45 | + sanitize_url, |
| 46 | + is_valid_url, |
| 47 | + authorize_url, |
| 48 | + format_date, |
| 49 | +) |
39 | 50 | from app.utils.qr import generate_qr_with_logo |
40 | 51 |
|
41 | | -# templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) |
42 | 52 | templates = Jinja2Templates(directory="app/templates") |
43 | 53 | # Routers |
44 | 54 | ui_router = APIRouter() |
@@ -98,12 +108,18 @@ async def create_short_url( |
98 | 108 | qr_type: str = Form("short"), |
99 | 109 | ): |
100 | 110 | session = request.session |
101 | | - original_url = sanitize_url(original_url) |
| 111 | + original_url = sanitize_url(original_url) # sanitize the URL input |
102 | 112 |
|
103 | | - if not original_url or not is_valid_url(original_url): |
| 113 | + if not original_url or not is_valid_url(original_url): # validate the URL |
104 | 114 | session["error"] = "Please enter a valid URL." |
105 | 115 | return RedirectResponse("/", status_code=status.HTTP_303_SEE_OTHER) |
106 | 116 |
|
| 117 | + if not authorize_url( |
| 118 | + original_url |
| 119 | + ): # authorize the URL based on whitelist/blacklist |
| 120 | + session["error"] = "This domain is not allowed." |
| 121 | + return RedirectResponse("/", status_code=status.HTTP_303_SEE_OTHER) |
| 122 | + |
107 | 123 | short_code: Optional[str] = get_short_from_cache(original_url) |
108 | 124 |
|
109 | 125 | if not short_code and db.is_connected(): |
@@ -219,7 +235,6 @@ def redirect_short_ui(short_code: str, background_tasks: BackgroundTasks): |
219 | 235 | set_cache_pair(short_code, original_url) |
220 | 236 | return RedirectResponse(original_url) |
221 | 237 |
|
222 | | - # return PlainTextResponse("Invalid short URL", status_code=404) |
223 | 238 | raise HTTPException(status_code=404, detail="Page not found") |
224 | 239 |
|
225 | 240 |
|
@@ -331,9 +346,13 @@ class ShortenRequest(BaseModel): |
331 | 346 | @api_v1.post("/shorten") |
332 | 347 | def shorten_api(payload: ShortenRequest): |
333 | 348 | original_url = sanitize_url(payload.url) |
| 349 | + |
334 | 350 | if not is_valid_url(original_url): |
335 | 351 | return JSONResponse(status_code=400, content={"error": "INVALID_URL"}) |
336 | 352 |
|
| 353 | + if not authorize_url(original_url): |
| 354 | + return JSONResponse(status_code=400, content={"error": "DOMAIN_NOT_ALLOWED"}) |
| 355 | + |
337 | 356 | short_code = get_short_from_cache(original_url) |
338 | 357 | if not short_code: |
339 | 358 | short_code = generate_code() |
|
0 commit comments