diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 95d792f..74b81f9 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.0.11-alpha.1 +- Pool read connections in `MDSQLiteAdapter`, improving performance in any case where multiple queries run simultaneously (eg. via `Watch`). The number of connections can be set via `MDSQLiteOptions.ReadPoolSize` and defaults to 5. - Updated to the latest version (0.4.11) of the core extension. - `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking. - Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API). diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 52e9eda..1d0a97c 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -497,7 +497,7 @@ public ISyncStream SyncStream(string name, Dictionary? parameter ConnectionManager.Close(); BucketStorageAdapter?.Close(); - Database.Close(); + await Database.Close(); Closed = true; Emit(new PowerSyncDBEvent { Closed = true }); } diff --git a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs index 7825cd5..a693075 100644 --- a/PowerSync/PowerSync.Common/DB/IDBAdapter.cs +++ b/PowerSync/PowerSync.Common/DB/IDBAdapter.cs @@ -118,7 +118,7 @@ public interface IDBAdapter : IEventStream, ILockContext /// /// Closes the adapter. /// - new void Close(); + new Task Close(); /// /// The name of the adapter. diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index 6400fe9..836c8ea 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -1,6 +1,8 @@ namespace PowerSync.Common.MDSQLite; using System; +using System.Collections.Generic; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Data.Sqlite; @@ -15,35 +17,35 @@ public class MDSQLiteAdapterOptions() public string Name { get; set; } = null!; public MDSQLiteOptions? SqliteOptions; - } public class MDSQLiteAdapter : EventStream, IDBAdapter { public string Name => options.Name; - public MDSQLiteConnection? writeConnection; - public MDSQLiteConnection? readConnection; + // One writer + private MDSQLiteConnection writeConnection = null!; + private readonly AsyncLock writeMutex = new(); + + // Many readers + private MDSQLiteConnectionPool readPool = null!; private readonly Task initialized; protected MDSQLiteAdapterOptions options; - protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions; + protected RequiredMDSQLiteOptions resolvedOptions; private CancellationTokenSource? tablesUpdatedCts; private Task? tablesUpdatedTask; - private readonly AsyncLock writeMutex = new(); - private readonly AsyncLock readMutex = new(); - public MDSQLiteAdapter(MDSQLiteAdapterOptions options) { this.options = options; - resolvedMDSQLiteOptions = resolveMDSQLiteOptions(options.SqliteOptions); + resolvedOptions = ResolveMDSQLiteOptions(options.SqliteOptions); initialized = Init(); } - private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options) + private RequiredMDSQLiteOptions ResolveMDSQLiteOptions(MDSQLiteOptions? options) { var defaults = RequiredMDSQLiteOptions.DEFAULT_SQLITE_OPTIONS; return new RequiredMDSQLiteOptions @@ -55,28 +57,26 @@ private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options) TemporaryStorage = options?.TemporaryStorage ?? defaults.TemporaryStorage, LockTimeoutMs = options?.LockTimeoutMs ?? defaults.LockTimeoutMs, EncryptionKey = options?.EncryptionKey ?? defaults.EncryptionKey, - Extensions = options?.Extensions ?? defaults.Extensions + Extensions = options?.Extensions ?? defaults.Extensions, + ReadPoolSize = options?.ReadPoolSize ?? defaults.ReadPoolSize, }; } private async Task Init() { - writeConnection = await OpenConnection(options.Name); - readConnection = await OpenConnection(options.Name); - string[] baseStatements = [ - $"PRAGMA busy_timeout = {resolvedMDSQLiteOptions.LockTimeoutMs}", - $"PRAGMA cache_size = -{resolvedMDSQLiteOptions.CacheSizeKb}", - $"PRAGMA temp_store = {resolvedMDSQLiteOptions.TemporaryStorage}" + $"PRAGMA busy_timeout = {resolvedOptions.LockTimeoutMs}", + $"PRAGMA cache_size = -{resolvedOptions.CacheSizeKb}", + $"PRAGMA temp_store = {resolvedOptions.TemporaryStorage}" ]; string[] writeConnectionStatements = [ .. baseStatements, - $"PRAGMA journal_mode = {resolvedMDSQLiteOptions.JournalMode}", - $"PRAGMA journal_size_limit = {resolvedMDSQLiteOptions.JournalSizeLimit}", - $"PRAGMA synchronous = {resolvedMDSQLiteOptions.Synchronous}", + $"PRAGMA journal_mode = {resolvedOptions.JournalMode}", + $"PRAGMA journal_size_limit = {resolvedOptions.JournalSizeLimit}", + $"PRAGMA synchronous = {resolvedOptions.Synchronous}", ]; string[] readConnectionStatements = @@ -85,20 +85,32 @@ private async Task Init() "PRAGMA query_only = true", ]; + + // Prepare write connection + writeConnection = await OpenConnection(options.Name); foreach (var statement in writeConnectionStatements) { await writeConnection!.Execute(statement); } - foreach (var statement in readConnectionStatements) + // Prepare read pool and create connection factory + Func> readConnectionFactory = async () => { - await readConnection!.Execute(statement); - } + var readConnection = await OpenConnection(options.Name); + foreach (var statement in readConnectionStatements) + { + await readConnection.Execute(statement); + } + return readConnection; + }; + readPool = new MDSQLiteConnectionPool(resolvedOptions, readConnectionFactory); + await readPool.Init(); + // Register TablesUpdated listener tablesUpdatedCts = new CancellationTokenSource(); tablesUpdatedTask = Task.Run(async () => { - await foreach (var notification in writeConnection!.ListenAsync(tablesUpdatedCts.Token)) + await foreach (var notification in writeConnection.ListenAsync(tablesUpdatedCts.Token)) { if (notification.TablesUpdated != null) { @@ -121,7 +133,8 @@ protected async Task OpenConnection(string dbFilename) private static SqliteConnection OpenDatabase(string dbFilename) { - var connection = new SqliteConnection($"Data Source={dbFilename}"); + string connectionString = $"Data Source={dbFilename};Pooling=False;"; + var connection = new SqliteConnection(connectionString); connection.Open(); return connection; } @@ -133,13 +146,13 @@ protected virtual void LoadExtension(SqliteConnection db) db.LoadExtension(extensionPath, "sqlite3_powersync_init"); } - public new void Close() + public new async Task Close() { tablesUpdatedCts?.Cancel(); - try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { } + try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ } base.Close(); writeConnection?.Close(); - readConnection?.Close(); + await readPool.Close(); } public async Task Execute(string query, object?[]? parameters = null) @@ -184,20 +197,13 @@ public async Task Get(string sql, object?[]? parameters = null) public async Task ReadTransaction(Func> fn, DBLockOptions? options = null) { - return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction(readConnection!)!, fn)); + return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction((MDSQLiteConnection)ctx), fn)); } public async Task ReadLock(Func> fn, DBLockOptions? options = null) { await initialized; - - T result; - using (await readMutex.LockAsync()) - { - result = await fn(readConnection!); - } - - return result; + return await readPool.Lease(fn); } public async Task WriteLock(Func fn, DBLockOptions? options = null) @@ -206,10 +212,10 @@ public async Task WriteLock(Func fn, DBLockOptions? options using (await writeMutex.LockAsync()) { - await fn(writeConnection!); + await fn(writeConnection); } - writeConnection!.FlushUpdates(); + writeConnection.FlushUpdates(); } @@ -220,22 +226,22 @@ public async Task WriteLock(Func> fn, DBLockOptions? T result; using (await writeMutex.LockAsync()) { - result = await fn(writeConnection!); + result = await fn(writeConnection); } - writeConnection!.FlushUpdates(); + writeConnection.FlushUpdates(); return result; } public async Task WriteTransaction(Func fn, DBLockOptions? options = null) { - await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn)); + await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn)); } public async Task WriteTransaction(Func> fn, DBLockOptions? options = null) { - return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn)); + return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn)); } protected static async Task InternalTransaction( @@ -283,8 +289,113 @@ private static async Task RunTransaction( public async Task RefreshSchema() { await initialized; - await writeConnection!.RefreshSchema(); - await readConnection!.RefreshSchema(); + await writeConnection.RefreshSchema(); + await readPool.LeaseAll(async (connections) => + { + foreach (var conn in connections) await conn.RefreshSchema(); + }); + } +} + +class MDSQLiteConnectionPool +{ + private readonly RequiredMDSQLiteOptions _options; + private readonly Channel _channel; + private readonly int _poolSize; + private readonly Func> _connectionFactory; + + private readonly Task _initialized; + + public MDSQLiteConnectionPool(RequiredMDSQLiteOptions options, Func> connectionFactory) + { + _options = options; + _channel = Channel.CreateBounded(options.ReadPoolSize); + _poolSize = options.ReadPoolSize; + _connectionFactory = connectionFactory; + _initialized = Initialize(); + } + + public async Task Init() => await _initialized; + + private async Task Initialize() + { + for (int i = 0; i < _poolSize; i++) + { + var connection = await _connectionFactory(); + await _channel.Writer.WriteAsync(connection); + } + } + + public async Task Lease(Func> callback) + { + await _initialized; + var connection = await _channel.Reader.ReadAsync(); + try + { + return await callback(connection); + } + finally + { + await _channel.Writer.WriteAsync(connection); + } + } + + public async Task LeaseAll(Func, Task> callback) + { + await _initialized; + var connections = new List(_poolSize); + for (int i = 0; i < _poolSize; i++) + { + connections.Add(await _channel.Reader.ReadAsync()); + } + + try + { + await callback(connections); + } + finally + { + foreach (var conn in connections) + { + _channel.Writer.TryWrite(conn); + } + } + } + + private async Task OpenConnection(string dbFilename) + { + var db = OpenDatabase(dbFilename); + LoadExtension(db); + + var connection = new MDSQLiteConnection(new MDSQLiteConnectionOptions(db)); + await connection.Execute("SELECT powersync_init()"); + + return connection; + } + + private static SqliteConnection OpenDatabase(string dbFilename) + { + string connectionString = $"Data Source={dbFilename};Pooling=False;"; + var connection = new SqliteConnection(connectionString); + connection.Open(); + return connection; + } + + private void LoadExtension(SqliteConnection db) + { + string extensionPath = PowerSyncPathResolver.GetNativeLibraryPath(AppContext.BaseDirectory); + db.EnableExtensions(true); + db.LoadExtension(extensionPath, "sqlite3_powersync_init"); + } + + public async Task Close() + { + await LeaseAll((connections) => + { + foreach (var conn in connections) conn.Close(); + return Task.CompletedTask; + }); + _channel.Writer.TryComplete(); } } diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs index 7de8f6a..845b57c 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs @@ -102,6 +102,11 @@ public class MDSQLiteOptions /// Load extensions using the path and entryPoint. /// public SqliteExtension[]? Extensions { get; set; } + + /// + /// The number of MDSQLiteConnection objects to create for the read pool. + /// + public int? ReadPoolSize { get; set; } } public class RequiredMDSQLiteOptions : MDSQLiteOptions @@ -115,7 +120,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions TemporaryStorage = TemporaryStorageOption.MEMORY, LockTimeoutMs = 30000, EncryptionKey = null, - Extensions = [] + Extensions = [], + ReadPoolSize = 5, }; public new SqliteJournalMode JournalMode { get; set; } = null!; @@ -131,5 +137,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions public new TemporaryStorageOption TemporaryStorage { get; set; } = null!; public new int CacheSizeKb { get; set; } + public new SqliteExtension[] Extensions { get; set; } = null!; + + public new int ReadPoolSize { get; set; } }