-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathSyncStreamsTests.cs
More file actions
214 lines (172 loc) · 7.23 KB
/
Copy pathSyncStreamsTests.cs
File metadata and controls
214 lines (172 loc) · 7.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
namespace PowerSync.Common.Tests.Client.Sync;
using System.Runtime.CompilerServices;
using Common.Client.Sync.Stream;
using Newtonsoft.Json;
using PowerSync.Common.Client;
using PowerSync.Common.DB.Crud;
using PowerSync.Common.DB.Schema;
using PowerSync.Common.Tests.Utils;
using PowerSync.Common.Tests.Utils.Sync;
/// <summary>
/// Tests for sync stream subscriptions, run against a <c>MockSyncService</c>.
/// <para>
/// Run with: dotnet test -v n --framework net8.0 --filter "SyncStreamsTests"
/// </para>
/// </summary>
public class SyncStreamsTests : IAsyncLifetime
{
    private MockSyncService syncService = null!;
    private PowerSyncDatabase db = null!;

    /// <summary>
    /// Creates a new mock sync service, obtains a database from it and initializes that database.
    /// </summary>
    public async Task InitializeAsync()
    {
        syncService = new MockSyncService();
        db = syncService.CreateDatabase();
        await db.Init();
    }

    /// <summary>
    /// Disconnects, removes all rows from <c>ps_stream_subscriptions</c>, clears and closes the database,
    /// closes the mock service and finally cleans up the database by name.
    /// </summary>
    public async Task DisposeAsync()
    {
        await db.Disconnect();
        await db.Execute("DELETE FROM ps_stream_subscriptions");
        await db.DisconnectAndClear();
        syncService.Close();
        await db.Close();
        DatabaseUtils.CleanDb(db.Database.Name);
    }

    /// <summary>
    /// Connecting with <c>IncludeDefaultStreams = false</c> must produce a first request whose streams
    /// have <c>IncludeDefaults = false</c> and no subscriptions.
    /// </summary>
    [Fact]
    public async Task CanDisableDefaultStreams()
    {
        await db.Connect(new TestConnector(), new PowerSyncConnectionOptions
        {
            IncludeDefaultStreams = false
        });

        TestUtils.DeepEquivalent(
            new RequestStream { IncludeDefaults = false, Subscriptions = [] },
            syncService.Requests[0].Streams
        );
    }

    /// <summary>
    /// A stream subscribed before connecting must be the only subscription in the first request.
    /// </summary>
    [Fact]
    public async Task BasicSubscribeTest()
    {
        var a = await db.SyncStream("a").Subscribe();

        await db.Connect(new TestConnector(), new PowerSyncConnectionOptions());

        // Assert.Single both checks the size and hands back the element, matching the other tests in this class.
        var streams = syncService.Requests[0]?.Streams;
        var requested = Assert.Single(streams?.Subscriptions!);
        Assert.Equal("a", requested.Stream);

        a.Unsubscribe();
    }

    /// <summary>
    /// Two subscriptions to the same stream name with different parameters (one with an explicit priority)
    /// must both be requested, and their per-stream status must follow the pushed checkpoint lines.
    /// </summary>
    [Fact]
    public async Task SubscribesWithStreams()
    {
        var a = await db.SyncStream("stream", new Dictionary<string, object> { { "foo", "a" } }).Subscribe();
        var b = await db.SyncStream("stream", new Dictionary<string, object> { { "foo", "b" } })
            .Subscribe(new SyncStreamSubscribeOptions { Priority = new StreamPriority(1) });

        await db.Connect(new TestConnector());

        var streams = syncService.Requests[0]?.Streams;
        Assert.True(streams?.IncludeDefaults);
        Assert.Equal(2, streams?.Subscriptions.Count);
        TestUtils.DeepEquivalent(
            new RequestStreamSubscription
            {
                Stream = "stream",
                Parameters = new Dictionary<string, object> { { "foo", "a" } },
                OverridePriority = null
            },
            streams?.Subscriptions[0]
        );
        TestUtils.DeepEquivalent(
            new RequestStreamSubscription
            {
                Stream = "stream",
                Parameters = new Dictionary<string, object> { { "foo", "b" } },
                OverridePriority = 1
            },
            streams?.Subscriptions[1]
        );

        // Start listening for the next status before pushing the line that should trigger it.
        var statusTask = MockSyncService.NextStatus(db);
        syncService.PushLine(
            MockDataFactory.Checkpoint(lastOpId: 0, buckets: [
                MockDataFactory.Bucket("a", 0, priority: 3, subscriptions: new object[] { new { sub = 0 } }),
                MockDataFactory.Bucket("b", 0, priority: 1, subscriptions: new object[] { new { sub = 1 } })
            ], streams: [MockDataFactory.Stream("stream", false)])
        );
        var status = await statusTask;

        foreach (var subscription in new[] { a, b })
        {
            var statusForStream = status.ForStream(subscription);
            // Fail with a clear assertion instead of a NullReferenceException if the stream is missing.
            Assert.NotNull(statusForStream);
            Assert.True(statusForStream!.Subscription.Active);
            Assert.Null(statusForStream!.Subscription.LastSyncedAt);
            Assert.True(statusForStream!.Subscription.HasExplicitSubscription);
        }

        await Task.Delay(100);

        statusTask = MockSyncService.NextStatus(db);
        syncService.PushLine(
            MockDataFactory.CheckpointPartiallyComplete(lastOpId: "0", priority: 1)
        );
        status = await statusTask;

        // Only "b" was subscribed with priority 1, so only it is expected to report a sync time here.
        Assert.Null(status.ForStream(a)!.Subscription.LastSyncedAt);
        Assert.NotNull(status.ForStream(b)!.Subscription.LastSyncedAt);
        await b.WaitForFirstSync();

        syncService.PushLine(MockDataFactory.CheckpointComplete(lastOpId: "0"));
        await a.WaitForFirstSync();
    }

