Skip to content

Commit f3dc694

Browse files
committed
Throttle Watch<T> and OnChange
1 parent f2a7171 commit f3dc694

4 files changed

Lines changed: 280 additions & 28 deletions

File tree

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 104 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace PowerSync.Common.Client;
22

3+
using System.Diagnostics;
34
using System.Runtime.CompilerServices;
45
using System.Text.RegularExpressions;
56
using System.Threading.Tasks;
@@ -131,7 +132,7 @@ public class PowerSyncDatabase : IPowerSyncDatabase
131132
public IDBAdapter Database { get; protected set; }
132133
private CompiledSchema schema;
133134

134-
private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30;
135+
private const int DEFAULT_WATCH_THROTTLE_MS = 30;
135136
private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled);
136137

137138
public bool Closed { get; protected set; }
@@ -785,19 +786,21 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
785786

786787
// Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
787788
// connection is established
788-
return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true);
789+
var throttleMs = options?.ThrottleMs ?? DEFAULT_WATCH_THROTTLE_MS;
790+
return OnChangeCore(powersyncTables, listener, signal, options?.TriggerImmediately == true, throttleMs);
789791
}
790792

791793
private async IAsyncEnumerable<WatchOnChangeEvent> OnChangeCore(
792794
HashSet<string> watchedTables,
793795
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
794796
CancellationTokenSource signal,
795-
bool triggerImmediately
797+
bool triggerImmediately,
798+
int throttleMs = DEFAULT_WATCH_THROTTLE_MS
796799
)
797800
{
798801
try
799802
{
800-
await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately))
803+
await foreach (var update in OnRawTableChange(watchedTables, listener, signal.Token, triggerImmediately, throttleMs))
801804
{
802805
// Convert from 'ps_data__<name>' to '<name>'
803806
for (int i = 0; i < update.ChangedTables.Length; i++)
@@ -875,6 +878,7 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
875878
bool isRestart = false;
876879
var currentRestartCts = initialRestartCts;
877880
var currentListener = initialListener;
881+
var throttleMs = options?.ThrottleMs ?? DEFAULT_WATCH_THROTTLE_MS;
878882

879883
try
880884
{
@@ -898,7 +902,8 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
898902
powersyncTables,
899903
currentListener,
900904
currentRestartCts.Token,
901-
isRestart || (options?.TriggerImmediately == true)
905+
isRestart || (options?.TriggerImmediately == true),
906+
throttleMs
902907
).GetAsyncEnumerator();
903908

904909
// Continually wait for either OnChange or SchemaChanged to fire
@@ -986,26 +991,111 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
986991
HashSet<string> watchedTables,
987992
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
988993
[EnumeratorCancellation] CancellationToken token,
989-
bool triggerImmediately = false
994+
bool triggerImmediately = false,
995+
int throttleMs = DEFAULT_WATCH_THROTTLE_MS
990996
)
991997
{
992998
if (triggerImmediately)
993999
{
9941000
yield return new WatchOnChangeEvent { ChangedTables = [] };
9951001
}
9961002

997-
HashSet<string> changedTables = new();
998-
await foreach (var e in listener)
1003+
if (throttleMs <= 0)
9991004
{
1000-
// Extract the changed tables and intersect with the watched tables
1001-
changedTables.Clear();
1002-
GetTablesFromNotification(e.TablesUpdated, changedTables);
1003-
changedTables.IntersectWith(watchedTables);
1005+
// Fast path: no throttling
1006+
HashSet<string> changedTables = new();
1007+
await foreach (var e in listener)
1008+
{
1009+
changedTables.Clear();
1010+
GetTablesFromNotification(e.TablesUpdated, changedTables);
1011+
changedTables.IntersectWith(watchedTables);
1012+
if (changedTables.Count == 0) continue;
1013+
yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
1014+
}
1015+
yield break;
1016+
}
1017+
1018+
// Throttled path: Task.WhenAny loop with leading-edge emit
1019+
var listenerEnumerator = listener.GetAsyncEnumerator(token);
1020+
try
1021+
{
1022+
var accumulated = new HashSet<string>();
1023+
long lastYieldTime = 0;
1024+
Task<bool> moveNextTask = listenerEnumerator.MoveNextAsync().AsTask();
1025+
Task? throttleTask = null;
1026+
1027+
while (true)
1028+
{
1029+
if (throttleTask != null)
1030+
await Task.WhenAny(moveNextTask, throttleTask);
1031+
else
1032+
{
1033+
try { await moveNextTask; }
1034+
catch (OperationCanceledException) { break; }
1035+
}
10041036

1005-
if (changedTables.Count == 0) continue;
1037+
// Throttle timer fired without a new event
1038+
if (throttleTask != null && throttleTask.IsCompleted && !moveNextTask.IsCompleted)
1039+
{
1040+
if (accumulated.Count > 0)
1041+
{
1042+
lastYieldTime = Stopwatch.GetTimestamp();
1043+
yield return new WatchOnChangeEvent { ChangedTables = [.. accumulated] };
1044+
accumulated.Clear();
1045+
}
1046+
throttleTask = null;
1047+
continue;
1048+
}
1049+
1050+
// A new event arrived (possibly alongside throttle)
1051+
bool hasNext;
1052+
try { hasNext = await moveNextTask; }
1053+
catch (OperationCanceledException) { break; }
1054+
if (!hasNext) break;
10061055

1007-
yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
1056+
AccumulateMatchingTables(listenerEnumerator.Current, watchedTables, accumulated);
1057+
1058+
if (accumulated.Count > 0)
1059+
{
1060+
var now = Stopwatch.GetTimestamp();
1061+
var elapsedMs = (now - lastYieldTime) * 1000.0 / Stopwatch.Frequency;
1062+
1063+
if (elapsedMs >= throttleMs)
1064+
{
1065+
// Leading edge: emit immediately
1066+
lastYieldTime = now;
1067+
yield return new WatchOnChangeEvent { ChangedTables = [.. accumulated] };
1068+
accumulated.Clear();
1069+
throttleTask = null;
1070+
}
1071+
else
1072+
{
1073+
throttleTask ??= Task.Delay((int)(throttleMs - elapsedMs), token);
1074+
}
1075+
}
1076+
1077+
moveNextTask = listenerEnumerator.MoveNextAsync().AsTask();
1078+
}
1079+
1080+
if (accumulated.Count > 0)
1081+
yield return new WatchOnChangeEvent { ChangedTables = [.. accumulated] };
10081082
}
1083+
finally
1084+
{
1085+
await listenerEnumerator.DisposeAsync();
1086+
}
1087+
}
1088+
1089+
private static void AccumulateMatchingTables(
1090+
DBAdapterEvents.TablesUpdatedEvent e,
1091+
HashSet<string> watchedTables,
1092+
HashSet<string> accumulated
1093+
)
1094+
{
1095+
var tables = new HashSet<string>();
1096+
GetTablesFromNotification(e.TablesUpdated, tables);
1097+
tables.IntersectWith(watchedTables);
1098+
accumulated.UnionWith(tables);
10091099
}
10101100

