Skip to content

Commit b1064bc

Browse files
authored
[Fix] Refresh watched queries on schema update (#42)
1 parent b993169 commit b1064bc

5 files changed

Lines changed: 241 additions & 57 deletions

File tree

PowerSync/PowerSync.Common/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## 0.0.10-alpha.1
44

5+
- Fixed watched queries sometimes resolving to the wrong underlying tables after a schema change.
6+
- Fixed some properties in Table not being public when they are meant to be.
57
- Fixed a bug where custom indexes were not being sent to the PowerSync SQLite extension.
68
- Added a new model-based syntax for defining the PowerSync schema (the old syntax is still functional). This syntax uses classes marked with attributes to define the PowerSync schema. The classes can then also be used for queries later on.
79

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 111 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -768,41 +768,61 @@ private async Task<IDisposable> WatchInternal<T>(
768768
Func<string, object?[]?, Task<T[]>> getter
769769
)
770770
{
771-
try
772-
{
773-
var resolvedTables = await ResolveTables(query, parameters, options);
774-
var result = await getter(query, parameters);
775-
handler.OnResult(result);
771+
var subscription = new WatchSubscription();
776772

777-
var subscription = OnChange(new WatchOnChangeHandler
773+
async Task ResetQuery()
774+
{
775+
try
778776
{
779-
OnChange = async (change) =>
777+
var resolvedTables = await ResolveTables(query, parameters, options);
778+
var result = await getter(query, parameters);
779+
handler.OnResult(result);
780+
781+
var onChangeListener = OnChange(new WatchOnChangeHandler
780782
{
781-
try
783+
OnChange = async (change) =>
782784
{
783-
var result = await getter(query, parameters);
784-
handler.OnResult(result);
785-
}
786-
catch (Exception ex)
787-
{
788-
handler.OnError?.Invoke(ex);
789-
}
790-
},
791-
OnError = handler.OnError
792-
}, new SQLWatchOptions
793-
{
794-
Tables = resolvedTables,
795-
Signal = options?.Signal,
796-
ThrottleMs = options?.ThrottleMs
797-
});
785+
try
786+
{
787+
var result = await getter(query, parameters);
788+
handler.OnResult(result);
789+
}
790+
catch (Exception ex)
791+
{
792+
handler.OnError?.Invoke(ex);
793+
}
794+
},
795+
OnError = handler.OnError
796+
}, new SQLWatchOptions
797+
{
798+
Tables = resolvedTables,
799+
Signal = options?.Signal,
800+
ThrottleMs = options?.ThrottleMs
801+
});
798802

799-
return subscription;
803+
subscription.SetOnChangeListener(onChangeListener);
804+
}
805+
catch (Exception ex)
806+
{
807+
handler.OnError?.Invoke(ex);
808+
throw;
809+
}
800810
}
801-
catch (Exception ex)
811+
812+
// Register initial subscription
813+
await ResetQuery();
814+
815+
// Listen for schema changes and reset listener
816+
var schemaListener = RunListener(async (e) =>
802817
{
803-
handler.OnError?.Invoke(ex);
804-
throw;
805-
}
818+
if (e.SchemaChanged != null)
819+
{
820+
await ResetQuery();
821+
}
822+
});
823+
subscription.SetSchemaListener(schemaListener);
824+
825+
return subscription;
806826
}
807827

808828
private class ExplainedResult
@@ -869,7 +889,7 @@ void flushTableUpdates()
869889
});
870890
}
871891

872-
var cts = Database.RunListener((update) =>
892+
var dbListenerCts = Database.RunListener((update) =>
873893
{
874894
if (update.TablesUpdated != null)
875895
{
@@ -885,27 +905,29 @@ void flushTableUpdates()
885905
}
886906
});
887907

888-
CancellationTokenSource linkedCts;
889-
if (options?.Signal.HasValue == true)
908+
CancellationTokenSource stopRunningCts;
909+
910+
if (options?.Signal != null)
890911
{
891-
// Cancel on global CTS cancellation or user token cancellation
892-
linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
912+
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
893913
watchSubscriptionCts.Token,
894914
options.Signal.Value
895915
);
916+
stopRunningCts = linkedCts;
896917
}
897918
else
898919
{
899-
// Cancel on global CTS cancellation
900-
linkedCts = watchSubscriptionCts;
920+
stopRunningCts = watchSubscriptionCts;
901921
}
902922

