Skip to content

Commit 043806f

Browse files
committed
Cleanup tests, fix Watched queries not cancelling properly, add more tests
1 parent 2560088 commit 043806f

4 files changed

Lines changed: 219 additions & 112 deletions

File tree

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 86 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -759,41 +759,31 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
759759

760760
// Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
761761
// connection is established
762-
return OnChangeCore(powersyncTables, listener, signal.Token, options?.TriggerImmediately == true);
762+
return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true);
763763
}
764764

765765
private async IAsyncEnumerable<WatchOnChangeEvent> OnChangeCore(
766766
HashSet<string> watchedTables,
767767
IAsyncEnumerable<DBAdapterEvent> listener,
768-
[EnumeratorCancellation] CancellationToken signal,
768+
CancellationTokenSource signal,
769769
bool triggerImmediately
770770
)
771771
{
772-
if (triggerImmediately == true)
773-
{
774-
yield return new WatchOnChangeEvent { ChangedTables = [] };
775-
}
776-
777-
HashSet<string> changedTables = new();
778-
await foreach (var e in listener)
772+
try
779773
{
780-
if (signal.IsCancellationRequested) yield break;
781-
if (e.TablesUpdated == null) continue;
782-
783-
changedTables.Clear();
784-
GetTablesFromNotification(e.TablesUpdated, changedTables);
785-
changedTables.IntersectWith(watchedTables);
786-
787-
if (changedTables.Count == 0) continue;
788-
789-
var update = new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
790-
791-
// Convert from 'ps_data__<name>' to '<name>'
792-
for (int i = 0; i < update.ChangedTables.Length; i++)
774+
await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately))
793775
{
794-
update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]);
776+
// Convert from 'ps_data__<name>' to '<name>'
777+
for (int i = 0; i < update.ChangedTables.Length; i++)
778+
{
779+
update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]);
780+
}
781+
yield return update;
795782
}
796-
yield return update;
783+
}
784+
finally
785+
{
786+
signal.Dispose();
797787
}
798788
}
799789

@@ -845,7 +835,7 @@ IAsyncEnumerable<DBAdapterEvent> initialListener
845835
var schemaChanged = new TaskCompletionSource<bool>();
846836

847837
// Listen for schema changes in the background
848-
_ = Task.Run(async () =>
838+
var schemaListenerTask = Task.Run(async () =>
849839
{
850840
await foreach (var update in ListenAsync(signal.Token))
851841
{
@@ -863,60 +853,80 @@ IAsyncEnumerable<DBAdapterEvent> initialListener
863853
var currentRestartCts = initialRestartCts;
864854
var currentListener = initialListener;
865855

866-
while (!signal.Token.IsCancellationRequested)
856+
try
867857
{
868-
// Resolve tables
869-
HashSet<string> powersyncTables;
870-
if (options?.Tables != null)
871-
{
872-
powersyncTables = [.. options
873-
.Tables
874-
.SelectMany<string, string>(table => [$"ps_data__{table}", $"ps_data_local__{table}"]
875-
)];
876-
}
877-
else
858+
while (!signal.Token.IsCancellationRequested)
878859
{
879-
powersyncTables = await GetSourceTables(sql, parameters);
880-
}
881-
882-
var enumerator = OnRawTableChange(
883-
powersyncTables,
884-
currentListener,
885-
currentRestartCts.Token,
886-
isRestart || (options?.TriggerImmediately == true)
887-
).GetAsyncEnumerator(currentRestartCts.Token);
888-
889-
// Continually wait for either OnChange or SchemaChanged to fire
890-
while (true)
891-
{
892-
var currentSchemaTask = schemaChanged.Task;
893-
var onChangeTask = enumerator.MoveNextAsync().AsTask();
894-
var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask);
895-
896-
if (completedTask == currentSchemaTask)
860+
// Resolve tables
861+
HashSet<string> powersyncTables;
862+
if (options?.Tables != null)
897863
{
898-
currentRestartCts.Cancel();
899-
isRestart = true;
900-
// Let the current task complete/cancel gracefully
901-
try { await onChangeTask; }
902-
catch (OperationCanceledException) { }
903-
904-
// Establish a new listener BEFORE resolving source tables in the next iteration,
905-
// so that changes during the async GetSourceTables call are not missed.
906-
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
907-
currentListener = Database.ListenAsync(currentRestartCts.Token);
908-
909-
break;
864+
powersyncTables = [.. options
865+
.Tables
866+
.SelectMany<string, string>(table => [$"ps_data__{table}", $"ps_data_local__{table}"]
867+
)];
910868
}
869+
else
870+
{
871+
powersyncTables = await GetSourceTables(sql, parameters);
872+
}
873+
874+
var enumerator = OnRawTableChange(
875+
powersyncTables,
876+
currentListener,
877+
currentRestartCts.Token,
878+
isRestart || (options?.TriggerImmediately == true)
879+
).GetAsyncEnumerator();
911880

912-
var update = enumerator.Current;
913-
if (update.ChangedTables != null)
881+
// Continually wait for either OnChange or SchemaChanged to fire
882+
while (true)
914883
{
915-
if (signal.IsCancellationRequested) yield break;
916-
yield return await GetAll<T>(sql, parameters);
884+
var currentSchemaTask = schemaChanged.Task;
885+
var onChangeTask = enumerator.MoveNextAsync().AsTask();
886+
var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask);
887+
888+
if (completedTask == currentSchemaTask)
889+
{
890+
var oldRestartCts = currentRestartCts;
891+
oldRestartCts.Cancel();
892+
isRestart = true;
893+
// Let the current task complete/cancel gracefully
894+
try { await onChangeTask; }
895+
catch (OperationCanceledException) { }
896+
897+
// Establish a new listener BEFORE resolving source tables in the next iteration,
898+
// so that changes during the async GetSourceTables call are not missed.
899+
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
900+
currentListener = Database.ListenAsync(currentRestartCts.Token);
901+
oldRestartCts.Dispose();
902+
903+
break;
904+
}
905+
906+
// Await onChangeTask to propagate cancellation and detect end-of-enumeration
907+
bool hasNext;
908+
try { hasNext = await onChangeTask; }
909+
catch (OperationCanceledException) { yield break; }
910+
911+
if (!hasNext) break;
912+
913+
var update = enumerator.Current;
914+
if (update.ChangedTables != null)
915+
{
916+
yield return await GetAll<T>(sql, parameters);
917+
}
917918
}
918919
}
919920
}
921+
finally
922+
{
923+
signal.Cancel();
924+
try { await schemaListenerTask; }
925+
catch (OperationCanceledException) { }
926+
927+
currentRestartCts.Dispose();
928+
signal.Dispose();
929+
}
920930
}
921931

922932
private class ExplainedResult
@@ -964,19 +974,15 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
964974
HashSet<string> changedTables = new();
965975
await foreach (var e in listener)
966976
{
967-
if (e.TablesUpdated != null)
968-
{
969-
if (token.IsCancellationRequested) break;
977+
if (e.TablesUpdated == null) continue;
970978

971-
// Extract the changed tables and intersect with the watched tables
972-
changedTables.Clear();
973-
GetTablesFromNotification(e.TablesUpdated, changedTables);
974-
changedTables.IntersectWith(watchedTables);
979+
changedTables.Clear();
980+
GetTablesFromNotification(e.TablesUpdated, changedTables);
981+
changedTables.IntersectWith(watchedTables);
975982

976-
if (changedTables.Count == 0) continue;
983+
if (changedTables.Count == 0) continue;
977984

978-
yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
979-
}
985+
yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
980986
}
981987
}
982988

0 commit comments

Comments
 (0)