From 3fa6bd9ba2db617012853f6ab9fab191fb6252fb Mon Sep 17 00:00:00 2001 From: Holger Schmidt Date: Sat, 2 Mar 2024 00:21:53 +0100 Subject: [PATCH 1/6] documented and commented debounce method to prevent misuse --- src/Fabulous/Cmd.fs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/Fabulous/Cmd.fs b/src/Fabulous/Cmd.fs index 4a4b96956..855ac2416 100644 --- a/src/Fabulous/Cmd.fs +++ b/src/Fabulous/Cmd.fs @@ -188,20 +188,33 @@ module Cmd = let inline msgOption (task: Task<'msg option>) = OfAsync.msgOption(task |> Async.AwaitTask) - /// Command to issue a message if no other message has been issued within the specified timeout + /// Creates a factory for Commands that dispatch a message only + /// if the factory produces no other Command within the specified timeout. + /// Helps control how often a message is dispatched by delaying the dispatch after a period of inactivity. + /// Useful for handling noisy inputs like keypresses or scrolling, and preventing too many actions in a short time, like rapid button clicks. + /// Note that this creates an object with internal state and is intended to be used per Program or longer-running background process + /// rather than once per message in the update function. + /// The time to wait for the next Command from the factory in milliseconds. + /// Maps a factory input value to a message for delayed dispatch. + /// A Command factory function that maps an input value to a "sleeper" Command which dispatches a delayed message (mapped from the value). + /// This command is cancelled if the factory produces another Command within the specified timeout; otherwise it succeeds and the message is dispatched. let debounce (timeout: int) (fn: 'value -> 'msg) : 'value -> Cmd<'msg> = - let funLock = obj() - let mutable cts: CancellationTokenSource = null + let funLock = obj() // ensures safe access to resources shared across different threads + let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command + // return a factory function mapping input values to "sleeper" Commands with delayed dispatch fun (value: 'value) -> [ fun dispatch -> lock funLock (fun () -> + // cancel the last sleeping Command issued earlier from this factory if cts <> null then cts.Cancel() cts.Dispose() + // make cancellation available to the factory's next Command cts <- new CancellationTokenSource() + // asynchronously wait for the specified time before dispatch Async.Start( async { do! Async.Sleep(timeout) @@ -209,6 +222,7 @@ module Cmd = lock funLock (fun () -> dispatch(fn value) + // done; invalidate own cancellation token if cts <> null then cts.Dispose() cts <- null) From e377841dd2732ef30142b292c48e8ac4cab7f116 Mon Sep 17 00:00:00 2001 From: Holger Schmidt Date: Fri, 1 Mar 2024 21:34:58 +0100 Subject: [PATCH 2/6] simplified debounce test adding message counting --- src/Fabulous.Tests/CmdTests.fs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Fabulous.Tests/CmdTests.fs b/src/Fabulous.Tests/CmdTests.fs index 073b6d8fc..6b8411eea 100644 --- a/src/Fabulous.Tests/CmdTests.fs +++ b/src/Fabulous.Tests/CmdTests.fs @@ -13,13 +13,14 @@ module CmdTestsHelper = [] type ``Cmd tests``() = [] - member _.``Cmd.debounce only dispatch the last message``() = + member _.``Cmd.debounce only dispatches the last messages within the timeout``() = async { + let mutable messageCount = 0 let mutable actualValue = None let dispatch msg = - if actualValue.IsNone then - actualValue <- Some msg + messageCount <- messageCount + 1 + actualValue <- Some msg let triggerCmd = Cmd.debounce 100 NewValue @@ -30,14 +31,14 @@ type ``Cmd tests``() = triggerCmd 3 |> CmdTestsHelper.execute dispatch do! Async.Sleep 125 + Assert.AreEqual(1, messageCount) Assert.AreEqual(Some(NewValue 3), actualValue) - actualValue <- None - triggerCmd 4 |> CmdTestsHelper.execute dispatch do! Async.Sleep 75 triggerCmd 5 |> CmdTestsHelper.execute dispatch do! Async.Sleep 125 + Assert.AreEqual(2, messageCount) Assert.AreEqual(Some(NewValue 5), actualValue) } From a3fccbaca8f4434d722b18fabb68642f092dbc9f Mon Sep 17 00:00:00 2001 From: Holger Schmidt Date: Fri, 1 Mar 2024 21:23:02 +0100 Subject: [PATCH 3/6] added helpers and tests for creating throttled and buffered throttled command factories similar to Cmd.debounce --- src/Fabulous.Tests/CmdTests.fs | 107 +++++++++++++++++++++++++++++++++ src/Fabulous/Cmd.fs | 82 +++++++++++++++++++++++++ 2 files changed, 189 insertions(+) diff --git a/src/Fabulous.Tests/CmdTests.fs b/src/Fabulous.Tests/CmdTests.fs index 6b8411eea..62ec048e2 100644 --- a/src/Fabulous.Tests/CmdTests.fs +++ b/src/Fabulous.Tests/CmdTests.fs @@ -42,3 +42,110 @@ type ``Cmd tests``() = Assert.AreEqual(2, messageCount) Assert.AreEqual(Some(NewValue 5), actualValue) } + + [] + member _.``Cmd.throttle issues message at specified intervals``() = + async { + let mutable messageCount = 0 + let mutable actualValue = None + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.throttle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 50 + throttleCmd 2 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 75 + throttleCmd 3 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + Assert.AreEqual(2, messageCount) + Assert.AreEqual(Some(NewValue 3), actualValue) + + throttleCmd 4 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 75 + throttleCmd 5 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + Assert.AreEqual(3, messageCount) + Assert.AreEqual(Some(NewValue 4), actualValue) + } + + [] + member _.``Cmd.throttle issues only one message per interval``() = + async { + let mutable messageCount = 0 + let mutable actualValue = None + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.throttle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 20 + throttleCmd 2 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 35 + throttleCmd 3 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + // Only the first message should have been dispatched + Assert.AreEqual(1, messageCount) + Assert.AreEqual(Some(NewValue 1), actualValue) + } + + [] + member _.``Cmd.bufferedThrottle dispatches the first and most recent message within the specified interval``() = + async { + let mutable messageCount = 0 + let mutable actualValue = None + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.bufferedThrottle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 20 + throttleCmd 2 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 10 + throttleCmd 3 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 20 + throttleCmd 4 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + // Only the first and most recent message should be dispatched + Assert.AreEqual(2, messageCount) + Assert.AreEqual(Some(NewValue 4), actualValue) + } + + [] + member _.``Cmd.bufferedThrottle dispatches the most recent message even if delayed``() = + async { + let mutable actualValue = None + let mutable messageCount = 0 + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.bufferedThrottle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + throttleCmd 2 |> CmdTestsHelper.execute dispatch + + // Only the first message should have been dispatched + Assert.AreEqual(1, messageCount) + Assert.AreEqual(Some(NewValue 1), actualValue) + + do! Async.Sleep 200 // Wait longer than the throttle interval + + // the second message should have been dispatched delayed + Assert.AreEqual(2, messageCount) + Assert.AreEqual(Some(NewValue 2), actualValue) + } diff --git a/src/Fabulous/Cmd.fs b/src/Fabulous/Cmd.fs index 855ac2416..b929ca7f5 100644 --- a/src/Fabulous/Cmd.fs +++ b/src/Fabulous/Cmd.fs @@ -229,3 +229,85 @@ module Cmd = }, cts.Token )) ] + + /// Creates a factory for Commands that dispatch a message only + /// if the factory produced no other Command within the specified interval. + /// This limits how often a message is dispatched by ensuring to only dispatch once within a certain time interval + /// and dropping messages that are produces during the cooldown. + /// Useful for limiting how often a progress message is shown or preventing too many updates to a UI element in a short time. + /// Note that this creates an object with internal state and is intended to be used per Program or longer-running background process + /// rather than once per message in the update function. + /// The minimum time interval between two consecutive Command executions in milliseconds. + /// Maps a factory input value to a message for dispatch. + /// A Command factory function that maps an input value to a "throttled" Command which dispatches a message (mapped from the value) + /// if the minimum time interval has elapsed since the last Command execution; otherwise, it does nothing. + let throttle (interval: int) (fn: 'value -> 'msg) : 'value -> Cmd<'msg> = + let mutable lastDispatch = System.DateTime.MinValue + + // return a factory function mapping input values to "throttled" Commands that only dispatch if enough time passed + fun (value: 'value) -> + [ fun dispatch -> + let now = System.DateTime.UtcNow + + // If the interval has elapsed since the last execution, dispatch the message + if now - lastDispatch >= System.TimeSpan.FromMilliseconds(float interval) then + lastDispatch <- now + dispatch(fn value) ] + + /// + /// Creates a Command factory that dispatches the most recent message in a given interval - even if delayed. + /// This makes it similar to in that it rate-limits the message dispatch + /// and similar to in that it guarantees the last message (within the interval or in total) is dispatched. + /// Helpful for scenarios where you want to throttle, but cannot risk losing the last message to throttling + /// - like the last progress update that completes a progress. + /// Note that this function creates an object with internal state and is intended to be used per Program or longer-running background process + /// rather than once per message in the update function. + /// + /// The minimum time interval between two consecutive Command executions in milliseconds. + /// A function that maps a factory input value to a message for dispatch. + /// + /// A Command factory function that maps an input value to a Command which dispatches a message (mapped from the value), either immediately + /// or after a delay respecting the interval, while cancelling older commands if the factory produces another Command before the interval has elapsed. + /// + let bufferedThrottle (interval: int) (fn: 'value -> 'msg) : 'value -> Cmd<'msg> = + let rateLimit = System.TimeSpan.FromMilliseconds(float interval) + let funLock = obj() // ensures safe access to resources shared across different threads + let mutable lastDispatch = System.DateTime.MinValue + let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command + + // Return a factory function mapping input values to sleeper Commands with delayed dispatch of the most recent message + fun (value: 'value) -> + [ fun dispatch -> + lock funLock (fun () -> + let now = System.DateTime.UtcNow + let elapsedSinceLastDispatch = now - lastDispatch + + // If the interval has elapsed since the last dispatch, dispatch immediately + if elapsedSinceLastDispatch >= rateLimit then + dispatch(fn value) + lastDispatch <- now + else // schedule the dispatch for when the interval is up + // cancel the last sleeper Command issued earlier from this factory + if cts <> null then + cts.Cancel() + cts.Dispose() + + // make cancellation available to the factory's next Command + cts <- new CancellationTokenSource() + + // asynchronously wait for the remaining time before dispatch + Async.Start( + async { + do! Async.Sleep(rateLimit - elapsedSinceLastDispatch) + + lock funLock (fun () -> + dispatch(fn value) + lastDispatch <- System.DateTime.UtcNow + + // done; invalidate own cancellation token + if cts <> null then + cts.Dispose() + cts <- null) + }, + cts.Token + )) ] From b05010c0561aee31d2121c538653834a1b9a23bc Mon Sep 17 00:00:00 2001 From: Holger Schmidt Date: Mon, 18 Mar 2024 22:52:36 +0100 Subject: [PATCH 4/6] added Cmd.batchedThrottle --- src/Fabulous.Tests/CmdTests.fs | 65 +++++++++++++++++++++++++++++++- src/Fabulous/Cmd.fs | 69 ++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 1 deletion(-) diff --git a/src/Fabulous.Tests/CmdTests.fs b/src/Fabulous.Tests/CmdTests.fs index 62ec048e2..e522ad99c 100644 --- a/src/Fabulous.Tests/CmdTests.fs +++ b/src/Fabulous.Tests/CmdTests.fs @@ -3,7 +3,9 @@ namespace Fabulous.Tests open Fabulous open NUnit.Framework -type CmdTestsMsg = NewValue of int +type CmdTestsMsg = + | NewValue of int + | NewValues of int list module CmdTestsHelper = let execute dispatch (cmd: Cmd<'msg>) = @@ -149,3 +151,64 @@ type ``Cmd tests``() = Assert.AreEqual(2, messageCount) Assert.AreEqual(Some(NewValue 2), actualValue) } + + [] + member _.``Cmd.batchedThrottle dispatches all undispatched values on interval expiry``() = + async { + let mutable messageCount = 0 + let mutable dispatched = [] // records dispatched messages latest first + + let dispatch msg = + messageCount <- messageCount + 1 + dispatched <- msg :: dispatched + + let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues + + batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch + + do! Async.Sleep 200 // Wait longer than the throttle interval + + // All three values should have been dispatched + Assert.AreEqual(2, messageCount) + Assert.AreEqual([ NewValues [ 2; 3; 4 ]; NewValues [ 1 ] ], dispatched) + } + + [] + member _.``Cmd.batchedThrottle dispatches messages immediately if interval not expired``() = + async { + let mutable messageCount = 0 + let mutable dispatched = [] // records dispatched messages latest first + + let dispatch msg = + messageCount <- messageCount + 1 + dispatched <- msg :: dispatched + + let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues + + batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch + + // Only the first value should have been dispatched immediately + Assert.AreEqual(1, messageCount) + Assert.AreEqual([ NewValues[1] ], dispatched) + + (* Wait for longer than twice the throttle interval, + giving second value time to dispatch and elapsing time until next dispatch *) + do! Async.Sleep 210 + + batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch + + // Second value should have dispatched delayed, third immediately + Assert.AreEqual(3, messageCount) + Assert.AreEqual([ NewValues[3]; NewValues[2]; NewValues[1] ], dispatched) + + do! Async.Sleep 110 // Wait longer than the throttle interval + + // All values should have been dispatched eventually + Assert.AreEqual(4, messageCount) + Assert.AreEqual([ NewValues[4]; NewValues[3]; NewValues[2]; NewValues[1] ], dispatched) + } diff --git a/src/Fabulous/Cmd.fs b/src/Fabulous/Cmd.fs index b929ca7f5..23caa85ce 100644 --- a/src/Fabulous/Cmd.fs +++ b/src/Fabulous/Cmd.fs @@ -311,3 +311,72 @@ module Cmd = }, cts.Token )) ] + + /// + /// Creates a factory for Commands that dispatch messages with a list of pending values at a fixed maximum rate, + /// ensuring that all pending values are dispatched when the specified interval elapses. + /// This function is similar to , but instead of dispatching only the last value, + /// it remembers and dispatches all undispatched values within the specified interval. + /// Helpful for scenarios where you want to throttle messages but cannot afford to lose any of the values they carry, + /// ensuring all values are processed at a controlled rate. + /// Note that this function creates an object with internal state and is intended to be used per Program + /// or longer-running background process rather than once per message in the update function. + /// + /// The minimum time interval between two consecutive Command executions in milliseconds. + /// A function that maps a list of factory input values to a message for dispatch. + /// + /// A Command factory function that maps a list of input values to a Command which dispatches a message (mapped from the pending values), + /// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values + /// when the interval has elapsed, ensuring no values are lost. + /// + let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : 'value -> Cmd<'msg> = + let rateLimit = System.TimeSpan.FromMilliseconds(float interval) + let funLock = obj() // ensures safe access to resources shared across different threads + let mutable lastDispatch = System.DateTime.MinValue + let mutable pendingValues: 'value list = [] + let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command + + // dispatches all pendingValues and resets them while updating lastDispatch + let dispatchBatch (dispatch: 'msg -> unit) = + // Dispatch in the order they were received + pendingValues |> List.rev |> mapValuesToMsg |> dispatch + + lastDispatch <- System.DateTime.UtcNow + pendingValues <- [] + + // Return a factory function mapping input values to sleeping Commands dispatching all pending messages + fun (value: 'value) -> + [ fun dispatch -> + lock funLock (fun () -> + let now = System.DateTime.UtcNow + let elapsedSinceLastDispatch = now - lastDispatch + pendingValues <- value :: pendingValues + + // If the interval has elapsed since the last dispatch, dispatch all pending messages + if elapsedSinceLastDispatch >= rateLimit then + dispatchBatch dispatch + else // schedule dispatch + + // if the the last sleeping dispatch can still be cancelled, do so + if cts <> null then + cts.Cancel() + cts.Dispose() + + // used to enable cancelling this dispatch if newer values come into the factory + cts <- new CancellationTokenSource() + + Async.Start( + async { + // wait only as long as we have to before next dispatch + do! Async.Sleep(rateLimit - elapsedSinceLastDispatch) + + lock funLock (fun () -> + dispatchBatch dispatch + + // done; invalidate own cancellation + if cts <> null then + cts.Dispose() + cts <- null) + }, + cts.Token + )) ] From 0cc2a6a0658206522fe8da551aa54398d1826957 Mon Sep 17 00:00:00 2001 From: Holger Schmidt Date: Thu, 21 Mar 2024 03:42:18 +0100 Subject: [PATCH 5/6] Proposal: returning a second function from batchedThrottle allowing to await the next dispatch Should debounce and bufferedThrottle follow the same API? --- src/Fabulous.Tests/CmdTests.fs | 30 +++++++++- src/Fabulous/Cmd.fs | 100 ++++++++++++++++++++------------- 2 files changed, 90 insertions(+), 40 deletions(-) diff --git a/src/Fabulous.Tests/CmdTests.fs b/src/Fabulous.Tests/CmdTests.fs index e522ad99c..6d026bd87 100644 --- a/src/Fabulous.Tests/CmdTests.fs +++ b/src/Fabulous.Tests/CmdTests.fs @@ -162,7 +162,7 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues + let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch @@ -186,7 +186,7 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues + let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch @@ -212,3 +212,29 @@ type ``Cmd tests``() = Assert.AreEqual(4, messageCount) Assert.AreEqual([ NewValues[4]; NewValues[3]; NewValues[2]; NewValues[1] ], dispatched) } + + [] + member _.``Cmd.batchedThrottle factory can be awaited for completion``() = + async { + let mutable messageCount = 0 + let mutable dispatched = [] // records dispatched messages latest first + + let dispatch msg = + messageCount <- messageCount + 1 + dispatched <- msg :: dispatched + + let createCmd, awaitNextDispatch = Cmd.batchedThrottle 100 NewValues + + createCmd 1 |> CmdTestsHelper.execute dispatch + createCmd 2 |> CmdTestsHelper.execute dispatch + + // Only the first value should have been dispatched immediately + Assert.AreEqual(1, messageCount) + Assert.AreEqual([ NewValues[1] ], dispatched) + + do! awaitNextDispatch None // only waits until next dispatch + + // All values should have been dispatched after waiting + Assert.AreEqual(2, messageCount) + Assert.AreEqual([ NewValues[2]; NewValues[1] ], dispatched) + } diff --git a/src/Fabulous/Cmd.fs b/src/Fabulous/Cmd.fs index 23caa85ce..8c41c23ac 100644 --- a/src/Fabulous/Cmd.fs +++ b/src/Fabulous/Cmd.fs @@ -325,17 +325,23 @@ module Cmd = /// The minimum time interval between two consecutive Command executions in milliseconds. /// A function that maps a list of factory input values to a message for dispatch. /// - /// A Command factory function that maps a list of input values to a Command which dispatches a message (mapped from the pending values), + /// Two methods - the first being a Command factory function that maps a list of input values to a Command + /// which dispatches a message (mapped from the pending values), /// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values /// when the interval has elapsed, ensuring no values are lost. + /// The second can be used for awaiting the next dispatch from the outside while adding some buffer time. /// - let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : 'value -> Cmd<'msg> = + let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : ('value -> Cmd<'msg>) * (System.TimeSpan option -> Async) = let rateLimit = System.TimeSpan.FromMilliseconds(float interval) let funLock = obj() // ensures safe access to resources shared across different threads let mutable lastDispatch = System.DateTime.MinValue let mutable pendingValues: 'value list = [] let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command + // gets the time to wait until the next allowed dispatch returning a negative timespan if the time is up + let getTimeUntilNextDispatch () = + lastDispatch.Add(rateLimit) - System.DateTime.UtcNow + // dispatches all pendingValues and resets them while updating lastDispatch let dispatchBatch (dispatch: 'msg -> unit) = // Dispatch in the order they were received @@ -344,39 +350,57 @@ module Cmd = lastDispatch <- System.DateTime.UtcNow pendingValues <- [] - // Return a factory function mapping input values to sleeping Commands dispatching all pending messages - fun (value: 'value) -> - [ fun dispatch -> - lock funLock (fun () -> - let now = System.DateTime.UtcNow - let elapsedSinceLastDispatch = now - lastDispatch - pendingValues <- value :: pendingValues - - // If the interval has elapsed since the last dispatch, dispatch all pending messages - if elapsedSinceLastDispatch >= rateLimit then - dispatchBatch dispatch - else // schedule dispatch - - // if the the last sleeping dispatch can still be cancelled, do so - if cts <> null then - cts.Cancel() - cts.Dispose() - - // used to enable cancelling this dispatch if newer values come into the factory - cts <- new CancellationTokenSource() - - Async.Start( - async { - // wait only as long as we have to before next dispatch - do! Async.Sleep(rateLimit - elapsedSinceLastDispatch) - - lock funLock (fun () -> - dispatchBatch dispatch - - // done; invalidate own cancellation - if cts <> null then - cts.Dispose() - cts <- null) - }, - cts.Token - )) ] + // a factory function mapping input values to sleeping Commands dispatching all pending messages + let factory = + fun (value: 'value) -> + [ fun dispatch -> + lock funLock (fun () -> + let untilNextDispatch = getTimeUntilNextDispatch() + pendingValues <- value :: pendingValues + + // If the interval has elapsed since the last dispatch, dispatch all pending messages + if untilNextDispatch <= System.TimeSpan.Zero then + dispatchBatch dispatch + else // schedule dispatch + + // if the the last sleeping dispatch can still be cancelled, do so + if cts <> null then + cts.Cancel() + cts.Dispose() + + // used to enable cancelling this dispatch if newer values come into the factory + cts <- new CancellationTokenSource() + + Async.Start( + async { + // wait only as long as we have to before next dispatch + do! Async.Sleep(untilNextDispatch) + + lock funLock (fun () -> + dispatchBatch dispatch + + // done; invalidate own cancellation + if cts <> null then + cts.Dispose() + cts <- null) + }, + cts.Token + )) ] + + // a function to wait until after the next async dispatch + some buffer time to ensure the dispatch is complete + let awaitNextDispatch buffer = + lock funLock (fun () -> + async { + if not pendingValues.IsEmpty then + let untilAfterNextDispatch = + getTimeUntilNextDispatch() + + match buffer with + | Some value -> value + | None -> System.TimeSpan.Zero + + if untilAfterNextDispatch > System.TimeSpan.Zero then + do! Async.Sleep(untilAfterNextDispatch) + }) + + // return both the factory and the await helper + factory, awaitNextDispatch From 63f739af0cd1b1ed4c78e329ce251c26890196a5 Mon Sep 17 00:00:00 2001 From: Holger Schmidt Date: Thu, 11 Apr 2024 00:16:05 +0200 Subject: [PATCH 6/6] rewrote Cmd.batchedThrottle to Dispatch.batchThrottled extension because it feels more natural to use it that way with a dispatch inside an ofEffect that produces values rapidly --- src/Fabulous.Tests/CmdTests.fs | 32 +++++----- src/Fabulous/Cmd.fs | 111 +++++++++++++++++---------------- 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/src/Fabulous.Tests/CmdTests.fs b/src/Fabulous.Tests/CmdTests.fs index 6d026bd87..f3aed2229 100644 --- a/src/Fabulous.Tests/CmdTests.fs +++ b/src/Fabulous.Tests/CmdTests.fs @@ -153,7 +153,7 @@ type ``Cmd tests``() = } [] - member _.``Cmd.batchedThrottle dispatches all undispatched values on interval expiry``() = + member _.``Dispatch.batchThrottled dispatches all undispatched values on interval expiry``() = async { let mutable messageCount = 0 let mutable dispatched = [] // records dispatched messages latest first @@ -162,12 +162,12 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues + let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues) - batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 1 + batchedThrottleCmd 2 + batchedThrottleCmd 3 + batchedThrottleCmd 4 do! Async.Sleep 200 // Wait longer than the throttle interval @@ -177,7 +177,7 @@ type ``Cmd tests``() = } [] - member _.``Cmd.batchedThrottle dispatches messages immediately if interval not expired``() = + member _.``Dispatch.batchThrottled dispatches messages immediately if interval not expired``() = async { let mutable messageCount = 0 let mutable dispatched = [] // records dispatched messages latest first @@ -186,10 +186,10 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues + let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues) - batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 1 + batchedThrottleCmd 2 // Only the first value should have been dispatched immediately Assert.AreEqual(1, messageCount) @@ -199,8 +199,8 @@ type ``Cmd tests``() = giving second value time to dispatch and elapsing time until next dispatch *) do! Async.Sleep 210 - batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 3 + batchedThrottleCmd 4 // Second value should have dispatched delayed, third immediately Assert.AreEqual(3, messageCount) @@ -214,7 +214,7 @@ type ``Cmd tests``() = } [] - member _.``Cmd.batchedThrottle factory can be awaited for completion``() = + member _.``Dispatch.batchThrottled factory can be awaited for completion``() = async { let mutable messageCount = 0 let mutable dispatched = [] // records dispatched messages latest first @@ -223,10 +223,10 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let createCmd, awaitNextDispatch = Cmd.batchedThrottle 100 NewValues + let createCmd, awaitNextDispatch = dispatch.batchThrottled(100, NewValues) - createCmd 1 |> CmdTestsHelper.execute dispatch - createCmd 2 |> CmdTestsHelper.execute dispatch + createCmd 1 + createCmd 2 // Only the first value should have been dispatched immediately Assert.AreEqual(1, messageCount) diff --git a/src/Fabulous/Cmd.fs b/src/Fabulous/Cmd.fs index 8c41c23ac..cc83a4eb7 100644 --- a/src/Fabulous/Cmd.fs +++ b/src/Fabulous/Cmd.fs @@ -1,5 +1,6 @@ namespace Fabulous +open System.Runtime.CompilerServices open System.Threading open System.Threading.Tasks @@ -312,27 +313,28 @@ module Cmd = cts.Token )) ] +type DispatchExtensions = + /// - /// Creates a factory for Commands that dispatch messages with a list of pending values at a fixed maximum rate, - /// ensuring that all pending values are dispatched when the specified interval elapses. - /// This function is similar to , but instead of dispatching only the last value, - /// it remembers and dispatches all undispatched values within the specified interval. - /// Helpful for scenarios where you want to throttle messages but cannot afford to lose any of the values they carry, - /// ensuring all values are processed at a controlled rate. + /// Creates a throttled dispatch factory that dispatches values in batches at a fixed minimum interval/maximum rate + /// while ensuring that all values are dispatched eventually. + /// This helps throttle the message dispatch of a rapid producer to avoid overloading the MVU loop + /// without dropping any of the carried values - ensuring all values are processed in batches at a controlled rate. /// Note that this function creates an object with internal state and is intended to be used per Program /// or longer-running background process rather than once per message in the update function. /// - /// The minimum time interval between two consecutive Command executions in milliseconds. - /// A function that maps a list of factory input values to a message for dispatch. + /// The minimum time interval between two consecutive dispatches in milliseconds. + /// A function that maps a list of pending input values to a message for dispatch. /// - /// Two methods - the first being a Command factory function that maps a list of input values to a Command - /// which dispatches a message (mapped from the pending values), - /// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values - /// when the interval has elapsed, ensuring no values are lost. - /// The second can be used for awaiting the next dispatch from the outside while adding some buffer time. + /// Two functions. The first has a Dispatch signature and is used to feed a single value into the factory, + /// where it is either dispatched immediately or after a delay respecting the interval, + /// batched with other pending values in the order they were fed in. + /// The second can be used for awaiting the next dispatch from the outside + /// - while optionally adding some buffer time (in milliseconds) to account for race condiditions. /// - let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : ('value -> Cmd<'msg>) * (System.TimeSpan option -> Async) = - let rateLimit = System.TimeSpan.FromMilliseconds(float interval) + [] + static member batchThrottled((dispatch: Dispatch<'msg>), interval, (mapBatchToMsg: 'value list -> 'msg)) = + let rateLimit = System.TimeSpan.FromMilliseconds(interval) let funLock = obj() // ensures safe access to resources shared across different threads let mutable lastDispatch = System.DateTime.MinValue let mutable pendingValues: 'value list = [] @@ -343,49 +345,48 @@ module Cmd = lastDispatch.Add(rateLimit) - System.DateTime.UtcNow // dispatches all pendingValues and resets them while updating lastDispatch - let dispatchBatch (dispatch: 'msg -> unit) = + let dispatchBatch () = // Dispatch in the order they were received - pendingValues |> List.rev |> mapValuesToMsg |> dispatch + pendingValues |> List.rev |> mapBatchToMsg |> dispatch lastDispatch <- System.DateTime.UtcNow pendingValues <- [] - // a factory function mapping input values to sleeping Commands dispatching all pending messages - let factory = + // a function with the Dispatch signature for feeding a single value into the throttled batch factory + let dispatchSingle = fun (value: 'value) -> - [ fun dispatch -> - lock funLock (fun () -> - let untilNextDispatch = getTimeUntilNextDispatch() - pendingValues <- value :: pendingValues - - // If the interval has elapsed since the last dispatch, dispatch all pending messages - if untilNextDispatch <= System.TimeSpan.Zero then - dispatchBatch dispatch - else // schedule dispatch - - // if the the last sleeping dispatch can still be cancelled, do so - if cts <> null then - cts.Cancel() - cts.Dispose() - - // used to enable cancelling this dispatch if newer values come into the factory - cts <- new CancellationTokenSource() - - Async.Start( - async { - // wait only as long as we have to before next dispatch - do! Async.Sleep(untilNextDispatch) - - lock funLock (fun () -> - dispatchBatch dispatch - - // done; invalidate own cancellation - if cts <> null then - cts.Dispose() - cts <- null) - }, - cts.Token - )) ] + lock funLock (fun () -> + let untilNextDispatch = getTimeUntilNextDispatch() + pendingValues <- value :: pendingValues + + // If the interval has elapsed since the last dispatch, dispatch all pending messages + if untilNextDispatch <= System.TimeSpan.Zero then + dispatchBatch() + else // schedule dispatch + + // if the the last sleeping dispatch can still be cancelled, do so + if cts <> null then + cts.Cancel() + cts.Dispose() + + // used to enable cancelling this dispatch if newer values come into the factory + cts <- new CancellationTokenSource() + + Async.Start( + async { + // wait only as long as we have to before next dispatch + do! Async.Sleep(untilNextDispatch) + + lock funLock (fun () -> + dispatchBatch() + + // done; invalidate own cancellation + if cts <> null then + cts.Dispose() + cts <- null) + }, + cts.Token + )) // a function to wait until after the next async dispatch + some buffer time to ensure the dispatch is complete let awaitNextDispatch buffer = @@ -395,12 +396,12 @@ module Cmd = let untilAfterNextDispatch = getTimeUntilNextDispatch() + match buffer with - | Some value -> value + | Some value -> System.TimeSpan.FromMilliseconds(value) | None -> System.TimeSpan.Zero if untilAfterNextDispatch > System.TimeSpan.Zero then do! Async.Sleep(untilAfterNextDispatch) }) - // return both the factory and the await helper - factory, awaitNextDispatch + // return both the dispatch and the await helper + dispatchSingle, awaitNextDispatch