Skip to content

Commit 71c9817

Browse files
authored
feat(js/ts/python/beam): add support for Async.AwaitEvent (#4693)
1 parent ddda51a commit 71c9817

6 files changed

Lines changed: 133 additions & 0 deletions

File tree

src/fable-library-beam/fable_async.erl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
run_synchronously/1, run_synchronously/2,
55
start_with_continuations/4, start_with_continuations/5,
66
sleep/1,
7+
await_event/1, await_event/2,
78
parallel/1,
89
sequential/1,
910
catch_async/1,
@@ -137,6 +138,35 @@ sleep(Milliseconds) ->
137138
end
138139
end.
139140

141+
%% AwaitEvent: subscribe once; complete async when event fires.
142+
await_event(Event) -> await_event(Event, undefined).
143+
await_event(Event, CancelAction) ->
144+
fun(Ctx) ->
145+
OnSuccess = maps:get(on_success, Ctx),
146+
OnCancel = maps:get(on_cancel, Ctx),
147+
Token = maps:get(cancel_token, Ctx),
148+
149+
Handler = fun F(_Sender, Value) ->
150+
(maps:get(remove_handler, Event))(F),
151+
OnSuccess(Value)
152+
end,
153+
154+
case Token of
155+
undefined -> ok;
156+
_ ->
157+
fable_cancellation:register(Token, fun(_) ->
158+
(maps:get(remove_handler, Event))(Handler),
159+
case CancelAction of
160+
undefined -> ok;
161+
_ -> CancelAction(ok)
162+
end,
163+
OnCancel(ok)
164+
end)
165+
end,
166+
167+
(maps:get(add_handler, Event))(Handler)
168+
end.
169+
140170
%% Parallel: spawn one process per computation, collect results in order
141171
parallel(Computations) when is_reference(Computations) ->
142172
parallel(get(Computations));

src/fable-library-py/fable_library/async_.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
Choice_makeChoice2Of2,
3131
FSharpChoice_2,
3232
)
33+
from .event import IEvent_2
3334
from .protocols import IEnumerable_1
3435
from .task import TaskCompletionSource
3536
from .time_span import TimeSpan, to_milliseconds
@@ -95,6 +96,27 @@ def timeout():
9596
return protected_cont(cont)
9697

9798

99+
def await_event[T](event: IEvent_2[Any, T], cancel_action: Callable[[], None] | None = None) -> Async[T]:
100+
def cont(ctx: IAsyncContext[T]) -> None:
101+
token_id: list[int] = [0]
102+
103+
def handler(_sender: Any, arg: T) -> None:
104+
ctx.cancel_token.remove_listener(token_id[0])
105+
event.RemoveHandler(handler)
106+
ctx.on_success(arg)
107+
108+
def cancel() -> None:
109+
event.RemoveHandler(handler)
110+
if cancel_action is not None:
111+
cancel_action()
112+
ctx.on_cancel(OperationCanceledError())
113+
114+
token_id[0] = ctx.cancel_token.add_listener(cancel)
115+
event.AddHandler(handler)
116+
117+
return protected_cont(cont)
118+
119+
98120
def ignore(computation: Async[Any]) -> Async[None]:
99121
def binder(_: Any | Unit = UNIT) -> Async[None]:
100122
return protected_return(None)
@@ -362,6 +384,7 @@ def exception_handler(loop: asyncio.AbstractEventLoop, context: dict[str, Any])
362384

363385

364386
__all__ = [
387+
"await_event",
365388
"await_task",
366389
"cancel",
367390
"cancel_after",

src/fable-library-ts/Async.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { OperationCanceledException, Trampoline } from "./AsyncBuilder.ts";
22
import { Continuation, Continuations } from "./AsyncBuilder.ts";
33
import { Async, IAsyncContext, CancellationToken } from "./AsyncBuilder.ts";
44
import { protectedCont, protectedBind, protectedReturn } from "./AsyncBuilder.ts";
5+
import type { IEvent$2 } from "./Event.ts";
56
import { FSharpChoice$2_$union, Choice_makeChoice1Of2, Choice_makeChoice2Of2 } from "./Choice.ts";
67
import { TimeoutException_$ctor } from "./System.ts";
78
import { Exception } from "./Util.ts";
@@ -86,6 +87,23 @@ export function awaitPromise<T>(p: Promise<T>) {
8687
? conts[2] : conts[1])(err)));
8788
}
8889

