
feat: add chunked file upload support to stream large files without buffering in memory#6189

Closed
FarhanAliRaza wants to merge 1 commit into reflex-dev:main from FarhanAliRaza:chunked-upload

Conversation

@FarhanAliRaza
Collaborator

Introduce rx.upload_files_chunk and the /_upload_chunk endpoint to allow uploading files in 8 MB chunks streamed directly to a background event handler via rx.UploadChunkIterator. This avoids buffering entire files in server memory and enables incremental processing as data arrives.

  • Add UploadChunk, UploadChunkIterator, and UploadFilesChunk to the public API
  • Add uploadFilesChunk JS client function that splits files into chunks with session tracking, progress reporting, and cancellation
  • Add upload_chunk() backend endpoint with multi-request session management and streaming multipart parsing
  • Refactor shared upload helpers (_require_upload_headers, _get_upload_runtime_handler, _seed_upload_router_data)
  • Add sync_web_runtime_templates() to keep .web runtime files in sync during setup_frontend
  • Add unit and integration tests for chunked uploads, cancellation, validation, and custom parameter names
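The producer/consumer flow described above — the endpoint pushing chunks into an iterator that a background handler drains — can be sketched with a minimal stand-in. This is not Reflex's actual `UploadChunkIterator`, just an illustration of the bounded-queue pattern under the assumption that `push`/`finish` feed an `async for` consumer:

```python
import asyncio


class ChunkIterator:
    """Minimal stand-in for the PR's UploadChunkIterator: a bounded
    async queue the endpoint pushes into and the handler iterates over."""

    _DONE = object()  # sentinel marking end of stream

    def __init__(self, maxsize: int = 8):
        # Bounded queue gives backpressure: push() blocks when the
        # handler falls behind, so chunks are never buffered unboundedly.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def push(self, chunk: bytes) -> None:
        await self._queue.put(chunk)

    async def finish(self) -> None:
        await self._queue.put(self._DONE)

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        item = await self._queue.get()
        if item is self._DONE:
            raise StopAsyncIteration
        return item


async def demo() -> int:
    it = ChunkIterator()

    async def producer():
        # Stands in for the endpoint pushing request-body chunks.
        for part in (b"abc", b"defg"):
            await it.push(part)
        await it.finish()

    asyncio.get_running_loop().create_task(producer())
    total = 0
    async for chunk in it:  # handler-side consumption
        total += len(chunk)
    return total


print(asyncio.run(demo()))  # prints 7
```

The real implementation additionally needs a `fail()` path so a cancelled upload raises inside the consuming handler, as the sequence diagram later in this thread shows.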

All Submissions:

  • Have you followed the guidelines stated in CONTRIBUTING.md file?
  • Have you checked to ensure there aren't any other open Pull Requests for the desired changes?

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

New Feature Submission:

  • Does your submission pass the tests?
  • Have you linted your code locally prior to submission?

Changes To Core Features:

  • Have you added an explanation of what your changes do and why you'd like us to include them?
  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?

After these steps, you're ready to open a pull request.

a. Give a descriptive title to your PR.

b. Describe your changes.

c. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if such).

…uffering in memory

Introduce `rx.upload_files_chunk` and the `/_upload_chunk` endpoint to
allow uploading files in 8 MB chunks streamed directly to a background
event handler via `rx.UploadChunkIterator`. This avoids buffering entire
files in server memory and enables incremental processing as data
arrives.

- Add `UploadChunk`, `UploadChunkIterator`, and `UploadFilesChunk` to
  the public API
- Add `uploadFilesChunk` JS client function that splits files into
  chunks with session tracking, progress reporting, and cancellation
- Add `upload_chunk()` backend endpoint with multi-request session
  management and streaming multipart parsing
- Refactor shared upload helpers (`_require_upload_headers`,
  `_get_upload_runtime_handler`, `_seed_upload_router_data`)
- Add `sync_web_runtime_templates()` to keep `.web` runtime files in
  sync during `setup_frontend`
- Add unit and integration tests for chunked uploads, cancellation,
  validation, and custom parameter names
@FarhanAliRaza FarhanAliRaza marked this pull request as draft March 18, 2026 17:00
}

const controller = createChunkUploadController();
const sessionId = createChunkUploadSessionId();

Check failure

Code scanning / CodeQL

Insecure randomness (High)

This uses a cryptographically insecure random number generated at Math.random() in a security context.
@FarhanAliRaza
Collaborator Author

This is an experimental implementation in which the browser sends files in chunks across multiple requests and the server writes those files as the data arrives.

@codspeed-hq

codspeed-hq bot commented Mar 18, 2026

Merging this PR will not alter performance

✅ 8 untouched benchmarks


Comparing FarhanAliRaza:chunked-upload (78b656c) with main (d45a1bb)


@greptile-apps
Contributor

greptile-apps bot commented Mar 18, 2026

Greptile Summary

This PR introduces chunked file upload support via a new /_upload_chunk endpoint and the rx.upload_files_chunk / rx.UploadChunkIterator public API, enabling large files to be streamed directly to a @rx.event(background=True) handler without buffering the entire file in server memory. The core flow is: the JS client (uploadFilesChunk) splits each file into 8 MB slices and sends them as sequential POST requests; the server aggregates them per logical session (_ChunkUploadSession) and feeds them to an UploadChunkIterator that the background handler iterates over. Supporting infrastructure includes a sync_web_runtime_templates() call during setup_frontend to keep .web runtime JS helpers in sync, and refactored shared upload helpers (_require_upload_headers, _get_upload_runtime_handler, _seed_upload_router_data).
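The client-side slicing the summary describes — each 8 MB slice tagged with its byte offset — reduces to simple arithmetic, sketched here as a generic helper (not the PR's code; the constant mirrors the PR's UPLOAD_CHUNK_SIZE):

```python
CHUNK_SIZE = 8 * 1024 * 1024  # 8 MB, matching UPLOAD_CHUNK_SIZE in the PR


def iter_chunks(data: bytes, chunk_size: int = CHUNK_SIZE):
    """Yield (offset, slice) pairs, analogous to how the JS client
    slices a File into sequential POST bodies."""
    for offset in range(0, len(data), chunk_size):
        yield offset, data[offset : offset + chunk_size]


# With a 20-byte payload and an 8-byte chunk size, three slices result.
offsets = [off for off, _ in iter_chunks(b"x" * 20, chunk_size=8)]
print(offsets)  # prints [0, 8, 16]
```

Sending the offset with each slice lets the server verify ordering and attribute every chunk to a position in the file without holding earlier chunks in memory.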

Key findings:

  • Dead code: _UploadChunkPart (lines 294–305) and _UploadChunkMultipartParser (lines 308–501) in reflex/app.py are defined but never instantiated or referenced. The actual handler reads raw bytes directly from request.stream(). The related imports (MultipartParser, parse_options_header, MultiPartException, _user_safe_decode) are also unused as a result. These ~200 lines should either be removed or the implementation should be completed to use them.
  • Inconsistent logging: state.js calls console.log/console.warn directly in applyRestEvent instead of using the shared logUpload helper introduced in upload.js.
  • Magic numbers: UploadChunkIterator(maxsize=8) in app.py hard-codes a value that already appears as the class default; a named constant would make the intended buffer size explicit.
  • String literal identifiers: "uploadFiles" and "uploadFilesChunk" are repeated across event.py, upload.py, and state.js without a shared constant.

Confidence Score: 3/5

  • Functional feature with working tests, but contains ~200 lines of dead code in the critical app.py file that needs resolution before merging.
  • The chunked upload feature is well-designed and the tests pass, but the presence of _UploadChunkPart and _UploadChunkMultipartParser — two substantial classes that are completely unreferenced in the actual request handler — is a significant red flag. It either indicates an incomplete implementation (multipart parsing was planned but not wired up) or leftover code from a refactor. Until the intent is clarified and the dead code resolved, the PR introduces unnecessary maintenance burden and potential confusion about how the endpoint actually processes data.
  • reflex/app.py requires the most attention due to the unused _UploadChunkPart and _UploadChunkMultipartParser classes spanning lines 294–501.

Important Files Changed

Filename Overview
reflex/app.py Adds upload_chunk endpoint, session management, and all chunked-upload server logic, but includes ~200 lines of dead code (_UploadChunkPart + _UploadChunkMultipartParser) that are never called; also contains a hardcoded maxsize=8 magic number at the session creation site.
reflex/.templates/web/utils/helpers/upload.js Good refactor that extracts shared helpers and adds uploadFilesChunk; minor: UPLOAD_CHUNK_SIZE constant lacks an inline // 8 MB comment per project convention.
reflex/.templates/web/utils/state.js Correctly wires uploadFilesChunk into applyRestEvent, but uses direct console.log/console.warn rather than the shared logUpload helper from upload.js.
reflex/event.py Adds UploadChunk, UploadChunkIterator, UploadFilesChunk, and two resolver functions; the string literals "uploadFiles" / "uploadFilesChunk" used as client_handler_name are repeated across multiple files and should be extracted into constants.
reflex/components/core/upload.py Extends on_drop / on_drop_rejected to accept chunk handlers; the hardcoded set {"uploadFiles", "uploadFilesChunk"} at lines 318–320 duplicates string literals that should be shared constants.
reflex/constants/event.py Cleanly adds UPLOAD_CHUNK = "_upload_chunk" to the Endpoint enum; automatically included in env.json via the existing set_env_json loop.
reflex/utils/frontend_skeleton.py Adds sync_web_runtime_templates() to refresh static runtime files in .web during setup_frontend; well-structured with clear path constants and test coverage.

Sequence Diagram

sequenceDiagram
    participant Browser as Browser (uploadFilesChunk)
    participant Backend as Backend (/_upload_chunk)
    participant Session as _ChunkUploadSession
    participant Iterator as UploadChunkIterator
    participant Handler as Background EventHandler

    Browser->>Backend: POST /_upload_chunk?session_id=S&filename=F&offset=0
    Note over Backend: _extract_chunk_upload_request_metadata()
    Backend->>Session: _get_or_create_chunk_upload_session()
    Session->>Iterator: UploadChunkIterator(maxsize=8)
    Session->>Handler: _process_background(state, event)
    Handler-->>Iterator: (consumes via async for)
    loop For each request body chunk
        Backend->>Iterator: iterator.push(UploadChunk)
        Iterator-->>Handler: yields UploadChunk
    end
    Backend->>Backend: record_request()

    Browser->>Backend: POST /_upload_chunk?session_id=S&filename=F&offset=N&complete=1
    Backend->>Iterator: iterator.finish()
    Iterator-->>Handler: StopAsyncIteration
    Handler-->>Handler: writes files, updates state

    alt Client Cancel
        Browser->>Backend: POST /_upload_chunk?session_id=S&cancel=1
        Backend->>Iterator: iterator.fail(RuntimeError)
        Iterator-->>Handler: raises RuntimeError
    end

Last reviewed commit: "feat: add chunked fi..."

Comment on lines +294 to +501
@dataclasses.dataclass
class _UploadChunkPart:
    """Track the current multipart file part for chunk uploads."""

    content_disposition: bytes | None = None
    field_name: str = ""
    filename: str | None = None
    content_type: str = ""
    item_headers: list[tuple[bytes, bytes]] = dataclasses.field(default_factory=list)
    offset: int = 0
    bytes_emitted: int = 0
    is_upload_chunk: bool = False


class _UploadChunkMultipartParser:
    """Streaming multipart parser for chunked uploads."""

    def __init__(
        self,
        headers: Headers,
        stream: AsyncGenerator[bytes, None],
        chunk_iter: UploadChunkIterator,
    ) -> None:
        self.headers = headers
        self.stream = stream
        self.chunk_iter = chunk_iter
        self._charset = ""
        self._current_partial_header_name = b""
        self._current_partial_header_value = b""
        self._current_part = _UploadChunkPart()
        self._chunks_to_emit: list[UploadChunk] = []
        self._seen_upload_chunk = False
        self._part_count = 0
        self._emitted_chunk_count = 0
        self._emitted_bytes = 0
        self._stream_chunk_count = 0

    def on_part_begin(self) -> None:
        """Reset parser state for a new multipart part."""
        self._current_part = _UploadChunkPart()

    def on_part_data(self, data: bytes, start: int, end: int) -> None:
        """Record streamed chunk data for the current part."""
        if (
            not self._current_part.is_upload_chunk
            or self._current_part.filename is None
        ):
            return

        message_bytes = data[start:end]
        self._chunks_to_emit.append(
            UploadChunk(
                filename=self._current_part.filename,
                offset=self._current_part.offset + self._current_part.bytes_emitted,
                content_type=self._current_part.content_type,
                data=message_bytes,
            )
        )
        self._current_part.bytes_emitted += len(message_bytes)
        self._emitted_chunk_count += 1
        self._emitted_bytes += len(message_bytes)

    def on_part_end(self) -> None:
        """Emit a zero-byte chunk for empty file parts."""
        if (
            self._current_part.is_upload_chunk
            and self._current_part.filename is not None
            and self._current_part.bytes_emitted == 0
        ):
            self._chunks_to_emit.append(
                UploadChunk(
                    filename=self._current_part.filename,
                    offset=self._current_part.offset,
                    content_type=self._current_part.content_type,
                    data=b"",
                )
            )
            self._emitted_chunk_count += 1

    def on_header_field(self, data: bytes, start: int, end: int) -> None:
        """Accumulate multipart header field bytes."""
        self._current_partial_header_name += data[start:end]

    def on_header_value(self, data: bytes, start: int, end: int) -> None:
        """Accumulate multipart header value bytes."""
        self._current_partial_header_value += data[start:end]

    def on_header_end(self) -> None:
        """Store the completed multipart header."""
        field = self._current_partial_header_name.lower()
        if field == b"content-disposition":
            self._current_part.content_disposition = self._current_partial_header_value
        self._current_part.item_headers.append((
            field,
            self._current_partial_header_value,
        ))
        self._current_partial_header_name = b""
        self._current_partial_header_value = b""

    def on_headers_finished(self) -> None:
        """Parse upload chunk metadata from multipart headers."""
        disposition, options = parse_options_header(
            self._current_part.content_disposition
        )
        if disposition != b"form-data":
            msg = "Invalid upload chunk disposition."
            raise MultiPartException(msg)

        try:
            field_name = _user_safe_decode(options[b"name"], self._charset)
        except KeyError as err:
            msg = 'The Content-Disposition header field "name" must be provided.'
            raise MultiPartException(msg) from err

        parts = field_name.split(":")
        if len(parts) != 3 or parts[0] != "chunk":
            msg = f"Invalid upload chunk field name: {field_name}"
            raise MultiPartException(msg)

        try:
            int(parts[1])
            offset = int(parts[2])
        except ValueError as err:
            msg = f"Invalid upload chunk field name: {field_name}"
            raise MultiPartException(msg) from err

        if offset < 0:
            msg = f"Invalid upload chunk field name: {field_name}"
            raise MultiPartException(msg)

        try:
            filename = _user_safe_decode(options[b"filename"], self._charset)
        except KeyError as err:
            msg = f"Missing filename for upload chunk field: {field_name}"
            raise MultiPartException(msg) from err

        content_type = ""
        for header_name, header_value in self._current_part.item_headers:
            if header_name == b"content-type":
                content_type = _user_safe_decode(header_value, self._charset)
                break

        self._current_part.field_name = field_name
        self._current_part.filename = filename
        self._current_part.content_type = content_type
        self._current_part.offset = offset
        self._current_part.bytes_emitted = 0
        self._current_part.is_upload_chunk = True
        self._seen_upload_chunk = True
        self._part_count += 1

    def on_end(self) -> None:
        """Finalize parser callbacks."""

    def stats(self) -> dict[str, int | bool]:
        """Return parser statistics for logging."""
        return {
            "parts": self._part_count,
            "emitted_chunks": self._emitted_chunk_count,
            "emitted_bytes": self._emitted_bytes,
            "request_chunks": self._stream_chunk_count,
            "saw_upload_chunk": self._seen_upload_chunk,
        }

    async def parse(self) -> None:
        """Parse the incoming request stream and push chunks to the iterator.

        Raises:
            MultiPartException: If the request is not valid multipart upload data.
            RuntimeError: If the upload handler exits before consuming all chunks.
        """
        _, params = parse_options_header(self.headers["Content-Type"])
        charset = params.get(b"charset", "utf-8")
        if isinstance(charset, bytes):
            charset = charset.decode("latin-1")
        self._charset = charset

        try:
            boundary = params[b"boundary"]
        except KeyError as err:
            msg = "Missing boundary in multipart."
            raise MultiPartException(msg) from err

        callbacks = {
            "on_part_begin": self.on_part_begin,
            "on_part_data": self.on_part_data,
            "on_part_end": self.on_part_end,
            "on_header_field": self.on_header_field,
            "on_header_value": self.on_header_value,
            "on_header_end": self.on_header_end,
            "on_headers_finished": self.on_headers_finished,
            "on_end": self.on_end,
        }
        parser = MultipartParser(boundary, cast(Any, callbacks))

        async for chunk in self.stream:
            self._stream_chunk_count += 1
            parser.write(chunk)
            while self._chunks_to_emit:
                await self.chunk_iter.push(self._chunks_to_emit.pop(0))

        parser.finalize()
        while self._chunks_to_emit:
            await self.chunk_iter.push(self._chunks_to_emit.pop(0))

        if not self._seen_upload_chunk:
            msg = "No file chunks were uploaded."
            raise MultiPartException(msg)

P1 Unused dead code: _UploadChunkPart and _UploadChunkMultipartParser

Both _UploadChunkPart (lines 294–305) and _UploadChunkMultipartParser (lines 308–501) are defined here but are never instantiated or referenced anywhere in the codebase. The actual upload_file_chunk handler inside upload_chunk(app) streams raw bytes directly from request.stream() without ever using these classes:

async for chunk_bytes in request.stream():
    await session.iterator.push(
        UploadChunk(filename=metadata.filename or "", ...)
    )

On the JS side, chunks are sent as raw binary request bodies (not wrapped in multipart/form-data), so a multipart parser is not needed.

This represents roughly 200 lines of unreachable code. Either:

  1. These classes are left over from an earlier design and should be removed, or
  2. The implementation is incomplete and the multipart path was intended to be used.

The two imports added at the top of app.py specifically for this dead class (from python_multipart.multipart import MultipartParser, parse_options_header and from starlette.formparsers import MultiPartException, _user_safe_decode) would also become unnecessary after removal.

)
handler_upload_param = resolve_upload_chunk_handler_param(event_handler)

chunk_iter = UploadChunkIterator(maxsize=8)

P2 Hardcoded magic number maxsize=8

The literal 8 appears both as the default in UploadChunkIterator.__init__ (maxsize: int = 8) and again explicitly at this call site. Per the project rule about extracting magic numbers into named constants, consider extracting this into a module-level constant — e.g. _CHUNK_UPLOAD_ITERATOR_BUFFER_SIZE = 8 — and using it in both places. This makes it easy to tune the backpressure buffer without hunting for duplicate literals.

Suggested change
chunk_iter = UploadChunkIterator(maxsize=8)
chunk_iter = UploadChunkIterator(maxsize=_CHUNK_UPLOAD_ITERATOR_BUFFER_SIZE)

Rule Used: String literals that are used as identifiers or ke... (source)

Learnt From
reflex-dev/flexgen#2170

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

console.log("Upload already in progress for ", upload_id);
return false;
}
const UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024;

P2 Missing human-readable size comment

Per project conventions, numeric literals representing sizes (or durations) should carry a comment explaining the human-readable value. Add a comment here:

Suggested change
const UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024;
const UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MB

Rule Used: When using time-based calculations in code, includ... (source)

Learnt From
reflex-dev/flexgen#2190


upload_param_name = "files"
return self._as_event_spec(
handler,
client_handler_name="uploadFiles",

P2 Hardcoded handler name string literals used as identifiers

The strings "uploadFiles" (here) and "uploadFilesChunk" (in UploadFilesChunk.as_event_spec) are repeated across three files: reflex/event.py, reflex/components/core/upload.py (lines 319–320), and reflex/.templates/web/utils/state.js (lines 421, 453). Per the project rule about extracting string literals used as identifiers into named constants, consider defining module-level constants such as:

_CLIENT_HANDLER_UPLOAD_FILES = "uploadFiles"
_CLIENT_HANDLER_UPLOAD_FILES_CHUNK = "uploadFilesChunk"

This makes future renames safe and avoids silent mismatches.

Rule Used: String literals that are used as identifiers or ke... (source)

Learnt From
reflex-dev/flexgen#2170


Comment on lines +426 to +433
console.log("[reflex upload] applyRestEvent", {
event_name: event.name,
handler: event.handler,
upload_id: event.payload.upload_id,
file_payload_key: filePayloadKey,
file_count: uploadFilesPayload?.length ?? 0,
payload_keys: Object.keys(event.payload || {}),
});

P2 Direct console.log bypasses the logUpload helper

upload.js defines a shared logUpload helper to provide consistent prefixed upload logging. The new applyRestEvent block calls console.log and console.warn directly instead of using that helper, which fires on every upload event in production and is inconsistent with the logging approach in upload.js.

Consider importing and using logUpload here for consistency. This would require exporting logUpload from upload.js and importing it in state.js.
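A minimal sketch of what such a shared helper might look like — the real logUpload signature is not shown in this thread, so the prefix, parameters, and export shape here are assumptions:

```javascript
// Hypothetical sketch of a shared, prefixed upload logger like upload.js's
// logUpload; the actual helper's signature is not shown in this thread.
const UPLOAD_LOG_PREFIX = "[reflex upload]";

function logUpload(level, message, details = {}) {
  // level is "log" or "warn"; funneling all upload logging through one
  // function keeps the prefix consistent and gives a single place to
  // silence logs in production builds.
  console[level](`${UPLOAD_LOG_PREFIX} ${message}`, JSON.stringify(details));
}

logUpload("log", "applyRestEvent", { upload_id: "abc", file_count: 2 });
```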


