Skip to content

Commit 38005f4

Browse files
authored
feat: pool read connections in MDSQLiteAdapter (#45)
1 parent 789629d commit 38005f4

5 files changed

Lines changed: 168 additions & 47 deletions

File tree

PowerSync/PowerSync.Common/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 0.0.11-alpha.1
44

5+
- Pool read connections in `MDSQLiteAdapter`, improving performance in any case where multiple queries run simultaneously (eg. via `Watch`). The number of connections can be set via `MDSQLiteOptions.ReadPoolSize` and defaults to 5.
56
- Updated to the latest version (0.4.11) of the core extension.
67
- `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking.
78
- Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API).

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ public ISyncStream SyncStream(string name, Dictionary<string, object>? parameter
497497
ConnectionManager.Close();
498498
BucketStorageAdapter?.Close();
499499

500-
Database.Close();
500+
await Database.Close();
501501
Closed = true;
502502
Emit(new PowerSyncDBEvent { Closed = true });
503503
}

PowerSync/PowerSync.Common/DB/IDBAdapter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public interface IDBAdapter : IEventStream<DBAdapterEvent>, ILockContext
118118
/// <summary>
119119
/// Closes the adapter.
120120
/// </summary>
121-
new void Close();
121+
new Task Close();
122122

123123
/// <summary>
124124
/// The name of the adapter.

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs

Lines changed: 155 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
namespace PowerSync.Common.MDSQLite;
22

33
using System;
4+
using System.Collections.Generic;
5+
using System.Threading.Channels;
46
using System.Threading.Tasks;
57

68
using Microsoft.Data.Sqlite;
@@ -15,35 +17,35 @@ public class MDSQLiteAdapterOptions()
1517
public string Name { get; set; } = null!;
1618

1719
public MDSQLiteOptions? SqliteOptions;
18-
1920
}
2021

2122
public class MDSQLiteAdapter : EventStream<DBAdapterEvent>, IDBAdapter
2223
{
2324
public string Name => options.Name;
2425

25-
public MDSQLiteConnection? writeConnection;
26-
public MDSQLiteConnection? readConnection;
26+
// One writer
27+
private MDSQLiteConnection writeConnection = null!;
28+
private readonly AsyncLock writeMutex = new();
29+
30+
// Many readers
31+
private MDSQLiteConnectionPool readPool = null!;
2732

2833
private readonly Task initialized;
2934

3035
protected MDSQLiteAdapterOptions options;
3136

32-
protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions;
37+
protected RequiredMDSQLiteOptions resolvedOptions;
3338
private CancellationTokenSource? tablesUpdatedCts;
3439
private Task? tablesUpdatedTask;
3540

36-
private readonly AsyncLock writeMutex = new();
37-
private readonly AsyncLock readMutex = new();
38-
3941
public MDSQLiteAdapter(MDSQLiteAdapterOptions options)
4042
{
4143
this.options = options;
42-
resolvedMDSQLiteOptions = resolveMDSQLiteOptions(options.SqliteOptions);
44+
resolvedOptions = ResolveMDSQLiteOptions(options.SqliteOptions);
4345
initialized = Init();
4446
}
4547

46-
private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options)
48+
private RequiredMDSQLiteOptions ResolveMDSQLiteOptions(MDSQLiteOptions? options)
4749
{
4850
var defaults = RequiredMDSQLiteOptions.DEFAULT_SQLITE_OPTIONS;
4951
return new RequiredMDSQLiteOptions
@@ -55,28 +57,26 @@ private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options)
5557
TemporaryStorage = options?.TemporaryStorage ?? defaults.TemporaryStorage,
5658
LockTimeoutMs = options?.LockTimeoutMs ?? defaults.LockTimeoutMs,
5759
EncryptionKey = options?.EncryptionKey ?? defaults.EncryptionKey,
58-
Extensions = options?.Extensions ?? defaults.Extensions
60+
Extensions = options?.Extensions ?? defaults.Extensions,
61+
ReadPoolSize = options?.ReadPoolSize ?? defaults.ReadPoolSize,
5962
};
6063
}
6164

6265
private async Task Init()
6366
{
64-
writeConnection = await OpenConnection(options.Name);
65-
readConnection = await OpenConnection(options.Name);
66-
6767
string[] baseStatements =
6868
[
69-
$"PRAGMA busy_timeout = {resolvedMDSQLiteOptions.LockTimeoutMs}",
70-
$"PRAGMA cache_size = -{resolvedMDSQLiteOptions.CacheSizeKb}",
71-
$"PRAGMA temp_store = {resolvedMDSQLiteOptions.TemporaryStorage}"
69+
$"PRAGMA busy_timeout = {resolvedOptions.LockTimeoutMs}",
70+
$"PRAGMA cache_size = -{resolvedOptions.CacheSizeKb}",
71+
$"PRAGMA temp_store = {resolvedOptions.TemporaryStorage}"
7272
];
7373

7474
string[] writeConnectionStatements =
7575
[
7676
.. baseStatements,
77-
$"PRAGMA journal_mode = {resolvedMDSQLiteOptions.JournalMode}",
78-
$"PRAGMA journal_size_limit = {resolvedMDSQLiteOptions.JournalSizeLimit}",
79-
$"PRAGMA synchronous = {resolvedMDSQLiteOptions.Synchronous}",
77+
$"PRAGMA journal_mode = {resolvedOptions.JournalMode}",
78+
$"PRAGMA journal_size_limit = {resolvedOptions.JournalSizeLimit}",
79+
$"PRAGMA synchronous = {resolvedOptions.Synchronous}",
8080
];
8181

8282
string[] readConnectionStatements =
@@ -85,20 +85,32 @@ private async Task Init()
8585
"PRAGMA query_only = true",
8686
];
8787

88+
89+
// Prepare write connection
90+
writeConnection = await OpenConnection(options.Name);
8891
foreach (var statement in writeConnectionStatements)
8992
{
9093
await writeConnection!.Execute(statement);
9194
}
9295

93-
foreach (var statement in readConnectionStatements)
96+
// Prepare read pool and create connection factory
97+
Func<Task<MDSQLiteConnection>> readConnectionFactory = async () =>
9498
{
95-
await readConnection!.Execute(statement);
96-
}
99+
var readConnection = await OpenConnection(options.Name);
100+
foreach (var statement in readConnectionStatements)
101+
{
102+
await readConnection.Execute(statement);
103+
}
104+
return readConnection;
105+
};
106+
readPool = new MDSQLiteConnectionPool(resolvedOptions, readConnectionFactory);
107+
await readPool.Init();
97108

109+
// Register TablesUpdated listener
98110
tablesUpdatedCts = new CancellationTokenSource();
99111
tablesUpdatedTask = Task.Run(async () =>
100112
{
101-
await foreach (var notification in writeConnection!.ListenAsync(tablesUpdatedCts.Token))
113+
await foreach (var notification in writeConnection.ListenAsync(tablesUpdatedCts.Token))
102114
{
103115
if (notification.TablesUpdated != null)
104116
{
@@ -121,7 +133,8 @@ protected async Task<MDSQLiteConnection> OpenConnection(string dbFilename)
121133

122134
private static SqliteConnection OpenDatabase(string dbFilename)
123135
{
124-
var connection = new SqliteConnection($"Data Source={dbFilename}");
136+
string connectionString = $"Data Source={dbFilename};Pooling=False;";
137+
var connection = new SqliteConnection(connectionString);
125138
connection.Open();
126139
return connection;
127140
}
@@ -133,13 +146,13 @@ protected virtual void LoadExtension(SqliteConnection db)
133146
db.LoadExtension(extensionPath, "sqlite3_powersync_init");
134147
}
135148

136-
public new void Close()
149+
public new async Task Close()
137150
{
138151
tablesUpdatedCts?.Cancel();
139-
try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { }
152+
try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ }
140153
base.Close();
141154
writeConnection?.Close();
142-
readConnection?.Close();
155+
await readPool.Close();
143156
}
144157

145158
public async Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
@@ -184,20 +197,13 @@ public async Task<dynamic> Get(string sql, object?[]? parameters = null)
184197

185198
public async Task<T> ReadTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null)
186199
{
187-
return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction(readConnection!)!, fn));
200+
return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction((MDSQLiteConnection)ctx), fn));
188201
}
189202

190203
public async Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null)
191204
{
192205
await initialized;
193-
194-
T result;
195-
using (await readMutex.LockAsync())
196-
{
197-
result = await fn(readConnection!);
198-
}
199-
200-
return result;
206+
return await readPool.Lease(fn);
201207
}
202208

203209
public async Task WriteLock(Func<ILockContext, Task> fn, DBLockOptions? options = null)
@@ -206,10 +212,10 @@ public async Task WriteLock(Func<ILockContext, Task> fn, DBLockOptions? options
206212

207213
using (await writeMutex.LockAsync())
208214
{
209-
await fn(writeConnection!);
215+
await fn(writeConnection);
210216
}
211217

212-
writeConnection!.FlushUpdates();
218+
writeConnection.FlushUpdates();
213219

214220
}
215221

@@ -220,22 +226,22 @@ public async Task<T> WriteLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions?
220226
T result;
221227
using (await writeMutex.LockAsync())
222228
{
223-
result = await fn(writeConnection!);
229+
result = await fn(writeConnection);
224230
}
225231

226-
writeConnection!.FlushUpdates();
232+
writeConnection.FlushUpdates();
227233

228234
return result;
229235
}
230236

231237
public async Task WriteTransaction(Func<ITransaction, Task> fn, DBLockOptions? options = null)
232238
{
233-
await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn));
239+
await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn));
234240
}
235241

236242
public async Task<T> WriteTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null)
237243
{
238-
return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn));
244+
return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn));
239245
}
240246

241247
protected static async Task InternalTransaction(
@@ -283,8 +289,113 @@ private static async Task RunTransaction(
283289
public async Task RefreshSchema()
284290
{
285291
await initialized;
286-
await writeConnection!.RefreshSchema();
287-
await readConnection!.RefreshSchema();
292+
await writeConnection.RefreshSchema();
293+
await readPool.LeaseAll(async (connections) =>
294+
{
295+
foreach (var conn in connections) await conn.RefreshSchema();
296+
});
297+
}
298+
}
299+
300+
class MDSQLiteConnectionPool
301+
{
302+
private readonly RequiredMDSQLiteOptions _options;
303+
private readonly Channel<MDSQLiteConnection> _channel;
304+
private readonly int _poolSize;
305+
private readonly Func<Task<MDSQLiteConnection>> _connectionFactory;
306+
307+
private readonly Task _initialized;
308+
309+
public MDSQLiteConnectionPool(RequiredMDSQLiteOptions options, Func<Task<MDSQLiteConnection>> connectionFactory)
310+
{
311+
_options = options;
312+
_channel = Channel.CreateBounded<MDSQLiteConnection>(options.ReadPoolSize);
313+
_poolSize = options.ReadPoolSize;
314+
_connectionFactory = connectionFactory;
315+
_initialized = Initialize();
316+
}
317+
318+
public async Task Init() => await _initialized;
319+
320+
private async Task Initialize()
321+
{
322+
for (int i = 0; i < _poolSize; i++)
323+
{
324+
var connection = await _connectionFactory();
325+
await _channel.Writer.WriteAsync(connection);
326+
}
327+
}
328+
329+
public async Task<T> Lease<T>(Func<MDSQLiteConnection, Task<T>> callback)
330+
{
331+
await _initialized;
332+
var connection = await _channel.Reader.ReadAsync();
333+
try
334+
{
335+
return await callback(connection);
336+
}
337+
finally
338+
{
339+
await _channel.Writer.WriteAsync(connection);
340+
}
341+
}
342+
343+
public async Task LeaseAll(Func<List<MDSQLiteConnection>, Task> callback)
344+
{
345+
await _initialized;
346+
var connections = new List<MDSQLiteConnection>(_poolSize);
347+
for (int i = 0; i < _poolSize; i++)
348+
{
349+
connections.Add(await _channel.Reader.ReadAsync());
350+
}
351+
352+
try
353+
{
354+
await callback(connections);
355+
}
356+
finally
357+
{
358+
foreach (var conn in connections)
359+
{
360+
_channel.Writer.TryWrite(conn);
361+
}
362+
}
363+
}
364+
365+
private async Task<MDSQLiteConnection> OpenConnection(string dbFilename)
366+
{
367+
var db = OpenDatabase(dbFilename);
368+
LoadExtension(db);
369+
370+
var connection = new MDSQLiteConnection(new MDSQLiteConnectionOptions(db));
371+
await connection.Execute("SELECT powersync_init()");
372+
373+
return connection;
374+
}
375+
376+
private static SqliteConnection OpenDatabase(string dbFilename)
377+
{
378+
string connectionString = $"Data Source={dbFilename};Pooling=False;";
379+
var connection = new SqliteConnection(connectionString);
380+
connection.Open();
381+
return connection;
382+
}
383+
384+
private void LoadExtension(SqliteConnection db)
385+
{
386+
string extensionPath = PowerSyncPathResolver.GetNativeLibraryPath(AppContext.BaseDirectory);
387+
db.EnableExtensions(true);
388+
db.LoadExtension(extensionPath, "sqlite3_powersync_init");
389+
}
390+
391+
public async Task Close()
392+
{
393+
await LeaseAll((connections) =>
394+
{
395+
foreach (var conn in connections) conn.Close();
396+
return Task.CompletedTask;
397+
});
398+
_channel.Writer.TryComplete();
288399
}
289400
}
290401

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ public class MDSQLiteOptions
102102
/// Load extensions using the path and entryPoint.
103103
/// </summary>
104104
public SqliteExtension[]? Extensions { get; set; }
105+
106+
/// <summary>
107+
/// The number of MDSQLiteConnection objects to create for the read pool.
108+
/// </summary>
109+
public int? ReadPoolSize { get; set; }
105110
}
106111

107112
public class RequiredMDSQLiteOptions : MDSQLiteOptions
@@ -115,7 +120,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions
115120
TemporaryStorage = TemporaryStorageOption.MEMORY,
116121
LockTimeoutMs = 30000,
117122
EncryptionKey = null,
118-
Extensions = []
123+
Extensions = [],
124+
ReadPoolSize = 5,
119125
};
120126

121127
public new SqliteJournalMode JournalMode { get; set; } = null!;
@@ -131,5 +137,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions
131137
public new TemporaryStorageOption TemporaryStorage { get; set; } = null!;
132138

133139
public new int CacheSizeKb { get; set; }
140+
134141
public new SqliteExtension[] Extensions { get; set; } = null!;
142+
143+
public new int ReadPoolSize { get; set; }
135144
}

0 commit comments

Comments
 (0)