Skip to content

Commit f2a7171

Browse files
authored
Test improvements (#49)
1 parent 4d51235 commit f2a7171

11 files changed

Lines changed: 365 additions & 298 deletions

File tree

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 80 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -785,40 +785,31 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
785785

786786
// Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
787787
// connection is established
788-
return OnChangeCore(powersyncTables, listener, signal.Token, options?.TriggerImmediately == true);
788+
return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true);
789789
}
790790

791791
private async IAsyncEnumerable<WatchOnChangeEvent> OnChangeCore(
792792
HashSet<string> watchedTables,
793793
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
794-
[EnumeratorCancellation] CancellationToken signal,
794+
CancellationTokenSource signal,
795795
bool triggerImmediately
796796
)
797797
{
798-
if (triggerImmediately == true)
799-
{
800-
yield return new WatchOnChangeEvent { ChangedTables = [] };
801-
}
802-
803-
HashSet<string> changedTables = new();
804-
await foreach (var e in listener)
798+
try
805799
{
806-
if (signal.IsCancellationRequested) yield break;
807-
808-
changedTables.Clear();
809-
GetTablesFromNotification(e.TablesUpdated, changedTables);
810-
changedTables.IntersectWith(watchedTables);
811-
812-
if (changedTables.Count == 0) continue;
813-
814-
var update = new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
815-
816-
// Convert from 'ps_data__<name>' to '<name>'
817-
for (int i = 0; i < update.ChangedTables.Length; i++)
800+
await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately))
818801
{
819-
update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]);
802+
// Convert from 'ps_data__<name>' to '<name>'
803+
for (int i = 0; i < update.ChangedTables.Length; i++)
804+
{
805+
update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]);
806+
}
807+
yield return update;
820808
}
821-
yield return update;
809+
}
810+
finally
811+
{
812+
signal.Dispose();
822813
}
823814
}
824815

@@ -870,7 +861,7 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
870861
var schemaChanged = new TaskCompletionSource<bool>();
871862

872863
// Listen for schema changes in the background
873-
_ = Task.Run(async () =>
864+
var schemaListenerTask = Task.Run(async () =>
874865
{
875866
await foreach (var update in Events.OnSchemaChanged.ListenAsync(signal.Token))
876867
{
@@ -885,60 +876,80 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
885876
var currentRestartCts = initialRestartCts;
886877
var currentListener = initialListener;
887878

888-
while (!signal.Token.IsCancellationRequested)
879+
try
889880
{
890-
// Resolve tables
891-
HashSet<string> powersyncTables;
892-
if (options?.Tables != null)
893-
{
894-
powersyncTables = [.. options
895-
.Tables
896-
.SelectMany<string, string>(table => [$"ps_data__{table}", $"ps_data_local__{table}"]
897-
)];
898-
}
899-
else
900-
{
901-
powersyncTables = await GetSourceTables(sql, parameters);
902-
}
903-
904-
var enumerator = OnRawTableChange(
905-
powersyncTables,
906-
currentListener,
907-
currentRestartCts.Token,
908-
isRestart || (options?.TriggerImmediately == true)
909-
).GetAsyncEnumerator(currentRestartCts.Token);
910-
911-
// Continually wait for either OnChange or SchemaChanged to fire
912-
while (true)
881+
while (!signal.Token.IsCancellationRequested)
913882
{
914-
var currentSchemaTask = schemaChanged.Task;
915-
var onChangeTask = enumerator.MoveNextAsync().AsTask();
916-
var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask);
917-
918-
if (completedTask == currentSchemaTask)
883+
// Resolve tables
884+
HashSet<string> powersyncTables;
885+
if (options?.Tables != null)
886+
{
887+
powersyncTables = [.. options
888+
.Tables
889+
.SelectMany<string, string>(table => [$"ps_data__{table}", $"ps_data_local__{table}"]
890+
)];
891+
}
892+
else
919893
{
920-
currentRestartCts.Cancel();
921-
isRestart = true;
922-
// Let the current task complete/cancel gracefully
923-
try { await onChangeTask; }
924-
catch (OperationCanceledException) { }
925-
926-
// Establish a new listener BEFORE resolving source tables in the next iteration,
927-
// so that changes during the async GetSourceTables call are not missed.
928-
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
929-
currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token);
930-
931-
break;
894+
powersyncTables = await GetSourceTables(sql, parameters);
932895
}
933896

934-
var update = enumerator.Current;
935-
if (update.ChangedTables != null)
897+
var enumerator = OnRawTableChange(
898+
powersyncTables,
899+
currentListener,
900+
currentRestartCts.Token,
901+
isRestart || (options?.TriggerImmediately == true)
902+
).GetAsyncEnumerator();
903+
904+
// Continually wait for either OnChange or SchemaChanged to fire
905+
while (true)
936906
{
937-
if (signal.IsCancellationRequested) yield break;
938-
yield return await GetAll<T>(sql, parameters);
907+
var currentSchemaTask = schemaChanged.Task;
908+
var onChangeTask = enumerator.MoveNextAsync().AsTask();
909+
var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask);
910+
911+
if (completedTask == currentSchemaTask)
912+
{
913+
var oldRestartCts = currentRestartCts;
914+
oldRestartCts.Cancel();
915+
isRestart = true;
916+
// Let the current task complete/cancel gracefully
917+
try { await onChangeTask; }
918+
catch (OperationCanceledException) { }
919+
920+
// Establish a new listener BEFORE resolving source tables in the next iteration,
921+
// so that changes during the async GetSourceTables call are not missed.
922+
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
923+
currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token);
924+
oldRestartCts.Dispose();
925+
926+
break;
927+
}
928+
929+
// Await onChangeTask to propagate cancellation and detect end-of-enumeration
930+
bool hasNext;
931+
try { hasNext = await onChangeTask; }
932+
catch (OperationCanceledException) { yield break; }
933+
934+
if (!hasNext) break;
935+
936+
var update = enumerator.Current;
937+
if (update.ChangedTables != null)
938+
{
939+
yield return await GetAll<T>(sql, parameters);
940+
}
939941
}
940942
}
941943
}
944+
finally
945+
{
946+
signal.Cancel();
947+
try { await schemaListenerTask; }
948+
catch (OperationCanceledException) { }
949+
950+
currentRestartCts.Dispose();
951+
signal.Dispose();
952+
}
942953
}
943954

944955
private class ExplainedResult
@@ -986,8 +997,6 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
986997
HashSet<string> changedTables = new();
987998
await foreach (var e in listener)
988999
{
989-
if (token.IsCancellationRequested) break;
990-
9911000
// Extract the changed tables and intersect with the watched tables
9921001
changedTables.Clear();
9931002
GetTablesFromNotification(e.TablesUpdated, changedTables);

0 commit comments

Comments
 (0)