Skip to content

Commit f7ea421

Browse files
authored
feat(NODE-7441): add ChangeStream.bufferedCount (#4870)
1 parent ee12d4f commit f7ea421

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

src/change_stream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,11 @@ export class ChangeStream<
705705
return this.cursor?.resumeToken;
706706
}
707707

708+
/** Returns the currently buffered documents length of the underlying cursor. */
709+
bufferedCount(): number {
710+
return this.cursor?.bufferedCount() ?? 0;
711+
}
712+
708713
/** Check if there is any document still available in the Change Stream */
709714
async hasNext(): Promise<boolean> {
710715
this._setIsIterator();

test/integration/change-streams/change_stream.test.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,91 @@ describe('Change Streams', function () {
11671167
);
11681168
});
11691169
});
1170+
1171+
// Note that `insertOne` in these tests is called with write concern `{ w: 'majority' }`.
1172+
// This is to avoid eventual consistency issues that make these tests brittle.
1173+
// A change stream won't see the new documents until they are persisted in the oplog and
1174+
// some MongoDB server versions (4.2-) used for testing default to write concern `{ w: 1 }`.
1175+
// Using write concern `{ w: 'majority' }` ensures that the new documents are persisted in
1176+
// the oplog when the `insertOne` promise resolves, making these tests more reliable.
1177+
describe('#bufferedCount()', function () {
1178+
it(
1179+
'should return 0 before starting to consume the change stream (empty collection)',
1180+
{ requires: { topology: 'replicaset' } },
1181+
async function () {
1182+
changeStream = collection.watch([]);
1183+
expect(changeStream.bufferedCount()).to.equal(0);
1184+
await changeStream.close();
1185+
}
1186+
);
1187+
1188+
it(
1189+
'should return 0 before starting to consume the change stream (non-empty collection)',
1190+
{ requires: { topology: 'replicaset' } },
1191+
async function () {
1192+
// existing documents
1193+
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
1194+
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
1195+
1196+
changeStream = collection.watch([]);
1197+
1198+
// docs inserted after the change stream was created
1199+
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });
1200+
await collection.insertOne({ a: 4 }, { writeConcern: { w: 'majority' } });
1201+
1202+
expect(changeStream.bufferedCount()).to.equal(0);
1203+
await changeStream.close();
1204+
}
1205+
);
1206+
1207+
it('should return the underlying cursor buffered document count', {
1208+
metadata: { requires: { topology: 'replicaset' } },
1209+
async test() {
1210+
changeStream = collection.watch([]);
1211+
await initIteratorMode(changeStream);
1212+
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
1213+
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
1214+
1215+
expect(changeStream.bufferedCount()).to.equal(0);
1216+
1217+
// hasNext will trigger a batch fetch, buffering the documents
1218+
const hasNext = await changeStream.hasNext();
1219+
expect(hasNext).to.be.true;
1220+
1221+
expect(changeStream.bufferedCount()).to.equal(2);
1222+
}
1223+
});
1224+
1225+
it(`decreases as buffered documents are consumed`, {
1226+
metadata: { requires: { topology: 'replicaset' } },
1227+
async test() {
1228+
changeStream = collection.watch([]);
1229+
await initIteratorMode(changeStream);
1230+
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
1231+
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
1232+
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });
1233+
1234+
expect(changeStream.bufferedCount()).to.equal(0);
1235+
1236+
// `next` triggers a batch fetch, buffering the documents
1237+
// and then consumes one document from that buffer
1238+
for (let i = 2; i >= 0; i--) {
1239+
await changeStream.next();
1240+
expect(changeStream.bufferedCount()).to.equal(i);
1241+
}
1242+
1243+
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
1244+
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
1245+
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });
1246+
1247+
// `tryNext` also triggers a batch fetch
1248+
for (let i = 2; i >= 0; i--) {
1249+
await changeStream.tryNext();
1250+
expect(changeStream.bufferedCount()).to.equal(i);
1251+
}
1252+
}
1253+
});
1254+
});
11701255
});
11711256

11721257
describe('startAfter', function () {

0 commit comments

Comments
 (0)