Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/fable-library-beam/fable_async.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
run_synchronously/1, run_synchronously/2,
start_with_continuations/4, start_with_continuations/5,
sleep/1,
await_event/1, await_event/2,
parallel/1,
sequential/1,
catch_async/1,
Expand Down Expand Up @@ -137,6 +138,35 @@ sleep(Milliseconds) ->
end
end.

%% AwaitEvent: subscribe once; complete async when event fires.
await_event(Event) -> await_event(Event, undefined).
await_event(Event, CancelAction) ->
fun(Ctx) ->
OnSuccess = maps:get(on_success, Ctx),
OnCancel = maps:get(on_cancel, Ctx),
Token = maps:get(cancel_token, Ctx),

Handler = fun F(_Sender, Value) ->
(maps:get(remove_handler, Event))(F),
OnSuccess(Value)
end,

case Token of
undefined -> ok;
_ ->
fable_cancellation:register(Token, fun(_) ->
(maps:get(remove_handler, Event))(Handler),
case CancelAction of
undefined -> ok;
_ -> CancelAction(ok)
end,
OnCancel(ok)
end)
end,

(maps:get(add_handler, Event))(Handler)
end.

%% Parallel: spawn one process per computation, collect results in order
parallel(Computations) when is_reference(Computations) ->
parallel(get(Computations));
Expand Down
23 changes: 23 additions & 0 deletions src/fable-library-py/fable_library/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Choice_makeChoice2Of2,
FSharpChoice_2,
)
from .event import IEvent_2
from .protocols import IEnumerable_1
from .task import TaskCompletionSource
from .time_span import TimeSpan, to_milliseconds
Expand Down Expand Up @@ -95,6 +96,27 @@ def timeout():
return protected_cont(cont)


def await_event[T](event: IEvent_2[Any, T], cancel_action: Callable[[], None] | None = None) -> Async[T]:
def cont(ctx: IAsyncContext[T]) -> None:
token_id: list[int] = [0]

def handler(_sender: Any, arg: T) -> None:
ctx.cancel_token.remove_listener(token_id[0])
event.RemoveHandler(handler)
ctx.on_success(arg)

def cancel() -> None:
event.RemoveHandler(handler)
if cancel_action is not None:
cancel_action()
ctx.on_cancel(OperationCanceledError())

token_id[0] = ctx.cancel_token.add_listener(cancel)
event.AddHandler(handler)

return protected_cont(cont)


def ignore(computation: Async[Any]) -> Async[None]:
def binder(_: Any | Unit = UNIT) -> Async[None]:
return protected_return(None)
Expand Down Expand Up @@ -362,6 +384,7 @@ def exception_handler(loop: asyncio.AbstractEventLoop, context: dict[str, Any])


__all__ = [
"await_event",
"await_task",
"cancel",
"cancel_after",
Expand Down
18 changes: 18 additions & 0 deletions src/fable-library-ts/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { OperationCanceledException, Trampoline } from "./AsyncBuilder.ts";
import { Continuation, Continuations } from "./AsyncBuilder.ts";
import { Async, IAsyncContext, CancellationToken } from "./AsyncBuilder.ts";
import { protectedCont, protectedBind, protectedReturn } from "./AsyncBuilder.ts";
import type { IEvent$2 } from "./Event.ts";
import { FSharpChoice$2_$union, Choice_makeChoice1Of2, Choice_makeChoice2Of2 } from "./Choice.ts";
import { TimeoutException_$ctor } from "./System.ts";
import { Exception } from "./Util.ts";
Expand Down Expand Up @@ -86,6 +87,23 @@ export function awaitPromise<T>(p: Promise<T>) {
? conts[2] : conts[1])(err)));
}

export function awaitEvent<Del extends Function, T>(event: IEvent$2<Del, T>, cancelAction?: () => void): Async<T> {
return protectedCont((ctx: IAsyncContext<T>) => {
let tokenId: number;
const handler = ((_sender: unknown, arg: T) => {
ctx.cancelToken.removeListener(tokenId);
event.RemoveHandler(handler as unknown as Del);
ctx.onSuccess(arg);
}) as unknown as Del;
tokenId = ctx.cancelToken.addListener(() => {
event.RemoveHandler(handler);
if (cancelAction != null) { cancelAction(); }
ctx.onCancel(new OperationCanceledException());
});
event.AddHandler(handler);
});
}

export function cancellationToken() {
return protectedCont((ctx: IAsyncContext<CancellationToken>) => ctx.onSuccess(ctx.cancelToken));
}
Expand Down
21 changes: 21 additions & 0 deletions tests/Beam/AsyncTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,25 @@ let ``test Can use custom exceptions in async workflows`` () =
equal 7 res
} |> Async.RunSynchronously

[<Fact>]
let ``test Async.AwaitEvent fires continuation when event is triggered`` () =
let ev = Event<int>()
Async.StartImmediate(async {
let! v = Async.AwaitEvent ev.Publish
equal 42 v
})
ev.Trigger(42)

[<Fact>]
let ``test Async.AwaitEvent with cancelAction invokes it on cancellation`` () =
let ev = Event<int>()
let cts = new System.Threading.CancellationTokenSource()
let mutable cancelCalled = false
Async.StartImmediate(async {
let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true)
()
}, cts.Token)
cts.Cancel()
equal true cancelCalled

#endif
20 changes: 20 additions & 0 deletions tests/Js/Main/AsyncTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,24 @@ let tests =
let! res = parentWorkflow()
equal 7 res
}

testCaseAsync "Async.AwaitEvent fires continuation when event is triggered" <| fun () ->
let ev = Event<int>()
Async.StartImmediate(async {
let! v = Async.AwaitEvent ev.Publish
equal 42 v
})
ev.Trigger(42)
async.Return()

testCaseAsync "Async.AwaitEvent with cancelAction invokes it on cancellation" <| fun () ->
let ev = Event<int>()
let cts = new System.Threading.CancellationTokenSource()
let mutable cancelCalled = false
Async.StartImmediate(async {
let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true)
()
}, cts.Token)
cts.Cancel()
async { equal true cancelCalled }
]
21 changes: 21 additions & 0 deletions tests/Python/TestAsync.fs
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,24 @@ let ``test Async.Sleep works correctly with TimeSpan argument`` () =
let isReasonableTime = elapsedMs >= 150.0 && elapsedMs <= 500.0
equal true isReasonableTime
} |> Async.StartImmediate

[<Fact>]
let ``test Async.AwaitEvent fires continuation when event is triggered`` () =
let ev = Event<int>()
Async.StartImmediate(async {
let! v = Async.AwaitEvent ev.Publish
equal 42 v
})
ev.Trigger(42)

[<Fact>]
let ``test Async.AwaitEvent with cancelAction invokes it on cancellation`` () =
let ev = Event<int>()
let cts = new System.Threading.CancellationTokenSource()
let mutable cancelCalled = false
Async.StartImmediate(async {
let! _ = Async.AwaitEvent(ev.Publish, fun () -> cancelCalled <- true)
()
}, cts.Token)
cts.Cancel()
equal true cancelCalled
Loading