Skip to content

Commit 2320246

Browse files
committed
feat: add chunked file upload support Streaming Upload API (rx.upload_files_chunk)
Implement chunked/streaming file uploads to handle large files without loading them entirely into memory. Moves upload handling logic from app.py to event.py, adds chunked upload JS helpers, and updates the upload component to support the new upload_files_chunk API. Includes unit and integration tests for chunked upload, cancel, and streaming.
1 parent d45a1bb commit 2320246

13 files changed

Lines changed: 1631 additions & 307 deletions

File tree

pyi_hashes.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"reflex/__init__.pyi": "0a3ae880e256b9fd3b960e12a2cb51a7",
2+
"reflex/__init__.pyi": "70485139882c5c114c121445a24c7b28",
33
"reflex/components/__init__.pyi": "ac05995852baa81062ba3d18fbc489fb",
44
"reflex/components/base/__init__.pyi": "16e47bf19e0d62835a605baa3d039c5a",
55
"reflex/components/base/app_wrap.pyi": "22e94feaa9fe675bcae51c412f5b67f1",
@@ -19,7 +19,7 @@
1919
"reflex/components/core/helmet.pyi": "43f8497c8fafe51e29dca1dd535d143a",
2020
"reflex/components/core/html.pyi": "86eb9d4c1bb4807547b2950d9a32e9fd",
2121
"reflex/components/core/sticky.pyi": "cb763b986a9b0654d1a3f33440dfcf60",
22-
"reflex/components/core/upload.pyi": "6dc28804a6dddf903e31162e87c1b023",
22+
"reflex/components/core/upload.pyi": "58c023b9149635894331528bf29eaf13",
2323
"reflex/components/core/window_events.pyi": "af33ccec866b9540ee7fbec6dbfbd151",
2424
"reflex/components/datadisplay/__init__.pyi": "52755871369acbfd3a96b46b9a11d32e",
2525
"reflex/components/datadisplay/code.pyi": "b86769987ef4d1cbdddb461be88539fd",
Lines changed: 126 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,11 @@
11
import JSON5 from "json5";
22
import env from "$/env.json";
33

4-
/**
5-
* Upload files to the server.
6-
*
7-
* @param state The state to apply the delta to.
8-
* @param handler The handler to use.
9-
* @param upload_id The upload id to use.
10-
* @param on_upload_progress The function to call on upload progress.
11-
* @param socket the websocket connection
12-
* @param extra_headers Extra headers to send with the request.
13-
* @param refs The refs object to store the abort controller in.
14-
* @param getBackendURL Function to get the backend URL.
15-
* @param getToken Function to get the Reflex token.
16-
*
17-
* @returns The response from posting to the UPLOADURL endpoint.
18-
*/
19-
export const uploadFiles = async (
20-
handler,
21-
files,
22-
upload_id,
23-
on_upload_progress,
24-
extra_headers,
25-
socket,
26-
refs,
27-
getBackendURL,
28-
getToken,
29-
) => {
30-
// return if there's no file to upload
31-
if (files === undefined || files.length === 0) {
32-
return false;
33-
}
34-
35-
const upload_ref_name = `__upload_controllers_${upload_id}`;
36-
37-
if (refs[upload_ref_name]) {
38-
console.log("Upload already in progress for ", upload_id);
39-
return false;
40-
}
41-
4+
const trackUploadResponse = (socket) => {
425
// Track how many partial updates have been processed for this upload.
436
let resp_idx = 0;
44-
const eventHandler = (progressEvent) => {
7+
8+
return (progressEvent) => {
459
const event_callbacks = socket._callbacks.$event;
4610
// Whenever called, responseText will contain the entire response so far.
4711
const chunks = progressEvent.event.target.responseText.trim().split("\n");
@@ -73,22 +37,32 @@ export const uploadFiles = async (
7337
}
7438
});
7539
};
40+
};
7641

