feat: add chunked file upload support to stream large files without buffering in memory #6189

FarhanAliRaza wants to merge 1 commit into reflex-dev:main
Conversation
feat: add chunked file upload support to stream large files without buffering in memory

Introduce `rx.upload_files_chunk` and the `/_upload_chunk` endpoint to allow uploading files in 8 MB chunks streamed directly to a background event handler via `rx.UploadChunkIterator`. This avoids buffering entire files in server memory and enables incremental processing as data arrives.

- Add `UploadChunk`, `UploadChunkIterator`, and `UploadFilesChunk` to the public API
- Add `uploadFilesChunk` JS client function that splits files into chunks with session tracking, progress reporting, and cancellation
- Add `upload_chunk()` backend endpoint with multi-request session management and streaming multipart parsing
- Refactor shared upload helpers (`_require_upload_headers`, `_get_upload_runtime_handler`, `_seed_upload_router_data`)
- Add `sync_web_runtime_templates()` to keep `.web` runtime files in sync during `setup_frontend`
- Add unit and integration tests for chunked uploads, cancellation, validation, and custom parameter names
This is an experimental implementation: the browser sends files in chunks over multiple requests, and the server writes those chunks as they arrive.
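The core idea — writing each chunk to disk at its offset as it arrives, rather than buffering the whole file — can be sketched framework-free. Everything here (`write_chunks`, the chunk tuple shape) is illustrative, not the PR's actual API:

```python
import asyncio
import os
import tempfile


async def write_chunks(dest_path, chunks):
    """Write (offset, data) pairs to dest_path incrementally.

    Only one chunk is held in memory at a time, which is the property
    the PR is after for large files.
    """
    open(dest_path, "wb").close()  # create/truncate the destination
    with open(dest_path, "r+b") as f:
        async for offset, data in chunks:
            f.seek(offset)  # chunks carry their own offset
            f.write(data)


async def demo():
    async def chunk_source():
        # Simulated 8-byte upload delivered as two out-of-order chunks.
        yield (4, b"5678")
        yield (0, b"1234")

    with tempfile.TemporaryDirectory() as tmp:
        dest = os.path.join(tmp, "upload.bin")
        await write_chunks(dest, chunk_source())
        with open(dest, "rb") as f:
            return f.read()
```

Because each chunk carries its offset, the writer tolerates out-of-order delivery; `asyncio.run(demo())` reassembles the two chunks into `b"12345678"`.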
Greptile Summary

This PR introduces chunked file upload support via a new `/_upload_chunk` endpoint. Key findings:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Browser as Browser (uploadFilesChunk)
    participant Backend as Backend (/_upload_chunk)
    participant Session as _ChunkUploadSession
    participant Iterator as UploadChunkIterator
    participant Handler as Background EventHandler
    Browser->>Backend: POST /_upload_chunk?session_id=S&filename=F&offset=0
    Note over Backend: _extract_chunk_upload_request_metadata()
    Backend->>Session: _get_or_create_chunk_upload_session()
    Session->>Iterator: UploadChunkIterator(maxsize=8)
    Session->>Handler: _process_background(state, event)
    Handler-->>Iterator: (consumes via async for)
    loop For each request body chunk
        Backend->>Iterator: iterator.push(UploadChunk)
        Iterator-->>Handler: yields UploadChunk
    end
    Backend->>Backend: record_request()
    Browser->>Backend: POST /_upload_chunk?session_id=S&filename=F&offset=N&complete=1
    Backend->>Iterator: iterator.finish()
    Iterator-->>Handler: StopAsyncIteration
    Handler-->>Handler: writes files, updates state
    alt Client Cancel
        Browser->>Backend: POST /_upload_chunk?session_id=S&cancel=1
        Backend->>Iterator: iterator.fail(RuntimeError)
        Iterator-->>Handler: raises RuntimeError
    end
```

Last reviewed commit: "feat: add chunked fi..."
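The push/finish/fail flow in the diagram can be approximated with a bounded `asyncio.Queue`. This is an illustrative sketch of the pattern, not the PR's actual `UploadChunkIterator` implementation:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class UploadChunk:
    filename: str
    offset: int
    data: bytes


class ChunkIterator:
    """Bounded producer/consumer bridge: the endpoint pushes chunks,
    the handler consumes them with `async for`."""

    _DONE = object()  # sentinel marking end-of-stream

    def __init__(self, maxsize: int = 8):
        # maxsize bounds how far the producer can run ahead (backpressure).
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._error = None

    async def push(self, chunk: UploadChunk) -> None:
        await self._queue.put(chunk)  # blocks while the buffer is full

    async def finish(self) -> None:
        await self._queue.put(self._DONE)

    async def fail(self, exc: BaseException) -> None:
        # Surface an error (e.g. client cancel) to the consumer.
        self._error = exc
        await self._queue.put(self._DONE)

    def __aiter__(self):
        return self

    async def __anext__(self) -> UploadChunk:
        item = await self._queue.get()
        if item is self._DONE:
            if self._error is not None:
                raise self._error
            raise StopAsyncIteration
        return item


async def demo():
    it = ChunkIterator(maxsize=2)
    out = []

    async def consumer():
        async for chunk in it:
            out.append(chunk.data)

    task = asyncio.create_task(consumer())
    await it.push(UploadChunk("f.bin", 0, b"ab"))
    await it.push(UploadChunk("f.bin", 2, b"cd"))
    await it.finish()
    await task
    return out
```

`finish()` translates into `StopAsyncIteration` for the consumer and `fail(exc)` into a raised exception, matching the two terminal arrows in the diagram.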
```python
@dataclasses.dataclass
class _UploadChunkPart:
    """Track the current multipart file part for chunk uploads."""

    content_disposition: bytes | None = None
    field_name: str = ""
    filename: str | None = None
    content_type: str = ""
    item_headers: list[tuple[bytes, bytes]] = dataclasses.field(default_factory=list)
    offset: int = 0
    bytes_emitted: int = 0
    is_upload_chunk: bool = False


class _UploadChunkMultipartParser:
    """Streaming multipart parser for chunked uploads."""

    def __init__(
        self,
        headers: Headers,
        stream: AsyncGenerator[bytes, None],
        chunk_iter: UploadChunkIterator,
    ) -> None:
        self.headers = headers
        self.stream = stream
        self.chunk_iter = chunk_iter
        self._charset = ""
        self._current_partial_header_name = b""
        self._current_partial_header_value = b""
        self._current_part = _UploadChunkPart()
        self._chunks_to_emit: list[UploadChunk] = []
        self._seen_upload_chunk = False
        self._part_count = 0
        self._emitted_chunk_count = 0
        self._emitted_bytes = 0
        self._stream_chunk_count = 0

    def on_part_begin(self) -> None:
        """Reset parser state for a new multipart part."""
        self._current_part = _UploadChunkPart()

    def on_part_data(self, data: bytes, start: int, end: int) -> None:
        """Record streamed chunk data for the current part."""
        if (
            not self._current_part.is_upload_chunk
            or self._current_part.filename is None
        ):
            return

        message_bytes = data[start:end]
        self._chunks_to_emit.append(
            UploadChunk(
                filename=self._current_part.filename,
                offset=self._current_part.offset + self._current_part.bytes_emitted,
                content_type=self._current_part.content_type,
                data=message_bytes,
            )
        )
        self._current_part.bytes_emitted += len(message_bytes)
        self._emitted_chunk_count += 1
        self._emitted_bytes += len(message_bytes)

    def on_part_end(self) -> None:
        """Emit a zero-byte chunk for empty file parts."""
        if (
            self._current_part.is_upload_chunk
            and self._current_part.filename is not None
            and self._current_part.bytes_emitted == 0
        ):
            self._chunks_to_emit.append(
                UploadChunk(
                    filename=self._current_part.filename,
                    offset=self._current_part.offset,
                    content_type=self._current_part.content_type,
                    data=b"",
                )
            )
            self._emitted_chunk_count += 1

    def on_header_field(self, data: bytes, start: int, end: int) -> None:
        """Accumulate multipart header field bytes."""
        self._current_partial_header_name += data[start:end]

    def on_header_value(self, data: bytes, start: int, end: int) -> None:
        """Accumulate multipart header value bytes."""
        self._current_partial_header_value += data[start:end]

    def on_header_end(self) -> None:
        """Store the completed multipart header."""
        field = self._current_partial_header_name.lower()
        if field == b"content-disposition":
            self._current_part.content_disposition = self._current_partial_header_value
        self._current_part.item_headers.append((
            field,
            self._current_partial_header_value,
        ))
        self._current_partial_header_name = b""
        self._current_partial_header_value = b""

    def on_headers_finished(self) -> None:
        """Parse upload chunk metadata from multipart headers."""
        disposition, options = parse_options_header(
            self._current_part.content_disposition
        )
        if disposition != b"form-data":
            msg = "Invalid upload chunk disposition."
            raise MultiPartException(msg)

        try:
            field_name = _user_safe_decode(options[b"name"], self._charset)
        except KeyError as err:
            msg = 'The Content-Disposition header field "name" must be provided.'
            raise MultiPartException(msg) from err

        parts = field_name.split(":")
        if len(parts) != 3 or parts[0] != "chunk":
            msg = f"Invalid upload chunk field name: {field_name}"
            raise MultiPartException(msg)

        try:
            int(parts[1])
            offset = int(parts[2])
        except ValueError as err:
            msg = f"Invalid upload chunk field name: {field_name}"
            raise MultiPartException(msg) from err

        if offset < 0:
            msg = f"Invalid upload chunk field name: {field_name}"
            raise MultiPartException(msg)

        try:
            filename = _user_safe_decode(options[b"filename"], self._charset)
        except KeyError as err:
            msg = f"Missing filename for upload chunk field: {field_name}"
            raise MultiPartException(msg) from err

        content_type = ""
        for header_name, header_value in self._current_part.item_headers:
            if header_name == b"content-type":
                content_type = _user_safe_decode(header_value, self._charset)
                break

        self._current_part.field_name = field_name
        self._current_part.filename = filename
        self._current_part.content_type = content_type
        self._current_part.offset = offset
        self._current_part.bytes_emitted = 0
        self._current_part.is_upload_chunk = True
        self._seen_upload_chunk = True
        self._part_count += 1

    def on_end(self) -> None:
        """Finalize parser callbacks."""

    def stats(self) -> dict[str, int | bool]:
        """Return parser statistics for logging."""
        return {
            "parts": self._part_count,
            "emitted_chunks": self._emitted_chunk_count,
            "emitted_bytes": self._emitted_bytes,
            "request_chunks": self._stream_chunk_count,
            "saw_upload_chunk": self._seen_upload_chunk,
        }

    async def parse(self) -> None:
        """Parse the incoming request stream and push chunks to the iterator.

        Raises:
            MultiPartException: If the request is not valid multipart upload data.
            RuntimeError: If the upload handler exits before consuming all chunks.
        """
        _, params = parse_options_header(self.headers["Content-Type"])
        charset = params.get(b"charset", "utf-8")
        if isinstance(charset, bytes):
            charset = charset.decode("latin-1")
        self._charset = charset

        try:
            boundary = params[b"boundary"]
        except KeyError as err:
            msg = "Missing boundary in multipart."
            raise MultiPartException(msg) from err

        callbacks = {
            "on_part_begin": self.on_part_begin,
            "on_part_data": self.on_part_data,
            "on_part_end": self.on_part_end,
            "on_header_field": self.on_header_field,
            "on_header_value": self.on_header_value,
            "on_header_end": self.on_header_end,
            "on_headers_finished": self.on_headers_finished,
            "on_end": self.on_end,
        }
        parser = MultipartParser(boundary, cast(Any, callbacks))

        async for chunk in self.stream:
            self._stream_chunk_count += 1
            parser.write(chunk)
            while self._chunks_to_emit:
                await self.chunk_iter.push(self._chunks_to_emit.pop(0))

        parser.finalize()
        while self._chunks_to_emit:
            await self.chunk_iter.push(self._chunks_to_emit.pop(0))

        if not self._seen_upload_chunk:
            msg = "No file chunks were uploaded."
            raise MultiPartException(msg)
```
Unused dead code: `_UploadChunkPart` and `_UploadChunkMultipartParser`
Both _UploadChunkPart (lines 294–305) and _UploadChunkMultipartParser (lines 308–501) are defined here but are never instantiated or referenced anywhere in the codebase. The actual upload_file_chunk handler inside upload_chunk(app) streams raw bytes directly from request.stream() without ever using these classes:
```python
async for chunk_bytes in request.stream():
    await session.iterator.push(
        UploadChunk(filename=metadata.filename or "", ...)
    )
```

On the JS side, chunks are sent as raw binary request bodies (not wrapped in multipart/form-data), so a multipart parser is not needed.
This represents roughly 200 lines of unreachable code. Either:
- These classes are left over from an earlier design and should be removed, or
- The implementation is incomplete and the multipart path was intended to be used.
The two imports added at the top of app.py specifically for this dead class (from python_multipart.multipart import MultipartParser, parse_options_header and from starlette.formparsers import MultiPartException, _user_safe_decode) would also become unnecessary after removal.
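The raw-body path the reviewer describes is simple enough to sketch without Starlette. Here `stream` stands in for `request.stream()` and `push` for `session.iterator.push`; both names come from the review, but the function itself is an illustrative simplification:

```python
import asyncio


async def consume_raw_stream(stream, push):
    """Consume a raw request body chunk-by-chunk, forwarding each
    piece to `push` along with its running offset.

    No multipart framing is parsed: the body *is* the file data.
    """
    offset = 0
    async for chunk_bytes in stream:
        await push(offset, chunk_bytes)
        offset += len(chunk_bytes)
    return offset  # total bytes consumed


async def demo():
    async def body():
        # Simulated request body arriving in two pieces.
        yield b"hello "
        yield b"world"

    received = []

    async def push(offset, data):
        received.append((offset, data))

    total = await consume_raw_stream(body(), push)
    return total, received
```

This illustrates the reviewer's point: when the client sends raw binary bodies, the streaming loop needs no boundary parsing, header callbacks, or `Content-Disposition` handling at all.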
```python
)
handler_upload_param = resolve_upload_chunk_handler_param(event_handler)

chunk_iter = UploadChunkIterator(maxsize=8)
```
Hardcoded magic number: `maxsize=8`
The literal 8 appears both as the default in UploadChunkIterator.__init__ (maxsize: int = 8) and again explicitly at this call site. Per the project rule about extracting magic numbers into named constants, consider extracting this into a module-level constant — e.g. _CHUNK_UPLOAD_ITERATOR_BUFFER_SIZE = 8 — and using it in both places. This makes it easy to tune the backpressure buffer without hunting for duplicate literals.
```diff
-chunk_iter = UploadChunkIterator(maxsize=8)
+chunk_iter = UploadChunkIterator(maxsize=_CHUNK_UPLOAD_ITERATOR_BUFFER_SIZE)
```
Rule Used: String literals that are used as identifiers or ke... (source)
Learnt From
reflex-dev/flexgen#2170
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```javascript
  console.log("Upload already in progress for ", upload_id);
  return false;
}
const UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024;
```
Missing human-readable size comment
Per project conventions, numeric literals representing sizes (or durations) should carry a comment explaining the human-readable value. Add a comment here:
```diff
-const UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024;
+const UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MB
```
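The 8 MB constant determines how many requests a given file becomes. The slicing arithmetic is sketched here in Python as a stand-in for the JS client's `File.slice` logic; `chunk_ranges` is an illustrative name, not part of the PR:

```python
UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024  # 8 MB


def chunk_ranges(file_size: int):
    """Yield (offset, length) pairs covering a file of file_size bytes.

    The final chunk is shorter whenever the size is not a multiple
    of UPLOAD_CHUNK_SIZE.
    """
    for offset in range(0, file_size, UPLOAD_CHUNK_SIZE):
        yield offset, min(UPLOAD_CHUNK_SIZE, file_size - offset)
```

For example, a 20 MB file yields three chunks: two full 8 MB chunks and a trailing 4 MB chunk.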
Rule Used: When using time-based calculations in code, includ... (source)
Learnt From
reflex-dev/flexgen#2190
```python
upload_param_name = "files"
return self._as_event_spec(
    handler,
    client_handler_name="uploadFiles",
```
Hardcoded handler name string literals used as identifiers
The string "uploadFiles" (here and in UploadFilesChunk.as_event_spec → "uploadFilesChunk") is repeated across three files: reflex/event.py, reflex/components/core/upload.py (lines 319–320), and reflex/.templates/web/utils/state.js (lines 421, 453). Per the project rule about extracting string literals used as identifiers into named constants, consider defining module-level constants such as:
```python
_CLIENT_HANDLER_UPLOAD_FILES = "uploadFiles"
_CLIENT_HANDLER_UPLOAD_FILES_CHUNK = "uploadFilesChunk"
```

This makes future renames safe and avoids silent mismatches.
```javascript
console.log("[reflex upload] applyRestEvent", {
  event_name: event.name,
  handler: event.handler,
  upload_id: event.payload.upload_id,
  file_payload_key: filePayloadKey,
  file_count: uploadFilesPayload?.length ?? 0,
  payload_keys: Object.keys(event.payload || {}),
});
```
Direct `console.log` bypasses the `logUpload` helper
upload.js defines a shared logUpload helper to provide consistent prefixed upload logging. The new applyRestEvent block calls console.log and console.warn directly instead of using that helper, which fires on every upload event in production and is inconsistent with the logging approach in upload.js.
Consider importing and using logUpload here for consistency. This would require exporting logUpload from upload.js and importing it in state.js.
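The pattern the reviewer suggests — one prefixed logging helper that can be silenced in production — is sketched here in Python for illustration (the real helper lives in upload.js; the `REFLEX_UPLOAD_DEBUG` flag and function name are hypothetical):

```python
import os

UPLOAD_LOG_PREFIX = "[reflex upload]"


def log_upload(*args, enabled=None):
    """Emit a consistently prefixed upload log line, gated by a flag.

    Returns True if a line was emitted, False if logging is disabled,
    so callers (and tests) can observe the gating.
    """
    if enabled is None:
        # Hypothetical opt-in flag; debug logging is off by default.
        enabled = os.environ.get("REFLEX_UPLOAD_DEBUG") == "1"
    if not enabled:
        return False
    print(UPLOAD_LOG_PREFIX, *args)
    return True
```

Routing every upload log through one helper keeps the prefix consistent and gives a single place to turn verbose logging off in production, which is the inconsistency the reviewer flags.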
All Submissions:
Type of change
Please delete options that are not relevant.
New Feature Submission:
Changes To Core Features:
After these steps, you're ready to open a pull request.