fix: avoid eagerly copying uploaded files into memory #6174

masenf merged 5 commits into reflex-dev:main
Conversation
Previously the upload handler read every file into a BytesIO buffer before passing it to the event handler, doubling peak memory for large uploads. Now the original Starlette temp files are passed through directly and form_data.close() is deferred until the streaming response completes.

- Pass file.file directly instead of copying into BytesIO
- Defer form_data.close() to finally block in streaming generator
- Close form_data on error path to prevent resource leaks
- Replace mock form data in tests with real Starlette FormData
- Add test verifying files are not read before handler runs
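A minimal sketch of the difference (simplified; `old_wrap` and `new_wrap` are illustrative names, not the actual Reflex functions):

```python
import io
import tempfile

# Old approach (simplified): duplicate the whole upload into RAM.
def old_wrap(upload_stream):
    return io.BytesIO(upload_stream.read())  # second full copy of the payload

# New approach (simplified): pass the spooled temp file through unchanged,
# so the handler reads lazily from the spool and no extra copy is made.
def new_wrap(upload_stream):
    return upload_stream

# Starlette backs large multipart fields with SpooledTemporaryFile,
# which rolls over to disk past max_size.
src = tempfile.SpooledTemporaryFile(max_size=1024)
src.write(b"x" * 2048)
src.seek(0)
wrapped = new_wrap(src)
assert wrapped is src  # same object, zero-copy
print(wrapped.read(4))  # → b'xxxx'
```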
There is also another method to solve this issue, but it would require a more involved refactor: something like the Phoenix LiveView approach, where the frontend uploads the files in chunks and the backend writes those chunks to files. That would be the nuclear option that could reduce RAM usage a lot. I have to fully research it; it sounds possible.
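A rough sketch of what such a chunked flow could look like on the backend (hypothetical endpoint logic and names; not the Reflex or Phoenix API):

```python
import os
import tempfile

# Hypothetical chunk receiver: the frontend POSTs each chunk with an
# upload id and byte offset; the backend appends it to a file on disk,
# so at most one chunk is ever held in memory at a time.
UPLOAD_DIR = tempfile.mkdtemp()

def write_chunk(upload_id: str, offset: int, chunk: bytes) -> int:
    path = os.path.join(UPLOAD_DIR, upload_id)
    # 'r+b' to seek into an existing file, 'wb' to create it on first chunk.
    mode = "r+b" if os.path.exists(path) else "wb"
    with open(path, mode) as f:
        f.seek(offset)
        f.write(chunk)
    return os.path.getsize(path)

# Simulate a 3-chunk upload of a 12-byte payload.
data = b"hello world!"
for i in range(0, len(data), 4):
    size = write_chunk("upload-1", i, data[i : i + 4])
print(size)  # → 12
```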
Greptile Summary

This PR eliminates the peak-memory double-spike for large uploads by removing the eager BytesIO copy.

Key changes:
Minor concerns:
Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant UploadRoute as upload_file (Starlette route)
    participant CreateEvent as _create_upload_event()
    participant NdjsonGen as _ndjson_updates() generator
    participant Handler as State event handler
    participant FormData as form_data (Starlette temp files)
    Client->>UploadRoute: POST /upload (multipart)
    UploadRoute->>FormData: await request.form()
    FormData-->>UploadRoute: form_data (SpooledTempFile refs)
    UploadRoute->>CreateEvent: await _create_upload_event()
    CreateEvent->>FormData: form_data.getlist("files")
    Note over CreateEvent: wraps file.file directly<br/>(no BytesIO copy)
    CreateEvent-->>UploadRoute: Event(payload={files: [UploadFile(file=file.file)]})
    alt _create_upload_event raises
        UploadRoute->>FormData: await form_data.close() [cleanup]
        UploadRoute-->>Client: raise exception
    else success
        UploadRoute-->>Client: StreamingResponse(_ndjson_updates())
        loop body_iterator consumed
            NdjsonGen->>Handler: state._process(event)
            Handler->>FormData: await file.read() [reads temp file]
            Handler-->>NdjsonGen: state updates
            NdjsonGen-->>Client: yield update JSON
        end
        NdjsonGen->>FormData: await form_data.close() [finally block]
        Note over FormData: SpooledTempFile deleted from disk
    end
```
```python
try:
    event = await _create_upload_event()
except Exception:
    await form_data.close()
    raise
```
BaseException subclasses bypass form_data cleanup
except Exception does not catch BaseException subclasses such as asyncio.CancelledError (in Python ≥ 3.8 it is a BaseException, not an Exception). If the task is cancelled between _create_upload_event() returning and _ndjson_updates being entered, form_data is never closed and the underlying temp file / spooled file leaks.
A try / finally approach avoids the gap, since the success path hands ownership to _ndjson_updates:

```python
event: Event | None = None
try:
    event = await _create_upload_event()
finally:
    if event is None:
        await form_data.close()
```

Alternatively, `except BaseException` with a `raise` covers cancellation as well.
masenf
left a comment
Great work. I think the greptile comment is onto something, but instead of a finally, which would be detrimental, just catch BaseException instead.
The test improvements are nice, I just had one suggestion for improving the assertions on the existing test case we had.
```python
assert not bio.closed

async for _ in streaming_response.body_iterator:
    pass
```
we should assert not bio.closed for each iteration of this loop
```diff
-files_mock.getlist = getlist
-
-return files_mock
+return FormData([("files", file1), ("files", file2)])
```
can you extend this test:

- use actual different BinaryIO instances for each file
- set a `_tmp_path` in the file upload state using `app.modify_state` (instead of the current `get_state`). This path is where the handler will actually write the data that it reads from the "files"
- update `multi_handle_upload` to `yield` at the end of processing each UploadFile
- each iteration of the streaming loop should assert that both BinaryIO handles remain unclosed
- at the end of processing, also check that the files got written to the tmp dir correctly and that both BinaryIO handles got closed
- Extract _FileUploadMixin to eliminate duplicated fields and methods across FileUploadState, ChildFileUploadState, and GrandChildFileUploadState
- Fix shared BytesIO bug where both UploadFile instances shared one buffer, causing the second file to write empty content
- Use app.modify_state to set _tmp_path on the state instance instead of setting class attributes directly
- Assert incremental delta updates during streaming (yield per file)
- Verify both BinaryIO handles remain open during streaming and close after form_data.close()
- Verify uploaded files are written to tmp dir with correct content
- Remove dead state._tmp_path assignments from error-case tests
Replace `except Exception` with `try/finally` + `event is None` guard so that `asyncio.CancelledError` (a BaseException) also triggers form_data cleanup. Add idempotent `_close_form_data()` helper to safely deduplicate cleanup across three paths: event creation failure, generator completion, and ASGI response teardown. Introduce `_UploadStreamingResponse` (module-level) that wraps `__call__` in try/finally as a safety net for the case where the response is cancelled before the generator is ever entered.

Add tests verifying form_data.close() is called on:

- CancelledError during event creation (get_state cancelled)
- CancelledError during ASGI send (before generator iteration)
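The idempotent-close pattern described here can be sketched as a closure over a `closed` flag (a simplified stand-in with a fake form object; the real helper lives inside the Reflex upload route and closes Starlette's FormData):

```python
import asyncio

def make_closer(form_data):
    """Return an idempotent async closer, safe to call from multiple
    cleanup paths (error path, generator finally, response teardown)."""
    closed = False

    async def _close_form_data() -> None:
        nonlocal closed
        if not closed:
            closed = True
            await form_data.close()

    return _close_form_data

class FakeFormData:
    """Stand-in that counts close() calls."""
    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self) -> None:
        self.close_calls += 1

async def demo() -> int:
    fd = FakeFormData()
    closer = make_closer(fd)
    await closer()
    await closer()  # second call is a no-op
    return fd.close_calls

print(asyncio.run(demo()))  # → 1
```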
I have addressed the review comments.
masenf
left a comment
suggestion based on your last comment.
i like the new approach of baking the close into the streaming response.
```python
try:
    # Process the event.
    async with app.state_manager.modify_state_with_links(
        event.substate_token
    ) as state:
        async for update in state._process(event):
            # Postprocess the event.
            update = await app._postprocess(state, event, update)
            yield update.json() + "\n"
finally:
    await _close_form_data()
```
yeah i think cleaning up this path since we handle it with _UploadStreamingResponse makes the code more maintainable
```diff
-try:
-    # Process the event.
-    async with app.state_manager.modify_state_with_links(
-        event.substate_token
-    ) as state:
-        async for update in state._process(event):
-            # Postprocess the event.
-            update = await app._postprocess(state, event, update)
-            yield update.json() + "\n"
-finally:
-    await _close_form_data()
+# Process the event.
+async with app.state_manager.modify_state_with_links(
+    event.substate_token
+) as state:
+    async for update in state._process(event):
+        # Postprocess the event.
+        update = await app._postprocess(state, event, update)
+        yield update.json() + "\n"
```
Integration tests pass locally. The test may be flaky.
Previously the upload handler read every file into a BytesIO buffer before passing it to the event handler, doubling peak memory for large uploads. Now the original Starlette temp files are passed through directly and form_data.close() is deferred until the streaming response completes.
After these fixes, a 4 GB upload peaks at about 4 GB of RAM, and only when the user's handler actually reads the file. Before, there were two spikes: one when copying into BytesIO and one when the handler read the file in its view.
closes #3517