Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ export class ChangeStream<
return this.cursor?.resumeToken;
}

/** Returns the currently buffered documents length of the underlying cursor. */
bufferedCount(): number {
return this.cursor?.bufferedCount() ?? 0;
}

/** Check if there is any document still available in the Change Stream */
async hasNext(): Promise<boolean> {
this._setIsIterator();
Expand Down
85 changes: 85 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,91 @@ describe('Change Streams', function () {
);
});
});

// Note that `insertOne` in these tests is called with write concern `{ w: 'majority' }`.
// This is to avoid eventual consistency issues that make these tests brittle.
// A change stream won't see the new documents until they are persisted in the oplog and
// some MongoDB server versions (4.2-) used for testing default to write concern `{ w: 1 }`.
// Using write concern `{ w: 'majority' }` ensures that the new documents are persisted in
// the oplog when the `insertOne` promise resolves, making these tests more reliable.
describe('#bufferedCount()', function () {
it(
'should return 0 before starting to consume the change stream (empty collection)',
{ requires: { topology: 'replicaset' } },
async function () {
changeStream = collection.watch([]);
expect(changeStream.bufferedCount()).to.equal(0);
await changeStream.close();
}
);

it(
'should return 0 before starting to consume the change stream (non-empty collection)',
{ requires: { topology: 'replicaset' } },
async function () {
// existing documents
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });

changeStream = collection.watch([]);

// docs inserted after the change stream was created
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 4 }, { writeConcern: { w: 'majority' } });

expect(changeStream.bufferedCount()).to.equal(0);
await changeStream.close();
}
);

it('should return the underlying cursor buffered document count', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });

expect(changeStream.bufferedCount()).to.equal(0);

// hasNext will trigger a batch fetch, buffering the documents
const hasNext = await changeStream.hasNext();
expect(hasNext).to.be.true;

expect(changeStream.bufferedCount()).to.equal(2);
}
});

it(`decreases as buffered documents are consumed`, {
metadata: { requires: { topology: 'replicaset' } },
async test() {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });

expect(changeStream.bufferedCount()).to.equal(0);

// `next` triggers a batch fetch, buffering the documents
// and then consumes one document from that buffer
for (let i = 2; i >= 0; i--) {
await changeStream.next();
expect(changeStream.bufferedCount()).to.equal(i);
}

await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });

// `tryNext` also triggers a batch fetch
for (let i = 2; i >= 0; i--) {
await changeStream.tryNext();
expect(changeStream.bufferedCount()).to.equal(i);
}
}
});
});
});

describe('startAfter', function () {
Expand Down