Skip to content

Commit 46b3e4d

Browse files
committed
Avoid unecessary completed upload notification
1 parent f3ac89b commit 46b3e4d

2 files changed

Lines changed: 6 additions & 9 deletions

File tree

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,6 @@ export type SubscribedStream = {
208208
params: Record<string, any> | null;
209209
};
210210

211-
// The priority we assume when we receive checkpoint lines where no priority is set.
212-
// This is the default priority used by the sync service, but can be set to an arbitrary
213-
// value since sync services without priorities also won't send partial sync completion
214-
// messages.
215-
const FALLBACK_PRIORITY = 3;
216-
217211
export abstract class AbstractStreamingSyncImplementation
218212
extends BaseObserver<StreamingSyncImplementationListener>
219213
implements StreamingSyncImplementation
@@ -371,7 +365,9 @@ The next upload iteration will be delayed.`);
371365
} else {
372366
// Uploading is completed
373367
const neededUpdate = await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
374-
if (neededUpdate == false && checkedCrudItem != null) {
368+
if (neededUpdate) {
369+
this.notifyCompletedUploads?.();
370+
} else if (checkedCrudItem != null) {
375371
// Only log this if there was something to upload
376372
this.logger.debug('Upload complete, no write checkpoint needed.');
377373
}
@@ -394,8 +390,6 @@ The next upload iteration will be delayed.`);
394390
`Caught exception when uploading. Upload will retry after a delay. Exception: ${(ex as Error).message}`
395391
);
396392
} finally {
397-
this.notifyCompletedUploads?.();
398-
399393
this.updateSyncStatus({
400394
dataFlow: {
401395
uploading: false

packages/common/src/utils/stream_transform.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ export interface NotifyIterator extends SimpleAsyncIterator<undefined> {
127127

128128
/**
129129
* An iterator dispatching notifications without associated data.
130+
*
131+
* Backpressure is handled by folding calls to `notify()`: The iterator emits an item once there's been at least one
132+
* `notify()` call since the last completed call to `next()`.
130133
*/
131134
export function notifyIterator(): NotifyIterator {
132135
// Possible states:

0 commit comments

Comments
 (0)