diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index b9cc091..3806a96 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.0.11-alpha.1 +- `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking. - Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API). - Changed the `PowerSyncDatabase.Watch` syntax to return an IAsyncEnumerable instead of accepting a callback. This allows users to handle database changes when they are ready instead of us eagerly running the callback as soon as a change is detected. diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index be9b43a..176089d 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -882,7 +882,7 @@ IAsyncEnumerable initialListener powersyncTables, currentListener, currentRestartCts.Token, - isRestart || options?.TriggerImmediately == true + isRestart || (options?.TriggerImmediately == true) ).GetAsyncEnumerator(currentRestartCts.Token); // Continually wait for either OnChange or SchemaChanged to fire @@ -972,6 +972,8 @@ private async IAsyncEnumerable OnRawTableChange( GetTablesFromNotification(e.TablesUpdated, changedTables); changedTables.IntersectWith(watchedTables); + if (changedTables.Count == 0) continue; + yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; } } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 94659b1..631543b 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -226,24 +226,27 @@ public async Task Connect(PowerSyncConnectionOptions? options = null) var tcs = new TaskCompletionSource(); var cts = new CancellationTokenSource(); - var _ = Task.Run(() => + // Subscribe to events before starting StreamingSync to not miss the Connected == true event + var listener = ListenAsync(cts.Token); + + streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options); + + var _ = Task.Run(async () => { - foreach (var status in Listen(cts.Token)) + await foreach (var status in listener) { - if (status.StatusChanged != null) + if (status.StatusChanged?.Connected == true) { - if (status.StatusChanged.Connected == false) - { - logger.LogWarning("Initial connect attempt did not successfully connect to server"); - } - - tcs.SetResult(true); + tcs.TrySetResult(true); cts.Cancel(); + return; } } - }); - streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options); + // Connection closed prematurely + logger.LogWarning("Initial connect attempt did not successfully connect to server"); + tcs.TrySetResult(true); + }); await tcs.Task; } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs index 830aa8d..3f1d6ec 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs @@ -97,7 +97,7 @@ public class RequestStreamSubscription [JsonProperty("parameters")] public Dictionary Parameters { get; set; } = new(); - [JsonProperty("override_priority", NullValueHandling = NullValueHandling.Ignore)] + [JsonProperty("override_priority")] public int? OverridePriority { get; set; } } diff --git a/PowerSync/PowerSync.Common/DB/Schema/CompiledTable.cs b/PowerSync/PowerSync.Common/DB/Schema/CompiledTable.cs index c5cf790..9898ef5 100644 --- a/PowerSync/PowerSync.Common/DB/Schema/CompiledTable.cs +++ b/PowerSync/PowerSync.Common/DB/Schema/CompiledTable.cs @@ -102,10 +102,10 @@ public void Validate() columnNames.Add(columnName); } - foreach (var kvp in Indexes) + foreach (var index in IndexesJSON) { - var indexName = kvp.Key; - var indexColumns = kvp.Value; + var indexName = index.Name; + var indexColumns = index.Columns; if (InvalidSQLCharacters.IsMatch(indexName)) { @@ -114,9 +114,9 @@ public void Validate() foreach (var indexColumn in indexColumns) { - if (!columnNames.Contains(indexColumn)) + if (!columnNames.Contains(indexColumn.Name)) { - throw new Exception($"Column {indexColumn} not found for index {indexName}"); + throw new Exception($"Column {indexColumn.Name} not found for index {indexName}"); } } } diff --git a/PowerSync/PowerSync.Common/DB/Schema/IndexedColumnJSON.cs b/PowerSync/PowerSync.Common/DB/Schema/IndexedColumnJSON.cs index 29ce1b1..347cfa2 100644 --- a/PowerSync/PowerSync.Common/DB/Schema/IndexedColumnJSON.cs +++ b/PowerSync/PowerSync.Common/DB/Schema/IndexedColumnJSON.cs @@ -8,9 +8,9 @@ class IndexedColumnJSONOptions(string Name, bool Ascending = true) class IndexedColumnJSON(IndexedColumnJSONOptions options) { - protected string Name { get; set; } = options.Name; + public string Name { get; set; } = options.Name; - protected bool Ascending { get; set; } = options.Ascending; + public bool Ascending { get; set; } = options.Ascending; public object ToJSONObject(CompiledTable parentTable) { diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs index 1c30cd1..d6eb924 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs @@ -171,74 +171,78 @@ private static List PrepareQueryString(ref string query, int parameterCo return dynamicParamsList; } - public async Task GetAll(string query, object?[]? parameters = null) + public Task GetAll(string query, object?[]? parameters = null) { DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); - return [.. await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)]; + return Task.Run(async () => (await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)).ToArray()); } - public async Task GetAll(string query, object?[]? parameters = null) + public Task GetAll(string query, object?[]? parameters = null) { DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); - return [.. await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)]; + return Task.Run(async () => (await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)).ToArray()); } - public async Task GetOptional(string query, object?[]? parameters = null) + public Task GetOptional(string query, object?[]? parameters = null) { DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); - return await Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text); + return Task.Run(() => Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text)); } - public async Task GetOptional(string query, object?[]? parameters = null) + public Task GetOptional(string query, object?[]? parameters = null) { DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); - return await Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text); + return Task.Run(() => Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text)); } - public async Task Get(string query, object?[]? parameters = null) + public Task Get(string query, object?[]? parameters = null) { DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); - return await Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text); + return Task.Run(() => Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text)); } - public async Task Get(string query, object?[]? parameters = null) + public Task Get(string query, object?[]? parameters = null) { DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); - return await Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text); + return Task.Run(() => Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text)); } - public async Task Execute(string query, object?[]? parameters = null) + public Task Execute(string query, object?[]? parameters = null) { DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); - int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text); - - return new NonQueryResult + return Task.Run(async () => { - InsertId = raw.sqlite3_last_insert_rowid(Db.Handle), - RowsAffected = rowsAffected, - }; + int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text); + return new NonQueryResult + { + InsertId = raw.sqlite3_last_insert_rowid(Db.Handle), + RowsAffected = rowsAffected, + }; + }); } - public async Task ExecuteBatch(string query, object?[][]? parameters = null) + public Task ExecuteBatch(string query, object?[][]? parameters = null) { if (parameters == null || parameters.Length == 0) { - return new NonQueryResult { RowsAffected = 0 }; + return Task.FromResult(new NonQueryResult { RowsAffected = 0 }); } List? dynamicParamsList = PrepareQuery(ref query, parameters); if (dynamicParamsList == null) { - return new NonQueryResult { RowsAffected = 0 }; + return Task.FromResult(new NonQueryResult { RowsAffected = 0 }); } - int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text); - - return new NonQueryResult + return Task.Run(async () => { - InsertId = raw.sqlite3_last_insert_rowid(Db.Handle), - RowsAffected = rowsAffected, - }; + int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text); + return new NonQueryResult + { + InsertId = raw.sqlite3_last_insert_rowid(Db.Handle), + RowsAffected = rowsAffected, + }; + }); } public new void Close() diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs index 34753a0..c91daa0 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs @@ -115,6 +115,16 @@ await db.Execute( Assert.Null(row.make); } + [Fact] + public async Task QueriesRunOnAnotherThread() + { + int preCallThreadId = Environment.CurrentManagedThreadId; + await db.GetAll("select * from assets"); + int postCallThreadId = Environment.CurrentManagedThreadId; + + Assert.NotEqual(preCallThreadId, postCallThreadId); + } + [Fact] public async Task FailedInsertTest() { @@ -354,15 +364,11 @@ public async Task QueueSimultaneousExecutionsTest() await db.WriteLock(async context => { - var tasks = Enumerable.Range(0, operationCount) - .Select(async index => - { - await context.Execute("SELECT * FROM assets"); - order.Add(index); - }) - .ToArray(); - - await Task.WhenAll(tasks); + for (int i = 0; i < operationCount; i++) + { + await context.Execute("SELECT * FROM assets"); + order.Add(i); + } }); var expectedOrder = Enumerable.Range(0, operationCount).ToList(); @@ -700,13 +706,11 @@ public async Task WatchSchemaResetTest() { await foreach (var result in listener) { - if (result.Length > 0) - { - lastCount = result[0].count; - } + lastCount = result[0].count; sem.Release(); } - }); + }, testCts.Token); + Assert.False(await sem.WaitAsync(200)); var resolved = await db.GetSourceTables(QUERY, null); Assert.Single(resolved); diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs index 13c4e59..286ea4a 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs @@ -25,6 +25,7 @@ public async Task InitializeAsync() { syncService = new MockSyncService(); db = syncService.CreateDatabase(); + await db.Init(); } public async Task DisposeAsync()