Skip to content

Commit 9176fba

Browse files
committed
feat(worker): deterministic async task worker module
1 parent a1e301a commit 9176fba

12 files changed

Lines changed: 646 additions & 81 deletions

File tree

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ with ease, as the picker will only ever render and decorate a small subset of th
9393
- [Using vim-plug](#using-vim-plug)
9494
- [Configuration](#configuration)
9595
- [Tracing](#tracing)
96+
- [Workers](#workers)
9697
- [Pool](#pool)
9798
- [Lifecycle](#lifecycle)
9899
- [Normalization](#normalization)
@@ -245,6 +246,22 @@ Supported modules:
245246
- `async.trace`: Async creation and completion events.
246247
- `registry.trace`: Registry registration, touch, removal, and prune events.
247248

249+
## Workers
250+
251+
Workers are internal coordination helpers that sit on top of the async scheduler. They are not user-configurable, but they define how
252+
different subsystems sequence work so ordering is deterministic even under heavy async load.
253+
254+
There are two worker modes used in the codebase:
255+
256+
- **Coalescer**: Keeps only the latest request while work is in flight. This is used for UI rendering in `Select`, where multiple list updates
257+
can arrive rapidly. The coalescer guarantees that only the most recent render request runs, so stale intermediate renders are dropped.
258+
259+
- **Queue**: Runs every request in strict FIFO order. This is used for stream processing, where chunk order is semantically important. The
260+
queue executes tasks one after the other, and each task can yield without allowing another queued task to start early.
261+
262+
Workers are cooperative: each task runs inside an `Async` coroutine and may call `Async.yield()`. Yielding allows other unrelated async tasks
263+
to run, but does not violate the ordering guarantees within a worker.
264+
248265
## Pool
249266

250267
The pool is a table-reuse subsystem that keeps allocation pressure low for hot paths (streaming, matching, selection). It is a single global

lua/fuzzy/async.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ function Async:abort()
113113
self:_done(nil, "abort")
114114
end
115115

116-
--- Registers a callback for when the async completes, or calls it immediately if already done (unless aborted).
116+
--- Registers a callback for when the async completes, or calls it immediately if already done (unless aborted). The callbacks are executed in order of registration
117117
--- @param callback function Function to call on completion
118118
function Async:await(callback)
119119
assert(type(callback) == "function")

lua/fuzzy/select.lua

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ local spacing = { ",", "SelectHeaderDelimiter" }
1212

1313
local utils = require("fuzzy.utils")
1414
local Async = require("fuzzy.async")
15-
local Scheduler = require("fuzzy.scheduler")
15+
local Worker = require("fuzzy.worker")
1616

1717
--- @class Select
1818
--- @field private source_window integer|nil The window ID of the source window where the selection interface was opened from.
@@ -1688,7 +1688,7 @@ end
16881688

16891689
--- Internal: schedules a UI render of list+related features.
16901690
function Select:_render_list()
1691-
local executor = Async.wrap(function()
1691+
local render = function()
16921692
local entries = self._state.entries or {}
16931693
cursor_clamp(self._state.cursor, #entries)
16941694

@@ -1710,19 +1710,8 @@ function Select:_render_list()
17101710
flush = true,
17111711
})
17121712
end
1713-
end)
1714-
1715-
local renderer = function()
1716-
self._state.renderer = executor()
1717-
Scheduler.add(self._state.renderer)
1718-
end
1719-
1720-
if self:_is_rendering() then
1721-
self._state.renderer:await(renderer)
1722-
self:_stop_rendering()
1723-
else
1724-
renderer()
17251713
end
1714+
self._state.renderer.enqueue(render)
17261715
end
17271716

17281717
--- Resets the state variables for a Select instance.
@@ -1859,9 +1848,8 @@ function Select:_clear_view(force)
18591848
self:_reset_state()
18601849
end
18611850

1862-
if self:_is_rendering() then
1863-
self._state.renderer:await(clearer)
1864-
self:_stop_rendering()
1851+
if self._state.renderer then
1852+
self._state.renderer.cancel(clearer)
18651853
self._state.renderer = nil
18661854
else
18671855
clearer()
@@ -1897,9 +1885,8 @@ function Select:_close_view(force)
18971885
end
18981886
end
18991887

1900-
if self:_is_rendering() then
1901-
self._state.renderer:await(closer)
1902-
self:_stop_rendering()
1888+
if self._state.renderer then
1889+
self._state.renderer.cancel(closer)
19031890
self._state.renderer = nil
19041891
else
19051892
closer()
@@ -1910,15 +1897,15 @@ end
19101897
--- @return boolean
19111898
function Select:_is_rendering()
19121899
if self._state.renderer then
1913-
return self._state.renderer:is_running()
1900+
return self._state.renderer.is_running()
19141901
end
19151902
return false
19161903
end
19171904

19181905
--- Cancels and clears the running renderer coroutine.
19191906
function Select:_stop_rendering()
19201907
if self._state.renderer then
1921-
self._state.renderer:cancel()
1908+
self._state.renderer.cancel()
19221909
self._state.renderer = nil
19231910
end
19241911
end
@@ -2585,6 +2572,9 @@ function Select:open()
25852572
self:_init_preview()
25862573
end
25872574

2575+
if not self._state.renderer then
2576+
self._state.renderer = Worker.coalesce()
2577+
end
25882578
self.source_window = vim.api.nvim_get_current_win()
25892579
local factor = (opts.prompt_list and opts.preview) and 2.0 or 1.0
25902580
local size = compute_height(opts.window_ratio, factor)

lua/fuzzy/stream.lua

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
local Scheduler = require("fuzzy.scheduler")
21
local Async = require("fuzzy.async")
32
local Pool = require("fuzzy.pool")
3+
local Worker = require("fuzzy.worker")
44
local utils = require("fuzzy.utils")
55

66
--- @class Stream
@@ -50,6 +50,7 @@ function Stream:_close_stream()
5050
close_handle(self._state.handle)
5151
self._state.handle = nil
5252
end
53+
self:_stop_streaming()
5354

5455
if self._state.buffer then
5556
-- clean up the string references to avoid any memory leaks, the buffer could still be reused in future calls to start
@@ -83,8 +84,6 @@ function Stream:_close_stream()
8384

8485
self.callback = nil
8586
self.transform = nil
86-
87-
self:_stop_streaming()
8887
end
8988

9089
function Stream:_make_stream()
@@ -166,6 +165,7 @@ function Stream:_handle_data(data, size)
166165
self._state.buffer[self._state.size + 1] = self:_transform_data(data)
167166
self._state.size = self._state.size + 1
168167
end
168+
return self._state.size
169169
end
170170

171171
function Stream:_handle_out(code, chunk, kind)
@@ -264,8 +264,11 @@ function Stream:_handle_in(err, chunk, kind)
264264
end
265265

266266
local line = chunk:sub(start, pos - 1)
267-
self:_handle_data(line, size)
267+
size = self:_handle_data(line, size)
268268

269+
if size % 512 == 0 then
270+
Async.yield()
271+
end
269272
start = next + 1
270273
count = count + 1
271274
end
@@ -285,48 +288,21 @@ function Stream:_handle_in(err, chunk, kind)
285288
end
286289

287290
function Stream:_handle_stdout(err, chunk)
288-
local executor = Async.wrap(Stream._handle_in)
289-
290-
local streamer = function()
291-
self._state.streamer = executor(self, err, chunk, 1)
292-
Scheduler.add(self._state.streamer)
293-
end
294-
295-
if self:_is_streaming() then
296-
self._state.streamer:await(streamer)
297-
else
298-
streamer()
299-
end
291+
self._state.streamer.enqueue(function()
292+
self:_handle_in(err, chunk, 1)
293+
end)
300294
end
301295

302296
function Stream:_handle_stderr(err, chunk)
303-
local executor = Async.wrap(Stream._handle_in)
304-
305-
local streamer = function()
306-
self._state.streamer = executor(self, err, chunk, 2)
307-
Scheduler.add(self._state.streamer)
308-
end
309-
310-
if self:_is_streaming() then
311-
self._state.streamer:await(streamer)
312-
else
313-
streamer()
314-
end
297+
self._state.streamer.enqueue(function()
298+
self:_handle_in(err, chunk, 2)
299+
end)
315300
end
316301

317302
function Stream:_handle_exit(e, c)
318-
local executor = Async.wrap(Stream._handle_out)
319-
320-
local streamer = function()
321-
self._state.streamer = executor(self, e, c, 3)
322-
Scheduler.add(self._state.streamer)
323-
end
324-
325-
if self:_is_streaming() then
326-
self._state.streamer:await(streamer)
327-
else
328-
streamer()
329-
end
303+
self._state.streamer.enqueue(function()
304+
self:_handle_out(e, c, 3)
305+
end)
330306
end
331307

332308
function Stream:_is_streaming()
@@ -335,7 +311,7 @@ end
335311

336312
function Stream:_stop_streaming()
337313
if self._state.streamer then
338-
self._state.streamer:abort()
314+
self._state.streamer.cancel()
339315
self._state.streamer = nil
340316
end
341317
end
@@ -443,6 +419,10 @@ function Stream:start(cmd, opts)
443419
self._state.accum = Pool.obtain(size)
444420
end
445421

422+
if not self._state.streamer then
423+
self._state.streamer = Worker.queue()
424+
end
425+
446426
if type(cmd) == "function" then
447427
local finalized = false
448428
local worker = self:_bind_guarded(function(stream, data)
@@ -459,13 +439,13 @@ function Stream:start(cmd, opts)
459439
finalized = true
460440
end
461441
end, token)
462-
local executor = Async.wrap(function(stream)
442+
self._state.streamer.enqueue(function()
463443
local ok, err = utils.safe_call(
464444
cmd, worker, opts.args, opts.cwd, opts.env
465445
)
466446
local code = not ok and 1 or 0
467447
if finalized == false then
468-
local finalizer = stream:_bind_guarded(function(s, c, e)
448+
local finalizer = self:_bind_guarded(function(s, c, e)
469449
s._state.stdouteof = true
470450
s._state.stderreof = true
471451
s:_handle_out(c, e, 3)
@@ -474,8 +454,6 @@ function Stream:start(cmd, opts)
474454
finalized = true
475455
end
476456
end)
477-
self._state.streamer = executor(self)
478-
Scheduler.add(self._state.streamer)
479457
else
480458
local stdio = self:_make_stream()
481459
assert(vim.fn.executable(cmd) == 1)

0 commit comments

Comments
 (0)