10111101
private static void GetTablesFromNotification(INotification updateNotification, HashSet<string> changedTables)

Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs

Lines changed: 176 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -842,26 +842,196 @@ public async Task Watch_TriggerImmediately_False()
842842
[Fact(Timeout = 2000)]
843843
public async Task Watch_CancelsOnTokenCancellation()
844844
{
845-
var cts = CancellationTokenSource.CreateLinkedTokenSource(testCts.Token);
846845
var tcs = new TaskCompletionSource<bool>();
847-
var sem = new SemaphoreSlim(0);
846+
848847
var listener = db.Watch<CountResult>(
849848
"SELECT COUNT(*) AS count FROM assets",
850849
null,
851-
new() { Signal = cts.Token, TriggerImmediately = false });
850+
new() { Signal = testCts.Token });
852851

853852
// Sem == received result
854853
// TCS == received cancellation
855854
_ = Task.Run(async () =>
856855
{
857-
await foreach (var _ in listener) { sem.Release(); }
856+
await foreach (var _ in listener) { }
858857
tcs.TrySetResult(true);
859858
});
860859

861860
await TestUtils.InsertRandomAssets(db, 3);
862-
Assert.True(await sem.WaitAsync(200));
863861

864-
cts.Cancel();
862+
testCts.Cancel();
863+
Assert.True(await tcs.Task);
864+
}
865+
866+
[Fact(Timeout = 5000)]
867+
public async Task OnChange_ThrottlesBatchesRapidChanges()
868+
{
869+
int eventCount = 0;
870+
var tcs = new TaskCompletionSource<bool>();
871+
872+
var listener = db.OnChange(new SQLWatchOptions
873+
{
874+
Tables = ["assets"],
875+
Signal = testCts.Token,
876+
ThrottleMs = 200,
877+
});
878+
879+
_ = Task.Run(async () =>
880+
{
881+
try
882+
{
883+
await foreach (var _ in listener)
884+
{
885+
Interlocked.Increment(ref eventCount);
886+
}
887+
tcs.TrySetResult(true);
888+
}
889+
catch (Exception ex)
890+
{
891+
tcs.TrySetException(ex);
892+
}
893+
});
894+
895+
for (int i = 0; i < 5; i++)
896+
{
897+
await TestUtils.InsertRandomAsset(db);
898+
}
899+
900+
testCts.Cancel();
901+
Assert.True(await tcs.Task);
902+
903+
Assert.True(eventCount < 5, $"Expected fewer than 5 events but got {eventCount}");
904+
}
905+
906+
[Fact(Timeout = 5000)]
907+
public async Task Watch_ThrottlesBatchesRapidChanges()
908+
{
909+
int eventCount = 0;
910+
long lastCount = 0;
911+
var tcs = new TaskCompletionSource<bool>();
912+
913+
var listener = db.Watch<CountResult>(
914+
"SELECT COUNT(*) AS count FROM assets",
915+
null,
916+
new() { Signal = testCts.Token, ThrottleMs = 200 });
917+
918+
_ = Task.Run(async () =>
919+
{
920+
try
921+
{
922+
await foreach (var rows in listener)
923+
{
924+
lastCount = rows.First().count;
925+
Interlocked.Increment(ref eventCount);
926+
}
927+
tcs.TrySetResult(true);
928+
}
929+
catch (Exception ex)
930+
{
931+
tcs.TrySetException(ex);
932+
}
933+
});
934+
935+
for (int i = 0; i < 5; i++)
936+
{
937+
await TestUtils.InsertRandomAsset(db);
938+
}
939+
940+
testCts.Cancel();
941+
Assert.True(await tcs.Task);
942+
943+
Assert.Equal(5, lastCount);
944+
Assert.True(eventCount < 5, $"Expected fewer than 5 events but got {eventCount}");
945+
}
946+
947+
[Fact(Timeout = 5000)]
948+
public async Task OnChange_NoThrottleWhenZero()
949+
{
950+
int eventCount = 0;
951+
using var sem = new SemaphoreSlim(0);
952+
953+
var listener = db.OnChange(new SQLWatchOptions
954+
{
955+
Tables = ["assets"],
956+
Signal = testCts.Token,
957+
ThrottleMs = 0,
958+
});
959+
960+
_ = Task.Run(async () =>
961+
{
962+
await foreach (var _ in listener)
963+
{
964+
Interlocked.Increment(ref eventCount);
965+
sem.Release();
966+
}
967+
}, testCts.Token);
968+
969+
for (int i = 0; i < 5; i++)
970+
{
971+
await db.Execute("INSERT INTO assets(id, description) VALUES(?, ?)", [Guid.NewGuid().ToString(), "test"]);
972+
Assert.True(await sem.WaitAsync(500));
973+
}
974+
975+
Assert.Equal(5, eventCount);
976+
}
977+
978+
[Fact(Timeout = 5000)]
979+
public async Task OnChange_FirstChangeIsNotDelayed()
980+
{
981+
using var sem = new SemaphoreSlim(0);
982+
var sw = Stopwatch.StartNew();
983+
984+
var listener = db.OnChange(new SQLWatchOptions
985+
{
986+
Tables = ["assets"],
987+
Signal = testCts.Token,
988+
ThrottleMs = 500,
989+
});
990+
991+
_ = Task.Run(async () =>
992+
{
993+
await foreach (var _ in listener)
994+
{
995+
sem.Release();
996+
break;
997+
}
998+
}, testCts.Token);
999+
1000+
await db.Execute("INSERT INTO assets(id, description) VALUES(?, ?)", [Guid.NewGuid().ToString(), "test"]);
1001+
1002+
Assert.True(await sem.WaitAsync(2000));
1003+
Assert.True(sw.ElapsedMilliseconds < 200, $"First event took {sw.ElapsedMilliseconds}ms, expected <200ms");
1004+
}
1005+
1006+
[Fact(Timeout = 5000)]
1007+
public async Task OnChange_ThrottleCancelledCleanly()
1008+
{
1009+
var tcs = new TaskCompletionSource<bool>();
1010+
1011+
var listener = db.OnChange(new SQLWatchOptions
1012+
{
1013+
Tables = ["assets"],
1014+
Signal = testCts.Token,
1015+
ThrottleMs = 500,
1016+
});
1017+
1018+
_ = Task.Run(async () =>
1019+
{
1020+
try
1021+
{
1022+
await foreach (var _ in listener) { }
1023+
tcs.TrySetResult(true);
1024+
}
1025+
catch (Exception ex)
1026+
{
1027+
tcs.TrySetException(ex);
1028+
}
1029+
});
1030+
1031+
// Insert to trigger the throttle delay, then cancel before the window expires
1032+
await db.Execute("INSERT INTO assets(id, description) VALUES(?, ?)", [Guid.NewGuid().ToString(), "test"]);
1033+
await Task.Delay(50);
1034+
testCts.Cancel();
8651035

8661036
Assert.True(await tcs.Task);
8671037
}

Tests/PowerSync/PowerSync.Common.Tests/PowerSync.Common.Tests.csproj

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,4 @@
3232
<ProjectReference Include="..\..\..\PowerSync\PowerSync.Common\PowerSync.Common.csproj" />
3333
</ItemGroup>
3434

35-
<ItemGroup>
36-
<Content Include="xunit.runner.json" CopyToOutputDirectory="PreserveNewest" />
37-
</ItemGroup>
38-
3935
</Project>

Tests/PowerSync/PowerSync.Common.Tests/xunit.runner.json

Lines changed: 0 additions & 4 deletions
This file was deleted.

0 commit comments

Comments
 (0)