Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions PowerSync/PowerSync.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 0.0.10-alpha.1

- Fixed watched queries sometimes resolving to the wrong underlying tables after a schema change.
- Fixed some properties in Table not being public when they are meant to be.
- Fixed a bug where custom indexes were not being sent to the PowerSync SQLite extension.
- Added a new model-based syntax for defining the PowerSync schema (the old syntax is still functional). This syntax uses classes marked with attributes to define the PowerSync schema. The classes can then also be used for queries later on.

Expand Down Expand Up @@ -40,6 +42,8 @@ public class Schema
var todos = powerSync.GetAll<Todo>("SELECT * FROM todos");
```

> > > > > > > main

## 0.0.9-alpha.1

- _Breaking:_ Further updated schema definition syntax.
Expand Down
158 changes: 111 additions & 47 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -768,41 +768,61 @@ private async Task<IDisposable> WatchInternal<T>(
Func<string, object?[]?, Task<T[]>> getter
)
{
try
{
var resolvedTables = await ResolveTables(query, parameters, options);
var result = await getter(query, parameters);
handler.OnResult(result);
var subscription = new WatchSubscription();

var subscription = OnChange(new WatchOnChangeHandler
async Task ResetQuery()
{
try
{
OnChange = async (change) =>
var resolvedTables = await ResolveTables(query, parameters, options);
var result = await getter(query, parameters);
handler.OnResult(result);

var onChangeListener = OnChange(new WatchOnChangeHandler
{
try
OnChange = async (change) =>
{
var result = await getter(query, parameters);
handler.OnResult(result);
}
catch (Exception ex)
{
handler.OnError?.Invoke(ex);
}
},
OnError = handler.OnError
}, new SQLWatchOptions
{
Tables = resolvedTables,
Signal = options?.Signal,
ThrottleMs = options?.ThrottleMs
});
try
{
var result = await getter(query, parameters);
handler.OnResult(result);
}
catch (Exception ex)
{
handler.OnError?.Invoke(ex);
}
},
OnError = handler.OnError
}, new SQLWatchOptions
{
Tables = resolvedTables,
Signal = options?.Signal,
ThrottleMs = options?.ThrottleMs
});

return subscription;
subscription.SetOnChangeListener(onChangeListener);
}
catch (Exception ex)
{
handler.OnError?.Invoke(ex);
throw;
}
}
catch (Exception ex)

// Register initial subscription
await ResetQuery();

// Listen for schema changes and reset listener
var schemaListener = RunListener(async (e) =>
{
handler.OnError?.Invoke(ex);
throw;
}
if (e.SchemaChanged != null)
{
await ResetQuery();
}
});
subscription.SetSchemaListener(schemaListener);

return subscription;
}

private class ExplainedResult
Expand Down Expand Up @@ -869,7 +889,7 @@ void flushTableUpdates()
});
}

var cts = Database.RunListener((update) =>
var dbListenerCts = Database.RunListener((update) =>
{
if (update.TablesUpdated != null)
{
Expand All @@ -885,27 +905,29 @@ void flushTableUpdates()
}
});

CancellationTokenSource linkedCts;
if (options?.Signal.HasValue == true)
CancellationTokenSource stopRunningCts;

if (options?.Signal != null)
{
// Cancel on global CTS cancellation or user token cancellation
linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
watchSubscriptionCts.Token,
options.Signal.Value
);
stopRunningCts = linkedCts;
}
else
{
// Cancel on global CTS cancellation
linkedCts = watchSubscriptionCts;
stopRunningCts = watchSubscriptionCts;
}

var registration = linkedCts.Token.Register(() =>
var stopRunningReg = stopRunningCts.Token.Register(dbListenerCts.Cancel);

return new ActionDisposable(() =>
{
cts.Cancel();
stopRunningReg.Dispose();
dbListenerCts.Cancel();
dbListenerCts.Dispose();
});

return new WatchSubscription(cts, registration);
}

private static void HandleTableChanges(HashSet<string> changedTables, HashSet<string> watchedTables, Action<string[]> onDetectedChanges)
Expand Down Expand Up @@ -968,21 +990,63 @@ public class WatchOnChangeHandler
public Action<Exception>? OnError { get; set; }
}

public class WatchSubscription(CancellationTokenSource cts, CancellationTokenRegistration registration) : IDisposable
public class WatchSubscription : IDisposable
{
private readonly CancellationTokenSource _cts = cts;
private readonly CancellationTokenRegistration _registration = registration;
private IDisposable? _onChangeListener;
private IDisposable? _schemaListener;
private readonly object _lock = new();
private bool _disposed;

public bool Disposed { get { return _disposed; } }
internal void SetSchemaListener(IDisposable listener)
{
lock (_lock)
{
if (_disposed)
{
listener.Dispose();
return;
}
_schemaListener?.Dispose();
_schemaListener = listener;
}
}

internal void SetOnChangeListener(IDisposable listener)
{
lock (_lock)
{
if (_disposed)
{
listener.Dispose();
return;
}
_onChangeListener?.Dispose();
_onChangeListener = listener;
}
}

public void Dispose()
{
lock (_lock)
{
if (_disposed) return;
_disposed = true;

_onChangeListener?.Dispose();
_schemaListener?.Dispose();
}
}
}

public class ActionDisposable(Action onDispose) : IDisposable
{
private readonly Action _onDispose = onDispose;
private bool _disposed = false;

public void Dispose()
{
if (_disposed) return;
_disposed = true;

_registration.Dispose();
_cts.Cancel();
_cts.Dispose();
_onDispose();
}
}
8 changes: 4 additions & 4 deletions PowerSync/PowerSync.Common/DB/Schema/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,22 @@ public bool InsertOnly
get { return Options.InsertOnly; }
set { Options.InsertOnly = value; }
}
string? ViewName
public string? ViewName
{
get { return Options.ViewName; }
set { Options.ViewName = value; }
}
bool TrackMetadata
public bool TrackMetadata
{
get { return Options.TrackMetadata; }
set { Options.TrackMetadata = value; }
}
TrackPreviousOptions? TrackPreviousValues
public TrackPreviousOptions? TrackPreviousValues
{
get { return Options.TrackPreviousValues; }
set { Options.TrackPreviousValues = value; }
}
bool IgnoreEmptyUpdates
public bool IgnoreEmptyUpdates
{
get { return Options.IgnoreEmptyUpdates; }
set { Options.IgnoreEmptyUpdates = value; }
Expand Down
Loading