Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,24 +226,27 @@ public async Task Connect(PowerSyncConnectionOptions? options = null)
var tcs = new TaskCompletionSource<bool>();
var cts = new CancellationTokenSource();

var _ = Task.Run(() =>
// Subscribe to events before starting StreamingSync to not miss the Connected == true event
var listener = ListenAsync(cts.Token);

streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options);

var _ = Task.Run(async () =>
{
foreach (var status in Listen(cts.Token))
await foreach (var status in listener)
{
if (status.StatusChanged != null)
if (status.StatusChanged?.Connected == true)
{
if (status.StatusChanged.Connected == false)
{
logger.LogWarning("Initial connect attempt did not successfully connect to server");
}

tcs.SetResult(true);
tcs.TrySetResult(true);
cts.Cancel();
return;
}
}
});

streamingSyncTask = StreamingSync(CancellationTokenSource.Token, options);
// Connection closed prematurely
logger.LogWarning("Initial connect attempt did not successfully connect to server");
tcs.TrySetResult(true);
});

await tcs.Task;
}
Expand Down
10 changes: 5 additions & 5 deletions PowerSync/PowerSync.Common/DB/Schema/CompiledTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ public void Validate()
columnNames.Add(columnName);
}

foreach (var kvp in Indexes)
foreach (var index in IndexesJSON)
{
var indexName = kvp.Key;
var indexColumns = kvp.Value;
var indexName = index.Name;
var indexColumns = index.Columns;

if (InvalidSQLCharacters.IsMatch(indexName))
{
Expand All @@ -114,9 +114,9 @@ public void Validate()

foreach (var indexColumn in indexColumns)
{
if (!columnNames.Contains(indexColumn))
if (!columnNames.Contains(indexColumn.Name))
{
throw new Exception($"Column {indexColumn} not found for index {indexName}");
throw new Exception($"Column {indexColumn.Name} not found for index {indexName}");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions PowerSync/PowerSync.Common/DB/Schema/IndexedColumnJSON.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ class IndexedColumnJSONOptions(string Name, bool Ascending = true)

class IndexedColumnJSON(IndexedColumnJSONOptions options)
{
protected string Name { get; set; } = options.Name;
public string Name { get; set; } = options.Name;

protected bool Ascending { get; set; } = options.Ascending;
public bool Ascending { get; set; } = options.Ascending;

public object ToJSONObject(CompiledTable parentTable)
{
Expand Down
60 changes: 32 additions & 28 deletions PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,74 +171,78 @@ private static List<string> PrepareQueryString(ref string query, int parameterCo
return dynamicParamsList;
}

public async Task<T[]> GetAll<T>(string query, object?[]? parameters = null)
public Task<T[]> GetAll<T>(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return [.. await Db.QueryAsync<T>(query, dynamicParams, commandType: CommandType.Text)];
return Task.Run(async () => (await Db.QueryAsync<T>(query, dynamicParams, commandType: CommandType.Text)).ToArray());
}

public async Task<dynamic[]> GetAll(string query, object?[]? parameters = null)
public Task<dynamic[]> GetAll(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return [.. await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)];
return Task.Run(async () => (await Db.QueryAsync(query, dynamicParams, commandType: CommandType.Text)).ToArray());
}

public async Task<T?> GetOptional<T>(string query, object?[]? parameters = null)
public Task<T?> GetOptional<T>(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstOrDefaultAsync<T>(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstOrDefaultAsync<T>(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<dynamic?> GetOptional(string query, object?[]? parameters = null)
public Task<dynamic?> GetOptional(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstOrDefaultAsync(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<T> Get<T>(string query, object?[]? parameters = null)
public Task<T> Get<T>(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstAsync<T>(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstAsync<T>(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<dynamic> Get(string query, object?[]? parameters = null)
public Task<dynamic> Get(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
return await Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text);
return Task.Run(() => Db.QueryFirstAsync(query, dynamicParams, commandType: CommandType.Text));
}

public async Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
public Task<NonQueryResult> Execute(string query, object?[]? parameters = null)
{
DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters);
int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text);

return new NonQueryResult
return Task.Run(async () =>
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text);
return new NonQueryResult
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
});
}

public async Task<NonQueryResult> ExecuteBatch(string query, object?[][]? parameters = null)
public Task<NonQueryResult> ExecuteBatch(string query, object?[][]? parameters = null)
{
if (parameters == null || parameters.Length == 0)
{
return new NonQueryResult { RowsAffected = 0 };
return Task.FromResult(new NonQueryResult { RowsAffected = 0 });
}

List<DynamicParameters>? dynamicParamsList = PrepareQuery(ref query, parameters);
if (dynamicParamsList == null)
{
return new NonQueryResult { RowsAffected = 0 };
return Task.FromResult(new NonQueryResult { RowsAffected = 0 });
}

int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text);

return new NonQueryResult
return Task.Run(async () =>
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
int rowsAffected = await Db.ExecuteAsync(query, dynamicParamsList, commandType: CommandType.Text);
return new NonQueryResult
{
InsertId = raw.sqlite3_last_insert_rowid(Db.Handle),
RowsAffected = rowsAffected,
};
});
}

public new void Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ await db.Execute(
Assert.Null(row.make);
}

[Fact]
public async Task QueriesRunOnAnotherThread()
{
int preCallThreadId = Environment.CurrentManagedThreadId;
await db.GetAll("select * from assets");
int postCallThreadId = Environment.CurrentManagedThreadId;

Assert.NotEqual(preCallThreadId, postCallThreadId);
}

[Fact]
public async Task FailedInsertTest()
{
Expand Down Expand Up @@ -342,15 +352,11 @@ public async Task QueueSimultaneousExecutionsTest()

await db.WriteLock(async context =>
{
var tasks = Enumerable.Range(0, operationCount)
.Select(async index =>
{
await context.Execute("SELECT * FROM assets");
order.Add(index);
})
.ToArray();

await Task.WhenAll(tasks);
for (int i = 0; i < operationCount; i++)
{
await context.Execute("SELECT * FROM assets");
order.Add(i);
}
});

var expectedOrder = Enumerable.Range(0, operationCount).ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public async Task InitializeAsync()
{
syncService = new MockSyncService();
db = syncService.CreateDatabase();
await db.Init();
}

public async Task DisposeAsync()
Expand Down