Skip to content

Commit a0f527c

Browse files
committed
Simplify control flow in sync iteration
1 parent 947ca21 commit a0f527c

5 files changed

Lines changed: 169 additions & 115 deletions

File tree

.changeset/chilled-books-beg.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fix `No iteration is active` errors due to race conditions when a sync iteration ends (closes https://github.com/powersync-ja/powersync-js/issues/943).

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ export enum PowerSyncControlCommand {
1717
START = 'start',
1818
NOTIFY_TOKEN_REFRESHED = 'refreshed_token',
1919
NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload',
20-
UPDATE_SUBSCRIPTIONS = 'update_subscriptions'
20+
UPDATE_SUBSCRIPTIONS = 'update_subscriptions',
21+
/**
22+
* An `established` or `end` event for response streams.
23+
*/
24+
CONNECTION_STATE = 'connection'
2125
}
2226

2327
export interface BucketStorageListener extends BaseListener {

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 141 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,20 @@ import { throttleLeadingTrailing } from '../../../utils/async.js';
77
import { BucketStorageAdapter, PowerSyncControlCommand } from '../bucket/BucketStorageAdapter.js';
88
import { CrudEntry } from '../bucket/CrudEntry.js';
99
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
10-
import { EstablishSyncStream, Instruction, coreStatusToJs } from './core-instruction.js';
11-
import { injectable, InjectableIterator, map, SimpleAsyncIterator } from '../../../utils/stream_transform.js';
10+
import {
11+
Instruction,
12+
NonInterruptingInstruction,
13+
coreStatusToJs,
14+
isInterruptingInstruction
15+
} from './core-instruction.js';
16+
import {
17+
doneResult,
18+
injectable,
19+
InjectableIterator,
20+
map,
21+
SimpleAsyncIterator,
22+
valueResult
23+
} from '../../../utils/stream_transform.js';
1224
import { StreamingSyncRequestParameterType } from './JsonValue.js';
1325

1426
export enum LockType {
@@ -626,30 +638,62 @@ The next upload iteration will be delayed.`);
626638
`Invalid appMetadata provided. Only string values are allowed. Invalid values: ${invalidMetadata.map(([key, value]) => `${key}: ${value}`).join(', ')}`
627639
);
628640
}
629-
const clientImplementation = resolvedOptions.clientImplementation;
630-
this.updateSyncStatus({ clientImplementation });
631641

632642
await this.requireKeyFormat(true);
633643
return await this.rustSyncIteration(signal, resolvedOptions);
634644
}
635645
});
636646
}
637647

638-
private async receiveSyncLines(data: {
648+
private receiveSyncLines(data: {
639649
options: SyncStreamOptions;
640650
connection: RequiredPowerSyncConnectionOptions;
641-
}): Promise<SimpleAsyncIterator<Uint8Array | string>> {
651+
}): SimpleAsyncIterator<EnqueuedCommand> {
642652
const { options, connection } = data;
643653
const remote = this.options.remote;
644654

645-
if (connection.connectionMethod == SyncStreamConnectionMethod.HTTP) {
646-
return await remote.fetchStream(options);
647-
} else {
648-
return await this.options.remote.socketStreamRaw({
649-
...options,
650-
...{ fetchStrategy: connection.fetchStrategy }
651-
});
652-
}
655+
const openInner = async () => {
656+
if (connection.connectionMethod == SyncStreamConnectionMethod.HTTP) {
657+
return await remote.fetchStream(options);
658+
} else {
659+
return await this.options.remote.socketStreamRaw({
660+
...options,
661+
...{ fetchStrategy: connection.fetchStrategy }
662+
});
663+
}
664+
};
665+
666+
let inner: SimpleAsyncIterator<string | Uint8Array> | undefined;
667+
let done = false;
668+
669+
return {
670+
async next(): Promise<IteratorResult<EnqueuedCommand>> {
671+
if (done) {
672+
return doneResult;
673+
} else if (inner == null) {
674+
inner = await openInner();
675+
// We're connected here, so we can tell the core extension about it.
676+
return valueResult<EnqueuedCommand>({
677+
command: PowerSyncControlCommand.CONNECTION_STATE,
678+
payload: 'established'
679+
});
680+
} else {
681+
const event = await inner.next();
682+
if (event.done) {
683+
done = true;
684+
return valueResult<EnqueuedCommand>({ command: PowerSyncControlCommand.CONNECTION_STATE, payload: 'end' });
685+
} else {
686+
return valueResult<EnqueuedCommand>({
687+
command:
688+
typeof event.value == 'string'
689+
? PowerSyncControlCommand.PROCESS_TEXT_LINE
690+
: PowerSyncControlCommand.PROCESS_BSON_LINE,
691+
payload: event.value
692+
});
693+
}
694+
}
695+
}
696+
};
653697
}
654698

655699
private async rustSyncIteration(
@@ -659,83 +703,40 @@ The next upload iteration will be delayed.`);
659703
const syncImplementation = this;
660704
const adapter = this.options.adapter;
661705
const remote = this.options.remote;
662-
const controller = new AbortController();
663-
const abort = () => {
664-
return controller.abort(signal.reason);
665-
};
666-
signal.addEventListener('abort', abort);
667-
let receivingLines: Promise<void> | null = null;
668-
let hadSyncLine = false;
669706
let hideDisconnectOnRestart = false;
670707

671708
if (signal.aborted) {
672709
throw new AbortOperation('Connection request has been aborted');
673710
}
674711

675-
// Pending sync lines received from the service, as well as local events that trigger a powersync_control
676-
// invocation (local events include refreshed tokens and completed uploads).
677-
// This is a single data stream so that we can handle all control calls from a single place.
678-
let controlInvocations: InjectableIterator<EnqueuedCommand> | null = null;
679-
680-
async function connect(instr: EstablishSyncStream) {
681-
const syncOptions: SyncStreamOptions = {
682-
path: '/sync/stream',
683-
abortSignal: controller.signal,
684-
data: instr.request
712+
function startCommand() {
713+
const options: any = {
714+
parameters: resolvedOptions.params,
715+
app_metadata: resolvedOptions.appMetadata,
716+
active_streams: syncImplementation.activeStreams,
717+
include_defaults: resolvedOptions.includeDefaultStreams
685718
};
686-
687-
controlInvocations = injectable(
688-
map(
689-
await syncImplementation.receiveSyncLines({
690-
options: syncOptions,
691-
connection: resolvedOptions
692-
}),
693-
(line) => {
694-
if (typeof line == 'string') {
695-
return {
696-
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
697-
payload: line
698-
};
699-
} else {
700-
return {
701-
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
702-
payload: line
703-
};
704-
}
705-
}
706-
)
707-
);
708-
709-
// The rust client will set connected: true after the first sync line because that's when it gets invoked, but
710-
// we're already connected here and can report that.
711-
syncImplementation.updateSyncStatus({ connected: true });
712-
713-
try {
714-
while (true) {
715-
let event = await controlInvocations.next();
716-
if (event.done) {
717-
break;
718-
}
719-
720-
const line = event.value;
721-
await control(line.command, line.payload);
722-
723-
if (!hadSyncLine) {
724-
syncImplementation.triggerCrudUpload();
725-
hadSyncLine = true;
726-
}
727-
}
728-
} finally {
729-
abort();
730-
signal.removeEventListener('abort', abort);
719+
if (resolvedOptions.serializedSchema) {
720+
options.schema = resolvedOptions.serializedSchema;
731721
}
722+
723+
return invokePowerSyncControl(PowerSyncControlCommand.START, JSON.stringify(options));
732724
}
733725

734726
async function stop() {
735-
await control(PowerSyncControlCommand.STOP);
727+
const instructions = await invokePowerSyncControl(PowerSyncControlCommand.STOP);
728+
for (const instruction of instructions) {
729+
// We don't need to handle interrupting instructions since we're unconditionally ending the sync iteration at
730+
// this point.
731+
if (isInterruptingInstruction(instruction)) continue;
732+
await handleInstruction(instruction);
733+
}
736734
}
737735

738-
async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) {
736+
async function invokePowerSyncControl(
737+
op: PowerSyncControlCommand,
738+
payload?: Uint8Array | string
739+
): Promise<Instruction[]> {
739740
const rawResponse = await adapter.control(op, payload ?? null);
740741
const logger = syncImplementation.logger;
741742
logger.trace(
@@ -749,10 +750,11 @@ The next upload iteration will be delayed.`);
749750
// Evidently we have a working connection here, otherwise powersync_control would have failed.
750751
syncImplementation.connectionMayHaveChanged = false;
751752
}
752-
await handleInstructions(JSON.parse(rawResponse));
753+
754+
return JSON.parse(rawResponse);
753755
}
754756

755-
async function handleInstruction(instruction: Instruction) {
757+
async function handleInstruction(instruction: NonInterruptingInstruction) {
756758
if ('LogLine' in instruction) {
757759
switch (instruction.LogLine.severity) {
758760
case 'DEBUG':
@@ -767,13 +769,6 @@ The next upload iteration will be delayed.`);
767769
}
768770
} else if ('UpdateSyncStatus' in instruction) {
769771
syncImplementation.updateSyncStatus(coreStatusToJs(instruction.UpdateSyncStatus.status));
770-
} else if ('EstablishSyncStream' in instruction) {
771-
if (receivingLines != null) {
772-
// Already connected, this shouldn't happen during a single iteration.
773-
throw 'Unexpected request to establish sync stream, already connected';
774-
}
775-
776-
receivingLines = connect(instruction.EstablishSyncStream);
777772
} else if ('FetchCredentials' in instruction) {
778773
if (instruction.FetchCredentials.did_expire) {
779774
remote.invalidateCredentials();
@@ -783,16 +778,13 @@ The next upload iteration will be delayed.`);
783778
// Restart iteration after the credentials have been refreshed.
784779
remote.fetchCredentials().then(
785780
(_) => {
786-
controlInvocations?.inject({ command: PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED });
781+
syncImplementation.notifyCompletedUploads?.();
787782
},
788783
(err) => {
789784
syncImplementation.logger.warn('Could not prefetch credentials', err);
790785
}
791786
);
792787
}
793-
} else if ('CloseSyncStream' in instruction) {
794-
controller.abort();
795-
hideDisconnectOnRestart = instruction.CloseSyncStream.hide_disconnect;
796788
} else if ('FlushFileSystem' in instruction) {
797789
// Not necessary on JS platforms.
798790
} else if ('DidCompleteSync' in instruction) {
@@ -804,35 +796,74 @@ The next upload iteration will be delayed.`);
804796
}
805797
}
806798

807-
async function handleInstructions(instructions: Instruction[]) {
808-
for (const instr of instructions) {
809-
await handleInstruction(instr);
810-
}
811-
}
812-
813799
try {
814-
const options: any = {
815-
parameters: resolvedOptions.params,
816-
app_metadata: resolvedOptions.appMetadata,
817-
active_streams: this.activeStreams,
818-
include_defaults: resolvedOptions.includeDefaultStreams
819-
};
820-
if (resolvedOptions.serializedSchema) {
821-
options.schema = resolvedOptions.serializedSchema;
800+
const defaultResult = { immediateRestart: false };
801+
// Pending sync lines received from the service, as well as local events that trigger a powersync_control
802+
// invocation (local events include refreshed tokens and completed uploads).
803+
// This is a single data stream so that we can handle all control calls from a single place.
804+
let controlInvocations: InjectableIterator<EnqueuedCommand> | null = null;
805+
806+
for (const startInstruction of await startCommand()) {
807+
if ('EstablishSyncStream' in startInstruction) {
808+
const syncOptions: SyncStreamOptions = {
809+
path: '/sync/stream',
810+
abortSignal: signal,
811+
data: startInstruction.EstablishSyncStream.request
812+
};
813+
814+
controlInvocations = injectable(
815+
syncImplementation.receiveSyncLines({
816+
options: syncOptions,
817+
connection: resolvedOptions
818+
})
819+
);
820+
} else if ('CloseSyncStream' in startInstruction) {
821+
return defaultResult;
822+
} else {
823+
await handleInstruction(startInstruction);
824+
}
822825
}
823-
824-
await control(PowerSyncControlCommand.START, JSON.stringify(options));
826+
if (controlInvocations == null) return defaultResult;
825827

826828
this.notifyCompletedUploads = () => {
827-
controlInvocations?.inject({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED });
829+
controlInvocations.inject({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED });
828830
};
829831
this.handleActiveStreamsChange = () => {
830-
controlInvocations?.inject({
832+
controlInvocations.inject({
831833
command: PowerSyncControlCommand.UPDATE_SUBSCRIPTIONS,
832834
payload: JSON.stringify(this.activeStreams)
833835
});
834836
};
835-
await receivingLines;
837+
838+
let hadSyncLine = false;
839+
loop: while (true) {
840+
const { done, value } = await controlInvocations.next();
841+
if (done) break;
842+
843+
if (!hadSyncLine) {
844+
// Trigger a local CRUD upload when the first sync line has been received, this allows uploading local changes
845+
// that have been made while offline or disconnected.
846+
if (
847+
value.command == PowerSyncControlCommand.PROCESS_TEXT_LINE ||
848+
value.command == PowerSyncControlCommand.PROCESS_BSON_LINE
849+
) {
850+
hadSyncLine = true;
851+
this.triggerCrudUpload?.();
852+
}
853+
}
854+
855+
const instructions = await invokePowerSyncControl(value.command, value.payload);
856+
for (const instruction of instructions) {
857+
if ('EstablishSyncStream' in instruction) {
858+
this.logger.warn('Received EstablishSyncStream while already connected.');
859+
} else if ('CloseSyncStream' in instruction) {
860+
hideDisconnectOnRestart = instruction.CloseSyncStream.hide_disconnect;
861+
break loop;
862+
} else {
863+
await handleInstruction(instruction);
864+
}
865+
}
866+
}
836867
} finally {
837868
this.notifyCompletedUploads = this.handleActiveStreamsChange = undefined;
838869
await stop();

packages/common/src/client/sync/stream/core-instruction.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,19 @@ import { FULL_SYNC_PRIORITY } from '../../../db/crud/SyncProgress.js';
55
* An internal instruction emitted by the sync client in the core extension in response to the JS
66
* SDK passing sync data into the extension.
77
*/
8-
export type Instruction =
8+
export type Instruction = InterruptingInstruction | NonInterruptingInstruction;
9+
10+
export type InterruptingInstruction =
11+
| { EstablishSyncStream: EstablishSyncStream }
12+
| { CloseSyncStream: { hide_disconnect: boolean } };
13+
14+
/**
15+
* An {@link Instruction} that doesn't start or stop a sync iteration.
16+
*/
17+
export type NonInterruptingInstruction =
918
| { LogLine: LogLine }
1019
| { UpdateSyncStatus: UpdateSyncStatus }
11-
| { EstablishSyncStream: EstablishSyncStream }
1220
| { FetchCredentials: FetchCredentials }
13-
| { CloseSyncStream: { hide_disconnect: boolean } }
1421
| { FlushFileSystem: any }
1522
| { DidCompleteSync: any };
1623

@@ -96,3 +103,7 @@ export function coreStatusToJs(status: CoreSyncStatus): sync_status.SyncStatusOp
96103
priorityStatusEntries: status.priority_status.map(priorityToJs)
97104
};
98105
}
106+
107+
export function isInterruptingInstruction(instruction: Instruction): instruction is InterruptingInstruction {
108+
return 'EstablishSyncStream' in instruction || 'CloseSyncStream' in instruction;
109+
}

0 commit comments

Comments
 (0)