From d676c6a6390e4f3e92552ca7ae3689527893a184 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Sat, 21 Feb 2026 08:11:01 +0200 Subject: [PATCH 1/8] Pool read connections in MDSQLiteAdapter --- .../Client/PowerSyncDatabase.cs | 2 +- PowerSync/PowerSync.Common/DB/IDBAdapter.cs | 2 +- .../MDSQLite/MDSQLiteAdapter.cs | 85 ++++++++++++++----- 3 files changed, 66 insertions(+), 23 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 0eb3a43..862e17f 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -497,7 +497,7 @@ public ISyncStream SyncStream(string name, Dictionary? parameter ConnectionManager.Close(); BucketStorageAdapter?.Close(); - Database.Close(); + await Database.Close(); Closed = true; Emit(new PowerSyncDBEvent { Closed = true }); } diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index 7825cd5..a693075 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -118,7 +118,7 @@ public interface IDBAdapter : IEventStream, ILockContext /// /// Closes the adapter. /// - new void Close(); + new Task Close(); /// /// The name of the adapter. diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index e331ebc..a36cde8 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -1,6 +1,7 @@ namespace PowerSync.Common.MDSQLite; using System; +using System.Collections.Concurrent; using System.Threading.Tasks; using Microsoft.Data.Sqlite; @@ -22,8 +23,14 @@ public class MDSQLiteAdapter : EventStream, IDBAdapter { public string Name => options.Name; - public MDSQLiteConnection? writeConnection; - public MDSQLiteConnection? readConnection; + // One writer + private MDSQLiteConnection writeConnection = null!; + + // Many readers + private ConcurrentQueue readPool = null!; + private SemaphoreSlim readLock = null!; + // TODO make maxPoolSize option in SQLOpenOptions + int maxPoolSize = 2; private readonly Task initialized; @@ -31,6 +38,7 @@ public class MDSQLiteAdapter : EventStream, IDBAdapter protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions; private CancellationTokenSource? tablesUpdatedCts; + private Task? tablesUpdatedTask; private readonly AsyncLock writeMutex = new(); private readonly AsyncLock readMutex = new(); @@ -61,7 +69,6 @@ private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options) private async Task Init() { writeConnection = await OpenConnection(options.Name); - readConnection = await OpenConnection(options.Name); string[] baseStatements = [ @@ -88,20 +95,27 @@ private async Task Init() { for (int tries = 0; tries < 30; tries++) { - await writeConnection!.Execute(statement); + await writeConnection.Execute(statement); tries = 30; } } - foreach (var statement in readConnectionStatements) + readPool = new ConcurrentQueue(); + readLock = new SemaphoreSlim(maxPoolSize, maxPoolSize); + for (int i = 0; i < maxPoolSize; i++) { - await readConnection!.Execute(statement); + var readConnection = await OpenConnection(options.Name); + foreach (var statement in readConnectionStatements) + { + await readConnection.Execute(statement); + } + readPool.Enqueue(readConnection); } tablesUpdatedCts = new CancellationTokenSource(); - var _ = Task.Run(() => + tablesUpdatedTask = Task.Run(async () => { - foreach (var notification in writeConnection!.Listen(tablesUpdatedCts.Token)) + await foreach (var notification in writeConnection.ListenAsync(tablesUpdatedCts.Token)) { if (notification.TablesUpdated != null) { @@ -136,12 +150,15 @@ protected virtual void LoadExtension(SqliteConnection db) db.LoadExtension(extensionPath, "sqlite3_powersync_init"); } - public new void Close() + public new async Task Close() { tablesUpdatedCts?.Cancel(); + try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ } base.Close(); writeConnection?.Close(); - readConnection?.Close(); + await LockReaders(); + foreach (var conn in readPool) conn.Close(); + UnlockReaders(); } public async Task Execute(string query, object?[]? parameters = null) @@ -186,7 +203,7 @@ public async Task Get(string sql, object?[]? parameters = null) public async Task ReadTransaction(Func> fn, DBLockOptions? options = null) { - return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction(readConnection!)!, fn)); + return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction((MDSQLiteConnection)ctx), fn)); } public async Task ReadLock(Func> fn, DBLockOptions? options = null) @@ -194,9 +211,17 @@ public async Task ReadLock(Func> fn, DBLockOptions? await initialized; T result; - using (await readMutex.LockAsync()) + using (await readLock.LockAsync()) { - result = await fn(readConnection!); + readPool.TryDequeue(out var conn); + try + { + result = await fn(conn); + } + finally + { + readPool.Enqueue(conn); + } } return result; @@ -208,10 +233,10 @@ public async Task WriteLock(Func fn, DBLockOptions? options using (await writeMutex.LockAsync()) { - await fn(writeConnection!); + await fn(writeConnection); } - writeConnection!.FlushUpdates(); + writeConnection.FlushUpdates(); } @@ -222,22 +247,22 @@ public async Task WriteLock(Func> fn, DBLockOptions? T result; using (await writeMutex.LockAsync()) { - result = await fn(writeConnection!); + result = await fn(writeConnection); } - writeConnection!.FlushUpdates(); + writeConnection.FlushUpdates(); return result; } public async Task WriteTransaction(Func fn, DBLockOptions? options = null) { - await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn)); + await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn)); } public async Task WriteTransaction(Func> fn, DBLockOptions? options = null) { - return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn)); + return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn)); } protected static async Task InternalTransaction( @@ -285,8 +310,26 @@ private static async Task RunTransaction( public async Task RefreshSchema() { await initialized; - await writeConnection!.RefreshSchema(); - await readConnection!.RefreshSchema(); + await writeConnection.RefreshSchema(); + await LockReaders(); + foreach (var conn in readPool) await conn.RefreshSchema(); + UnlockReaders(); + } + + private async Task LockReaders() + { + for (int i = 0; i < maxPoolSize; i++) + { + await readLock.WaitAsync(); + } + } + + private void UnlockReaders() + { + for (int i = 0; i < maxPoolSize; i++) + { + readLock.Release(); + } } } From d3a96853826a8f4f096dc6d937865bc92c08a4b2 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Sat, 21 Feb 2026 08:38:23 +0200 Subject: [PATCH 2/8] Make ReadPoolSize an option in MDSQLiteOptions --- .../MDSQLite/MDSQLiteAdapter.cs | 36 +++++++++---------- .../MDSQLite/MDSQLiteOptions.cs | 11 +++++- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index a36cde8..6f202de 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -16,7 +16,6 @@ public class MDSQLiteAdapterOptions() public string Name { get; set; } = null!; public MDSQLiteOptions? SqliteOptions; - } public class MDSQLiteAdapter : EventStream, IDBAdapter @@ -25,32 +24,28 @@ public class MDSQLiteAdapter : EventStream, IDBAdapter // One writer private MDSQLiteConnection writeConnection = null!; + private readonly AsyncLock writeMutex = new(); // Many readers private ConcurrentQueue readPool = null!; private SemaphoreSlim readLock = null!; - // TODO make maxPoolSize option in SQLOpenOptions - int maxPoolSize = 2; private readonly Task initialized; protected MDSQLiteAdapterOptions options; - protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions; + protected RequiredMDSQLiteOptions resolvedOptions; private CancellationTokenSource? tablesUpdatedCts; private Task? tablesUpdatedTask; - private readonly AsyncLock writeMutex = new(); - private readonly AsyncLock readMutex = new(); - public MDSQLiteAdapter(MDSQLiteAdapterOptions options) { this.options = options; - resolvedMDSQLiteOptions = resolveMDSQLiteOptions(options.SqliteOptions); + resolvedOptions = ResolveMDSQLiteOptions(options.SqliteOptions); initialized = Init(); } - private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options) + private RequiredMDSQLiteOptions ResolveMDSQLiteOptions(MDSQLiteOptions? options) { var defaults = RequiredMDSQLiteOptions.DEFAULT_SQLITE_OPTIONS; return new RequiredMDSQLiteOptions @@ -62,7 +57,8 @@ private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options) TemporaryStorage = options?.TemporaryStorage ?? defaults.TemporaryStorage, LockTimeoutMs = options?.LockTimeoutMs ?? defaults.LockTimeoutMs, EncryptionKey = options?.EncryptionKey ?? defaults.EncryptionKey, - Extensions = options?.Extensions ?? defaults.Extensions + Extensions = options?.Extensions ?? defaults.Extensions, + ReadPoolSize = options?.ReadPoolSize ?? defaults.ReadPoolSize, }; } @@ -72,17 +68,17 @@ private async Task Init() string[] baseStatements = [ - $"PRAGMA busy_timeout = {resolvedMDSQLiteOptions.LockTimeoutMs}", - $"PRAGMA cache_size = -{resolvedMDSQLiteOptions.CacheSizeKb}", - $"PRAGMA temp_store = {resolvedMDSQLiteOptions.TemporaryStorage}" + $"PRAGMA busy_timeout = {resolvedOptions.LockTimeoutMs}", + $"PRAGMA cache_size = -{resolvedOptions.CacheSizeKb}", + $"PRAGMA temp_store = {resolvedOptions.TemporaryStorage}" ]; string[] writeConnectionStatements = [ .. baseStatements, - $"PRAGMA journal_mode = {resolvedMDSQLiteOptions.JournalMode}", - $"PRAGMA journal_size_limit = {resolvedMDSQLiteOptions.JournalSizeLimit}", - $"PRAGMA synchronous = {resolvedMDSQLiteOptions.Synchronous}", + $"PRAGMA journal_mode = {resolvedOptions.JournalMode}", + $"PRAGMA journal_size_limit = {resolvedOptions.JournalSizeLimit}", + $"PRAGMA synchronous = {resolvedOptions.Synchronous}", ]; string[] readConnectionStatements = @@ -101,8 +97,8 @@ private async Task Init() } readPool = new ConcurrentQueue(); - readLock = new SemaphoreSlim(maxPoolSize, maxPoolSize); - for (int i = 0; i < maxPoolSize; i++) + readLock = new SemaphoreSlim(resolvedOptions.ReadPoolSize, resolvedOptions.ReadPoolSize); + for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) { var readConnection = await OpenConnection(options.Name); foreach (var statement in readConnectionStatements) @@ -318,7 +314,7 @@ public async Task RefreshSchema() private async Task LockReaders() { - for (int i = 0; i < maxPoolSize; i++) + for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) { await readLock.WaitAsync(); } @@ -326,7 +322,7 @@ private async Task LockReaders() private void UnlockReaders() { - for (int i = 0; i < maxPoolSize; i++) + for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) { readLock.Release(); } diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs index 7de8f6a..6abc635 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs @@ -102,6 +102,11 @@ public class MDSQLiteOptions /// Load extensions using the path and entryPoint. /// public SqliteExtension[]? Extensions { get; set; } + + /// + /// The number of MDSQLiteConnection objects to create for the read pool. + /// + public int? ReadPoolSize { get; set; } } public class RequiredMDSQLiteOptions : MDSQLiteOptions @@ -115,7 +120,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions TemporaryStorage = TemporaryStorageOption.MEMORY, LockTimeoutMs = 30000, EncryptionKey = null, - Extensions = [] + Extensions = [], + ReadPoolSize = 2, }; public new SqliteJournalMode JournalMode { get; set; } = null!; @@ -131,5 +137,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions public new TemporaryStorageOption TemporaryStorage { get; set; } = null!; public new int CacheSizeKb { get; set; } + public new SqliteExtension[] Extensions { get; set; } = null!; + + public new int ReadPoolSize { get; set; } } From 567889263acf6139c73d373a8cd70eee1627d1ee Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Mon, 23 Feb 2026 10:38:47 +0200 Subject: [PATCH 3/8] Disable ADO.NET connection pooling (obsoleted) --- PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index 6f202de..abf4e12 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -134,7 +134,8 @@ protected async Task OpenConnection(string dbFilename) private static SqliteConnection OpenDatabase(string dbFilename) { - var connection = new SqliteConnection($"Data Source={dbFilename}"); + string connectionString = $"Data Source={dbFilename};Pooling=False;"; + var connection = new SqliteConnection(connectionString); connection.Open(); return connection; } From 56adcf6f92ffb6be9d8205bbb561208514d2a68c Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Mon, 23 Feb 2026 16:37:46 +0200 Subject: [PATCH 4/8] Up baseline readpool size --- PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs index 6abc635..845b57c 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs @@ -121,7 +121,7 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions LockTimeoutMs = 30000, EncryptionKey = null, Extensions = [], - ReadPoolSize = 2, + ReadPoolSize = 5, }; public new SqliteJournalMode JournalMode { get; set; } = null!; From 64ed444938de925370b1f523351a5bbc953f7143 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 24 Feb 2026 23:13:30 +0200 Subject: [PATCH 5/8] Use bounded channel instead of concurrent queue --- .../MDSQLite/MDSQLiteAdapter.cs | 55 +++++++++---------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index abf4e12..15ec44f 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -1,7 +1,8 @@ namespace PowerSync.Common.MDSQLite; using System; -using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Data.Sqlite; @@ -27,8 +28,7 @@ public class MDSQLiteAdapter : EventStream, IDBAdapter private readonly AsyncLock writeMutex = new(); // Many readers - private ConcurrentQueue readPool = null!; - private SemaphoreSlim readLock = null!; + private Channel readPool = null!; private readonly Task initialized; @@ -96,8 +96,7 @@ private async Task Init() } } - readPool = new ConcurrentQueue(); - readLock = new SemaphoreSlim(resolvedOptions.ReadPoolSize, resolvedOptions.ReadPoolSize); + readPool = Channel.CreateBounded(resolvedOptions.ReadPoolSize); for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) { var readConnection = await OpenConnection(options.Name); @@ -105,7 +104,7 @@ private async Task Init() { await readConnection.Execute(statement); } - readPool.Enqueue(readConnection); + await readPool.Writer.WriteAsync(readConnection); } tablesUpdatedCts = new CancellationTokenSource(); @@ -153,9 +152,9 @@ protected virtual void LoadExtension(SqliteConnection db) try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ } base.Close(); writeConnection?.Close(); - await LockReaders(); - foreach (var conn in readPool) conn.Close(); - UnlockReaders(); + var readConnections = await LockReaders(); + readPool.Writer.Complete(); + foreach (var conn in readConnections) conn.Close(); } public async Task Execute(string query, object?[]? parameters = null) @@ -207,21 +206,15 @@ public async Task ReadLock(Func> fn, DBLockOptions? { await initialized; - T result; - using (await readLock.LockAsync()) + var conn = await readPool.Reader.ReadAsync(); + try { - readPool.TryDequeue(out var conn); - try - { - result = await fn(conn); - } - finally - { - readPool.Enqueue(conn); - } + return await fn(conn); + } + finally + { + readPool.Writer.TryWrite(conn); } - - return result; } public async Task WriteLock(Func fn, DBLockOptions? options = null) @@ -308,24 +301,26 @@ public async Task RefreshSchema() { await initialized; await writeConnection.RefreshSchema(); - await LockReaders(); - foreach (var conn in readPool) await conn.RefreshSchema(); - UnlockReaders(); + var connections = await LockReaders(); + foreach (var conn in connections) await conn.RefreshSchema(); + UnlockReaders(connections); } - private async Task LockReaders() + private async Task> LockReaders() { + var connections = new List(resolvedOptions.ReadPoolSize); for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) { - await readLock.WaitAsync(); + connections.Add(await readPool.Reader.ReadAsync()); } + return connections; } - private void UnlockReaders() + private void UnlockReaders(List connections) { - for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) + foreach (var conn in connections) { - readLock.Release(); + readPool.Writer.TryWrite(conn); } } } From 08dd84d00e4260852b71c9c189b0b411fd450bdf Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Tue, 24 Feb 2026 23:20:45 +0200 Subject: [PATCH 6/8] Add WithAllReaders --- .../MDSQLite/MDSQLiteAdapter.cs | 47 +++++++++++++++---- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index 15ec44f..0385bb4 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -152,9 +152,11 @@ protected virtual void LoadExtension(SqliteConnection db) try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ } base.Close(); writeConnection?.Close(); - var readConnections = await LockReaders(); + await WithAllReaders((connections) => + { + foreach (var conn in connections) conn.Close(); + }); readPool.Writer.Complete(); - foreach (var conn in readConnections) conn.Close(); } public async Task Execute(string query, object?[]? parameters = null) @@ -301,26 +303,51 @@ public async Task RefreshSchema() { await initialized; await writeConnection.RefreshSchema(); - var connections = await LockReaders(); - foreach (var conn in connections) await conn.RefreshSchema(); - UnlockReaders(connections); + await WithAllReaders(async (connections) => + { + foreach (var conn in connections) await conn.RefreshSchema(); + }); } - private async Task> LockReaders() + private async Task WithAllReaders(Func, Task> callback) { var connections = new List(resolvedOptions.ReadPoolSize); for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) { connections.Add(await readPool.Reader.ReadAsync()); } - return connections; + + try + { + await callback(connections); + } + finally + { + foreach (var conn in connections) + { + readPool.Writer.TryWrite(conn); + } + } } - private void UnlockReaders(List connections) + private async Task WithAllReaders(Action> callback) { - foreach (var conn in connections) + var connections = new List(resolvedOptions.ReadPoolSize); + for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) { - readPool.Writer.TryWrite(conn); + connections.Add(await readPool.Reader.ReadAsync()); + } + + try + { + callback(connections); + } + finally + { + foreach (var conn in connections) + { + readPool.Writer.TryWrite(conn); + } } } } From fa6b26fb55c511ddc3ef318735f37c1bfe9d3d54 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Wed, 25 Feb 2026 16:42:30 +0200 Subject: [PATCH 7/8] Changelog --- PowerSync/PowerSync.Common/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index b9cc091..0c497a3 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.0.11-alpha.1 +- Pool read connections in `MDSQLiteAdapter`, improving performance in any case where multiple queries run simultaneously (eg. via `Watch`). The number of connections can be set via `MDSQLiteOptions.ReadPoolSize` and defaults to 5. - Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API). - Changed the `PowerSyncDatabase.Watch` syntax to return an IAsyncEnumerable instead of accepting a callback. This allows users to handle database changes when they are ready instead of us eagerly running the callback as soon as a change is detected. From 0445457fb2f569fa25d5b94b6f7a99c20d2905f6 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Thu, 26 Feb 2026 14:24:48 +0200 Subject: [PATCH 8/8] Create MDSQLiteConnectionPool class --- .../MDSQLite/MDSQLiteAdapter.cs | 127 ++++++++++++------ 1 file changed, 89 insertions(+), 38 deletions(-) diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index c207db0..836c8ea 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -28,7 +28,7 @@ public class MDSQLiteAdapter : EventStream, IDBAdapter private readonly AsyncLock writeMutex = new(); // Many readers - private Channel readPool = null!; + private MDSQLiteConnectionPool readPool = null!; private readonly Task initialized; @@ -64,8 +64,6 @@ private RequiredMDSQLiteOptions ResolveMDSQLiteOptions(MDSQLiteOptions? options) private async Task Init() { - writeConnection = await OpenConnection(options.Name); - string[] baseStatements = [ $"PRAGMA busy_timeout = {resolvedOptions.LockTimeoutMs}", @@ -87,22 +85,28 @@ private async Task Init() "PRAGMA query_only = true", ]; + + // Prepare write connection + writeConnection = await OpenConnection(options.Name); foreach (var statement in writeConnectionStatements) { await writeConnection!.Execute(statement); } - readPool = Channel.CreateBounded(resolvedOptions.ReadPoolSize); - for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) + // Prepare read pool and create connection factory + Func> readConnectionFactory = async () => { var readConnection = await OpenConnection(options.Name); foreach (var statement in readConnectionStatements) { await readConnection.Execute(statement); } - await readPool.Writer.WriteAsync(readConnection); - } + return readConnection; + }; + readPool = new MDSQLiteConnectionPool(resolvedOptions, readConnectionFactory); + await readPool.Init(); + // Register TablesUpdated listener tablesUpdatedCts = new CancellationTokenSource(); tablesUpdatedTask = Task.Run(async () => { @@ -148,11 +152,7 @@ protected virtual void LoadExtension(SqliteConnection db) try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ } base.Close(); writeConnection?.Close(); - await WithAllReaders((connections) => - { - foreach (var conn in connections) conn.Close(); - }); - readPool.Writer.Complete(); + await readPool.Close(); } public async Task Execute(string query, object?[]? parameters = null) @@ -203,16 +203,7 @@ public async Task ReadTransaction(Func> fn, DBLockOp public async Task ReadLock(Func> fn, DBLockOptions? options = null) { await initialized; - - var conn = await readPool.Reader.ReadAsync(); - try - { - return await fn(conn); - } - finally - { - readPool.Writer.TryWrite(conn); - } + return await readPool.Lease(fn); } public async Task WriteLock(Func fn, DBLockOptions? options = null) @@ -299,53 +290,113 @@ public async Task RefreshSchema() { await initialized; await writeConnection.RefreshSchema(); - await WithAllReaders(async (connections) => + await readPool.LeaseAll(async (connections) => { foreach (var conn in connections) await conn.RefreshSchema(); }); } +} + +class MDSQLiteConnectionPool +{ + private readonly RequiredMDSQLiteOptions _options; + private readonly Channel _channel; + private readonly int _poolSize; + private readonly Func> _connectionFactory; + + private readonly Task _initialized; + + public MDSQLiteConnectionPool(RequiredMDSQLiteOptions options, Func> connectionFactory) + { + _options = options; + _channel = Channel.CreateBounded(options.ReadPoolSize); + _poolSize = options.ReadPoolSize; + _connectionFactory = connectionFactory; + _initialized = Initialize(); + } + + public async Task Init() => await _initialized; - private async Task WithAllReaders(Func, Task> callback) + private async Task Initialize() { - var connections = new List(resolvedOptions.ReadPoolSize); - for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) + for (int i = 0; i < _poolSize; i++) { - connections.Add(await readPool.Reader.ReadAsync()); + var connection = await _connectionFactory(); + await _channel.Writer.WriteAsync(connection); } + } + public async Task Lease(Func> callback) + { + await _initialized; + var connection = await _channel.Reader.ReadAsync(); try { - await callback(connections); + return await callback(connection); } finally { - foreach (var conn in connections) - { - readPool.Writer.TryWrite(conn); - } + await _channel.Writer.WriteAsync(connection); } } - private async Task WithAllReaders(Action> callback) + public async Task LeaseAll(Func, Task> callback) { - var connections = new List(resolvedOptions.ReadPoolSize); - for (int i = 0; i < resolvedOptions.ReadPoolSize; i++) + await _initialized; + var connections = new List(_poolSize); + for (int i = 0; i < _poolSize; i++) { - connections.Add(await readPool.Reader.ReadAsync()); + connections.Add(await _channel.Reader.ReadAsync()); } try { - callback(connections); + await callback(connections); } finally { foreach (var conn in connections) { - readPool.Writer.TryWrite(conn); + _channel.Writer.TryWrite(conn); } } } + + private async Task OpenConnection(string dbFilename) + { + var db = OpenDatabase(dbFilename); + LoadExtension(db); + + var connection = new MDSQLiteConnection(new MDSQLiteConnectionOptions(db)); + await connection.Execute("SELECT powersync_init()"); + + return connection; + } + + private static SqliteConnection OpenDatabase(string dbFilename) + { + string connectionString = $"Data Source={dbFilename};Pooling=False;"; + var connection = new SqliteConnection(connectionString); + connection.Open(); + return connection; + } + + private void LoadExtension(SqliteConnection db) + { + string extensionPath = PowerSyncPathResolver.GetNativeLibraryPath(AppContext.BaseDirectory); + db.EnableExtensions(true); + db.LoadExtension(extensionPath, "sqlite3_powersync_init"); + } + + public async Task Close() + { + await LeaseAll((connections) => + { + foreach (var conn in connections) conn.Close(); + return Task.CompletedTask; + }); + _channel.Writer.TryComplete(); + } } public class MDSQLiteTransaction(MDSQLiteConnection connection) : ITransaction