Skip to content

Commit c60461b

Browse files
committed
Offload queries to task
1 parent b1064bc commit c60461b

6 files changed

Lines changed: 69 additions & 55 deletions

File tree

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -226,24 +226,27 @@ public async Task Connect(PowerSyncConnectionOptions? options = null)
226226
var tcs = new TaskCompletionSource<bool>();
227227
var cts = new CancellationTokenSource();
228228

229-
var _ = Task.Run(() =>
229+
// Subscribe to events before starting StreamingSync to not miss the Connected == true event
230+
var listener = ListenAsync(cts.Token);
231+
232+
streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options);
233+
234+
var _ = Task.Run(async () =>
230235
{
231-
foreach (var status in Listen(cts.Token))
236+
await foreach (var status in listener)
232237
{
233-
if (status.StatusChanged != null)
238+
if (status.StatusChanged?.Connected == true)
234239
{
235-
if (status.StatusChanged.Connected == false)
236-
{
237-
logger.LogWarning("Initial connect attempt did not successfully connect to server");
238-
}
239-
240-
tcs.SetResult(true);
240+
tcs.TrySetResult(true);
241241
cts.Cancel();
242+
return;
242243
}
243244
}
244-
});
245245

246-
streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options);
246+
// Connection closed prematurely
247+
logger.LogWarning("Initial connect attempt did not successfully connect to server");
248+
tcs.TrySetResult(true);
249+
});
247250

248251
await tcs.Task;
249252
}

PowerSync/PowerSync.Common/DB/Schema/CompiledTable.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,10 @@ public void Validate()
102102
columnNames.Add(columnName);
103103
}
104104

105-
foreach (var kvp in Indexes)
105+
foreach (var index in IndexesJSON)
106106
{
107-
var indexName = kvp.Key;
108-
var indexColumns = kvp.Value;
107+
var indexName = index.Name;
108+
var indexColumns = index.Columns;
109109

110110
if (InvalidSQLCharacters.IsMatch(indexName))
111111
{
@@ -114,9 +114,9 @@ public void Validate()
114114

115115
foreach (var indexColumn in indexColumns)
116116
{
117-
if (!columnNames.Contains(indexColumn))
117+
if (!columnNames.Contains(indexColumn.Name))
118118
{
119-
throw new Exception($"Column {indexColumn} not found for index {indexName}");
119+
throw new Exception($"Column {indexColumn.Name} not found for index {indexName}");
120120
}
121121
}
122122
}

PowerSync/PowerSync.Common/DB/Schema/IndexedColumnJSON.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ class IndexedColumnJSONOptions(string Name, bool Ascending = true)
88

99
class IndexedColumnJSON(IndexedColumnJSONOptions options)
1010
{
11-
protected string Name { get; set; } = options.Name;
11+
public string Name { get; set; } = options.Name;
1212

13-
protected bool Ascending { get; set; } = options.Ascending;
13+
public bool Ascending { get; set; } = options.Ascending;
1414

1515
public object ToJSONObject(CompiledTable parentTable)
1616
{

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -171,74 +171,78 @@ private static List<string> PrepareQueryString(ref string query, int parameterCo
171171
return dynamicParamsList;
172172
}
173173

174-
public async Task<T[]> GetAll<T>(string query, object?[]? parameters = null)
174+
public Task<T[]> GetAll<T>(string query, object?[]? parameters = null)
175175
{
176176
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
177-
return [.. await Db.QueryAsync<T>(query, dynamicParams, commandType: CommandType.Text)];
177+
return Task.Run(async () => (await Db.QueryAsync<T>(query, dynamicParams, commandType: CommandType.Text)).ToArray());
178178
}
179179

180-
public async Task<dynamic[]> GetAll(string query, object?[]? parameters = null)
180+
public Task<dynamic[]> GetAll(string query, object?[]? parameters = null)
181181
{
182182
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
183-
return [.. await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)];
183+
return Task.Run(async () => (await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)).ToArray());
184184
}
185185

186-
public async Task<T?> GetOptional<T>(string query, object?[]? parameters = null)
186+
public Task<T?> GetOptional<T>(string query, object?[]? parameters = null)
187187
{
188188
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
189-
return await Db.QueryFirstOrDefaultAsync<T>(query, dynamicParams, commandType: CommandType.Text);
189+
return Task.Run(() => Db.QueryFirstOrDefaultAsync<T>(query, dynamicParams, commandType: CommandType.Text));
190190
}
191191

192-
public async Task<dynamic?> GetOptional(string query, object?[]? parameters = null)
192+
public Task<dynamic?> GetOptional(string query, object?[]? parameters = null)
193193
{
194194
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
195-
return await Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text);
195+
return Task.Run(() => Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text));
196196
}
197197

