Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions PowerSync/PowerSync.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.0.11-alpha.1

- `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking.
- Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API).
- Changed the `PowerSyncDatabase.Watch` syntax to return an IAsyncEnumerable instead of accepting a callback. This allows users to handle database changes when they are ready instead of us eagerly running the callback as soon as a change is detected.

Expand Down
4 changes: 3 additions & 1 deletion PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ IAsyncEnumerable<DBAdapterEvent> initialListener
powersyncTables,
currentListener,
currentRestartCts.Token,
isRestart || options?.TriggerImmediately == true
isRestart || (options?.TriggerImmediately == true)
).GetAsyncEnumerator(currentRestartCts.Token);

// Continually wait for either OnChange or SchemaChanged to fire
Expand Down Expand Up @@ -972,6 +972,8 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
GetTablesFromNotification(e.TablesUpdated, changedTables);
changedTables.IntersectWith(watchedTables);

if (changedTables.Count == 0) continue;

yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] };
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,24 +226,27 @@ public async Task Connect(PowerSyncConnectionOptions? options = null)
var tcs = new TaskCompletionSource<bool>();
var cts = new CancellationTokenSource();

var _ = Task.Run(() =>
// Subscribe to events before starting StreamingSync to not miss the Connected == true event
var listener = ListenAsync(cts.Token);

streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options);

var _ = Task.Run(async () =>
{
foreach (var status in Listen(cts.Token))
await foreach (var status in listener)
{
if (status.StatusChanged != null)
if (status.StatusChanged?.Connected == true)
{
if (status.StatusChanged.Connected == false)
{
logger.LogWarning("Initial connect attempt did not successfully connect to server");
}

tcs.SetResult(true);
tcs.TrySetResult(true);
cts.Cancel();
return;
}
}
});

streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options);
// Connection closed prematurely
logger.LogWarning("Initial connect attempt did not successfully connect to server");
tcs.TrySetResult(true);
});

await tcs.Task;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class RequestStreamSubscription
[JsonProperty("parameters")]
public Dictionary<string, object> Parameters { get; set; } = new();

[JsonProperty("override_priority", NullValueHandling = NullValueHandling.Ignore)]
[JsonProperty("override_priority")]
public int? OverridePriority { get; set; }

}
Expand Down
10 changes: 5 additions & 5 deletions PowerSync/PowerSync.Common/DB/Schema/CompiledTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ public void Validate()
columnNames.Add(columnName);
}

foreach (var kvp in Indexes)
foreach (var index in IndexesJSON)
{
var indexName = kvp.Key;
var indexColumns = kvp.Value;
var indexName = index.Name;
var indexColumns = index.Columns;

if (InvalidSQLCharacters.IsMatch(indexName))
{
Expand All @@ -114,9 +114,9 @@ public void Validate()

foreach (var indexColumn in indexColumns)
{
if (!columnNames.Contains(indexColumn))
if (!columnNames.Contains(indexColumn.Name))
{
throw new Exception($"Column {indexColumn} not found for index {indexName}");
throw new Exception($"Column {indexColumn.Name} not found for index {indexName}");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions PowerSync/PowerSync.Common/DB/Schema/IndexedColumnJSON.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ class IndexedColumnJSONOptions(string Name, bool Ascending = true)

class IndexedColumnJSON(IndexedColumnJSONOptions options)
{
protected string Name { get; set; } = options.Name;
public string Name { get; set; } = options.Name;

protected bool Ascending { get; set; } = options.Ascending;
public bool Ascending { get; set; } = options.Ascending;

public object ToJSONObject(CompiledTable parentTable)
{
Expand Down
60 changes: 32 additions & 28 deletions PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,74 +171,78 @@ private static List<string> PrepareQueryString(ref string query, int parameterCo
return dynamicParamsList;
}

public async Task<T[]> GetAll<T>(string query, object?[]? parameters = null)
public Task<T[]> GetAll<T>(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return [.. await Db.QueryAsync<T>(query, dynamicParams, commandType: CommandType.Text)];
return Task.Run(async () => (await Db.QueryAsync<T>(query, dynamicParams, commandType: CommandType.Text)).ToArray());
}

public async Task<dynamic[]> GetAll(string query, object?[]? parameters = null)
public Task<dynamic[]> GetAll(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return [.. await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)];
return Task.Run(async () => (await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)).ToArray());
}

public async Task<T?> GetOptional<T>(string query, object?[]? parameters = null)
public Task<T?> GetOptional<T>(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstOrDefaultAsync<T>(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstOrDefaultAsync<T>(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<dynamic?> GetOptional(string query, object?[]? parameters = null)
public Task<dynamic?> GetOptional(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<T> Get<T>(string query, object?[]? parameters = null)
public Task<T> Get<T>(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstAsync<T>(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstAsync<T>(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<dynamic> Get(string query, object?[]? parameters = null)
public Task<dynamic> Get(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
public Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text);

return new NonQueryResult
return Task.Run(async () =>
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text);
return new NonQueryResult
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
});
}

public async Task<NonQueryResult> ExecuteBatch(string query, object?[][]? parameters = null)
public Task<NonQueryResult> ExecuteBatch(string query, object?[][]? parameters = null)
{
if (parameters == null || parameters.Length == 0)
{
return new NonQueryResult { RowsAffected = 0 };
return Task.FromResult(new NonQueryResult { RowsAffected = 0 });
}

List<DynamicParameters>? dynamicParamsList = PrepareQuery(ref query, parameters);
if (dynamicParamsList == null)
{
return new NonQueryResult { RowsAffected = 0 };
return Task.FromResult(new NonQueryResult { RowsAffected = 0 });
}

int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text);

return new NonQueryResult
return Task.Run(async () =>
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text);
return new NonQueryResult
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
});
}

public new void Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ await db.Execute(
Assert.Null(row.make);
}

[Fact]
public async Task QueriesRunOnAnotherThread()
{
int preCallThreadId = Environment.CurrentManagedThreadId;
await db.GetAll("select * from assets");
int postCallThreadId = Environment.CurrentManagedThreadId;

Assert.NotEqual(preCallThreadId, postCallThreadId);
}

[Fact]
public async Task FailedInsertTest()
{
Expand Down Expand Up @@ -354,15 +364,11 @@ public async Task QueueSimultaneousExecutionsTest()

await db.WriteLock(async context =>
{
var tasks = Enumerable.Range(0, operationCount)
.Select(async index =>
{
await context.Execute("SELECT * FROM assets");
order.Add(index);
})
.ToArray();

await Task.WhenAll(tasks);
for (int i = 0; i < operationCount; i++)
{
await context.Execute("SELECT * FROM assets");
order.Add(i);
}
});

var expectedOrder = Enumerable.Range(0, operationCount).ToList();
Expand Down Expand Up @@ -700,13 +706,11 @@ public async Task WatchSchemaResetTest()
{
await foreach (var result in listener)
{
if (result.Length > 0)
{
lastCount = result[0].count;
}
lastCount = result[0].count;
sem.Release();
}
});
}, testCts.Token);
Assert.False(await sem.WaitAsync(200));

var resolved = await db.GetSourceTables(QUERY, null);
Assert.Single(resolved);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public async Task InitializeAsync()
{
syncService = new MockSyncService();
db = syncService.CreateDatabase();
await db.Init();
}

public async Task DisposeAsync()
Expand Down