Skip to content

Commit e938b68

Browse files
authored
Update BufferedAsyncIterator.mts
1 parent 0f17800 commit e938b68

1 file changed

Lines changed: 152 additions & 41 deletions

File tree

src/BufferedAsyncIterator.mts

Lines changed: 152 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,155 @@
1-
class BufferedAsyncIterator<T> implements AsyncIterableIterator<T> {
2-
protected buffer: T[] = []
3-
protected done = false
4-
5-
public constructor(private iterator: AsyncIterator<T>) {}
6-
7-
public async next(): Promise<IteratorResult<T, undefined>> {
8-
if ( this.buffer.length ) {
9-
return { value: this.buffer.shift()!, done: false }
10-
}
11-
if ( this.done ) {
12-
return { value: undefined, done: true }
13-
}
14-
const result = await this.iterator.next()
15-
if ( result.done ) {
16-
this.done = true
17-
}
18-
return result
19-
}
20-
21-
public async peek(): Promise<IteratorResult<T, undefined>> {
22-
if ( this.buffer.length > 0 ) {
23-
return { value: this.buffer[0], done: false }
24-
}
25-
const result = await this.next()
26-
if ( result.done ) {
27-
return { value: undefined, done: true }
28-
}
29-
this.buffer.unshift(result.value)
30-
return { value: result.value, done: false }
31-
}
32-
33-
public clearBuffer() {
34-
this.buffer.length = 0
35-
}
36-
37-
[Symbol.asyncIterator](): BufferedAsyncIterator<T> {
38-
return this
1+
/**
2+
* A peekable, buffered async iterator wrapper.
3+
*
4+
* Semantics:
5+
* - Single logical consumer only: do not call next()/peek()/return()/throw() concurrently.
6+
* - Values are consumed from an internal buffer first, then from the underlying iterator.
7+
* - peek() returns the next value without consuming it.
8+
* - return()/throw() are forwarded to the underlying iterator when available.
9+
*/
10+
class BufferedAsyncIterator<T, TReturn = unknown, TNext = unknown>
11+
implements AsyncIterableIterator<T, TReturn, TNext>
12+
{
13+
/** Internal buffer of values that have been "looked ahead" but not consumed. */
14+
protected buffer: T[] = []
15+
16+
/** Whether the underlying iterator has completed. */
17+
protected done = false
18+
19+
/**
20+
* The final result from the underlying iterator once it is done.
21+
* After completion, subsequent next() calls will return this.
22+
*/
23+
protected finalResult: IteratorResult<T, TReturn> | null = null
24+
25+
public constructor(
26+
private readonly iterator: AsyncIterator<T, TReturn, TNext>
27+
) {}
28+
29+
/**
30+
* Get the next value from the iterator, consuming from the buffer first.
31+
*
32+
* Note: This class assumes a single logical consumer. Do not call next()
33+
* concurrently; always await the previous call before issuing another.
34+
*/
35+
public async next(
36+
...args: [] | [TNext]
37+
): Promise<IteratorResult<T, TReturn>> {
38+
// Serve from buffer if available.
39+
if (this.buffer.length > 0) {
40+
const value = this.buffer.shift() as T
41+
return { value, done: false }
42+
}
43+
44+
// If we've already seen completion, return the cached final result.
45+
if (this.done) {
46+
if (this.finalResult) {
47+
return this.finalResult
48+
}
49+
// Fallback: fabricate a completed result if we somehow have no finalResult.
50+
return { value: undefined as unknown as TReturn, done: true }
51+
}
52+
53+
// Delegate to the underlying iterator.
54+
const result = await this.iterator.next(...args)
55+
56+
if (result.done) {
57+
this.done = true
58+
this.finalResult = result
59+
}
60+
61+
return result
62+
}
63+
64+
/**
65+
* Peek at the next value without consuming it.
66+
*
67+
* If the iterator is done, returns the same completed result as next().
68+
* Otherwise, the value is buffered and will be returned again by next().
69+
*/
70+
public async peek(): Promise<IteratorResult<T, TReturn>> {
71+
// If we already have buffered values, just show the first one.
72+
if (this.buffer.length > 0) {
73+
return { value: this.buffer[0] as T, done: false }
74+
}
75+
76+
// Otherwise, pull from next().
77+
const result = await this.next()
78+
79+
if (result.done) {
80+
// Propagate completion as-is (including any non-undefined TReturn).
81+
return result
82+
}
83+
84+
// Buffer the value so that the next next() call sees it again.
85+
this.buffer.unshift(result.value as T)
86+
return { value: result.value, done: false }
87+
}
88+
89+
/**
90+
* Clear any buffered (peeked but not yet consumed) values.
91+
* Does not affect the underlying iterator state.
92+
*/
93+
public clearBuffer(): void {
94+
this.buffer.length = 0
95+
}
96+
97+
/**
98+
* Signal early completion to the iterator.
99+
*
100+
* Forwards to the underlying iterator.return() when available, ensuring
101+
* cleanup semantics are preserved. Also clears the buffer and marks this
102+
* wrapper as done.
103+
*/
104+
public async return(
105+
value?: TReturn | PromiseLike<TReturn>
106+
): Promise<IteratorResult<T, TReturn>> {
107+
this.done = true
108+
this.clearBuffer()
109+
110+
const resolvedValue =
111+
value instanceof Promise ? await value : (value as TReturn | undefined)
112+
113+
if (typeof this.iterator.return === 'function') {
114+
const result = await this.iterator.return(resolvedValue as TReturn)
115+
this.finalResult = result
116+
return result
39117
}
118+
119+
const fabricated: IteratorResult<T, TReturn> = {
120+
value: (resolvedValue ?? (undefined as unknown as TReturn)) as TReturn,
121+
done: true,
122+
}
123+
this.finalResult = fabricated
124+
return fabricated
125+
}
126+
127+
/**
128+
* Propagate an error into the iterator.
129+
*
130+
* Forwards to the underlying iterator.throw() when available. Also clears
131+
* the buffer and marks this wrapper as done.
132+
*/
133+
public async throw(
134+
e?: unknown
135+
): Promise<IteratorResult<T, TReturn>> {
136+
this.done = true
137+
this.clearBuffer()
138+
139+
if (typeof this.iterator.throw === 'function') {
140+
return this.iterator.throw(e)
141+
}
142+
143+
// If the underlying iterator does not support throw, rethrow the error.
144+
throw e
145+
}
146+
147+
/**
148+
* Async iterable protocol: the iterator is its own async iterable.
149+
*/
150+
public [Symbol.asyncIterator](): BufferedAsyncIterator<T, TReturn, TNext> {
151+
return this
152+
}
40153
}
41154

42-
export {
43-
BufferedAsyncIterator,
44-
}
155+
export { BufferedAsyncIterator }

0 commit comments

Comments
 (0)