diff --git a/src/change_stream.ts b/src/change_stream.ts index 22330e8595e..f99a18c5016 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -705,6 +705,11 @@ export class ChangeStream< return this.cursor?.resumeToken; } + /** Returns the currently buffered documents length of the underlying cursor. */ + bufferedCount(): number { + return this.cursor?.bufferedCount() ?? 0; + } + /** Check if there is any document still available in the Change Stream */ async hasNext(): Promise { this._setIsIterator(); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index fbdc5e45613..ae79cb5f3f1 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1167,6 +1167,91 @@ describe('Change Streams', function () { ); }); }); + + // Note that `insertOne` in these tests is called with write concern `{ w: 'majority' }`. + // This is to avoid eventual consistency issues that make these tests brittle. + // A change stream won't see the new documents until they are persisted in the oplog and + // some MongoDB server versions (4.2-) used for testing default to write concern `{ w: 1 }`. + // Using write concern `{ w: 'majority' }` ensures that the new documents are persisted in + // the oplog when the `insertOne` promise resolves, making these tests more reliable. + describe('#bufferedCount()', function () { + it( + 'should return 0 before starting to consume the change stream (empty collection)', + { requires: { topology: 'replicaset' } }, + async function () { + changeStream = collection.watch([]); + expect(changeStream.bufferedCount()).to.equal(0); + await changeStream.close(); + } + ); + + it( + 'should return 0 before starting to consume the change stream (non-empty collection)', + { requires: { topology: 'replicaset' } }, + async function () { + // existing documents + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); + + changeStream = collection.watch([]); + + // docs inserted after the change stream was created + await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 4 }, { writeConcern: { w: 'majority' } }); + + expect(changeStream.bufferedCount()).to.equal(0); + await changeStream.close(); + } + ); + + it('should return the underlying cursor buffered document count', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); + + expect(changeStream.bufferedCount()).to.equal(0); + + // hasNext will trigger a batch fetch, buffering the documents + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + expect(changeStream.bufferedCount()).to.equal(2); + } + }); + + it(`decreases as buffered documents are consumed`, { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } }); + + expect(changeStream.bufferedCount()).to.equal(0); + + // `next` triggers a batch fetch, buffering the documents + // and then consumes one document from that buffer + for (let i = 2; i >= 0; i--) { + await changeStream.next(); + expect(changeStream.bufferedCount()).to.equal(i); + } + + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } }); + + // `tryNext` also triggers a batch fetch + for (let i = 2; i >= 0; i--) { + await changeStream.tryNext(); + expect(changeStream.bufferedCount()).to.equal(i); + } + } + }); + }); }); describe('startAfter', function () {