Skip to content

Commit fe34ae8

Browse files
fix: avoid eagerly copying uploaded files into memory (#6174)
* fix: avoid eagerly copying uploaded files into memory

  Previously the upload handler read every file into a BytesIO buffer before passing it to the event handler, doubling peak memory for large uploads. Now the original Starlette temp files are passed through directly and form_data.close() is deferred until the streaming response completes.

  - Pass file.file directly instead of copying into BytesIO
  - Defer form_data.close() to finally block in streaming generator
  - Close form_data on error path to prevent resource leaks
  - Replace mock form data in tests with real Starlette FormData
  - Add test verifying files are not read before handler runs

* test: improve upload tests with streaming assertions and deduplication

  - Extract _FileUploadMixin to eliminate duplicated fields and methods across FileUploadState, ChildFileUploadState, and GrandChildFileUploadState
  - Fix shared BytesIO bug where both UploadFile instances shared one buffer, causing the second file to write empty content
  - Use app.modify_state to set _tmp_path on the state instance instead of setting class attributes directly
  - Assert incremental delta updates during streaming (yield per file)
  - Verify both BinaryIO handles remain open during streaming and close after form_data.close()
  - Verify uploaded files are written to tmp dir with correct content
  - Remove dead state._tmp_path assignments from error-case tests

* fix: prevent temp file leaks on upload cancellation

  Replace `except Exception` with `try/finally` + `event is None` guard so that `asyncio.CancelledError` (a BaseException) also triggers form_data cleanup. Add idempotent `_close_form_data()` helper to safely deduplicate cleanup across three paths: event creation failure, generator completion, and ASGI response teardown. Introduce `_UploadStreamingResponse` (module-level) that wraps `__call__` in try/finally as a safety net for the case where the response is cancelled before the generator is ever entered.

  Add tests verifying form_data.close() is called on:

  - CancelledError during event creation (get_state cancelled)
  - CancelledError during ASGI send (before generator iteration)

* refactor: remove redundant try/finally in upload generator

* test: use full ASGI lifecycle in upload form close test
1 parent 7607fa3 commit fe34ae8

File tree

3 files changed

+335
-197
lines changed

3 files changed

+335
-197
lines changed

reflex/app.py

Lines changed: 105 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import dataclasses
1010
import functools
1111
import inspect
12-
import io
1312
import json
1413
import operator
1514
import sys
@@ -19,6 +18,7 @@
1918
from collections.abc import (
2019
AsyncGenerator,
2120
AsyncIterator,
21+
Awaitable,
2222
Callable,
2323
Coroutine,
2424
Mapping,
@@ -1891,6 +1891,27 @@ async def health(_request: Request) -> JSONResponse:
18911891
return JSONResponse(content=health_status, status_code=status_code)
18921892

18931893

1894+
class _UploadStreamingResponse(StreamingResponse):
1895+
"""Streaming response that always releases upload form resources."""
1896+
1897+
_on_finish: Callable[[], Awaitable[None]]
1898+
1899+
def __init__(
1900+
self,
1901+
*args: Any,
1902+
on_finish: Callable[[], Awaitable[None]],
1903+
**kwargs: Any,
1904+
) -> None:
1905+
super().__init__(*args, **kwargs)
1906+
self._on_finish = on_finish
1907+
1908+
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
1909+
try:
1910+
await super().__call__(scope, receive, send)
1911+
finally:
1912+
await self._on_finish()
1913+
1914+
18941915
def upload(app: App):
18951916
"""Upload a file.
18961917
@@ -1920,87 +1941,98 @@ async def upload_file(request: Request):
19201941

19211942
# Get the files from the request.
19221943
try:
1923-
files = await request.form()
1944+
form_data = await request.form()
19241945
except ClientDisconnect:
19251946
return Response() # user cancelled
1926-
files = files.getlist("files")
1927-
if not files:
1928-
msg = "No files were uploaded."
1929-
raise UploadValueError(msg)
1930-
1931-
token = request.headers.get("reflex-client-token")
1932-
handler = request.headers.get("reflex-event-handler")
1933-
1934-
if not token or not handler:
1935-
raise HTTPException(
1936-
status_code=400,
1937-
detail="Missing reflex-client-token or reflex-event-handler header.",
1938-
)
19391947

1940-
# Get the state for the session.
1941-
substate_token = _substate_key(token, handler.rpartition(".")[0])
1942-
state = await app.state_manager.get_state(substate_token)
1948+
form_data_closed = False
19431949

1944-
handler_upload_param = ()
1950+
async def _close_form_data() -> None:
1951+
"""Close the parsed form data exactly once."""
1952+
nonlocal form_data_closed
1953+
if form_data_closed:
1954+
return
1955+
form_data_closed = True
1956+
await form_data.close()
19451957

1946-
_current_state, event_handler = state._get_event_handler(handler)
1958+
async def _create_upload_event() -> Event:
1959+
"""Create an upload event using the live Starlette temp files.
19471960
1948-
if event_handler.is_background:
1949-
msg = f"@rx.event(background=True) is not supported for upload handler `{handler}`."
1950-
raise UploadTypeError(msg)
1951-
func = event_handler.fn
1952-
if isinstance(func, functools.partial):
1953-
func = func.func
1954-
for k, v in get_type_hints(func).items():
1955-
if types.is_generic_alias(v) and types._issubclass(
1956-
get_args(v)[0],
1957-
UploadFile,
1958-
):
1959-
handler_upload_param = (k, v)
1960-
break
1961+
Returns:
1962+
The upload event backed by the original temp files.
1963+
"""
1964+
files = form_data.getlist("files")
1965+
if not files:
1966+
msg = "No files were uploaded."
1967+
raise UploadValueError(msg)
1968+
1969+
token = request.headers.get("reflex-client-token")
1970+
handler = request.headers.get("reflex-event-handler")
1971+
1972+
if not token or not handler:
1973+
raise HTTPException(
1974+
status_code=400,
1975+
detail="Missing reflex-client-token or reflex-event-handler header.",
1976+
)
19611977

1962-
if not handler_upload_param:
1963-
msg = (
1964-
f"`{handler}` handler should have a parameter annotated as "
1965-
"list[rx.UploadFile]"
1966-
)
1967-
raise UploadValueError(msg)
1968-
1969-
# Make a copy of the files as they are closed after the request.
1970-
# This behaviour changed from fastapi 0.103.0 to 0.103.1 as the
1971-
# AsyncExitStack was removed from the request scope and is now
1972-
# part of the routing function which closes this before the
1973-
# event is handled.
1974-
file_copies = []
1975-
for file in files:
1976-
if not isinstance(file, StarletteUploadFile):
1977-
raise UploadValueError(
1978-
"Uploaded file is not an UploadFile." + str(file)
1978+
# Get the state for the session.
1979+
substate_token = _substate_key(token, handler.rpartition(".")[0])
1980+
state = await app.state_manager.get_state(substate_token)
1981+
1982+
handler_upload_param = ()
1983+
1984+
_current_state, event_handler = state._get_event_handler(handler)
1985+
1986+
if event_handler.is_background:
1987+
msg = f"@rx.event(background=True) is not supported for upload handler `{handler}`."
1988+
raise UploadTypeError(msg)
1989+
func = event_handler.fn
1990+
if isinstance(func, functools.partial):
1991+
func = func.func
1992+
for k, v in get_type_hints(func).items():
1993+
if types.is_generic_alias(v) and types._issubclass(
1994+
get_args(v)[0],
1995+
UploadFile,
1996+
):
1997+
handler_upload_param = (k, v)
1998+
break
1999+
2000+
if not handler_upload_param:
2001+
msg = (
2002+
f"`{handler}` handler should have a parameter annotated as "
2003+
"list[rx.UploadFile]"
19792004
)
1980-
content_copy = io.BytesIO()
1981-
content_copy.write(await file.read())
1982-
content_copy.seek(0)
1983-
file_copies.append(
1984-
UploadFile(
1985-
file=content_copy,
1986-
path=Path(file.filename.lstrip("/")) if file.filename else None,
1987-
size=file.size,
1988-
headers=file.headers,
2005+
raise UploadValueError(msg)
2006+
2007+
# Keep the parsed form data alive until the upload event finishes so
2008+
# the underlying Starlette temp files remain available to the handler.
2009+
file_uploads = []
2010+
for file in files:
2011+
if not isinstance(file, StarletteUploadFile):
2012+
raise UploadValueError(
2013+
"Uploaded file is not an UploadFile." + str(file)
2014+
)
2015+
file_uploads.append(
2016+
UploadFile(
2017+
file=file.file,
2018+
path=Path(file.filename.lstrip("/")) if file.filename else None,
2019+
size=file.size,
2020+
headers=file.headers,
2021+
)
19892022
)
1990-
)
19912023

1992-
for file in files:
1993-
if not isinstance(file, StarletteUploadFile):
1994-
raise UploadValueError(
1995-
"Uploaded file is not an UploadFile." + str(file)
1996-
)
1997-
await file.close()
2024+
return Event(
2025+
token=token,
2026+
name=handler,
2027+
payload={handler_upload_param[0]: file_uploads},
2028+
)
19982029

1999-
event = Event(
2000-
token=token,
2001-
name=handler,
2002-
payload={handler_upload_param[0]: file_copies},
2003-
)
2030+
event: Event | None = None
2031+
try:
2032+
event = await _create_upload_event()
2033+
finally:
2034+
if event is None:
2035+
await _close_form_data()
20042036

20052037
async def _ndjson_updates():
20062038
"""Process the upload event, generating ndjson updates.
@@ -2018,9 +2050,10 @@ async def _ndjson_updates():
20182050
yield update.json() + "\n"
20192051

20202052
# Stream updates to client
2021-
return StreamingResponse(
2053+
return _UploadStreamingResponse(
20222054
_ndjson_updates(),
20232055
media_type="application/x-ndjson",
2056+
on_finish=_close_form_data,
20242057
)
20252058

20262059
return upload_file

tests/units/states/upload.py

Lines changed: 10 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Test states for upload-related tests."""
22

33
from pathlib import Path
4-
from typing import ClassVar
54

65
import reflex as rx
76
from reflex.state import BaseState, State
@@ -35,11 +34,11 @@ async def handle_upload(self, files: list[rx.UploadFile]):
3534
"""
3635

3736

38-
class FileUploadState(State):
39-
"""The base state for uploading a file."""
37+
class _FileUploadMixin(BaseState, mixin=True):
38+
"""Common fields and handlers for upload state tests."""
4039

4140
img_list: list[str]
42-
_tmp_path: ClassVar[Path]
41+
_tmp_path: Path = Path()
4342

4443
async def handle_upload2(self, files):
4544
"""Handle the upload of a file.
@@ -64,6 +63,7 @@ async def multi_handle_upload(self, files: list[rx.UploadFile]):
6463

6564
# Update the img var.
6665
self.img_list.append(file.name)
66+
yield
6767

6868
@rx.event(background=True)
6969
async def bg_upload(self, files: list[rx.UploadFile]):
@@ -74,87 +74,21 @@ async def bg_upload(self, files: list[rx.UploadFile]):
7474
"""
7575

7676

77+
class FileUploadState(_FileUploadMixin, State):
78+
"""The base state for uploading a file."""
79+
80+
7781
class FileStateBase1(State):
7882
"""The base state for a child FileUploadState."""
7983

8084

81-
class ChildFileUploadState(FileStateBase1):
85+
class ChildFileUploadState(_FileUploadMixin, FileStateBase1):
8286
"""The child state for uploading a file."""
8387

84-
img_list: list[str]
85-
_tmp_path: ClassVar[Path]
86-
87-
async def handle_upload2(self, files):
88-
"""Handle the upload of a file.
89-
90-
Args:
91-
files: The uploaded files.
92-
"""
93-
94-
async def multi_handle_upload(self, files: list[rx.UploadFile]):
95-
"""Handle the upload of a file.
96-
97-
Args:
98-
files: The uploaded files.
99-
"""
100-
for file in files:
101-
upload_data = await file.read()
102-
assert file.name is not None
103-
outfile = self._tmp_path / file.name
104-
105-
# Save the file.
106-
outfile.write_bytes(upload_data)
107-
108-
# Update the img var.
109-
self.img_list.append(file.name)
110-
111-
@rx.event(background=True)
112-
async def bg_upload(self, files: list[rx.UploadFile]):
113-
"""Background task cannot be upload handler.
114-
115-
Args:
116-
files: The uploaded files.
117-
"""
118-
11988

12089
class FileStateBase2(FileStateBase1):
12190
"""The parent state for a grandchild FileUploadState."""
12291

12392

124-
class GrandChildFileUploadState(FileStateBase2):
93+
class GrandChildFileUploadState(_FileUploadMixin, FileStateBase2):
12594
"""The child state for uploading a file."""
126-
127-
img_list: list[str]
128-
_tmp_path: ClassVar[Path]
129-
130-
async def handle_upload2(self, files):
131-
"""Handle the upload of a file.
132-
133-
Args:
134-
files: The uploaded files.
135-
"""
136-
137-
async def multi_handle_upload(self, files: list[rx.UploadFile]):
138-
"""Handle the upload of a file.
139-
140-
Args:
141-
files: The uploaded files.
142-
"""
143-
for file in files:
144-
upload_data = await file.read()
145-
assert file.name is not None
146-
outfile = self._tmp_path / file.name
147-
148-
# Save the file.
149-
outfile.write_bytes(upload_data)
150-
151-
# Update the img var.
152-
self.img_list.append(file.name)
153-
154-
@rx.event(background=True)
155-
async def bg_upload(self, files: list[rx.UploadFile]):
156-
"""Background task cannot be upload handler.
157-
158-
Args:
159-
files: The uploaded files.
160-
"""

0 commit comments

Comments
 (0)