198-
public async Task<T> Get<T>(string query, object?[]? parameters = null)
198+
public Task<T> Get<T>(string query, object?[]? parameters = null)
199199
{
200200
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
201-
return await Db.QueryFirstAsync<T>(query, dynamicParams, commandType: CommandType.Text);
201+
return Task.Run(() => Db.QueryFirstAsync<T>(query, dynamicParams, commandType: CommandType.Text));
202202
}
203203

204-
public async Task<dynamic> Get(string query, object?[]? parameters = null)
204+
public Task<dynamic> Get(string query, object?[]? parameters = null)
205205
{
206206
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
207-
return await Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text);
207+
return Task.Run(() => Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text));
208208
}
209209

210-
public async Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
210+
public Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
211211
{
212212
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
213-
int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text);
214-
215-
return new NonQueryResult
213+
return Task.Run(async () =>
216214
{
217-
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
218-
RowsAffected = rowsAffected,
219-
};
215+
int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text);
216+
return new NonQueryResult
217+
{
218+
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
219+
RowsAffected = rowsAffected,
220+
};
221+
});
220222
}
221223

222-
public async Task<NonQueryResult> ExecuteBatch(string query, object?[][]? parameters = null)
224+
public Task<NonQueryResult> ExecuteBatch(string query, object?[][]? parameters = null)
223225
{
224226
if (parameters == null || parameters.Length == 0)
225227
{
226-
return new NonQueryResult { RowsAffected = 0 };
228+
return Task.FromResult(new NonQueryResult { RowsAffected = 0 });
227229
}
228230

229231
List<DynamicParameters>? dynamicParamsList = PrepareQuery(ref query, parameters);
230232
if (dynamicParamsList == null)
231233
{
232-
return new NonQueryResult { RowsAffected = 0 };
234+
return Task.FromResult(new NonQueryResult { RowsAffected = 0 });
233235
}
234236

235-
int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text);
236-
237-
return new NonQueryResult
237+
return Task.Run(async () =>
238238
{
239-
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
240-
RowsAffected = rowsAffected,
241-
};
239+
int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text);
240+
return new NonQueryResult
241+
{
242+
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
243+
RowsAffected = rowsAffected,
244+
};
245+
});
242246
}
243247

244248
public new void Close()

Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ await db.Execute(
103103
Assert.Null(row.make);
104104
}
105105

106+
[Fact]
107+
public async Task QueriesRunOnAnotherThread()
108+
{
109+
int preCallThreadId = Environment.CurrentManagedThreadId;
110+
await db.GetAll("select * from assets");
111+
int postCallThreadId = Environment.CurrentManagedThreadId;
112+
113+
Assert.NotEqual(preCallThreadId, postCallThreadId);
114+
}
115+
106116
[Fact]
107117
public async Task FailedInsertTest()
108118
{
@@ -342,15 +352,11 @@ public async Task QueueSimultaneousExecutionsTest()
342352

343353
await db.WriteLock(async context =>
344354
{
345-
var tasks = Enumerable.Range(0, operationCount)
346-
.Select(async index =>
347-
{
348-
await context.Execute("SELECT * FROM assets");
349-
order.Add(index);
350-
})
351-
.ToArray();
352-
353-
await Task.WhenAll(tasks);
355+
for (int i = 0; i < operationCount; i++)
356+
{
357+
await context.Execute("SELECT * FROM assets");
358+
order.Add(i);
359+
}
354360
});
355361

356362
var expectedOrder = Enumerable.Range(0, operationCount).ToList();

Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public async Task InitializeAsync()
2525
{
2626
syncService = new MockSyncService();
2727
db = syncService.CreateDatabase();
28+
await db.Init();
2829
}
2930

3031
public async Task DisposeAsync()

0 commit comments

Comments
 (0)