@@ -9,14 +9,26 @@ import { ChangeStreamInvalidatedError } from './ChangeStream.js';
99
1010export interface RawChangeStreamOptions {
1111 signal ?: AbortSignal ;
12+
1213 /**
1314 * How long to wait for new data per batch (max time for long-polling).
15+ *
16+ * This is used as the maxTimeMS for the getMore command.
1417 */
1518 maxAwaitTimeMS : number ;
19+
1620 /**
1721 * Timeout for the initial aggregate command.
1822 */
1923 maxTimeMS : number ;
24+
25+ /**
26+ * batchSize for the getMore commands.
27+ *
28+ * The aggregate command always uses a batchSize of 1.
29+ *
30+ * After a timeout error, the batchSize will be reduced and ramped up again.
31+ */
2032 batchSize : number ;
2133
2234 /**
@@ -63,6 +75,8 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[],
6375 }
6476 let lastResumeToken : unknown | null = null ;
6577
78+ const batchSizer = new AdaptiveBatchSize ( options . batchSize ) ;
79+
6680 // > If the server supports sessions, the resume attempt MUST use the same session as the previous attempt's command.
6781 const session = db . client . startSession ( ) ;
6882 await using _ = { [ Symbol . asyncDispose ] : ( ) => session . endSession ( ) } ;
@@ -79,9 +93,15 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[],
7993 changeStreamStage . resumeAfter = lastResumeToken ;
8094 innerPipeline = [ { $changeStream : changeStreamStage } , ...rest ] ;
8195 }
82- const inner = rawChangeStreamInner ( session , db , innerPipeline , {
83- ...options
84- } ) ;
96+ const inner = rawChangeStreamInner (
97+ session ,
98+ db ,
99+ innerPipeline ,
100+ {
101+ ...options
102+ } ,
103+ batchSizer
104+ ) ;
85105 for await ( let batch of inner ) {
86106 yield batch ;
87107 lastResumeToken = batch . resumeToken ;
@@ -111,7 +131,8 @@ async function* rawChangeStreamInner(
111131 session : mongo . ClientSession ,
112132 db : mongo . Db ,
113133 pipeline : mongo . Document [ ] ,
114- options : RawChangeStreamOptions
134+ options : RawChangeStreamOptions ,
135+ batchSizer : AdaptiveBatchSize
115136) : AsyncGenerator < ChangeStreamBatch > {
116137 // See specs:
117138 // https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md
@@ -123,7 +144,6 @@ async function* rawChangeStreamInner(
123144 */
124145 let nsCollection : string | null = null ;
125146
126- const maxTimeMS = options . maxAwaitTimeMS ;
127147 const batchSize = options . batchSize ;
128148 const collection = options . collection ?? 1 ;
129149 let abortPromise : Promise < any > | null = null ;
@@ -149,7 +169,11 @@ async function* rawChangeStreamInner(
149169 {
150170 aggregate : collection ,
151171 pipeline,
152- cursor : { batchSize } ,
172+ cursor : {
173+ // Always use batchSize of 1 for the initial aggregate command,
174+ // to maximize the chance of success in case of timeouts.
175+ batchSize : 1
176+ } ,
153177 maxTimeMS : options . maxTimeMS
154178 } ,
155179 { session, raw : true }
@@ -183,8 +207,8 @@ async function* rawChangeStreamInner(
183207 {
184208 getMore : cursorId ,
185209 collection : nsCollection ,
186- batchSize,
187- maxTimeMS
210+ batchSize : batchSizer . next ( ) ,
211+ maxTimeMS : options . maxAwaitTimeMS
188212 } ,
189213 { session, raw : true }
190214 )
@@ -196,6 +220,9 @@ async function* rawChangeStreamInner(
196220 }
197221
198222 if ( isResumableChangeStreamError ( e ) ) {
223+ if ( isTimeoutError ( e ) ) {
224+ batchSizer . reduceAfterError ( ) ;
225+ }
199226 throw new ResumableChangeStreamError ( e . message , { cause : e } ) ;
200227 }
201228 throw mapChangeStreamError ( e ) ;
@@ -233,6 +260,55 @@ async function* rawChangeStreamInner(
233260
234261class ResumableChangeStreamError extends Error { }
235262
263+ /**
264+ * After a timeout error, we reduce the batch size to this number, then multiply by this for each batch.
265+ *
266+ * Must be an integer >= 2 with the current implementation.
267+ */
268+ const BATCH_SIZE_MULTIPLIER = 2 ;
269+
270+ /**
271+ * Manage batch sizes after timeout errors.
272+ *
273+ * This starts with the initial batch size.
274+ *
275+ * After a timeout error, we reduce the batch size for aggregate command to 1,
276+ * then multiply by BATCH_SIZE_MULTIPLIER for each subsequent batch, until we reach the initial batch size again.
277+ *
278+ * We use this to protect against timeout errors:
279+ * [PSYNC_S1345] Timeout while reading MongoDB ChangeStream
280+ *
281+ * When we run into that, the stream is restarted automatically. starting with an aggregate command
282+ * with batchSize: 1.
283+ *
284+ * We then ramp up the batchSize for getMore commands.
285+ */
286+ class AdaptiveBatchSize {
287+ private nextBatchSize : number ;
288+
289+ constructor ( private maxBatchSize : number ) {
290+ this . nextBatchSize = maxBatchSize ;
291+ }
292+
293+ /**
294+ * Get the next batchSize for a getMore command.
295+ */
296+ next ( ) {
297+ const current = this . nextBatchSize ;
298+ this . nextBatchSize = Math . min ( this . maxBatchSize , this . nextBatchSize * BATCH_SIZE_MULTIPLIER ) ;
299+ return current ;
300+ }
301+
302+ /**
303+ * After a timeout error, the next aggregate command will start with a batchSize of 1.
304+ *
305+ * The next getMore will then have a batchSize of BATCH_SIZE_MULTIPLIER.
306+ */
307+ reduceAfterError ( ) {
308+ this . nextBatchSize = BATCH_SIZE_MULTIPLIER ;
309+ }
310+ }
311+
236312type RawBsonValue =
237313 | string
238314 | number
@@ -308,31 +384,39 @@ export function parseChangeDocument(buffer: Buffer): ProjectedChangeStreamDocume
308384 return doc as any ;
309385}
310386
387+ function isTimeoutError ( e : unknown ) {
388+ return isMongoNetworkTimeoutError ( e ) || ( isMongoServerError ( e ) && e . codeName == 'MaxTimeMSExpired' ) ;
389+ }
390+
311391function isResumableChangeStreamError ( e : unknown ) {
312392 // See: https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md#resumable-error
313393 if ( ! isMongoServerError ( e ) ) {
314- // Any error encountered which is not a server error (e.g. a timeout error or network error)
394+ // Any error encountered which is not a server error (e.g. a socket timeout error or network error)
315395 return true ;
316396 } else if ( e . codeName == 'CursorNotFound' ) {
317397 // A server error with code 43 (CursorNotFound)
318398 return true ;
319399 } else if ( e . hasErrorLabel ( 'ResumableChangeStreamError' ) ) {
320400 // For servers with wire version 9 or higher (server version 4.4 or higher), any server error with the ResumableChangeStreamError error label.
321401 return true ;
402+ } else if ( e . codeName == 'MaxTimeMSExpired' ) {
403+ // Our own exception for MaxTimeMSExpired.
404+ // This can help us retry faster, with a smaller batch size (if initialBatchSize is set to 1), which should hopefully avoid the timeout.
405+ return true ;
322406 } else {
323- // We ignore servers with wire version less than 9, since we only support MongoDB 6.0+.
407+ // Other errors are not retried.
408+ // We ignore the spec for servers with wire version less than 9, since we only support MongoDB 6.0+.
324409 return false ;
325410 }
326411}
327412
328413export function mapChangeStreamError ( e : unknown ) {
329- if ( isMongoNetworkTimeoutError ( e ) ) {
330- // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
331- // We wrap the error to make it more useful.
332- throw new DatabaseConnectionError ( ErrorCode . PSYNC_S1345 , `Timeout while reading MongoDB ChangeStream` , e ) ;
333- } else if ( isMongoServerError ( e ) && e . codeName == 'MaxTimeMSExpired' ) {
334- // maxTimeMS was reached. Example message:
335- // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit
414+ if ( isTimeoutError ( e ) ) {
415+ // For isMongoNetworkTimeoutError():
416+ // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
417+ // We wrap the error to make it more useful.
418+ // Example for MaxTimeMSExpired:
419+ // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit
336420 throw new DatabaseConnectionError ( ErrorCode . PSYNC_S1345 , `Timeout while reading MongoDB ChangeStream` , e ) ;
337421 } else if (
338422 isMongoServerError ( e ) &&
0 commit comments