90+
export function awaitEvent<Del extends Function, T>(event: IEvent$2<Del, T>, cancelAction?: () => void): Async<T> {
91+
return protectedCont((ctx: IAsyncContext<T>) => {
92+
let tokenId: number;
93+
const handler = ((_sender: unknown, arg: T) => {
94+
ctx.cancelToken.removeListener(tokenId);
95+
event.RemoveHandler(handler as unknown as Del);
96+
ctx.onSuccess(arg);
97+
}) as unknown as Del;
98+
tokenId = ctx.cancelToken.addListener(() => {
99+
event.RemoveHandler(handler);
100+
if (cancelAction != null) { cancelAction(); }
101+
ctx.onCancel(new OperationCanceledException());
102+
});
103+
event.AddHandler(handler);
104+
});
105+
}
106+
89107
export function cancellationToken() {
90108
return protectedCont((ctx: IAsyncContext<CancellationToken>) => ctx.onSuccess(ctx.cancelToken));
91109
}

tests/Beam/AsyncTests.fs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,4 +395,25 @@ let ``test Can use custom exceptions in async workflows`` () =
395395
equal 7 res
396396
} |> Async.RunSynchronously
397397

398+
[<Fact>]
399+
let ``test Async.AwaitEvent fires continuation when event is triggered`` () =
400+
let ev = Event<int>()
401+
Async.StartImmediate(async {
402+
let! v = Async.AwaitEvent ev.Publish
403+
equal 42 v
404+
})
405+
ev.Trigger(42)
406+
407+
[<Fact>]
408+
let ``test Async.AwaitEvent with cancelAction invokes it on cancellation`` () =
409+
let ev = Event<int>()
410+
let cts = new System.Threading.CancellationTokenSource()
411+
let mutable cancelCalled = false
412+
Async.StartImmediate(async {
413+
let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true)
414+
()
415+
}, cts.Token)
416+
cts.Cancel()
417+
equal true cancelCalled
418+
398419
#endif

tests/Js/Main/AsyncTests.fs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,4 +629,24 @@ let tests =
629629
let! res = parentWorkflow()
630630
equal 7 res
631631
}
632+
633+
testCaseAsync "Async.AwaitEvent fires continuation when event is triggered" <| fun () ->
634+
let ev = Event<int>()
635+
Async.StartImmediate(async {
636+
let! v = Async.AwaitEvent ev.Publish
637+
equal 42 v
638+
})
639+
ev.Trigger(42)
640+
async.Return()
641+
642+
testCaseAsync "Async.AwaitEvent with cancelAction invokes it on cancellation" <| fun () ->
643+
let ev = Event<int>()
644+
let cts = new System.Threading.CancellationTokenSource()
645+
let mutable cancelCalled = false
646+
Async.StartImmediate(async {
647+
let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true)
648+
()
649+
}, cts.Token)
650+
cts.Cancel()
651+
async { equal true cancelCalled }
632652
]

tests/Python/TestAsync.fs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,3 +565,24 @@ let ``test Async.Sleep works correctly with TimeSpan argument`` () =
565565
let isReasonableTime = elapsedMs >= 150.0 && elapsedMs <= 500.0
566566
equal true isReasonableTime
567567
} |> Async.StartImmediate
568+
569+
[<Fact>]
570+
let ``test Async.AwaitEvent fires continuation when event is triggered`` () =
571+
let ev = Event<int>()
572+
Async.StartImmediate(async {
573+
let! v = Async.AwaitEvent ev.Publish
574+
equal 42 v
575+
})
576+
ev.Trigger(42)
577+
578+
[<Fact>]
579+
let ``test Async.AwaitEvent with cancelAction invokes it on cancellation`` () =
580+
let ev = Event<int>()
581+
let cts = new System.Threading.CancellationTokenSource()
582+
let mutable cancelCalled = false
583+
Async.StartImmediate(async {
584+
let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true)
585+
()
586+
}, cts.Token)
587+
cts.Cancel()
588+
equal true cancelCalled

0 commit comments

Comments
 (0)