Skip to content

Commit 38f9225

Browse files
committed
Use new pattern around codebase
1 parent fc86979 commit 38f9225

10 files changed

Lines changed: 147 additions & 87 deletions

File tree

PowerSync/PowerSync.Common/Client/ConnectionManager.cs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,25 @@ public class StoredConnectionOptions(
7979
public PowerSyncConnectionOptions Options { get; set; } = options;
8080
}
8181

82-
public class ConnectionManagerEvent
82+
public class ConnectionManagerEvents : EventManager
8383
{
84-
public StreamingSyncImplementation? SyncStreamCreated { get; set; }
84+
public interface IConnectionManagerEvent;
85+
86+
public class SyncStreamCreatedEvent(StreamingSyncImplementation ssi) : IConnectionManagerEvent
87+
{
88+
public StreamingSyncImplementation SyncStreamCreated { get; set; } = ssi;
89+
}
90+
91+
public EventStream<SyncStreamCreatedEvent> OnSyncStreamCreated { get; } = new();
92+
93+
public ConnectionManagerEvents()
94+
{
95+
Register(OnSyncStreamCreated);
96+
}
8597
}
8698

87-
public class ConnectionManager : EventStream<ConnectionManagerEvent>
99+
public class ConnectionManager : ICloseable
88100
{
89-
90101
/// <summary>
91102
/// Tracks active connection attempts
92103
/// </summary>
@@ -122,6 +133,7 @@ public class ConnectionManager : EventStream<ConnectionManagerEvent>
122133

123134
public IPowerSyncBackendConnector? Connector => PendingConnectionOptions?.Connector;
124135

136+
public ConnectionManagerEvents Events { get; protected set; } = new();
125137

126138
public PowerSyncConnectionOptions? ConnectionOptions => PendingConnectionOptions?.Options;
127139

@@ -148,9 +160,9 @@ public ConnectionManager(Func<IPowerSyncBackendConnector, CreateSyncImplementati
148160
SyncDisposer = null;
149161
}
150162

151-
public new void Close()
163+
public void Close()
152164
{
153-
base.Close();
165+
Events.Close();
154166
SyncStreamImplementation?.Close();
155167
SyncDisposer?.Invoke();
156168
}
@@ -274,7 +286,7 @@ async Task InitSyncStream()
274286
RetryDelayMs = options.RetryDelayMs,
275287
});
276288

277-
Emit(new ConnectionManagerEvent { SyncStreamCreated = result.Sync });
289+
Events.Emit(new ConnectionManagerEvents.SyncStreamCreatedEvent(result.Sync));
278290
SyncStreamImplementation = result.Sync;
279291
SyncDisposer = result.OnDispose;
280292
await SyncStreamImplementation.WaitForReady();

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -233,19 +233,16 @@ public PowerSyncDatabase(PowerSyncDatabaseOptions options)
233233
});
234234

235235
var syncStreamStatusCts = CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token);
236-
var syncStreamStatusListener = syncStreamImplementation.ListenAsync(syncStreamStatusCts.Token);
236+
var syncStreamStatusListener = syncStreamImplementation.Events.OnStatusChanged.ListenAsync(syncStreamStatusCts.Token);
237237
var _ = Task.Run(async () =>
238238
{
239239
await foreach (var update in syncStreamStatusListener)
240240
{
241-
if (update.StatusChanged != null)
241+
CurrentStatus = new SyncStatus(new SyncStatusOptions(update.Status.Options)
242242
{
243-
CurrentStatus = new SyncStatus(new SyncStatusOptions(update.StatusChanged.Options)
244-
{
245-
HasSynced = CurrentStatus?.HasSynced == true || update.StatusChanged.LastSyncedAt != null,
246-
});
247-
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
248-
}
243+
HasSynced = CurrentStatus?.HasSynced == true || update.Status.LastSyncedAt != null,
244+
});
245+
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
249246
}
250247
});
251248

@@ -784,7 +781,7 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
784781
? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, options.Signal.Value)
785782
: CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token);
786783

787-
var listener = Database.ListenAsync(signal.Token);
784+
var listener = Database.Events.OnTablesUpdated.ListenAsync(signal.Token);
788785

789786
// Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
790787
// connection is established
@@ -793,7 +790,7 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
793790

794791
private async IAsyncEnumerable<WatchOnChangeEvent> OnChangeCore(
795792
HashSet<string> watchedTables,
796-
IAsyncEnumerable<DBAdapterEvent> listener,
793+
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
797794
[EnumeratorCancellation] CancellationToken signal,
798795
bool triggerImmediately
799796
)
@@ -807,7 +804,6 @@ bool triggerImmediately
807804
await foreach (var e in listener)
808805
{
809806
if (signal.IsCancellationRequested) yield break;
810-
if (e.TablesUpdated == null) continue;
811807

812808
changedTables.Clear();
813809
GetTablesFromNotification(e.TablesUpdated, changedTables);
@@ -857,7 +853,7 @@ public IAsyncEnumerable<T[]> Watch<T>(
857853
// so that table changes between Watch() being called and iteration starting are not missed.
858854
// This mirrors the pattern used in OnChange().
859855
var initialRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
860-
var initialListener = Database.ListenAsync(initialRestartCts.Token);
856+
var initialListener = Database.Events.OnTablesUpdated.ListenAsync(initialRestartCts.Token);
861857

862858
return WatchCore<T>(sql, parameters, options, signal, initialRestartCts, initialListener);
863859
}
@@ -868,7 +864,7 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
868864
SQLWatchOptions options,
869865
CancellationTokenSource signal,
870866
CancellationTokenSource initialRestartCts,
871-
IAsyncEnumerable<DBAdapterEvent> initialListener
867+
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> initialListener
872868
)
873869
{
874870
var schemaChanged = new TaskCompletionSource<bool>();
@@ -930,7 +926,7 @@ IAsyncEnumerable<DBAdapterEvent> initialListener
930926
// Establish a new listener BEFORE resolving source tables in the next iteration,
931927
// so that changes during the async GetSourceTables call are not missed.
932928
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
933-
currentListener = Database.ListenAsync(currentRestartCts.Token);
929+
currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token);
934930

