Skip to content

Commit ea90a03

Browse files
ENG-9028: Move event queue to backend (#6267)
* Add StateToken[TOKEN_TYPE] for flexible state manager Special case access patterns for BaseState retrieval to allow for other types of state that have different semantics. * Use StateToken with redis and memory managers Update all unit test cases to use the new StateToken / BaseStateToken API * disambiguate class name * handle legacy tokens passed to app.modify_state * Add EventContext module * Move BaseState event processing to reflex.ievent.processor package Create the EventProcessor class to manage the backend event queue. Move event processing logic out of the BaseState and into a separate module. * Overhaul EventProcessor lifecycle * No tasks start until `.start()` is called * add graceful shutdown timeout to allow tasks to finish before cancellation * use more keyword only parameters * move BaseState-specific processing to new BaseStateEventProcessor subclass * add test fixtures for `mock_event_processor` that can process simple registered events * Fix test_app to use BaseStateEventProcessor make Event.substate_token no longer work, because we're deprecating `token` as an Event field, so we cannot rely on it under the covers. * Avoid handling the same exception multiple times in EventProcessor * Attach cls to setter event handlers * fix test_app, test_state and friends Use the new mock_base_state_event_processor fixture to process arbitrary events and assert on emitted events or deltas. * ENG-9198: implement ContextVar-based registry for BaseState and EventHandler This allows better control over which states and events are part of a given app and avoiding true global variables makes cleanup and testing much simpler. * Remove `token` field from Event * Clean up frontend Event and StateUpdate remove null/default fields when serializing Event from the frontend and StateUpdate from the backend. * remove get_app dependency from get_state and background tasks * Remove remaining get_app / mock_app dependency from tests Fix issue with background task delta calculation not starting from the root state * EventContext inherits from BaseContext remove extra `event_context` ContextVar being passed around * additional fixups * apply changes to migrated files separately * add missing import * remove pyleak integration from base_state_processor * EventProcessor.enqueue_stream_delta and task Future EventProcessor.enqueue now returns a Future that tracks the completion of the event (and can be used to cancel the event) EventProcessor.enqueue_stream_delta overrides the default emit_delta implementation and instead yields deltas directly to the caller as the event is processing. * Adapt upload endpoint to new EventProcessor * Fix test_expiration.py and other new state tests * Fix upload tests for new EventProcessor fixtures * add OPLOCK_ENABLED state_manager.close to tests * state.js: pass around params as a ref The function () => params.current baked inside the ensureSocketConnected function was getting a stale reference and the early events (hydrate, on load, client state) were missing the query parameters in their router_data and thus on_load was not working correctly. * registry: substate tracking and stateful component cache Store the state_full_name to substate mapping in RegistrationContext Make it easier to register / re-register select states and event handlers in a new RegistrationContext Store StatefulComponent cached components in RegistrationContext for easier resetting/dropping after compilation or for use in testing. * close old locks in disk/memory state manager * Remove state_manager from AppHarness Update all associated tests to make assertions using the browser/app and not attempting to fetch the backend state directly. This makes the tests more robust, reduces state_manager related hacks, and makes the tests easier to eventually migrate to an external process using granian where direct state_manager access will not be available. * Move reflex._internal to reflex_core._internal * move reflex.ievent to reflex_core._internal.event * replace "reload" functionality with internal rehydration instead of telling the frontend to reload, just hydrate and run on_load internally before processing the user's requested event. * incldue coverage from subpackages raise coverage bar back up to 72 at least * remove simulated pre-hydrated states * Add unit test cases for new registry/context/processor modules Add unit test cases for reflex.istate.manager.token * Use correct token in enqueue_stream_delta Use the delta's token when emitting to the processor queue. Return after emitting to the processor queue so the caller does not get deltas from unrelated tokens. Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> * Fix StateToken.deserialize implementation Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> * Update packages/reflex-core/src/reflex_core/_internal/event/processor/base_state_processor.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> * Fix StateToken mismerge (Thanks greptile) Remove TODO now that issue is created in repo. * move EventChain import to avoid circular dep * fix StateToken deserialize tests * Python 3.11 and 3.12 compatibility * Vendor the new async iterator `asyncio.as_completed` for < 3.13 * Alternative Queue.shutdown mechanism for < 3.13 * Alternative to `Task.get_context` for < 3.12 * from __future__ import annotations for new modules (so TYPE_CHECKING imports work) * bug with cls super() call on dataclass with slots=True * typing_extensions.deprecated * py3.10 Self compat * py3.10: typing_extensions Self * ugh more py3.10 Self compat * fix reflex_core -> reflex import * Handle py3.11 compatible queue shutdown better Join the queue that we None'd out to make sure all the tasks have flushed. Fix some import issues being reported on my branch. * state.js: pump the queue in processEvent If there is another event to process, chain to processEvent * AppHarness: pre-register SharedState so it's available in the base RegistrationContext * BaseStateEventProcessor: emit deltas before enqueuing events * set RegistrationContext in ASGI middleware Ensure that the RegistrationContext associated with the App is set in the task that handles an ASGI request. * EventFuture: tracks execution of chained events roll up chained events for use with enqueue_stream_deltas for HTTP invocation of events. * Add BaseState to reflex_core.event namespace for docgen EventHandler has a BaseState field, so it needs this reference to generate the proper documentation (and basically not crash) * EventProcessor.enqueue only accepts a single Event Add new `enqueue_many` function for queuing multiple events. This allows each enqueued event to get its own EventContext/txid and Future/Task tracking. Update tests to wait for Future returned by `enqueue` to ensure processing has completed before making any assertions. * attach the registration_context_middleware in App.__call__ We don't want to set it from a lifespan task, because once the middleware is added, it cannot be readded again. So if the same ASGI gets started and stopped, it will throw an error. Does this happen much in real life? No. But one AppHarness test was hitting it, and this is technically more correct. * test_connection_banner: use CDP to simulate network offline Instead of killing and restarting the server, which is slower and more annoying, just temporarily restrict network traffic in the browser. * reflex_core.event: provide BaseState as a namespace property Avoid other weird circular import issues that occur when the `sys.modules` record for the module is not actually the module namespace. * Track EventFuture children Ensure yielded event execution ordering, by waiting for sibling EventFuture for non-background tasks before processing. * use py3.11 compatible super() for dataclasses with slots * Only process one non-backend event per token * Fix event order assertions in test_event_chain * py3.11 super() fix again * Move _registration_context_middleware to top of asgi stack And don't register it as middleware, because it's not middleware technically (even though it worked the way it was). * Move registration context middle to not quite the top level app. The returned top level app should always continue to be a Starlette instance for compatibility. * Add cache_key and lock_key attributes to StateToken * Use cache_key and lock_key in StateManagerDisk and StateManagerRedis * update pyi_hashes * make reflex_base.event a package * Get rid of reflex_base._internal namespace * test_upload: extend sleep before cancellation increase chances of actually getting a single chunk processed before the cancel takes place. * re-add fix_events token param * Add StateManager.state property as a compat shim * deprecate StateUpdate.final (instead of removal) * Support legacy token format in StateManager implementations * add test case for StateManager legacy str tokens * ignore QueueShutDown when shutting down queue * optimize test_event_processing benchmark avoid overhead in stop() and join() paths * asyncio.QueueShutDown was only added in 3.13+ * Add deprecated typing hints for passing str token to StateManager * Move overloads outside of if TYPE_CHECKING --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent 8edcb7b commit ea90a03

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+6501
-2758
lines changed

packages/reflex-base/src/reflex_base/.templates/web/utils/state.js

Lines changed: 59 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ const cookies = new Cookies();
4040
// Dictionary holding component references.
4141
export const refs = {};
4242

43-
// Flag ensures that only one event is processing on the backend concurrently.
44-
let event_processing = false;
4543
// Array holding pending events to be processed.
4644
const event_queue = [];
4745

@@ -203,30 +201,28 @@ function urlFrom(string) {
203201
* @param socket The socket object to send the event on.
204202
* @param navigate The navigate function from useNavigate
205203
* @param params The params object from useParams
206-
*
207-
* @returns True if the event was sent, false if it was handled locally.
208204
*/
209205
export const applyEvent = async (event, socket, navigate, params) => {
210206
// Handle special events
211207
if (event.name == "_redirect") {
212208
if ((event.payload.path ?? undefined) === undefined) {
213-
return false;
209+
return;
214210
}
215211
if (event.payload.external) {
216212
window.open(
217213
event.payload.path,
218214
"_blank",
219215
"noopener" + (event.payload.popup ? ",popup" : ""),
220216
);
221-
return false;
217+
return;
222218
}
223219
const url = urlFrom(event.payload.path);
224220
let pathname = event.payload.path;
225221
if (url) {
226222
if (url.host !== window.location.host) {
227223
// External URL
228224
window.location.assign(event.payload.path);
229-
return false;
225+
return;
230226
} else {
231227
pathname = url.pathname + url.search + url.hash;
232228
}
@@ -236,37 +232,37 @@ export const applyEvent = async (event, socket, navigate, params) => {
236232
} else {
237233
navigate(pathname);
238234
}
239-
return false;
235+
return;
240236
}
241237

242238
if (event.name == "_remove_cookie") {
243239
cookies.remove(event.payload.key, { ...event.payload.options });
244240
queueEventIfSocketExists(initialEvents(), socket, navigate, params);
245-
return false;
241+
return;
246242
}
247243

248244
if (event.name == "_clear_local_storage") {
249245
localStorage.clear();
250246
queueEventIfSocketExists(initialEvents(), socket, navigate, params);
251-
return false;
247+
return;
252248
}
253249

254250
if (event.name == "_remove_local_storage") {
255251
localStorage.removeItem(event.payload.key);
256252
queueEventIfSocketExists(initialEvents(), socket, navigate, params);
257-
return false;
253+
return;
258254
}
259255

260256
if (event.name == "_clear_session_storage") {
261257
sessionStorage.clear();
262258
queueEventIfSocketExists(initialEvents(), socket, navigate, params);
263-
return false;
259+
return;
264260
}
265261

266262
if (event.name == "_remove_session_storage") {
267263
sessionStorage.removeItem(event.payload.key);
268264
queueEventIfSocketExists(initialEvents(), socket, navigate, params);
269-
return false;
265+
return;
270266
}
271267

272268
if (event.name == "_download") {
@@ -285,7 +281,7 @@ export const applyEvent = async (event, socket, navigate, params) => {
285281
a.download = event.payload.filename;
286282
a.click();
287283
a.remove();
288-
return false;
284+
return;
289285
}
290286

291287
if (event.name == "_set_focus") {
@@ -299,7 +295,7 @@ export const applyEvent = async (event, socket, navigate, params) => {
299295
} else {
300296
current.focus();
301297
}
302-
return false;
298+
return;
303299
}
304300

305301
if (event.name == "_blur_focus") {
@@ -313,7 +309,7 @@ export const applyEvent = async (event, socket, navigate, params) => {
313309
} else {
314310
current.blur();
315311
}
316-
return false;
312+
return;
317313
}
318314

319315
if (event.name == "_set_value") {
@@ -322,7 +318,7 @@ export const applyEvent = async (event, socket, navigate, params) => {
322318
if (ref.current) {
323319
ref.current.value = event.payload.value;
324320
}
325-
return false;
321+
return;
326322
}
327323

328324
if (
@@ -348,7 +344,7 @@ export const applyEvent = async (event, socket, navigate, params) => {
348344
window.onerror(e.message, null, null, null, e);
349345
}
350346
}
351-
return false;
347+
return;
352348
}
353349

354350
if (event.name == "_call_script" || event.name == "_call_function") {
@@ -375,36 +371,35 @@ export const applyEvent = async (event, socket, navigate, params) => {
375371
window.onerror(e.message, null, null, null, e);
376372
}
377373
}
378-
return false;
374+
return;
379375
}
380376

381377
// Update token and router data (if missing).
382-
event.token = getToken();
383378
if (
384379
event.router_data === undefined ||
385380
Object.keys(event.router_data).length === 0
386381
) {
387382
// Since we don't have router directly, we need to get info from our hooks
388383
event.router_data = {
389384
pathname: window.location.pathname,
390-
query: {
391-
...Object.fromEntries(new URLSearchParams(window.location.search)),
392-
...params(),
393-
},
394385
asPath:
395386
window.location.pathname +
396387
window.location.search +
397388
window.location.hash,
398389
};
390+
const query = {
391+
...Object.fromEntries(new URLSearchParams(window.location.search)),
392+
...params.current,
393+
};
394+
if (query && Object.keys(query).length > 0) {
395+
event.router_data.query = query;
396+
}
399397
}
400398

401399
// Send the event to the server.
402400
if (socket) {
403401
socket.emit("event", event);
404-
return true;
405402
}
406-
407-
return false;
408403
};
409404

410405
/**
@@ -413,11 +408,8 @@ export const applyEvent = async (event, socket, navigate, params) => {
413408
* @param socket The socket object to send the response event(s) on.
414409
* @param navigate The navigate function from React Router
415410
* @param params The params object from React Router
416-
*
417-
* @returns Whether the event was sent.
418411
*/
419412
export const applyRestEvent = async (event, socket, navigate, params) => {
420-
let eventSent = false;
421413
if (event.handler === "uploadFiles") {
422414
// Start upload, but do not wait for it, which would block other events.
423415
uploadFiles(
@@ -431,9 +423,7 @@ export const applyRestEvent = async (event, socket, navigate, params) => {
431423
getBackendURL,
432424
getToken,
433425
);
434-
return false;
435426
}
436-
return eventSent;
437427
};
438428

439429
/**
@@ -487,28 +477,21 @@ export const processEvent = async (socket, navigate, params) => {
487477
}
488478

489479
// Only proceed if we're not already processing an event.
490-
if (event_queue.length === 0 || event_processing) {
480+
if (event_queue.length === 0) {
491481
return;
492482
}
493483

494-
// Set processing to true to block other events from being processed.
495-
event_processing = true;
496-
497484
// Apply the next event in the queue.
498485
const event = event_queue.shift();
499486

500-
let eventSent = false;
501487
// Process events with handlers via REST and all others via websockets.
502488
if (event.handler) {
503-
eventSent = await applyRestEvent(event, socket, navigate, params);
489+
await applyRestEvent(event, socket, navigate, params);
504490
} else {
505-
eventSent = await applyEvent(event, socket, navigate, params);
491+
await applyEvent(event, socket, navigate, params);
506492
}
507-
// If no event was sent, set processing to false.
508-
if (!eventSent) {
509-
event_processing = false;
510-
// recursively call processEvent to drain the queue, since there is
511-
// no state update to trigger the useEffect event loop.
493+
// Process any remaining events.
494+
if (event_queue.length > 0) {
512495
await processEvent(socket, navigate, params);
513496
}
514497
};
@@ -621,17 +604,11 @@ export const connect = async (
621604
window.addEventListener("unload", disconnectTrigger);
622605
if (socket.current.rehydrate) {
623606
socket.current.rehydrate = false;
624-
queueEvents(
625-
initialEvents(),
626-
socket,
627-
true,
628-
navigate,
629-
() => params.current,
630-
);
607+
queueEvents(initialEvents(), socket, true, navigate, params);
631608
}
632609
// Drain any initial events from the queue.
633-
while (event_queue.length > 0 && !event_processing) {
634-
await processEvent(socket.current, navigate, () => params.current);
610+
while (event_queue.length > 0) {
611+
await processEvent(socket.current, navigate, params);
635612
}
636613
});
637614

@@ -650,12 +627,10 @@ export const connect = async (
650627
}, 200 * n_connect_errors); // Incremental backoff
651628
});
652629

653-
// When the socket disconnects reset the event_processing flag
654630
socket.current.on("disconnect", (reason, details) => {
655631
socket.current.wait_connect = false;
656632
const try_reconnect =
657633
reason !== "io server disconnect" && reason !== "io client disconnect";
658-
event_processing = false;
659634
window.removeEventListener("unload", disconnectTrigger);
660635
window.removeEventListener("beforeunload", disconnectTrigger);
661636
window.removeEventListener("pagehide", pagehideHandler);
@@ -667,30 +642,24 @@ export const connect = async (
667642

668643
// On each received message, queue the updates and events.
669644
socket.current.on("event", async (update) => {
670-
for (const substate in update.delta) {
671-
dispatch[substate](update.delta[substate]);
672-
// handle events waiting for `is_hydrated`
673-
if (
674-
substate === state_name &&
675-
update.delta[substate]?.is_hydrated_rx_state_
676-
) {
677-
queueEvents(on_hydrated_queue, socket, false, navigate, params);
678-
on_hydrated_queue.length = 0;
645+
if (update.delta && Object.keys(update.delta).length > 0) {
646+
for (const substate in update.delta) {
647+
dispatch[substate](update.delta[substate]);
648+
// handle events waiting for `is_hydrated`
649+
if (
650+
substate === state_name &&
651+
update.delta[substate]?.is_hydrated_rx_state_
652+
) {
653+
queueEvents(on_hydrated_queue, socket, false, navigate, params);
654+
on_hydrated_queue.length = 0;
655+
}
679656
}
657+
applyClientStorageDelta(client_storage, update.delta);
680658
}
681-
applyClientStorageDelta(client_storage, update.delta);
682-
if (update.final !== null) {
683-
event_processing = !update.final;
684-
}
685-
if (update.events) {
659+
if (update.events && update.events.length > 0) {
686660
queueEvents(update.events, socket, false, navigate, params);
687661
}
688662
});
689-
socket.current.on("reload", async (event) => {
690-
event_processing = false;
691-
on_hydrated_queue.push(event);
692-
queueEvents(initialEvents(), socket, true, navigate, params);
693-
});
694663
socket.current.on("new_token", async (new_token) => {
695664
token = new_token;
696665
window.sessionStorage.setItem(TOKEN_KEY, new_token);
@@ -713,7 +682,17 @@ export const ReflexEvent = (
713682
event_actions = {},
714683
handler = null,
715684
) => {
716-
return { name, payload, handler, event_actions };
685+
const e = { name };
686+
if (payload && Object.keys(payload).length > 0) {
687+
e.payload = payload;
688+
}
689+
if (event_actions && Object.keys(event_actions).length > 0) {
690+
e.event_actions = event_actions;
691+
}
692+
if (handler !== null) {
693+
e.handler = handler;
694+
}
695+
return e;
717696
};
718697

719698
/**
@@ -919,7 +898,7 @@ export const useEventLoop = (
919898
setConnectErrors,
920899
client_storage,
921900
navigate,
922-
() => params.current,
901+
params,
923902
);
924903
}
925904
}, [
@@ -947,7 +926,7 @@ export const useEventLoop = (
947926
}
948927

949928
return applyEventActions(
950-
() => queueEvents(_events, socket, false, navigate, () => params.current),
929+
() => queueEvents(_events, socket, false, navigate, params),
951930
event_actions,
952931
args,
953932
_events.map((e) => e.name).join("+++"),
@@ -958,13 +937,7 @@ export const useEventLoop = (
958937
const sentHydrate = useRef(false); // Avoid double-hydrate due to React strict-mode
959938
useEffect(() => {
960939
if (!sentHydrate.current) {
961-
queueEvents(
962-
initial_events(),
963-
socket,
964-
true,
965-
navigate,
966-
() => params.current,
967-
);
940+
queueEvents(initial_events(), socket, true, navigate, params);
968941
sentHydrate.current = true;
969942
}
970943
}, []);
@@ -1028,9 +1001,9 @@ export const useEventLoop = (
10281001
}
10291002
(async () => {
10301003
// Process all outstanding events.
1031-
while (event_queue.length > 0 && !event_processing) {
1004+
while (event_queue.length > 0) {
10321005
await ensureSocketConnected();
1033-
await processEvent(socket.current, navigate, () => params.current);
1006+
await processEvent(socket.current, navigate, params);
10341007
}
10351008
})();
10361009
});

0 commit comments

Comments
 (0)