Skip to content

Commit 7503533

Browse files
committed
Remove PullResult wrapper from queue adapter API
pull() now returns Promise<T | undefined> instead of Promise<PullResult<T> | undefined>. Also fixes README to match the actual createQueue(handler, adapter, options?) signature.
1 parent d382bb4 commit 7503533

4 files changed

Lines changed: 46 additions & 82 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@suckless/queue": minor
3+
---
4+
5+
Remove PullResult wrapper from queue adapter API. `pull` now returns `Promise<T | undefined>` instead of `Promise<PullResult<T> | undefined>`, matching the same simplification applied to @suckless/cache.

packages/queue/README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ npm install @suckless/queue
1111
## Usage
1212

1313
```ts
14-
import { createQueue } from "@suckless/queue"
14+
import { createQueue, memoryAdapter } from "@suckless/queue"
1515

1616
const queue = createQueue<string>(
1717
async (url) => {
1818
const res = await fetch(url)
1919
await saveToDb(res)
2020
},
21+
memoryAdapter(),
2122
{ concurrency: 4 },
2223
)
2324

@@ -37,13 +38,14 @@ Handler errors are caught to prevent worker death. The handler is responsible fo
3738

3839
## API
3940

40-
### `createQueue<T>(handler, options?): Queue<T>`
41+
### `createQueue<T>(handler, adapter, options?): Queue<T>`
4142

4243
Creates a new queue. Workers start pulling immediately.
4344

4445
- `handler` — called for each item. May be sync or async.
45-
- `options.adapter`storage backend. Defaults to `memoryAdapter()`.
46+
- `adapter`a `QueueAdapter<T>` storage backend.
4647
- `options.concurrency` — max concurrent handlers. Must be a positive finite integer. Defaults to `1`.
48+
- `options.onError` — called when the handler throws. Receives the error and the item.
4749

4850
### `queue.push(item): Promise<void>`
4951

@@ -62,7 +64,9 @@ Number of handlers currently executing.
6264
The queue implements `AsyncDisposable`. Disposing marks the queue as closed, stops workers from pulling new items, waits for in-flight handlers, then disposes the adapter:
6365

6466
```ts
65-
await using queue = createQueue<Job>(processJob, { concurrency: 4 })
67+
await using queue = createQueue<Job>(processJob, memoryAdapter(), {
68+
concurrency: 4,
69+
})
6670
```
6771

6872
## Adapters
@@ -77,7 +81,7 @@ Implement `QueueAdapter<T>` to plug in any backend:
7781

7882
```ts
7983
import { RedisClient } from "bun"
80-
import type { PullResult, QueueAdapter } from "@suckless/queue"
84+
import type { QueueAdapter } from "@suckless/queue"
8185

