Skip to content

Commit 0445457

Browse files
committed
Create MDSQLiteConnectionPool class
1 parent b30f184 commit 0445457

1 file changed

Lines changed: 89 additions & 38 deletions

File tree

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class MDSQLiteAdapter : EventStream<DBAdapterEvent>, IDBAdapter
2828
private readonly AsyncLock writeMutex = new();
2929

3030
// Many readers
31-
private Channel<MDSQLiteConnection> readPool = null!;
31+
private MDSQLiteConnectionPool readPool = null!;
3232

3333
private readonly Task initialized;
3434

@@ -64,8 +64,6 @@ private RequiredMDSQLiteOptions ResolveMDSQLiteOptions(MDSQLiteOptions? options)
6464

6565
private async Task Init()
6666
{
67-
writeConnection = await OpenConnection(options.Name);
68-
6967
string[] baseStatements =
7068
[
7169
$"PRAGMA busy_timeout = {resolvedOptions.LockTimeoutMs}",
@@ -87,22 +85,28 @@ private async Task Init()
8785
"PRAGMA query_only = true",
8886
];
8987

88+
89+
// Prepare write connection
90+
writeConnection = await OpenConnection(options.Name);
9091
foreach (var statement in writeConnectionStatements)
9192
{
9293
await writeConnection!.Execute(statement);
9394
}
9495

95-
readPool = Channel.CreateBounded<MDSQLiteConnection>(resolvedOptions.ReadPoolSize);
96-
for (int i = 0; i < resolvedOptions.ReadPoolSize; i++)
96+
// Prepare read pool and create connection factory
97+
Func<Task<MDSQLiteConnection>> readConnectionFactory = async () =>
9798
{
9899
var readConnection = await OpenConnection(options.Name);
99100
foreach (var statement in readConnectionStatements)
100101
{
101102
await readConnection.Execute(statement);
102103
}
103-
await readPool.Writer.WriteAsync(readConnection);
104-
}
104+
return readConnection;
105+
};
106+
readPool = new MDSQLiteConnectionPool(resolvedOptions, readConnectionFactory);
107+
await readPool.Init();
105108

109+
// Register TablesUpdated listener
106110
tablesUpdatedCts = new CancellationTokenSource();
107111
tablesUpdatedTask = Task.Run(async () =>
108112
{
@@ -148,11 +152,7 @@ protected virtual void LoadExtension(SqliteConnection db)
148152
try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ }
149153
base.Close();
150154
writeConnection?.Close();
151-
await WithAllReaders((connections) =>
152-
{
153-
foreach (var conn in connections) conn.Close();
154-
});
155-
readPool.Writer.Complete();
155+
await readPool.Close();
156156
}
157157

158158
public async Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
@@ -203,16 +203,7 @@ public async Task<T> ReadTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOp
203203
public async Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null)
204204
{
205205
await initialized;
206-
207-
var conn = await readPool.Reader.ReadAsync();
208-
try
209-
{
210-
return await fn(conn);
211-
}
212-
finally
213-
{
214-
readPool.Writer.TryWrite(conn);
215-
}
206+
return await readPool.Lease(fn);
216207
}
217208