    /// <summary>
    /// A stream announced by the service as a default stream must show up in the status as default,
    /// without parameters and without an explicit subscription.
    /// </summary>
    [Fact]
    public async Task ReportsDefaultStreams()
    {
        await db.Connect(new TestConnector());

        var statusTask = MockSyncService.NextStatus(db);
        syncService.PushLine(
            MockDataFactory.Checkpoint(lastOpId: 0, buckets: [], streams: [MockDataFactory.Stream("default_stream", true)])
        );
        var status = await statusTask;

        var statusSubscription = status.SyncStreams?[0];
        Assert.NotNull(statusSubscription);
        Assert.Equal("default_stream", statusSubscription!.Subscription.Name);
        Assert.Null(statusSubscription!.Subscription.Parameters);
        Assert.True(statusSubscription!.Subscription.IsDefault);
        Assert.False(statusSubscription!.Subscription.HasExplicitSubscription);
    }

    /// <summary>
    /// Subscribing while connected must lead to a second request containing the new subscription;
    /// unsubscribing afterwards must not lead to a third request.
    /// </summary>
    [Fact]
    public async Task ChangesSubscriptionsDynamically()
    {
        await db.Connect(new TestConnector());

        // The resulting task is deliberately discarded; it was previously stored in a local that was never read.
        // NOTE(review): confirm whether this call is needed at all, or whether the task should be awaited.
        _ = MockSyncService.NextStatus(db);
        syncService.PushLine(
            MockDataFactory.Checkpoint(
                lastOpId: 0,
                buckets: []
            )
        );
        await Task.Delay(100);

        var subscription = await db.SyncStream("a").Subscribe();
        await TestUtils.WaitForAsync(() => syncService.Requests.Count > 1);
        Assert.Single(syncService.Requests[1]?.Streams?.Subscriptions!);

        // Given that the subscription has a TTL, dropping the handle should not re-subscribe.
        subscription.Unsubscribe();
        await Task.Delay(100);
        Assert.Equal(2, syncService.Requests.Count);
    }

    /// <summary>
    /// Subscribing without ever connecting must still produce a status that contains the new stream.
    /// </summary>
    [Fact]
    public async Task SubscriptionsUpdateWhileOfflineTest()
    {
        var statusTask = MockSyncService.NextStatus(db);
        var subscription = await db.SyncStream("foo").Subscribe();
        var status = await statusTask;

        Assert.NotNull(status.ForStream(subscription));
    }

    // TODO(review): this test was marked with a bare "FIx" note; what needs fixing is not recorded here.
    /// <summary>
    /// Calling <c>Unsubscribe</c> twice on one handle, while a second handle for the same stream is still
    /// held, must leave exactly one subscription in the first request.
    /// </summary>
    [Fact]
    public async Task UnsubscribeMultipleTimesHasNoEffectTest()
    {
        var a = await db.SyncStream("a").Subscribe();
        var aAgain = await db.SyncStream("a").Subscribe();
        a.Unsubscribe();
        a.Unsubscribe();

        // Pretend the streams are expired - they should still be requested because the core extension extends the lifetime
        // of streams currently referenced before connecting.
        await db.Execute("UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000");

        await db.Connect(new TestConnector());

        var streams = syncService.Requests[0]?.Streams;
        Assert.True(streams?.IncludeDefaults);
        Assert.Single(streams?.Subscriptions!);

        aAgain.Unsubscribe();
    }

    /// <summary>
    /// After <c>UnsubscribeAll</c> on a stream, the first request must contain no subscriptions
    /// while still including default streams.
    /// </summary>
    [Fact]
    public async Task UnsubscribeAllTest()
    {
        // The returned handle is not needed; only the existence of the subscription matters.
        _ = await db.SyncStream("a").Subscribe();
        await db.SyncStream("a").UnsubscribeAll();

        await db.Connect(new TestConnector(), new PowerSyncConnectionOptions());

        TestUtils.DeepEquivalent(
            new RequestStream { IncludeDefaults = true, Subscriptions = [] },
            syncService.Requests[0].Streams
        );
    }
}