Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 103 additions & 56 deletions src/FSharpPlus/Extensions/Task.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ module Task =

/// Active pattern to match the state of a completed Task
let inline internal (|Succeeded|Canceled|Faulted|) (t: Task<'a>) =
if t.IsCompletedSuccessfully then Succeeded t.Result
elif t.IsFaulted then Faulted (Unchecked.nonNull t.Exception)
elif t.IsCanceled then Canceled
else invalidOp "Internal error: The task is not yet completed."
match t.Status with
| TaskStatus.RanToCompletion -> Succeeded t.Result
| TaskStatus.Faulted -> Faulted (Unchecked.nonNull t.Exception)
| TaskStatus.Canceled -> Canceled
| _ -> invalidOp (sprintf "Internal error: The task is not yet in a final state. State = TaskStatus.%A" t.Status)

let inline internal continueTask (tcs: TaskCompletionSource<'Result>) (k: 't -> unit) (x: Task<'t>) =
let f = function
Expand All @@ -26,32 +27,56 @@ module Task =
| Canceled -> tcs.SetCanceled ()
x.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted (fun () -> f x)

#if NET5_0_OR_GREATER
let [<Literal>] private tcsOptions = TaskCreationOptions.RunContinuationsAsynchronously
#else
let private tcsOptions = ()
#endif

/// <summary>Creates a Task that's completed successfully with the specified value.</summary>
/// <param name="value"></param>
/// <returns>A Task that is completed successfully with the specified value.</returns>
let result (value: 'T) : Task<'T> = Task.FromResult value



/// <summary>Creates a Task that's completed unsuccessfully with the specified exception.</summary>
/// <param name="exn">The exception to be raised.</param>
/// <returns>A Task that is completed unsuccessfully with the specified exception.</returns>
let raise<'T> (exn: exn) : Task<'T> =
#if NET5_0_OR_GREATER
Task.FromException<'T> exn
#else
let tcs = TaskCompletionSource<'T> tcsOptions
tcs.SetException exn
tcs.Task
#endif

/// <summary>Creates a Task that's completed unsuccessfully with the specified exceptions.</summary>
/// <param name="exn">The AggregateException to be raised.</param>
/// <param name="aex">The AggregateException to be raised.</param>
/// <returns>A Task that is completed unsuccessfully with the specified exceptions.</returns>
/// <remarks>
/// Prefer this function to handle AggregateExceptions over Task.FromException as it handles them correctly.
/// </remarks>
let inline internal FromExceptions<'T> (aex: AggregateException) : Task<'T> =
#if NET5_0_OR_GREATER
match aex with
| agg when agg.InnerExceptions.Count = 1 -> Task.FromException<'T> agg.InnerExceptions[0]
| agg ->
let tcs = TaskCompletionSource<'T> ()
tcs.SetException agg.InnerExceptions
| _ ->
#endif
let tcs = TaskCompletionSource<'T> tcsOptions
tcs.SetException aex.InnerExceptions
tcs.Task

let private cancellationTokenSingleton = CancellationToken true

/// <summary>Creates a Task that's canceled.</summary>
/// <returns>A Task that's canceled.</returns>
let canceled<'T> : Task<'T> = Task.FromCanceled<'T> cancellationTokenSingleton

let canceled<'T> : Task<'T> =
#if NET5_0_OR_GREATER
Task.FromCanceled<'T> (CancellationToken true)
#else
let tcs = TaskCompletionSource<'T> tcsOptions
tcs.SetCanceled ()
tcs.Task
#endif

/// <summary>Creates a task workflow from 'source' workflow, mapping its result with 'mapper'.</summary>
/// <param name="mapper">The mapping function.</param>
Expand All @@ -60,10 +85,15 @@ module Task =
let map (mapper: 'T -> 'U) (source: Task<'T>) : Task<'U> =
let source = nullArgCheck (nameof source) source

backgroundTask {
let! r = source
return mapper r
}
if source.IsCompleted then
match source with
| Succeeded r -> try result (mapper r) with e -> raise e
| Faulted exn -> FromExceptions exn
| Canceled -> canceled
else
let tcs = TaskCompletionSource<'U> tcsOptions
source |> continueTask tcs (fun r -> try tcs.SetResult (mapper r) with e -> tcs.SetException e)
tcs.Task

/// <summary>Creates a task workflow from two workflows 'task1' and 'task2', mapping its results with 'mapper'.</summary>
/// <remarks>Workflows are run in sequence.</remarks>
Expand All @@ -76,13 +106,13 @@ module Task =

if task1.IsCompleted && task2.IsCompleted then
match task1, task2 with
| Succeeded r1, Succeeded r2 -> try result (mapper r1 r2) with e -> Task.FromException<_> e
| Succeeded r1, Succeeded r2 -> try result (mapper r1 r2) with e -> raise e
| Succeeded _ , Faulted exn -> FromExceptions exn
| Succeeded _ , Canceled -> canceled
| Faulted exn , _ -> FromExceptions exn
| Canceled , _ -> canceled
else
let tcs = TaskCompletionSource<'U> TaskCreationOptions.RunContinuationsAsynchronously
let tcs = TaskCompletionSource<'U> tcsOptions

match task1.Status, task2.Status with
| TaskStatus.Canceled, _ -> tcs.SetCanceled ()
Expand All @@ -107,15 +137,15 @@ module Task =

if task1.IsCompleted && task2.IsCompleted && task3.IsCompleted then
match task1, task2, task3 with
| Succeeded r1, Succeeded r2, Succeeded r3 -> try result (mapper r1 r2 r3) with e -> Task.FromException<_> e
| Succeeded r1, Succeeded r2, Succeeded r3 -> try result (mapper r1 r2 r3) with e -> raise e
| Faulted exn , _ , _ -> FromExceptions exn
| Canceled , _ , _ -> canceled
| _ , Faulted exn , _ -> FromExceptions exn
| _ , Canceled , _ -> canceled
| _ , _ , Faulted exn -> FromExceptions exn
| _ , _ , Canceled -> canceled
else
let tcs = TaskCompletionSource<'U> TaskCreationOptions.RunContinuationsAsynchronously
let tcs = TaskCompletionSource<'U> tcsOptions
match task1.Status, task2.Status, task3.Status with
| TaskStatus.Canceled, _ , _ -> tcs.SetCanceled ()
| TaskStatus.Faulted , _ , _ -> tcs.SetException (Unchecked.nonNull task1.Exception).InnerExceptions
Expand All @@ -142,9 +172,9 @@ module Task =
let task2 = nullArgCheck (nameof task2) task2

if task1.Status = TaskStatus.RanToCompletion && task2.Status = TaskStatus.RanToCompletion then
try result (mapper task1.Result task2.Result) with e -> Task.FromException<'U> e
try result (mapper task1.Result task2.Result) with e -> raise e
else
let tcs = TaskCompletionSource<_> ()
let tcs = TaskCompletionSource<_> tcsOptions
let r1 = ref Unchecked.defaultof<_>
let r2 = ref Unchecked.defaultof<_>
let mutable cancelled = false
Expand Down Expand Up @@ -190,9 +220,9 @@ module Task =

if task1.Status = TaskStatus.RanToCompletion && task2.Status = TaskStatus.RanToCompletion && task3.Status = TaskStatus.RanToCompletion then
try result (mapper task1.Result task2.Result task3.Result)
with e -> Task.FromException<'U> e
with e -> raise e
else
let tcs = TaskCompletionSource<_> ()
let tcs = TaskCompletionSource<_> tcsOptions
let r1 = ref Unchecked.defaultof<_>
let r2 = ref Unchecked.defaultof<_>
let r3 = ref Unchecked.defaultof<_>
Expand Down Expand Up @@ -236,13 +266,13 @@ module Task =

if f.IsCompleted && x.IsCompleted then
match f, x with
| Succeeded r1, Succeeded r2 -> try result (r1 r2) with e -> Task.FromException<_> e
| Succeeded r1, Succeeded r2 -> try result (r1 r2) with e -> raise e
| Succeeded _ , Faulted exn -> FromExceptions exn
| Succeeded _ , Canceled -> canceled
| Faulted exn , _ -> FromExceptions exn
| Canceled , _ -> canceled
else
let tcs = TaskCompletionSource<'U> TaskCreationOptions.RunContinuationsAsynchronously
let tcs = TaskCompletionSource<'U> tcsOptions
match f.Status, x.Status with
| TaskStatus.Canceled, _ -> tcs.SetCanceled ()
| TaskStatus.Faulted, _ -> tcs.SetException (Unchecked.nonNull f.Exception).InnerExceptions
Expand All @@ -266,7 +296,7 @@ module Task =
| Faulted exn , _ -> FromExceptions exn
| Canceled , _ -> canceled
else
let tcs = TaskCompletionSource<'T1 * 'T2> ()
let tcs = TaskCompletionSource<'T1 * 'T2> tcsOptions
match task1.Status, task2.Status with
| TaskStatus.Canceled, _ -> tcs.SetCanceled ()
| TaskStatus.Faulted, _ -> tcs.SetException (Unchecked.nonNull task1.Exception).InnerExceptions
Expand All @@ -293,19 +323,13 @@ module Task =
let join (source: Task<Task<'T>>) : Task<'T> =
let source = nullArgCheck (nameof source) source

backgroundTask {
let! inner = source
return! inner
}
source.Unwrap()

/// <summary>Creates a task workflow from 'source' workflow, mapping and flattening its result with 'f'.</summary>
let bind (f: 'T -> Task<'U>) (source: Task<'T>) : Task<'U> =
let source = nullArgCheck (nameof source) source

backgroundTask {
let! r = source
return! f r
}
source |> Unchecked.nonNull |> map f |> join

/// <summary>Creates a task that ignores the result of the source task.</summary>
/// <param name="source">The source Task.</param>
Expand All @@ -314,11 +338,12 @@ module Task =
let ignore (source: Task) =
let source = nullArgCheck (nameof source) source

if source.IsCompletedSuccessfully then result ()
elif source.IsFaulted then FromExceptions (Unchecked.nonNull source.Exception)
elif source.IsCanceled then canceled
else
let tcs = TaskCompletionSource<unit> ()
match source.Status with
| TaskStatus.RanToCompletion -> result ()
| TaskStatus.Faulted -> FromExceptions (Unchecked.nonNull source.Exception)
| TaskStatus.Canceled -> canceled
| _ ->
let tcs = TaskCompletionSource<unit> tcsOptions
let k (t: Task) : unit =
if t.IsCanceled then tcs.SetCanceled ()
elif t.IsFaulted then tcs.SetException (Unchecked.nonNull source.Exception).InnerExceptions
Expand All @@ -327,14 +352,41 @@ module Task =
tcs.Task

[<ObsoleteAttribute("Swap parameters")>]
let tryWith (body: unit -> Task<'T>) (compensation: exn -> Task<'T>) : Task<'T> = backgroundTask {
try return! body ()
with e -> return! compensation e }
let rec tryWith (body: unit -> Task<'T>) (compensation: exn -> Task<'T>) : Task<'T> =
let runCompensation exn =
try compensation exn
with e -> raise e
let unwrapException (agg: AggregateException) =
if agg.InnerExceptions.Count = 1 then agg.InnerExceptions.[0]
else agg :> Exception
try Ok (body ()) with e -> Error e
|> function
| Ok task ->
if task.IsCompleted then
match task with
| Succeeded _ -> task
| Faulted aex -> runCompensation (unwrapException aex)
| Canceled -> canceled
else
task.ContinueWith(fun (x: Task<'T>) -> tryWith (fun () -> x) compensation).Unwrap ()
| Error exn -> runCompensation exn

[<ObsoleteAttribute("Swap parameters")>]
let tryFinally (body: unit -> Task<'T>) (compensation : unit -> unit) : Task<'T> = backgroundTask {
try return! body ()
finally compensation () }
let rec tryFinally (body: unit -> Task<'T>) (compensation : unit -> unit) : Task<'T> =
let task =
try body ()
with _ ->
try
compensation ()
reraise ()
with e -> raise e
if task.IsCompleted then
try
compensation ()
task
with e -> raise e
else
task.ContinueWith(fun (x: Task<'T>) -> tryFinally (fun () -> x) compensation).Unwrap ()

/// Used to de-sugar use .. blocks in Computation Expressions.
let using (disp: 'T when 'T :> IDisposable) (body: 'T -> Task<'U>) =
Expand Down Expand Up @@ -375,7 +427,7 @@ module Task =
let inline recover ([<InlineIfLambda>]mapper: exn -> 'T) (source: Task<'T>) : Task<'T> =
let source = nullArgCheck (nameof source) source

tryWith (fun () -> source) (mapper >> Task.FromResult)
tryWith (fun () -> source) (mapper >> result)

/// <summary>Maps the exception of a faulted task to another exception.</summary>
/// <param name="mapper">Mapping function from exception to exception.</param>
Expand All @@ -389,7 +441,7 @@ module Task =
| Faulted exn -> FromExceptions (AggregateException (mapper exn))
| _ -> source
else
let tcs = TaskCompletionSource<'T> TaskCreationOptions.RunContinuationsAsynchronously
let tcs = TaskCompletionSource<'T> tcsOptions
let k = function
| Succeeded r -> tcs.SetResult r
| Faulted aex -> tcs.SetException (AggregateException (mapper aex)).InnerExceptions
Expand All @@ -404,13 +456,8 @@ module Task =
/// <returns>The resulting Task.</returns>
let ofResult (source: Result<'T, exn>) : Task<'T> =
match source with
| Ok x -> Task.FromResult x
| Error exn -> Task.FromException<'T> exn

/// <summary>Creates a Task that's completed unsuccessfully with the specified exception.</summary>
/// <param name="exn">The exception to be raised.</param>
/// <returns>A Task that is completed unsuccessfully with the specified exception.</returns>
let raise<'T> (exn: exn) : Task<'T> = Task.FromException<'T> exn
| Ok x -> result x
| Error exn -> raise exn


/// Workaround to fix signatures without breaking binary compatibility.
Expand Down
Loading
Loading