218209
public async Task WriteLock(Func<ILockContext, Task> fn, DBLockOptions? options = null)
@@ -299,53 +290,113 @@ public async Task RefreshSchema()
299290
{
300291
await initialized;
301292
await writeConnection.RefreshSchema();
302-
await WithAllReaders(async (connections) =>
293+
await readPool.LeaseAll(async (connections) =>
303294
{
304295
foreach (var conn in connections) await conn.RefreshSchema();
305296
});
306297
}
298+
}
299+
300+
class MDSQLiteConnectionPool
301+
{
302+
private readonly RequiredMDSQLiteOptions _options;
303+
private readonly Channel<MDSQLiteConnection> _channel;
304+
private readonly int _poolSize;
305+
private readonly Func<Task<MDSQLiteConnection>> _connectionFactory;
306+
307+
private readonly Task _initialized;
308+
309+
public MDSQLiteConnectionPool(RequiredMDSQLiteOptions options, Func<Task<MDSQLiteConnection>> connectionFactory)
310+
{
311+
_options = options;
312+
_channel = Channel.CreateBounded<MDSQLiteConnection>(options.ReadPoolSize);
313+
_poolSize = options.ReadPoolSize;
314+
_connectionFactory = connectionFactory;
315+
_initialized = Initialize();
316+
}
317+
318+
public async Task Init() => await _initialized;
307319

308-
private async Task WithAllReaders(Func<List<MDSQLiteConnection>, Task> callback)
320+
private async Task Initialize()
309321
{
310-
var connections = new List<MDSQLiteConnection>(resolvedOptions.ReadPoolSize);
311-
for (int i = 0; i < resolvedOptions.ReadPoolSize; i++)
322+
for (int i = 0; i < _poolSize; i++)
312323
{
313-
connections.Add(await readPool.Reader.ReadAsync());
324+
var connection = await _connectionFactory();
325+
await _channel.Writer.WriteAsync(connection);
314326
}
327+
}
315328

329+
public async Task<T> Lease<T>(Func<MDSQLiteConnection, Task<T>> callback)
330+
{
331+
await _initialized;
332+
var connection = await _channel.Reader.ReadAsync();
316333
try
317334
{
318-
await callback(connections);
335+
return await callback(connection);
319336
}
320337
finally
321338
{
322-
foreach (var conn in connections)
323-
{
324-
readPool.Writer.TryWrite(conn);
325-
}
339+
await _channel.Writer.WriteAsync(connection);
326340
}
327341
}
328342

329-
private async Task WithAllReaders(Action<List<MDSQLiteConnection>> callback)
343+
public async Task LeaseAll(Func<List<MDSQLiteConnection>, Task> callback)
330344
{
331-
var connections = new List<MDSQLiteConnection>(resolvedOptions.ReadPoolSize);
332-
for (int i = 0; i < resolvedOptions.ReadPoolSize; i++)
345+
await _initialized;
346+
var connections = new List<MDSQLiteConnection>(_poolSize);
347+
for (int i = 0; i < _poolSize; i++)
333348
{
334-
connections.Add(await readPool.Reader.ReadAsync());
349+
connections.Add(await _channel.Reader.ReadAsync());
335350
}
336351

337352
try
338353
{
339-
callback(connections);
354+
await callback(connections);
340355
}
341356
finally
342357
{
343358
foreach (var conn in connections)
344359
{
345-
readPool.Writer.TryWrite(conn);
360+
_channel.Writer.TryWrite(conn);
346361
}
347362
}
348363
}
364+
365+
private async Task<MDSQLiteConnection> OpenConnection(string dbFilename)
366+
{
367+
var db = OpenDatabase(dbFilename);
368+
LoadExtension(db);
369+
370+
var connection = new MDSQLiteConnection(new MDSQLiteConnectionOptions(db));
371+
await connection.Execute("SELECT powersync_init()");
372+
373+
return connection;
374+
}
375+
376+
private static SqliteConnection OpenDatabase(string dbFilename)
377+
{
378+
string connectionString = $"Data Source={dbFilename};Pooling=False;";
379+
var connection = new SqliteConnection(connectionString);
380+
connection.Open();
381+
return connection;
382+
}
383+
384+
private void LoadExtension(SqliteConnection db)
385+
{
386+
string extensionPath = PowerSyncPathResolver.GetNativeLibraryPath(AppContext.BaseDirectory);
387+
db.EnableExtensions(true);
388+
db.LoadExtension(extensionPath, "sqlite3_powersync_init");
389+
}
390+
391+
public async Task Close()
392+
{
393+
await LeaseAll((connections) =>
394+
{
395+
foreach (var conn in connections) conn.Close();
396+
return Task.CompletedTask;
397+
});
398+
_channel.Writer.TryComplete();
399+
}
349400
}
350401

351402
public class MDSQLiteTransaction(MDSQLiteConnection connection) : ITransaction

0 commit comments

Comments
 (0)