|
11 | 11 | from flask import Blueprint, jsonify, request, abort, send_file, current_app |
12 | 12 | from flask_login import login_required, current_user |
13 | 13 |
|
14 | | -from models import has_permission |
| 14 | +from models import has_permission, has_workspace_folder_access |
15 | 15 | from routes.auth_routes import require_permission |
16 | 16 |
|
17 | 17 | bp = Blueprint("workspace", __name__) |
@@ -186,6 +186,23 @@ def _is_blocklisted(name: str) -> bool: |
186 | 186 | return False |
187 | 187 |
|
188 | 188 |
|
| 189 | +def _check_folder_access(path_str: str): |
| 190 | + """Abort 403 if user's role cannot access this workspace folder. |
| 191 | +
|
| 192 | + Only enforced for paths inside WORKSPACE_DIR. Admin paths are skipped |
| 193 | + (they are gated by config:manage permission already). |
| 194 | + """ |
| 195 | + if not current_user.is_authenticated: |
| 196 | + return |
| 197 | + # Only enforce for workspace/ paths |
| 198 | + parts = path_str.strip("/").split("/") |
| 199 | + if not parts or parts[0] != "workspace": |
| 200 | + return |
| 201 | + if not has_workspace_folder_access(current_user.role, path_str): |
| 202 | + _audit("denied", path_str, result="denied", reason="folder_restricted") |
| 203 | + abort(403, description="Access to this workspace folder is restricted") |
| 204 | + |
| 205 | + |
189 | 206 | # ── Endpoints ────────────────────────────────────────────────────────────── |
190 | 207 |
|
191 | 208 | @bp.route("/api/workspace/tree") |
@@ -236,6 +253,16 @@ def _build_entries(directory: Path, current_depth: int) -> list: |
236 | 253 |
|
237 | 254 | entries = _build_entries(full, depth) |
238 | 255 |
|
| 256 | + # Filter top-level workspace folders based on role access |
| 257 | + rel_check = _repo_rel(full) |
| 258 | + if rel_check == "workspace" and current_user.is_authenticated: |
| 259 | + entries = [ |
| 260 | + e for e in entries |
| 261 | + if not e.get("is_dir") or has_workspace_folder_access( |
| 262 | + current_user.role, f"workspace/{e['name']}" |
| 263 | + ) |
| 264 | + ] |
| 265 | + |
239 | 266 | # Build breadcrumbs from repo-rel path |
240 | 267 | rel = _repo_rel(full) |
241 | 268 | parts = [p for p in rel.split("/") if p] |
@@ -266,6 +293,7 @@ def workspace_file_read(): |
266 | 293 | (REPO_ROOT / path).resolve(), WORKSPACE_DIR.resolve() |
267 | 294 | ) |
268 | 295 | full = _resolve_safe(path, require_admin=require_admin) |
| 296 | + _check_folder_access(_repo_rel(full)) |
269 | 297 |
|
270 | 298 | if not full.exists(): |
271 | 299 | return jsonify({"error": "File not found", "code": "not_found"}), 404 |
@@ -314,6 +342,7 @@ def workspace_file_write(): |
314 | 342 | (REPO_ROOT / path).resolve(), WORKSPACE_DIR.resolve() |
315 | 343 | ) |
316 | 344 | full = _resolve_safe(path, require_admin=require_admin) |
| 345 | + _check_folder_access(_repo_rel(full)) |
317 | 346 |
|
318 | 347 | if full.is_dir(): |
319 | 348 | return jsonify({"error": "Path is a directory", "code": "bad_path"}), 409 |
@@ -351,6 +380,7 @@ def workspace_file_create(): |
351 | 380 | (REPO_ROOT / path).resolve(), WORKSPACE_DIR.resolve() |
352 | 381 | ) |
353 | 382 | full = _resolve_safe(path, require_admin=require_admin) |
| 383 | + _check_folder_access(_repo_rel(full)) |
354 | 384 |
|
355 | 385 | if full.exists(): |
356 | 386 | return jsonify({"error": "File already exists", "code": "conflict"}), 409 |
@@ -385,6 +415,7 @@ def workspace_folder_create(): |
385 | 415 | (REPO_ROOT / path).resolve(), WORKSPACE_DIR.resolve() |
386 | 416 | ) |
387 | 417 | full = _resolve_safe(path, require_admin=require_admin) |
| 418 | + _check_folder_access(_repo_rel(full)) |
388 | 419 |
|
389 | 420 | if full.exists(): |
390 | 421 | return jsonify({"error": "Directory already exists", "code": "conflict"}), 409 |
@@ -422,6 +453,8 @@ def workspace_rename(): |
422 | 453 | ) |
423 | 454 | full_from = _resolve_safe(from_path, require_admin=require_admin) |
424 | 455 | full_to = _resolve_safe(to_path, require_admin=require_admin) |
| 456 | + _check_folder_access(_repo_rel(full_from)) |
| 457 | + _check_folder_access(_repo_rel(full_to)) |
425 | 458 |
|
426 | 459 | if not full_from.exists(): |
427 | 460 | return jsonify({"error": "Source not found", "code": "not_found"}), 404 |
@@ -456,6 +489,7 @@ def workspace_file_delete(): |
456 | 489 | (REPO_ROOT / path).resolve(), WORKSPACE_DIR.resolve() |
457 | 490 | ) |
458 | 491 | full = _resolve_safe(path, require_admin=require_admin) |
| 492 | + _check_folder_access(_repo_rel(full)) |
459 | 493 |
|
460 | 494 | if not full.exists(): |
461 | 495 | return jsonify({"error": "File not found", "code": "not_found"}), 404 |
@@ -515,6 +549,7 @@ def workspace_upload(): |
515 | 549 | (REPO_ROOT / path).resolve(), WORKSPACE_DIR.resolve() |
516 | 550 | ) |
517 | 551 | dir_full = _resolve_safe(path, require_admin=require_admin) |
| 552 | + _check_folder_access(_repo_rel(dir_full)) |
518 | 553 |
|
519 | 554 | if not dir_full.is_dir(): |
520 | 555 | return jsonify({"error": "Target path is not a directory", "code": "bad_path"}), 400 |
@@ -549,6 +584,7 @@ def workspace_download(): |
549 | 584 | (REPO_ROOT / path).resolve(), WORKSPACE_DIR.resolve() |
550 | 585 | ) |
551 | 586 | full = _resolve_safe(path, require_admin=require_admin) |
| 587 | + _check_folder_access(_repo_rel(full)) |
552 | 588 |
|
553 | 589 | if not full.exists(): |
554 | 590 | return jsonify({"error": "File not found", "code": "not_found"}), 404 |
@@ -585,7 +621,11 @@ def workspace_recent(): |
585 | 621 | continue |
586 | 622 | if _is_blocklisted(f.name): |
587 | 623 | continue |
588 | | - if any(_is_blocklisted(part) for part in f.relative_to(WORKSPACE_DIR).parts): |
| 624 | + rel_parts = f.relative_to(WORKSPACE_DIR).parts |
| 625 | + if any(_is_blocklisted(part) for part in rel_parts): |
| 626 | + continue |
| 627 | + # Enforce folder access: check top-level folder |
| 628 | + if rel_parts and not has_workspace_folder_access(current_user.role, f"workspace/{rel_parts[0]}"): |
589 | 629 | continue |
590 | 630 | try: |
591 | 631 | stat = f.stat() |
|
0 commit comments