8286
function redisAdapter<T>(key: string, url?: string): QueueAdapter<T> {
8387
const redis = new RedisClient(url)
@@ -90,7 +94,7 @@ function redisAdapter<T>(key: string, url?: string): QueueAdapter<T> {
9094
while (!signal.aborted) {
9195
const result = await redis.send("BRPOP", [key, "1"])
9296
if (result) {
93-
return { value: JSON.parse(result[1]) } as PullResult<T>
97+
return JSON.parse(result[1]) as T
9498
}
9599
}
96100
return undefined

packages/queue/src/index.test.ts

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
import { describe, expect, mock, test } from "bun:test"
2-
import {
3-
type PullResult,
4-
type QueueAdapter,
5-
createQueue,
6-
memoryAdapter,
7-
} from "."
2+
import { createQueue, type QueueAdapter, memoryAdapter } from "."
83

94
function delay(ms: number): Promise<void> {
105
return new Promise<void>((resolve) => {
@@ -20,9 +15,9 @@ describe("memoryAdapter", () => {
2015
await adapter.push("c")
2116

2217
const signal = AbortSignal.abort()
23-
expect(await adapter.pull(signal)).toEqual({ value: "a" })
24-
expect(await adapter.pull(signal)).toEqual({ value: "b" })
25-
expect(await adapter.pull(signal)).toEqual({ value: "c" })
18+
expect(await adapter.pull(signal)).toBe("a")
19+
expect(await adapter.pull(signal)).toBe("b")
20+
expect(await adapter.pull(signal)).toBe("c")
2621
})
2722

2823
test("blocking pull resolves when item pushed", async () => {
@@ -32,7 +27,7 @@ describe("memoryAdapter", () => {
3227
const pulling = adapter.pull(controller.signal)
3328
await adapter.push(42)
3429

35-
expect(await pulling).toEqual({ value: 42 })
30+
expect(await pulling).toBe(42)
3631
controller.abort()
3732
})
3833

@@ -43,7 +38,7 @@ describe("memoryAdapter", () => {
4338
const pulling = adapter.pull(controller.signal)
4439
await adapter.push("direct")
4540

46-
expect(await pulling).toEqual({ value: "direct" })
41+
expect(await pulling).toBe("direct")
4742
controller.abort()
4843
})
4944

@@ -73,8 +68,8 @@ describe("memoryAdapter", () => {
7368
await adapter.push(2)
7469

7570
// Drain the buffer
76-
expect(await adapter.pull(controller.signal)).toEqual({ value: 1 })
77-
expect(await adapter.pull(controller.signal)).toEqual({ value: 2 })
71+
expect(await adapter.pull(controller.signal)).toBe(1)
72+
expect(await adapter.pull(controller.signal)).toBe(2)
7873

7974
// Now buffer is empty — this pull will block
8075
const pulling = adapter.pull(controller.signal)
@@ -107,9 +102,9 @@ describe("memoryAdapter", () => {
107102
await adapter.push("b")
108103
await adapter.push("c")
109104

110-
expect(await pull1).toEqual({ value: "a" })
111-
expect(await pull2).toEqual({ value: "b" })
112-
expect(await pull3).toEqual({ value: "c" })
105+
expect(await pull1).toBe("a")
106+
expect(await pull2).toBe("b")
107+
expect(await pull3).toBe("c")
113108
controller.abort()
114109
})
115110

@@ -121,27 +116,9 @@ describe("memoryAdapter", () => {
121116
await adapter.push("")
122117

123118
const signal = AbortSignal.abort()
124-
expect(await adapter.pull(signal)).toEqual({ value: 0 })
125-
expect(await adapter.pull(signal)).toEqual({ value: false })
126-
expect(await adapter.pull(signal)).toEqual({ value: "" })
127-
})
128-
129-
test("undefined items pass through correctly", async () => {
130-
const adapter = memoryAdapter<undefined>()
131-
132-
await adapter.push(undefined)
133-
await adapter.push(undefined)
134-
135-
const signal = AbortSignal.abort()
136-
const result = await adapter.pull(signal)
137-
expect(result).toEqual({ value: undefined })
138-
expect(result).not.toBeUndefined()
139-
140-
// Second pull also returns wrapped undefined, not bare undefined
141-
expect(await adapter.pull(signal)).toEqual({ value: undefined })
142-
143-
// Empty buffer with aborted signal returns bare undefined
144-
expect(await adapter.pull(signal)).toBeUndefined()
119+
expect(await adapter.pull(signal)).toBe(0)
120+
expect(await adapter.pull(signal)).toBe(false)
121+
expect(await adapter.pull(signal)).toBe("")
145122
})
146123
})
147124

@@ -404,21 +381,6 @@ describe("createQueue", () => {
404381
await queue[Symbol.asyncDispose]()
405382
})
406383

407-
test("undefined items pass through handler", async () => {
408-
let count = 0
409-
const queue = createQueue<undefined>(() => {
410-
count++
411-
}, memoryAdapter())
412-
413-
await queue.push(undefined)
414-
await queue.push(undefined)
415-
await queue.push(undefined)
416-
await queue.drain()
417-
418-
expect(count).toBe(3)
419-
await queue[Symbol.asyncDispose]()
420-
})
421-
422384
test("multiple concurrent drain waiters all resolve", async () => {
423385
const queue = createQueue<number>(async () => {
424386
await delay(10)
@@ -476,7 +438,7 @@ describe("createQueue with spy adapter", () => {
476438
const adapter: QueueAdapter<number> = {
477439
push: pushFn,
478440
pull: () =>
479-
new Promise<PullResult<number> | undefined>((resolve) => {
441+
new Promise<number | undefined>((resolve) => {
480442
setTimeout(() => {
481443
resolve(undefined)
482444
}, 100)
@@ -493,7 +455,7 @@ describe("createQueue with spy adapter", () => {
493455

494456
test("workers call adapter pull", async () => {
495457
const pullFn = mock((_signal: AbortSignal) =>
496-
Promise.resolve<PullResult<number> | undefined>(undefined),
458+
Promise.resolve<number | undefined>(undefined),
497459
)
498460
const adapter: QueueAdapter<number> = {
499461
push: () => Promise.resolve(),
@@ -513,7 +475,7 @@ describe("createQueue with spy adapter", () => {
513475
const adapter: QueueAdapter<number> = {
514476
push: () => Promise.reject(new Error("push failed")),
515477
pull: () =>
516-
new Promise<PullResult<number> | undefined>((resolve) => {
478+
new Promise<number | undefined>((resolve) => {
517479
setTimeout(() => {
518480
resolve(undefined)
519481
}, 100)
@@ -554,7 +516,7 @@ describe("createQueue with spy adapter", () => {
554516
const adapter: QueueAdapter<number> = {
555517
push: () => Promise.resolve(),
556518
pull: () =>
557-
new Promise<PullResult<number> | undefined>((resolve) => {
519+
new Promise<number | undefined>((resolve) => {
558520
setTimeout(() => {
559521
resolve(undefined)
560522
}, 100)

packages/queue/src/index.ts

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
1-
/** Wrapper that distinguishes a pulled value from "no value available." */
2-
export interface PullResult<T> {
3-
value: T
4-
}
5-
61
/** Storage backend for queue items. */
72
export interface QueueAdapter<T> extends AsyncDisposable {
83
push(item: T): Promise<void>
9-
pull(signal: AbortSignal): Promise<PullResult<T> | undefined>
4+
pull(signal: AbortSignal): Promise<T | undefined>
105
}
116

127
/** Producer/consumer queue with pluggable storage. */
@@ -19,13 +14,13 @@ export interface Queue<T> extends AsyncDisposable {
1914
/** In-memory FIFO queue adapter with blocking pull. */
2015
export function memoryAdapter<T>(): QueueAdapter<T> {
2116
const buffer: T[] = []
22-
const waiters: ((result: PullResult<T> | undefined) => void)[] = []
17+
const waiters: ((item: T | undefined) => void)[] = []
2318

2419
return {
2520
push(item) {
2621
const waiter = waiters.shift()
2722
if (waiter) {
28-
waiter({ value: item })
23+
waiter(item)
2924
} else {
3025
buffer.push(item)
3126
}
@@ -34,18 +29,16 @@ export function memoryAdapter<T>(): QueueAdapter<T> {
3429

3530
pull(signal) {
3631
if (buffer.length > 0) {
37-
return Promise.resolve<PullResult<T>>({
38-
// oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- length check guarantees shift returns T
39-
value: buffer.shift() as T,
40-
})
32+
// oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- length check guarantees shift returns T
33+
return Promise.resolve(buffer.shift() as T)
4134
}
4235
if (signal.aborted) {
4336
return Promise.resolve(undefined)
4437
}
45-
return new Promise<PullResult<T> | undefined>((resolve) => {
46-
const waiter = (result: PullResult<T> | undefined) => {
38+
return new Promise<T | undefined>((resolve) => {
39+
const waiter = (item: T | undefined) => {
4740
signal.removeEventListener("abort", onAbort)
48-
resolve(result)
41+
resolve(item)
4942
}
5043
const onAbort = () => {
5144
const idx = waiters.indexOf(waiter)
@@ -104,27 +97,27 @@ export function createQueue<T>(
10497

10598
async function worker(): Promise<void> {
10699
while (!controller.signal.aborted) {
107-
let result: PullResult<T> | undefined
100+
let item: T | undefined
108101
try {
109102
// oxlint-disable-next-line no-await-in-loop -- worker loop is sequential by design
110-
result = await adapter.pull(controller.signal)
103+
item = await adapter.pull(controller.signal)
111104
} catch {
112105
// Adapter error — exit gracefully to avoid unhandled rejections.
113106
break
114107
}
115-
if (result === undefined) {
108+
if (item === undefined) {
116109
break
117110
}
118111
running++
119112
try {
120-
await handler(result.value) // oxlint-disable-line no-await-in-loop
113+
await handler(item) // oxlint-disable-line no-await-in-loop
121114
} catch (error) {
122115
try {
123116
onError?.(
124117
error instanceof Error
125118
? error
126119
: new Error(String(error), { cause: error }),
127-
result.value,
120+
item,
128121
)
129122
} catch {
130123
// Prevent onError failures from killing the worker.

0 commit comments

Comments
 (0)