diff --git a/packages/reflex-base/src/reflex_base/constants/__init__.py b/packages/reflex-base/src/reflex_base/constants/__init__.py index 7958e4b5643..e790572a13a 100644 --- a/packages/reflex-base/src/reflex_base/constants/__init__.py +++ b/packages/reflex-base/src/reflex_base/constants/__init__.py @@ -114,6 +114,7 @@ "RouteArgType", "RouteRegex", "RouteVar", + "RunningMode", "SocketEvent", "StateManagerMode", "Templates", diff --git a/packages/reflex-components-core/src/reflex_components_core/core/_upload.py b/packages/reflex-components-core/src/reflex_components_core/core/_upload.py index 01bf069853d..0674e071fbd 100644 --- a/packages/reflex-components-core/src/reflex_components_core/core/_upload.py +++ b/packages/reflex-components-core/src/reflex_components_core/core/_upload.py @@ -6,7 +6,7 @@ import contextlib import dataclasses from collections import deque -from collections.abc import AsyncGenerator, AsyncIterator +from collections.abc import AsyncGenerator, AsyncIterator, MutableMapping from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO, cast @@ -23,6 +23,8 @@ from typing_extensions import Self if TYPE_CHECKING: + from reflex_base.utils.types import ASGIApp, Receive, Scope, Send + from reflex.app import App @@ -575,6 +577,62 @@ async def _upload_chunk_file( return Response(status_code=202) +header_content_disposition = b"content-disposition" +header_content_type = b"content-type" +header_x_content_type_options = b"x-content-type-options" + + +class UploadedFilesHeadersMiddleware: + """ASGI middleware that adds security headers to uploaded file responses.""" + + def __init__(self, app: ASGIApp) -> None: + """Wrap an ASGI application with upload security headers. + + Args: + app: The ASGI application to wrap. + """ + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + """Add Content-Disposition and X-Content-Type-Options headers. + + Args: + scope: The ASGI scope. + receive: The ASGI receive callable. + send: The ASGI send callable. + """ + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + async def send_with_headers(message: MutableMapping[str, Any]) -> None: + if message["type"] == "http.response.start": + content_disposition = None + content_type = None + headers = [(header_x_content_type_options, b"nosniff")] + for header_name, header_value in message.get("headers", []): + lower_name = header_name.lower() + if lower_name == header_content_disposition: + content_disposition = header_value.lower() + # Always append content-disposition header if non-empty. + continue + if lower_name == header_x_content_type_options: + # Always replace this value with "nosniff", so ignore existing value. + continue + if lower_name == header_content_type: + content_type = header_value.lower() + headers.append((header_name, header_value)) + if content_type != b"application/pdf": + # Unknown content or non-PDF forces download. + content_disposition = b"attachment" + if content_disposition: + headers.append((header_content_disposition, content_disposition)) + message = {**message, "headers": headers} + await send(message) + + await self.app(scope, receive, send_with_headers) + + def upload(app: App): """Upload files, dispatching to buffered or streaming handling. diff --git a/reflex/app.py b/reflex/app.py index 39a896f6484..389d18f246a 100644 --- a/reflex/app.py +++ b/reflex/app.py @@ -68,8 +68,8 @@ from starlette.staticfiles import StaticFiles from typing_extensions import Unpack +from reflex._upload import UploadedFilesHeadersMiddleware, upload from reflex._upload import UploadFile as UploadFile -from reflex._upload import upload from reflex.admin import AdminDash from reflex.app_mixins import AppMixin, LifespanMixin, MiddlewareMixin from reflex.compiler import compiler @@ -714,7 +714,7 @@ def _add_optional_endpoints(self): # To access uploaded files. self._api.mount( str(constants.Endpoint.UPLOAD), - StaticFiles(directory=get_upload_dir()), + UploadedFilesHeadersMiddleware(StaticFiles(directory=get_upload_dir())), name="uploaded_files", ) diff --git a/tests/integration/test_media.py b/tests/integration/test_media.py index 9d0fe73d9df..08272d3b16a 100644 --- a/tests/integration/test_media.py +++ b/tests/integration/test_media.py @@ -55,6 +55,15 @@ def img_from_url(self) -> Image.Image: img_bytes = img_resp.content return Image.open(io.BytesIO(img_bytes)) + @rx.var + def generated_image(self) -> str: + # Generate a 150x150 red PNG and write it to the upload directory. + img = Image.new("RGB", (150, 150), "red") + upload_dir = rx.get_upload_dir() + upload_dir.mkdir(parents=True, exist_ok=True) + img.save(upload_dir / "generated.png") + return "generated.png" + app = rx.App() @app.add_page @@ -72,19 +81,53 @@ def index(): rx.image(src=State.img_gif, alt="GIF image", id="gif"), rx.image(src=State.img_webp, alt="WEBP image", id="webp"), rx.image(src=State.img_from_url, alt="Image from URL", id="from_url"), + rx.image( + src=rx.get_upload_url(State.generated_image), + alt="Uploaded image", + id="uploaded", + ), ) +def check_image_loaded( + driver, img, expected_width: int = 200, expected_height: int = 200 +) -> bool: + """Check whether an image element has fully loaded with expected dimensions. + + Args: + driver: WebDriver instance. + img: The image WebElement. + expected_width: Expected natural width. + expected_height: Expected natural height. + + Returns: + True if the image is complete and matches the expected dimensions. + """ + return driver.execute_script( + "return arguments[0].complete " + '&& typeof arguments[0].naturalWidth != "undefined" ' + "&& arguments[0].naturalWidth === arguments[1] " + '&& typeof arguments[0].naturalHeight != "undefined" ' + "&& arguments[0].naturalHeight === arguments[2]", + img, + expected_width, + expected_height, + ) + + @pytest.fixture -def media_app(tmp_path) -> Generator[AppHarness, None, None]: +def media_app(tmp_path, monkeypatch) -> Generator[AppHarness, None, None]: """Start MediaApp app at tmp_path via AppHarness. Args: tmp_path: pytest tmp_path fixture + monkeypatch: pytest monkeypatch fixture Yields: running AppHarness instance """ + monkeypatch.setenv("REFLEX_UPLOADED_FILES_DIR", str(tmp_path / "uploads")) + with AppHarness.create( root=tmp_path, app_source=MediaApp, @@ -116,52 +159,44 @@ def test_media_app(media_app: AppHarness): gif_img = driver.find_element(By.ID, "gif") webp_img = driver.find_element(By.ID, "webp") from_url_img = driver.find_element(By.ID, "from_url") - - def check_image_loaded(img, check_width=" == 200", check_height=" == 200"): - return driver.execute_script( - "console.log(arguments); return arguments[1].complete " - '&& typeof arguments[1].naturalWidth != "undefined" ' - f"&& arguments[1].naturalWidth {check_width} ", - '&& typeof arguments[1].naturalHeight != "undefined" ' - f"&& arguments[1].naturalHeight {check_height} ", - img, - ) + uploaded_img = driver.find_element(By.ID, "uploaded") default_img_src = default_img.get_attribute("src") assert default_img_src is not None assert default_img_src.startswith("data:image/png;base64") - assert check_image_loaded(default_img) + assert check_image_loaded(driver, default_img) bmp_img_src = bmp_img.get_attribute("src") assert bmp_img_src is not None assert bmp_img_src.startswith("data:image/bmp;base64") - assert check_image_loaded(bmp_img) + assert check_image_loaded(driver, bmp_img) jpg_img_src = jpg_img.get_attribute("src") assert jpg_img_src is not None assert jpg_img_src.startswith("data:image/jpeg;base64") - assert check_image_loaded(jpg_img) + assert check_image_loaded(driver, jpg_img) png_img_src = png_img.get_attribute("src") assert png_img_src is not None assert png_img_src.startswith("data:image/png;base64") - assert check_image_loaded(png_img) + assert check_image_loaded(driver, png_img) gif_img_src = gif_img.get_attribute("src") assert gif_img_src is not None assert gif_img_src.startswith("data:image/gif;base64") - assert check_image_loaded(gif_img) + assert check_image_loaded(driver, gif_img) webp_img_src = webp_img.get_attribute("src") assert webp_img_src is not None assert webp_img_src.startswith("data:image/webp;base64") - assert check_image_loaded(webp_img) + assert check_image_loaded(driver, webp_img) from_url_img_src = from_url_img.get_attribute("src") assert from_url_img_src is not None assert from_url_img_src.startswith("data:image/jpeg;base64") - assert check_image_loaded( - from_url_img, - check_width=" == 200", - check_height=" == 300", - ) + assert check_image_loaded(driver, from_url_img, expected_height=300) + + uploaded_img_src = uploaded_img.get_attribute("src") + assert uploaded_img_src is not None + assert "generated.png" in uploaded_img_src + assert check_image_loaded(driver, uploaded_img, 150, 150) diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index 4c7f8e26997..2c9b71ecd77 100644 --- a/tests/integration/test_upload.py +++ b/tests/integration/test_upload.py @@ -8,17 +8,15 @@ from collections.abc import Generator from pathlib import Path from typing import Any -from urllib.parse import urlsplit import pytest from reflex_base.constants.event import Endpoint +from selenium.common.exceptions import NoAlertPresentException from selenium.webdriver.common.by import By import reflex as rx from reflex.testing import AppHarness, WebDriver -from .utils import poll_for_navigation - def UploadFile(): """App for testing dynamic routes.""" @@ -723,24 +721,132 @@ def test_upload_download_file( upload_box.send_keys(str(target_file)) upload_button.click() + # Wait for the upload to complete. + upload_done = driver.find_element(By.ID, "upload_done") + assert upload_file.poll_for_value(upload_done, exp_not_equal="false") == "true" + + # Configure the download directory using CDP. + download_dir = tmp_path / "downloads" + download_dir.mkdir() + driver.execute_cdp_cmd( + "Page.setDownloadBehavior", + {"behavior": "allow", "downloadPath": str(download_dir)}, + ) + + downloaded_file = download_dir / exp_name + # Download via event embedded in frontend code. download_frontend = driver.find_element(By.ID, "download-frontend") - with poll_for_navigation(driver): - download_frontend.click() - assert urlsplit(driver.current_url).path == f"/{Endpoint.UPLOAD.value}/test.txt" - assert driver.find_element(by=By.TAG_NAME, value="body").text == exp_contents - - # Go back and wait for the app to reload. - with poll_for_navigation(driver): - driver.back() - poll_for_token(driver, upload_file) + download_frontend.click() + AppHarness.expect(lambda: downloaded_file.exists()) + assert downloaded_file.read_text() == exp_contents + downloaded_file.unlink() # Download via backend event handler. download_backend = driver.find_element(By.ID, "download-backend") - with poll_for_navigation(driver): - download_backend.click() - assert urlsplit(driver.current_url).path == f"/{Endpoint.UPLOAD.value}/test.txt" - assert driver.find_element(by=By.TAG_NAME, value="body").text == exp_contents + download_backend.click() + AppHarness.expect(lambda: downloaded_file.exists()) + assert downloaded_file.read_text() == exp_contents + + +@pytest.mark.parametrize( + ("exp_name", "exp_contents", "expect_attachment", "expected_mime_type"), + [ + ( + "malicious.html", + "", + True, + "text/html; charset=utf-8", + ), + ("document.pdf", "%PDF-1.4 fake pdf contents", False, "application/pdf"), + ("readme.txt", "plain text contents", True, "text/plain; charset=utf-8"), + ], + ids=["html", "pdf", "txt"], +) +def test_uploaded_file_security_headers( + tmp_path, + upload_file: AppHarness, + driver: WebDriver, + exp_name: str, + exp_contents: str, + expect_attachment: bool, + expected_mime_type: str, +): + """Upload a file and verify security headers on the served response. + + For non-PDF files, Content-Disposition: attachment must be set to force a + download. For PDF files, Content-Disposition must NOT be set so the browser + can render them inline, but Content-Type: application/pdf is always present. + X-Content-Type-Options: nosniff is always required. + + Args: + tmp_path: pytest tmp_path fixture + upload_file: harness for UploadFile app. + driver: WebDriver instance. + exp_name: filename to upload. + exp_contents: file contents to upload. + expect_attachment: whether the response should force a download. + expected_mime_type: expected Content-Type mime type. + """ + import httpx + from reflex_base.config import get_config + + assert upload_file.app_instance is not None + poll_for_token(driver, upload_file) + clear_btn = driver.find_element(By.ID, "clear_uploads") + clear_btn.click() + + upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[2] + upload_button = driver.find_element(By.ID, "upload_button_tertiary") + + target_file = tmp_path / exp_name + target_file.write_text(exp_contents) + + upload_box.send_keys(str(target_file)) + upload_button.click() + + upload_done = driver.find_element(By.ID, "upload_done") + assert upload_file.poll_for_value(upload_done, exp_not_equal="false") == "true" + + # Fetch the uploaded file directly via httpx and check security headers. + upload_url = f"{get_config().api_url}/{Endpoint.UPLOAD.value}/{exp_name}" + resp = httpx.get(upload_url) + assert resp.status_code == 200 + assert resp.text == exp_contents + assert resp.headers["x-content-type-options"] == "nosniff" + assert resp.headers["content-type"] == expected_mime_type + + if expect_attachment: + assert resp.headers["content-disposition"] == "attachment" + else: + assert "content-disposition" not in resp.headers + + if not expect_attachment: + # PDF: no browser download test needed, skip the rest. + return + + # Configure the download directory using CDP. + download_dir = tmp_path / "downloads" + download_dir.mkdir() + driver.execute_cdp_cmd( + "Page.setDownloadBehavior", + {"behavior": "allow", "downloadPath": str(download_dir)}, + ) + + downloaded_file = download_dir / exp_name + + # Navigate to the uploaded HTML file in the browser and verify the script + # does not execute (Content-Disposition: attachment prevents rendering). + driver.get(upload_url) + # If the browser rendered the HTML, an alert('xss') dialog would appear. + # Verify no alert is present — the file should be downloaded, not rendered. + with pytest.raises(NoAlertPresentException): + alert = driver.switch_to.alert + alert.dismiss() + + # Also verify the file was downloaded with the correct contents. + AppHarness.expect(lambda: downloaded_file.exists()) + assert downloaded_file.read_text() == exp_contents def test_on_drop(