Skip to content

Commit c9f1523

Browse files
committed
New: Expose length and closed properties
1 parent 1a20395 commit c9f1523

2 files changed

Lines changed: 169 additions & 70 deletions

File tree

src/AsyncIterableBuffer.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,21 @@ export class AsyncIterableBuffer<T> implements AsyncIterableIterator<T, void> {
22

33
#dataBuffer: T[] = [];
44
#pendingRequestResolverQueue: Array<(result: IteratorYieldResult<T> | IteratorReturnResult<any>) => void> = [];
5-
#done = false;
5+
#closed = false;
6+
7+
get closed() {
8+
return this.#closed;
9+
}
10+
11+
get length() {
12+
return this.#dataBuffer.length;
13+
}
614

715
/**
816
* Push a new value into the buffer. Throws an error if the buffer has been ended.
917
*/
1018
push(value: T) {
11-
if (this.#done)
19+
if (this.#closed)
1220
throw new Error('Iterable buffer is already closed');
1321

1422
if (this.#pendingRequestResolverQueue.length) {
@@ -24,7 +32,7 @@ export class AsyncIterableBuffer<T> implements AsyncIterableIterator<T, void> {
2432
* Mark the buffer as completed and resolve any pending requests with { done: true }.
2533
*/
2634
end() {
27-
this.#done = true;
35+
this.#closed = true;
2836
while (this.#pendingRequestResolverQueue.length) {
2937
const resolve = this.#pendingRequestResolverQueue.splice(0, 1)[0];
3038
resolve({ done: true, value: undefined });
@@ -41,7 +49,7 @@ export class AsyncIterableBuffer<T> implements AsyncIterableIterator<T, void> {
4149
return { done: false, value };
4250
}
4351

44-
if (this.#done)
52+
if (this.#closed)
4553
return { done: true, value: undefined };
4654

4755
return new Promise(resolve => {

tests/unit/AsyncIterableBuffer.test.ts

Lines changed: 157 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,96 +2,187 @@ import { AsyncIterableBuffer } from '../../src';
22

33
describe('AsyncIterableBuffer', () => {
44

5-
it('yields pushed values in order', async () => {
5+
describe('push()', () => {
6+
7+
it('throws an error if push() is called after end()', () => {
8+
const buffer = new AsyncIterableBuffer<number>();
9+
buffer.end();
10+
expect(() => buffer.push(1)).toThrow('Iterable buffer is already closed');
11+
});
12+
13+
it('throws an error if push() is called after return()', () => {
14+
const buffer = new AsyncIterableBuffer<number>();
15+
buffer.return();
16+
expect(() => buffer.push(1)).toThrow('Iterable buffer is already closed');
17+
});
18+
19+
it('adds a value to the buffer', () => {
20+
const buffer = new AsyncIterableBuffer<number>();
21+
buffer.push(1);
22+
expect(buffer).toHaveLength(1);
23+
});
24+
25+
it('resolves pending next() when push() is called', async () => {
26+
27+
const buffer = new AsyncIterableBuffer<string>();
28+
const nextPromise = buffer.next();
29+
buffer.push('hello');
30+
const result = await nextPromise;
31+
expect(result).toEqual({ done: false, value: 'hello' });
32+
});
33+
});
634

7-
const buffer = new AsyncIterableBuffer<number>();
8-
buffer.push(1);
9-
buffer.push(2);
10-
buffer.push(3);
11-
buffer.end();
35+
describe('closed', () => {
36+
37+
it('is false by default', () => {
38+
const buffer = new AsyncIterableBuffer<number>();
39+
expect(buffer.closed).toBe(false);
40+
});
41+
42+
it('remains false after next() is called', () => {
43+
const buffer = new AsyncIterableBuffer<number>();
44+
buffer.next();
45+
expect(buffer.closed).toBe(false);
46+
});
47+
48+
it('is true after end() is called', () => {
49+
const buffer = new AsyncIterableBuffer<number>();
50+
buffer.end();
51+
expect(buffer.closed).toBe(true);
52+
});
53+
54+
it('is true after return() is called', () => {
55+
const buffer = new AsyncIterableBuffer<number>();
56+
buffer.return();
57+
expect(buffer.closed).toBe(true);
58+
});
59+
});
1260

13-
const results: number[] = [];
14-
for await (const value of buffer)
15-
results.push(value);
61+
describe('length', () => {
1662

17-
expect(results).toEqual([1, 2, 3]);
18-
});
63+
it('is 0 by default', () => {
64+
const buffer = new AsyncIterableBuffer<number>();
65+
expect(buffer).toHaveProperty('length', 0);
66+
});
1967

20-
it('resolves pending next() when push() is called', async () => {
68+
it('returns the number of items in the buffer', () => {
69+
const buffer = new AsyncIterableBuffer<number>();
70+
buffer.push(1);
71+
buffer.push(2);
72+
expect(buffer).toHaveProperty('length', 2);
73+
});
2174

22-
const buffer = new AsyncIterableBuffer<string>();
23-
const nextPromise = buffer.next();
24-
buffer.push('hello');
25-
const result = await nextPromise;
26-
expect(result).toEqual({ done: false, value: 'hello' });
27-
});
75+
it('decreases when items are consumed', async () => {
76+
const buffer = new AsyncIterableBuffer<number>();
77+
buffer.push(1);
78+
buffer.push(2);
79+
buffer.push(3);
2880

29-
it('returns done:true when buffer is ended and no items remain', async () => {
81+
await buffer.next();
3082

31-
const buffer = new AsyncIterableBuffer<number>();
32-
buffer.end();
33-
const result = await buffer.next();
34-
expect(result).toEqual({ done: true, value: undefined });
35-
});
83+
expect(buffer).toHaveProperty('length', 2);
3684

37-
it('throws an error if push() is called after end()', () => {
85+
await buffer.next();
3886

39-
const buffer = new AsyncIterableBuffer<number>();
40-
buffer.end();
41-
expect(() => buffer.push(1)).toThrow('Iterable buffer is already closed');
42-
});
87+
expect(buffer).toHaveProperty('length', 1);
88+
});
89+
90+
it('keeps number of unconsumed items in the buffer after end() is called', async () => {
91+
const buffer = new AsyncIterableBuffer<number>();
92+
buffer.push(1);
93+
buffer.push(2);
94+
buffer.push(3);
4395

44-
it('[Symbol.asyncIterator]() returns the instance itself', () => {
96+
await buffer.next();
97+
buffer.end();
4598

46-
const buffer = new AsyncIterableBuffer<number>();
47-
expect(buffer[Symbol.asyncIterator]()).toBe(buffer);
99+
expect(buffer).toHaveProperty('length', 2);
100+
});
48101
});
49102

50-
it('allows async iteration to wait for new pushes', async () => {
103+
describe('next()', () => {
51104

52-
const buffer = new AsyncIterableBuffer<number>();
53-
const results: number[] = [];
105+
it('yields pushed values in order', async () => {
54106

55-
// Start async iteration
56-
(async () => {
57-
for await (const num of buffer)
58-
results.push(num);
59-
})();
107+
const buffer = new AsyncIterableBuffer<number>();
108+
buffer.push(1);
109+
buffer.push(2);
110+
buffer.push(3);
111+
buffer.end();
60112

61-
// Wait briefly, then push values.
62-
await new Promise(resolve => setTimeout(resolve, 5));
63-
buffer.push(10);
64-
await new Promise(resolve => setTimeout(resolve, 5));
65-
buffer.push(20);
66-
buffer.end();
113+
const results: number[] = [];
114+
for await (const value of buffer)
115+
results.push(value);
67116

68-
// Wait for the iteration to complete.
69-
await new Promise(resolve => setTimeout(resolve, 5));
70-
expect(results).toEqual([10, 20]);
71-
});
117+
expect(results).toEqual([1, 2, 3]);
118+
});
119+
120+
it('returns done:true when buffer is ended and no items remain', async () => {
121+
122+
const buffer = new AsyncIterableBuffer<number>();
123+
buffer.end();
124+
const result = await buffer.next();
125+
expect(result).toEqual({ done: true, value: undefined });
126+
});
72127

73-
it('flushes pending next() promises when return() is called', async () => {
128+
it('next() returns done:true after return() is called', async () => {
74129

75-
const buffer = new AsyncIterableBuffer<number>();
76-
const nextPromise = buffer.next();
77-
const ret = await buffer.return();
78-
expect(ret).toEqual({ done: true, value: undefined });
79-
const nextResult = await nextPromise;
80-
expect(nextResult).toEqual({ done: true, value: undefined });
130+
const buffer = new AsyncIterableBuffer<number>();
131+
await buffer.return();
132+
const result = await buffer.next();
133+
expect(result).toEqual({ done: true, value: undefined });
134+
});
135+
136+
it('allows async iteration to wait for new pushes', async () => {
137+
138+
const buffer = new AsyncIterableBuffer<number>();
139+
const results: number[] = [];
140+
141+
// Start async iteration
142+
(async () => {
143+
for await (const num of buffer)
144+
results.push(num);
145+
})();
146+
147+
// Wait briefly, then push values.
148+
await new Promise(resolve => setTimeout(resolve, 5));
149+
buffer.push(10);
150+
await new Promise(resolve => setTimeout(resolve, 5));
151+
buffer.push(20);
152+
buffer.end();
153+
154+
// Wait for the iteration to complete.
155+
await new Promise(resolve => setTimeout(resolve, 5));
156+
expect(results).toEqual([10, 20]);
157+
});
81158
});
82159

83-
it('prevents push() after return() is called', async () => {
160+
describe('[Symbol.asyncIterator]()', () => {
84161

85-
const buffer = new AsyncIterableBuffer<number>();
86-
await buffer.return();
87-
expect(() => buffer.push(42)).toThrow('Iterable buffer is already closed');
162+
it('returns the instance itself', () => {
163+
164+
const buffer = new AsyncIterableBuffer<number>();
165+
expect(buffer[Symbol.asyncIterator]()).toBe(buffer);
166+
});
88167
});
89168

90-
it('next() returns done:true after return() is called', async () => {
169+
describe('return()', () => {
170+
171+
it('flushes pending next() promises when return() is called', async () => {
172+
173+
const buffer = new AsyncIterableBuffer<number>();
174+
const nextPromise = buffer.next();
175+
const ret = await buffer.return();
176+
expect(ret).toEqual({ done: true, value: undefined });
177+
const nextResult = await nextPromise;
178+
expect(nextResult).toEqual({ done: true, value: undefined });
179+
});
180+
181+
it('prevents push() after return() is called', async () => {
91182

92-
const buffer = new AsyncIterableBuffer<number>();
93-
await buffer.return();
94-
const result = await buffer.next();
95-
expect(result).toEqual({ done: true, value: undefined });
183+
const buffer = new AsyncIterableBuffer<number>();
184+
await buffer.return();
185+
expect(() => buffer.push(42)).toThrow('Iterable buffer is already closed');
186+
});
96187
});
97188
});

0 commit comments

Comments
 (0)