diff --git a/src/fable-library-beam/fable_async.erl b/src/fable-library-beam/fable_async.erl index 396b49c48..e43e32552 100644 --- a/src/fable-library-beam/fable_async.erl +++ b/src/fable-library-beam/fable_async.erl @@ -4,6 +4,7 @@ run_synchronously/1, run_synchronously/2, start_with_continuations/4, start_with_continuations/5, sleep/1, + await_event/1, await_event/2, parallel/1, sequential/1, catch_async/1, @@ -137,6 +138,35 @@ sleep(Milliseconds) -> end end. +%% AwaitEvent: subscribe once; complete async when event fires. +await_event(Event) -> await_event(Event, undefined). +await_event(Event, CancelAction) -> + fun(Ctx) -> + OnSuccess = maps:get(on_success, Ctx), + OnCancel = maps:get(on_cancel, Ctx), + Token = maps:get(cancel_token, Ctx), + + Handler = fun F(_Sender, Value) -> + (maps:get(remove_handler, Event))(F), + OnSuccess(Value) + end, + + case Token of + undefined -> ok; + _ -> + fable_cancellation:register(Token, fun(_) -> + (maps:get(remove_handler, Event))(Handler), + case CancelAction of + undefined -> ok; + _ -> CancelAction(ok) + end, + OnCancel(ok) + end) + end, + + (maps:get(add_handler, Event))(Handler) + end. + %% Parallel: spawn one process per computation, collect results in order parallel(Computations) when is_reference(Computations) -> parallel(get(Computations)); diff --git a/src/fable-library-py/fable_library/async_.py b/src/fable-library-py/fable_library/async_.py index 7e6437f36..1b72efd6e 100644 --- a/src/fable-library-py/fable_library/async_.py +++ b/src/fable-library-py/fable_library/async_.py @@ -30,6 +30,7 @@ Choice_makeChoice2Of2, FSharpChoice_2, ) +from .event import IEvent_2 from .protocols import IEnumerable_1 from .task import TaskCompletionSource from .time_span import TimeSpan, to_milliseconds @@ -95,6 +96,27 @@ def timeout(): return protected_cont(cont) +def await_event[T](event: IEvent_2[Any, T], cancel_action: Callable[[], None] | None = None) -> Async[T]: + def cont(ctx: IAsyncContext[T]) -> None: + token_id: list[int] = [0] + + def handler(_sender: Any, arg: T) -> None: + ctx.cancel_token.remove_listener(token_id[0]) + event.RemoveHandler(handler) + ctx.on_success(arg) + + def cancel() -> None: + event.RemoveHandler(handler) + if cancel_action is not None: + cancel_action() + ctx.on_cancel(OperationCanceledError()) + + token_id[0] = ctx.cancel_token.add_listener(cancel) + event.AddHandler(handler) + + return protected_cont(cont) + + def ignore(computation: Async[Any]) -> Async[None]: def binder(_: Any | Unit = UNIT) -> Async[None]: return protected_return(None) @@ -362,6 +384,7 @@ def exception_handler(loop: asyncio.AbstractEventLoop, context: dict[str, Any]) __all__ = [ + "await_event", "await_task", "cancel", "cancel_after", diff --git a/src/fable-library-ts/Async.ts b/src/fable-library-ts/Async.ts index ffb67cc81..1c40f4c9c 100644 --- a/src/fable-library-ts/Async.ts +++ b/src/fable-library-ts/Async.ts @@ -2,6 +2,7 @@ import { OperationCanceledException, Trampoline } from "./AsyncBuilder.ts"; import { Continuation, Continuations } from "./AsyncBuilder.ts"; import { Async, IAsyncContext, CancellationToken } from "./AsyncBuilder.ts"; import { protectedCont, protectedBind, protectedReturn } from "./AsyncBuilder.ts"; +import type { IEvent$2 } from "./Event.ts"; import { FSharpChoice$2_$union, Choice_makeChoice1Of2, Choice_makeChoice2Of2 } from "./Choice.ts"; import { TimeoutException_$ctor } from "./System.ts"; import { Exception } from "./Util.ts"; @@ -86,6 +87,23 @@ export function awaitPromise(p: Promise) { ? conts[2] : conts[1])(err))); } +export function awaitEvent(event: IEvent$2, cancelAction?: () => void): Async { + return protectedCont((ctx: IAsyncContext) => { + let tokenId: number; + const handler = ((_sender: unknown, arg: T) => { + ctx.cancelToken.removeListener(tokenId); + event.RemoveHandler(handler as unknown as Del); + ctx.onSuccess(arg); + }) as unknown as Del; + tokenId = ctx.cancelToken.addListener(() => { + event.RemoveHandler(handler); + if (cancelAction != null) { cancelAction(); } + ctx.onCancel(new OperationCanceledException()); + }); + event.AddHandler(handler); + }); +} + export function cancellationToken() { return protectedCont((ctx: IAsyncContext) => ctx.onSuccess(ctx.cancelToken)); } diff --git a/tests/Beam/AsyncTests.fs b/tests/Beam/AsyncTests.fs index a897f2ec2..4177220ce 100644 --- a/tests/Beam/AsyncTests.fs +++ b/tests/Beam/AsyncTests.fs @@ -395,4 +395,25 @@ let ``test Can use custom exceptions in async workflows`` () = equal 7 res } |> Async.RunSynchronously +[] +let ``test Async.AwaitEvent fires continuation when event is triggered`` () = + let ev = Event() + Async.StartImmediate(async { + let! v = Async.AwaitEvent ev.Publish + equal 42 v + }) + ev.Trigger(42) + +[] +let ``test Async.AwaitEvent with cancelAction invokes it on cancellation`` () = + let ev = Event() + let cts = new System.Threading.CancellationTokenSource() + let mutable cancelCalled = false + Async.StartImmediate(async { + let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true) + () + }, cts.Token) + cts.Cancel() + equal true cancelCalled + #endif diff --git a/tests/Js/Main/AsyncTests.fs b/tests/Js/Main/AsyncTests.fs index d80f8dc28..d94f726c3 100644 --- a/tests/Js/Main/AsyncTests.fs +++ b/tests/Js/Main/AsyncTests.fs @@ -629,4 +629,24 @@ let tests = let! res = parentWorkflow() equal 7 res } + + testCaseAsync "Async.AwaitEvent fires continuation when event is triggered" <| fun () -> + let ev = Event() + Async.StartImmediate(async { + let! v = Async.AwaitEvent ev.Publish + equal 42 v + }) + ev.Trigger(42) + async.Return() + + testCaseAsync "Async.AwaitEvent with cancelAction invokes it on cancellation" <| fun () -> + let ev = Event() + let cts = new System.Threading.CancellationTokenSource() + let mutable cancelCalled = false + Async.StartImmediate(async { + let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true) + () + }, cts.Token) + cts.Cancel() + async { equal true cancelCalled } ] diff --git a/tests/Python/TestAsync.fs b/tests/Python/TestAsync.fs index 36c753cb5..6db0fdf8c 100644 --- a/tests/Python/TestAsync.fs +++ b/tests/Python/TestAsync.fs @@ -565,3 +565,24 @@ let ``test Async.Sleep works correctly with TimeSpan argument`` () = let isReasonableTime = elapsedMs >= 150.0 && elapsedMs <= 500.0 equal true isReasonableTime } |> Async.StartImmediate + +[] +let ``test Async.AwaitEvent fires continuation when event is triggered`` () = + let ev = Event() + Async.StartImmediate(async { + let! v = Async.AwaitEvent ev.Publish + equal 42 v + }) + ev.Trigger(42) + +[] +let ``test Async.AwaitEvent with cancelAction invokes it on cancellation`` () = + let ev = Event() + let cts = new System.Threading.CancellationTokenSource() + let mutable cancelCalled = false + Async.StartImmediate(async { + let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true) + () + }, cts.Token) + cts.Cancel() + equal true cancelCalled