903-
var registration = linkedCts.Token.Register(() =>
923+
var stopRunningReg = stopRunningCts.Token.Register(dbListenerCts.Cancel);
924+
925+
return new ActionDisposable(() =>
904926
{
905-
cts.Cancel();
927+
stopRunningReg.Dispose();
928+
dbListenerCts.Cancel();
929+
dbListenerCts.Dispose();
906930
});
907-
908-
return new WatchSubscription(cts, registration);
909931
}
910932

911933
private static void HandleTableChanges(HashSet<string> changedTables, HashSet<string> watchedTables, Action<string[]> onDetectedChanges)
@@ -968,21 +990,63 @@ public class WatchOnChangeHandler
968990
public Action<Exception>? OnError { get; set; }
969991
}
970992

971-
public class WatchSubscription(CancellationTokenSource cts, CancellationTokenRegistration registration) : IDisposable
993+
public class WatchSubscription : IDisposable
972994
{
973-
private readonly CancellationTokenSource _cts = cts;
974-
private readonly CancellationTokenRegistration _registration = registration;
995+
private IDisposable? _onChangeListener;
996+
private IDisposable? _schemaListener;
997+
private readonly object _lock = new();
975998
private bool _disposed;
976999

977-
public bool Disposed { get { return _disposed; } }
1000+
internal void SetSchemaListener(IDisposable listener)
1001+
{
1002+
lock (_lock)
1003+
{
1004+
if (_disposed)
1005+
{
1006+
listener.Dispose();
1007+
return;
1008+
}
1009+
_schemaListener?.Dispose();
1010+
_schemaListener = listener;
1011+
}
1012+
}
1013+
1014+
internal void SetOnChangeListener(IDisposable listener)
1015+
{
1016+
lock (_lock)
1017+
{
1018+
if (_disposed)
1019+
{
1020+
listener.Dispose();
1021+
return;
1022+
}
1023+
_onChangeListener?.Dispose();
1024+
_onChangeListener = listener;
1025+
}
1026+
}
1027+
1028+
public void Dispose()
1029+
{
1030+
lock (_lock)
1031+
{
1032+
if (_disposed) return;
1033+
_disposed = true;
1034+
1035+
_onChangeListener?.Dispose();
1036+
_schemaListener?.Dispose();
1037+
}
1038+
}
1039+
}
1040+
1041+
public class ActionDisposable(Action onDispose) : IDisposable
1042+
{
1043+
private readonly Action _onDispose = onDispose;
1044+
private bool _disposed = false;
9781045

9791046
public void Dispose()
9801047
{
9811048
if (_disposed) return;
9821049
_disposed = true;
983-
984-
_registration.Dispose();
985-
_cts.Cancel();
986-
_cts.Dispose();
1050+
_onDispose();
9871051
}
9881052
}

