|
25 | 25 | details. The flow configuration provides a centralized place for |
26 | 26 | policy decisions regarding process settings, threading, buffering etc. |
27 | 27 |
|
| 28 | +Flow also provides a subsystem for broadcast communication of |
| 29 | +out-of-band messages without explicit connections or |
| 30 | +declarations. This could for example be used to communicate the |
| 31 | +passage of (real or virtual) time. Broadcast messages are associated |
| 32 | +with (otherwise undeclared) signal-ids, and will be received by |
| 33 | +processes selecting those ids. Broadcasts messages will arrive along |
| 34 | +with messages from process inputs, so signal-ids must not conflict |
| 35 | +with any process input-id. Thus namespaced keywords, UUIDs etc or |
| 36 | +tuples thereof are recommended as signal-ids. See process |
| 37 | +describe/transform and inject below for details. |
| 38 | + |
28 | 39 | It is expected that applications will rarely define instances of the |
29 | 40 | process protocol but instead use the API function 'process' that |
30 | 41 | implements the process protocol in terms of calls to ordinary |
|
54 | 65 |
|
55 | 66 | :proc - a function that starts a process |
56 | 67 | :args - a map of param->val which will be passed to the process ctor |
57 | | -:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n |
| 68 | +:chan-opts - a map of io-id->{:keys [buf-or-n xform]}, |
| 69 | + where io-id is an input/output name, and buf-or-n |
58 | 70 | and xform have their meanings per core.async/chan |
59 | | - the default is {:buf-or-n 10} |
| 71 | + the default for inputs and outputs is {:buf-or-n 10} |
60 | 72 |
|
61 | 73 | :conns - a collection of [[from-pid outid] [to-pid inid]] tuples. |
62 | 74 |
|
63 | 75 | Inputs and outputs support multiple connections. When an output is |
64 | | -connected multiple times every connection will get every message, |
65 | | -as per a core.async/mult. |
| 76 | +connected multiple times every connection will get every message, as |
| 77 | +per a core.async/mult. Note that non-multed outputs do not have |
| 78 | +corresponding channels and thus any chan-opts will be ignored. |
| 79 | + |
| 80 | +Broadcast signals are conveyed to a process via a channel with an |
| 81 | +async/sliding-buffer of size 100, thus signals not handled in a |
| 82 | +timely manner will be dropped in favor of later arriving signals. |
66 | 83 |
|
67 | | -:mixed-exec/:io-exec/:compute-exec -> ExecutorService |
68 | | -These can be used to specify the ExecutorService to use for the |
| 84 | +:mixed-exec/:io-exec/:compute-exec -> Executor |
| 85 | +These can be used to specify the Executor to use for the |
69 | 86 | corresonding workload, in lieu of the lib defaults. |
70 | 87 |
|
71 | 88 | N.B. The flow is not started. See 'start'</pre></div></div><div class="public anchor" id="var-futurize"><h3>futurize</h3><div class="usage"><code>(futurize f & {:keys [exec], :or {exec :mixed}, :as opts})</code></div><div class="doc"><pre class="plaintext">Takes a fn f and returns a fn that takes the same arguments as f |
|
75 | 92 |
|
76 | 93 | futurize accepts kwarg options: |
77 | 94 | :exec - one of the workloads :mixed, :io, :compute |
78 | | - or a j.u.c.ExecutorService object, |
79 | | - default :mixed</pre></div></div><div class="public anchor" id="var-inject"><h3>inject</h3><div class="usage"><code>(inject g [pid io-id :as coord] msgs)</code></div><div class="doc"><pre class="plaintext">asynchronously puts the messages on the channel corresponding to the |
80 | | -input or output of the process, returning a future that will |
81 | | -complete when done.</pre></div></div><div class="public anchor" id="var-lift*-.3Estep"><h3>lift*->step</h3><div class="usage"><code>(lift*->step f)</code></div><div class="doc"><pre class="plaintext">given a fn f taking one arg and returning a collection of non-nil |
| 95 | + or a j.u.c.Executor object, |
| 96 | + default :mixed</pre></div></div><div class="public anchor" id="var-inject"><h3>inject</h3><div class="usage"><code>(inject g [pid io-id :as coord] msgs)</code></div><div class="doc"><pre class="plaintext">asynchronously puts the messages on the channel corresponding to |
| 97 | +the input or output of the process, returning a future that will |
| 98 | +complete when done. You can broadcast messages on a signal using the |
| 99 | +special coord [::flow/cast a-signal-id]. Note that signals cannot be |
| 100 | +sent to a particular process.</pre></div></div><div class="public anchor" id="var-lift*-.3Estep"><h3>lift*->step</h3><div class="usage"><code>(lift*->step f)</code></div><div class="doc"><pre class="plaintext">given a fn f taking one arg and returning a collection of non-nil |
82 | 101 | values, creates a step fn as needed by process, with one input |
83 | 102 | and one output (named :in and :out), and no state.</pre></div></div><div class="public anchor" id="var-lift1-.3Estep"><h3>lift1->step</h3><div class="usage"><code>(lift1->step f)</code></div><div class="doc"><pre class="plaintext">like lift*->step except taking a fn returning one value, which when |
84 | 103 | nil will yield no output.</pre></div></div><div class="public anchor" id="var-map-.3Estep"><h3>map->step</h3><div class="usage"><code>(map->step {:keys [describe init transition transform]})</code></div><div class="doc"><pre class="plaintext">given a map of functions corresponding to step fn arities (see |
|
113 | 132 | datafy. |
114 | 133 |
|
115 | 134 | arity 0 - 'describe', () -> description |
116 | | -where description is a map with keys :params :ins and :outs, each of which |
117 | | -in turn is a map of keyword to doc string, and :workload with |
118 | | -possible values of :mixed :io :compute. All entries in the describe |
119 | | -return map are optional. |
| 135 | +where description is a map with possible keys: |
| 136 | +:params :ins and :outs, each of which in turn is a map of keyword to doc string |
| 137 | +:signal-select - a predicate of a signal-id. Messages on approved |
| 138 | + signals will appear in the transform arity (see below) |
| 139 | + For the simple case of enumerated signal-ids, use a set, |
| 140 | + e.g. #{:this/signal :that/signal} |
| 141 | + If no :signal-select is provided, no signals will be received |
| 142 | +:workload with possible values of :mixed :io :compute. |
| 143 | +All entries in the describe return map are optional. |
120 | 144 |
|
121 | 145 | :params describes the initial arguments to setup the state for the function. |
122 | | -:ins enumerates the input[s], for which the flow will create channels |
123 | | -:outs enumerates the output[s], for which the flow may create channels. |
| 146 | +:ins enumerates the process input[s], for which the flow will create channels |
| 147 | +:outs enumerates the process output[s], for which the flow _may_ create channels. |
124 | 148 | :workload - describes the nature of the workload, one of :mixed :io or :compute |
125 | 149 | an :io workload should not do extended computation |
126 | 150 | a :compute workload should never block |
127 | 151 |
|
128 | | -No key may be present in both :ins and :outs, allowing for a uniform |
129 | | -channel coordinate system of [:process-id :channel-id]. The |
| 152 | +No io-id key may be present in both :ins and :outs, allowing for a |
| 153 | +uniform channel coordinate system of [:process-id :channel-id]. The |
130 | 154 | ins/outs/params returned will be the ins/outs/params of the |
131 | 155 | process. describe may be called by users to understand how to use |
132 | 156 | the proc. It will also be called by the impl in order to discover |
|
166 | 190 | process will no longer be used following that. See the SPI for |
167 | 191 | details. state' will be the state supplied to subsequent calls. |
168 | 192 |
|
169 | | -arity 3 - 'transform', (state in-name msg) -> [state' output] |
| 193 | +arity 3 - 'transform', (state in-or-signal-id msg) -> [state' output] |
170 | 194 | where output is a map of outid->[msgs*] |
171 | 195 |
|
172 | | -The transform arity will be called every time a message arrives at any |
173 | | -of the inputs. Output can be sent to none, any or all of the :outs |
174 | | -enumerated, and/or an input named by a [pid inid] tuple (e.g. for |
175 | | -reply-to), and/or to the ::flow/report output. A step need not |
176 | | -output at all (output or msgs can be empyt/nil), however an output _message_ |
177 | | -may never be nil (per core.async channels). state' will be the state |
178 | | -supplied to subsequent calls. |
| 196 | +The transform arity will be called every time a message arrives at |
| 197 | +any of the inputs or signals (selected via :signal-select in |
| 198 | +describe), identified by the id. Output can be sent to none, any or |
| 199 | +all of the :outs enumerated, and/or an input named by a [pid in-id] |
| 200 | +coord tuple (e.g. for reply-to), and/or to the ::flow/report |
| 201 | +output. |
| 202 | + |
| 203 | +You can broadcast output to all processes selecting a signal via |
| 204 | +the special coord [::flow/cast a-signal-id] Note that signals cannot |
| 205 | +be sent to a particular process. |
| 206 | + |
| 207 | +A step need not output at all (output or msgs can be empty/nil), |
| 208 | +however an output _message_ may never be nil (per core.async |
| 209 | +channels). state' will be the state supplied to subsequent calls. |
179 | 210 |
|
180 | 211 | process also accepts an option map with keys: |
181 | 212 | :workload - one of :mixed, :io or :compute |
|
0 commit comments