Skip to content
1 change: 1 addition & 0 deletions PowerSync/PowerSync.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.0.11-alpha.1

- Pool read connections in `MDSQLiteAdapter`, improving performance in any case where multiple queries run simultaneously (eg. via `Watch`). The number of connections can be set via `MDSQLiteOptions.ReadPoolSize` and defaults to 5.
- Updated to the latest version (0.4.11) of the core extension.
- `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking.
- Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API).
Expand Down
2 changes: 1 addition & 1 deletion PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ public ISyncStream SyncStream(string name, Dictionary<string, object>? parameter
ConnectionManager.Close();
BucketStorageAdapter?.Close();

Database.Close();
await Database.Close();
Closed = true;
Emit(new PowerSyncDBEvent { Closed = true });
}
Expand Down
2 changes: 1 addition & 1 deletion PowerSync/PowerSync.Common/DB/IDBAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public interface IDBAdapter : IEventStream<DBAdapterEvent>, ILockContext
/// <summary>
/// Closes the adapter.
/// </summary>
new void Close();
new Task Close();

/// <summary>
/// The name of the adapter.
Expand Down
199 changes: 155 additions & 44 deletions PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace PowerSync.Common.MDSQLite;

using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

using Microsoft.Data.Sqlite;
Expand All @@ -15,35 +17,35 @@ public class MDSQLiteAdapterOptions()
public string Name { get; set; } = null!;

public MDSQLiteOptions? SqliteOptions;

}

public class MDSQLiteAdapter : EventStream<DBAdapterEvent>, IDBAdapter
{
public string Name => options.Name;

public MDSQLiteConnection? writeConnection;
public MDSQLiteConnection? readConnection;
// One writer
private MDSQLiteConnection writeConnection = null!;
private readonly AsyncLock writeMutex = new();

// Many readers
private MDSQLiteConnectionPool readPool = null!;

private readonly Task initialized;

protected MDSQLiteAdapterOptions options;

protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions;
protected RequiredMDSQLiteOptions resolvedOptions;
private CancellationTokenSource? tablesUpdatedCts;
private Task? tablesUpdatedTask;

private readonly AsyncLock writeMutex = new();
private readonly AsyncLock readMutex = new();

public MDSQLiteAdapter(MDSQLiteAdapterOptions options)
{
this.options = options;
resolvedMDSQLiteOptions = resolveMDSQLiteOptions(options.SqliteOptions);
resolvedOptions = ResolveMDSQLiteOptions(options.SqliteOptions);
initialized = Init();
}

private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options)
private RequiredMDSQLiteOptions ResolveMDSQLiteOptions(MDSQLiteOptions? options)
{
var defaults = RequiredMDSQLiteOptions.DEFAULT_SQLITE_OPTIONS;
return new RequiredMDSQLiteOptions
Expand All @@ -55,28 +57,26 @@ private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options)
TemporaryStorage = options?.TemporaryStorage ?? defaults.TemporaryStorage,
LockTimeoutMs = options?.LockTimeoutMs ?? defaults.LockTimeoutMs,
EncryptionKey = options?.EncryptionKey ?? defaults.EncryptionKey,
Extensions = options?.Extensions ?? defaults.Extensions
Extensions = options?.Extensions ?? defaults.Extensions,
ReadPoolSize = options?.ReadPoolSize ?? defaults.ReadPoolSize,
};
}

private async Task Init()
{
writeConnection = await OpenConnection(options.Name);
readConnection = await OpenConnection(options.Name);

string[] baseStatements =
[
$"PRAGMA busy_timeout = {resolvedMDSQLiteOptions.LockTimeoutMs}",
$"PRAGMA cache_size = -{resolvedMDSQLiteOptions.CacheSizeKb}",
$"PRAGMA temp_store = {resolvedMDSQLiteOptions.TemporaryStorage}"
$"PRAGMA busy_timeout = {resolvedOptions.LockTimeoutMs}",
$"PRAGMA cache_size = -{resolvedOptions.CacheSizeKb}",
$"PRAGMA temp_store = {resolvedOptions.TemporaryStorage}"
];

string[] writeConnectionStatements =
[
.. baseStatements,
$"PRAGMA journal_mode = {resolvedMDSQLiteOptions.JournalMode}",
$"PRAGMA journal_size_limit = {resolvedMDSQLiteOptions.JournalSizeLimit}",
$"PRAGMA synchronous = {resolvedMDSQLiteOptions.Synchronous}",
$"PRAGMA journal_mode = {resolvedOptions.JournalMode}",
$"PRAGMA journal_size_limit = {resolvedOptions.JournalSizeLimit}",
$"PRAGMA synchronous = {resolvedOptions.Synchronous}",
];

string[] readConnectionStatements =
Expand All @@ -85,20 +85,32 @@ private async Task Init()
"PRAGMA query_only = true",
];


// Prepare write connection
writeConnection = await OpenConnection(options.Name);
foreach (var statement in writeConnectionStatements)
{
await writeConnection!.Execute(statement);
}

foreach (var statement in readConnectionStatements)
// Prepare read pool and create connection factory
Func<Task<MDSQLiteConnection>> readConnectionFactory = async () =>
{
await readConnection!.Execute(statement);
}
var readConnection = await OpenConnection(options.Name);
foreach (var statement in readConnectionStatements)
{
await readConnection.Execute(statement);
}
return readConnection;
};
readPool = new MDSQLiteConnectionPool(resolvedOptions, readConnectionFactory);
await readPool.Init();

// Register TablesUpdated listener
tablesUpdatedCts = new CancellationTokenSource();
tablesUpdatedTask = Task.Run(async () =>
{
await foreach (var notification in writeConnection!.ListenAsync(tablesUpdatedCts.Token))
await foreach (var notification in writeConnection.ListenAsync(tablesUpdatedCts.Token))
{
if (notification.TablesUpdated != null)
{
Expand All @@ -121,7 +133,8 @@ protected async Task<MDSQLiteConnection> OpenConnection(string dbFilename)

private static SqliteConnection OpenDatabase(string dbFilename)
{
var connection = new SqliteConnection($"Data Source={dbFilename}");
string connectionString = $"Data Source={dbFilename};Pooling=False;";
var connection = new SqliteConnection(connectionString);
connection.Open();
return connection;
}
Expand All @@ -133,13 +146,13 @@ protected virtual void LoadExtension(SqliteConnection db)
db.LoadExtension(extensionPath, "sqlite3_powersync_init");
}

public new void Close()
public new async Task Close()
{
tablesUpdatedCts?.Cancel();
try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { }
try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ }
base.Close();
writeConnection?.Close();
readConnection?.Close();
await readPool.Close();
}

public async Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
Expand Down Expand Up @@ -184,20 +197,13 @@ public async Task<dynamic> Get(string sql, object?[]? parameters = null)

public async Task<T> ReadTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null)
{
return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction(readConnection!)!, fn));
return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction((MDSQLiteConnection)ctx), fn));
}

public async Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null)
{
await initialized;

T result;
using (await readMutex.LockAsync())
{
result = await fn(readConnection!);
}

return result;
return await readPool.Lease(fn);
}

public async Task WriteLock(Func<ILockContext, Task> fn, DBLockOptions? options = null)
Expand All @@ -206,10 +212,10 @@ public async Task WriteLock(Func<ILockContext, Task> fn, DBLockOptions? options

using (await writeMutex.LockAsync())
{
await fn(writeConnection!);
await fn(writeConnection);
}

writeConnection!.FlushUpdates();
writeConnection.FlushUpdates();

}

Expand All @@ -220,22 +226,22 @@ public async Task<T> WriteLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions?
T result;
using (await writeMutex.LockAsync())
{
result = await fn(writeConnection!);
result = await fn(writeConnection);
}

writeConnection!.FlushUpdates();
writeConnection.FlushUpdates();

return result;
}

public async Task WriteTransaction(Func<ITransaction, Task> fn, DBLockOptions? options = null)
{
await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn));
await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn));
}

public async Task<T> WriteTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null)
{
return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn));
return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn));
}

protected static async Task InternalTransaction(
Expand Down Expand Up @@ -283,8 +289,113 @@ private static async Task RunTransaction(
public async Task RefreshSchema()
{
await initialized;
await writeConnection!.RefreshSchema();
await readConnection!.RefreshSchema();
await writeConnection.RefreshSchema();
await readPool.LeaseAll(async (connections) =>
{
foreach (var conn in connections) await conn.RefreshSchema();
});
}
}

class MDSQLiteConnectionPool
{
private readonly RequiredMDSQLiteOptions _options;
private readonly Channel<MDSQLiteConnection> _channel;
private readonly int _poolSize;
private readonly Func<Task<MDSQLiteConnection>> _connectionFactory;

private readonly Task _initialized;

public MDSQLiteConnectionPool(RequiredMDSQLiteOptions options, Func<Task<MDSQLiteConnection>> connectionFactory)
{
_options = options;
_channel = Channel.CreateBounded<MDSQLiteConnection>(options.ReadPoolSize);
_poolSize = options.ReadPoolSize;
_connectionFactory = connectionFactory;
_initialized = Initialize();
}

public async Task Init() => await _initialized;

private async Task Initialize()
{
for (int i = 0; i < _poolSize; i++)
{
var connection = await _connectionFactory();
await _channel.Writer.WriteAsync(connection);
}
}

public async Task<T> Lease<T>(Func<MDSQLiteConnection, Task<T>> callback)
{
await _initialized;
var connection = await _channel.Reader.ReadAsync();
try
{
return await callback(connection);
}
finally
{
await _channel.Writer.WriteAsync(connection);
}
}

public async Task LeaseAll(Func<List<MDSQLiteConnection>, Task> callback)
{
await _initialized;
var connections = new List<MDSQLiteConnection>(_poolSize);
for (int i = 0; i < _poolSize; i++)
{
connections.Add(await _channel.Reader.ReadAsync());
}

try
{
await callback(connections);
}
finally
{
foreach (var conn in connections)
{
_channel.Writer.TryWrite(conn);
}
}
}

private async Task<MDSQLiteConnection> OpenConnection(string dbFilename)
{
var db = OpenDatabase(dbFilename);
LoadExtension(db);

var connection = new MDSQLiteConnection(new MDSQLiteConnectionOptions(db));
await connection.Execute("SELECT powersync_init()");

return connection;
}

private static SqliteConnection OpenDatabase(string dbFilename)
{
string connectionString = $"Data Source={dbFilename};Pooling=False;";
var connection = new SqliteConnection(connectionString);
connection.Open();
return connection;
}

private void LoadExtension(SqliteConnection db)
{
string extensionPath = PowerSyncPathResolver.GetNativeLibraryPath(AppContext.BaseDirectory);
db.EnableExtensions(true);
db.LoadExtension(extensionPath, "sqlite3_powersync_init");
}

public async Task Close()
{
await LeaseAll((connections) =>
{
foreach (var conn in connections) conn.Close();
return Task.CompletedTask;
});
_channel.Writer.TryComplete();
}
}

Expand Down
11 changes: 10 additions & 1 deletion PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public class MDSQLiteOptions
/// Load extensions using the path and entryPoint.
/// </summary>
public SqliteExtension[]? Extensions { get; set; }

/// <summary>
/// The number of MDSQLiteConnection objects to create for the read pool.
/// </summary>
public int? ReadPoolSize { get; set; }
}

public class RequiredMDSQLiteOptions : MDSQLiteOptions
Expand All @@ -115,7 +120,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions
TemporaryStorage = TemporaryStorageOption.MEMORY,
LockTimeoutMs = 30000,
EncryptionKey = null,
Extensions = []
Extensions = [],
ReadPoolSize = 5,
};

public new SqliteJournalMode JournalMode { get; set; } = null!;
Expand All @@ -131,5 +137,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions
public new TemporaryStorageOption TemporaryStorage { get; set; } = null!;

public new int CacheSizeKb { get; set; }

public new SqliteExtension[] Extensions { get; set; } = null!;

public new int ReadPoolSize { get; set; }
}