Skip to content

Commit 8f8ef1c

Browse files
authored
Use custom mutex implementation instead of async-mutex (#900)
1 parent 1bbba11 commit 8f8ef1c

38 files changed

Lines changed: 1635 additions & 888 deletions

.changeset/shaggy-games-approve.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
'@powersync/react-native': minor
3+
'@powersync/common': minor
4+
'@powersync/web': minor
5+
'@powersync/op-sqlite': patch
6+
'@powersync/adapter-sql-js': patch
7+
'@powersync/capacitor': patch
8+
'@powersync/node': patch
9+
'@powersync/nuxt': patch
10+
---
11+
12+
Remove `async-mutex` dependency in favor of internal implementation.

demos/react-multi-client/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
"@supabase/supabase-js": "^2.43.1",
1717
"@vitejs/plugin-react": "^4.2.1",
1818
"@webflow/webflow-cli": "^1.6.9",
19-
"async-mutex": "^0.5.0",
2019
"autoprefixer": "10.4.14",
2120
"lodash": "^4.17.21",
2221
"postcss": "8.4.27",

demos/react-multi-client/src/library/SupabaseConnector.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
BaseListener,
44
BaseObserver,
55
CrudEntry,
6+
Mutex,
67
PowerSyncBackendConnector,
78
UpdateType
89
} from '@powersync/web';
@@ -14,8 +15,6 @@ export interface SupabaseConnectorListener extends BaseListener {
1415
onCRUDEvent: (event: { crudType: UpdateType; elapsedTimeMs: number }) => void;
1516
}
1617

17-
import { Mutex } from 'async-mutex';
18-
1918
export class SupabaseConnector extends BaseObserver<SupabaseConnectorListener> implements PowerSyncBackendConnector {
2019
static SHARED_MUTEX = new Mutex();
2120
readonly client: SupabaseClient;

packages/adapter-sql-js/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@
3232
"test": "vitest"
3333
},
3434
"dependencies": {
35-
"@powersync/common": "workspace:^",
36-
"async-mutex": "catalog:"
35+
"@powersync/common": "workspace:^"
3736
},
3837
"devDependencies": {
3938
"@powersync/sql-js": "0.0.8",

packages/adapter-sql-js/rollup.config.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,6 @@ export default () => {
3131
]
3232
})
3333
],
34-
external: ['@powersync/common', 'async-mutex']
34+
external: ['@powersync/common']
3535
};
3636
};

packages/adapter-sql-js/src/SQLJSAdapter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ import {
1212
DBLockOptions,
1313
ILogger,
1414
LockContext,
15+
Mutex,
1516
QueryResult,
1617
SqlExecutor,
1718
SQLOpenFactory,
1819
SQLOpenOptions,
20+
timeoutSignal,
1921
Transaction
2022
} from '@powersync/common';
21-
import { Mutex } from 'async-mutex';
2223
// This uses a pure JS version which avoids the need for WebAssembly, which is not supported in React Native.
2324
import SQLJs from '@powersync/sql-js/dist/sql-asm.js';
2425

@@ -165,7 +166,7 @@ class SqlJsConnectionPool extends BaseObserver<DBAdapterListener> implements Con
165166
this.tableUpdateCache.clear();
166167
this.iterateListeners((l) => l.tablesUpdated?.(notification));
167168
return result;
168-
});
169+
}, timeoutSignal(options?.timeoutMs));
169170
}
170171

171172
async refreshSchema(): Promise<void> {

packages/capacitor/package.json

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,5 @@
9696
"android": {
9797
"src": "android"
9898
}
99-
},
100-
"dependencies": {
101-
"async-mutex": "catalog:"
10299
}
103100
}

packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import {
1010
DBAdapterListener,
1111
DBLockOptions,
1212
LockContext,
13-
mutexRunExclusive,
13+
Mutex,
1414
QueryResult,
15+
timeoutSignal,
1516
Transaction
1617
} from '@powersync/web';
17-
import { Mutex } from 'async-mutex';
1818
import { PowerSyncCore } from '../plugin/PowerSyncCore.js';
1919
import { messageForErrorCode } from '../plugin/PowerSyncPlugin.js';
2020
import { CapacitorSQLiteOpenFactoryOptions, DEFAULT_SQLITE_OPTIONS } from './CapacitorSQLiteOpenFactory.js';
@@ -228,39 +228,31 @@ class CapacitorConnectionPool extends BaseObserver<DBAdapterListener> implements
228228
}
229229

230230
readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
231-
return mutexRunExclusive(
232-
this.readMutex,
233-
async () => {
234-
await this.initializedPromise;
235-
return await fn(this.generateLockContext(this.readConnection));
236-
},
237-
options
238-
);
231+
return this.readMutex.runExclusive(async () => {
232+
await this.initializedPromise;
233+
return fn(this.generateLockContext(this.readConnection));
234+
}, timeoutSignal(options?.timeoutMs));
239235
}
240236

241237
writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
242-
return mutexRunExclusive(
243-
this.writeMutex,
244-
async () => {
245-
await this.initializedPromise;
246-
const result = await fn(this.generateLockContext(this.writeConnection));
247-
248-
// Fetch table updates
249-
const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
250-
const jsonUpdates = updates.values?.[0];
251-
if (!jsonUpdates || !jsonUpdates.table_name) {
252-
throw new Error('Could not fetch table updates');
253-
}
254-
const notification: BatchedUpdateNotification = {
255-
rawUpdates: [],
256-
tables: JSON.parse(jsonUpdates.table_name),
257-
groupedUpdates: {}
258-
};
259-
this.iterateListeners((l) => l.tablesUpdated?.(notification));
260-
return result;
261-
},
262-
options
263-
);
238+
return this.writeMutex.runExclusive(async () => {
239+
await this.initializedPromise;
240+
const result = await fn(this.generateLockContext(this.writeConnection));
241+
242+
// Fetch table updates
243+
const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
244+
const jsonUpdates = updates.values?.[0];
245+
if (!jsonUpdates || !jsonUpdates.table_name) {
246+
throw new Error('Could not fetch table updates');
247+
}
248+
const notification: BatchedUpdateNotification = {
249+
rawUpdates: [],
250+
tables: JSON.parse(jsonUpdates.table_name),
251+
groupedUpdates: {}
252+
};
253+
this.iterateListeners((l) => l.tablesUpdated?.(notification));
254+
return result;
255+
}, timeoutSignal(options?.timeoutMs));
264256
}
265257

266258
refreshSchema(): Promise<void> {

packages/capacitor/src/sync/CapacitorSyncImplementation.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import { AbstractStreamingSyncImplementation, LockOptions, LockType, mutexRunExclusive } from '@powersync/web';
2-
import { Mutex } from 'async-mutex';
1+
import { AbstractStreamingSyncImplementation, LockOptions, LockType, Mutex } from '@powersync/web';
32

43
type MutexMap = {
54
/**
@@ -56,11 +55,8 @@ export class CapacitorStreamingSyncImplementation extends AbstractStreamingSyncI
5655
mutexRecord.tracking.add(this.instanceId);
5756
const mutex = mutexRecord.locks[lockOptions.type];
5857

59-
return mutexRunExclusive(mutex, async () => {
60-
if (lockOptions.signal?.aborted) {
61-
throw new Error('Aborted');
62-
}
58+
return mutex.runExclusive(async () => {
6359
return await lockOptions.callback();
64-
});
60+
}, lockOptions.signal);
6561
}
6662
}

packages/common/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
"test:exports": "attw --pack ."
5656
},
5757
"dependencies": {
58-
"async-mutex": "catalog:",
5958
"event-iterator": "^2.0.0"
6059
},
6160
"devDependencies": {

0 commit comments

Comments
 (0)