PowerSync/PowerSync.Common/DB/Schema/Table.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,22 +84,22 @@ public bool InsertOnly
8484
get { return Options.InsertOnly; }
8585
set { Options.InsertOnly = value; }
8686
}
87-
string? ViewName
87+
public string? ViewName
8888
{
8989
get { return Options.ViewName; }
9090
set { Options.ViewName = value; }
9191
}
92-
bool TrackMetadata
92+
public bool TrackMetadata
9393
{
9494
get { return Options.TrackMetadata; }
9595
set { Options.TrackMetadata = value; }
9696
}
97-
TrackPreviousOptions? TrackPreviousValues
97+
public TrackPreviousOptions? TrackPreviousValues
9898
{
9999
get { return Options.TrackPreviousValues; }
100100
set { Options.TrackPreviousValues = value; }
101101
}
102-
bool IgnoreEmptyUpdates
102+
public bool IgnoreEmptyUpdates
103103
{
104104
get { return Options.IgnoreEmptyUpdates; }
105105
set { Options.IgnoreEmptyUpdates = value; }

Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace PowerSync.Common.Tests.Client;
77
using Newtonsoft.Json;
88

99
using PowerSync.Common.Client;
10+
using PowerSync.Common.DB.Schema;
1011
using PowerSync.Common.Tests.Models;
1112

1213
/// <summary>
@@ -573,7 +574,7 @@ await tx.Execute(
573574
}
574575

575576
[Fact(Timeout = 2000)]
576-
public async void WatchDisposableSubscriptionTest()
577+
public async Task WatchDisposableSubscriptionTest()
577578
{
578579
int callCount = 0;
579580
var semaphore = new SemaphoreSlim(0);
@@ -609,7 +610,7 @@ await db.Execute(
609610
}
610611

611612
[Fact(Timeout = 2000)]
612-
public async void WatchDisposableCustomTokenTest()
613+
public async Task WatchDisposableCustomTokenTest()
613614
{
614615
var customTokenSource = new CancellationTokenSource();
615616
int callCount = 0;
@@ -649,8 +650,8 @@ await db.Execute(
649650
Assert.Equal(2, callCount);
650651
}
651652

652-
[Fact(Timeout = 2500)]
653-
public async void WatchSingleCancelledTest()
653+
[Fact(Timeout = 3000)]
654+
public async Task WatchSingleCancelledTest()
654655
{
655656
int callCount = 0;
656657

@@ -688,13 +689,107 @@ await db.Execute(
688689
);
689690

690691
// Ensure nothing received from cancelled result
691-
bool receivedResult = await semCancelled.WaitAsync(100);
692-
Assert.False(receivedResult, "Received update after disposal");
692+
Assert.False(await semCancelled.WaitAsync(100));
693693

694694
await semAlwaysRunning.WaitAsync();
695695
Assert.Equal(5, callCount);
696696
}
697697

698+
[Fact(Timeout = 3000)]
699+
public async Task WatchSchemaResetTest()
700+
{
701+
var dbId = Guid.NewGuid().ToString();
702+
var db = new PowerSyncDatabase(new()
703+
{
704+
Database = new SQLOpenOptions
705+
{
706+
DbFilename = $"powerSyncWatch_{dbId}.db",
707+
},
708+
Schema = TestSchema.MakeOptionalSyncSchema(false)
709+
});
710+
711+
var sem = new SemaphoreSlim(0);
712+
long lastCount = -1;
713+
714+
string querySql = "SELECT COUNT(*) AS count FROM assets";
715+
var query = await db.Watch(querySql, [], new WatchHandler<CountResult>
716+
{
717+
OnResult = (result) =>
718+
{
719+
lastCount = result[0].count;
720+
sem.Release();
721+
},
722+
OnError = error => throw error
723+
});
724+
Assert.True(await sem.WaitAsync(100));
725+
Assert.Equal(0, lastCount);
726+
727+
var resolved = await GetSourceTables(db, querySql);
728+
Assert.Single(resolved);
729+
Assert.Contains("ps_data_local__local_assets", resolved);
730+
731+
for (int i = 0; i < 3; i++)
732+
{
733+
await db.Execute(
734+
"insert into assets(id, description, make) values (?, ?, ?)",
735+
[Guid.NewGuid().ToString(), "some desc", "some make"]
736+
);
737+
Assert.True(await sem.WaitAsync(100));
738+
Assert.Equal(i + 1, lastCount);
739+
}
740+
Assert.Equal(3, lastCount);
741+
742+
await db.UpdateSchema(TestSchema.MakeOptionalSyncSchema(true));
743+
744+
resolved = await GetSourceTables(db, querySql);
745+
Assert.Single(resolved);
746+
Assert.Contains("ps_data__assets", resolved);
747+
748+
Assert.True(await sem.WaitAsync(100));
749+
Assert.Equal(0, lastCount);
750+
751+
await db.Execute("insert into assets select * from inactive_local_assets");
752+
Assert.True(await sem.WaitAsync(500));
753+
Assert.Equal(3, lastCount);
754+
755+
// Sanity check
756+
query.Dispose();
757+
758+
await db.Execute("delete from assets");
759+
Assert.False(await sem.WaitAsync(100));
760+
Assert.Equal(3, lastCount);
761+
}
762+
763+
private class ExplainedResult
764+
{
765+
public int addr = 0;
766+
public string opcode = "";
767+
public int p1 = 0;
768+
public int p2 = 0;
769+
public int p3 = 0;
770+
public string p4 = "";
771+
public int p5 = 0;
772+
}
773+
private record TableSelectResult(string tbl_name);
774+
private async Task<List<string>> GetSourceTables(PowerSyncDatabase db, string sql, object?[]? parameters = null)
775+
{
776+
var explained = await db.GetAll<ExplainedResult>(
777+
$"EXPLAIN {sql}", parameters
778+
);
779+
780+
var rootPages = explained
781+
.Where(row => row.opcode == "OpenRead" && row.p3 == 0)
782+
.Select(row => row.p2)
783+
.ToList();
784+
785+
var tables = await db.GetAll<TableSelectResult>(
786+
"SELECT DISTINCT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))",
787+
[JsonConvert.SerializeObject(rootPages)]
788+
);
789+
790+
return tables.Select(row => row.tbl_name).ToList();
791+
}
792+
698793
[Fact]
699794
public async Task Attributes_ColumnAliasing()
700795
{

0 commit comments

Comments
 (0)