77-
const controller = new AbortController();
78-
const formdata = new FormData();
42+
const sendUploadRequest = async ({
43+
handler,
44+
upload_id,
45+
on_upload_progress,
46+
extra_headers,
47+
refs,
48+
getToken,
49+
formdata,
50+
url,
51+
responseHandler,
52+
}) => {
53+
const upload_ref_name = `__upload_controllers_${upload_id}`;
7954

80-
// Add the token and handler to the file name.
81-
files.forEach((file) => {
82-
formdata.append("files", file, file.path || file.name);
83-
});
55+
if (refs[upload_ref_name]) {
56+
return false;
57+
}
58+
59+
const controller = new AbortController();
8460

85-
// Send the file to the server.
8661
refs[upload_ref_name] = controller;
8762

8863
return new Promise((resolve, reject) => {
8964
const xhr = new XMLHttpRequest();
9065

91-
// Set up event handlers
9266
xhr.onload = function () {
9367
if (xhr.status >= 200 && xhr.status < 300) {
9468
resolve({
@@ -112,42 +86,36 @@ export const uploadFiles = async (
11286
reject(new Error("Upload aborted"));
11387
};
11488

115-
// Handle upload progress
11689
if (on_upload_progress) {
11790
xhr.upload.onprogress = function (event) {
11891
if (event.lengthComputable) {
119-
const progressEvent = {
92+
on_upload_progress({
12093
loaded: event.loaded,
12194
total: event.total,
12295
progress: event.loaded / event.total,
123-
};
124-
on_upload_progress(progressEvent);
96+
});
12597
}
12698
};
12799
}
128100

129-
// Handle download progress with streaming response parsing
130-
xhr.onprogress = function (event) {
131-
if (eventHandler) {
132-
const progressEvent = {
101+
if (responseHandler) {
102+
xhr.onprogress = function (event) {
103+
responseHandler({
133104
event: {
134105
target: {
135106
responseText: xhr.responseText,
136107
},
137108
},
138109
progress: event.lengthComputable ? event.loaded / event.total : 0,
139-
};
140-
eventHandler(progressEvent);
141-
}
142-
};
110+
});
111+
};
112+
}
143113

144-
// Handle abort controller
145114
controller.signal.addEventListener("abort", () => {
146115
xhr.abort();
147116
});
148117

149-
// Configure and send request
150-
xhr.open("POST", getBackendURL(env.UPLOAD));
118+
xhr.open("POST", url);
151119
xhr.setRequestHeader("Reflex-Client-Token", getToken());
152120
xhr.setRequestHeader("Reflex-Event-Handler", handler);
153121
for (const [key, value] of Object.entries(extra_headers || {})) {
@@ -168,3 +136,99 @@ export const uploadFiles = async (
168136
delete refs[upload_ref_name];
169137
});
170138
};
139+
140+
/**
141+
* Upload files to the server.
142+
*
143+
* @param handler The handler to use.
144+
* @param upload_id The upload id to use.
145+
* @param on_upload_progress The function to call on upload progress.
146+
* @param extra_headers Extra headers to send with the request.
147+
* @param socket The websocket connection.
148+
* @param refs The refs object to store the abort controller in.
149+
* @param getBackendURL Function to get the backend URL.
150+
* @param getToken Function to get the Reflex token.
151+
*
152+
* @returns The response from posting to the upload endpoint.
153+
*/
154+
export const uploadFiles = async (
155+
handler,
156+
files,
157+
upload_id,
158+
on_upload_progress,
159+
extra_headers,
160+
socket,
161+
refs,
162+
getBackendURL,
163+
getToken,
164+
) => {
165+
if (files === undefined || files.length === 0) {
166+
return false;
167+
}
168+
169+
const formdata = new FormData();
170+
171+
files.forEach((file) => {
172+
formdata.append("files", file, file.path || file.name);
173+
});
174+
175+
return sendUploadRequest({
176+
handler,
177+
upload_id,
178+
on_upload_progress,
179+
extra_headers,
180+
refs,
181+
getToken,
182+
formdata,
183+
url: getBackendURL(env.UPLOAD),
184+
responseHandler: trackUploadResponse(socket),
185+
});
186+
};
187+
188+
/**
189+
* Upload files to the streaming chunk endpoint.
190+
*
191+
* @param handler The handler to use.
192+
* @param files The files to upload.
193+
* @param upload_id The upload id to use.
194+
* @param on_upload_progress The function to call on upload progress.
195+
* @param extra_headers Extra headers to send with the request.
196+
* @param _socket The websocket connection.
197+
* @param refs The refs object to store the abort controller in.
198+
* @param getBackendURL Function to get the backend URL.
199+
* @param getToken Function to get the Reflex token.
200+
*
201+
* @returns The response from posting to the chunk upload endpoint.
202+
*/
203+
export const uploadFilesChunk = async (
204+
handler,
205+
files,
206+
upload_id,
207+
on_upload_progress,
208+
extra_headers,
209+
_socket,
210+
refs,
211+
getBackendURL,
212+
getToken,
213+
) => {
214+
if (files === undefined || files.length === 0) {
215+
return false;
216+
}
217+
218+
const formdata = new FormData();
219+
220+
files.forEach((file) => {
221+
formdata.append("files", file, file.path || file.name);
222+
});
223+
224+
return sendUploadRequest({
225+
handler,
226+
upload_id,
227+
on_upload_progress,
228+
extra_headers,
229+
refs,
230+
getToken,
231+
formdata,
232+
url: getBackendURL(env.UPLOAD_CHUNK),
233+
});
234+
};

reflex/.templates/web/utils/state.js

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
} from "$/utils/context";
2121
import debounce from "$/utils/helpers/debounce";
2222
import throttle from "$/utils/helpers/throttle";
23-
import { uploadFiles } from "$/utils/helpers/upload";
23+
import { uploadFiles, uploadFilesChunk } from "$/utils/helpers/upload";
2424

2525
// Endpoint URLs.
2626
const EVENTURL = env.EVENT;
@@ -418,21 +418,27 @@ export const applyEvent = async (event, socket, navigate, params) => {
418418
*/
419419
export const applyRestEvent = async (event, socket, navigate, params) => {
420420
let eventSent = false;
421-
if (event.handler === "uploadFiles") {
422-
if (event.payload.files === undefined || event.payload.files.length === 0) {
421+
if (event.handler === "uploadFiles" || event.handler === "uploadFilesChunk") {
422+
const filePayloadKey = event.payload.upload_param_name || "files";
423+
const uploadFilesPayload =
424+
event.payload.files ?? event.payload[filePayloadKey];
425+
426+
if (uploadFilesPayload === undefined || uploadFilesPayload.length === 0) {
423427
// Submit the event over the websocket to trigger the event handler.
424428
return await applyEvent(
425-
ReflexEvent(event.name, { files: [] }),
429+
ReflexEvent(event.name, { [filePayloadKey]: [] }),
426430
socket,
427431
navigate,
428432
params,
429433
);
430434
}
431435

432436
// Start upload, but do not wait for it, which would block other events.
433-
uploadFiles(
437+
const uploadFn =
438+
event.handler === "uploadFilesChunk" ? uploadFilesChunk : uploadFiles;
439+
uploadFn(
434440
event.name,
435-
event.payload.files,
441+
uploadFilesPayload,
436442
event.payload.upload_id,
437443
event.payload.on_upload_progress,
438444
event.payload.extra_headers,

reflex/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,8 @@
301301
"event",
302302
"EventChain",
303303
"EventHandler",
304+
"UploadChunk",
305+
"UploadChunkIterator",
304306
"call_script",
305307
"call_function",
306308
"run_script",
@@ -320,6 +322,7 @@
320322
"set_value",
321323
"stop_propagation",
322324
"upload_files",
325+
"upload_files_chunk",
323326
"window_alert",
324327
],
325328
"istate.storage": [
@@ -348,6 +351,7 @@
348351
_SUBMODULES: set[str] = {
349352
"components",
350353
"app",
354+
"uploads",
351355
"style",
352356
"admin",
353357
"base",

0 commit comments

Comments
 (0)