935931
break;
936932
}
@@ -977,7 +973,7 @@ internal async Task<HashSet<string>> GetSourceTables(string sql, object?[]? para
977973

978974
private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
979975
HashSet<string> watchedTables,
980-
IAsyncEnumerable<DBAdapterEvent> listener,
976+
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
981977
[EnumeratorCancellation] CancellationToken token,
982978
bool triggerImmediately = false
983979
)
@@ -990,19 +986,16 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
990986
HashSet<string> changedTables = new();
991987
await foreach (var e in listener)
992988
{
993-
if (e.TablesUpdated != null)
994-
{
995-
if (token.IsCancellationRequested) break;
989+
if (token.IsCancellationRequested) break;
996990

997-
// Extract the changed tables and intersect with the watched tables
998-
changedTables.Clear();
999-
GetTablesFromNotification(e.TablesUpdated, changedTables);
1000-
changedTables.IntersectWith(watchedTables);
991+
// Extract the changed tables and intersect with the watched tables
992+
changedTables.Clear();
993+
GetTablesFromNotification(e.TablesUpdated, changedTables);
994+
changedTables.IntersectWith(watchedTables);
1001995

1002-
if (changedTables.Count == 0) continue;
996+
if (changedTables.Count == 0) continue;
1003997

1004-
yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
1005-
}
998+
yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
1006999
}
10071000
}
10081001

PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,23 @@ public static class PSInternalTable
100100
public static readonly string UNTYPED = "ps_untyped";
101101
}
102102

103-
public class BucketStorageEvent
103+
public class BucketStorageEvents : EventManager
104104
{
105-
public bool CrudUpdate { get; set; }
105+
public interface IBucketStorageEvent;
106+
107+
public class CrudUpdateEvent : IBucketStorageEvent;
108+
109+
public EventStream<CrudUpdateEvent> OnCrudUpdate = new();
110+
111+
public BucketStorageEvents()
112+
{
113+
Register(OnCrudUpdate);
114+
}
106115
}
107116

108-
public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
117+
public interface IBucketStorageAdapter : ICloseable
109118
{
119+
BucketStorageEvents Events { get; }
110120
Task Init();
111121

112122
Task<CrudEntry?> NextCrudItem();

PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ namespace PowerSync.Common.Client.Sync.Bucket;
1212

1313
using PowerSync.Common.DB;
1414
using PowerSync.Common.DB.Crud;
15-
using PowerSync.Common.Utils;
1615

17-
public class SqliteBucketStorage : EventStream<BucketStorageEvent>, IBucketStorageAdapter
16+
public class SqliteBucketStorage : IBucketStorageAdapter
1817
{
19-
2018
public static readonly string MAX_OP_ID = "9223372036854775807";
2119

20+
public BucketStorageEvents Events { get; } = new();
21+
2222
private readonly IDBAdapter db;
2323
private bool hasCompletedSync;
2424
private readonly HashSet<string> tableNames;
@@ -27,29 +27,27 @@ public class SqliteBucketStorage : EventStream<BucketStorageEvent>, IBucketStora
2727
private readonly ILogger logger;
2828

2929
private readonly CancellationTokenSource updateCts;
30+
private readonly Task updateTask;
3031

3132
private record ExistingTableRowsResult(string name);
3233

3334
public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null)
3435
{
3536
this.db = db;
36-
this.logger = logger ?? NullLogger.Instance; ;
37+
this.logger = logger ?? NullLogger.Instance;
3738
hasCompletedSync = false;
3839
tableNames = [];
3940

4041
updateCts = new CancellationTokenSource();
4142

42-
var _ = Task.Run(() =>
43+
updateTask = Task.Run(() =>
4344
{
44-
foreach (var update in db.Listen(updateCts.Token))
45+
foreach (var update in db.Events.OnTablesUpdated.Listen(updateCts.Token))
4546
{
46-
if (update.TablesUpdated != null)
47+
var tables = DBAdapterUtils.ExtractTableUpdates(update.TablesUpdated);
48+
if (tables.Contains(PSInternalTable.CRUD))
4749
{
48-
var tables = DBAdapterUtils.ExtractTableUpdates(update.TablesUpdated);
49-
if (tables.Contains(PSInternalTable.CRUD))
50-
{
51-
Emit(new BucketStorageEvent { CrudUpdate = true });
52-
}
50+
Events.Emit(new BucketStorageEvents.CrudUpdateEvent());
5351
}
5452
}
5553
});
@@ -67,10 +65,11 @@ public async Task Init()
6765
}
6866
}
6967

70-
public new void Close()
68+
public void Close()
7169
{
7270
updateCts.Cancel();
73-
base.Close();
71+
try { updateTask.Wait(2000); } catch (Exception) { }
72+
Events.Close();
7473
}
7574

7675
private record ClientIdResult(string? client_id);

0 commit comments

Comments
 (0)