|
| 1 | +(*** condition: prepare ***) |
| 2 | +#nowarn "211" |
| 3 | +#I "../src/FSharp.Control.AsyncSeq/bin/Release/netstandard2.1" |
| 4 | +#r "FSharp.Control.AsyncSeq.dll" |
| 5 | +(*** condition: fsx ***) |
| 6 | +#if FSX |
| 7 | +#r "nuget: FSharp.Control.AsyncSeq,{{package-version}}" |
| 8 | +#endif // FSX |
| 9 | +(*** condition: ipynb ***) |
| 10 | +#if IPYNB |
| 11 | +#r "nuget: FSharp.Control.AsyncSeq,{{package-version}}" |
| 12 | +#endif // IPYNB |
| 13 | + |
| 14 | + |
| 15 | +(** |
| 16 | +[Binder](https://mybinder.org/v2/gh/fsprojects/FSharp.Control.AsyncSeq/gh-pages?filepath=AsyncSeq.ipynb) |
| 17 | +
|
| 18 | +# Comparison with IObservable |
| 19 | +
|
| 20 | +Both `IObservable<'T>` and `AsyncSeq<'T>` represent collections of items and both provide similar operations |
| 21 | +for transformation and composition. The central difference between the two is that the former uses a *synchronous push* |
| 22 | +to a subscriber and the latter uses an *asynchronous pull* by a consumer. |
| 23 | +Consumers of an `IObservable<'T>` *subscribe* to receive notifications about |
| 24 | +new items or the end of the sequence. By contrast, consumers of an `AsyncSeq<'T>` *asynchronously retrieve* subsequent items on their own |
| 25 | +terms. Some domains are more naturally modeled with one or the other; however, it is often less clear which is the more |
| 26 | +suitable tool for a specific task. In many cases, a combination of the two provides the best solution, and |
| 27 | +restricting yourself to one, while simplifying the programming model, can lead you to treat every problem as a nail. |
| 28 | +
|
| 29 | +A more specific difference between the two is that `IObservable<'T>` subscribers have the basic type `'T -> unit` |
| 30 | +and are therefore inherently synchronous and imperative. The observer can certainly make a blocking call, but this |
| 31 | +can defeat the purpose of the observable sequence altogether. Alternatively, the observer can spawn an operation, but |
| 32 | +this can break composition because one can no longer rely on the observer returning to determine that it has |
| 33 | +completed. With the observable model, however, we can model blocking operations through composition on sequences rather |
| 34 | +than observers. |
| 35 | +
|
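The trade-off described above can be sketched with a small, self-contained example. It is only an illustration under stated assumptions: `store`, `processed`, and the event-based `source` are hypothetical names introduced here, standing in for a real IO call and a real push-based source.

```fsharp
open System

// Hypothetical stand-in for an asynchronous IO call (not part of the original text).
let processed = ResizeArray<int>()
let store (x: int) : Async<unit> =
  async {
    do! Async.Sleep 50
    lock processed (fun () -> processed.Add x) }

// A simple push-based source built on an F# event; IEvent<'T> implements IObservable<'T>.
let source = Event<int>()
let obs : IObservable<int> = source.Publish :> IObservable<int>

// Option 1: block inside the observer. Trigger does not return until the
// store has finished, so the producer is stalled for every item.
let blocking = obs |> Observable.subscribe (store >> Async.RunSynchronously)
source.Trigger 1
blocking.Dispose()

// Option 2: spawn the work. The observer returns at once, so its return
// no longer tells the producer that the item has been stored.
let spawning = obs |> Observable.subscribe (store >> Async.Start)
source.Trigger 2
spawning.Dispose()
```

In the first case the observer's return does mean the item was stored, at the cost of blocking the producer. In the second case the producer is free to continue, but nothing downstream can tell when, or whether, the store completed.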
| 36 | +To illustrate, let's try to implement the above Tweet retrieval, filtering and storage workflow using observable sequences. |
| 37 | +Suppose we already have an observable sequence representing tweets `IObservable<Tweet>` and we simply wish |
| 38 | +to filter it and store the resulting tweets. The function `Observable.filter` filters an observable |
| 39 | +sequence based on a predicate; however, it falls short here because the predicate passed to it must |
| 40 | +be synchronous, of type `'T -> bool`: |
| 41 | +*) |
| 42 | + |
| 43 | +open System |
| 44 | + |
| 45 | +let tweetsObs : IObservable<Tweet> = |
| 46 | + failwith "TODO: create observable" |
| 47 | + |
| 48 | +let filteredTweetsObs = |
| 49 | + tweetsObs |
| 50 | + |> Observable.filter (filterTweet >> Async.RunSynchronously) // blocking IO-call! |
| 51 | + |
| 52 | +(** |
| 53 | +To remedy the blocking IO-call we can better adapt the filtering function to the `IObservable<'T>` model. A value |
| 54 | +of type `Async<'T>` can be modeled as an `IObservable<'T>` with one element. Suppose that we have |
| 55 | +`Tweet -> IObservable<bool>`. We can define a few helper operators on observables to allow filtering using |
| 56 | +an asynchronous predicate as follows: |
| 57 | +*) |
| 58 | + |
| 59 | +module Observable = |
| 60 | + |
| 61 | + /// a |> Async.StartAsTask |> (fun t -> t.ToObservable()) |
| 62 | + let ofAsync (a:Async<'a>) : IObservable<'a> = |
| 63 | + failwith "TODO" |
| 64 | + |
| 65 | + /// Observable.SelectMany |
| 66 | + let bind (f:'a -> IObservable<'b>) (o:IObservable<'a>) : IObservable<'b> = |
| 67 | + failwith "TODO" |
| 68 | + |
| 69 | + /// Filter an observable sequence using a predicate producing an observable |
| 70 | + /// which emits a single boolean value. |
| 71 | + let filterObs (f:'a -> IObservable<bool>) : IObservable<'a> -> IObservable<'a> = |
| 72 | + bind <| fun a -> |
| 73 | + f a |
| 74 | + |> Observable.choose (function |
| 75 | + | true -> Some a |
| 76 | + | false -> None |
| 77 | + ) |
| 78 | + |
| 79 | + /// Filter an observable sequence using a predicate which returns an async |
| 80 | + /// computation producing a boolean value. |
| 81 | + let filterAsync (f:'a -> Async<bool>) : IObservable<'a> -> IObservable<'a> = |
| 82 | + filterObs (f >> ofAsync) |
| 83 | + |
| 84 | + /// Maps over an observable sequence using an async-returning function. |
| 85 | + let mapAsync (f:'a -> Async<'b>) : IObservable<'a> -> IObservable<'b> = |
| 86 | + bind (f >> ofAsync) |
| 87 | + |
| 88 | +let filteredTweetsObs' : IObservable<Tweet> = |
| 89 | + tweetsObs |
| 90 | + |> Observable.filterAsync filterTweet |
| 91 | + |
| 92 | + |
| 93 | +(** |
| 94 | +With a little effort, we were able to adapt `IObservable<'a>` to our needs. Next let's try implementing the storage of |
| 95 | +filtered tweets. Again, we can adapt the function `storeTweet` defined above to the observable model and bind the |
| 96 | +observable of filtered tweets to it: |
| 97 | +*) |
| 98 | + |
| 99 | +let storedTweetsObs : IObservable<unit> = |
| 100 | + filteredTweetsObs' |
| 101 | + |> Observable.mapAsync storeTweet |
| 102 | + |
| 103 | +(** |
| 104 | +The observable sequence `storedTweetsObs` produces a value each time a filtered tweet is stored. The entire |
| 105 | +workflow can be expressed as follows: |
| 106 | +*) |
| 107 | + |
| 108 | +let storedTweetsObs' : IObservable<unit> = |
| 109 | + tweetsObs |
| 110 | + |> Observable.filterAsync filterTweet |
| 111 | + |> Observable.mapAsync storeTweet |
| 112 | + |
| 113 | +(** |
| 114 | +Overall, both solutions are succinct and composable and deciding which one to use can ultimately be a matter of preference. |
| 115 | +Some things to consider are the "synchronous push" vs. "asynchronous pull" semantics. On the one hand, tweets are push-based: the consumer has no control |
| 116 | +over their generation. On the other hand, the program at hand will process the tweets on its own terms regardless of how quickly |
| 117 | +they are being generated. Moreover, the underlying Twitter API will likely utilize a request-reply protocol to retrieve batches of |
| 118 | +tweets from persistent storage. As such, the distinction between "synchronous push" vs. "asynchronous pull" becomes less interesting. If the underlying source |
| 119 | +is truly push-based, then one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based, |
| 120 | +then one can turn it into an observable sequence by first pulling, then pushing. Note, however, that in a true real-time reactive system, |
| 121 | +notifications must be pushed immediately without delay. |
| 122 | +
|
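As a rough sketch of both conversions, the library's `AsyncSeq.ofObservableBuffered` and `AsyncSeq.toObservable` functions can be used, assuming they behave as described in the library's API reference. The event-based source `ticks` and the sample sequence `numbers` are hypothetical names introduced only for this illustration.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open System
open FSharp.Control

// Hypothetical push-based source: an F# event exposed as an observable.
let ticks = Event<int>()
let pushed : IObservable<int> = ticks.Publish :> IObservable<int>

// Push -> pull: notifications that arrive while the consumer is busy are
// buffered, so the consumer can retrieve them at its own pace.
let pulled : AsyncSeq<int> = AsyncSeq.ofObservableBuffered pushed

// Hypothetical pull-based source: a small asynchronous sequence.
let numbers : AsyncSeq<int> =
  asyncSeq {
    for i in 1 .. 3 do
      yield i }

// Pull -> push: items are pulled from the sequence and pushed to subscribers.
let pushedAgain : IObservable<int> = AsyncSeq.toObservable numbers
```

Buffering trades memory for decoupling: if the source produces items faster than the consumer retrieves them for long enough, the buffer keeps growing, which is one reason the choice between the two models is not purely a matter of taste.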
| 123 | +Upon closer inspection, the two models aren't consumed all that differently. While `AsyncSeq` is based on an asynchronous-pull operation, |
| 124 | +it is usually consumed using an operator such as `AsyncSeq.iterAsync` as shown above. This is a function of type |
| 125 | +`('T -> Async<unit>) -> AsyncSeq<'T> -> Async<unit>` where the first argument is a function `'T -> Async<unit>` which performs |
| 126 | +some work on an item of the sequence and is applied repeatedly to subsequent items. In a sense, `iterAsync` *pushes* values to this |
| 127 | +function. The primary difference from observers of observable sequences is the return type `Async<unit>` rather than simply `unit`. |
| 128 | +
|
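A minimal sketch of this consumption pattern follows. The sequence `items`, the handler `handle`, and the `seen` list are hypothetical names used only for illustration; `AsyncSeq.iterAsync` is used with the signature given above.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let items : AsyncSeq<int> =
  asyncSeq {
    yield 1
    yield 2
    yield 3 }

// Hypothetical handler; records each item after a simulated asynchronous delay.
let seen = ResizeArray<int>()
let handle (x: int) : Async<unit> =
  async {
    do! Async.Sleep 10
    seen.Add x }

// iterAsync pulls an item, passes it to the handler, and awaits the returned
// Async<unit> before pulling the next one.
items
|> AsyncSeq.iterAsync handle
|> Async.RunSynchronously
```

Because the handler returns `Async<unit>`, the sequence is not advanced until the handler's work has finished, which is the guarantee an observer of type `'T -> unit` cannot give without blocking.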
| 129 | +## Related Articles |
| 130 | +
|
| 131 | + * [Programming with F# asynchronous sequences](http://tomasp.net/blog/async-sequences.aspx/) |
| 132 | +
|
| 133 | +*) |